Skip to content
Snippets Groups Projects
Commit 9a9a629b authored by Martin Bauer's avatar Martin Bauer
Browse files

New ParameterStudy run helper (server & worker clients)

- helps when simulating a large amount of small scenarios
parent 330ffcf3
No related merge requests found
from pystencils.runhelper.db import Database
from pystencils.runhelper.parameterstudy import ParameterStudy
\ No newline at end of file
import time import time
import socket import socket
from collections import OrderedDict from collections import OrderedDict
import blitzdb import blitzdb
import pandas as pd import pandas as pd
from pystencils.cpu.cpujit import getCompilerConfig from pystencils.cpu.cpujit import getCompilerConfig
...@@ -12,7 +13,7 @@ def removeConstantColumns(df): ...@@ -12,7 +13,7 @@ def removeConstantColumns(df):
return remainingDf, constants return remainingDf, constants
class Database: class Database(object):
class SimulationResult(blitzdb.Document): class SimulationResult(blitzdb.Document):
pass pass
...@@ -20,22 +21,29 @@ class Database: ...@@ -20,22 +21,29 @@ class Database:
self.backend = blitzdb.FileBackend(file) self.backend = blitzdb.FileBackend(file)
self.backend.autocommit = True self.backend.autocommit = True
def save(self, params, result, env={}): @staticmethod
env = env.copy() def getEnv():
env['timestamp'] = time.mktime(time.gmtime()) return {
env['hostname'] = socket.gethostname() 'timestamp': time.mktime(time.gmtime()),
env['cpuCompilerConfig'] = getCompilerConfig() 'hostname': socket.gethostname(),
'cpuCompilerConfig': getCompilerConfig(),
}
def save(self, params, result, env=None):
documentDict = { documentDict = {
'params': params, 'params': params,
'result': result, 'result': result,
'env': env, 'env': env if env else self.getEnv(),
} }
document = Database.SimulationResult(documentDict, backend=self.backend) document = Database.SimulationResult(documentDict, backend=self.backend)
document.save() document.save()
self.backend.commit() self.backend.commit()
def filter(self, **kwargs): def filter(self, *args, **kwargs):
self.backend.filter(Database.SimulationResult, **kwargs) return self.backend.filter(Database.SimulationResult, *args, **kwargs)
def alreadySimulated(self, parameters):
return len(self.filter({'params': parameters})) > 0
def toPandas(self, query): def toPandas(self, query):
queryResult = self.backend.filter(self.SimulationResult, query) queryResult = self.backend.filter(self.SimulationResult, query)
...@@ -49,12 +57,6 @@ class Database: ...@@ -49,12 +57,6 @@ class Database:
df = pd.DataFrame.from_records(records) df = pd.DataFrame.from_records(records)
#df.set_index([df[a] for a in list(index)], drop=True, inplace=True)
#for ind in index:
# del df[ind]
#df.set_index([getattr(df, n) for n in index], inplace=True)
#df.set_index(tuple(index), inplace=True)
#df.set_index([df[col] for col in index], inplace=True)
return df return df
import json
import datetime
import os
import socket
from collections import namedtuple
from time import sleep
from pystencils.runhelper import Database
class ParameterStudy(object):
Run = namedtuple("Run", ['parameterDict', 'weight'])
def __init__(self, runFunction, listOfRuns=[], databaseFile='./db'):
self.listOfRuns = listOfRuns
self.runFunction = runFunction
self.db = Database(databaseFile)
def addRun(self, parameterDict, weight=1):
self.listOfRuns.append(self.Run(parameterDict, weight))
def filterAlreadySimulated(self, allRuns):
return [r for r in allRuns if not self.db.alreadySimulated(r.parameterDict)]
@staticmethod
def distributeRuns(allRuns, process, numProcesses):
sortedRuns = sorted(allRuns, key=lambda e: e.weight, reverse=True)
result = sortedRuns[process::numProcesses]
result.reverse() # start with faster scenarios
return result
def runServer(self, ip="0.0.0.0", port=8342):
from http.server import BaseHTTPRequestHandler, HTTPServer
filteredRuns = self.filterAlreadySimulated(self.listOfRuns)
if not filteredRuns:
print("No Scenarios to simulate")
return
class ParameterStudyServer(BaseHTTPRequestHandler):
parameterStudy = self
allRuns = filteredRuns
runs = filteredRuns.copy()
currentlyRunning = {}
finishedRuns = []
def nextScenario(self, receivedJsonData):
clientName = receivedJsonData['clientName']
if len(self.runs) > 0:
runStatus = "%d/%d" % (len(self.finishedRuns), len(self.allRuns))
workStatus = "%d/%d" % (sum(r.weight for r in self.finishedRuns),
sum(r.weight for r in self.allRuns))
formatArgs = {
'remaining': len(self.runs),
'time': datetime.datetime.now().strftime("%H:%M:%S"),
'clientName': clientName,
'runStatus': runStatus,
'workStatus': workStatus,
}
scenario = self.runs.pop(0)
print(" {time} {clientName} fetched scenario. Scenarios: {runStatus}, Work: {workStatus}"
.format(**formatArgs))
self.currentlyRunning[clientName] = scenario
return {'status': 'ok', 'params': scenario.parameterDict}
else:
return {'status': 'finished'}
def result(self, receivedJsonData):
clientName = receivedJsonData['clientName']
self.finishedRuns.append(self.currentlyRunning[clientName])
del self.currentlyRunning[clientName]
d = receivedJsonData
self.parameterStudy.db.save(d['params'], d['result'], d['env'])
return {}
def do_POST(self):
mapping = {'/nextScenario': self.nextScenario,
'/result': self.result}
if self.path in mapping.keys():
data = self.rfile.read(int(self.headers['Content-Length']))
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
jsonData = json.loads(data.decode())
response = mapping[self.path](jsonData)
self.wfile.write(json.dumps(response).encode())
else:
self.send_response(400)
def do_GET(self):
return self.do_POST()
def log_message(self, format, *args):
return
print("Listening to connections on {}:{}. Scenarios to simulate: {}".format(ip, port, len(filteredRuns)))
server = HTTPServer((ip, port), ParameterStudyServer)
while len(ParameterStudyServer.currentlyRunning) > 0 or len(ParameterStudyServer.runs) > 0:
server.handle_request()
server.handle_request()
def runClient(self, clientName="{hostname}_{pid}", server='localhost', port=8342, parameterUpdate={}):
from urllib.request import urlopen, URLError
url = "http://{}:{}".format(server, port)
clientName = clientName.format(hostname=socket.gethostname(), pid=os.getpid())
while True:
try:
httpResponse = urlopen(url + "/nextScenario",
data=json.dumps({'clientName': clientName}).encode())
scenario = json.loads(httpResponse.read().decode())
if scenario['status'] != 'ok':
break
scenario['params'].update(parameterUpdate)
result = self.runFunction(**scenario['params'])
answer = {'params': scenario['params'],
'result': result,
'env': Database.getEnv(),
'clientName': clientName}
urlopen(url + '/result', data=json.dumps(answer).encode())
except URLError:
print("Cannot connect to server {} retrying in 5 seconds...".format(url))
sleep(5)
def run(self, process, numProcesses):
ownRuns = self.distributeRuns(self.listOfRuns, process, numProcesses)
for run in ownRuns:
result = self.runFunction(**run.parameterDict)
self.db.save(run.parameterDict, result)
def runFromCommandLine(self, argv=None):
from argparse import ArgumentParser
def server(a):
if a.database:
self.db = Database(a.database)
self.runServer(a.host, a.port)
def client(a):
print(a.parameterOverride)
self.runClient(a.clientName, a.host, a.port, json.loads(a.parameterOverride))
parser = ArgumentParser()
subparsers = parser.add_subparsers()
serverParser = subparsers.add_parser('server', aliases=['serv', 's'],
help="Runs server to distribute different scenarios to workers",)
serverParser.add_argument("-p", "--port", type=int, default=8342, help="Port to listen on")
serverParser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="IP/Hostname to listen on")
serverParser.add_argument("-d", "--database", type=str, default="")
serverParser.set_defaults(func=server)
clientParser = subparsers.add_parser('client', aliases=['c'],
help="Runs a worker client connection to scenario distribution server")
clientParser.add_argument("-p", "--port", type=int, default=8342, help="Port to connect to")
clientParser.add_argument("-H", "--host", type=str, default="localhost", help="Host or IP to connect to")
clientParser.add_argument("-n", "--clientName", type=str, default="{hostname}_{pid}",
help="Unique client name, you can use {hostname} and {pid} as placeholder")
clientParser.add_argument("-P", "--parameterOverride", type=str, default="{}",
help="JSON: the parameter dictionary is updated with these parameters. Use this to "
"set host specific options like GPU call parameters. Enclose in \" ")
clientParser.set_defaults(func=client)
args = parser.parse_args(argv)
if not len(vars(args)):
parser.print_help()
else:
args.func(args)
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment