parallel_datahandling.py 16.4 KB
Newer Older
1
import os
2
import warnings
Martin Bauer's avatar
Martin Bauer committed
3
4
5
6
7
8

import numpy as np
# noinspection PyPep8Naming
import waLBerla as wlb

from pystencils.datahandling.blockiteration import block_iteration, sliced_block_iteration
9
from pystencils.datahandling.datahandling_interface import DataHandling
10
from pystencils.field import Field, FieldType
11
from pystencils.kernelparameters import FieldPointerSymbol
Martin Bauer's avatar
Martin Bauer committed
12
13
from pystencils.utils import DotDict

Martin Bauer's avatar
Martin Bauer committed
14

Martin Bauer's avatar
Martin Bauer committed
15
16
class ParallelDataHandling(DataHandling):
    """Data handling based on a waLBerla block storage (distributed-memory parallel)."""

    # Block data of the GPU copy of an array is stored under this prefix + array name.
    GPU_DATA_PREFIX = "gpu_"
    # Class-wide counter used to make vtk writer file names unique (see create_vtk_writer).
    VTK_COUNTER = 0

Martin Bauer's avatar
Martin Bauer committed
19
    def __init__(self, blocks, default_ghost_layers=1, default_layout='SoA', dim=3, default_target='cpu'):
        """
        Creates data handling based on walberla block storage

        Args:
            blocks: walberla block storage
            default_ghost_layers: nr of ghost layers used if not specified in add() method
            default_layout: layout used if no layout is given to add() method
            dim: dimension of scenario,
                 walberla always uses three dimensions, so if dim=2 the extent of the
                 z coordinate of blocks has to be 1
            default_target: either 'cpu' or 'gpu' . If set to 'gpu' for each array also a GPU version is allocated
                           if not overwritten in add_array, and synchronization functions are for the GPU by default
        """
        super(ParallelDataHandling, self).__init__()
        assert dim in (2, 3)
        self.blocks = blocks
        self.default_ghost_layers = default_ghost_layers
        self.default_layout = default_layout
        self._fields = DotDict()  # maps name to symbolic pystencils field
        # map between symbolic field name and the block-data name of its CPU/GPU storage
        self._field_name_to_cpu_data_name = {}
        self._field_name_to_gpu_data_name = {}
        self.data_names = set()
        self._dim = dim
        self._fieldInformation = {}  # per-array creation parameters, reused by add_array_like
        self._cpu_gpu_pairs = []  # (cpu_name, gpu_name) for arrays that exist on both devices
        self._custom_data_transfer_functions = {}  # name -> (cpu_to_gpu, gpu_to_cpu) callables
        self._custom_data_names = []
        # maps reduce-operation names to the corresponding walberla MPI operation
        self._reduce_map = {
            'sum': wlb.mpi.SUM,
            'min': wlb.mpi.MIN,
            'max': wlb.mpi.MAX,
        }
        if self._dim == 2:
            # walberla is inherently 3D; a 2D scenario must be exactly one cell thick in z
            assert self.blocks.getDomainCellBB().size[2] == 1
        self.default_target = default_target
Martin Bauer's avatar
Martin Bauer committed
56
57
58
59
60

    @property
    def dim(self):
        """Spatial dimension of the scenario (2 or 3)."""
        return self._dim

    @property
    def shape(self):
        """Domain size in cells, restricted to the first `dim` coordinates."""
        domain_bb = self.blocks.getDomainCellBB()
        return domain_bb.size[:self.dim]

    @property
    def periodicity(self):
        """Periodicity flags of the block storage, restricted to the first `dim` coordinates."""
        return self.blocks.periodic[:self._dim]

    @property
    def fields(self):
        """Mapping from array name to its symbolic pystencils field."""
        return self._fields

