Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 3991 additions and 0 deletions
import warnings
import numpy as np
try:
import pyximport
pyximport.install(language_level=3)
cython_funcs_available = True
except ImportError:
cython_funcs_available = False
if cython_funcs_available:
from pystencils.boundaries.createindexlistcython import (
create_boundary_neighbor_index_list_2d,
create_boundary_neighbor_index_list_3d,
create_boundary_cell_index_list_2d,
create_boundary_cell_index_list_3d,
)
boundary_index_array_coordinate_names = ["x", "y", "z"]
direction_member_name = "dir"
default_index_array_dtype = np.int32
def numpy_data_type_for_boundary_object(boundary_object, dim):
coordinate_names = boundary_index_array_coordinate_names[:dim]
return np.dtype(
[(name, default_index_array_dtype) for name in coordinate_names]
+ [(direction_member_name, default_index_array_dtype)]
+ [(i[0], i[1].numpy_dtype) for i in boundary_object.additional_data],
align=True,
)
def _create_index_list_python(
flag_field_arr,
boundary_mask,
fluid_mask,
stencil,
single_link,
inner_or_boundary=False,
nr_of_ghost_layers=None,
):
if inner_or_boundary and nr_of_ghost_layers is None:
raise ValueError(
"If inner_or_boundary is set True the number of ghost layers "
"around the inner domain has to be specified"
)
if nr_of_ghost_layers is None:
nr_of_ghost_layers = 0
coordinate_names = boundary_index_array_coordinate_names[
: len(flag_field_arr.shape)
]
index_arr_dtype = np.dtype(
[(name, default_index_array_dtype) for name in coordinate_names]
+ [(direction_member_name, default_index_array_dtype)]
)
# boundary cells are extracted via np.where. To ensure continous memory access in the compute kernel these cells
# have to be sorted.
boundary_cells = np.transpose(np.nonzero(flag_field_arr == boundary_mask))
for i in range(len(flag_field_arr.shape)):
boundary_cells = boundary_cells[boundary_cells[:, i].argsort(kind="mergesort")]
# First a set is created to save all fluid cells which are near boundary
fluid_cells = set()
for cell in boundary_cells:
cell = tuple(cell)
for dir_idx, direction in enumerate(stencil):
neighbor_cell = tuple(
[cell_i + dir_i for cell_i, dir_i in zip(cell, direction)]
)
# prevent out ouf bounds access. If boundary cell is at the border, some stencil directions would be out.
if any(
not 0 + nr_of_ghost_layers <= e < upper - nr_of_ghost_layers
for e, upper in zip(neighbor_cell, flag_field_arr.shape)
):
continue
if flag_field_arr[neighbor_cell] & fluid_mask:
fluid_cells.add(neighbor_cell)
# then this is set is transformed to a list to make it sortable. This ensures continoous memory access later.
fluid_cells = list(fluid_cells)
if len(flag_field_arr.shape) == 3:
fluid_cells.sort(key=lambda tup: (tup[-1], tup[-2], tup[0]))
else:
fluid_cells.sort(key=lambda tup: (tup[-1], tup[0]))
cells_to_iterate = fluid_cells if inner_or_boundary else boundary_cells
checkmask = boundary_mask if inner_or_boundary else fluid_mask
result = []
for cell in cells_to_iterate:
cell = tuple(cell)
sum_cells = np.zeros(len(cell))
for dir_idx, direction in enumerate(stencil):
neighbor_cell = tuple(
[cell_i + dir_i for cell_i, dir_i in zip(cell, direction)]
)
# prevent out ouf bounds access. If boundary cell is at the border, some stencil directions would be out.
if any(
not 0 <= e < upper
for e, upper in zip(neighbor_cell, flag_field_arr.shape)
):
continue
if flag_field_arr[neighbor_cell] & checkmask:
if single_link:
sum_cells += np.array(direction)
else:
result.append(tuple(cell) + (dir_idx,))
# the discrete normal direction is the one which gives the maximum inner product to the stencil direction
if single_link and any(sum_cells != 0):
idx = np.argmax(np.inner(sum_cells, stencil))
result.append(tuple(cell) + (idx,))
return np.array(result, dtype=index_arr_dtype)
def create_boundary_index_list(
flag_field,
stencil,
boundary_mask,
fluid_mask,
nr_of_ghost_layers=1,
inner_or_boundary=True,
single_link=False,
):
"""Creates a numpy array storing links (connections) between domain cells and boundary cells.
Args:
flag_field: flag integer array where boundary and domain cells are marked (interpreted as bit vector)
stencil: list of directions, for possible links. When single_link is set to true the order matters, because
then only the first link is added to the list
boundary_mask: cells where (cell & mask) is true are considered boundary cells
fluid_mask: cells where (cell & mask) is true are considered fluid/inner cells cells
nr_of_ghost_layers: only relevant if neighbors is True
inner_or_boundary: if true, the result contains the cell coordinates of the domain cells -
if false the boundary cells are listed
single_link: if true only the link in normal direction to this cell is reported
"""
dim = len(flag_field.shape)
coordinate_names = boundary_index_array_coordinate_names[:dim]
index_arr_dtype = np.dtype(
[(name, default_index_array_dtype) for name in coordinate_names]
+ [(direction_member_name, default_index_array_dtype)]
)
stencil = np.array(stencil, dtype=default_index_array_dtype)
args = (
flag_field,
nr_of_ghost_layers,
boundary_mask,
fluid_mask,
stencil,
single_link,
)
args_no_gl = (flag_field, boundary_mask, fluid_mask, stencil, single_link)
if cython_funcs_available:
if dim == 2:
if inner_or_boundary:
idx_list = create_boundary_neighbor_index_list_2d(*args)
else:
idx_list = create_boundary_cell_index_list_2d(*args_no_gl)
elif dim == 3:
if inner_or_boundary:
idx_list = create_boundary_neighbor_index_list_3d(*args)
else:
idx_list = create_boundary_cell_index_list_3d(*args_no_gl)
else:
raise ValueError("Flag field has to be a 2 or 3 dimensional numpy array")
return np.array(idx_list, dtype=index_arr_dtype)
else:
if flag_field.size > 1e6:
warnings.warn(
"Boundary setup may take very long! Consider installing cython to speed it up"
)
return _create_index_list_python(
*args_no_gl,
inner_or_boundary=inner_or_boundary,
nr_of_ghost_layers=nr_of_ghost_layers,
)
def create_boundary_index_array(
flag_field,
stencil,
boundary_mask,
fluid_mask,
boundary_object,
nr_of_ghost_layers=1,
inner_or_boundary=True,
single_link=False,
):
idx_array = create_boundary_index_list(
flag_field,
stencil,
boundary_mask,
fluid_mask,
nr_of_ghost_layers,
inner_or_boundary,
single_link,
)
dim = len(flag_field.shape)
if boundary_object.additional_data:
coordinate_names = boundary_index_array_coordinate_names[:dim]
index_arr_dtype = numpy_data_type_for_boundary_object(boundary_object, dim)
extended_idx_field = np.empty(len(idx_array), dtype=index_arr_dtype)
for prop in coordinate_names + ["dir"]:
extended_idx_field[prop] = idx_array[prop]
idx_array = extended_idx_field
return idx_array
# cython: language_level=3str
import cython
ctypedef fused IntegerType:
short
int
long
long long
unsigned short
unsigned int
unsigned long
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False) # turn off negative index wrapping for entire function
def create_boundary_neighbor_index_list_2d(object[IntegerType, ndim=2] flag_field,
int nr_of_ghost_layers, IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil, int single_link):
cdef int xs, ys, x, y
cdef int dirIdx, num_directions, dx, dy
cdef int sum_x, sum_y
cdef float dot, maxn
cdef int calculated_idx
xs, ys = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for y in range(nr_of_ghost_layers, ys - nr_of_ghost_layers):
for x in range(nr_of_ghost_layers, xs - nr_of_ghost_layers):
sum_x = 0; sum_y = 0;
if flag_field[x, y] & fluid_mask:
for dirIdx in range(num_directions):
dx = stencil[dirIdx,0]; dy = stencil[dirIdx,1]
if flag_field[x + dx, y + dy] & boundary_mask:
if single_link:
sum_x += dx; sum_y += dy;
else:
boundary_index_list.append((x, y, dirIdx))
dot = 0; maxn = 0; calculated_idx = 0
if single_link and (sum_x != 0 or sum_y != 0):
for dirIdx in range(num_directions):
dx = stencil[dirIdx, 0]; dy = stencil[dirIdx, 1];
dot = dx * sum_x + dy * sum_y
if dot > maxn:
maxn = dot
calculated_idx = dirIdx
boundary_index_list.append((x, y, calculated_idx))
return boundary_index_list
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False) # turn off negative index wrapping for entire function
def create_boundary_neighbor_index_list_3d(object[IntegerType, ndim=3] flag_field,
int nr_of_ghost_layers, IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil, int single_link):
cdef int xs, ys, zs, x, y, z
cdef int dirIdx, num_directions, dx, dy, dz
cdef int sum_x, sum_y, sum_z
cdef float dot, maxn
cdef int calculated_idx
xs, ys, zs = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for z in range(nr_of_ghost_layers, zs - nr_of_ghost_layers):
for y in range(nr_of_ghost_layers, ys - nr_of_ghost_layers):
for x in range(nr_of_ghost_layers, xs - nr_of_ghost_layers):
sum_x = 0; sum_y = 0; sum_z = 0
if flag_field[x, y, z] & fluid_mask:
for dirIdx in range(num_directions):
dx = stencil[dirIdx,0]; dy = stencil[dirIdx,1]; dz = stencil[dirIdx,2]
if flag_field[x + dx, y + dy, z + dz] & boundary_mask:
if single_link:
sum_x += dx; sum_y += dy; sum_z += dz
else:
boundary_index_list.append((x, y, z, dirIdx))
dot = 0; maxn = 0; calculated_idx = 0
if single_link and (sum_x != 0 or sum_y != 0 or sum_z != 0):
for dirIdx in range(num_directions):
dx = stencil[dirIdx, 0]; dy = stencil[dirIdx, 1]; dz = stencil[dirIdx, 2]
dot = dx * sum_x + dy * sum_y + dz * sum_z
if dot > maxn:
maxn = dot
calculated_idx = dirIdx
boundary_index_list.append((x, y, z, calculated_idx))
return boundary_index_list
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False) # turn off negative index wrapping for entire function
def create_boundary_cell_index_list_2d(object[IntegerType, ndim=2] flag_field,
IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil, int single_link):
cdef int xs, ys, x, y
cdef int dirIdx, num_directions, dx, dy
cdef int sum_x, sum_y
cdef float dot, maxn
cdef int calculated_idx
xs, ys = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for y in range(0, ys):
for x in range(0, xs):
sum_x = 0; sum_y = 0;
if flag_field[x, y] & boundary_mask:
for dirIdx in range(num_directions):
dx = stencil[dirIdx,0]; dy = stencil[dirIdx,1]
if 0 <= x + dx < xs and 0 <= y + dy < ys:
if flag_field[x + dx, y + dy] & fluid_mask:
if single_link:
sum_x += dx; sum_y += dy
else:
boundary_index_list.append((x, y, dirIdx))
dot = 0; maxn = 0; calculated_idx = 0
if single_link and (sum_x != 0 or sum_y != 0):
for dirIdx in range(num_directions):
dx = stencil[dirIdx, 0]; dy = stencil[dirIdx, 1]
dot = dx * sum_x + dy * sum_y
if dot > maxn:
maxn = dot
calculated_idx = dirIdx
boundary_index_list.append((x, y, calculated_idx))
return boundary_index_list
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False) # turn off negative index wrapping for entire function
def create_boundary_cell_index_list_3d(object[IntegerType, ndim=3] flag_field,
IntegerType boundary_mask, IntegerType fluid_mask,
object[int, ndim=2] stencil, int single_link):
cdef int xs, ys, zs, x, y, z
cdef int dirIdx, num_directions, dx, dy, dz
cdef int sum_x, sum_y, sum_z
cdef float dot, maxn
cdef int calculated_idx
xs, ys, zs = flag_field.shape
boundary_index_list = []
num_directions = stencil.shape[0]
for z in range(0, zs):
for y in range(0, ys):
for x in range(0, xs):
sum_x = 0; sum_y = 0; sum_z = 0
if flag_field[x, y, z] & boundary_mask:
for dirIdx in range(num_directions):
dx = stencil[dirIdx, 0]; dy = stencil[dirIdx, 1]; dz = stencil[dirIdx, 2]
if 0 <= x + dx < xs and 0 <= y + dy < ys and 0 <= z + dz < zs:
if flag_field[x + dx, y + dy, z + dz] & fluid_mask:
if single_link:
sum_x += dx; sum_y += dy; sum_z += dz
else:
boundary_index_list.append((x, y, z, dirIdx))
dot = 0; maxn = 0; calculated_idx=0
if single_link and (sum_x != 0 or sum_y !=0 or sum_z !=0):
for dirIdx in range(num_directions):
dx = stencil[dirIdx, 0]; dy = stencil[dirIdx, 1]; dz = stencil[dirIdx, 2]
dot = dx*sum_x + dy*sum_y + dz*sum_z
if dot > maxn:
maxn = dot
calculated_idx = dirIdx
boundary_index_list.append((x, y, z, calculated_idx))
return boundary_index_list
\ No newline at end of file
import sympy as sp
from pystencils.boundaries.boundaryhandling import DEFAULT_FLAG_TYPE
from pystencils.typing import TypedSymbol, create_type
from pystencils.field import Field
from pystencils.integer_functions import bitwise_and
def add_neumann_boundary(eqs, fields, flag_field, boundary_flag="neumann_flag", inverse_flag=False):
"""
Replaces all neighbor accesses by flag field guarded accesses.
If flag in neighboring cell is set, the center value is used instead
Args:
eqs: list of equations containing field accesses to direct neighbors
fields: fields for which the Neumann boundary should be applied
flag_field: integer field marking boundary cells
boundary_flag: if flag field has value 'boundary_flag' (no bit operations yet)
the cell is assumed to be boundary
inverse_flag: if true, boundary cells are where flag field has not the value of boundary_flag
Returns:
list of equations with guarded field accesses
"""
if not hasattr(fields, "__len__"):
fields = [fields]
fields = set(fields)
if type(boundary_flag) is str:
boundary_flag = TypedSymbol(boundary_flag, dtype=create_type(DEFAULT_FLAG_TYPE))
substitutions = {}
for eq in eqs:
for fa in eq.atoms(Field.Access):
if fa.field not in fields:
continue
if not all(offset in (-1, 0, 1) for offset in fa.offsets):
raise ValueError("Works only for single neighborhood stencils")
if all(offset == 0 for offset in fa.offsets):
continue
if inverse_flag:
condition = sp.Eq(bitwise_and(flag_field[tuple(fa.offsets)], boundary_flag), 0)
else:
condition = sp.Ne(bitwise_and(flag_field[tuple(fa.offsets)], boundary_flag), 0)
center = fa.field(*fa.index)
substitutions[fa] = sp.Piecewise((center, condition), (fa, True))
return [eq.subs(substitutions) for eq in eqs]
import os
from collections.abc import Hashable
from functools import partial, wraps
from itertools import chain
from functools import lru_cache as memorycache
from joblib import Memory
from appdirs import user_cache_dir
if 'PYSTENCILS_CACHE_DIR' in os.environ:
cache_dir = os.environ['PYSTENCILS_CACHE_DIR']
else:
cache_dir = user_cache_dir('pystencils')
disk_cache = Memory(cache_dir, verbose=False).cache
disk_cache_no_fallback = disk_cache
def _wrapper(wrapped_func, cached_func, *args, **kwargs):
if all(isinstance(a, Hashable) for a in chain(args, kwargs.values())):
return cached_func(*args, **kwargs)
else:
return wrapped_func(*args, **kwargs)
def memorycache_if_hashable(maxsize=128, typed=False):
def wrapper(func):
return partial(_wrapper, func, memorycache(maxsize, typed)(func))
return wrapper
def sharedmethodcache(cache_id: str):
"""Decorator for memoization of instance methods, allowing multiple methods to use the same cache.
This decorator caches results of instance methods per instantiated object of the surrounding class.
It allows multiple methods to use the same cache, by passing them the same `cache_id` string.
Cached values are stored in a dictionary, which is added as a member `self.<cache_id>` to the
`self` object instance. Make sure that this doesn't cause any naming conflicts with other members!
Of course, for this to be useful, said methods must have the same signature (up to additional kwargs)
and must return the same result when called with the same arguments."""
def _decorator(user_method):
@wraps(user_method)
def _decorated_func(self, *args, **kwargs):
objdict = self.__dict__
cache = objdict.setdefault(cache_id, dict())
key = args
for item in kwargs.items():
key += item
if key not in cache:
result = user_method(self, *args, **kwargs)
cache[key] = result
return result
else:
return cache[key]
return _decorated_func
return _decorator
def clear_cache():
"""
Clears the pystencils cache created by joblib.
"""
memory = Memory(cache_dir, verbose=0)
memory.clear(warn=False)
# Disable memory cache:
# disk_cache = lambda o: o
# disk_cache_no_fallback = lambda o: o
from copy import copy
from collections import defaultdict
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Union, Tuple, List, Dict, Callable, Any, DefaultDict, Iterable
from pystencils import Target, Backend, Field
from pystencils.typing.typed_sympy import BasicType
from pystencils.typing.utilities import collate_types
import numpy as np
# TODO: There exists DTypeLike in NumPy which would be better than type for type hinting, to new at the moment
# from numpy.typing import DTypeLike
# TODO: CreateKernelConfig is bloated think of more classes better usage, factory whatever ...
# Proposition: CreateKernelConfigs Classes for different targets?
@dataclass
class CreateKernelConfig:
"""
**Below all parameters for the CreateKernelConfig are explained**
"""
target: Target = Target.CPU
"""
All targets are defined in :class:`pystencils.enums.Target`
"""
backend: Backend = None
"""
All backends are defined in :class:`pystencils.enums.Backend`
"""
function_name: str = 'kernel'
"""
Name of the generated function - only important if generated code is written out
"""
data_type: Union[type, str, DefaultDict[str, BasicType], Dict[str, BasicType]] = np.float64
"""
Data type used for all untyped symbols (i.e. non-fields), can also be a dict from symbol name to type.
If specified as a dict ideally a defaultdict is used to define a default value for symbols not listed in the
dict. If a plain dict is provided it will be transformed into a defaultdict internally. The default value
will then be specified via type collation then.
"""
default_number_float: Union[type, str, BasicType] = None
"""
Data type used for all untyped floating point numbers (i.e. 0.5). By default the value of data_type is used.
If data_type is given as a defaultdict its default_factory is used.
"""
default_number_int: Union[type, str, BasicType] = np.int64
"""
Data type used for all untyped integer numbers (i.e. 1)
"""
iteration_slice: Tuple = None
"""
Rectangular subset to iterate over, if not specified the complete non-ghost layer part of the field is iterated over
"""
ghost_layers: Union[bool, int, List[Tuple[int]]] = None
"""
A single integer specifies the ghost layer count at all borders, can also be a sequence of
pairs ``[(x_lower_gl, x_upper_gl), .... ]``. These layers are excluded from the iteration.
If left to default, the number of ghost layers is determined automatically from the assignments.
"""
cpu_openmp: Union[bool, int] = False
"""
`True` or number of threads for OpenMP parallelization, `False` for no OpenMP. If set to `True`, the maximum number
of available threads will be chosen.
"""
cpu_vectorize_info: Dict = None
"""
A dictionary with keys, 'vector_instruction_set', 'assume_aligned' and 'nontemporal'
for documentation of these parameters see vectorize function. Example:
'{'instruction_set': 'avx512', 'assume_aligned': True, 'nontemporal':True}'
"""
cpu_blocking: Tuple[int] = None
"""
A tuple of block sizes or `None` if no blocking should be applied
"""
omp_single_loop: bool = True
"""
If OpenMP is active: whether multiple outer loops are permitted
"""
base_pointer_specification: Union[List[Iterable[str]], List[Iterable[int]]] = None
"""
Specification of how many and which intermediate pointers are created for a field access.
For example [ (0), (2,3,)] creates on base pointer for coordinates 2 and 3 and writes the offset for coordinate
zero directly in the field access. These specifications are defined dependent on the loop ordering.
This function translates more readable version into the specification above.
For more information see: `pystencils.transformations.create_intermediate_base_pointer`
"""
gpu_indexing: str = 'block'
"""
Either 'block' or 'line' , or custom indexing class, see `pystencils.gpu.AbstractIndexing`
"""
gpu_indexing_params: MappingProxyType = field(default_factory=lambda: MappingProxyType({}))
"""
Dict with indexing parameters (constructor parameters of indexing class)
e.g. for 'block' one can specify '{'block_size': (20, 20, 10) }'.
"""
# TODO Markus rework this docstring
default_assignment_simplifications: bool = False
"""
If `True` default simplifications are first performed on the Assignments. If problems occur during the
simplification a warning will be thrown.
Furthermore, it is essential to know that this is a two-stage process. The first stage of the process acts
on the level of the `pystencils.AssignmentCollection`. In this part,
`pystencil.simp.create_simplification_strategy` from pystencils.simplificationfactory will be used to
apply optimisations like insertion of constants to
remove pressure from the registers. Thus the first part of the optimisations can only be executed if
an `AssignmentCollection` is passed. The second part of the optimisation acts on the level of each Assignment
individually. In this stage, all optimisations from `sympy.codegen.rewriting.optims_c99` are applied
to each Assignment. Thus this stage can also be applied if a list of Assignments is passed.
"""
cpu_prepend_optimizations: List[Callable] = field(default_factory=list)
"""
List of extra optimizations to perform first on the AST.
"""
use_auto_for_assignments: bool = False
"""
If set to `True`, auto can be used in the generated code for data types. This makes the type system more robust.
"""
index_fields: List[Field] = None
"""
List of index fields, i.e. 1D fields with struct data type. If not `None`, `create_index_kernel`
instead of `create_domain_kernel` is used.
"""
coordinate_names: Tuple[str, Any] = ('x', 'y', 'z')
"""
Name of the coordinate fields in the struct data type.
"""
allow_double_writes: bool = False
"""
If True, don't check if every field is only written at a single location. This is required
for example for kernels that are compiled with loop step sizes > 1, that handle multiple
cells at once. Use with care!
"""
skip_independence_check: bool = False
"""
By default the assignment list is checked for read/write independence. This means fields are only written at
locations where they are read. Doing so guarantees thread safety. In some cases e.g. for
periodicity kernel, this can not be assured and does the check needs to be deactivated. Use with care!
"""
class DataTypeFactory:
"""Because of pickle, we need to have a nested class, instead of a lambda in __post_init__"""
def __init__(self, dt):
self.dt = dt
def __call__(self):
return BasicType(self.dt)
def _check_type(self, dtype_to_check):
if isinstance(dtype_to_check, str) and (dtype_to_check == 'float' or dtype_to_check == 'int'):
self._typing_error()
if isinstance(dtype_to_check, type) and not hasattr(dtype_to_check, "dtype"):
# NumPy-types are also of type 'type'. However, they have more properties
self._typing_error()
@staticmethod
def _typing_error():
raise ValueError("It is not possible to use python types (float, int) for datatypes because these "
"types are ambiguous. For example float will map to double. "
"Also the string version like 'float' is not allowed, e.g. use 'float64' instead")
def __post_init__(self):
# ---- Legacy parameters
if not isinstance(self.target, Target):
raise ValueError("target must be provided by the 'Target' enum")
# ---- Auto Backend
if not self.backend:
if self.target == Target.CPU:
self.backend = Backend.C
elif self.target == Target.GPU:
self.backend = Backend.CUDA
else:
raise NotImplementedError(f'Target {self.target} has no default backend')
if not isinstance(self.backend, Backend):
raise ValueError("backend must be provided by the 'Backend' enum")
# Normalise data types
for dtype in [self.data_type, self.default_number_float, self.default_number_int]:
self._check_type(dtype)
if not isinstance(self.data_type, dict):
dt = copy(self.data_type) # The copy is necessary because BasicType has sympy shinanigans
self.data_type = defaultdict(self.DataTypeFactory(dt))
if isinstance(self.data_type, dict) and not isinstance(self.data_type, defaultdict):
for dtype in self.data_type.values():
self._check_type(dtype)
dt = collate_types([BasicType(dtype) for dtype in self.data_type.values()])
dtype_dict = self.data_type
self.data_type = defaultdict(self.DataTypeFactory(dt), dtype_dict)
assert isinstance(self.data_type, defaultdict), "At this point data_type must be a defaultdict!"
for dtype in self.data_type.values():
self._check_type(dtype)
self._check_type(self.data_type.default_factory())
if self.default_number_float is None:
self.default_number_float = self.data_type.default_factory()
if not isinstance(self.default_number_float, BasicType):
self.default_number_float = BasicType(self.default_number_float)
if not isinstance(self.default_number_int, BasicType):
self.default_number_int = BasicType(self.default_number_int)
from pystencils.cpu.cpujit import make_python_function
from pystencils.cpu.kernelcreation import add_openmp, create_indexed_kernel, create_kernel, add_pragmas
__all__ = ['create_kernel', 'create_indexed_kernel', 'add_openmp', 'add_pragmas', 'make_python_function']
r"""
*pystencils* automatically searches for a compiler, so in most cases no explicit configuration is required.
On Linux make sure that 'gcc' and 'g++' are installed and in your path.
On Windows a recent Visual Studio installation is required.
In case anything does not work as expected or a special compiler should be used, changes can be specified
in a configuration file.
*pystencils* looks for a configuration file in JSON format at the following locations in the listed order.
1. at the path specified in the environment variable ``PYSTENCILS_CONFIG``
2. in the current working direction for a file named ``pystencils.json``
3. or in your home directory at ``~/.config/pystencils/config.json`` (Linux) or
``%HOMEPATH%\.pystencils\config.json`` (Windows)
If no configuration file is found, a default configuration is created at the above-mentioned location in your home.
So run *pystencils* once, then edit the created configuration file.
Compiler Config (Linux)
-----------------------
- **'os'**: should be detected automatically as 'linux'
- **'command'**: path to C++ compiler (defaults to 'g++')
- **'flags'**: space separated list of compiler flags. Make sure to activate OpenMP in your compiler
- **'restrict_qualifier'**: the 'restrict' qualifier is not standardized across compilers.
For most Linux compilers the qualifier is ``__restrict__``
Compiler Config (Windows)
-------------------------
*pystencils* uses the mechanism of *setuptools.msvc* to search for a compilation environment.
Then 'cl.exe' is used to compile.
- **'os'**: should be detected automatically as 'windows'
- **'msvc_version'**: either a version number, year number, 'auto' or 'latest' for automatic detection of latest
installed version or 'setuptools' for setuptools-based detection. Alternatively path to folder
where Visual Studio is installed. This path has to contain a file called 'vcvarsall.bat'
- **'arch'**: 'x86' or 'x64'
- **'flags'**: flags passed to 'cl.exe', make sure OpenMP is activated
- **'restrict_qualifier'**: the 'restrict' qualifier is not standardized across compilers.
For Windows compilers the qualifier should be ``__restrict``
"""
from appdirs import user_cache_dir, user_config_dir
from collections import OrderedDict
import hashlib
import importlib.util
import json
import os
import platform
import shutil
import subprocess
import sysconfig
import tempfile
import textwrap
import time
import warnings
import pathlib
import numpy as np
from pystencils import FieldType
from pystencils.astnodes import LoopOverCoordinate
from pystencils.backends.cbackend import generate_c, get_headers
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets
from pystencils.cpu.msvc_detection import get_environment
from pystencils.include import get_pystencils_include_path
from pystencils.kernel_wrapper import KernelWrapper
from pystencils.typing import BasicType, CastFunc, VectorType, VectorMemoryAccess
from pystencils.utils import atomic_file_write, recursive_dict_update
def make_python_function(kernel_function_node, custom_backend=None):
"""
Creates C code from the abstract syntax tree, compiles it and makes it accessible as Python function
The parameters of the kernel are:
- numpy arrays for each field used in the kernel. The keyword argument name is the name of the field
- all symbols which are not defined in the kernel itself are expected as parameters
:param kernel_function_node: the abstract syntax tree
:param custom_backend: use own custom printer for code generation
:return: kernel functor
"""
result = compile_and_load(kernel_function_node, custom_backend)
return result
def set_config(config):
"""
Override the configuration provided in config file
Configuration of compiler parameters:
If this function is not called the configuration is taken from a config file in JSON format which
is searched in the following locations in the order specified:
- at location provided in environment variable PYSTENCILS_CONFIG (if this variable exists)
- a file called ".pystencils.json" in the current working directory
- ~/.pystencils.json in your home
If none of these files exist a file ~/.pystencils.json is created with a default configuration using
the GNU 'g++'
An example JSON file with all possible keys. If not all keys are specified, default values are used
``
{
'compiler' :
{
"command": "/software/intel/2017/bin/icpc",
"flags": "-Ofast -DNDEBUG -fPIC -march=native -fopenmp",
"env": {
"LM_PROJECT": "iwia",
}
}
}
``
"""
global _config
_config = config.copy()
def get_configuration_file_path():
config_path_in_home = os.path.join(user_config_dir('pystencils'), 'config.json')
# 1) Read path from environment variable if found
if 'PYSTENCILS_CONFIG' in os.environ:
return os.environ['PYSTENCILS_CONFIG']
# 2) Look in current directory for pystencils.json
elif os.path.exists("pystencils.json"):
return "pystencils.json"
# 3) Try ~/.pystencils.json
elif os.path.exists(config_path_in_home):
return config_path_in_home
else:
return config_path_in_home
def create_folder(path, is_file):
if is_file:
path = os.path.split(path)[0]
try:
os.makedirs(path)
except os.error:
pass
def read_config():
if platform.system().lower() == 'linux':
default_compiler_config = OrderedDict([
('os', 'linux'),
('command', 'g++'),
('flags', '-Ofast -DNDEBUG -fPIC -march=native -fopenmp -std=c++11'),
('restrict_qualifier', '__restrict__')
])
if platform.machine().startswith('ppc64') or platform.machine() == 'arm64':
default_compiler_config['flags'] = default_compiler_config['flags'].replace('-march=native',
'-mcpu=native')
elif platform.system().lower() == 'windows':
default_compiler_config = OrderedDict([
('os', 'windows'),
('msvc_version', 'latest'),
('arch', 'x64'),
('flags', '/Ox /fp:fast /OpenMP /arch:avx'),
('restrict_qualifier', '__restrict')
])
if platform.machine() == 'ARM64':
default_compiler_config['arch'] = 'ARM64'
default_compiler_config['flags'] = default_compiler_config['flags'].replace(' /arch:avx', '')
elif platform.system().lower() == 'darwin':
default_compiler_config = OrderedDict([
('os', 'darwin'),
('command', 'clang++'),
('flags', '-Ofast -DNDEBUG -fPIC -march=native -Xclang -fopenmp -std=c++11'),
('restrict_qualifier', '__restrict__')
])
if platform.machine() == 'arm64':
if 'sme' in get_supported_instruction_sets():
flag = '-march=armv8.7-a+sme '
else:
flag = ''
default_compiler_config['flags'] = default_compiler_config['flags'].replace('-march=native ', flag)
for libomp in ['/opt/local/lib/libomp/libomp.dylib', '/usr/local/lib/libomp.dylib',
'/opt/homebrew/lib/libomp.dylib']:
if os.path.exists(libomp):
default_compiler_config['flags'] += ' ' + libomp
break
else:
raise NotImplementedError('Generation of default compiler flags for %s is not implemented' %
(platform.system(),))
default_cache_config = OrderedDict([
('object_cache', os.path.join(user_cache_dir('pystencils'), 'objectcache')),
('clear_cache_on_start', False),
])
default_config = OrderedDict([('compiler', default_compiler_config),
('cache', default_cache_config)])
from fasteners import InterProcessLock
config_path = pathlib.Path(get_configuration_file_path())
config_path.parent.mkdir(parents=True, exist_ok=True)
config = default_config.copy()
lockfile = config_path.with_suffix(config_path.suffix + ".lock")
with InterProcessLock(lockfile):
if config_path.exists():
with open(config_path, 'r') as json_config_file:
loaded_config = json.load(json_config_file)
config = recursive_dict_update(config, loaded_config)
else:
with open(config_path, 'w') as f:
json.dump(config, f, indent=4)
if config['cache']['object_cache'] is not False:
config['cache']['object_cache'] = os.path.expanduser(config['cache']['object_cache']).format(pid=os.getpid())
clear_cache_on_start = False
cache_status_file = os.path.join(config['cache']['object_cache'], 'last_config.json')
if os.path.exists(cache_status_file):
# check if compiler config has changed
last_config = json.load(open(cache_status_file, 'r'))
if set(last_config.items()) != set(config['compiler'].items()):
clear_cache_on_start = True
else:
for key in last_config.keys():
if last_config[key] != config['compiler'][key]:
clear_cache_on_start = True
if config['cache']['clear_cache_on_start'] or clear_cache_on_start:
shutil.rmtree(config['cache']['object_cache'], ignore_errors=True)
create_folder(config['cache']['object_cache'], False)
with tempfile.NamedTemporaryFile('w', dir=os.path.dirname(cache_status_file), delete=False) as f:
json.dump(config['compiler'], f, indent=4)
os.replace(f.name, cache_status_file)
if config['compiler']['os'] == 'windows':
msvc_env = get_environment(config['compiler']['msvc_version'], config['compiler']['arch'])
if 'env' not in config['compiler']:
config['compiler']['env'] = {}
config['compiler']['env'].update(msvc_env)
return config
_config = read_config()
def get_compiler_config():
return _config['compiler']
def get_cache_config():
return _config['cache']
def add_or_change_compiler_flags(flags):
if not isinstance(flags, list) and not isinstance(flags, tuple):
flags = [flags]
compiler_config = get_compiler_config()
cache_config = get_cache_config()
cache_config['object_cache'] = False # disable cache
for flag in flags:
flag = flag.strip()
if '=' in flag:
base = flag.split('=')[0].strip()
else:
base = flag
new_flags = [c for c in compiler_config['flags'].split() if not c.startswith(base)]
new_flags.append(flag)
compiler_config['flags'] = ' '.join(new_flags)
def clear_cache():
cache_config = get_cache_config()
if cache_config['object_cache'] is not False:
shutil.rmtree(cache_config['object_cache'], ignore_errors=True)
create_folder(cache_config['object_cache'], False)
# TODO don't hardcode C type. [1] of tuple output
type_mapping = {
np.float32: ('PyFloat_AsDouble', 'float'),
np.float64: ('PyFloat_AsDouble', 'double'),
np.int16: ('PyLong_AsLong', 'int16_t'),
np.int32: ('PyLong_AsLong', 'int32_t'),
np.int64: ('PyLong_AsLong', 'int64_t'),
np.uint16: ('PyLong_AsUnsignedLong', 'uint16_t'),
np.uint32: ('PyLong_AsUnsignedLong', 'uint32_t'),
np.uint64: ('PyLong_AsUnsignedLong', 'uint64_t'),
}
template_extract_scalar = """
PyObject * obj_{name} = PyDict_GetItemString(kwargs, "{name}");
if( obj_{name} == NULL) {{ PyErr_SetString(PyExc_TypeError, "Keyword argument '{name}' missing"); return NULL; }};
{target_type} {name} = ({target_type}) {extract_function}( obj_{name} );
if( PyErr_Occurred() ) {{ return NULL; }}
"""
template_extract_array = """
PyObject * obj_{name} = PyDict_GetItemString(kwargs, "{name}");
if( obj_{name} == NULL) {{ PyErr_SetString(PyExc_TypeError, "Keyword argument '{name}' missing"); return NULL; }};
Py_buffer buffer_{name};
int buffer_{name}_res = PyObject_GetBuffer(obj_{name}, &buffer_{name}, PyBUF_STRIDES | PyBUF_WRITABLE | PyBUF_FORMAT);
if (buffer_{name}_res == -1) {{ return NULL; }}
"""
template_release_buffer = """
PyBuffer_Release(&buffer_{name});
"""
template_function_boilerplate = """
static PyObject * {func_name}(PyObject * self, PyObject * args, PyObject * kwargs)
{{
if( !kwargs || !PyDict_Check(kwargs) ) {{
PyErr_SetString(PyExc_TypeError, "No keyword arguments passed");
return NULL;
}}
{pre_call_code}
kernel_{func_name}({parameters});
{post_call_code}
Py_RETURN_NONE;
}}
"""
template_check_array = """
if(!({cond})) {{
PyErr_SetString(PyExc_ValueError, "Wrong {what} of array {name}. Expected {expected}");
return NULL;
}}
"""
template_size_check = """
if(!({cond})) {{
PyErr_SetString(PyExc_TypeError, "Arrays must have same shape"); return NULL;
}}"""
template_module_boilerplate = """
static PyMethodDef method_definitions[] = {{
{method_definitions}
{{NULL, NULL, 0, NULL}}
}};
static struct PyModuleDef module_definition = {{
PyModuleDef_HEAD_INIT,
"{module_name}", /* name of module */
NULL, /* module documentation, may be NULL */
-1, /* size of per-interpreter state of the module,
or -1 if the module keeps state in global variables. */
method_definitions
}};
PyMODINIT_FUNC
PyInit_{module_name}(void)
{{
return PyModule_Create(&module_definition);
}}
"""
def equal_size_check(fields):
fields = list(fields)
if len(fields) <= 1:
return ""
ref_field = fields[0]
cond = [f"(buffer_{field_to_test.name}.shape[{i}] == buffer_{ref_field.name}.shape[{i}])"
for field_to_test in fields[1:]
for i in range(fields[0].spatial_dimensions)]
cond = " && ".join(cond)
return template_size_check.format(cond=cond)
def create_function_boilerplate_code(parameter_info, name, ast_node, insert_checks=True):
pre_call_code = ""
parameters = []
post_call_code = ""
variable_sized_normal_fields = set()
variable_sized_index_fields = set()
for param in parameter_info:
if param.is_field_pointer:
field = param.fields[0]
pre_call_code += template_extract_array.format(name=field.name)
post_call_code += template_release_buffer.format(name=field.name)
parameters.append(f"({str(field.dtype)} *)buffer_{field.name}.buf")
if insert_checks:
np_dtype = field.dtype.numpy_dtype
item_size = np_dtype.itemsize
aligned = False
if ast_node.assignments:
aligned = any([a.lhs.args[2] for a in ast_node.assignments
if hasattr(a, 'lhs') and isinstance(a.lhs, CastFunc)
and hasattr(a.lhs, 'dtype') and isinstance(a.lhs.dtype, VectorType)])
if ast_node.instruction_set and aligned:
byte_width = ast_node.instruction_set['width'] * item_size
if 'cachelineZero' in ast_node.instruction_set:
has_openmp, has_nontemporal = False, False
for loop in ast_node.atoms(LoopOverCoordinate):
has_openmp = has_openmp or any(['#pragma omp' in p for p in loop.prefix_lines])
has_nontemporal = has_nontemporal or any([a.args[0].field == field and a.args[3] for a in
loop.atoms(VectorMemoryAccess)])
if has_openmp and has_nontemporal:
cl_size = ast_node.instruction_set['cachelineSize']
byte_width = f"({cl_size}) < SIZE_MAX ? ({cl_size}) : ({byte_width})"
offset = max(max(ast_node.ghost_layers)) * item_size
offset_cond = f"(((uintptr_t) buffer_{field.name}.buf) + {offset}) % ({byte_width}) == 0"
message = str(offset) + ". This is probably due to a different number of ghost_layers chosen for " \
"the arrays and the kernel creation. If the number of ghost layers for " \
"the kernel creation is not specified it will choose a suitable value " \
"automatically. This value might not " \
"be compatible with the allocated arrays."
if type(byte_width) is not int:
message += " Note that when both OpenMP and non-temporal stores are enabled, alignment to the "\
"cacheline size is required."
pre_call_code += template_check_array.format(cond=offset_cond, what="offset", name=field.name,
expected=message)
if (np_dtype.isbuiltin and FieldType.is_generic(field)
and not np.issubdtype(field.dtype.numpy_dtype, np.complexfloating)):
dtype_cond = f"buffer_{field.name}.format[0] == '{field.dtype.numpy_dtype.char}'"
pre_call_code += template_check_array.format(cond=dtype_cond, what="data type", name=field.name,
expected=str(field.dtype.numpy_dtype))
item_size_cond = f"buffer_{field.name}.itemsize == {item_size}"
pre_call_code += template_check_array.format(cond=item_size_cond, what="itemsize", name=field.name,
expected=item_size)
if field.has_fixed_shape:
shape_cond = [f"buffer_{field.name}.shape[{i}] == {s}"
for i, s in enumerate(field.spatial_shape)]
shape_cond = " && ".join(shape_cond)
pre_call_code += template_check_array.format(cond=shape_cond, what="shape", name=field.name,
expected=str(field.shape))
expected_strides = [e * item_size for e in field.spatial_strides]
stride_check_code = "(buffer_{name}.strides[{i}] == {s} || buffer_{name}.shape[{i}]<=1)"
strides_cond = " && ".join([stride_check_code.format(s=s, i=i, name=field.name)
for i, s in enumerate(expected_strides)])
pre_call_code += template_check_array.format(cond=strides_cond, what="strides", name=field.name,
expected=str(expected_strides))
else:
if FieldType.is_generic(field):
variable_sized_normal_fields.add(field)
elif FieldType.is_indexed(field):
variable_sized_index_fields.add(field)
elif param.is_field_stride:
field = param.fields[0]
item_size = field.dtype.numpy_dtype.itemsize
parameters.append(f"buffer_{field.name}.strides[{param.symbol.coordinate}] / {item_size}")
elif param.is_field_shape:
parameters.append(f"buffer_{param.field_name}.shape[{param.symbol.coordinate}]")
else:
extract_function, target_type = type_mapping[param.symbol.dtype.numpy_dtype.type]
pre_call_code += template_extract_scalar.format(extract_function=extract_function,
target_type=target_type,
name=param.symbol.name)
parameters.append(param.symbol.name)
pre_call_code += equal_size_check(variable_sized_normal_fields)
pre_call_code += equal_size_check(variable_sized_index_fields)
pre_call_code = textwrap.indent(pre_call_code, ' ')
post_call_code = textwrap.indent(post_call_code, ' ')
return template_function_boilerplate.format(func_name=name, pre_call_code=pre_call_code,
post_call_code=post_call_code, parameters=", ".join(parameters))
def create_module_boilerplate_code(module_name, names):
method_definition = '{{"{name}", (PyCFunction){name}, METH_VARARGS | METH_KEYWORDS, ""}},'
method_definitions = "\n".join([method_definition.format(name=name) for name in names])
return template_module_boilerplate.format(module_name=module_name, method_definitions=method_definitions)
def load_kernel_from_file(module_name, function_name, path):
try:
spec = importlib.util.spec_from_file_location(name=module_name, location=path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
except ImportError:
warnings.warn(f"Could not load {path}, trying on more time in 5 seconds ...")
time.sleep(5)
spec = importlib.util.spec_from_file_location(name=module_name, location=path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return getattr(mod, function_name)
def run_compile_step(command):
compiler_config = get_compiler_config()
config_env = compiler_config['env'] if 'env' in compiler_config else {}
compile_environment = os.environ.copy()
compile_environment.update(config_env)
try:
shell = True if compiler_config['os'].lower() == 'windows' else False
subprocess.check_output(command, env=compile_environment, stderr=subprocess.STDOUT, shell=shell)
except subprocess.CalledProcessError as e:
print(" ".join(command))
print(e.output.decode('utf8'))
raise e
class ExtensionModuleCode:
def __init__(self, module_name='generated', custom_backend=None):
self.module_name = module_name
self._ast_nodes = []
self._function_names = []
self._custom_backend = custom_backend
self._code_string = str()
self._code_hash = None
def add_function(self, ast, name=None):
self._ast_nodes.append(ast)
self._function_names.append(name if name is not None else ast.function_name)
def create_code_string(self, restrict_qualifier, function_prefix):
self._code_string = str()
headers = {'<math.h>', '<stdint.h>'}
for ast in self._ast_nodes:
for field in ast.fields_accessed:
if isinstance(field.dtype, BasicType) and field.dtype.is_half():
# Add the half precision header only if half precision numbers occur in the AST
headers.add('"half_precision.h"')
headers.update(get_headers(ast))
header_list = sorted(headers)
header_list.insert(0, '"Python.h"')
ps_headers = [os.path.join(os.path.dirname(__file__), '..', 'include', h[1:-1]) for h in header_list
if os.path.exists(os.path.join(os.path.dirname(__file__), '..', 'include', h[1:-1]))]
header_hash = b''.join([hashlib.sha256(open(h, 'rb').read()).digest() for h in ps_headers])
includes = "\n".join([f"#include {include_file}" for include_file in header_list])
self._code_string += includes
self._code_string += "\n"
self._code_string += f"#define RESTRICT {restrict_qualifier} \n"
self._code_string += f"#define FUNC_PREFIX {function_prefix}"
self._code_string += "\n"
for ast, name in zip(self._ast_nodes, self._function_names):
old_name = ast.function_name
ast.function_name = f"kernel_{name}"
self._code_string += generate_c(ast, custom_backend=self._custom_backend)
self._code_string += create_function_boilerplate_code(ast.get_parameters(), name, ast)
ast.function_name = old_name
self._code_hash = "mod_" + hashlib.sha256(self._code_string.encode() + header_hash).hexdigest()
self._code_string += create_module_boilerplate_code(self._code_hash, self._function_names)
def get_hash_of_code(self):
assert self._code_string, "The code must be generated first"
return self._code_hash
def write_to_file(self, file):
assert self._code_string, "The code must be generated first"
print(self._code_string, file=file)
def compile_module(code, code_hash, base_dir, compile_flags=None):
if compile_flags is None:
compile_flags = []
compiler_config = get_compiler_config()
extra_flags = ['-I' + sysconfig.get_paths()['include'], '-I' + get_pystencils_include_path()] + compile_flags
if compiler_config['os'].lower() == 'windows':
lib_suffix = '.pyd'
object_suffix = '.obj'
windows = True
else:
lib_suffix = '.so'
object_suffix = '.o'
windows = False
src_file = os.path.join(base_dir, code_hash + ".cpp")
lib_file = os.path.join(base_dir, code_hash + lib_suffix)
object_file = os.path.join(base_dir, code_hash + object_suffix)
if not os.path.exists(object_file):
try:
with open(src_file, 'x') as f:
code.write_to_file(f)
except FileExistsError:
pass
if windows:
compile_cmd = ['cl.exe', '/c', '/EHsc'] + compiler_config['flags'].split()
compile_cmd += [*extra_flags, src_file, '/Fo' + object_file]
run_compile_step(compile_cmd)
else:
with atomic_file_write(object_file) as file_name:
compile_cmd = [compiler_config['command'], '-c'] + compiler_config['flags'].split()
compile_cmd += [*extra_flags, '-o', file_name, src_file]
run_compile_step(compile_cmd)
# Linking
if windows:
config_vars = sysconfig.get_config_vars()
py_lib = os.path.join(config_vars["installed_base"], "libs",
f"python{config_vars['py_version_nodot']}.lib")
run_compile_step(['link.exe', py_lib, '/DLL', '/out:' + lib_file, object_file])
elif platform.system().lower() == 'darwin':
with atomic_file_write(lib_file) as file_name:
run_compile_step([compiler_config['command'], '-shared', object_file, '-o', file_name, '-undefined',
'dynamic_lookup']
+ compiler_config['flags'].split())
else:
with atomic_file_write(lib_file) as file_name:
run_compile_step([compiler_config['command'], '-shared', object_file, '-o', file_name]
+ compiler_config['flags'].split())
return lib_file
def compile_and_load(ast, custom_backend=None):
cache_config = get_cache_config()
compiler_config = get_compiler_config()
if compiler_config['os'].lower() == 'windows':
function_prefix = '__declspec(dllexport)'
elif ast.instruction_set and 'function_prefix' in ast.instruction_set:
function_prefix = ast.instruction_set['function_prefix']
else:
function_prefix = ''
code = ExtensionModuleCode(custom_backend=custom_backend)
code.add_function(ast, ast.function_name)
code.create_code_string(compiler_config['restrict_qualifier'], function_prefix)
code_hash_str = code.get_hash_of_code()
compile_flags = []
if ast.instruction_set and 'compile_flags' in ast.instruction_set:
compile_flags = ast.instruction_set['compile_flags']
if cache_config['object_cache'] is False:
with tempfile.TemporaryDirectory() as base_dir:
lib_file = compile_module(code, code_hash_str, base_dir, compile_flags=compile_flags)
result = load_kernel_from_file(code_hash_str, ast.function_name, lib_file)
else:
lib_file = compile_module(code, code_hash_str, base_dir=cache_config['object_cache'],
compile_flags=compile_flags)
result = load_kernel_from_file(code_hash_str, ast.function_name, lib_file)
return KernelWrapper(result, ast.get_parameters(), ast)
import sympy as sp
import pystencils.astnodes as ast
from pystencils.config import CreateKernelConfig
from pystencils.enums import Target, Backend
from pystencils.astnodes import Block, KernelFunction, LoopOverCoordinate, SympyAssignment
from pystencils.cpu.cpujit import make_python_function
from pystencils.typing import StructType, TypedSymbol, create_type
from pystencils.typing.transformations import add_types
from pystencils.field import Field, FieldType
from pystencils.node_collection import NodeCollection
from pystencils.transformations import (
filtered_tree_iteration, iterate_loops_by_depth, get_base_buffer_index, get_optimal_loop_ordering,
make_loop_over_domain, add_outer_loop_over_indexed_elements,
move_constants_before_loop, parse_base_pointer_info, resolve_buffer_accesses,
resolve_field_accesses, split_inner_loop)
def create_kernel(assignments: NodeCollection,
config: CreateKernelConfig) -> KernelFunction:
"""Creates an abstract syntax tree for a kernel function, by taking a list of update rules.
Loops are created according to the field accesses in the equations.
Args:
assignments: list of sympy equations, containing accesses to :class:`pystencils.field.Field`.
Defining the update rules of the kernel
config: create kernel config
Returns:
AST node representing a function, that can be printed as C or CUDA code
"""
function_name = config.function_name
iteration_slice = config.iteration_slice
ghost_layers = config.ghost_layers
fields_written = assignments.bound_fields
fields_read = assignments.rhs_fields
split_groups = ()
if 'split_groups' in assignments.simplification_hints:
split_groups = assignments.simplification_hints['split_groups']
assignments = assignments.all_assignments
# TODO Cleanup: move add_types to create_domain_kernel or create_kernel
assignments = add_types(assignments, config)
all_fields = fields_read.union(fields_written)
read_only_fields = set([f.name for f in fields_read - fields_written])
buffers = set([f for f in all_fields if FieldType.is_buffer(f)])
fields_without_buffers = all_fields - buffers
body = ast.Block(assignments)
loop_order = get_optimal_loop_ordering(fields_without_buffers)
loop_node, ghost_layer_info = make_loop_over_domain(body, iteration_slice=iteration_slice,
ghost_layers=ghost_layers, loop_order=loop_order)
loop_node = add_outer_loop_over_indexed_elements(loop_node)
ast_node = KernelFunction(loop_node, Target.CPU, Backend.C, compile_function=make_python_function,
ghost_layers=ghost_layer_info, function_name=function_name, assignments=assignments)
if split_groups:
type_info = config.data_type
def type_symbol(term):
if isinstance(term, Field.Access) or isinstance(term, TypedSymbol):
return term
elif isinstance(term, sp.Symbol):
if isinstance(type_info, str) or not hasattr(type_info, '__getitem__'):
return TypedSymbol(term.name, create_type(type_info))
else:
return TypedSymbol(term.name, type_info[term.name])
else:
raise ValueError("Term has to be field access or symbol")
typed_split_groups = [[type_symbol(s) for s in split_group] for split_group in split_groups]
split_inner_loop(ast_node, typed_split_groups)
base_pointer_spec = config.base_pointer_specification
if base_pointer_spec is None:
base_pointer_spec = []
base_pointer_info = {field.name: parse_base_pointer_info(base_pointer_spec, loop_order,
field.spatial_dimensions, field.index_dimensions)
for field in fields_without_buffers}
buffer_base_pointer_info = {field.name: parse_base_pointer_info([['spatialInner0']], [0],
field.spatial_dimensions, field.index_dimensions)
for field in buffers}
base_pointer_info.update(buffer_base_pointer_info)
if any(FieldType.is_buffer(f) for f in all_fields):
resolve_buffer_accesses(ast_node, get_base_buffer_index(ast_node), read_only_fields)
# TODO think about typing
resolve_field_accesses(ast_node, read_only_fields, field_to_base_pointer_info=base_pointer_info)
move_constants_before_loop(ast_node)
return ast_node
def create_indexed_kernel(assignments: NodeCollection,
config: CreateKernelConfig) -> KernelFunction:
"""
Similar to :func:`create_kernel`, but here not all cells of a field are updated but only cells with
coordinates which are stored in an index field. This traversal method can e.g. be used for boundary handling.
The coordinates are stored in a separate index_field, which is a one dimensional array with struct data type.
This struct has to contain fields named 'x', 'y' and for 3D fields ('z'). These names are configurable with the
'coordinate_names' parameter. The struct can have also other fields that can be read and written in the kernel, for
example boundary parameters.
Args:
assignments: list of assignments
config: Kernel configuration
"""
function_name = config.function_name
index_fields = config.index_fields
coordinate_names = config.coordinate_names
fields_written = assignments.bound_fields
fields_read = assignments.rhs_fields
all_fields = fields_read.union(fields_written)
# extract the index fields based on the name. The original index field might have been modified
index_fields = [idx_field for idx_field in index_fields if idx_field.name in [f.name for f in all_fields]]
non_index_fields = [f for f in all_fields if f not in index_fields]
spatial_coordinates = {f.spatial_dimensions for f in non_index_fields}
assert len(spatial_coordinates) == 1, f"Non-index fields do not have the same number of spatial coordinates " \
f"Non index fields are {non_index_fields}, spatial coordinates are " \
f"{spatial_coordinates}"
spatial_coordinates = list(spatial_coordinates)[0]
assignments = assignments.all_assignments
assignments = add_types(assignments, config)
for index_field in index_fields:
index_field.field_type = FieldType.INDEXED
assert FieldType.is_indexed(index_field)
assert index_field.spatial_dimensions == 1, "Index fields have to be 1D"
def get_coordinate_symbol_assignment(name):
for idx_field in index_fields:
assert isinstance(idx_field.dtype, StructType), "Index fields have to have a struct data type"
data_type = idx_field.dtype
if data_type.has_element(name):
rhs = idx_field[0](name)
lhs = TypedSymbol(name, data_type.get_element_type(name))
return SympyAssignment(lhs, rhs)
raise ValueError(f"Index {name} not found in any of the passed index fields")
coordinate_symbol_assignments = [get_coordinate_symbol_assignment(n)
for n in coordinate_names[:spatial_coordinates]]
coordinate_typed_symbols = [eq.lhs for eq in coordinate_symbol_assignments]
assignments = coordinate_symbol_assignments + assignments
# make 1D loop over index fields
loop_body = Block([])
loop_node = LoopOverCoordinate(loop_body, coordinate_to_loop_over=0, start=0, stop=index_fields[0].shape[0])
for assignment in assignments:
loop_body.append(assignment)
function_body = Block([loop_node])
ast_node = KernelFunction(function_body, Target.CPU, Backend.C, make_python_function,
ghost_layers=None, function_name=function_name, assignments=assignments)
fixed_coordinate_mapping = {f.name: coordinate_typed_symbols for f in non_index_fields}
read_only_fields = set([f.name for f in fields_read - fields_written])
resolve_field_accesses(ast_node, read_only_fields, field_to_fixed_coordinates=fixed_coordinate_mapping)
move_constants_before_loop(ast_node)
return ast_node
def add_openmp(ast_node, schedule="static", num_threads=True, collapse=None, assume_single_outer_loop=True):
"""Parallelize the outer loop with OpenMP.
Args:
ast_node: abstract syntax tree created e.g. by :func:`create_kernel`
schedule: OpenMP scheduling policy e.g. 'static' or 'dynamic'
num_threads: explicitly specify number of threads
collapse: number of nested loops to include in parallel region (see OpenMP collapse)
assume_single_outer_loop: if True an exception is raised if multiple outer loops are detected for all but
optimized staggered kernels the single-outer-loop assumption should be true
"""
if not num_threads:
return
assert type(ast_node) is ast.KernelFunction
body = ast_node.body
threads_clause = "" if num_threads and isinstance(num_threads, bool) else f" num_threads({num_threads})"
wrapper_block = ast.PragmaBlock('#pragma omp parallel' + threads_clause, body.take_child_nodes())
body.append(wrapper_block)
outer_loops = [l for l in filtered_tree_iteration(body, LoopOverCoordinate, stop_type=SympyAssignment)
if l.is_outermost_loop]
assert outer_loops, "No outer loop found"
if assume_single_outer_loop and len(outer_loops) > 1:
raise ValueError("More than one outer loop found, only one outer loop expected")
for loop_to_parallelize in outer_loops:
try:
loop_range = int(loop_to_parallelize.stop - loop_to_parallelize.start)
except TypeError:
loop_range = None
if loop_range is not None and loop_range < num_threads and not collapse:
contained_loops = [l for l in loop_to_parallelize.body.args if isinstance(l, LoopOverCoordinate)]
if len(contained_loops) == 1:
contained_loop = contained_loops[0]
try:
contained_loop_range = int(contained_loop.stop - contained_loop.start)
if contained_loop_range > loop_range:
loop_to_parallelize = contained_loop
except TypeError:
pass
prefix = f"#pragma omp for schedule({schedule})"
if collapse:
prefix += f" collapse({collapse})"
loop_to_parallelize.prefix_lines.append(prefix)
def add_pragmas(ast_node, pragma_lines, nesting_depth=-1):
"""Prepends given pragma lines to all loops of specified nesting depth.
Args:
ast_node: pystencils abstract syntax tree
pragma_lines: Iterable of strings containing the pragma lines
nesting_depth: Nesting depth of the loops the pragmas should be applied to.
Outermost loop has depth 0.
A depth of -1 indicates the innermost loops.
"""
loop_nodes = iterate_loops_by_depth(ast_node, nesting_depth)
for n in loop_nodes:
n.prefix_lines += list(pragma_lines)
import os
import subprocess
def get_environment(version_specifier, arch='x64'):
"""Returns an environment dictionary, for activating the Visual Studio compiler.
Args:
version_specifier: either a version number, year number, 'auto' or 'latest' for automatic detection of latest
installed version or 'setuptools' for setuptools-based detection
arch: x86 or x64
"""
if version_specifier == 'setuptools':
return get_environment_from_setup_tools(arch)
elif '\\' in version_specifier:
vc_vars_path = find_vc_vars_all_via_filesystem_search(version_specifier)
return get_environment_from_vc_vars_file(vc_vars_path, arch)
else:
try:
if version_specifier in ('auto', 'latest'):
version_nr = find_latest_msvc_version_using_environment_variables()
else:
version_nr = normalize_msvc_version(version_specifier)
vc_vars_path = get_vc_vars_path_via_environment_variable(version_nr)
except ValueError:
vc_vars_path = find_vc_vars_all_via_filesystem_search("C:\\Program Files (x86)\\Microsoft Visual Studio")
if vc_vars_path is None:
vc_vars_path = find_vc_vars_all_via_filesystem_search("C:\\Program Files\\Microsoft Visual Studio")
if vc_vars_path is None:
raise ValueError("Visual Studio not found. Write path to VS folder in pystencils config")
return get_environment_from_vc_vars_file(vc_vars_path, arch)
def find_latest_msvc_version_using_environment_variables():
import re
# noinspection SpellCheckingInspection
regex = re.compile(r'VS(\d\d)\dCOMNTOOLS')
versions = []
for key, value in os.environ.items():
match = regex.match(key)
if match:
versions.append(int(match.group(1)))
if len(versions) == 0:
raise ValueError("Visual Studio not found.")
versions.sort()
return versions[-1]
def normalize_msvc_version(version):
"""
Takes version specifiers in the following form:
- year: 2012, 2013, 2015, either as int or string
- version numbers with or without dot i.e. 11.0 or 11
:return: integer version number
"""
if isinstance(version, str) and '.' in version:
version = version.split('.')[0]
version = int(version)
mapping = {
2015: 14,
2013: 12,
2012: 11
}
if version in mapping:
return mapping[version]
else:
return version
def get_environment_from_vc_vars_file(vc_vars_file, arch):
out = subprocess.check_output(
f'cmd /u /c "{vc_vars_file}" {arch} && set',
stderr=subprocess.STDOUT,
).decode('utf-16le', errors='replace')
env = {key.upper(): value for key, _, value in (line.partition('=') for line in out.splitlines()) if key and value}
return env
def get_vc_vars_path_via_environment_variable(version_nr):
# noinspection SpellCheckingInspection
environment_var_name = 'VS%d0COMNTOOLS' % (version_nr,)
vc_path = os.environ[environment_var_name]
path = os.path.join(vc_path, '..', '..', 'VC', 'vcvarsall.bat')
return os.path.abspath(path)
def get_environment_from_setup_tools(arch):
from setuptools.msvc import msvc14_get_vc_env
msvc_env = msvc14_get_vc_env(arch)
return {k.upper(): v for k, v in msvc_env.items()}
def find_vc_vars_all_via_filesystem_search(base_path):
matches = []
for root, dir_names, file_names in os.walk(base_path):
for filename in file_names:
if filename == 'vcvarsall.bat':
matches.append(os.path.join(root, filename))
matches.sort(reverse=True)
if matches:
return matches[0]
import warnings
from typing import Container, Union
import numpy as np
import sympy as sp
from sympy.logic.boolalg import BooleanFunction, BooleanAtom
import pystencils.astnodes as ast
from pystencils.backends.simd_instruction_sets import get_supported_instruction_sets, get_vector_instruction_set
from pystencils.typing import (BasicType, PointerType, TypedSymbol, VectorType, CastFunc, collate_types,
get_type_of_expression, VectorMemoryAccess)
from pystencils.functions import DivFunc
from pystencils.field import Field
from pystencils.integer_functions import modulo_ceil, modulo_floor
from pystencils.sympyextensions import fast_subs
from pystencils.transformations import cut_loop, filtered_tree_iteration, replace_inner_stride_with_one
# noinspection PyPep8Naming
class vec_any(sp.Function):
nargs = (1,)
# noinspection PyPep8Naming
class vec_all(sp.Function):
nargs = (1,)
class NontemporalFence(ast.Node):
def __init__(self):
super(NontemporalFence, self).__init__(parent=None)
@property
def symbols_defined(self):
return set()
@property
def undefined_symbols(self):
return set()
@property
def args(self):
return []
def __eq__(self, other):
return isinstance(other, NontemporalFence)
class CachelineSize(ast.Node):
symbol = sp.Symbol("_clsize")
mask_symbol = sp.Symbol("_clsize_mask")
last_symbol = sp.Symbol("_cl_lastvec")
def __init__(self):
super(CachelineSize, self).__init__(parent=None)
@property
def symbols_defined(self):
return {self.symbol, self.mask_symbol, self.last_symbol}
@property
def undefined_symbols(self):
return set()
@property
def args(self):
return []
def __eq__(self, other):
return isinstance(other, CachelineSize)
def __hash__(self):
return hash(self.symbol)
def vectorize(kernel_ast: ast.KernelFunction, instruction_set: str = 'best',
assume_aligned: bool = False, nontemporal: Union[bool, Container[Union[str, Field]]] = False,
assume_inner_stride_one: bool = False, assume_sufficient_line_padding: bool = True):
# TODO Vectorization Revamp we first introduce the remainder loop and then check if we can even vectorise.
# Maybe first copy the ast and return the copied version on failure
"""Explicit vectorization using SIMD vectorization via intrinsics.
Args:
kernel_ast: abstract syntax tree (KernelFunction node)
instruction_set: one of the supported vector instruction sets, currently ('sse', 'avx' and 'avx512')
assume_aligned: assume that the first inner cell of each line is aligned. If false, only unaligned-loads are
used. If true, some of the loads are assumed to be from aligned memory addresses.
For example if x is the fastest coordinate, the access to center can be fetched via an
aligned-load instruction, for the west or east accesses potentially slower unaligend-load
instructions have to be used.
nontemporal: a container of fields or field names for which nontemporal (streaming) stores are used.
If true, nontemporal access instructions are used for all fields.
assume_inner_stride_one: kernels with non-constant inner loop bound and strides can not be vectorized since
the inner loop stride is a runtime variable and thus might not be always 1.
If this parameter is set to true, the inner stride is assumed to be always one.
This has to be ensured at runtime!
assume_sufficient_line_padding: if True and assume_inner_stride_one, no tail loop is created but loop is
extended by at most (vector_width-1) elements
assumes that at the end of each line there is enough padding with dummy data
depending on the access pattern there might be additional padding
required at the end of the array
"""
if instruction_set == 'best':
if get_supported_instruction_sets():
instruction_set = get_supported_instruction_sets()[-1]
else:
instruction_set = 'avx'
if instruction_set is None:
return
all_fields = kernel_ast.fields_accessed
if nontemporal is None or nontemporal is False:
nontemporal = {}
elif nontemporal is True:
nontemporal = all_fields
if assume_inner_stride_one:
replace_inner_stride_with_one(kernel_ast)
field_float_dtypes = set(f.dtype for f in all_fields if f.dtype.is_float())
if len(field_float_dtypes) != 1:
raise NotImplementedError("Cannot vectorize kernels that contain accesses "
"to differently typed floating point fields")
float_size = field_float_dtypes.pop().numpy_dtype.itemsize
assert float_size in (8, 4)
default_float_type = 'float64' if float_size == 8 else 'float32'
vector_is = get_vector_instruction_set(default_float_type, instruction_set=instruction_set)
kernel_ast.instruction_set = vector_is
if nontemporal and 'cachelineZero' in vector_is:
kernel_ast.use_all_written_field_sizes = True
strided = 'storeS' in vector_is and 'loadS' in vector_is
keep_loop_stop = '{loop_stop}' in vector_is['storeA' if assume_aligned and 'storeA' in vector_is else 'storeU']
vectorize_inner_loops_and_adapt_load_stores(kernel_ast, assume_aligned, nontemporal,
strided, keep_loop_stop, assume_sufficient_line_padding,
default_float_type)
def vectorize_inner_loops_and_adapt_load_stores(ast_node, assume_aligned, nontemporal_fields,
strided, keep_loop_stop, assume_sufficient_line_padding,
default_float_type):
"""Goes over all innermost loops, changes increment to vector width and replaces field accesses by vector type."""
all_loops = list(filtered_tree_iteration(ast_node, ast.LoopOverCoordinate, stop_type=ast.SympyAssignment))
inner_loops = [loop for loop in all_loops if loop.is_innermost_loop]
zero_loop_counters = {loop.loop_counter_symbol: 0 for loop in all_loops}
vector_is = ast_node.instruction_set
assert vector_is, "The ast needs to hold information about the instruction_set for the vectorisation"
vector_width = vector_is['width']
vector_int_width = vector_is['intwidth']
for loop_node in inner_loops:
loop_range = loop_node.stop - loop_node.start
# cut off loop tail, that is not a multiple of four
if keep_loop_stop:
pass
elif assume_aligned and assume_sufficient_line_padding:
loop_range = loop_node.stop - loop_node.start
new_stop = loop_node.start + modulo_ceil(loop_range, vector_width)
loop_node.stop = new_stop
else:
cutting_point = modulo_floor(loop_range, vector_width) + loop_node.start
# TODO cut_loop calls deepcopy on the loop_node. This is bad as documented in cut_loop
loop_nodes = [loop for loop in cut_loop(loop_node, [cutting_point]).args
if isinstance(loop, ast.LoopOverCoordinate)]
assert len(loop_nodes) in (0, 1, 2) # 2 for main and tail loop, 1 if loop range divisible by vector width
if len(loop_nodes) == 0:
continue
loop_node = loop_nodes[0]
# loop_node is the vectorized one
# Find all array accesses (indexed) that depend on the loop counter as offset
loop_counter_symbol = ast.LoopOverCoordinate.get_loop_counter_symbol(loop_node.coordinate_to_loop_over)
substitutions = {}
successful = True
for indexed in loop_node.atoms(sp.Indexed):
base, index = indexed.args
if loop_counter_symbol in index.atoms(sp.Symbol):
if 'loadA' not in vector_is and 'storeA' not in vector_is and 'maskStoreA' not in vector_is:
# don't need to generate the alignment check when there are no aligned load/store instructions
aligned_access = False
else:
if not isinstance(vector_width, int):
raise NotImplementedError('Access alignment cannot be statically determined for sizeless '
'vector ISAs')
aligned_access = (index - loop_counter_symbol).subs(zero_loop_counters) % vector_width == 0
loop_counter_is_offset = loop_counter_symbol not in (index - loop_counter_symbol).atoms()
stride = sp.simplify(index.subs({loop_counter_symbol: loop_counter_symbol + 1}) - index)
if not loop_counter_is_offset and (not strided or loop_counter_symbol in stride.atoms()):
successful = False
break
typed_symbol = base.label
assert type(typed_symbol.dtype) is PointerType, f"Type of access is {typed_symbol.dtype}, {indexed}"
vec_type = VectorType(typed_symbol.dtype.base_type, vector_width)
use_aligned_access = aligned_access and assume_aligned
nontemporal = False
if hasattr(indexed, 'field'):
nontemporal = (indexed.field in nontemporal_fields) or (indexed.field.name in nontemporal_fields)
substitutions[indexed] = VectorMemoryAccess(indexed, vec_type, use_aligned_access, nontemporal, True,
stride if strided else 1)
if nontemporal:
# insert NontemporalFence after the outermost loop
parent = loop_node.parent
while type(parent.parent.parent) is not ast.KernelFunction:
parent = parent.parent
parent.parent.insert_after(NontemporalFence(), parent, if_not_exists=True)
# insert CachelineSize at the beginning of the kernel
parent.parent.insert_front(CachelineSize(), if_not_exists=True)
if not successful:
warnings.warn("Could not vectorize loop because of non-consecutive memory access")
continue
loop_node.step = vector_width
loop_node.subs(substitutions)
arg_1 = CastFunc(loop_counter_symbol, VectorType(loop_counter_symbol.dtype, vector_int_width))
arg_2 = CastFunc(tuple(range(vector_int_width if type(vector_int_width) is int else 2)),
VectorType(loop_counter_symbol.dtype, vector_int_width))
vector_loop_counter = arg_1 + arg_2
fast_subs(loop_node, {loop_counter_symbol: vector_loop_counter},
skip=lambda e: isinstance(e, ast.ResolvedFieldAccess) or isinstance(e, VectorMemoryAccess))
mask_conditionals(loop_node)
from pystencils.rng import RNGBase
substitutions = {}
for rng in loop_node.atoms(RNGBase):
new_result_symbols = [TypedSymbol(s.name, VectorType(s.dtype, width=vector_width))
for s in rng.result_symbols]
substitutions.update({s[0]: s[1] for s in zip(rng.result_symbols, new_result_symbols)})
rng._symbols_defined = set(new_result_symbols)
fast_subs(loop_node, substitutions, skip=lambda e: isinstance(e, RNGBase))
insert_vector_casts(loop_node, vector_is, default_float_type)
def mask_conditionals(loop_body):
def visit_node(node, mask):
if isinstance(node, ast.Conditional):
cond = node.condition_expr
skip = (loop_body.loop_counter_symbol not in cond.atoms(sp.Symbol)) or cond.func in (vec_all, vec_any)
cond = True if skip else cond
true_mask = sp.And(cond, mask)
visit_node(node.true_block, true_mask)
if node.false_block:
false_mask = sp.And(sp.Not(node.condition_expr), mask)
visit_node(node, false_mask)
if not skip:
node.condition_expr = vec_any(node.condition_expr)
elif isinstance(node, ast.SympyAssignment):
if mask is not True:
s = {ma: VectorMemoryAccess(*ma.args[0:4], sp.And(mask, ma.args[4]), *ma.args[5:])
for ma in node.atoms(VectorMemoryAccess)}
node.subs(s)
else:
for arg in node.args:
visit_node(arg, mask)
visit_node(loop_body, mask=True)
def insert_vector_casts(ast_node, instruction_set, default_float_type='double'):
"""Inserts necessary casts from scalar values to vector values."""
handled_functions = (sp.Add, sp.Mul, vec_any, vec_all, DivFunc, sp.Abs)
def is_scalar(expr) -> bool:
if hasattr(expr, "dtype"):
if type(expr.dtype) is VectorType:
return False
# Else branch: If expr is a CastFunc, then whether the expression
# is scalar is determined by the argument (remember: vector casts
# are not inserted yet). Therefore, we must recurse into the args of
# expr below. Otherwise, this expression is atomic and in that case
# it is assumed to be scalar below.
if isinstance(expr, ast.ResolvedFieldAccess):
# expr.field is not in expr.args
return is_scalar(expr.field)
elif isinstance(expr, (vec_any, vec_all)):
return True
if not hasattr(expr, "args"):
return True
return all(is_scalar(arg) for arg in expr.args)
# TODO Vectorization Revamp: get rid of default_type
def visit_expr(expr, default_type='double', force_vectorize=False):
if isinstance(expr, VectorMemoryAccess):
return VectorMemoryAccess(*expr.args[0:4], visit_expr(expr.args[4], default_type, force_vectorize),
*expr.args[5:])
elif isinstance(expr, CastFunc):
cast_type = expr.args[1]
arg = visit_expr(expr.args[0], default_type, force_vectorize)
assert cast_type in [BasicType('float32'), BasicType('float64')], \
f'Vectorization cannot vectorize type {cast_type}'
return expr.func(arg, VectorType(cast_type, instruction_set['width']))
elif expr.func is sp.Abs and 'abs' not in instruction_set:
new_arg = visit_expr(expr.args[0], default_type, force_vectorize)
base_type = get_type_of_expression(expr.args[0]).base_type if type(expr.args[0]) is VectorMemoryAccess \
else get_type_of_expression(expr.args[0])
pw = sp.Piecewise((-new_arg, new_arg < CastFunc(0, base_type.numpy_dtype)),
(new_arg, True))
return visit_expr(pw, default_type, force_vectorize)
elif expr.func in handled_functions or isinstance(expr, sp.Rel) or isinstance(expr, BooleanFunction):
if expr.func is sp.Mul and expr.args[0] == -1:
# special treatment for the unary minus: make sure that the -1 has the same type as the argument
dtype = int
for arg in expr.atoms(VectorMemoryAccess):
if arg.dtype.base_type.is_float():
dtype = arg.dtype.base_type.numpy_dtype.type
for arg in expr.atoms(TypedSymbol):
if type(arg.dtype) is VectorType and arg.dtype.base_type.is_float():
dtype = arg.dtype.base_type.numpy_dtype.type
if dtype is not int:
if dtype is np.float32:
default_type = 'float'
expr = sp.Mul(dtype(expr.args[0]), *expr.args[1:])
new_args = [visit_expr(a, default_type, force_vectorize) for a in expr.args]
arg_types = [get_type_of_expression(a, default_float_type=default_type) for a in new_args]
if not any(type(t) is VectorType for t in arg_types):
return expr
else:
target_type = collate_types(arg_types)
casted_args = [
CastFunc(a, target_type) if t != target_type and not isinstance(a, VectorMemoryAccess) else a
for a, t in zip(new_args, arg_types)]
return expr.func(*casted_args)
elif expr.func is sp.UnevaluatedExpr:
assert expr.args[0].is_Pow or expr.args[0].is_Mul, "UnevaluatedExpr only implemented holding Mul or Pow"
# TODO this is only because cut_loop evaluates the multiplications again due to deepcopy. All this should
# TODO be fixed for real at some point.
if expr.args[0].is_Pow:
base = expr.args[0].base
exp = expr.args[0].exp
expr = sp.UnevaluatedExpr(sp.Mul(*([base] * +exp), evaluate=False))
new_args = [visit_expr(a, default_type, force_vectorize) for a in expr.args[0].args]
arg_types = [get_type_of_expression(a, default_float_type=default_type) for a in new_args]
target_type = collate_types(arg_types)
if not any(type(t) is VectorType for t in arg_types):
target_type = VectorType(target_type, instruction_set['width'])
casted_args = [
CastFunc(a, target_type) if t != target_type and not isinstance(a, VectorMemoryAccess) else a
for a, t in zip(new_args, arg_types)]
return expr.func(expr.args[0].func(*casted_args, evaluate=False))
elif expr.func is sp.Pow:
new_arg = visit_expr(expr.args[0], default_type, force_vectorize)
return expr.func(new_arg, expr.args[1])
elif expr.func == sp.Piecewise:
new_results = [visit_expr(a[0], default_type, force_vectorize) for a in expr.args]
new_conditions = [visit_expr(a[1], default_type, force_vectorize) for a in expr.args]
types_of_results = [get_type_of_expression(a) for a in new_results]
types_of_conditions = [get_type_of_expression(a) for a in new_conditions]
result_target_type = get_type_of_expression(expr)
condition_target_type = collate_types(types_of_conditions)
if type(condition_target_type) is VectorType and type(result_target_type) is not VectorType:
result_target_type = VectorType(result_target_type, width=condition_target_type.width)
if type(condition_target_type) is not VectorType and type(result_target_type) is VectorType:
condition_target_type = VectorType(condition_target_type, width=result_target_type.width)
casted_results = [CastFunc(a, result_target_type) if t != result_target_type else a
for a, t in zip(new_results, types_of_results)]
casted_conditions = [CastFunc(a, condition_target_type)
if t != condition_target_type and a is not True else a
for a, t in zip(new_conditions, types_of_conditions)]
return sp.Piecewise(*[(r, c) for r, c in zip(casted_results, casted_conditions)])
elif isinstance(expr, TypedSymbol):
if force_vectorize:
expr_type = get_type_of_expression(expr)
if type(expr_type) is not VectorType:
vector_type = VectorType(expr_type, instruction_set['width'])
return CastFunc(expr, vector_type)
return expr
elif isinstance(expr, (sp.Number, BooleanAtom)):
return expr
else:
raise NotImplementedError(f'Due to defensive programming we handle only specific expressions.\n'
f'The expression {expr} of type {type(expr)} is not known yet.')
def visit_node(node, substitution_dict, default_type='double'):
substitution_dict = substitution_dict.copy()
for arg in node.args:
if isinstance(arg, ast.SympyAssignment):
assignment = arg
# If there is a remainder loop we do not vectorise it, thus lhs will indicate this
# if isinstance(assignment.lhs, ast.ResolvedFieldAccess):
# continue
subs_expr = fast_subs(assignment.rhs, substitution_dict,
skip=lambda e: isinstance(e, ast.ResolvedFieldAccess))
# If either side contains a vectorized subexpression, both sides
# must be fully vectorized.
lhs_scalar = is_scalar(assignment.lhs)
rhs_scalar = is_scalar(subs_expr)
assignment.rhs = visit_expr(subs_expr, default_type, force_vectorize=not (lhs_scalar and rhs_scalar))
if isinstance(assignment.lhs, TypedSymbol):
if lhs_scalar and not rhs_scalar:
lhs_type = get_type_of_expression(assignment.lhs)
rhs_type = get_type_of_expression(assignment.rhs)
new_lhs_type = VectorType(lhs_type, rhs_type.width)
new_lhs = TypedSymbol(assignment.lhs.name, new_lhs_type)
substitution_dict[assignment.lhs] = new_lhs
assignment.lhs = new_lhs
elif isinstance(assignment.lhs, VectorMemoryAccess):
assignment.lhs = visit_expr(assignment.lhs, default_type)
elif isinstance(arg, ast.Conditional):
arg.condition_expr = fast_subs(arg.condition_expr, substitution_dict,
skip=lambda e: isinstance(e, ast.ResolvedFieldAccess))
arg.condition_expr = visit_expr(arg.condition_expr, default_type)
visit_node(arg, substitution_dict, default_type)
else:
visit_node(arg, substitution_dict, default_type)
visit_node(ast_node, {}, default_float_type)
import warnings
from typing import Tuple, Union
from .datahandling_interface import DataHandling
from ..enums import Target
from .serial_datahandling import SerialDataHandling
try:
# noinspection PyPep8Naming
import waLBerla as wlb
if wlb.cpp_available:
from pystencils.datahandling.parallel_datahandling import ParallelDataHandling
else:
wlb = None
except ImportError:
wlb = None
ParallelDataHandling = None
def create_data_handling(domain_size: Tuple[int, ...],
periodicity: Union[bool, Tuple[bool, ...]] = False,
default_layout: str = 'SoA',
default_target: Target = Target.CPU,
parallel: bool = False,
default_ghost_layers: int = 1,
device_number: Union[int, None] = None) -> DataHandling:
"""Creates a data handling instance.
Args:
domain_size: size of the rectangular domain
periodicity: either True, False for full or no periodicity or a tuple of booleans indicating periodicity
for each coordinate
default_layout: default array layout, that is used if not explicitly specified in 'add_array'
default_target: `Target`
parallel: if True a parallel domain is created using walberla - each MPI process gets a part of the domain
default_ghost_layers: default number of ghost layers if not overwritten in 'add_array'
device_number: If `default_target` is set to 'GPU' and `parallel` is False, a device number should be
specified. If none is given, the device with the largest amount of memory is used. If multiple
devices have the same amount of memory, the one with the lower number is used
"""
if isinstance(default_target, str):
new_target = Target[default_target.upper()]
warnings.warn(f'Target "{default_target}" as str is deprecated. Use {new_target} instead',
category=DeprecationWarning)
default_target = new_target
if parallel:
if wlb is None:
raise ValueError("Cannot create parallel data handling because walberla module is not available")
if periodicity is False or periodicity is None:
periodicity = (0, 0, 0)
elif periodicity is True:
periodicity = (1, 1, 1)
else:
periodicity = tuple(int(bool(x)) for x in periodicity)
if len(periodicity) == 2:
periodicity += (1,)
if len(domain_size) == 2:
dim = 2
domain_size = (domain_size[0], domain_size[1], 1)
else:
dim = 3
# noinspection PyArgumentList
block_storage = wlb.createUniformBlockGrid(cells=domain_size, periodic=periodicity)
return ParallelDataHandling(blocks=block_storage, dim=dim, default_target=default_target,
default_layout=default_layout, default_ghost_layers=default_ghost_layers)
else:
return SerialDataHandling(domain_size,
periodicity=periodicity,
default_target=default_target,
default_layout=default_layout,
default_ghost_layers=default_ghost_layers,
device_number=device_number)
__all__ = ['create_data_handling']
"""
This module contains function that simplify the iteration over walberla's distributed data structure.
These function simplify the iteration over rectangular slices, managing the mapping between block local coordinates and
global coordinates.
"""
import numpy as np
from pystencils.datahandling.datahandling_interface import Block
from pystencils.slicing import normalize_slice
try:
# noinspection PyPep8Naming
import waLBerla as wlb
except ImportError:
wlb = None
def block_iteration(blocks, ghost_layers, dim=3, access_prefix=''):
"""Simple iteration over parallel walberla domain.
Iterator that simplifies the access to field data by automatically converting from walberla fields to
numpy arrays
Args:
blocks: walberla block data structure
ghost_layers: how many ghost layers to include (outer and inner)
dim: walberla's block data structure is 3D - 2D domains can be done by setting z_size=1
if dim=2 is set here, the third coordinate of the returned fields is accessed at z=0 automatically
access_prefix: see documentation of sliced_block_iteration
"""
for block in blocks:
cell_interval = blocks.getBlockCellBB(block)
cell_interval.expand(ghost_layers)
local_slice = [slice(0, w, None) for w in cell_interval.size]
if dim == 2:
local_slice[2] = ghost_layers
yield ParallelBlock(block, cell_interval.min[:dim], tuple(local_slice), ghost_layers, access_prefix)
def sliced_block_iteration(blocks, slice_obj=None, inner_ghost_layers=1, outer_ghost_layers=1, dim=3, access_prefix=''):
"""Iterates of all blocks that have an intersection with the given slice object.
For intersection blocks a Block object is yielded
Args:
blocks: walberla block data structure
slice_obj: a slice (i.e. rectangular sub-region), can be created with make_slice[]
inner_ghost_layers: how many ghost layers are included in the local slice and the optional index arrays
outer_ghost_layers: slices can have relative coordinates e.g. make_slice[0.2, :, :]
when computing absolute values, the domain size is needed. This parameter
specifies how many ghost layers are taken into account for this operation.
dim: set to 2 for pseudo 2D simulation (i.e. where z coordinate of blocks has extent 1)
the arrays returned when indexing the block
access_prefix: when accessing block data, this prefix is prepended to the access name
mostly used to switch between CPU and GPU field access (gpu fields are added with a
certain prefix 'gpu_')
Example:
assume no slice is given, then slice_normalization_ghost_layers effectively sets how much ghost layers at the
border of the domain are included. The inner_ghost_layers parameter specifies how many inner ghost layers are
included
"""
if slice_obj is None:
slice_obj = tuple([slice(None, None, None)] * dim)
if dim == 2:
slice_obj += (inner_ghost_layers,)
domain_cell_bb = blocks.getDomainCellBB()
domain_extent = [s + 2 * outer_ghost_layers for s in domain_cell_bb.size]
slice_obj = normalize_slice(slice_obj, domain_extent)
target_cell_bb = wlb.CellInterval.fromSlice(slice_obj)
target_cell_bb.shift(*[a - outer_ghost_layers for a in domain_cell_bb.min])
for block in blocks:
intersection = blocks.getBlockCellBB(block).getExpanded(inner_ghost_layers)
intersection.intersect(target_cell_bb)
if intersection.empty():
continue
local_target_bb = blocks.transformGlobalToLocal(block, intersection)
local_target_bb.shift(inner_ghost_layers, inner_ghost_layers, inner_ghost_layers)
local_slice = local_target_bb.toSlice(False)
if dim == 2:
local_slice = (local_slice[0], local_slice[1], inner_ghost_layers)
yield ParallelBlock(block, intersection.min[:dim], local_slice, inner_ghost_layers, access_prefix)
# ----------------------------- Implementation details -----------------------------------------------------------------
class SerialBlock(Block):
"""Simple mock-up block that is used for SerialDataHandling."""
def __init__(self, field_dict, offset, local_slice):
super(SerialBlock, self).__init__(offset, local_slice)
self._fieldDict = field_dict
def __getitem__(self, data_name):
result = self._fieldDict[data_name]
if isinstance(result, np.ndarray):
result = result[self._localSlice]
return result
class ParallelBlock(Block):
def __init__(self, block, offset, local_slice, inner_ghost_layers, name_prefix):
super(ParallelBlock, self).__init__(offset, local_slice)
self._block = block
self._gls = inner_ghost_layers
self._name_prefix = name_prefix
def __getitem__(self, data_name):
result = self._block[self._name_prefix + data_name]
type_name = type(result).__name__
if 'GhostLayerField' in type_name:
result = wlb.field.toArray(result, with_ghost_layers=self._gls)
result = self._normalize_array_shape(result)
elif 'GpuField' in type_name:
result = wlb.gpu.toGpuArray(result, with_ghost_layers=self._gls)
result = self._normalize_array_shape(result)
return result
def _normalize_array_shape(self, arr):
if arr.shape[-1] == 1 and len(arr.shape) == 4:
arr = arr[..., 0]
return arr[self._localSlice]
from abc import ABC, abstractmethod
from typing import Callable, Dict, Iterable, Optional, Sequence, Tuple, Union
import numpy as np
from pystencils.enums import Target, Backend
from pystencils.field import Field, FieldType
class DataHandling(ABC):
"""
Manages the storage of arrays and maps them to a symbolic field.
Two versions are available: a simple, pure Python implementation for single node
simulations :py:class:SerialDataHandling and a distributed version using walberla in :py:class:ParallelDataHandling
Keep in mind that the data can be distributed, so use the 'access' method whenever possible and avoid the
'gather' function that has collects (parts of the) distributed data on a single process.
"""
_GPU_LIKE_TARGETS = [Target.GPU]
_GPU_LIKE_BACKENDS = [Backend.CUDA]
# ---------------------------- Adding and accessing data -----------------------------------------------------------
@property
@abstractmethod
def default_target(self) -> Target:
"""Target Enum indicating the target of the computation"""
@property
@abstractmethod
def dim(self) -> int:
"""Dimension of the domain, either 2 or 3"""
@property
@abstractmethod
def shape(self) -> Tuple[int, ...]:
"""Shape of outer bounding box."""
@property
@abstractmethod
def periodicity(self) -> Tuple[bool, ...]:
"""Returns tuple of booleans for x,y,(z) directions with True if domain is periodic in that direction."""
@abstractmethod
def add_array(self, name: str, values_per_cell, dtype=np.float64,
latex_name: Optional[str] = None, ghost_layers: Optional[int] = None, layout: Optional[str] = None,
cpu: bool = True, gpu: Optional[bool] = None, alignment=False, field_type=FieldType.GENERIC) -> Field:
"""Adds a (possibly distributed) array to the handling that can be accessed using the given name.
For each array a symbolic field is available via the 'fields' dictionary
Args:
name: unique name that is used to access the field later
values_per_cell: shape of the dim+1 coordinate. DataHandling supports zero or one index dimensions,
i.e. scalar fields and vector fields. This parameter gives the shape of the index
dimensions. The default value of 1 means no index dimension are created.
dtype: data type of the array as numpy data type
latex_name: optional, name of the symbolic field, if not given 'name' is used
ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
is used
layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
this is only important if values_per_cell > 1
cpu: allocate field on the CPU
gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'GPU'
alignment: either False for no alignment, or the number of bytes to align to
Returns:
pystencils field, that can be used to formulate symbolic kernels
"""
def add_arrays(self,
description: str,
dtype=np.float64,
ghost_layers: Optional[int] = None,
layout: Optional[str] = None,
cpu: bool = True,
gpu: Optional[bool] = None,
alignment=False,
field_type=FieldType.GENERIC) -> Tuple[Field]:
"""Adds multiple arrays using a string description similar to :func:`pystencils.fields`
>>> from pystencils.datahandling import create_data_handling
>>> dh = create_data_handling((20, 30))
>>> x, y =dh.add_arrays('x, y(9)')
>>> print(dh.fields)
{'x': x: double[22,32], 'y': y(9): double[22,32]}
>>> assert x == dh.fields['x']
>>> assert dh.fields['x'].shape == (22, 32)
>>> assert dh.fields['y'].index_shape == (9,)
Args:
description (str): String description of the fields to add
dtype: data type of the array as numpy data type
ghost_layers: number of ghost layers - if not specified a default value specified in the constructor
is used
layout: memory layout of array, either structure of arrays 'SoA' or array of structures 'AoS'.
this is only important if values_per_cell > 1
cpu: allocate field on the CPU
gpu: allocate field on the GPU, if None, a GPU field is allocated if default_target is 'GPU'
alignment: either False for no alignment, or the number of bytes to align to
Returns:
Fields representing the just created arrays
"""
from pystencils.field import _parse_part1
names = []
for name, indices in _parse_part1(description):
names.append(name)
self.add_array(name,
values_per_cell=indices,
dtype=dtype,
ghost_layers=ghost_layers,
layout=layout,
cpu=cpu,
gpu=gpu,
alignment=alignment,
field_type=field_type)
return (self.fields[n] for n in names)
@abstractmethod
def has_data(self, name):
"""Returns true if a field or custom data element with this name was added."""
@abstractmethod
def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
"""
Adds an array with the same parameters (number of ghost layers, values_per_cell, dtype) as existing array.
Args:
name: name of new array
name_of_template_field: name of array that is used as template
latex_name: see 'add' method
cpu: see 'add' method
gpu: see 'add' method
"""
@abstractmethod
def add_custom_data(self, name: str, cpu_creation_function,
gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
"""Adds custom (non-array) data to domain.
Args:
name: name to access data
cpu_creation_function: function returning a new instance of the data that should be stored
gpu_creation_function: optional, function returning a new instance, stored on GPU
cpu_to_gpu_transfer_func: function that transfers cpu to gpu version,
getting two parameters (gpu_instance, cpu_instance)
gpu_to_cpu_transfer_func: function that transfers gpu to cpu version, getting two parameters
(gpu_instance, cpu_instance)
"""
def add_custom_class(self, name: str, class_obj, cpu: bool = True, gpu: bool = False):
"""Adds non-array data by passing a class object with optional 'to_gpu' and 'to_cpu' member functions."""
cpu_to_gpu_transfer_func = class_obj.to_gpu if cpu and gpu and hasattr(class_obj, 'to_gpu') else None
gpu_to_cpu_transfer_func = class_obj.to_cpu if cpu and gpu and hasattr(class_obj, 'to_cpu') else None
self.add_custom_data(name,
cpu_creation_function=class_obj if cpu else None,
gpu_creation_function=class_obj if gpu else None,
cpu_to_gpu_transfer_func=cpu_to_gpu_transfer_func,
gpu_to_cpu_transfer_func=gpu_to_cpu_transfer_func)
@property
@abstractmethod
def fields(self) -> Dict[str, Field]:
"""Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels."""
@property
@abstractmethod
def array_names(self) -> Sequence[str]:
"""Sequence of all array names."""
@property
@abstractmethod
def custom_data_names(self) -> Sequence[str]:
"""Sequence of all custom data names."""
@abstractmethod
def ghost_layers_of_field(self, name: str) -> int:
"""Returns the number of ghost layers for a specific field/array."""
@abstractmethod
def values_per_cell(self, name: str) -> Tuple[int, ...]:
"""Returns values_per_cell of array."""
@abstractmethod
def iterate(self, slice_obj=None, gpu=False, ghost_layers=None,
inner_ghost_layers=True) -> Iterable['Block']:
"""Iterate over local part of potentially distributed data structure."""
@abstractmethod
def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False) -> Optional[np.ndarray]:
"""
Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies
the distributed data to a single process which is inefficient and may exhaust the available memory
Args:
name: name of the array to gather
slice_obj: slice expression of the rectangular sub-part that should be gathered
all_gather: if False only the root process receives the result, if True all processes
ghost_layers: number of outer ghost layers to include (only available for serial version of data handling)
Returns:
gathered field that does not include any ghost layers, or None if gathered on another process
"""
@abstractmethod
def run_kernel(self, kernel_function, *args, **kwargs) -> None:
"""Runs a compiled pystencils kernel.
Uses the arrays stored in the DataHandling class for all array parameters. Additional passed arguments are
directly passed to the kernel function and override possible parameters from the DataHandling
"""
@abstractmethod
def get_kernel_kwargs(self, kernel_function, **kwargs):
"""Returns the input arguments of a kernel"""
@abstractmethod
def swap(self, name1, name2, gpu=False):
"""Swaps data of two arrays"""
# ------------------------------- CPU/GPU transfer -----------------------------------------------------------------
@abstractmethod
def to_cpu(self, name):
"""Copies GPU data of array with specified name to CPU.
Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""
@abstractmethod
def to_gpu(self, name):
"""Copies GPU data of array with specified name to GPU.
Works only if 'cpu=True' and 'gpu=True' has been used in 'add' method."""
@abstractmethod
def all_to_cpu(self):
"""Copies data from GPU to CPU for all arrays that have a CPU and a GPU representation."""
@abstractmethod
def all_to_gpu(self):
"""Copies data from CPU to GPU for all arrays that have a CPU and a GPU representation."""
@abstractmethod
def is_on_gpu(self, name):
"""Checks if this data was also allocated on the GPU - does not check if this data item is in synced."""
@abstractmethod
def create_vtk_writer(self, file_name, data_names, ghost_layers=False) -> Callable[[int], None]:
"""VTK output for one or multiple arrays.
Args
file_name: base file name without extension for the VTK output
data_names: list of array names that should be included in the vtk output
ghost_layers: true if ghost layer information should be written out as well
Returns:
a function that can be called with an integer time step to write the current state
i.e create_vtk_writer('some_file', ['velocity', 'density']) (1)
"""
@abstractmethod
def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name,
ghost_layers=False) -> Callable[[int], None]:
"""VTK output for an unsigned integer field, where bits are interpreted as flags.
Args:
file_name: see create_vtk_writer
data_name: name of an array with uint type
masks_to_name: dictionary mapping integer masks to a name in the output
ghost_layers: see create_vtk_writer
Returns:
functor that can be called with time step
"""
# ------------------------------- Communication --------------------------------------------------------------------
@abstractmethod
def synchronization_function(self, names, stencil=None, target=None, **kwargs) -> Callable[[], None]:
"""Synchronizes ghost layers for distributed arrays.
For serial scenario this has to be called for correct periodicity handling
Args:
names: what data to synchronize: name of array or sequence of names
stencil: stencil as string defining which neighbors are synchronized e.g. 'D2Q9', 'D3Q19'
if None, a full synchronization (i.e. D2Q9 or D3Q27) is done
target: `Target` either 'CPU' or 'GPU'
kwargs: implementation specific, optional optimization parameters for communication
Returns:
function object to run the communication
"""
def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.array:
"""Takes a sequence of floating point values on each process and reduces it element-wise.
If all_reduce, all processes get the result, otherwise only the root process.
Possible operations are 'sum', 'min', 'max'
"""
def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.array:
"""See function reduce_float_sequence - this is the same for integers"""
# ------------------------------- Data access and modification -----------------------------------------------------
def fill(self, array_name: str, val, value_idx: Optional[Union[int, Tuple[int, ...]]] = None,
slice_obj=None, ghost_layers=False, inner_ghost_layers=False) -> None:
"""Sets all cells to the same value.
Args:
array_name: name of the array that should be modified
val: value to set the array to
value_idx: If an array stores multiple values per cell, this index chooses which of this values to fill.
If None, all values are set
slice_obj: if passed, only the defined slice is filled
ghost_layers: True if the outer ghost layers should also be filled
inner_ghost_layers: True if the inner ghost layers should be filled. Inner ghost layers occur only in
parallel setups for distributed memory communication.
"""
if ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
if inner_ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
if value_idx is not None:
if isinstance(value_idx, int):
value_idx = (value_idx,)
assert len(value_idx) == len(self.values_per_cell(array_name))
b[array_name][(Ellipsis, *value_idx)].fill(val)
else:
b[array_name].fill(val)
self.to_gpu(array_name)
def min(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
"""Returns the minimum value inside the domain or slice of the domain.
For meaning of arguments see documentation of :func:`DataHandling.fill`.
Returns:
the minimum of the locally stored domain part is returned if reduce is False, otherwise the global minimum
on the root process, on other processes None
"""
result = None
if ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
if inner_ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
m = np.min(b[array_name])
result = m if result is None else np.min(result, m)
return self.reduce_float_sequence([result], 'min', all_reduce=True)[0] if reduce else result
def max(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
"""Returns the maximum value inside the domain or slice of the domain.
For argument description see :func:`DataHandling.min`
"""
result = None
if ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
if inner_ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(array_name)
for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
m = np.max(b[array_name])
result = m if result is None else np.max(result, m)
return self.reduce_float_sequence([result], 'max', all_reduce=True)[0] if reduce else result
def save_all(self, file):
"""Saves all field data to disk into a file"""
def load_all(self, file):
"""Loads all field data from disk into a file
Works only if save_all was called with exactly the same field sizes, layouts etc.
When run in parallel save and load has to be called with the same number of processes.
Use for check pointing only - to store results use VTK output
"""
def __str__(self):
result = ""
first_column_width = max(len("Name"), max(len(a) for a in self.array_names))
row_format = "{:>%d}|{:>21}|{:>21}\n" % (first_column_width,)
separator_line = "-" * (first_column_width + 21 + 21 + 2) + "\n"
result += row_format.format("Name", "Inner (min/max)", "WithGl (min/max)")
result += separator_line
for arr_name in sorted(self.array_names):
inner_min_max = (self.min(arr_name, ghost_layers=False), self.max(arr_name, ghost_layers=False))
with_gl_min_max = (self.min(arr_name, ghost_layers=True), self.max(arr_name, ghost_layers=True))
inner_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(inner_min_max)
with_gl_min_max = "({0[0]:3.3g},{0[1]:3.3g})".format(with_gl_min_max)
result += row_format.format(arr_name, inner_min_max, with_gl_min_max)
return result
def log(self, *args, level='INFO'):
"""Similar to print with additional information (time, rank)."""
def log_on_root(self, *args, level='INFO'):
"""Logs only on root process. For serial setups this is equivalent to log"""
@property
def is_root(self):
"""Returns True for exactly one process in the simulation"""
@property
def world_rank(self):
"""Number of current process"""
class Block:
"""Represents locally stored part of domain.
Instances of this class are returned by DataHandling.iterate, do not create manually!
"""
def __init__(self, offset, local_slice):
self._offset = offset
self._localSlice = local_slice
@property
def offset(self):
"""Offset of the current block in global coordinates (where lower ghost layers have negative indices)."""
return self._offset
@property
def cell_index_arrays(self):
"""Global coordinate mesh-grid of cell coordinates.
Cell indices start at 0 at the first inner cell, lower ghost layers have negative indices
"""
mesh_grid_params = [offset + np.arange(width, dtype=np.int32)
for offset, width in zip(self.offset, self.shape)]
return np.meshgrid(*mesh_grid_params, indexing='ij', copy=False)
@property
def midpoint_arrays(self):
"""Global coordinate mesh-grid of cell midpoints which are shifted by 0.5 compared to cell indices."""
mesh_grid_params = [offset + 0.5 + np.arange(width, dtype=float)
for offset, width in zip(self.offset, self.shape)]
return np.meshgrid(*mesh_grid_params, indexing='ij', copy=False)
@property
def shape(self):
"""Shape of the fields (potentially including ghost layers)."""
return tuple(s.stop - s.start for s in self._localSlice[:len(self._offset)])
@property
def global_slice(self):
"""Slice in global coordinates."""
return tuple(slice(off, off + size) for off, size in zip(self._offset, self.shape))
def __getitem__(self, data_name: str) -> np.ndarray:
raise NotImplementedError()
import os
import warnings
import numpy as np
# noinspection PyPep8Naming
import waLBerla as wlb
from pystencils.datahandling.blockiteration import block_iteration, sliced_block_iteration
from pystencils.datahandling.datahandling_interface import DataHandling
from pystencils.enums import Backend
from pystencils.field import Field, FieldType
from pystencils.typing.typed_sympy import FieldPointerSymbol
from pystencils.utils import DotDict
from pystencils import Target
class ParallelDataHandling(DataHandling):
GPU_DATA_PREFIX = "gpu_"
VTK_COUNTER = 0
def __init__(self, blocks, default_ghost_layers=1, default_layout='SoA', dim=3, default_target=Target.CPU):
"""
Creates data handling based on walberla block storage
Args:
blocks: walberla block storage
default_ghost_layers: nr of ghost layers used if not specified in add() method
default_layout: layout used if no layout is given to add() method
dim: dimension of scenario,
walberla always uses three dimensions, so if dim=2 the extend of the
z coordinate of blocks has to be 1
default_target: `Target`, either 'CPU' or 'GPU' . If set to 'GPU' for each array also a GPU version is
allocated if not overwritten in add_array, and synchronization functions are for the GPU by
default
"""
super(ParallelDataHandling, self).__init__()
assert dim in (2, 3)
self._blocks = blocks
self._default_ghost_layers = default_ghost_layers
self._default_layout = default_layout
self._fields = DotDict() # maps name to symbolic pystencils field
self._field_name_to_cpu_data_name = {}
self._field_name_to_gpu_data_name = {}
self._data_names = set()
self._dim = dim
self._fieldInformation = {}
self._cpu_gpu_pairs = []
self._custom_data_transfer_functions = {}
self._custom_data_names = []
self._reduce_map = {
'sum': wlb.mpi.SUM,
'min': wlb.mpi.MIN,
'max': wlb.mpi.MAX,
}
if self._dim == 2:
assert self.blocks.getDomainCellBB().size[2] == 1
self._default_target = default_target
@property
def default_target(self):
return self._default_target
@property
def dim(self):
return self._dim
@property
def shape(self):
return self.blocks.getDomainCellBB().size[:self.dim]
@property
def periodicity(self):
return self.blocks.periodic[:self._dim]
@property
def fields(self):
return self._fields
@property
def blocks(self):
return self._blocks
@property
def default_ghost_layers(self):
return self._default_ghost_layers
@property
def default_layout(self):
return self._default_layout
@property
def data_names(self):
return self.data_names
def ghost_layers_of_field(self, name):
return self._fieldInformation[name]['ghost_layers']
def values_per_cell(self, name):
return self._fieldInformation[name]['values_per_cell']
def add_custom_data(self, name, cpu_creation_function,
gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
if cpu_creation_function and gpu_creation_function:
if cpu_to_gpu_transfer_func is None or gpu_to_cpu_transfer_func is None:
raise ValueError("For GPU data, both transfer functions have to be specified")
self._custom_data_transfer_functions[name] = (cpu_to_gpu_transfer_func, gpu_to_cpu_transfer_func)
if cpu_creation_function:
self.blocks.addBlockData(name, cpu_creation_function)
if gpu_creation_function:
self.blocks.addBlockData(self.GPU_DATA_PREFIX + name, gpu_creation_function)
self._custom_data_names.append(name)
def add_array(self, name, values_per_cell=1, dtype=np.float64, latex_name=None, ghost_layers=None,
layout=None, cpu=True, gpu=None, alignment=False, field_type=FieldType.GENERIC):
if ghost_layers is None:
ghost_layers = self.default_ghost_layers
if gpu is None:
gpu = self.default_target == Target.GPU
if layout is None:
layout = self.default_layout
if len(self.blocks) == 0:
raise ValueError("Data handling expects that each process has at least one block")
if hasattr(dtype, 'type'):
dtype = dtype.type
if name in self.blocks[0].fieldNames or self.GPU_DATA_PREFIX + name in self.blocks[0].fieldNames:
raise ValueError("Data with this name has already been added")
if alignment is False or alignment is None:
alignment = 0
if hasattr(values_per_cell, '__len__'):
raise NotImplementedError("Parallel data handling does not support multiple index dimensions")
self._fieldInformation[name] = {
'ghost_layers': ghost_layers,
'values_per_cell': values_per_cell,
'layout': layout,
'dtype': dtype,
'alignment': alignment,
'field_type': field_type,
}
layout_map = {'fzyx': wlb.field.Layout.fzyx, 'zyxf': wlb.field.Layout.zyxf,
'f': wlb.field.Layout.fzyx,
'SoA': wlb.field.Layout.fzyx, 'AoS': wlb.field.Layout.zyxf}
if cpu:
wlb.field.addToStorage(self.blocks, name, dtype, fSize=values_per_cell, layout=layout_map[layout],
ghostLayers=ghost_layers, alignment=alignment)
if gpu:
if alignment != 0:
raise ValueError("Alignment for walberla GPU fields not yet supported")
wlb.gpu.addGpuFieldToStorage(self.blocks, self.GPU_DATA_PREFIX + name, dtype, fSize=values_per_cell,
usePitchedMem=False, ghostLayers=ghost_layers, layout=layout_map[layout])
if cpu and gpu:
self._cpu_gpu_pairs.append((name, self.GPU_DATA_PREFIX + name))
block_bb = self.blocks.getBlockCellBB(self.blocks[0])
shape = tuple(s + 2 * ghost_layers for s in block_bb.size[:self.dim])
index_dimensions = 1 if values_per_cell > 1 else 0
if index_dimensions == 1:
shape += (values_per_cell,)
assert all(f.name != name for f in self.fields.values()), "Symbolic field with this name already exists"
self.fields[name] = Field.create_generic(name, self.dim, dtype, index_dimensions, layout,
index_shape=(values_per_cell,) if index_dimensions > 0 else None,
field_type=field_type)
self.fields[name].latex_name = latex_name
self._field_name_to_cpu_data_name[name] = name
if gpu:
self._field_name_to_gpu_data_name[name] = self.GPU_DATA_PREFIX + name
return self.fields[name]
def has_data(self, name):
return name in self._fields
@property
def array_names(self):
return tuple(self.fields.keys())
@property
def custom_data_names(self):
return tuple(self._custom_data_names)
def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
return self.add_array(name, latex_name=latex_name, cpu=cpu, gpu=gpu,
**self._fieldInformation[name_of_template_field])
def swap(self, name1, name2, gpu=False):
if gpu:
name1 = self.GPU_DATA_PREFIX + name1
name2 = self.GPU_DATA_PREFIX + name2
for block in self.blocks:
block[name1].swapDataPointers(block[name2])
def iterate(self, slice_obj=None, gpu=False, ghost_layers=True, inner_ghost_layers=True):
if ghost_layers is True:
ghost_layers = self.default_ghost_layers
elif ghost_layers is False:
ghost_layers = 0
elif isinstance(ghost_layers, str):
ghost_layers = self.ghost_layers_of_field(ghost_layers)
if inner_ghost_layers is True:
inner_ghost_layers = self.default_ghost_layers
elif inner_ghost_layers is False:
inner_ghost_layers = 0
elif isinstance(ghost_layers, str):
ghost_layers = self.ghost_layers_of_field(ghost_layers)
prefix = self.GPU_DATA_PREFIX if gpu else ""
if slice_obj is not None:
yield from sliced_block_iteration(self.blocks, slice_obj, inner_ghost_layers, ghost_layers,
self.dim, prefix)
else:
yield from block_iteration(self.blocks, ghost_layers, self.dim, prefix)
def gather_array(self, name, slice_obj=None, all_gather=False, ghost_layers=False):
if ghost_layers is not False:
warnings.warn("gather_array with ghost layers is only supported in serial data handling. "
"Array without ghost layers is returned")
if slice_obj is None:
slice_obj = tuple([slice(None, None, None)] * self.dim)
if self.dim == 2:
slice_obj = slice_obj[:2] + (0.5,) + slice_obj[2:]
last_element = slice_obj[3:]
array = wlb.field.gatherField(self.blocks, name, slice_obj[:3], all_gather)
if array is None:
return None
if self.dim == 2:
array = array[:, :, 0]
if last_element and self.fields[name].index_dimensions > 0:
array = array[..., last_element[0]]
return array
def _normalize_arr_shape(self, arr, index_dimensions):
if index_dimensions == 0 and len(arr.shape) > 3:
arr = arr[..., 0]
if self.dim == 2 and len(arr.shape) > 2:
arr = arr[:, :, 0]
return arr
def run_kernel(self, kernel_function, **kwargs):
for arg_dict in self.get_kernel_kwargs(kernel_function, **kwargs):
kernel_function(**arg_dict)
def get_kernel_kwargs(self, kernel_function, **kwargs):
if kernel_function.ast.backend == Backend.CUDA:
name_map = self._field_name_to_gpu_data_name
to_array = wlb.gpu.toGpuArray
else:
name_map = self._field_name_to_cpu_data_name
to_array = wlb.field.toArray
data_used_in_kernel = [(name_map[p.symbol.field_name], self.fields[p.symbol.field_name])
for p in kernel_function.parameters if
isinstance(p.symbol, FieldPointerSymbol) and p.symbol.field_name not in kwargs]
result = []
for block in self.blocks:
field_args = {}
for data_name, f in data_used_in_kernel:
arr = to_array(block[data_name], with_ghost_layers=[True, True, self.dim == 3])
arr = self._normalize_arr_shape(arr, f.index_dimensions)
field_args[f.name] = arr
field_args.update(kwargs)
result.append(field_args)
return result
def to_cpu(self, name):
if name in self._custom_data_transfer_functions:
transfer_func = self._custom_data_transfer_functions[name][1]
for block in self.blocks:
transfer_func(block[self.GPU_DATA_PREFIX + name], block[name])
else:
if self.is_on_gpu(name):
wlb.gpu.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
def to_gpu(self, name):
if name in self._custom_data_transfer_functions:
transfer_func = self._custom_data_transfer_functions[name][0]
for block in self.blocks:
transfer_func(block[self.GPU_DATA_PREFIX + name], block[name])
else:
if self.is_on_gpu(name):
wlb.gpu.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
def is_on_gpu(self, name):
return (name, self.GPU_DATA_PREFIX + name) in self._cpu_gpu_pairs
def all_to_cpu(self):
for cpu_name, gpu_name in self._cpu_gpu_pairs:
wlb.gpu.copyFieldToCpu(self.blocks, gpu_name, cpu_name)
for name in self._custom_data_transfer_functions.keys():
self.to_cpu(name)
def all_to_gpu(self):
for cpu_name, gpu_name in self._cpu_gpu_pairs:
wlb.gpu.copyFieldToGpu(self.blocks, gpu_name, cpu_name)
for name in self._custom_data_transfer_functions.keys():
self.to_gpu(name)
def synchronization_function_cpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
return self.synchronization_function(names, stencil, Target.CPU, buffered, stencil_restricted)
def synchronization_function_gpu(self, names, stencil=None, buffered=True, stencil_restricted=False, **_):
return self.synchronization_function(names, stencil, Target.GPU, buffered, stencil_restricted)
def synchronization_function(self, names, stencil=None, target=None, buffered=True, stencil_restricted=False):
if target is None:
target = self.default_target
if stencil is None:
stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'
if not hasattr(names, '__len__') or type(names) is str:
names = [names]
create_scheme = wlb.createUniformBufferedScheme if buffered else wlb.createUniformDirectScheme
if target == Target.CPU:
create_packing = wlb.field.createPackInfo if buffered else wlb.field.createMPIDatatypeInfo
if buffered and stencil_restricted:
create_packing = wlb.field.createStencilRestrictedPackInfo
else:
assert target == Target.GPU
create_packing = wlb.gpu.createPackInfo if buffered else wlb.gpu.createMPIDatatypeInfo
names = [self.GPU_DATA_PREFIX + name for name in names]
sync_function = create_scheme(self.blocks, stencil)
for name in names:
sync_function.addDataToCommunicate(create_packing(self.blocks, name))
return sync_function
def reduce_float_sequence(self, sequence, operation, all_reduce=False):
if all_reduce:
return np.array(wlb.mpi.allreduceReal(sequence, self._reduce_map[operation.lower()]))
else:
result = np.array(wlb.mpi.reduceReal(sequence, self._reduce_map[operation.lower()], 0))
return result if wlb.mpi.worldRank() == 0 else None
def reduce_int_sequence(self, sequence, operation, all_reduce=False):
if all_reduce:
return np.array(wlb.mpi.allreduceInt(sequence, self._reduce_map[operation.lower()]))
else:
result = np.array(wlb.mpi.reduceInt(sequence, self._reduce_map[operation.lower()], 0))
return result if wlb.mpi.worldRank() == 0 else None
def create_vtk_writer(self, file_name, data_names, ghost_layers=False):
if ghost_layers is False:
ghost_layers = 0
if ghost_layers is True:
ghost_layers = min(self.ghost_layers_of_field(n) for n in data_names)
file_name = "%s_%02d" % (file_name, ParallelDataHandling.VTK_COUNTER)
ParallelDataHandling.VTK_COUNTER += 1
output = wlb.vtk.makeOutput(self.blocks, file_name, ghostLayers=ghost_layers)
for n in data_names:
output.addCellDataWriter(wlb.field.createVTKWriter(self.blocks, n))
return output
def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name, ghost_layers=False):
if ghost_layers is False:
ghost_layers = 0
if ghost_layers is True:
ghost_layers = self.ghost_layers_of_field(data_name)
output = wlb.vtk.makeOutput(self.blocks, file_name, ghostLayers=ghost_layers)
for mask, name in masks_to_name.items():
w = wlb.field.createBinarizationVTKWriter(self.blocks, data_name, mask, name)
output.addCellDataWriter(w)
return output
@staticmethod
def log(*args, level='INFO'):
_log_map = {
'DEVEL': wlb.log_devel,
'RESULT': wlb.log_result,
'INFO': wlb.log_info,
'WARNING': wlb.log_warning,
'PROGRESS': wlb.log_progress,
}
level = level.upper()
message = " ".join(str(e) for e in args)
_log_map[level](message)
def log_on_root(self, *args, level='INFO'):
if self.is_root:
ParallelDataHandling.log(*args, level=level)
@property
def is_root(self):
return wlb.mpi.worldRank() == 0
@property
def world_rank(self):
return wlb.mpi.worldRank()
def save_all(self, directory):
if not os.path.exists(directory):
os.mkdir(directory)
if os.path.isfile(directory):
raise RuntimeError(f"Trying to save to {directory}, but file exists already")
for field_name, data_name in self._field_name_to_cpu_data_name.items():
self.blocks.writeBlockData(data_name, os.path.join(directory, field_name + ".dat"))
def load_all(self, directory):
for field_name, data_name in self._field_name_to_cpu_data_name.items():
self.blocks.readBlockData(data_name, os.path.join(directory, field_name + ".dat"))
import itertools
import time
from typing import Sequence, Union
import numpy as np
from pystencils.datahandling.blockiteration import SerialBlock
from pystencils.datahandling.datahandling_interface import DataHandling
from pystencils.enums import Target
from pystencils.field import (Field, FieldType, create_numpy_array_with_layout,
layout_string_to_tuple, spatial_layout_string_to_tuple)
from pystencils.gpu.gpu_array_handler import GPUArrayHandler, GPUNotAvailableHandler
from pystencils.slicing import normalize_slice, remove_ghost_layers
from pystencils.utils import DotDict
class SerialDataHandling(DataHandling):
def __init__(self,
domain_size: Sequence[int],
default_ghost_layers: int = 1,
default_layout: str = 'SoA',
periodicity: Union[bool, Sequence[bool]] = False,
default_target: Target = Target.CPU,
array_handler=None,
device_number=None) -> None:
"""
Creates a data handling for single node simulations.
Args:
domain_size: size of the spatial domain as tuple
default_ghost_layers: default number of ghost layers used, if not overridden in add_array() method
default_layout: default layout used, if not overridden in add_array() method
periodicity: List of booleans that indicate which dimensions have periodic boundary conditions.
Alternatively, a single boolean can be given, which is used for all dimensions. Defaults to
False (non-periodic)
default_target: `Target` either 'CPU' or 'GPU'. If set to 'GPU' for each array also a GPU version is
allocated if not overwritten in add_array, and synchronization functions are for the GPU by
default
array_handler: An object that provides the same interface as `GPUArrayHandler`, which is used for creation
and transferring of GPU arrays. Default is to construct a fresh `GPUArrayHandler`
device_number: If `default_target` is set to 'GPU', a device number should be specified. If none is given,
the device with the largest amount of memory is used. If multiple devices have the same
amount of memory, the one with the lower number is used
"""
super(SerialDataHandling, self).__init__()
self._domainSize = tuple(domain_size)
self.default_ghost_layers = default_ghost_layers
self.default_layout = default_layout
self._fields = DotDict()
self.cpu_arrays = DotDict()
self.gpu_arrays = DotDict()
self.custom_data_cpu = DotDict()
self.custom_data_gpu = DotDict()
self._custom_data_transfer_functions = {}
if not array_handler:
try:
if device_number is None:
import cupy.cuda.runtime
if cupy.cuda.runtime.getDeviceCount() > 0:
device_number = sorted(range(cupy.cuda.runtime.getDeviceCount()),
key=lambda i: cupy.cuda.Device(i).mem_info[1], reverse=True)[0]
self.array_handler = GPUArrayHandler(device_number)
except ImportError:
self.array_handler = GPUNotAvailableHandler()
else:
self.array_handler = array_handler
if periodicity is None or periodicity is False:
periodicity = [False] * self.dim
if periodicity is True:
periodicity = [True] * self.dim
self._periodicity = periodicity
self._field_information = {}
self._default_target = default_target
self._start_time = time.perf_counter()
@property
def default_target(self):
return self._default_target
@property
def dim(self):
return len(self._domainSize)
@property
def shape(self):
return self._domainSize
@property
def periodicity(self):
return self._periodicity
@property
def fields(self):
return self._fields
def ghost_layers_of_field(self, name):
return self._field_information[name]['ghost_layers']
def values_per_cell(self, name):
return self._field_information[name]['values_per_cell']
def add_array(self, name, values_per_cell=1, dtype=np.float64, latex_name=None, ghost_layers=None, layout=None,
cpu=True, gpu=None, alignment=False, field_type=FieldType.GENERIC):
if ghost_layers is None:
ghost_layers = self.default_ghost_layers
if layout is None:
layout = self.default_layout
if gpu is None:
gpu = self.default_target in self._GPU_LIKE_TARGETS
kwargs = {
'shape': tuple(s + 2 * ghost_layers for s in self._domainSize),
'dtype': dtype,
}
if not hasattr(values_per_cell, '__len__'):
values_per_cell = (values_per_cell,)
if len(values_per_cell) == 1 and values_per_cell[0] == 1:
values_per_cell = ()
self._field_information[name] = {
'ghost_layers': ghost_layers,
'values_per_cell': values_per_cell,
'layout': layout,
'dtype': dtype,
'alignment': alignment,
'field_type': field_type,
}
index_dimensions = len(values_per_cell)
kwargs['shape'] = kwargs['shape'] + values_per_cell
if index_dimensions > 0:
layout_tuple = layout_string_to_tuple(layout, self.dim + index_dimensions)
else:
layout_tuple = spatial_layout_string_to_tuple(layout, self.dim)
# cpu_arr is always created - since there is no create_gpu_array_with_layout()
byte_offset = ghost_layers * np.dtype(dtype).itemsize
if gpu:
cpu_arr = self.array_handler.pinned_numpy_array(shape=kwargs['shape'], layout=layout_tuple, dtype=dtype)
else:
cpu_arr = create_numpy_array_with_layout(layout=layout_tuple, alignment=alignment,
byte_offset=byte_offset, **kwargs)
if alignment and gpu:
raise NotImplementedError("Alignment for GPU fields not supported")
if cpu:
if name in self.cpu_arrays:
raise ValueError("CPU Field with this name already exists")
self.cpu_arrays[name] = cpu_arr
if gpu:
if name in self.gpu_arrays:
raise ValueError("GPU Field with this name already exists")
self.gpu_arrays[name] = self.array_handler.to_gpu(cpu_arr)
assert all(f.name != name for f in self.fields.values()), "Symbolic field with this name already exists"
self.fields[name] = Field.create_from_numpy_array(name, cpu_arr, index_dimensions=index_dimensions,
field_type=field_type)
self.fields[name].latex_name = latex_name
return self.fields[name]
def add_custom_data(self, name, cpu_creation_function,
gpu_creation_function=None, cpu_to_gpu_transfer_func=None, gpu_to_cpu_transfer_func=None):
if cpu_creation_function and gpu_creation_function:
if cpu_to_gpu_transfer_func is None or gpu_to_cpu_transfer_func is None:
raise ValueError("For GPU data, both transfer functions have to be specified")
self._custom_data_transfer_functions[name] = (cpu_to_gpu_transfer_func, gpu_to_cpu_transfer_func)
assert name not in self.custom_data_cpu
if cpu_creation_function:
assert name not in self.cpu_arrays
self.custom_data_cpu[name] = cpu_creation_function()
if gpu_creation_function:
assert name not in self.gpu_arrays
self.custom_data_gpu[name] = gpu_creation_function()
def has_data(self, name):
return name in self.fields
def add_array_like(self, name, name_of_template_field, latex_name=None, cpu=True, gpu=None):
return self.add_array(name, latex_name=latex_name, cpu=cpu, gpu=gpu,
**self._field_information[name_of_template_field])
def iterate(self, slice_obj=None, gpu=False, ghost_layers=True, inner_ghost_layers=True):
if ghost_layers is True:
ghost_layers = self.default_ghost_layers
elif ghost_layers is False:
ghost_layers = 0
elif isinstance(ghost_layers, str):
ghost_layers = self.ghost_layers_of_field(ghost_layers)
if slice_obj is None:
slice_obj = (slice(None, None, None),) * self.dim
slice_obj = normalize_slice(slice_obj, tuple(s + 2 * ghost_layers for s in self._domainSize))
slice_obj = tuple(s if type(s) is slice else slice(s, s + 1, None) for s in slice_obj)
arrays = self.gpu_arrays if gpu else self.cpu_arrays
custom_data_dict = self.custom_data_gpu if gpu else self.custom_data_cpu
iter_dict = custom_data_dict.copy()
for name, arr in arrays.items():
field_gls = self._field_information[name]['ghost_layers']
if field_gls < ghost_layers:
continue
arr = remove_ghost_layers(arr, index_dimensions=len(arr.shape) - self.dim,
ghost_layers=field_gls - ghost_layers)
iter_dict[name] = arr
offset = tuple(s.start - ghost_layers for s in slice_obj)
yield SerialBlock(iter_dict, offset, slice_obj)
def gather_array(self, name, slice_obj=None, ghost_layers=False, **kwargs):
gl_to_remove = self._field_information[name]['ghost_layers']
if isinstance(ghost_layers, int):
gl_to_remove -= ghost_layers
if ghost_layers is True:
gl_to_remove = 0
arr = self.cpu_arrays[name]
ind_dimensions = self.fields[name].index_dimensions
spatial_dimensions = self.fields[name].spatial_dimensions
arr = remove_ghost_layers(arr, index_dimensions=ind_dimensions, ghost_layers=gl_to_remove)
if slice_obj is not None:
normalized_slice = normalize_slice(slice_obj[:spatial_dimensions], arr.shape[:spatial_dimensions])
normalized_slice = tuple(s if type(s) is slice else slice(s, s + 1, None) for s in normalized_slice)
normalized_slice += slice_obj[spatial_dimensions:]
arr = arr[normalized_slice]
else:
arr = arr.view()
arr.flags.writeable = False
return arr
def swap(self, name1, name2, gpu=None):
if gpu is None:
gpu = self.default_target in self._GPU_LIKE_TARGETS
arr = self.gpu_arrays if gpu else self.cpu_arrays
arr[name1], arr[name2] = arr[name2], arr[name1]
def all_to_cpu(self):
for name in (self.cpu_arrays.keys() & self.gpu_arrays.keys()) | self._custom_data_transfer_functions.keys():
self.to_cpu(name)
def all_to_gpu(self):
for name in (self.cpu_arrays.keys() & self.gpu_arrays.keys()) | self._custom_data_transfer_functions.keys():
self.to_gpu(name)
def run_kernel(self, kernel_function, **kwargs):
arrays = self.gpu_arrays if kernel_function.ast.backend in self._GPU_LIKE_BACKENDS else self.cpu_arrays
kernel_function(**{**arrays, **kwargs})
def get_kernel_kwargs(self, kernel_function, **kwargs):
result = {}
result.update(self.gpu_arrays if kernel_function.ast.backend in self._GPU_LIKE_BACKENDS else self.cpu_arrays)
result.update(kwargs)
return [result]
def to_cpu(self, name):
if name in self._custom_data_transfer_functions:
transfer_func = self._custom_data_transfer_functions[name][1]
transfer_func(self.custom_data_gpu[name], self.custom_data_cpu[name])
else:
if name in self.cpu_arrays.keys() & self.gpu_arrays.keys():
self.array_handler.download(self.gpu_arrays[name], self.cpu_arrays[name])
def to_gpu(self, name):
if name in self._custom_data_transfer_functions:
transfer_func = self._custom_data_transfer_functions[name][0]
transfer_func(self.custom_data_gpu[name], self.custom_data_cpu[name])
else:
if name in self.cpu_arrays.keys() & self.gpu_arrays.keys():
self.array_handler.upload(self.gpu_arrays[name], self.cpu_arrays[name])
def is_on_gpu(self, name):
return name in self.gpu_arrays
def synchronization_function_cpu(self, names, stencil_name=None, **_):
return self.synchronization_function(names, stencil_name, target=Target.CPU)
def synchronization_function_gpu(self, names, stencil_name=None, **_):
return self.synchronization_function(names, stencil_name, target=Target.GPU)
def synchronization_function(self, names, stencil=None, target=None, functor=None, **_):
if target is None:
target = self.default_target
assert target in (Target.CPU, Target.GPU)
if not hasattr(names, '__len__') or type(names) is str:
names = [names]
filtered_stencil = []
neighbors = [-1, 0, 1]
if (stencil is None and self.dim == 2) or (stencil is not None and stencil.startswith('D2')):
directions = itertools.product(*[neighbors] * 2)
elif (stencil is None and self.dim == 3) or (stencil is not None and stencil.startswith('D3')):
directions = itertools.product(*[neighbors] * 3)
else:
raise ValueError("Invalid stencil")
for direction in directions:
use_direction = True
if direction == (0, 0) or direction == (0, 0, 0):
use_direction = False
for component, periodicity in zip(direction, self._periodicity):
if not periodicity and component != 0:
use_direction = False
if use_direction:
filtered_stencil.append(direction)
result = []
for name in names:
gls = self._field_information[name]['ghost_layers']
values_per_cell = self._field_information[name]['values_per_cell']
if values_per_cell == ():
values_per_cell = (1,)
if len(values_per_cell) == 1:
values_per_cell = values_per_cell[0]
if len(filtered_stencil) > 0:
if target == Target.CPU:
if functor is None:
from pystencils.slicing import get_periodic_boundary_functor
functor = get_periodic_boundary_functor
result.append(functor(filtered_stencil, ghost_layers=gls))
else:
if functor is None:
from pystencils.gpu.periodicity import get_periodic_boundary_functor as functor
target = Target.GPU
result.append(functor(filtered_stencil, self._domainSize,
index_dimensions=self.fields[name].index_dimensions,
index_dim_shape=values_per_cell,
dtype=self.fields[name].dtype.numpy_dtype,
ghost_layers=gls,
target=target))
if target == Target.CPU:
def result_functor():
for arr_name, func in zip(names, result):
func(pdfs=self.cpu_arrays[arr_name])
else:
def result_functor():
for arr_name, func in zip(names, result):
func(pdfs=self.gpu_arrays[arr_name])
return result_functor
@property
def array_names(self):
return tuple(self.fields.keys())
@property
def custom_data_names(self):
return tuple(self.custom_data_cpu.keys())
def reduce_float_sequence(self, sequence, operation, all_reduce=False) -> np.array:
return np.array(sequence)
def reduce_int_sequence(self, sequence, operation, all_reduce=False) -> np.array:
return np.array(sequence)
def create_vtk_writer(self, file_name, data_names, ghost_layers=False):
from pystencils.datahandling.vtk import image_to_vtk
def writer(step):
full_file_name = "%s_%08d" % (file_name, step,)
cell_data = {}
for name in data_names:
field = self._get_field_with_given_number_of_ghost_layers(name, ghost_layers)
if self.dim == 2:
field = field[:, :, np.newaxis]
if len(field.shape) == 3:
cell_data[name] = np.ascontiguousarray(field)
elif len(field.shape) == 4:
values_per_cell = field.shape[-1]
if values_per_cell == self.dim:
field = [np.ascontiguousarray(field[..., i]) for i in range(values_per_cell)]
if len(field) == 2:
field.append(np.zeros_like(field[0]))
cell_data[name] = tuple(field)
else:
for i in range(values_per_cell):
cell_data["%s[%d]" % (name, i)] = np.ascontiguousarray(field[..., i])
else:
raise NotImplementedError("VTK export for fields with more than one index "
"coordinate not implemented")
image_to_vtk(full_file_name, cell_data=cell_data)
return writer
def create_vtk_writer_for_flag_array(self, file_name, data_name, masks_to_name, ghost_layers=False):
from pystencils.datahandling.vtk import image_to_vtk
def writer(step):
full_file_name = "%s_%08d" % (file_name, step,)
field = self._get_field_with_given_number_of_ghost_layers(data_name, ghost_layers)
if self.dim == 2:
field = field[:, :, np.newaxis]
cell_data = {name: np.ascontiguousarray(np.bitwise_and(field, field.dtype.type(mask)) > 0, dtype=np.uint8)
for mask, name in masks_to_name.items()}
image_to_vtk(full_file_name, cell_data=cell_data)
return writer
def _get_field_with_given_number_of_ghost_layers(self, name, ghost_layers):
actual_ghost_layers = self.ghost_layers_of_field(name)
if ghost_layers is True:
ghost_layers = actual_ghost_layers
gl_to_remove = actual_ghost_layers - ghost_layers
ind_dims = len(self._field_information[name]['values_per_cell'])
return remove_ghost_layers(self.cpu_arrays[name], ind_dims, gl_to_remove)
def log(self, *args, level='INFO'):
level = level.upper()
message = " ".join(str(e) for e in args)
time_running = time.perf_counter() - self._start_time
spacing = 7 - len(str(int(time_running)))
message = f"[{level: <8}]{spacing * '-'}({time_running:.3f} sec) {message} "
print(message, flush=True)
def log_on_root(self, *args, level='INFO'):
self.log(*args, level=level)
@property
def is_root(self):
return True
@property
def world_rank(self):
return 0
def save_all(self, filename, compressed=True, synchronise_data=True):
if synchronise_data:
for name in (self.cpu_arrays.keys() & self.gpu_arrays.keys()):
self.to_cpu(name)
if compressed:
np.savez_compressed(filename, **self.cpu_arrays)
else:
np.savez(filename, **self.cpu_arrays)
def load_all(self, filename, synchronise_data=True):
if '.npz' not in filename:
filename += '.npz'
file_contents = np.load(filename)
for arr_name, arr_contents in self.cpu_arrays.items():
if arr_name not in file_contents:
print(f"Skipping read data {arr_name} because there is no data with this name in data handling")
continue
if file_contents[arr_name].shape != arr_contents.shape:
print(f"Skipping read data {arr_name} because shapes don't match. "
f"Read array shape {file_contents[arr_name].shape}, existing array shape {arr_contents.shape}")
continue
np.copyto(arr_contents, file_contents[arr_name])
if synchronise_data:
if arr_name in self.gpu_arrays.keys():
self.to_gpu(arr_name)
from pyevtk.hl import _addDataToFile, _appendDataToFile
from pyevtk.vtk import VtkFile, VtkImageData
def image_to_vtk(path, cell_data, origin=(0.0, 0.0, 0.0), spacing=(1.0, 1.0, 1.0)):
"""
Writes numpy data to VTK
Numpy arrays have to be contiguous in memory - if this is a problem call :func:`numpy.ascontiguousarray` first
Patched version of same pyevtk function that also supports vector-valued data
Args:
path: path with file name, without file ending (.vtk) where data should be stored
cell_data: dictionary, mapping name of the data to a 3D numpy array, or to a 3-tuple of 3D numpy arrays
in case of vector-valued data
origin: 3-tuple describing the origin of the field in 3D
spacing: 3-tuple describing the grid spacing in x,y, z direction
Returns:
path to file that was written
Examples:
>>> from tempfile import TemporaryDirectory
>>> import os
>>> import numpy as np
>>> with TemporaryDirectory() as tmp_dir:
... path = os.path.join(tmp_dir, 'out')
... size = (20, 20, 20)
... res_file = image_to_vtk(path, cell_data={'vector': (np.ones(size), np.ones(size), np.ones(size)),
... 'scalar': np.zeros(size)
... })
"""
# Extract dimensions
start = (0, 0, 0)
end = None
keys = list(cell_data.keys())
data = cell_data[keys[0]]
if hasattr(data, 'shape'):
end = data.shape
elif isinstance(data, tuple):
shapes = set(d.shape for d in data)
if len(shapes) > 1:
raise ValueError("All components have to have the same shape")
end = shapes.pop()
# Write data to file
w = VtkFile(path, VtkImageData)
w.openGrid(start=start, end=end, origin=origin, spacing=spacing)
w.openPiece(start=start, end=end)
_addDataToFile(w, cell_data, pointData=None)
w.closePiece()
w.closeGrid()
_appendDataToFile(w, cell_data, pointData=None)
w.save()
return w.getFileName()
from typing import Any, Dict, Optional, Union
import sympy as sp
from pystencils.astnodes import KernelFunction
from pystencils.enums import Backend
from pystencils.kernel_wrapper import KernelWrapper
def to_dot(expr: sp.Expr, graph_style: Optional[Dict[str, Any]] = None, short=True):
"""Show a sympy or pystencils AST as dot graph"""
from pystencils.astnodes import Node
try:
import graphviz
except ImportError:
print("graphviz is not installed. Visualizing the AST is not available")
return
graph_style = {} if graph_style is None else graph_style
if isinstance(expr, Node):
from pystencils.backends.dot import print_dot
return graphviz.Source(print_dot(expr, short=short, graph_attr=graph_style))
else:
from sympy.printing.dot import dotprint
return graphviz.Source(dotprint(expr, graph_attr=graph_style))
def highlight_cpp(code: str):
"""Highlight the given C/C++ source code with pygments."""
from IPython.display import HTML, display
from pygments import highlight
# noinspection PyUnresolvedReferences
from pygments.formatters import HtmlFormatter
# noinspection PyUnresolvedReferences
from pygments.lexers import CppLexer
css = HtmlFormatter().get_style_defs('.highlight')
css_tag = f"<style>{css}</style>"
display(HTML(css_tag))
return HTML(highlight(code, CppLexer(), HtmlFormatter()))
def get_code_obj(ast: Union[KernelFunction, KernelWrapper], custom_backend=None):
"""Returns an object to display generated code (C/C++ or CUDA)
Can either be displayed as HTML in Jupyter notebooks or printed as normal string.
"""
from pystencils.backends.cbackend import generate_c
if isinstance(ast, KernelWrapper):
ast = ast.ast
if ast.backend not in {Backend.C, Backend.CUDA}:
raise NotImplementedError(f'get_code_obj is not implemented for backend {ast.backend}')
dialect = ast.backend
class CodeDisplay:
def __init__(self, ast_input):
self.ast = ast_input
def _repr_html_(self):
return highlight_cpp(generate_c(self.ast, dialect=dialect, custom_backend=custom_backend)).__html__()
def __str__(self):
return generate_c(self.ast, dialect=dialect, custom_backend=custom_backend)
def __repr__(self):
return generate_c(self.ast, dialect=dialect, custom_backend=custom_backend)
return CodeDisplay(ast)
def get_code_str(ast, custom_backend=None):
return str(get_code_obj(ast, custom_backend))
def _isnotebook():
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell':
return True # Jupyter notebook or qtconsole
elif shell == 'TerminalInteractiveShell':
return False # Terminal running IPython
else:
return False # Other type (?)
except NameError:
return False
def show_code(ast: Union[KernelFunction, KernelWrapper], custom_backend=None):
code = get_code_obj(ast, custom_backend)
if _isnotebook():
from IPython.display import display
display(code)
else:
try:
import rich.syntax
import rich.console
syntax = rich.syntax.Syntax(str(code), "c++", theme="monokai", line_numbers=True)
console = rich.console.Console()
console.print(syntax)
except ImportError:
print(code)
from enum import Enum, auto
class Target(Enum):
"""
The Target enumeration represents all possible targets that can be used for the code generation.
"""
CPU = auto()
"""
Target CPU architecture.
"""
GPU = auto()
"""
Target GPU architecture.
"""
class Backend(Enum):
"""
The Backend enumeration represents all possible backends that can be used for the code generation.
Backends and targets must be combined with care. For example CPU as a target and CUDA as a backend makes no sense.
"""
C = auto()
"""
Use the C Backend of pystencils.
"""
CUDA = auto()
"""
Use the CUDA backend to generate code for NVIDIA GPUs.
"""
from typing import List, Union
import sympy as sp
from pystencils.astnodes import Node
from pystencils.simp import AssignmentCollection
from pystencils.assignment import Assignment
# noinspection PyPep8Naming
class fast_division(sp.Function):
"""
Produces special float instructions for CUDA kernels
"""
nargs = (2,)
# noinspection PyPep8Naming
class fast_sqrt(sp.Function):
"""
Produces special float instructions for CUDA kernels
"""
nargs = (1, )
# noinspection PyPep8Naming
class fast_inv_sqrt(sp.Function):
"""
Produces special float instructions for CUDA kernels
"""
nargs = (1, )
def _run(term, visitor):
if isinstance(term, AssignmentCollection):
new_main_assignments = _run(term.main_assignments, visitor)
new_subexpressions = _run(term.subexpressions, visitor)
return term.copy(new_main_assignments, new_subexpressions)
elif isinstance(term, list):
return [_run(e, visitor) for e in term]
else:
return visitor(term)
def insert_fast_sqrts(term: Union[sp.Expr, List[sp.Expr], AssignmentCollection, Assignment]):
def visit(expr):
if isinstance(expr, Node):
return expr
if expr.func == sp.Pow and isinstance(expr.exp, sp.Rational) and expr.exp.q == 2:
power = expr.exp.p
if power < 0:
return fast_inv_sqrt(expr.args[0]) ** (-power)
else:
return fast_sqrt(expr.args[0]) ** power
else:
new_args = [visit(a) for a in expr.args]
return expr.func(*new_args) if new_args else expr
return _run(term, visit)
def insert_fast_divisions(term: Union[sp.Expr, List[sp.Expr], AssignmentCollection, Assignment]):
def visit(expr):
if isinstance(expr, Node):
return expr
if expr.func == sp.Mul:
div_args = []
other_args = []
for a in expr.args:
if a.func == sp.Pow and a.exp.is_integer and a.exp < 0:
div_args.append(visit(a.base) ** (-a.exp))
else:
other_args.append(visit(a))
if div_args:
return fast_division(sp.Mul(*other_args), sp.Mul(*div_args))
else:
return sp.Mul(*other_args)
elif expr.func == sp.Pow and expr.exp.is_integer and expr.exp < 0:
return fast_division(1, visit(expr.base) ** (-expr.exp))
else:
new_args = [visit(a) for a in expr.args]
return expr.func(*new_args) if new_args else expr
return _run(term, visit)
from .derivative import (
Diff, DiffOperator, collect_diffs, combine_diff_products, diff, diff_terms, evaluate_diffs,
expand_diff_full, expand_diff_linear, expand_diff_products, functional_derivative,
normalize_diff_order, zero_diffs)
from .finitedifferences import Discretization2ndOrder, advection, diffusion, transient
from .finitevolumes import FVM1stOrder, VOF
from .spatial import discretize_spatial, discretize_spatial_staggered
__all__ = ['Diff', 'diff', 'DiffOperator', 'diff_terms', 'collect_diffs',
'zero_diffs', 'evaluate_diffs', 'normalize_diff_order', 'expand_diff_full', 'expand_diff_linear',
'expand_diff_products', 'combine_diff_products', 'functional_derivative',
'advection', 'diffusion', 'transient', 'Discretization2ndOrder', 'discretize_spatial',
'discretize_spatial_staggered', 'FVM1stOrder', 'VOF']