From c43672d2ae38630cbff6061d88daa36f82e461c7 Mon Sep 17 00:00:00 2001 From: Martin Bauer <martin.bauer@fau.de> Date: Sat, 31 Mar 2018 01:18:21 +0200 Subject: [PATCH] Refactored pystencils runhelpers - added documentation - PEP8 renaming --- runhelper/__init__.py | 2 +- runhelper/db.py | 208 +++++++++++++++------- runhelper/parameterstudy.py | 342 +++++++++++++++++++++++------------- 3 files changed, 366 insertions(+), 186 deletions(-) diff --git a/runhelper/__init__.py b/runhelper/__init__.py index 54d3d2b40..9a854cea6 100644 --- a/runhelper/__init__.py +++ b/runhelper/__init__.py @@ -1,2 +1,2 @@ from pystencils.runhelper.db import Database -from pystencils.runhelper.parameterstudy import ParameterStudy \ No newline at end of file +from pystencils.runhelper.parameterstudy import ParameterStudy diff --git a/runhelper/db.py b/runhelper/db.py index 5ef5d7cfd..4f37ebf8e 100644 --- a/runhelper/db.py +++ b/runhelper/db.py @@ -1,103 +1,185 @@ import time import socket +from typing import Dict, Sequence, Iterator import blitzdb from pystencils.cpu.cpujit import getCompilerConfig -def removeConstantColumns(df): - import pandas as pd - remainingDf = df.loc[:, df.apply(pd.Series.nunique) > 1] - constants = df.loc[:, df.apply(pd.Series.nunique) <= 1].iloc[0] - return remainingDf, constants - - -def removeColumnsByPrefix(df, prefixes, inplace=False): - if not inplace: - df = df.copy() +class Database: + """NoSQL database for storing simulation results. + + Two backends are supported: + * `blitzdb`: simple file-based solution similar to sqlite for SQL databases, stores json files + no server setup required, but slow for larger collections + * `mongodb`: mongodb backend via `pymongo` + + A simulation result is stored as an object consisting of + * parameters: dict with simulation parameters + * results: dict with results + * environment: information about the machine, compiler configuration and time + + Args: + file: database identifier, for blitzdb pass a directory name here. 
Database folder is created if it doesn't + exist yet. For larger collections use mongodb. In this case pass a pymongo connection string + e.g. "mongo://server:9131" + + Example: + >>> from tempfile import TemporaryDirectory + >>> with TemporaryDirectory() as tmp_dir: + ... db = Database(tmp_dir) # create database in temporary folder + ... params = {'method': 'finite_diff', 'dx': 1.5} # some hypothetical simulation parameters + ... db.save(params, result={'error': 1e-6}) # save simulation parameters together with hypothetical results + ... assert db.was_already_simulated(params) # search for parameters in database + ... assert next(db.filter_params(params))['params'] == params # get data set, keys are 'params', 'result' + ... # and 'env' + ... # get a pandas object with all results matching a query + ... db.to_pandas({'dx': 1.5}, remove_prefix=True) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE + dx method error + pk + ... 1.5 finite_diff 0.000001 + """ - for columnName in df.columns: - for prefix in prefixes: - if columnName.startswith(prefix): - del df[columnName] - return df - - -def removePrefixInColumnName(df, inplace=False): - if not inplace: - df = df.copy() - - newColumnNames = [] - for columnName in df.columns: - if '.' 
in columnName: - newColumnNames.append(columnName[columnName.index('.') + 1:]) - else: - newColumnNames.append(columnName) - df.columns = newColumnNames - return df - - -class Database(object): class SimulationResult(blitzdb.Document): pass - def __init__(self, file): + def __init__(self, file: str) -> None: if file.startswith("mongo://"): from pymongo import MongoClient - dbName = file[len("mongo://"):] + db_name = file[len("mongo://"):] c = MongoClient() - self.backend = blitzdb.MongoBackend(c[dbName]) + self.backend = blitzdb.MongoBackend(c[db_name]) else: self.backend = blitzdb.FileBackend(file) self.backend.autocommit = True - @staticmethod - def getEnv(): - return { - 'timestamp': time.mktime(time.gmtime()), - 'hostname': socket.gethostname(), - 'cpuCompilerConfig': getCompilerConfig(), - } + def save(self, params: Dict, result: Dict, env: Dict=None, **kwargs) -> None: + """Stores a simulation result in the database. - def save(self, params, result, env=None, **kwargs): - documentDict = { + Args: + params: dict of simulation parameters + result: dict of simulation results + env: optional environment - if None a default environment with compiler configuration, machine info and time + is used + **kwargs: the final object is updated with the keyword arguments + + """ + document_dict = { 'params': params, 'result': result, - 'env': env if env else self.getEnv(), + 'env': env if env else self.get_environment(), } - documentDict.update(kwargs) - document = Database.SimulationResult(documentDict, backend=self.backend) + document_dict.update(kwargs) + document = Database.SimulationResult(document_dict, backend=self.backend) document.save() self.backend.commit() - def filter(self, *args, **kwargs): - return self.backend.filter(Database.SimulationResult, *args, **kwargs) + def filter_params(self, parameter_query: Dict, *args, **kwargs) -> Iterator[SimulationResult]: + """Query using simulation parameters. 
+ + See blitzdb documentation for filter + + Args: + parameter_query: blitzdb filter dict using only simulation parameters + *args: arguments passed to blitzdb filter + **kwargs: arguments passed to blitzdb filter - def filterParams(self, query, *args, **kwargs): - query = {'params.' + k: v for k, v in query.items()} + Returns: + generator of SimulationResult, which is a dict-like object with keys 'params', 'result' and 'env' + """ + query = {'params.' + k: v for k, v in parameter_query.items()} return self.filter(query, *args, **kwargs) - def alreadySimulated(self, parameters): + def filter(self, *args, **kwargs): + """blitzdb filter on SimulationResult, not only simulation parameters. + + Can be used to filter for results or environment options. + The filter dictionary has to have prefixes "params." , "env." or "result." + """ + return self.backend.filter(Database.SimulationResult, *args, **kwargs) + + def was_already_simulated(self, parameters): + """Checks if there is at least one simulation result matching the passed parameters.""" return len(self.filter({'params': parameters})) > 0 # Columns with these prefixes are not included in pandas result pandasColumnsToIgnore = ['changedParams.', 'env.'] - def toPandas(self, parameterQuery, removePrefix=True, dropConstantColumns=False): - import pandas as pd + def to_pandas(self, parameter_query, remove_prefix=True, drop_constant_columns=False): + """Queries for simulations with given parameters and returns them in a pandas data frame. - queryResult = self.filterParams(parameterQuery) - if len(queryResult) == 0: - return + Args: + parameter_query: see filter method + remove_prefix: if True the name of the pandas columns are not prefixed with "params." or "results." 
+ drop_constant_columns: if True, all columns are dropped that have the same value in all rows + + Returns: + pandas data frame + """ + from pandas.io.json import json_normalize - df = pd.io.json.json_normalize([e.attributes for e in queryResult]) + query_result = self.filter_params(parameter_query) + attributes = [e.attributes for e in query_result] + if not attributes: + return + df = json_normalize(attributes) df.set_index('pk', inplace=True) if self.pandasColumnsToIgnore: - removeColumnsByPrefix(df, self.pandasColumnsToIgnore, inplace=True) - if removePrefix: - removePrefixInColumnName(df, inplace=True) - if dropConstantColumns: - df, _ = removeConstantColumns(df) + remove_columns_by_prefix(df, self.pandasColumnsToIgnore, inplace=True) + if remove_prefix: + remove_prefix_in_column_name(df, inplace=True) + if drop_constant_columns: + df, _ = remove_constant_columns(df) return df + + @staticmethod + def get_environment(): + return { + 'timestamp': time.mktime(time.gmtime()), + 'hostname': socket.gethostname(), + 'cpuCompilerConfig': getCompilerConfig(), + } + + +# ----------------------------------------- Helper Functions ----------------------------------------------------------- + + +def remove_constant_columns(df): + """Removes all columns of a pandas data frame that have the same value in all rows.""" + import pandas as pd + remaining_df = df.loc[:, df.apply(pd.Series.nunique) > 1] + constants = df.loc[:, df.apply(pd.Series.nunique) <= 1].iloc[0] + return remaining_df, constants + + +def remove_columns_by_prefix(df, prefixes: Sequence[str], inplace: bool = False): + """Remove all columns from a pandas data frame whose name starts with one of the given prefixes.""" + if not inplace: + df = df.copy() + + for columnName in df.columns: + for prefix in prefixes: + if columnName.startswith(prefix): + del df[columnName] + return df + + +def remove_prefix_in_column_name(df, inplace: bool = False): + """Removes dotted prefixes from pandas column names. 
+ + A column named 'result.finite_diff.dx' is renamed to 'finite_diff.dx', everything before the first dot is removed. + If the column name does not contain a dot, the column name is not changed. + """ + if not inplace: + df = df.copy() + + new_column_names = [] + for columnName in df.columns: + if '.' in columnName: + new_column_names.append(columnName[columnName.index('.') + 1:]) + else: + new_column_names.append(columnName) + df.columns = new_column_names + return df diff --git a/runhelper/parameterstudy.py b/runhelper/parameterstudy.py index f3599caf1..716332f56 100644 --- a/runhelper/parameterstudy.py +++ b/runhelper/parameterstudy.py @@ -6,212 +6,299 @@ import itertools from copy import deepcopy from collections import namedtuple from time import sleep +from typing import Dict, Callable, Sequence, Any, Tuple, Optional from pystencils.runhelper import Database from pystencils.utils import DotDict -class ParameterStudy(object): +ParameterDict = Dict[str, Any] +WeightFunction = Callable[[Dict], int] +FilterFunction = Callable[[ParameterDict], Optional[ParameterDict]] + + +class ParameterStudy: + """Manages and runs multiple configurations locally or distributed and stores results in NoSQL database. + + To run a parameter study, define a run function that takes all parameters as keyword arguments and returns the + results as a (possibly nested) dictionary. Then, define the parameter sets that this function should be run with. + + Examples: + >>> import tempfile + >>> + >>> def dummy_run_function(p1, p2, p3, p4): + ... print("Run called with", p1, p2, p3, p4) + ... return { 'result1': p1 * p2, 'result2': p3 + p4 } + >>> + >>> with tempfile.TemporaryDirectory() as tmp_dir: + ... ps = ParameterStudy(dummy_run_function, database_connector=tmp_dir) + ... ps.add_run({'p1': 5, 'p2': 42, 'p3': 'abc', 'p4': 'def'}) + ... ps.add_combinations( [('p1', [1, 2]), + ... ('p3', ['x', 'y'])], constant_parameters={'p2': 5, 'p4': 'z' }) + ... ps.run() + ... 
ps.run_from_command_line(argv=['local']) # alternative to run - exposes a command line interface if + ... # no argv is passed. Does not run anything here, because + ... # configurations already in the database are skipped + Run called with 2 5 y z + Run called with 2 5 x z + Run called with 1 5 y z + Run called with 1 5 x z + Run called with 5 42 abc def + + Above example runs all parameter combinations locally and stores the returned result in the NoSQL database. + It is also possible to distribute the runs to multiple processes, by starting a server on one machine and multiple + executing runners on other machines. The server distributes configurations to the runners, collects their results + and stores the results in the database. + """ + Run = namedtuple("Run", ['parameterDict', 'weight']) - def __init__(self, runFunction, listOfRuns=[], databaseFile='./db'): - self.listOfRuns = listOfRuns - self.runFunction = runFunction - self.db = Database(databaseFile) - - def addRun(self, parameterDict, weight=1): - self.listOfRuns.append(self.Run(parameterDict, weight)) - - def addCombinations(self, degreesOfFreedom, constantParameters=None, filterFunction=None, weightFunction=None): - parameterNames = [e[0] for e in degreesOfFreedom] - parameterValues = [e[1] for e in degreesOfFreedom] - - defaultParamsDict = {} if constantParameters is None else constantParameters - for valueTuple in itertools.product(*parameterValues): - paramsDict = deepcopy(defaultParamsDict) - paramsDict.update({name: value for name, value in zip(parameterNames, valueTuple)}) - params = DotDict(paramsDict) - if filterFunction: - params = filterFunction(params) + def __init__(self, run_function: Callable[..., Dict], runs: Sequence = (), database_connector: str='./db') -> None: + self.runs = list(runs) + self.run_function = run_function + self.db = Database(database_connector) + + def add_run(self, parameter_dict: ParameterDict, weight: int = 1) -> None: + """Schedule a dictionary of parameters to run in 
this parameter study. + + Args: + parameter_dict: used as keyword arguments to the run function. + weight: weight of the run configuration which should be proportional to runtime of this case, + used for progress display and distribution to processes. + """ + self.runs.append(self.Run(parameter_dict, weight)) + + def add_combinations(self, degrees_of_freedom: Sequence[Tuple[str, Sequence[Any]]], + constant_parameters: Optional[ParameterDict] = None, + filter_function: Optional[FilterFunction] = None, + runtime_weight_function: Optional[WeightFunction] = None) -> None: + """Add all possible combinations of given parameters as runs. + + This is a convenience function to simulate all possible parameter combinations of a scenario. + Configurations can be filtered and weighted by passing filter- and weighting functions. + + Args: + degrees_of_freedom: defines for each parameter the possible values it can take on + constant_parameters: parameter dict, for parameters that should not be changed + filter_function: optional function that receives a parameter dict and returns the potentially modified dict + or None if this combination should not be added. + runtime_weight_function: function mapping a parameter dict to the runtime weight (see weight at add_run) + + Examples: + degrees_of_freedom = [('p1', [1,2]), + ('p2', ['a', 'b'])] + is equivalent to calling add_run four times, with all possible parameter combinations. 
+ """ + parameter_names = [e[0] for e in degrees_of_freedom] + parameter_values = [e[1] for e in degrees_of_freedom] + + default_params_dict = {} if constant_parameters is None else constant_parameters + for valueTuple in itertools.product(*parameter_values): + params_dict = deepcopy(default_params_dict) + params_dict.update({name: value for name, value in zip(parameter_names, valueTuple)}) + params = DotDict(params_dict) + if filter_function: + params = filter_function(params) if params is None: continue - weight = 1 if not weightFunction else weightFunction(params) - self.addRun(params, weight) - - def filterAlreadySimulated(self, allRuns): - return [r for r in allRuns if not self.db.alreadySimulated(r.parameterDict)] - - @staticmethod - def distributeRuns(allRuns, process, numProcesses): - sortedRuns = sorted(allRuns, key=lambda e: e.weight, reverse=True) - result = sortedRuns[process::numProcesses] - result.reverse() # start with faster scenarios - return result - - def runServer(self, ip="0.0.0.0", port=8342): + weight = 1 if not runtime_weight_function else runtime_weight_function(params) + self.add_run(params, weight) + + def run(self, process: int = 0, num_processes: int = 1, parameter_update: Optional[ParameterDict] = None) -> None: + """Runs all added configurations. + + Args: + process: configurations are split into num_processes chunks according to weights and only the + process'th chunk is run. To run all, use process=0 and num_processes=1 + num_processes: see above + parameter_update: Extend/override all configurations with this dictionary. 
+ """ + parameter_update = {} if parameter_update is None else parameter_update + own_runs = self._distribute_runs(self.runs, process, num_processes) + for run in own_runs: + parameter_dict = run.parameterDict.copy() + parameter_dict.update(parameter_update) + result = self.run_function(**parameter_dict) + + self.db.save(run.parameterDict, result, None, changed_params=parameter_update) + + def run_scenarios_not_in_database(self, parameter_update: Optional[ParameterDict] = None) -> None: + """Same as run method, but runs only configurations for which no result is in the database yet.""" + parameter_update = {} if parameter_update is None else parameter_update + filtered_runs = self._filter_already_simulated(self.runs) + for run in filtered_runs: + parameter_dict = run.parameterDict.copy() + parameter_dict.update(parameter_update) + result = self.run_function(**parameter_dict) + + self.db.save(run.parameterDict, result, changed_params=parameter_update) + + def run_server(self, ip: str ="0.0.0.0", port: int = 8342): + """Runs server to supply runner clients with scenarios to simulate and collect results from them. 
+ Skips scenarios that are already in the database.""" from http.server import BaseHTTPRequestHandler, HTTPServer - filteredRuns = self.filterAlreadySimulated(self.listOfRuns) + filtered_runs = self._filter_already_simulated(self.runs) - if not filteredRuns: + if not filtered_runs: print("No Scenarios to simulate") return class ParameterStudyServer(BaseHTTPRequestHandler): parameterStudy = self - allRuns = filteredRuns - runs = filteredRuns.copy() + allRuns = filtered_runs + runs = filtered_runs.copy() currentlyRunning = {} finishedRuns = [] - def nextScenario(self, receivedJsonData): - clientName = receivedJsonData['clientName'] + def next_scenario(self, received_json_data): + client_name = received_json_data['client_name'] if len(self.runs) > 0: - runStatus = "%d/%d" % (len(self.finishedRuns), len(self.allRuns)) - workStatus = "%d/%d" % (sum(r.weight for r in self.finishedRuns), - sum(r.weight for r in self.allRuns)) - formatArgs = { + run_status = "%d/%d" % (len(self.finishedRuns), len(self.allRuns)) + work_status = "%d/%d" % (sum(r.weight for r in self.finishedRuns), + sum(r.weight for r in self.allRuns)) + format_args = { 'remaining': len(self.runs), 'time': datetime.datetime.now().strftime("%H:%M:%S"), - 'clientName': clientName, - 'runStatus': runStatus, - 'workStatus': workStatus, + 'client_name': client_name, + 'run_status': run_status, + 'work_status': work_status, } scenario = self.runs.pop(0) - print(" {time} {clientName} fetched scenario. Scenarios: {runStatus}, Work: {workStatus}" - .format(**formatArgs)) - self.currentlyRunning[clientName] = scenario + print(" {time} {client_name} fetched scenario. 
Scenarios: {run_status}, Work: {work_status}" + .format(**format_args)) + self.currentlyRunning[client_name] = scenario return {'status': 'ok', 'params': scenario.parameterDict} else: return {'status': 'finished'} - def result(self, receivedJsonData): - clientName = receivedJsonData['clientName'] - run = self.currentlyRunning[clientName] + def result(self, received_json_data): + client_name = received_json_data['client_name'] + run = self.currentlyRunning[client_name] self.finishedRuns.append(run) - del self.currentlyRunning[clientName] - d = receivedJsonData + del self.currentlyRunning[client_name] + d = received_json_data - def hash_dict(d): + def hash_dict(dictionary): import hashlib - return hashlib.sha1(json.dumps(d, sort_keys=True).encode()).hexdigest() + return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest() assert hash_dict(d['params']) == hash_dict(run.parameterDict) - self.parameterStudy.db.save(run.parameterDict, d['result'], d['env'], changedParams=d['changedParams']) + self.parameterStudy.db.save(run.parameterDict, + result=d['result'], env=d['env'], changed_params=d['changed_params']) return {} - def do_POST(self): - mapping = {'/nextScenario': self.nextScenario, + # noinspection PyPep8Naming + def do_POST(self) -> None: + mapping = {'/next_scenario': self.next_scenario, '/result': self.result} if self.path in mapping.keys(): data = self.rfile.read(int(self.headers['Content-Length'])) self.send_response(200) self.send_header("Content-type", "application/json") self.end_headers() - jsonData = json.loads(data.decode()) - response = mapping[self.path](jsonData) + json_data = json.loads(data.decode()) + response = mapping[self.path](json_data) self.wfile.write(json.dumps(response).encode()) else: self.send_response(400) + # noinspection PyPep8Naming def do_GET(self): return self.do_POST() - def log_message(self, format, *args): + def log_message(self, fmt, *args): return - print("Listening to connections on {}:{}. 
Scenarios to simulate: {}".format(ip, port, len(filteredRuns))) + print("Listening to connections on {}:{}. Scenarios to simulate: {}".format(ip, port, len(filtered_runs))) server = HTTPServer((ip, port), ParameterStudyServer) while len(ParameterStudyServer.currentlyRunning) > 0 or len(ParameterStudyServer.runs) > 0: server.handle_request() server.handle_request() - def runClient(self, clientName="{hostname}_{pid}", server='localhost', port=8342, parameterUpdate={}): - from urllib.request import urlopen, URLError + def run_client(self, client_name: str="{hostname}_{pid}", server: str='localhost', port: int=8342, + parameter_update: Optional[ParameterDict] = None) -> None: + """Start runner client that retrieves configuration from server, runs it and reports results back to server. + + Args: + client_name: name of the client. Has to be unique for each client. + Placeholders {hostname} and {pid} can be used to generate unique name. + server: url to server + port: port as specified in run_server + parameter_update: Used to override/extend parameters received from the server. 
+ A typical use case is to set optimization or GPU parameters for some clients to make + some clients simulate on CPU, others on GPU + """ + from urllib.request import urlopen + from urllib.error import URLError + parameter_update = {} if parameter_update is None else parameter_update url = "http://{}:{}".format(server, port) - clientName = clientName.format(hostname=socket.gethostname(), pid=os.getpid()) + client_name = client_name.format(hostname=socket.gethostname(), pid=os.getpid()) while True: try: - httpResponse = urlopen(url + "/nextScenario", - data=json.dumps({'clientName': clientName}).encode()) - scenario = json.loads(httpResponse.read().decode()) + http_response = urlopen(url + "/next_scenario", + data=json.dumps({'client_name': client_name}).encode()) + scenario = json.loads(http_response.read().decode()) if scenario['status'] != 'ok': break - originalParams = scenario['params'].copy() - scenario['params'].update(parameterUpdate) - result = self.runFunction(**scenario['params']) + original_params = scenario['params'].copy() + scenario['params'].update(parameter_update) + result = self.run_function(**scenario['params']) - answer = {'params': originalParams, - 'changedParams': parameterUpdate, + answer = {'params': original_params, + 'changed_params': parameter_update, 'result': result, - 'env': Database.getEnv(), - 'clientName': clientName} + 'env': Database.get_environment(), + 'client_name': client_name} urlopen(url + '/result', data=json.dumps(answer).encode()) except URLError: print("Cannot connect to server {} retrying in 5 seconds...".format(url)) sleep(5) - def run(self, process, numProcesses, parameterUpdate={}): - ownRuns = self.distributeRuns(self.listOfRuns, process, numProcesses) - for run in ownRuns: - parameterDict = run.parameterDict.copy() - parameterDict.update(parameterUpdate) - result = self.runFunction(**parameterDict) - - self.db.save(run.parameterDict, result, None, changedParams=parameterUpdate) - - def 
runScenariosNotInDatabase(self, parameterUpdate={}): - filteredRuns = self.filterAlreadySimulated(self.listOfRuns) - for run in filteredRuns: - parameterDict = run.parameterDict.copy() - parameterDict.update(parameterUpdate) - result = self.runFunction(**parameterDict) - - self.db.save(run.parameterDict, result, None, changedParams=parameterUpdate) - - def runFromCommandLine(self, argv=None): + def run_from_command_line(self, argv: Optional[Sequence[str]] = None) -> None: + """Exposes interface to command line with possibility to run directly or distributed via server/client.""" from argparse import ArgumentParser def server(a): if a.database: self.db = Database(a.database) - self.runServer(a.host, a.port) + self.run_server(a.host, a.port) def client(a): - self.runClient(a.clientName, a.host, a.port, json.loads(a.parameterOverride)) + self.run_client(a.client_name, a.host, a.port, json.loads(a.parameterOverride)) def local(a): if a.database: self.db = Database(a.database) - self.runScenariosNotInDatabase(json.loads(a.parameterOverride)) + self.run_scenarios_not_in_database(json.loads(a.parameterOverride)) parser = ArgumentParser() subparsers = parser.add_subparsers() - localParser = subparsers.add_parser('local', aliases=['l'], - help="Run scenarios locally which are not yet in database",) - localParser.add_argument("-d", "--database", type=str, default="") - localParser.add_argument("-P", "--parameterOverride", type=str, default="{}", - help="JSON: the parameter dictionary is updated with these parameters. Use this to " - "set host specific options like GPU call parameters. 
Enclose in \" ") - localParser.set_defaults(func=local) - - serverParser = subparsers.add_parser('server', aliases=['serv', 's'], - help="Runs server to distribute different scenarios to workers",) - serverParser.add_argument("-p", "--port", type=int, default=8342, help="Port to listen on") - serverParser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="IP/Hostname to listen on") - serverParser.add_argument("-d", "--database", type=str, default="") - serverParser.set_defaults(func=server) - - clientParser = subparsers.add_parser('client', aliases=['c'], - help="Runs a worker client connection to scenario distribution server") - clientParser.add_argument("-p", "--port", type=int, default=8342, help="Port to connect to") - clientParser.add_argument("-H", "--host", type=str, default="localhost", help="Host or IP to connect to") - clientParser.add_argument("-n", "--clientName", type=str, default="{hostname}_{pid}", - help="Unique client name, you can use {hostname} and {pid} as placeholder") - clientParser.add_argument("-P", "--parameterOverride", type=str, default="{}", + local_parser = subparsers.add_parser('local', aliases=['l'], + help="Run scenarios locally which are not yet in database", ) + local_parser.add_argument("-d", "--database", type=str, default="") + local_parser.add_argument("-P", "--parameterOverride", type=str, default="{}", help="JSON: the parameter dictionary is updated with these parameters. Use this to " "set host specific options like GPU call parameters. 
Enclose in \" ") - clientParser.set_defaults(func=client) + local_parser.set_defaults(func=local) + + server_parser = subparsers.add_parser('server', aliases=['s'], + help="Runs server to distribute different scenarios to workers", ) + server_parser.add_argument("-p", "--port", type=int, default=8342, help="Port to listen on") + server_parser.add_argument("-H", "--host", type=str, default="0.0.0.0", help="IP/Hostname to listen on") + server_parser.add_argument("-d", "--database", type=str, default="") + server_parser.set_defaults(func=server) + + client_parser = subparsers.add_parser('client', aliases=['c'], + help="Runs a worker client connection to scenario distribution server") + client_parser.add_argument("-p", "--port", type=int, default=8342, help="Port to connect to") + client_parser.add_argument("-H", "--host", type=str, default="localhost", help="Host or IP to connect to") + client_parser.add_argument("-n", "--client_name", type=str, default="{hostname}_{pid}", + help="Unique client name, you can use {hostname} and {pid} as placeholder") + client_parser.add_argument("-P", "--parameterOverride", type=str, default="{}", + help="JSON: the parameter dictionary is updated with these parameters. Use this to " + "set host specific options like GPU call parameters. 
Enclose in \" ") + client_parser.set_defaults(func=client) args = parser.parse_args(argv) if not len(vars(args)): @@ -219,3 +306,14 @@ class ParameterStudy(object): else: args.func(args) + def _filter_already_simulated(self, all_runs): + """Removes all runs from the given list, that are already in the database""" + return [r for r in all_runs if not self.db.was_already_simulated(r.parameterDict)] + + @staticmethod + def _distribute_runs(all_runs, process, num_processes): + """Partitions runs by their weights into num_processes chunks and returns the process's chunk.""" + sorted_runs = sorted(all_runs, key=lambda e: e.weight, reverse=True) + result = sorted_runs[process::num_processes] + result.reverse() # start with faster scenarios + return result -- GitLab