Martin Bauer's avatar
Martin Bauer committed
73
74
    def ghost_layers_of_field(self, name):
        """Number of ghost layers the array `name` was created with."""
        info = self._fieldInformation[name]
        return info['ghost_layers']

    def values_per_cell(self, name):
        """Number of values stored per cell for array `name`."""
        info = self._fieldInformation[name]
        return info['values_per_cell']
78

Martin Bauer's avatar
Martin Bauer committed
79
80
81
82
    def add_custom_data(self, name, cpu_creation_function,
                        gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
        """Registers custom (non-array) block data built by the given factory callbacks.

        If both a CPU and a GPU creation function are supplied, both transfer
        functions are mandatory so the data can be moved between the devices.
        """
        if cpu_creation_function and gpu_creation_function:
            both_transfers_given = (cpu_to_gpu_transfer_func is not None
                                    and gpu_to_cpu_transfer_func is not None)
            if not both_transfers_given:
                raise ValueError("For GPU data, both transfer functions have to be specified")
            self._custom_data_transfer_functions[name] = (cpu_to_gpu_transfer_func, gpu_to_cpu_transfer_func)

        if cpu_creation_function:
            self.blocks.addBlockData(name, cpu_creation_function)
        if gpu_creation_function:
            self.blocks.addBlockData(self.GPU_DATA_PREFIX + name, gpu_creation_function)
        self._custom_data_names.append(name)

    def add_array(self, name, values_per_cell=1, dtype=np.float64, latex_name=None, ghost_layers=None,
                  layout=None, cpu=True, gpu=None, alignment=False, field_type=FieldType.GENERIC):
        """Adds an array to the block storage and creates the matching symbolic field.

        Returns the symbolic pystencils Field registered under `name`.
        Raises ValueError for duplicate names, empty block storage, or unsupported
        alignment on GPU fields.
        """
        # fill in instance-level defaults for unspecified options
        if ghost_layers is None:
            ghost_layers = self.default_ghost_layers
        if gpu is None:
            gpu = self.default_target == 'gpu'
        if layout is None:
            layout = self.default_layout
        if len(self.blocks) == 0:
            raise ValueError("Data handling expects that each process has at least one block")
        if hasattr(dtype, 'type'):
            # unwrap numpy dtype objects to their scalar type
            dtype = dtype.type
        if name in self.blocks[0] or self.GPU_DATA_PREFIX + name in self.blocks[0]:
            raise ValueError("Data with this name has already been added")

        if alignment is False or alignment is None:
            alignment = 0
        if hasattr(values_per_cell, '__len__'):
            raise NotImplementedError("Parallel data handling does not support multiple index dimensions")

        # remember creation parameters so add_array_like can reproduce them
        self._fieldInformation[name] = {
            'ghost_layers': ghost_layers,
            'values_per_cell': values_per_cell,
            'layout': layout,
            'dtype': dtype,
            'alignment': alignment,
            'field_type': field_type,
        }

        # translate pystencils layout names to walberla's two memory layouts
        layout_map = {'fzyx': wlb.field.Layout.fzyx, 'zyxf': wlb.field.Layout.zyxf,
                      'f': wlb.field.Layout.fzyx,
                      'SoA': wlb.field.Layout.fzyx, 'AoS': wlb.field.Layout.zyxf}

        if cpu:
            wlb.field.addToStorage(self.blocks, name, dtype, fSize=values_per_cell, layout=layout_map[layout],
                                   ghostLayers=ghost_layers, alignment=alignment)
        if gpu:
            if alignment != 0:
                raise ValueError("Alignment for walberla GPU fields not yet supported")
            wlb.cuda.addGpuFieldToStorage(self.blocks, self.GPU_DATA_PREFIX + name, dtype, fSize=values_per_cell,
                                          usePitchedMem=False, ghostLayers=ghost_layers, layout=layout_map[layout])

        if cpu and gpu:
            self._cpu_gpu_pairs.append((name, self.GPU_DATA_PREFIX + name))

        # NOTE(review): block_bb/shape are computed but never used below — looks like
        # leftover from an earlier version; confirm before removing.
        block_bb = self.blocks.getBlockCellBB(self.blocks[0])
        shape = tuple(s + 2 * ghost_layers for s in block_bb.size[:self.dim])
        index_dimensions = 1 if values_per_cell > 1 else 0
        if index_dimensions == 1:
            shape += (values_per_cell,)

        assert all(f.name != name for f in self.fields.values()), "Symbolic field with this name already exists"

        self.fields[name] = Field.create_generic(name, self.dim, dtype, index_dimensions, layout,
                                                 index_shape=(values_per_cell,) if index_dimensions > 0 else None,
                                                 field_type=field_type)
        self.fields[name].latex_name = latex_name
        self._field_name_to_cpu_data_name[name] = name
        if gpu:
            self._field_name_to_gpu_data_name[name] = self.GPU_DATA_PREFIX + name
        return self.fields[name]
Martin Bauer's avatar
Martin Bauer committed
153

Martin Bauer's avatar
Martin Bauer committed
154
    def has_data(self, name):
        """Returns True if an array with this name was added via add_array/add_array_like."""
        return name in self._fields

    @property
    def array_names(self):
        """Names of all registered arrays."""
        return tuple(self.fields.keys())

    @property
    def custom_data_names(self):
        """Names of all registered custom data entries."""
        return tuple(self._custom_data_names)
164

Martin Bauer's avatar
Martin Bauer committed
165
166
167
    def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
        """Adds a new array with the same creation parameters as an existing one."""
        template_info = self._fieldInformation[name_of_template_field]
        return self.add_array(name, latex_name=latex_name, cpu=cpu, gpu=gpu, **template_info)
168
169
170
171
172
173
174
175

    def swap(self, name1, name2, gpu=False):
        """Swaps the data pointers of two arrays on every local block."""
        prefix = self.GPU_DATA_PREFIX if gpu else ""
        first, second = prefix + name1, prefix + name2
        for block in self.blocks:
            block[first].swapDataPointers(block[second])

Martin Bauer's avatar
Martin Bauer committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
    def iterate(self, slice_obj=None, gpu=False, ghost_layers=True, inner_ghost_layers=True):
        """Iterates over the local blocks, yielding per-block array access objects.

        Args:
            slice_obj: if given, iteration is restricted to this slice of the domain
            gpu: if True, the GPU versions of the arrays are exposed instead of the CPU ones
            ghost_layers: True -> use default ghost layers, False -> none,
                          a field name (str) -> use that field's ghost layer count
            inner_ghost_layers: same convention as `ghost_layers`, used for the
                                block-interior layers in sliced iteration
        """
        if ghost_layers is True:
            ghost_layers = self.default_ghost_layers
        elif ghost_layers is False:
            ghost_layers = 0
        elif isinstance(ghost_layers, str):
            ghost_layers = self.ghost_layers_of_field(ghost_layers)

        if inner_ghost_layers is True:
            inner_ghost_layers = self.default_ghost_layers
        elif inner_ghost_layers is False:
            inner_ghost_layers = 0
        elif isinstance(inner_ghost_layers, str):
            # Bug fix: this branch previously tested and reassigned `ghost_layers`,
            # so a field name passed as inner_ghost_layers was silently ignored
            # and ghost_layers was clobbered instead.
            inner_ghost_layers = self.ghost_layers_of_field(inner_ghost_layers)

        prefix = self.GPU_DATA_PREFIX if gpu else ""
        if slice_obj is not None:
            yield from sliced_block_iteration(self.blocks, slice_obj, inner_ghost_layers, ghost_layers,
                                              self.dim, prefix)
        else:
            yield from block_iteration(self.blocks, ghost_layers, self.dim, prefix)
197

Martin Bauer's avatar
Martin Bauer committed
198
199
200
    def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False):
        """Gathers a (sliced) array onto the root process (or all processes if all_gather).

        Returns None on processes that did not receive the data.
        Ghost layers are never included; requesting them only emits a warning.
        """
        if ghost_layers is not False:
            warnings.warn("gather_array with ghost layers is only supported in serial data handling. "
                          "Array without ghost layers is returned")

        if slice_obj is None:
            slice_obj = tuple([slice(None, None, None)] * self.dim)
        if self.dim == 2:
            # walberla always works in 3D: insert a z coordinate for 2D scenarios
            # (presumably 0.5 addresses the single cell in z — TODO confirm walberla's
            # handling of fractional slice coordinates)
            slice_obj = slice_obj[:2] + (0.5,) + slice_obj[2:]

        # a fourth slice entry (if any) addresses the per-cell index dimension
        last_element = slice_obj[3:]

        array = wlb.field.gatherField(self.blocks, name, slice_obj[:3], all_gather)
        if array is None:
            # this process is not a receiver of the gathered data
            return None

        if self.dim == 2:
            array = array[:, :, 0]
        if last_element and self.fields[name].index_dimensions > 0:
            array = array[..., last_element[0]]
        if self.fields[name].index_dimensions == 0:
            # scalar fields: drop the trivial trailing value axis
            array = array[..., 0]

        return array
