Skip to content
Snippets Groups Projects
Commit 032f1be5 authored by Martin Bauer's avatar Martin Bauer
Browse files

Added communication and periodicity to data handling

parent ed36b869
Branches
Tags
No related merge requests found
......@@ -5,6 +5,7 @@ from collections import defaultdict
from contextlib import contextmanager
from lbmpy.boundaries.periodicityhandling import PeriodicityHandling
from lbmpy.stencils import getStencil
from pystencils import Field, makeSlice
from pystencils.parallel.blockiteration import BlockIterationInfo
from pystencils.slicing import normalizeSlice, removeGhostLayers
......@@ -196,6 +197,24 @@ class DataHandling(ABC):
"""Copies data from CPU to GPU for all arrays that have a CPU and a GPU representation"""
pass
# ------------------------------- Communication --------------------------------------------------------------------
def synchronizationFunctionCPU(self, names, stencil=None, **kwargs):
"""
Synchronizes ghost layers for distributed arrays - for serial scenario this has to be called
for correct periodicity handling
:param names: what data to synchronize: name of array or sequence of names
:param stencil: stencil as string defining which neighbors are synchronized e.g. 'D2Q9', 'D3Q19'
if None, a full synchronization (i.e. D2Q9 or D3Q27) is done
:param kwargs: implementation specific, optional optimization parameters for communication
:return: function object to run the communication
"""
def synchronizationFunctionGPU(self, names, stencil=None, **kwargs):
"""
Synchronization of GPU fields, for documentation see CPU version above
"""
class SerialDataHandling(DataHandling):
......@@ -206,7 +225,7 @@ class SerialDataHandling(DataHandling):
def __enter__(self, *args, **kwargs):
return self.arr
def __init__(self, domainSize, defaultGhostLayers=1, defaultLayout='SoA'):
def __init__(self, domainSize, defaultGhostLayers=1, defaultLayout='SoA', periodicity=False):
"""
Creates a data handling for single node simulations
......@@ -221,12 +240,12 @@ class SerialDataHandling(DataHandling):
self._fields = DotDict()
self.cpuArrays = DotDict()
self.gpuArrays = DotDict()
#if periodicity is None or periodicity is False:
# periodicity = [False] * self.dim
#if periodicity is True:
# periodicity = [True] * self.dim
#
#self._periodicity = periodicity
if periodicity is None or periodicity is False:
periodicity = [False] * self.dim
if periodicity is True:
periodicity = [True] * self.dim
self._periodicity = periodicity
self._fieldInformation = {}
@property
......@@ -250,7 +269,7 @@ class SerialDataHandling(DataHandling):
kwargs = {
'shape': tuple(s + 2 * ghostLayers for s in self._domainSize),
'dtype': dtype,
'order': 'c' if layout == 'AoS' else 'f',
'order': 'C' if layout == 'AoS' else 'F',
}
self._fieldInformation[name] = {
'ghostLayers': ghostLayers,
......@@ -287,7 +306,10 @@ class SerialDataHandling(DataHandling):
raise ValueError("addLike() does not work for flag arrays")
self.add(name,latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField])
def access(self, name, sliceObj=None, outerGhostLayers=0, **kwargs):
def access(self, name, sliceObj=None, outerGhostLayers='all', **kwargs):
if outerGhostLayers == 'all':
outerGhostLayers = self._fieldInformation[name]['ghostLayers']
if sliceObj is None:
sliceObj = [slice(None, None)] * self.dim
......@@ -328,3 +350,56 @@ class SerialDataHandling(DataHandling):
def toGpu(self, name):
self.gpuArrays[name].set(self.cpuArrays[name])
def synchronizationFunctionCPU(self, names, stencilName=None, **kwargs):
return self._synchronizationFunctor(names, stencilName, 'cpu')
def synchronizationFunctionGPU(self, names, stencilName=None, **kwargs):
return self._synchronizationFunctor(names, stencilName, 'gpu')
def _synchronizationFunctor(self, names, stencil, target):
if stencil is None:
stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'
assert stencil in ("D2Q9", 'D3Q27'), "Serial scenario support only D2Q9 or D3Q27 for periodicity sync"
assert target in ('cpu', 'gpu')
if not hasattr(names, '__len__') or type(names) is str:
names = [names]
filteredStencil = []
for direction in getStencil(stencil):
useDirection = True
if direction == (0, 0) or direction == (0, 0, 0):
useDirection = False
for component, periodicity in zip(direction, self._periodicity):
if not periodicity and component != 0:
useDirection = False
if useDirection:
filteredStencil.append(direction)
resultFunctors = []
for name in names:
gls = self._fieldInformation[name]['ghostLayers']
if len(filteredStencil) > 0:
if target == 'cpu':
from pystencils.slicing import getPeriodicBoundaryFunctor
resultFunctors.append(getPeriodicBoundaryFunctor(filteredStencil, ghostLayers=gls))
else:
from pystencils.gpucuda.periodicity import getPeriodicBoundaryFunctor
resultFunctors.append(getPeriodicBoundaryFunctor(filteredStencil, self._domainSize,
indexDimensions=self.fields[name].indexDimensions,
indexDimShape=self._fieldInformation[name]['fSize'],
dtype=self.fields[name].dtype.numpyDtype,
ghostLayers=gls))
if target == 'cpu':
def resultFunctor():
for func in resultFunctors:
func(pdfs=self.cpuArrays[name])
else:
def resultFunctor():
for func in resultFunctors:
func(pdfs=self.gpuArrays[name])
return resultFunctor
......@@ -458,10 +458,11 @@ def spatialLayoutStringToTuple(layoutStr, dim):
def layoutStringToTuple(layoutStr, dim):
if layoutStr == 'fzyx' or layoutStr == 'SoA':
layoutStr = layoutStr.lower()
if layoutStr == 'fzyx' or layoutStr == 'soa':
assert dim <= 4
return tuple(reversed(range(dim)))
elif layoutStr == 'zyxf' or layoutStr == 'AoS':
elif layoutStr == 'zyxf' or layoutStr == 'aos':
assert dim <= 4
return tuple(reversed(range(dim - 1))) + (dim-1,)
elif layoutStr == 'f' or layoutStr == 'reverseNumpy':
......
import sympy as sp
import numpy as np
from pystencils import Field
from pystencils.slicing import normalizeSlice, getPeriodicBoundarySrcDstSlices
from pystencils.gpucuda import makePythonFunction
from pystencils.gpucuda.kernelcreation import createCUDAKernel
def createCopyKernel(domainSize, fromSlice, toSlice, indexDimensions=0, indexDimShape=1):
def createCopyKernel(domainSize, fromSlice, toSlice, indexDimensions=0, indexDimShape=1, dtype=np.float64):
"""Copies a rectangular part of an array to another non-overlapping part"""
if indexDimensions not in (0, 1):
raise NotImplementedError("Works only for one or zero index coordinates")
f = Field.createGeneric("pdfs", len(domainSize), indexDimensions=indexDimensions)
f = Field.createGeneric("pdfs", len(domainSize), indexDimensions=indexDimensions, dtype=dtype)
normalizedFromSlice = normalizeSlice(fromSlice, f.spatialShape)
normalizedToSlice = normalizeSlice(toSlice, f.spatialShape)
......@@ -26,12 +27,13 @@ def createCopyKernel(domainSize, fromSlice, toSlice, indexDimensions=0, indexDim
return makePythonFunction(ast)
def getPeriodicBoundaryFunctor(stencil, domainSize, indexDimensions=0, indexDimShape=1, ghostLayers=1, thickness=None):
def getPeriodicBoundaryFunctor(stencil, domainSize, indexDimensions=0, indexDimShape=1, ghostLayers=1,
thickness=None, dtype=float):
srcDstSliceTuples = getPeriodicBoundarySrcDstSlices(stencil, ghostLayers, thickness)
kernels = []
indexDimensions = indexDimensions
for srcSlice, dstSlice in srcDstSliceTuples:
kernels.append(createCopyKernel(domainSize, srcSlice, dstSlice, indexDimensions, indexDimShape))
kernels.append(createCopyKernel(domainSize, srcSlice, dstSlice, indexDimensions, indexDimShape, dtype))
def functor(pdfs, **kwargs):
for kernel in kernels:
......
......@@ -58,13 +58,12 @@ class ParallelDataHandling(DataHandling):
for block in self.blocks:
block[name1].swapDataPointers(block[name2])
def access(self, name, sliceObj=None, innerGhostLayers=None, outerGhostLayers=0):
def access(self, name, sliceObj=None, innerGhostLayers='all', outerGhostLayers='all'):
fieldInfo = self._fieldInformation[name]
with self.accessWrapper(name):
if innerGhostLayers is None:
if innerGhostLayers is 'all':
innerGhostLayers = fieldInfo['ghostLayers']
if outerGhostLayers is None:
if outerGhostLayers is 'all':
outerGhostLayers = fieldInfo['ghostLayers']
for iterInfo in slicedBlockIteration(self.blocks, sliceObj, innerGhostLayers, outerGhostLayers):
......@@ -102,6 +101,12 @@ class ParallelDataHandling(DataHandling):
for cpuName, gpuName in self._cpuGpuPairs:
wlb.cuda.copyFieldToGpu(self.blocks, gpuName, cpuName)
def synchronizationFunctionCPU(self, names, stencil=None, buffered=True, **kwargs):
return self._synchronizationFunction(names, stencil, buffered, 'cpu')
def synchronizationFunctionGPU(self, names, stencil=None, buffered=True, **kwargs):
return self._synchronizationFunction(names, stencil, buffered, 'gpu')
def _add(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None,
cpu=True, gpu=False, flagField=False):
if ghostLayers is None:
......@@ -150,3 +155,23 @@ class ParallelDataHandling(DataHandling):
assert all(f.name != latexName for f in self.fields.values()), "Symbolic field with this name already exists"
self.fields[name] = Field.createFixedSize(latexName, shape, indexDimensions, dtype, layout)
def _synchronizationFunction(self, names, stencil, buffered, target):
if stencil is None:
stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'
if not hasattr(names, '__len__') or type(names) is str:
names = [names]
createScheme = wlb.createUniformBufferedScheme if buffered else wlb.createUniformDirectScheme
if target == 'cpu':
createPacking = wlb.field.createPackInfo if buffered else wlb.field.createMPIDatatypeInfo
elif target == 'gpu':
createPacking = wlb.cuda.createPackInfo if buffered else wlb.cuda.createMPIDatatypeInfo
names = [self.GPU_DATA_PREFIX + name for name in names]
syncFunction = createScheme(self.blocks, stencil)
for name in names:
syncFunction.addDataToCommunicate(createPacking(self.blocks, name))
return syncFunction
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment