Commit ba9b9d1a authored by Markus Holzer's avatar Markus Holzer
Browse files

Merge branch 'advanced_streaming_extensions' into 'master'

Advanced Streaming Extensions

See merge request !40
parents fb514f53 e8ed830d
Pipeline #28006 waiting for manual action with stage
in 24 minutes and 15 seconds
......@@ -10,4 +10,5 @@ __pycache__
\ No newline at end of file
\ No newline at end of file
......@@ -113,13 +113,9 @@ pycodegen-integration:
- pip install -e pystencils/
- pip install -e lbmpy/
- ./
- export NUM_CORES=$(nproc --all)
- mkdir -p ~/.config/matplotlib
- echo "backend:template" > ~/.config/matplotlib/matplotlibrc
- cd ../lbmpy
- py.test -v -n $NUM_CORES .
- cd ../walberla/build/
- make CodegenJacobiCPU CodegenJacobiGPU CodegenPoissonCPU CodegenPoissonGPU MicroBenchmarkGpuLbm LbCodeGenerationExample UniformGridBenchmarkGPU_trt UniformGridBenchmarkGPU_entropic_kbc_n4
# build all integration tests
- cd walberla/build/
- make -j $NUM_CORES MicroBenchmarkGpuLbm LbCodeGenerationExample
- cd apps/benchmarks/UniformGridGPU
- make -j $NUM_CORES
- cd ../UniformGridGenerated
from .indexing import BetweenTimestepsIndexing, NeighbourOffsetArrays
from .communication import get_communication_slices, LBMPeriodicityHandling
from .utility import Timestep, get_accessor, is_inplace, get_timesteps, \
numeric_index, numeric_offsets, inverse_dir_index, AccessPdfValues
__all__ = ['BetweenTimestepsIndexing', 'NeighbourOffsetArrays',
'get_communication_slices', 'LBMPeriodicityHandling',
'Timestep', 'get_accessor', 'is_inplace', 'get_timesteps',
'numeric_index', 'numeric_offsets', 'inverse_dir_index', 'AccessPdfValues']
from pystencils import Field, Assignment
from pystencils.slicing import shift_slice, get_slice_before_ghost_layer, normalize_slice
from lbmpy.advanced_streaming.utility import is_inplace, get_accessor, numeric_index, \
numeric_offsets, Timestep, get_timesteps
from lbmpy.stencils import get_stencil
from pystencils.datahandling import SerialDataHandling
from itertools import chain
def _trim_slice_in_direction(slices, direction):
assert len(slices) == len(direction)
result = []
for s, d in zip(slices, direction):
if isinstance(s, int):
start = s.start + 1 if d == -1 else s.start
stop = s.stop - 1 if d == 1 else s.stop
result.append(slice(start, stop, s.step))
return tuple(result)
def _extend_dir(direction):
if len(direction) == 0:
yield tuple()
elif direction[0] == 0:
for d in [-1, 0, 1]:
for rest in _extend_dir(direction[1:]):
yield (d, ) + rest
for rest in _extend_dir(direction[1:]):
yield (direction[0], ) + rest
def _get_neighbour_transform(direction, ghost_layers):
return tuple(d * (ghost_layers + 1) for d in direction)
def _fix_length_one_slices(slices):
"""Slices of length one are replaced by their start value for correct periodic shifting"""
if isinstance(slices, int):
return slices
elif isinstance(slices, slice):
if slices.stop is not None and abs(slices.start - slices.stop) == 1:
return slices.start
elif slices.stop is None and slices.start == -1:
return -1 # [-1:] also has length one
return slices
return tuple(_fix_length_one_slices(s) for s in slices)
def get_communication_slices(
stencil, comm_stencil=None, streaming_pattern='pull', prev_timestep=Timestep.BOTH, ghost_layers=1):
Return the source and destination slices for periodicity handling or communication between blocks.
:param stencil: The stencil used by the LB method.
:param comm_stencil: The stencil defining the communication directions. If None, it will be set to stencil.
:param streaming_pattern: The streaming pattern.
:param prev_timestep: Timestep after which communication is run.
:param ghost_layers: Number of ghost layers in each direction.
if comm_stencil is None:
comm_stencil = stencil
pdfs = Field.create_generic('pdfs', spatial_dimensions=len(stencil[0]), index_shape=(len(stencil),))
write_accesses = get_accessor(streaming_pattern, prev_timestep).write(pdfs, stencil)
slices_per_comm_direction = dict()
for comm_dir in comm_stencil:
if all(d == 0 for d in comm_dir):
slices_for_dir = []
for streaming_dir in set(_extend_dir(comm_dir)) & set(stencil):
d = stencil.index(streaming_dir)
write_offsets = numeric_offsets(write_accesses[d])
write_index = numeric_index(write_accesses[d])[0]
tangential_dir = tuple(s - c for s, c in zip(streaming_dir, comm_dir))
origin_slice = get_slice_before_ghost_layer(comm_dir, ghost_layers=ghost_layers, thickness=1)
origin_slice = _fix_length_one_slices(origin_slice)
src_slice = shift_slice(_trim_slice_in_direction(origin_slice, tangential_dir), write_offsets)
neighbour_transform = _get_neighbour_transform(comm_dir, ghost_layers)
dst_slice = shift_slice(src_slice, neighbour_transform)
src_slice = src_slice + (write_index, )
dst_slice = dst_slice + (write_index, )
slices_for_dir.append((src_slice, dst_slice))
slices_per_comm_direction[comm_dir] = slices_for_dir
return slices_per_comm_direction
def periodic_pdf_copy_kernel(pdf_field, src_slice, dst_slice,
domain_size=None, target='gpu',
opencl_queue=None, opencl_ctx=None):
"""Copies a rectangular array slice onto another non-overlapping array slice"""
from pystencils.gpucuda.kernelcreation import create_cuda_kernel
pdf_idx = src_slice[-1]
assert isinstance(pdf_idx, int), "PDF index needs to be an integer constant"
assert pdf_idx == dst_slice[-1], "Source and Destination PDF indices must be equal"
src_slice = src_slice[:-1]
dst_slice = dst_slice[:-1]
if domain_size is None:
domain_size = pdf_field.spatial_shape
normalized_from_slice = normalize_slice(src_slice, domain_size)
normalized_to_slice = normalize_slice(dst_slice, domain_size)
def _start(s):
return s.start if isinstance(s, slice) else s
def _stop(s):
return s.stop if isinstance(s, slice) else s
offset = [_start(s1) - _start(s2) for s1, s2 in zip(normalized_from_slice, normalized_to_slice)]
assert offset == [_stop(s1) - _stop(s2) for s1, s2 in zip(normalized_from_slice, normalized_to_slice)], \
"Slices have to have same size"
copy_eq = Assignment(pdf_field(pdf_idx), pdf_field[tuple(offset)](pdf_idx))
ast = create_cuda_kernel([copy_eq], iteration_slice=dst_slice, skip_independence_check=True)
if target == 'gpu':
from pystencils.gpucuda import make_python_function
return make_python_function(ast)
elif target == 'opencl':
from pystencils.opencl import make_python_function
return make_python_function(ast, opencl_queue, opencl_ctx)
raise ValueError('Invalid target:', target)
class LBMPeriodicityHandling:
def __init__(self, stencil, data_handling, pdf_field_name,
streaming_pattern='pull', ghost_layers=1,
opencl_queue=None, opencl_ctx=None,
Periodicity Handling for Lattice Boltzmann Streaming.
**On the usage with cuda/opencl:**
- pycuda allows the copying of sliced arrays within device memory using the numpy syntax,
e.g. `dst[:,0] = src[:,-1]`. In this implementation, this is the default for periodicity
handling. Alternatively, if you set `pycuda_direct_copy=False`, GPU kernels are generated and
compiled. The compiled kernels are almost twice as fast in execution as pycuda array copying,
but especially for large stencils like D3Q27, their compilation can take up to 20 seconds.
Choose your weapon depending on your use case.
- pyopencl does not support copying of non-contiguous sliced arrays, so the usage of compiled
copy kernels is forced on us. On the positive side, compilation of the OpenCL kernels appears
to be about four times faster.
if not isinstance(data_handling, SerialDataHandling):
raise ValueError('Only serial data handling is supported!')
if isinstance(stencil, str):
stencil = get_stencil(stencil)
self.stencil = stencil
self.dh = data_handling
target = data_handling.default_target
assert target in ['cpu', 'gpu', 'opencl']
self.pdf_field_name = pdf_field_name
self.ghost_layers = ghost_layers
periodicity = data_handling.periodicity
self.inplace_pattern = is_inplace(streaming_pattern) = target
self.cpu = target == 'cpu'
self.opencl_queue = opencl_queue
self.opencl_ctx = opencl_ctx
self.pycuda_direct_copy = target == 'gpu' and pycuda_direct_copy
def is_copy_direction(direction):
for d, p in zip(direction, periodicity):
if d != 0 and not p:
return False
return True
copy_directions = tuple(filter(is_copy_direction, stencil[1:]))
self.comm_slices = []
timesteps = get_timesteps(streaming_pattern)
for timestep in timesteps:
slices_per_comm_dir = get_communication_slices(stencil=stencil,
self.comm_slices.append(list(chain.from_iterable(v for k, v in slices_per_comm_dir.items())))
if target == 'opencl' or (target == 'gpu' and not pycuda_direct_copy):
self.device_copy_kernels = []
for timestep in timesteps:
def __call__(self, prev_timestep=Timestep.BOTH):
if self.cpu:
def _periodicity_handling_cpu(self, prev_timestep):
arr = self.dh.cpu_arrays[self.pdf_field_name]
comm_slices = self.comm_slices[prev_timestep.idx]
for src, dst in comm_slices:
arr[dst] = arr[src]
def _compile_copy_kernels(self, timestep):
pdf_field = self.dh.fields[self.pdf_field_name]
kernels = []
for src, dst in self.comm_slices[timestep.idx]:
pdf_field, src, dst,,
opencl_queue=self.opencl_queue, opencl_ctx=self.opencl_ctx))
return kernels
def _periodicity_handling_gpu(self, prev_timestep):
arr = self.dh.gpu_arrays[self.pdf_field_name]
if self.pycuda_direct_copy:
for src, dst in self.comm_slices[prev_timestep.idx]:
arr[dst] = arr[src]
kernel_args = {self.pdf_field_name: arr}
for kernel in self.device_copy_kernels[prev_timestep.idx]:
import numpy as np
import sympy as sp
import pystencils as ps
from pystencils.data_types import TypedSymbol, create_type
from pystencils.backends.cbackend import CustomCodeNode
from lbmpy.stencils import get_stencil
from lbmpy.advanced_streaming.utility import get_accessor, inverse_dir_index, is_inplace, Timestep
from itertools import product
def _array_pattern(dtype, name, content):
return f"const {str(dtype)} {name} [] = {{ {','.join(str(c) for c in content)} }}; \n"
class BetweenTimestepsIndexing:
# ==============================================
# Symbols for usage in kernel definitions
# ==============================================
def proxy_fields(self):
return ps.fields(f"f_out({self._q}), f_in({self._q}): [{self._dim}D]")
def dir_symbol(self):
return TypedSymbol('dir', create_type(self._index_dtype))
def inverse_dir_symbol(self):
"""Symbol denoting the inversion of a PDF field index.
Use only at top-level of index to f_out or f_in, otherwise it can't be correctly replaced."""
return sp.IndexedBase('invdir')
# =============================
# Constructor and State
# =============================
def __init__(self, pdf_field, stencil, prev_timestep=Timestep.BOTH, streaming_pattern='pull',
index_dtype=np.int32, offsets_dtype=np.int32):
if prev_timestep == Timestep.BOTH and is_inplace(streaming_pattern):
raise ValueError('Cannot create index arrays for both kinds of timesteps for inplace streaming pattern '
+ streaming_pattern)
if isinstance(stencil, str):
stencil = get_stencil(stencil)
prev_accessor = get_accessor(streaming_pattern, prev_timestep)
next_accessor = get_accessor(streaming_pattern,
outward_accesses = prev_accessor.write(pdf_field, stencil)
inward_accesses =, stencil)
self._accesses = {'out': outward_accesses, 'in': inward_accesses}
self._pdf_field = pdf_field
self._stencil = stencil
self._dim = len(stencil[0])
self._q = len(stencil)
self._coordinate_names = ['x', 'y', 'z'][:self._dim]
self._index_dtype = create_type(index_dtype)
self._offsets_dtype = create_type(offsets_dtype)
self._required_index_arrays = set()
self._required_offset_arrays = set()
self._trivial_index_translations, self._trivial_offset_translations = self._collect_trivial_translations()
def _index_array_symbol(self, f_dir, inverse):
assert f_dir in ['in', 'out']
inv = '_inv' if inverse else ''
name = f"f_{f_dir}{inv}_dir_idx"
return TypedSymbol(name, self._index_dtype)
def _offset_array_symbols(self, f_dir, inverse):
assert f_dir in ['in', 'out']
inv = '_inv' if inverse else ''
name_base = f"f_{f_dir}{inv}_offsets_"
symbols = [TypedSymbol(name_base + d, self._index_dtype) for d in self._coordinate_names]
return symbols
def _array_symbols(self, f_dir, inverse, index):
if (f_dir, inverse) in self._trivial_index_translations:
translated_index = index
index_array_symbol = self._index_array_symbol(f_dir, inverse)
translated_index = sp.IndexedBase(index_array_symbol, shape=(1,))[index]
self._required_index_arrays.add((f_dir, inverse))
if (f_dir, inverse) in self._trivial_offset_translations:
offsets = (0, ) * self._dim
offset_array_symbols = self._offset_array_symbols(f_dir, inverse)
offsets = tuple(sp.IndexedBase(s, shape=(1,))[index] for s in offset_array_symbols)
self._required_offset_arrays.add((f_dir, inverse))
return {'index': translated_index, 'offsets': offsets}
# =================================
# Proxy fields substitution
# =================================
def substitute_proxies(self, assignments):
if isinstance(assignments, ps.Assignment):
assignments = [assignments]
if not isinstance(assignments, ps.AssignmentCollection):
assignments = ps.AssignmentCollection(assignments)
accesses = self._accesses
f_out, f_in = self.proxy_fields
inv_dir = self.inverse_dir_symbol
accessor_subs = dict()
for fa in assignments.atoms(ps.Field.Access):
if fa.field == f_out:
f_dir = 'out'
elif fa.field == f_in:
f_dir = 'in'
inv = False
idx = fa.index[0]
if isinstance(idx, sp.Indexed) and idx.base == inv_dir:
idx = idx.indices[0]
if isinstance(sp.sympify(idx), sp.Integer):
idx = inverse_dir_index(self._stencil, idx)
inv = True
if isinstance(sp.sympify(idx), sp.Integer):
accessor_subs[fa] = accesses[f_dir][idx].get_shifted(*(fa.offsets))
arr = self._array_symbols(f_dir, inv, idx)
accessor_subs[fa] = self._pdf_field[arr['offsets']](arr['index']).get_shifted(*(fa.offsets))
return assignments.new_with_substitutions(accessor_subs)
# =================
# Internals
# =================
def _get_translated_indices_and_offsets(self, f_dir, inv):
accesses = self._accesses[f_dir]
if inv:
inverse_indices = [inverse_dir_index(self._stencil, i)
for i in range(len(self._stencil))]
accesses = [accesses[idx] for idx in inverse_indices]
indices = [a.index[0] for a in accesses]
offsets = []
for d in range(self._dim):
offsets.append([a.offsets[d] for a in accesses])
return indices, offsets
def _collect_trivial_translations(self):
trivial_index_translations = set()
trivial_offset_translations = set()
trivial_indices = list(range(self._q))
trivial_offsets = [[0] * self._q] * self._dim
for f_dir, inv in product(['in', 'out'], [False, True]):
indices, offsets = self._get_translated_indices_and_offsets(f_dir, inv)
if indices == trivial_indices:
trivial_index_translations.add((f_dir, inv))
if offsets == trivial_offsets:
trivial_offset_translations.add((f_dir, inv))
return trivial_index_translations, trivial_offset_translations
def create_code_node(self):
return BetweenTimestepsIndexing.TranslationArraysNode(self)
class TranslationArraysNode(CustomCodeNode):
def __init__(self, indexing):
code = ''
symbols_defined = set()
for f_dir, inv in indexing._required_index_arrays:
indices, offsets = indexing._get_translated_indices_and_offsets(f_dir, inv)
index_array_symbol = indexing._index_array_symbol(f_dir, inv)
code += _array_pattern(indexing._index_dtype,, indices)
for f_dir, inv in indexing._required_offset_arrays:
indices, offsets = indexing._get_translated_indices_and_offsets(f_dir, inv)
offset_array_symbols = indexing._offset_array_symbols(f_dir, inv)
symbols_defined |= set(offset_array_symbols)
for d, arrsymb in enumerate(offset_array_symbols):
code += _array_pattern(indexing._offsets_dtype,, offsets[d])
super(BetweenTimestepsIndexing.TranslationArraysNode, self).__init__(
code, symbols_read=set(), symbols_defined=symbols_defined)
def __str__(self):
return "Variable PDF Access Translation Arrays"
def __repr__(self):
return "Variable PDF Access Translation Arrays"
# end class AdvancedStreamingIndexing
class NeighbourOffsetArrays(CustomCodeNode):
def neighbour_offset(dir_idx, stencil):
if isinstance(sp.sympify(dir_idx), sp.Integer):
return stencil[dir_idx]
return tuple([sp.IndexedBase(symbol, shape=(1,))[dir_idx]
for symbol in NeighbourOffsetArrays._offset_symbols(len(stencil[0]))])
def _offset_symbols(dim):
return [TypedSymbol(f"neighbour_offset_{d}", create_type(np.int64)) for d in ['x', 'y', 'z'][:dim]]
def __init__(self, stencil, offsets_dtype=np.int32):
offsets_dtype = create_type(offsets_dtype)
dim = len(stencil[0])
array_symbols = NeighbourOffsetArrays._offset_symbols(dim)
code = "\n"
for i, arrsymb in enumerate(array_symbols):
code += _array_pattern(offsets_dtype,, (d[i] for d in stencil))
offset_symbols = NeighbourOffsetArrays._offset_symbols(dim)
super(NeighbourOffsetArrays, self).__init__(code, symbols_read=set(),
from lbmpy.fieldaccess import PdfFieldAccessor, \
StreamPullTwoFieldsAccessor, \
StreamPushTwoFieldsAccessor, \
AAEvenTimeStepAccessor, \
AAOddTimeStepAccessor, \
EsoTwistEvenTimeStepAccessor, \
import numpy as np
import pystencils as ps
from enum import IntEnum
class Timestep(IntEnum):
EVEN = 0
ODD = 1
BOTH = 2
def next(self):
return self if self == Timestep.BOTH else Timestep((self + 1) % 2)
def idx(self):
"""To use this timestep as an array index"""
return self % 2
streaming_patterns = ['push', 'pull', 'aa', 'esotwist']
even_accessors = {
'pull': StreamPullTwoFieldsAccessor,
'push': StreamPushTwoFieldsAccessor,
'aa': AAEvenTimeStepAccessor,
'esotwist': EsoTwistEvenTimeStepAccessor
odd_accessors = {
'pull': StreamPullTwoFieldsAccessor,
'push': StreamPushTwoFieldsAccessor,
'aa': AAOddTimeStepAccessor,
'esotwist': EsoTwistOddTimeStepAccessor
def get_accessor(streaming_pattern: str, timestep: Timestep) -> PdfFieldAccessor:
if streaming_pattern not in streaming_patterns:
raise ValueError(
"Invalid value of parameter 'streaming_pattern'.", streaming_pattern)
if timestep == Timestep.EVEN:
return even_accessors[streaming_pattern]
return odd_accessors[streaming_pattern]
def is_inplace(streaming_pattern):
if streaming_pattern not in streaming_patterns:
raise ValueError('Invalid streaming pattern', streaming_pattern)
return streaming_pattern in ['aa', 'esotwist']
def get_timesteps(streaming_pattern):
return (Timestep.EVEN, Timestep.ODD) if is_inplace(streaming_pattern) else (Timestep.BOTH, )
def numeric_offsets(field_access: ps.Field.Access):
return tuple(int(o) for o in field_access.offsets)
def numeric_index(field_access: ps.Field.Access):
return tuple(int(i) for i in field_access.index)
def inverse_dir_index(stencil, direction):
return stencil.index(tuple(-d for d in stencil[direction]))