import numpy as np
from abc import ABC, abstractmethod

from typing import Optional, Callable, Sequence, Iterable, Tuple, Dict
from pystencils.field import Field


class DataHandling(ABC):
    """
    Manages the storage of arrays and maps them to a symbolic field.
    Two versions are available: a simple, pure Python implementation for single node
    simulations :py:class:SerialDataHandling and a distributed version using walberla in :py:class:ParallelDataHandling

    Keep in mind that the data can be distributed, so use the 'access' method whenever possible and avoid the
    'gather' function that collects (parts of the) distributed data on a single process.
    """

    # ---------------------------- Adding and accessing data ----------------------------------------------------------

    @property
    @abstractmethod
    def dim(self) -> int:
        """Dimension of the domain, either 2 or 3"""

    @property
    @abstractmethod
    def shape(self) -> Tuple[int, ...]:
        """Shape of outer bounding box."""

    @property
    @abstractmethod
    def periodicity(self) -> Tuple[bool, ...]:
        """Returns tuple of booleans for x,y,(z) directions with True if domain is periodic in that direction."""

    @abstractmethod
    def add_array(self, name: str, values_per_cell: int = 1, dtype=np.float64,
                  latex_name: Optional[str] = None, ghost_layers: Optional[int] = None, layout: Optional[str] = None,
                  cpu: bool = True, gpu: Optional[bool] = None, alignment=False) -> Field:
        """Adds a (possibly distributed) array to the handling that can be accessed using the given name.

        For each array a symbolic field is available via the 'fields' dictionary

        Args:
            name: unique name that is used to access the field later
            values_per_cell: shape of the dim+1 coordinate. DataHandling supports zero or one index dimensions,
                             i.e. scalar fields and vector fields. This parameter gives the shape of the index
                             dimensions. The default value of 1 means no index dimensions are created.
            dtype: data type of the array as numpy data type
            latex_name: optional, name of the symbolic field, if not given 'name' is used
            ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
                          is used
            layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
                    this is only important if values_per_cell > 1
            cpu: allocate field on the CPU
            gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'gpu'
            alignment: either False for no alignment, or the number of bytes to align to

        Returns:
            pystencils field, that can be used to formulate symbolic kernels
        """

    @abstractmethod
    def has_data(self, name):
        """Returns true if a field or custom data element with this name was added."""

    @abstractmethod
    def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
        """
        Adds an array with the same parameters (number of ghost layers, values_per_cell, dtype) as existing array.

        Args:
            name: name of new array
            name_of_template_field: name of array that is used as template
            latex_name: see 'add' method
            cpu: see 'add' method
            gpu: see 'add' method
        """

    @abstractmethod
    def add_custom_data(self, name: str, cpu_creation_function,
                        gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
        """Adds custom (non-array) data to domain.

        Args:
            name: name to access data
            cpu_creation_function: function returning a new instance of the data that should be stored
            gpu_creation_function: optional, function returning a new instance, stored on GPU
            cpu_to_gpu_transfer_func: function that transfers cpu to gpu version,
                                      getting two parameters (gpu_instance, cpu_instance)
            gpu_to_cpu_transfer_func: function that transfers gpu to cpu version, getting two parameters
                                      (gpu_instance, cpu_instance)
        """

    def add_custom_class(self, name: str, class_obj, cpu: bool = True, gpu: bool = False):
        """Adds non-array data by passing a class object with optional 'to_gpu' and 'to_cpu' member functions."""
        # transfer functions are only meaningful when both representations exist
        cpu_to_gpu_transfer_func = class_obj.to_gpu if cpu and gpu and hasattr(class_obj, 'to_gpu') else None
        gpu_to_cpu_transfer_func = class_obj.to_cpu if cpu and gpu and hasattr(class_obj, 'to_cpu') else None
        self.add_custom_data(name,
                             cpu_creation_function=class_obj if cpu else None,
                             gpu_creation_function=class_obj if gpu else None,
                             cpu_to_gpu_transfer_func=cpu_to_gpu_transfer_func,
                             gpu_to_cpu_transfer_func=gpu_to_cpu_transfer_func)

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Field]:
        """Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels."""

    @property
    @abstractmethod
    def array_names(self) -> Sequence[str]:
        """Sequence of all array names."""

    @property
    @abstractmethod
    def custom_data_names(self) -> Sequence[str]:
        """Sequence of all custom data names."""

    @abstractmethod
    def ghost_layers_of_field(self, name: str) -> int:
        """Returns the number of ghost layers for a specific field/array."""

    @abstractmethod
    def values_per_cell(self, name: str) -> Tuple[int, ...]:
        """Returns values_per_cell of array."""

    @abstractmethod
    def iterate(self, slice_obj=None, gpu=False, ghost_layers=None,
                inner_ghost_layers=True) -> Iterable['Block']:
        """Iterate over local part of potentially distributed data structure."""

    @abstractmethod
    def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False) -> Optional[np.ndarray]:
        """
        Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies
        the distributed data to a single process which is inefficient and may exhaust the available memory

        Args:
            name: name of the array to gather
            slice_obj: slice expression of the rectangular sub-part that should be gathered
            all_gather: if False only the root process receives the result, if True all processes
            ghost_layers: number of outer ghost layers to include (only available for serial version of data handling)

        Returns:
            gathered field that does not include any ghost layers, or None if gathered on another process
        """

    @abstractmethod
    def run_kernel(self, kernel_function, *args, **kwargs) -> None:
        """Runs a compiled pystencils kernel.

        Uses the arrays stored in the DataHandling class for all array parameters. Additional passed arguments are
        directly passed to the kernel function and override possible parameters from the DataHandling
        """

    @abstractmethod
    def swap(self, name1, name2, gpu=False):
        """Swaps data of two arrays"""

    # ------------------------------- CPU/GPU transfer ----------------------------------------------------------------

    @abstractmethod
    def to_cpu(self, name):
        """Copies GPU data of array with specified name to CPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def to_gpu(self, name):
        """Copies CPU data of array with specified name to GPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def all_to_cpu(self):
        """Copies data from GPU to CPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def all_to_gpu(self):
        """Copies data from CPU to GPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def is_on_gpu(self, name):
        """Checks if this data was also allocated on the GPU - does not check if this data item is in sync."""

    @abstractmethod
    def create_vtk_writer(self, file_name, data_names, ghost_layers=False) -> Callable[[int], None]:
        """VTK output for one or multiple arrays.

        Args:
            file_name: base file name without extension for the VTK output
            data_names: list of array names that should be included in the vtk output
            ghost_layers: true if ghost layer information should be written out as well

        Returns:
            a function that can be called with an integer time step to write the current state
            i.e create_vtk_writer('some_file', ['velocity', 'density']) (1)
        """

    @abstractmethod
    def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name,
                                         ghost_layers=False) -> Callable[[int], None]:
        """VTK output for an unsigned integer field, where bits are interpreted as flags.

        Args:
            file_name: see create_vtk_writer
            data_name: name of an array with uint type
            masks_to_name: dictionary mapping integer masks to a name in the output
            ghost_layers: see create_vtk_writer

        Returns:
            functor that can be called with time step
        """

    # ------------------------------- Communication -------------------------------------------------------------------

    @abstractmethod
    def synchronization_function(self, names, stencil=None, target=None, **kwargs) -> Callable[[], None]:
        """Synchronizes ghost layers for distributed arrays.

        For serial scenario this has to be called for correct periodicity handling

        Args:
            names: what data to synchronize: name of array or sequence of names
            stencil: stencil as string defining which neighbors are synchronized e.g. 'D2Q9', 'D3Q19'
                     if None, a full synchronization (i.e. D2Q9 or D3Q27) is done
            target: either 'cpu' or 'gpu'
            kwargs: implementation specific, optional optimization parameters for communication

        Returns:
            function object to run the communication
        """

    def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.ndarray:
        """Takes a sequence of floating point values on each process and reduces it element-wise.

        If all_reduce, all processes get the result, otherwise only the root process.

        Possible operations are 'sum', 'min', 'max'
        """

    def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.ndarray:
        """See function reduce_float_sequence - this is the same for integers"""

    # ------------------------------- Data access and modification ----------------------------------------------------

    def fill(self, array_name: str, val, value_idx: Optional[Tuple[int, ...]] = None,
             slice_obj=None, ghost_layers=False, inner_ghost_layers=False) -> None:
        """Sets all cells to the same value.

        Args:
            array_name: name of the array that should be modified
            val: value to set the array to
            value_idx: If an array stores multiple values per cell, this index chooses which of this values to fill.
                       If None, all values are set
            slice_obj: if passed, only the defined slice is filled
            ghost_layers: True if the outer ghost layers should also be filled
            inner_ghost_layers: True if the inner ghost layers should be filled. Inner ghost layers occur only in
                                parallel setups for distributed memory communication.
        """
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            # fix: previously this overwrote ghost_layers, leaving inner_ghost_layers unresolved
            inner_ghost_layers = self.ghost_layers_of_field(array_name)

        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            if value_idx is not None:
                if isinstance(value_idx, int):
                    value_idx = (value_idx,)
                assert len(value_idx) == len(self.values_per_cell(array_name))
                b[array_name][(Ellipsis, *value_idx)].fill(val)
            else:
                b[array_name].fill(val)

    def min(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the minimum value inside the domain or slice of the domain.

        For meaning of arguments see documentation of :func:`DataHandling.fill`.

        Returns:
            the minimum of the locally stored domain part is returned if reduce is False, otherwise the global minimum
            on the root process, on other processes None
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            # fix: previously this overwrote ghost_layers, leaving inner_ghost_layers unresolved
            inner_ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.min(b[array_name])
            # fix: np.min(result, m) passed m as the 'axis' argument; combine the two scalars instead
            result = m if result is None else min(result, m)
        return self.reduce_float_sequence([result], 'min', all_reduce=True)[0] if reduce else result

    def max(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the maximum value inside the domain or slice of the domain.

        For argument description see :func:`DataHandling.min`
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            # fix: previously this overwrote ghost_layers, leaving inner_ghost_layers unresolved
            inner_ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.max(b[array_name])
            # fix: np.max(result, m) passed m as the 'axis' argument; combine the two scalars instead
            result = m if result is None else max(result, m)

        return self.reduce_float_sequence([result], 'max', all_reduce=True)[0] if reduce else result

    def save_all(self, file):
        """Saves all field data to disk into a file"""

    def load_all(self, file):
        """Loads all field data from disk into a file

        Works only if save_all was called with exactly the same field sizes, layouts etc.
        When run in parallel save and load has to be called with the same number of processes.
        Use for check pointing only - to store results use VTK output
        """

    def __str__(self):
        result = ""

        first_column_width = max(len("Name"), max(len(a) for a in self.array_names))
        row_format = "{:>%d}|{:>21}|{:>21}\n" % (first_column_width,)
        separator_line = "-" * (first_column_width + 21 + 21 + 2) + "\n"
        result += row_format.format("Name", "Inner (min/max)", "WithGl (min/max)")
        result += separator_line
        for arr_name in sorted(self.array_names):
            inner_min_max = (self.min(arr_name, ghost_layers=False), self.max(arr_name, ghost_layers=False))
            with_gl_min_max = (self.min(arr_name, ghost_layers=True), self.max(arr_name, ghost_layers=True))
            inner_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(inner_min_max)
            with_gl_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(with_gl_min_max)
            result += row_format.format(arr_name, inner_min_max, with_gl_min_max)
        return result

    def log(self, *args, level='INFO'):
        """Similar to print with additional information (time, rank)."""

    def log_on_root(self, *args, level='INFO'):
        """Logs only on root process. For serial setups this is equivalent to log"""

    @property
    def is_root(self):
        """Returns True for exactly one process in the simulation"""

    @property
    def world_rank(self):
        """Number of current process"""

class Block:
    """Represents locally stored part of domain.

    Instances of this class are returned by DataHandling.iterate, do not create manually!
    """

    def __init__(self, offset, local_slice):
        self._offset = offset
        self._localSlice = local_slice

    @property
    def offset(self):
        """Offset of the current block in global coordinates (where lower ghost layers have negative indices)."""
        return self._offset

    @property
    def cell_index_arrays(self):
        """Global coordinate mesh-grid of cell coordinates.

        Cell indices start at 0 at the first inner cell, lower ghost layers have negative indices
        """
        axes = [start + np.arange(extent, dtype=np.int32)
                for start, extent in zip(self.offset, self.shape)]
        return np.meshgrid(*axes, indexing='ij', copy=False)

    @property
    def midpoint_arrays(self):
        """Global coordinate mesh-grid of cell midpoints which are shifted by 0.5 compared to cell indices."""
        axes = [start + 0.5 + np.arange(extent, dtype=float)
                for start, extent in zip(self.offset, self.shape)]
        return np.meshgrid(*axes, indexing='ij', copy=False)

    @property
    def shape(self):
        """Shape of the fields (potentially including ghost layers)."""
        dim = len(self._offset)
        return tuple(sl.stop - sl.start for sl in self._localSlice[:dim])

    @property
    def global_slice(self):
        """Slice in global coordinates."""
        return tuple(slice(start, start + extent)
                     for start, extent in zip(self._offset, self.shape))

    def __getitem__(self, data_name: str) -> np.ndarray:
        raise NotImplementedError()