222

Martin Bauer's avatar
Martin Bauer committed
223
224
    def _normalize_arr_shape(self, arr, index_dimensions):
        """Strips trivial trailing value/z axes so the array matches the symbolic field's rank."""
        result = arr
        if index_dimensions == 0:
            result = result[..., 0]
        if self.dim == 2:
            result = result[:, :, 0]
        return result

230
    def run_kernel(self, kernel_function, **kwargs):
        """Executes a compiled kernel once per local block."""
        per_block_args = self.get_kernel_kwargs(kernel_function, **kwargs)
        for arg_dict in per_block_args:
            kernel_function(**arg_dict)

    def get_kernel_kwargs(self, kernel_function, **kwargs):
        """Returns one kwargs dict per local block, mapping field names to array views.

        Fields whose name is already present in `kwargs` are not looked up in the
        block storage; the caller-supplied value takes precedence.
        """
        # pick the CPU or GPU block data depending on the kernel's backend
        if kernel_function.ast.backend == 'gpucuda':
            name_map = self._field_name_to_gpu_data_name
            to_array = wlb.cuda.toGpuArray
        else:
            name_map = self._field_name_to_cpu_data_name
            to_array = wlb.field.toArray
        # (block-data name, symbolic field) for every field pointer the kernel takes
        data_used_in_kernel = [(name_map[p.symbol.field_name], self.fields[p.symbol.field_name])
                               for p in kernel_function.parameters if
                               isinstance(p.symbol, FieldPointerSymbol) and p.symbol.field_name not in kwargs]

        result = []
        for block in self.blocks:
            field_args = {}
            for data_name, f in data_used_in_kernel:
                # ghost layers in z are only present for 3D scenarios
                arr = to_array(block[data_name], withGhostLayers=[True, True, self.dim == 3])
                arr = self._normalize_arr_shape(arr, f.index_dimensions)
                field_args[f.name] = arr
            field_args.update(kwargs)
            result.append(field_args)
        return result
