From 93ebe096373e3967dbcc0bea68577a73f82477ee Mon Sep 17 00:00:00 2001
From: Martin Bauer <martin.bauer@fau.de>
Date: Thu, 14 Jun 2018 16:07:49 +0200
Subject: [PATCH] New Time Loop

- makes small block scenarios faster
- new time loop caches all kernel functions with their argument dict
  -> inner loop just calls C functions with cached kwargs
---
 boundaries/boundaryhandling.py        |  17 +++++
 cpu/cpujit.py                         |   1 -
 datahandling/parallel_datahandling.py |  72 ++++++++----------
 datahandling/serial_datahandling.py   |   6 ++
 timeloop.py                           | 104 +++++++++++++++-----------
 5 files changed, 114 insertions(+), 86 deletions(-)

diff --git a/boundaries/boundaryhandling.py b/boundaries/boundaryhandling.py
index b38d86656..0c72ab34b 100644
--- a/boundaries/boundaryhandling.py
+++ b/boundaries/boundaryhandling.py
@@ -211,6 +211,23 @@ class BoundaryHandling:
                 self._boundary_object_to_boundary_info[b_obj].kernel(**kwargs)
 
+    def add_fixed_steps(self, fixed_loop, **kwargs):
+        if self._dirty:
+            self.prepare()
+
+        for b in self._data_handling.iterate(gpu=self._target == 'gpu'):
+            for b_obj, idx_arr in b[self._index_array_name].boundary_object_to_index_list.items():
+                arguments = kwargs.copy()
+                arguments[self._field_name] = b[self._field_name]
+                arguments['indexField'] = idx_arr
+                data_used_in_kernel = (p.field_name
+                                       for p in self._boundary_object_to_boundary_info[b_obj].kernel.parameters
+                                       if p.is_field_ptr_argument and p.field_name not in arguments)
+                arguments.update({name: b[name] for name in data_used_in_kernel if name not in arguments})
+
+                kernel = self._boundary_object_to_boundary_info[b_obj].kernel
+                fixed_loop.add_call(kernel, arguments)
+
     def geometry_to_vtk(self, file_name='geometry', boundaries='all', ghost_layers=False):
         """
         Writes a VTK field where each cell with the given boundary is marked with 1, other cells are 0
 
diff --git a/cpu/cpujit.py b/cpu/cpujit.py
index 62ed51cda..c5fddc2c3 100644
--- a/cpu/cpujit.py
+++ b/cpu/cpujit.py
@@ -50,7 +50,6 @@ import platform
 import shutil
 import textwrap
 import numpy as np
-import functools
 import subprocess
 from appdirs import user_config_dir, user_cache_dir
 from collections import OrderedDict
diff --git a/datahandling/parallel_datahandling.py b/datahandling/parallel_datahandling.py
index a5d88c6d9..0c85e4a77 100644
--- a/datahandling/parallel_datahandling.py
+++ b/datahandling/parallel_datahandling.py
@@ -1,11 +1,11 @@
 import numpy as np
+import warnings
 from pystencils import Field
 from pystencils.datahandling.datahandling_interface import DataHandling
 from pystencils.datahandling.blockiteration import sliced_block_iteration, block_iteration
 from pystencils.utils import DotDict
 # noinspection PyPep8Naming
 import waLBerla as wlb
-import warnings
 
 
 class ParallelDataHandling(DataHandling):
@@ -16,14 +16,15 @@ class ParallelDataHandling(DataHandling):
         """
         Creates data handling based on walberla block storage
 
-        :param blocks: walberla block storage
-        :param default_ghost_layers: nr of ghost layers used if not specified in add() method
-        :param default_layout: layout used if no layout is given to add() method
-        :param dim: dimension of scenario,
-                    walberla always uses three dimensions, so if dim=2 the extend of the
-                    z coordinate of blocks has to be 1
-        :param default_target: either 'cpu' or 'gpu' . If set to 'gpu' for each array also a GPU version is allocated
-                               if not overwritten in add_array, and synchronization functions are for the GPU by default
+        Args:
+            blocks: walberla block storage
+            default_ghost_layers: nr of ghost layers used if not specified in add() method
+            default_layout: layout used if no layout is given to add() method
+            dim: dimension of scenario,
+                 walberla always uses three dimensions, so if dim=2 the extend of the
+                 z coordinate of blocks has to be 1
+            default_target: either 'cpu' or 'gpu' . If set to 'gpu' for each array also a GPU version is allocated
+                            if not overwritten in add_array, and synchronization functions are for the GPU by default
         """
         super(ParallelDataHandling, self).__init__()
         assert dim in (2, 3)
@@ -135,8 +136,6 @@ class ParallelDataHandling(DataHandling):
             self._field_name_to_cpu_data_name[name] = name
         if gpu:
            self._field_name_to_gpu_data_name[name] = self.GPU_DATA_PREFIX + name
-
-        self._rebuild_data_cache()
         return self.fields[name]
 
     def has_data(self, name):
@@ -156,15 +155,8 @@ class ParallelDataHandling(DataHandling):
 
     def swap(self, name1, name2, gpu=False):
         if gpu:
-            for d in self._data_cache_gpu:
-                d[name1], d[name2] = d[name2], d[name1]
-
             name1 = self.GPU_DATA_PREFIX + name1
             name2 = self.GPU_DATA_PREFIX + name2
-        else:
-            for d in self._data_cache_cpu:
-                d[name1], d[name2] = d[name2], d[name1]
-
         for block in self.blocks:
             block[name1].swapDataPointers(block[name2])
 
@@ -222,31 +214,31 @@ class ParallelDataHandling(DataHandling):
             arr = arr[:, :, 0]
         return arr
 
-    def _rebuild_data_cache(self):
-        self._data_cache_cpu = []
-        self._data_cache_gpu = []
-
-        elements = [(self._data_cache_cpu, wlb.field.toArray, self._field_name_to_cpu_data_name)]
-        if self._field_name_to_gpu_data_name:
-            elements.append((self._data_cache_gpu, wlb.cuda.toGpuArray, self._field_name_to_gpu_data_name))
-
-        for cache, to_array, name_to_data_name in elements:
-            for block in self.blocks:
-                field_args = {}
-                for field_name, data_name in name_to_data_name.items():
-                    field = self.fields[field_name]
-                    arr = to_array(block[data_name], withGhostLayers=[True, True, self.dim == 3])
-                    arr = self._normalize_arr_shape(arr, field.index_dimensions)
-                    field_args[field_name] = arr
-                cache.append(field_args)
-
     def run_kernel(self, kernel_function, **kwargs):
+        for arg_dict in self.get_kernel_kwargs(kernel_function, **kwargs):
+            kernel_function(**arg_dict)
+
+    def get_kernel_kwargs(self, kernel_function, **kwargs):
         if kernel_function.ast.backend == 'gpucuda':
-            for d in self._data_cache_gpu:
-                kernel_function(**d, **kwargs)
+            name_map = self._field_name_to_gpu_data_name
+            to_array = wlb.cuda.toGpuArray
         else:
-            for d in self._data_cache_cpu:
-                kernel_function(**d, **kwargs)
+            name_map = self._field_name_to_cpu_data_name
+            to_array = wlb.field.toArray
+        data_used_in_kernel = [(name_map[p.field_name], self.fields[p.field_name])
+                               for p in kernel_function.parameters if
+                               p.is_field_ptr_argument and p.field_name not in kwargs]
+
+        result = []
+        for block in self.blocks:
+            field_args = {}
+            for data_name, f in data_used_in_kernel:
+                arr = to_array(block[data_name], withGhostLayers=[True, True, self.dim == 3])
+                arr = self._normalize_arr_shape(arr, f.index_dimensions)
+                field_args[f.name] = arr
+            field_args.update(kwargs)
+            result.append(field_args)
+        return result
 
     def to_cpu(self, name):
         if name in self._custom_data_transfer_functions:
diff --git a/datahandling/serial_datahandling.py b/datahandling/serial_datahandling.py
index c6026a81a..3bd2ff95c 100644
--- a/datahandling/serial_datahandling.py
+++ b/datahandling/serial_datahandling.py
@@ -211,6 +211,12 @@ class SerialDataHandling(DataHandling):
         arrays = self.gpu_arrays if kernel_function.ast.backend == 'gpucuda' else self.cpu_arrays
         kernel_function(**arrays, **kwargs)
 
+    def get_kernel_kwargs(self, kernel_function, **kwargs):
+        result = {}
+        result.update(self.gpu_arrays if kernel_function.ast.backend == 'gpucuda' else self.cpu_arrays)
+        result.update(kwargs)
+        return [result]
+
     def to_cpu(self, name):
         if name in self._custom_data_transfer_functions:
             transfer_func = self._custom_data_transfer_functions[name][1]
diff --git a/timeloop.py b/timeloop.py
index 9a4a3c461..ec5b32e47 100644
--- a/timeloop.py
+++ b/timeloop.py
@@ -1,56 +1,81 @@
 import time
 
+from pystencils.integer_functions import modulo_ceil
+
 
 class TimeLoop:
-    def __init__(self):
-        self._preRunFunctions = []
-        self._postRunFunctions = []
-        self._timeStepFunctions = []
-        self._functionNames = []
+    def __init__(self, steps=2):
+        self._call_data = []
+        self._fixed_steps = steps
+        self._pre_run_functions = []
+        self._post_run_functions = []
+        self._single_step_functions = []
         self.time_steps_run = 0
 
-    def add_step(self, step_obj):
-        if hasattr(step_obj, 'pre_run'):
-            self.add_pre_run_function(step_obj.pre_run)
-        if hasattr(step_obj, 'post_run'):
-            self.add_post_run_function(step_obj.post_run)
-        self.add(step_obj.time_step, step_obj.name)
-
-    def add(self, time_step_function, name=None):
-        if name is None:
-            name = str(time_step_function)
-        self._timeStepFunctions.append(time_step_function)
-        self._functionNames.append(name)
-
-    def add_kernel(self, data_handling, kernel_func, name=None):
-        self.add(lambda: data_handling.run_kernel(kernel_func), name)
+    @property
+    def fixed_steps(self):
+        return self._fixed_steps
 
     def add_pre_run_function(self, f):
-        self._preRunFunctions.append(f)
+        self._pre_run_functions.append(f)
 
     def add_post_run_function(self, f):
-        self._postRunFunctions.append(f)
+        self._post_run_functions.append(f)
+
+    def add_single_step_function(self, f):
+        self._single_step_functions.append(f)
+
+    def add_call(self, functor, argument_list):
+        if hasattr(functor, 'kernel'):
+            functor = functor.kernel
+        if not isinstance(argument_list, list):
+            argument_list = [argument_list]
+
+        for argument_dict in argument_list:
+            self._call_data.append((functor, argument_dict))
+
+    def pre_run(self):
+        for f in self._pre_run_functions:
+            f()
+
+    def post_run(self):
+        for f in self._post_run_functions:
+            f()
 
     def run(self, time_steps=1):
         self.pre_run()
-
+        fixed_steps = self._fixed_steps
+        call_data = self._call_data
+        main_iterations, rest_iterations = divmod(time_steps, fixed_steps)
         try:
-            for i in range(time_steps):
-                self.time_step()
+            for _ in range(main_iterations):
+                for func, kwargs in call_data:
+                    func(**kwargs)
+                self.time_steps_run += fixed_steps
+            for _ in range(rest_iterations):
+                for func in self._single_step_functions:
+                    func()
+                self.time_steps_run += 1
         except KeyboardInterrupt:
             pass
-
         self.post_run()
 
     def benchmark_run(self, time_steps=0, init_time_steps=0):
+        init_time_steps_rounded = modulo_ceil(init_time_steps, self._fixed_steps)
+        time_steps_rounded = modulo_ceil(time_steps, self._fixed_steps)
+
         self.pre_run()
-        for i in range(init_time_steps):
-            self.time_step()
+        for i in range(init_time_steps_rounded // self._fixed_steps):
+            for func, kwargs in self._call_data:
+                func(**kwargs)
+        self.time_steps_run += init_time_steps_rounded
 
         start = time.perf_counter()
-        for i in range(time_steps):
-            self.time_step()
+        for i in range(time_steps_rounded // self._fixed_steps):
+            for func, kwargs in self._call_data:
+                func(**kwargs)
         end = time.perf_counter()
+        self.time_steps_run += time_steps_rounded
         self.post_run()
 
         time_for_one_iteration = (end - start) / time_steps
@@ -61,10 +86,12 @@ class TimeLoop:
         self.pre_run()
         start = time.perf_counter()
         while time.perf_counter() < start + seconds:
-            self.time_step()
-            iterations += 1
+            for func, kwargs in self._call_data:
+                func(**kwargs)
+            iterations += self._fixed_steps
         end = time.perf_counter()
         self.post_run()
+        self.time_steps_run += iterations
         return iterations, end - start
 
     def benchmark(self, time_for_benchmark=5, init_time_steps=2, number_of_time_steps_for_estimation='auto'):
@@ -88,16 +115,3 @@ class TimeLoop:
         time_steps = int(time_for_benchmark / duration_of_time_step)
         time_steps = max(time_steps, 4)
         return self.benchmark_run(time_steps, init_time_steps)
-
-    def pre_run(self):
-        for f in self._preRunFunctions:
-            f()
-
-    def post_run(self):
-        for f in self._postRunFunctions:
-            f()
-
-    def time_step(self):
-        for f in self._timeStepFunctions:
-            f()
-        self.time_steps_run += 1
-- 
GitLab
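
Usage sketch (not part of the commit above): the snippet below illustrates how the new call-caching TimeLoop is driven. jacobi_step and the two arrays are made-up placeholders standing in for a compiled pystencils kernel and its fields, and the import path is assumed from the timeloop.py location shown in the diff. With a DataHandling, the per-block argument dicts would instead come from get_kernel_kwargs(), and boundary sweeps could be registered through BoundaryHandling.add_fixed_steps().

import numpy as np
from pystencils.timeloop import TimeLoop  # module path assumed from the timeloop.py shown above


def jacobi_step(src, dst):
    # toy stand-in for a compiled kernel: 4-point average over the interior cells
    dst[1:-1, 1:-1] = 0.25 * (src[2:, 1:-1] + src[:-2, 1:-1] + src[1:-1, 2:] + src[1:-1, :-2])


src_arr = np.random.rand(32, 32)
dst_arr = src_arr.copy()

time_loop = TimeLoop(steps=2)  # one pass over the cached calls advances 'steps' time steps

# Two cached (function, kwargs) pairs form one unrolled two-step cycle: the arrays
# trade roles inside the cycle, so no per-step pointer swap or dict rebuilding is needed.
time_loop.add_call(jacobi_step, [{'src': src_arr, 'dst': dst_arr}])
time_loop.add_call(jacobi_step, [{'src': dst_arr, 'dst': src_arr}])

time_loop.run(time_steps=100)    # 50 passes over the cached (function, kwargs) pairs
print(time_loop.time_steps_run)  # 100

If time_steps is not a multiple of fixed_steps, run() handles the remainder with the functions registered via add_single_step_function, as in the patched run() above.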