Skip to content
Snippets Groups Projects
datahandling_interface.py 17.67 KiB
import numpy as np
from abc import ABC, abstractmethod
from typing import Optional, Callable, Sequence, Iterable, Tuple, Dict
from pystencils.field import Field


class DataHandling(ABC):
    """
    Manages the storage of arrays and maps them to a symbolic field.
    Two versions are available: a simple, pure Python implementation for single node
    simulations :py:class:SerialDataHandling and a distributed version using walberla in :py:class:ParallelDataHandling

    Keep in mind that the data can be distributed, so use the 'access' method whenever possible and avoid the
    'gather' function that has collects (parts of the) distributed data on a single process.
    """

    # ---------------------------- Adding and accessing data -----------------------------------------------------------

    @property
    @abstractmethod
    def dim(self) -> int:
        """Dimension of the domain, either 2 or 3"""

    @property
    @abstractmethod
    def shape(self) -> Tuple[int, ...]:
        """Shape of outer bounding box."""

    @property
    @abstractmethod
    def periodicity(self) -> Tuple[bool, ...]:
        """Returns tuple of booleans for x,y,(z) directions with True if domain is periodic in that direction."""

    @abstractmethod
    def add_array(self, name: str, values_per_cell: int = 1, dtype=np.float64,
                  latex_name: Optional[str]=None, ghost_layers: Optional[int] = None, layout: Optional[str] = None,
                  cpu: bool = True, gpu: Optional[bool] = None, alignment=False) -> Field:
        """Adds a (possibly distributed) array to the handling that can be accessed using the given name.

        For each array a symbolic field is available via the 'fields' dictionary

        Args:
            name: unique name that is used to access the field later
            values_per_cell: shape of the dim+1 coordinate. DataHandling supports zero or one index dimensions,
                             i.e. scalar fields and vector fields. This parameter gives the shape of the index
                             dimensions. The default value of 1 means no index dimension are created.
            dtype: data type of the array as numpy data type
            latex_name: optional, name of the symbolic field, if not given 'name' is used
            ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
                         is used
            layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
                    this is only important if values_per_cell > 1
            cpu: allocate field on the CPU
            gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'gpu'
            alignment: either False for no alignment, or the number of bytes to align to
        Returns:
            pystencils field, that can be used to formulate symbolic kernels
        """

    @abstractmethod
    def has_data(self, name):
        """Returns true if a field or custom data element with this name was added."""

    @abstractmethod
    def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
        """
        Adds an array with the same parameters (number of ghost layers, values_per_cell, dtype) as existing array.

        Args:
            name: name of new array
            name_of_template_field: name of array that is used as template
            latex_name: see 'add' method
            cpu: see 'add' method
            gpu: see 'add' method
        """

    @abstractmethod
    def add_custom_data(self, name: str, cpu_creation_function,
                        gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
        """Adds custom (non-array) data to domain.

        Args:
            name: name to access data
            cpu_creation_function: function returning a new instance of the data that should be stored
            gpu_creation_function: optional, function returning a new instance, stored on GPU
            cpu_to_gpu_transfer_func: function that transfers cpu to gpu version,
                                      getting two parameters (gpu_instance, cpu_instance)
            gpu_to_cpu_transfer_func: function that transfers gpu to cpu version, getting two parameters
                                      (gpu_instance, cpu_instance)
        """

    def add_custom_class(self, name: str, class_obj, cpu: bool = True, gpu: bool = False):
        """Adds non-array data by passing a class object with optional 'to_gpu' and 'to_cpu' member functions."""
        cpu_to_gpu_transfer_func = class_obj.to_gpu if cpu and gpu and hasattr(class_obj, 'to_gpu') else None
        gpu_to_cpu_transfer_func = class_obj.to_cpu if cpu and gpu and hasattr(class_obj, 'to_cpu') else None
        self.add_custom_data(name,
                             cpu_creation_function=class_obj if cpu else None,
                             gpu_creation_function=class_obj if gpu else None,
                             cpu_to_gpu_transfer_func=cpu_to_gpu_transfer_func,
                             gpu_to_cpu_transfer_func=gpu_to_cpu_transfer_func)

    @property
    @abstractmethod
    def fields(self) -> Dict[str, Field]:
        """Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels."""

    @property
    @abstractmethod
    def array_names(self) -> Sequence[str]:
        """Sequence of all array names."""

    @property
    @abstractmethod
    def custom_data_names(self) -> Sequence[str]:
        """Sequence of all custom data names."""

    @abstractmethod
    def ghost_layers_of_field(self, name: str) -> int:
        """Returns the number of ghost layers for a specific field/array."""

    @abstractmethod
    def values_per_cell(self, name: str) -> int:
        """Returns values_per_cell of array."""

    @abstractmethod
    def iterate(self, slice_obj=None, gpu=False, ghost_layers=None,
                inner_ghost_layers=True) -> Iterable['Block']:
        """Iterate over local part of potentially distributed data structure."""

    @abstractmethod
    def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False) -> Optional[np.ndarray]:
        """
        Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies
        the distributed data to a single process which is inefficient and may exhaust the available memory

        Args:
            name: name of the array to gather
            slice_obj: slice expression of the rectangular sub-part that should be gathered
            all_gather: if False only the root process receives the result, if True all processes
            ghost_layers: number of outer ghost layers to include (only available for serial version of data handling)

        Returns:
            gathered field that does not include any ghost layers, or None if gathered on another process
        """

    @abstractmethod
    def run_kernel(self, kernel_function, *args, **kwargs) -> None:
        """Runs a compiled pystencils kernel.

        Uses the arrays stored in the DataHandling class for all array parameters. Additional passed arguments are
        directly passed to the kernel function and override possible parameters from the DataHandling
        """

    @abstractmethod
    def swap(self, name1, name2, gpu=False):
        """Swaps data of two arrays"""

    # ------------------------------- CPU/GPU transfer -----------------------------------------------------------------

    @abstractmethod
    def to_cpu(self, name):
        """Copies GPU data of array with specified name to CPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def to_gpu(self, name):
        """Copies GPU data of array with specified name to GPU.
        Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""

    @abstractmethod
    def all_to_cpu(self):
        """Copies data from GPU to CPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def all_to_gpu(self):
        """Copies data from CPU to GPU for all arrays that have a CPU and a GPU representation."""

    @abstractmethod
    def is_on_gpu(self, name):
        """Checks if this data was also allocated on the GPU - does not check if this data item is in synced."""

    @abstractmethod
    def create_vtk_writer(self, file_name, data_names, ghost_layers=False) -> Callable[[int], None]:
        """VTK output for one or multiple arrays.

        Args
            file_name: base file name without extension for the VTK output
            data_names: list of array names that should be included in the vtk output
            ghost_layers: true if ghost layer information should be written out as well

        Returns:
            a function that can be called with an integer time step to write the current state
            i.e create_vtk_writer('some_file', ['velocity', 'density']) (1)
        """

    @abstractmethod
    def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name,
                                         ghost_layers=False) -> Callable[[int], None]:
        """VTK output for an unsigned integer field, where bits are interpreted as flags.

        Args:
            file_name: see create_vtk_writer
            data_name: name of an array with uint type
            masks_to_name: dictionary mapping integer masks to a name in the output
            ghost_layers: see create_vtk_writer

        Returns:
            functor that can be called with time step
         """

    # ------------------------------- Communication --------------------------------------------------------------------

    @abstractmethod
    def synchronization_function(self, names, stencil=None, target=None, **kwargs) -> Callable[[], None]:
        """Synchronizes ghost layers for distributed arrays.

        For serial scenario this has to be called for correct periodicity handling

        Args:
            names: what data to synchronize: name of array or sequence of names
            stencil: stencil as string defining which neighbors are synchronized e.g. 'D2Q9', 'D3Q19'
                     if None, a full synchronization (i.e. D2Q9 or D3Q27) is done
            target: either 'cpu' or 'gpu
            kwargs: implementation specific, optional optimization parameters for communication

        Returns:
            function object to run the communication
        """

    def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.array:
        """Takes a sequence of floating point values on each process and reduces it element-wise.

        If all_reduce, all processes get the result, otherwise only the root process.
        Possible operations are 'sum', 'min', 'max'
        """

    def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.array:
        """See function reduce_float_sequence - this is the same for integers"""

    # ------------------------------- Data access and modification -----------------------------------------------------

    def fill(self, array_name: str, val, value_idx: Optional[int] = None,
             slice_obj=None, ghost_layers=False, inner_ghost_layers=False) -> None:
        """Sets all cells to the same value.

        Args:
            array_name: name of the array that should be modified
            val: value to set the array to
            value_idx: If an array stores multiple values per cell, this index chooses which of this values to fill.
                       If None, all values are set
            slice_obj: if passed, only the defined slice is filled
            ghost_layers: True if the outer ghost layers should also be filled
            inner_ghost_layers: True if the inner ghost layers should be filled. Inner ghost layers occur only in
                                parallel setups for distributed memory communication.
        """
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if value_idx is not None and self.values_per_cell(array_name) < 2:
            raise ValueError("value_idx parameter only valid for fields with values_per_cell > 1")
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            if value_idx is not None:
                b[array_name][..., value_idx].fill(val)
            else:
                b[array_name].fill(val)

    def min(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the minimum value inside the domain or slice of the domain.

        For meaning of arguments see documentation of :func:`DataHandling.fill`.

        Returns:
            the minimum of the locally stored domain part is returned if reduce is False, otherwise the global minimum
            on the root process, on other processes None
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.min(b[array_name])
            result = m if result is None else np.min(result, m)
        return self.reduce_float_sequence([result], 'min', all_reduce=True)[0] if reduce else result

    def max(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
        """Returns the maximum value inside the domain or slice of the domain.

        For argument description see :func:`DataHandling.min`
        """
        result = None
        if ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        if inner_ghost_layers is True:
            ghost_layers = self.ghost_layers_of_field(array_name)
        for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
            m = np.max(b[array_name])
            result = m if result is None else np.max(result, m)

        return self.reduce_float_sequence([result], 'max', all_reduce=True)[0] if reduce else result

    def save_all(self, file):
        """Saves all field data to disk into a file"""

    def load_all(self, file):
        """Loads all field data from disk into a file

        Works only if save_all was called with exactly the same field sizes, layouts etc.
        When run in parallel save and load has to be called with the same number of processes.
        Use for check pointing only - to store results use VTK output
        """

    def __str__(self):
        result = ""

        first_column_width = max(len("Name"), max(len(a) for a in self.array_names))
        row_format = "{:>%d}|{:>21}|{:>21}\n" % (first_column_width,)
        separator_line = "-" * (first_column_width + 21 + 21 + 2) + "\n"
        result += row_format.format("Name", "Inner (min/max)", "WithGl (min/max)")
        result += separator_line
        for arr_name in sorted(self.array_names):
            inner_min_max = (self.min(arr_name, ghost_layers=False), self.max(arr_name, ghost_layers=False))
            with_gl_min_max = (self.min(arr_name, ghost_layers=True), self.max(arr_name, ghost_layers=True))
            inner_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(inner_min_max)
            with_gl_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(with_gl_min_max)
            result += row_format.format(arr_name, inner_min_max, with_gl_min_max)
        return result

    def log(self, *args, level='INFO'):
        """Similar to print with additional information (time, rank)."""

    def log_on_root(self, *args, level='INFO'):
        """Logs only on root process. For serial setups this is equivalent to log"""

    @property
    def is_root(self):
        """Returns True for exactly one process in the simulation"""

    @property
    def world_rank(self):
        """Number of current process"""


class Block:
    """Represents locally stored part of domain.

    Instances of this class are returned by DataHandling.iterate, do not create manually!
    """

    def __init__(self, offset, local_slice):
        self._offset = offset
        self._localSlice = local_slice

    @property
    def offset(self):
        """Offset of the current block in global coordinates (where lower ghost layers have negative indices)."""
        return self._offset

    @property
    def cell_index_arrays(self):
        """Global coordinate mesh-grid of cell coordinates.

        Cell indices start at 0 at the first inner cell, lower ghost layers have negative indices
        """
        mesh_grid_params = [offset + np.arange(width, dtype=np.int32)
                            for offset, width in zip(self.offset, self.shape)]
        return np.meshgrid(*mesh_grid_params, indexing='ij', copy=False)

    @property
    def midpoint_arrays(self):
        """Global coordinate mesh-grid of cell midpoints which are shifted by 0.5 compared to cell indices."""
        mesh_grid_params = [offset + 0.5 + np.arange(width, dtype=float)
                            for offset, width in zip(self.offset, self.shape)]
        return np.meshgrid(*mesh_grid_params, indexing='ij', copy=False)

    @property
    def shape(self):
        """Shape of the fields (potentially including ghost layers)."""
        return tuple(s.stop - s.start for s in self._localSlice[:len(self._offset)])

    @property
    def global_slice(self):
        """Slice in global coordinates."""
        return tuple(slice(off, off + size) for off, size in zip(self._offset, self.shape))

    def __getitem__(self, data_name: str) -> np.ndarray:
        raise NotImplementedError()