diff --git a/runhelper/__init__.py b/runhelper/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..54d3d2b4093435cfd1d071b6fec5d36b3f910bd2 --- /dev/null +++ b/runhelper/__init__.py @@ -0,0 +1,2 @@ +from pystencils.runhelper.db import Database +from pystencils.runhelper.parameterstudy import ParameterStudy \ No newline at end of file diff --git a/db.py b/runhelper/db.py similarity index 63% rename from db.py rename to runhelper/db.py index 0066f6cab75fef30749d16f3bf47f9dcc801eb9f..4ffea6f2267a7d469126e0318089ae3a69355806 100644 --- a/db.py +++ b/runhelper/db.py @@ -1,6 +1,7 @@ import time import socket from collections import OrderedDict + import blitzdb import pandas as pd from pystencils.cpu.cpujit import getCompilerConfig @@ -12,7 +13,7 @@ def removeConstantColumns(df): return remainingDf, constants -class Database: +class Database(object): class SimulationResult(blitzdb.Document): pass @@ -20,22 +21,29 @@ class Database: self.backend = blitzdb.FileBackend(file) self.backend.autocommit = True - def save(self, params, result, env={}): - env = env.copy() - env['timestamp'] = time.mktime(time.gmtime()) - env['hostname'] = socket.gethostname() - env['cpuCompilerConfig'] = getCompilerConfig() + @staticmethod + def getEnv(): + return { + 'timestamp': time.mktime(time.gmtime()), + 'hostname': socket.gethostname(), + 'cpuCompilerConfig': getCompilerConfig(), + } + + def save(self, params, result, env=None): documentDict = { 'params': params, 'result': result, - 'env': env, + 'env': env if env else self.getEnv(), } document = Database.SimulationResult(documentDict, backend=self.backend) document.save() self.backend.commit() - def filter(self, **kwargs): - self.backend.filter(Database.SimulationResult, **kwargs) + def filter(self, *args, **kwargs): + return self.backend.filter(Database.SimulationResult, *args, **kwargs) + + def alreadySimulated(self, parameters): + return len(self.filter({'params': parameters})) > 0 def toPandas(self, query): queryResult = self.backend.filter(self.SimulationResult, query) @@ -49,12 +57,6 @@ class Database: df = pd.DataFrame.from_records(records) - #df.set_index([df[a] for a in list(index)], drop=True, inplace=True) - #for ind in index: - # del df[ind] - - #df.set_index([getattr(df, n) for n in index], inplace=True) - #df.set_index(tuple(index), inplace=True) - #df.set_index([df[col] for col in index], inplace=True) return df + diff --git a/runhelper/parameterstudy.py b/runhelper/parameterstudy.py new file mode 100644 index 0000000000000000000000000000000000000000..ace2916636064d539cf4532d6db5ad0711c9f9d8 --- /dev/null +++ b/runhelper/parameterstudy.py @@ -0,0 +1,169 @@ +import json +import datetime +import os +import socket +from collections import namedtuple +from time import sleep +from pystencils.runhelper import Database + + +class ParameterStudy(object): + + Run = namedtuple("Run", ['parameterDict', 'weight']) + + def __init__(self, runFunction, listOfRuns=[], databaseFile='./db'): + self.listOfRuns = listOfRuns + self.runFunction = runFunction + self.db = Database(databaseFile) + + def addRun(self, parameterDict, weight=1): + self.listOfRuns.append(self.Run(parameterDict, weight)) + + def filterAlreadySimulated(self, allRuns): + return [r for r in allRuns if not self.db.alreadySimulated(r.parameterDict)] + + @staticmethod + def distributeRuns(allRuns, process, numProcesses): + sortedRuns = sorted(allRuns, key=lambda e: e.weight, reverse=True) + result = sortedRuns[process::numProcesses] + result.reverse() # start with faster scenarios + return result + + def runServer(self, ip="0.0.0.0", port=8342): + from http.server import BaseHTTPRequestHandler, HTTPServer + filteredRuns = self.filterAlreadySimulated(self.listOfRuns) + + if not filteredRuns: + print("No Scenarios to simulate") + return + + class ParameterStudyServer(BaseHTTPRequestHandler): + parameterStudy = self + allRuns = filteredRuns + runs = filteredRuns.copy() + currentlyRunning = {} + finishedRuns = [] + + def nextScenario(self, receivedJsonData): + clientName = receivedJsonData['clientName'] + if len(self.runs) > 0: + runStatus = "%d/%d" % (len(self.finishedRuns), len(self.allRuns)) + workStatus = "%d/%d" % (sum(r.weight for r in self.finishedRuns), + sum(r.weight for r in self.allRuns)) + formatArgs = { + 'remaining': len(self.runs), + 'time': datetime.datetime.now().strftime("%H:%M:%S"), + 'clientName': clientName, + 'runStatus': runStatus, + 'workStatus': workStatus, + } + + scenario = self.runs.pop(0) + print(" {time} {clientName} fetched scenario. Scenarios: {runStatus}, Work: {workStatus}" + .format(**formatArgs)) + self.currentlyRunning[clientName] = scenario + return {'status': 'ok', 'params': scenario.parameterDict} + else: + return {'status': 'finished'} + + def result(self, receivedJsonData): + clientName = receivedJsonData['clientName'] + self.finishedRuns.append(self.currentlyRunning[clientName]) + del self.currentlyRunning[clientName] + d = receivedJsonData + self.parameterStudy.db.save(d['params'], d['result'], d['env']) + return {} + + def do_POST(self): + mapping = {'/nextScenario': self.nextScenario, + '/result': self.result} + if self.path in mapping.keys(): + data = self.rfile.read(int(self.headers['Content-Length'])) + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + jsonData = json.loads(data.decode()) + response = mapping[self.path](jsonData) + self.wfile.write(json.dumps(response).encode()) + else: + self.send_response(400) + + def do_GET(self): + return self.do_POST() + + def log_message(self, format, *args): + return + + print("Listening to connections on {}:{}. Scenarios to simulate: {}".format(ip, port, len(filteredRuns))) + server = HTTPServer((ip, port), ParameterStudyServer) + while len(ParameterStudyServer.currentlyRunning) > 0 or len(ParameterStudyServer.runs) > 0: + server.handle_request() + server.handle_request() + + def runClient(self, clientName="{hostname}_{pid}", server='localhost', port=8342, parameterUpdate={}): + from urllib.request import urlopen, URLError + url = "http://{}:{}".format(server, port) + clientName = clientName.format(hostname=socket.gethostname(), pid=os.getpid()) + while True: + try: + httpResponse = urlopen(url + "/nextScenario", + data=json.dumps({'clientName': clientName}).encode()) + scenario = json.loads(httpResponse.read().decode()) + if scenario['status'] != 'ok': + break + scenario['params'].update(parameterUpdate) + result = self.runFunction(**scenario['params']) + + answer = {'params': scenario['params'], + 'result': result, + 'env': Database.getEnv(), + 'clientName': clientName} + urlopen(url + '/result', data=json.dumps(answer).encode()) + except URLError: + print("Cannot connect to server {} retrying in 5 seconds...".format(url)) + sleep(5) + + def run(self, process, numProcesses): + ownRuns = self.distributeRuns(self.listOfRuns, process, numProcesses) + for run in ownRuns: + result = self.runFunction(**run.parameterDict) + self.db.save(run.parameterDict, result) + + def runFromCommandLine(self, argv=None): + from argparse import ArgumentParser + + def server(a): + if a.database: + self.db = Database(a.database) + self.runServer(a.host, a.port) + + def client(a): + print(a.parameterOverride) + self.runClient(a.clientName, a.host, a.port, json.loads(a.parameterOverride)) + + parser = ArgumentParser() + subparsers = parser.add_subparsers() + serverParser = subparsers.add_parser('server', aliases=['serv', 's'], + help="Runs server to distribute different scenarios to workers",) + serverParser.add_argument("-p", "--port", type=int, default=8342, help="Port to listen on") + serverParser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="IP/Hostname to listen on") + serverParser.add_argument("-d", "--database", type=str, default="") + serverParser.set_defaults(func=server) + + clientParser = subparsers.add_parser('client', aliases=['c'], + help="Runs a worker client connection to scenario distribution server") + clientParser.add_argument("-p", "--port", type=int, default=8342, help="Port to connect to") + clientParser.add_argument("-H", "--host", type=str, default="localhost", help="Host or IP to connect to") + clientParser.add_argument("-n", "--clientName", type=str, default="{hostname}_{pid}", + help="Unique client name, you can use {hostname} and {pid} as placeholder") + clientParser.add_argument("-P", "--parameterOverride", type=str, default="{}", + help="JSON: the parameter dictionary is updated with these parameters. Use this to " + "set host specific options like GPU call parameters. Enclose in \" ") + clientParser.set_defaults(func=client) + + args = parser.parse_args(argv) + if not len(vars(args)): + parser.print_help() + else: + args.func(args) +