Martin Bauer's avatar
Martin Bauer committed
255
256
257
258

    def to_cpu(self, name):
        """Copies the GPU version of `name` into its CPU counterpart."""
        if name in self._custom_data_transfer_functions:
            # custom data: second element of the pair is the gpu->cpu transfer
            _, gpu_to_cpu = self._custom_data_transfer_functions[name]
            for block in self.blocks:
                gpu_to_cpu(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
263

Martin Bauer's avatar
Martin Bauer committed
264
265
266
    def to_gpu(self, name):
        """Copies the CPU version of `name` into its GPU counterpart."""
        if name in self._custom_data_transfer_functions:
            # custom data: first element of the pair is the cpu->gpu transfer
            cpu_to_gpu, _ = self._custom_data_transfer_functions[name]
            for block in self.blocks:
                cpu_to_gpu(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
271

Martin Bauer's avatar
Martin Bauer committed
272
273
    def is_on_gpu(self, name):
        """True iff the array was allocated on both CPU and GPU."""
        pair = (name, self.GPU_DATA_PREFIX + name)
        return pair in self._cpu_gpu_pairs
274

Martin Bauer's avatar
Martin Bauer committed
275
    def all_to_cpu(self):
        """Copies every GPU array, including custom data, to its CPU counterpart."""
        for cpu_name, gpu_name in self._cpu_gpu_pairs:
            wlb.cuda.copyFieldToCpu(self.blocks, gpu_name, cpu_name)
        for custom_name in self._custom_data_transfer_functions:
            self.to_cpu(custom_name)
280

Martin Bauer's avatar
Martin Bauer committed
281
    def all_to_gpu(self):
        """Copies every CPU array, including custom data, to its GPU counterpart."""
        for cpu_name, gpu_name in self._cpu_gpu_pairs:
            wlb.cuda.copyFieldToGpu(self.blocks, gpu_name, cpu_name)
        for custom_name in self._custom_data_transfer_functions:
            self.to_gpu(custom_name)
286

287
288
    def synchronization_function_cpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
        """Ghost-layer communication scheme for the CPU arrays; see synchronization_function."""
        return self.synchronization_function(names, stencil=stencil, target='cpu',
                                             buffered=buffered, stencil_restricted=stencil_restricted)

    def synchronization_function_gpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
        """Ghost-layer communication scheme for the GPU arrays; see synchronization_function."""
        return self.synchronization_function(names, stencil=stencil, target='gpu',
                                             buffered=buffered, stencil_restricted=stencil_restricted)
292

293
    def synchronization_function(self, names, stencil=None, target=None, buffered=True, stencil_restricted=False):
        """Creates a callable communication scheme synchronizing ghost layers of the given arrays.

        Args:
            names: single array name or sequence of names
            stencil: walberla stencil name; defaults to full neighborhood ('D3Q27'/'D2Q9')
            target: 'cpu' or 'gpu'; defaults to this handling's default_target
            buffered: use walberla's buffered scheme (pack infos) instead of MPI datatypes
            stencil_restricted: use the stencil-restricted pack info variant
        """
        if target is None:
            target = self.default_target

        if stencil is None:
            stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'

        if not hasattr(names, '__len__') or type(names) is str:
            names = [names]

        create_scheme = wlb.createUniformBufferedScheme if buffered else wlb.createUniformDirectScheme
        if target == 'cpu':
            create_packing = wlb.field.createPackInfo if buffered else wlb.field.createMPIDatatypeInfo
            # NOTE(review): pack infos normally pair with the *buffered* scheme, so
            # selecting the stencil-restricted pack info only when `not buffered`
            # looks inverted — verify the intended condition against walberla's API.
            if not buffered and stencil_restricted:
                create_packing = wlb.field.createStencilRestrictedPackInfo
        else:
            assert target == 'gpu'
            create_packing = wlb.cuda.createPackInfo if buffered else wlb.cuda.createMPIDatatypeInfo
            # GPU arrays are stored in the block storage under the prefixed name
            names = [self.GPU_DATA_PREFIX + name for name in names]

        sync_function = create_scheme(self.blocks, stencil)
        for name in names:
            sync_function.addDataToCommunicate(create_packing(self.blocks, name))

        return sync_function
Martin Bauer's avatar
Martin Bauer committed
318

Martin Bauer's avatar
Martin Bauer committed
319
320
321
    def reduce_float_sequence(self, sequence, operation, all_reduce=False):
        """MPI-reduces a float sequence; without all_reduce only rank 0 gets a result (others None)."""
        mpi_op = self._reduce_map[operation.lower()]
        if all_reduce:
            return np.array(wlb.mpi.allreduceReal(sequence, mpi_op))
        result = np.array(wlb.mpi.reduceReal(sequence, mpi_op, 0))
        return result if wlb.mpi.worldRank() == 0 else None
Martin Bauer's avatar
Martin Bauer committed
325

Martin Bauer's avatar
Martin Bauer committed
326
327
328
    def reduce_int_sequence(self, sequence, operation, all_reduce=False):
        """MPI-reduces an int sequence; without all_reduce only rank 0 gets a result (others None)."""
        mpi_op = self._reduce_map[operation.lower()]
        if all_reduce:
            return np.array(wlb.mpi.allreduceInt(sequence, mpi_op))
        result = np.array(wlb.mpi.reduceInt(sequence, mpi_op, 0))
        return result if wlb.mpi.worldRank() == 0 else None
Martin Bauer's avatar
Martin Bauer committed
332
333
334
335
336
337
338

    def create_vtk_writer(self, file_name, data_names, ghost_layers=False):
        """Creates a vtk output object writing the given arrays.

        ghost_layers: False -> none, True -> minimum over the given arrays, int -> that many.
        """
        if ghost_layers is False:
            ghost_layers = 0
        if ghost_layers is True:
            ghost_layers = min(self.ghost_layers_of_field(n) for n in data_names)
        # class-wide counter keeps output file names of separate writers unique
        numbered_file_name = "%s_%02d" % (file_name, ParallelDataHandling.VTK_COUNTER)
        ParallelDataHandling.VTK_COUNTER += 1
        output = wlb.vtk.makeOutput(self.blocks, numbered_file_name, ghostLayers=ghost_layers)
        for data_name in data_names:
            output.addCellDataWriter(wlb.field.createVTKWriter(self.blocks, data_name))
        return output

Martin Bauer's avatar
Martin Bauer committed
345
346
347
348
349
    def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name, ghost_layers=False):
        """Creates a vtk output writing one binarized field per mask of a flag array."""
        if ghost_layers is False:
            ghost_layers = 0
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(data_name)

        output = wlb.vtk.makeOutput(self.blocks, file_name, ghostLayers=ghost_layers)
        for mask, mask_name in masks_to_name.items():
            writer = wlb.field.createBinarizationVTKWriter(self.blocks, data_name, mask, mask_name)
            output.addCellDataWriter(writer)
        return output
356
357
358

    @staticmethod
    def log(*args, level='INFO'):
        """Logs through walberla's logging; level is DEVEL/RESULT/INFO/WARNING/PROGRESS."""
        level_to_function = {
            'DEVEL': wlb.log_devel,
            'RESULT': wlb.log_result,
            'INFO': wlb.log_info,
            'WARNING': wlb.log_warning,
            'PROGRESS': wlb.log_progress,
        }
        log_func = level_to_function[level.upper()]
        log_func(" ".join(str(e) for e in args))
369
370
371
372
373
374
375
376
377
378
379
380

    def log_on_root(self, *args, level='INFO'):
        """Logs only on the root MPI process."""
        if self.is_root:
            ParallelDataHandling.log(*args, level=level)

    @property
    def is_root(self):
        """True on the MPI process with world rank 0."""
        return wlb.mpi.worldRank() == 0

    @property
    def world_rank(self):
        """MPI world rank of this process."""
        return wlb.mpi.worldRank()
381
382
383
384
385

    def save_all(self, directory):
        """Serializes every CPU array into one '.dat' file per array inside `directory`."""
        if os.path.isfile(directory):
            raise RuntimeError(f"Trying to save to {directory}, but file exists already")
        if not os.path.exists(directory):
            os.mkdir(directory)

        for field_name, data_name in self._field_name_to_cpu_data_name.items():
            self.blocks.writeBlockData(data_name, os.path.join(directory, field_name + ".dat"))

    def load_all(self, directory):
        """Restores every CPU array previously written by save_all."""
        for field_name, data_name in self._field_name_to_cpu_data_name.items():
            file_path = os.path.join(directory, field_name + ".dat")
            self.blocks.readBlockData(data_name, file_path)