"""Data handling implementation on top of waLBerla block storage."""
import numpy as np
import waLBerla as wlb

from pystencils.datahandling.datahandling_interface import DataHandling
from pystencils.field import Field
from pystencils.parallel.blockiteration import slicedBlockIteration, blockIteration
from pystencils.utils import DotDict
class ParallelDataHandling(DataHandling):
    """Data handling that stores all arrays as block data of a waLBerla block storage.

    CPU data is registered in the block storage under its plain name, the corresponding
    GPU data under the same name prefixed with ``GPU_DATA_PREFIX``.
    """

    GPU_DATA_PREFIX = "gpu_"

    def __init__(self, blocks, defaultGhostLayers=1, defaultLayout='SoA', dim=3):
        """
        Creates data handling based on waLBerla block storage

        :param blocks: waLBerla block storage
        :param defaultGhostLayers: nr of ghost layers used if not specified in add() method
        :param defaultLayout: layout used if no layout is given to add() method
        :param dim: dimension of scenario,
                    waLBerla always uses three dimensions, so if dim=2 the extent of the
                    z coordinate of blocks has to be 1
        """
        super(ParallelDataHandling, self).__init__()
        assert dim in (2, 3)
        self.blocks = blocks
        self.defaultGhostLayers = defaultGhostLayers
        self.defaultLayout = defaultLayout
        self._fields = DotDict()  # maps name to symbolic pystencils field
        self._fieldNameToCpuDataName = {}
        self._fieldNameToGpuDataName = {}
        self.dataNames = set()
        self._dim = dim
        self._fieldInformation = {}
        self._cpuGpuPairs = []
        # maps custom data name to (cpuToGpuTransferFunc, gpuToCpuTransferFunc);
        # has to exist before addCustomData / toCpu / toGpu / allToCpu / allToGpu are used
        self._customDataTransferFunctions = {}
        self._reduceMap = {
            'sum': wlb.mpi.SUM,
            'min': wlb.mpi.MIN,
            'max': wlb.mpi.MAX,
        }
        if self._dim == 2:
            assert self.blocks.getDomainCellBB().size[2] == 1

    @property
    def dim(self):
        """Dimension of the scenario as passed to the constructor (2 or 3)."""
        return self._dim

    @property
    def shape(self):
        """Size of the domain cell bounding box, restricted to the first ``dim`` coordinates."""
        return self.blocks.getDomainCellBB().size[:self.dim]

    @property
    def periodicity(self):
        """Periodicity entries of the block storage for the first ``dim`` coordinates."""
        return self.blocks.periodic[:self._dim]

    @property
    def fields(self):
        """Dictionary mapping data name to the symbolic pystencils field."""
        return self._fields

    def ghostLayersOfField(self, name):
        """Returns the number of ghost layers the array ``name`` was added with."""
        return self._fieldInformation[name]['ghostLayers']

    def addCustomData(self, name, cpuCreationFunction,
                      gpuCreationFunction=None, cpuToGpuTransferFunc=None, gpuToCpuTransferFunc=None):
        """
        Registers custom block data in the block storage.

        :param name: name of the CPU block data, the GPU data is stored as GPU_DATA_PREFIX + name
        :param cpuCreationFunction: creation function passed to blocks.addBlockData for the CPU data
        :param gpuCreationFunction: creation function passed to blocks.addBlockData for the GPU data
        :param cpuToGpuTransferFunc: transfer function used by toGpu(), required if both creation
                                     functions are given
        :param gpuToCpuTransferFunc: transfer function used by toCpu(), required if both creation
                                     functions are given
        :raises ValueError: if CPU and GPU creation functions are given but a transfer function is missing
        """
        if cpuCreationFunction and gpuCreationFunction:
            if cpuToGpuTransferFunc is None or gpuToCpuTransferFunc is None:
                raise ValueError("For GPU data, both transfer functions have to be specified")
            self._customDataTransferFunctions[name] = (cpuToGpuTransferFunc, gpuToCpuTransferFunc)

        if cpuCreationFunction:
            self.blocks.addBlockData(name, cpuCreationFunction)
        if gpuCreationFunction:
            self.blocks.addBlockData(self.GPU_DATA_PREFIX + name, gpuCreationFunction)

    def addArray(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None,
                 layout=None, cpu=True, gpu=False):
        """
        Adds a new array to the block storage and creates the matching symbolic field.

        :param name: name under which the array is stored, also the key in self.fields
        :param fSize: number of values per cell, values > 1 add one index dimension to the field
        :param dtype: data type of the array, objects with a 'type' attribute are replaced by that attribute
        :param latexName: name of the symbolic field, defaults to name
        :param ghostLayers: number of ghost layers, defaults to self.defaultGhostLayers
        :param layout: one of 'fzyx', 'zyxf', 'f', 'SoA', 'AoS', defaults to self.defaultLayout
        :param cpu: if True a CPU field is added to the block storage
        :param gpu: if True a GPU field is added to the block storage
        :raises ValueError: if the process has no block or data with this name already exists
        """
        if ghostLayers is None:
            ghostLayers = self.defaultGhostLayers
        if layout is None:
            layout = self.defaultLayout
        if latexName is None:
            latexName = name
        if len(self.blocks) == 0:
            raise ValueError("Data handling expects that each process has at least one block")
        if hasattr(dtype, 'type'):
            dtype = dtype.type
        if name in self.blocks[0] or self.GPU_DATA_PREFIX + name in self.blocks[0]:
            raise ValueError("Data with this name has already been added")
        # checked before anything is registered, so that a failing check leaves
        # the block storage and the bookkeeping dictionaries untouched
        assert all(f.name != latexName for f in self.fields.values()), "Symbolic field with this name already exists"

        self._fieldInformation[name] = {'ghostLayers': ghostLayers,
                                        'fSize': fSize,
                                        'layout': layout,
                                        'dtype': dtype}

        layoutMap = {'fzyx': wlb.field.Layout.fzyx, 'zyxf': wlb.field.Layout.zyxf,
                     'f': wlb.field.Layout.fzyx,
                     'SoA': wlb.field.Layout.fzyx, 'AoS': wlb.field.Layout.zyxf}

        if cpu:
            wlb.field.addToStorage(self.blocks, name, dtype, fSize=fSize, layout=layoutMap[layout],
                                   ghostLayers=ghostLayers)
        if gpu:
            wlb.cuda.addGpuFieldToStorage(self.blocks, self.GPU_DATA_PREFIX + name, dtype, fSize=fSize,
                                          usePitchedMem=False, ghostLayers=ghostLayers, layout=layoutMap[layout])
        if cpu and gpu:
            self._cpuGpuPairs.append((name, self.GPU_DATA_PREFIX + name))

        indexDimensions = 1 if fSize > 1 else 0
        self.fields[name] = Field.createGeneric(latexName, self.dim, dtype, indexDimensions, layout,
                                                indexShape=(fSize,) if indexDimensions > 0 else None)
        self._fieldNameToCpuDataName[latexName] = name
        if gpu:
            self._fieldNameToGpuDataName[latexName] = self.GPU_DATA_PREFIX + name

    def hasData(self, name):
        """Returns True if an array with the given name has been added via addArray."""
        return name in self._fields

    def addArrayLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False):
        """Adds an array with the same fSize, dtype, layout and ghost layers as an existing array."""
        self.addArray(name, latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField])

    def swap(self, name1, name2, gpu=False):
        """Swaps the data pointers of two block data items on every block.

        :param name1: name of the first array
        :param name2: name of the second array
        :param gpu: if True the GPU versions (GPU_DATA_PREFIX + name) are swapped
        """
        if gpu:
            name1 = self.GPU_DATA_PREFIX + name1
            name2 = self.GPU_DATA_PREFIX + name2
        for block in self.blocks:
            block[name1].swapDataPointers(block[name2])

    def iterate(self, sliceObj=None, gpu=False, ghostLayers=True, innerGhostLayers=True):
        """
        Generator over the local blocks, delegating to slicedBlockIteration / blockIteration.

        :param sliceObj: if given, slicedBlockIteration is used with this slice,
                         otherwise blockIteration over all blocks
        :param gpu: if True, GPU_DATA_PREFIX is passed as data name prefix, otherwise an empty prefix
        :param ghostLayers: True for self.defaultGhostLayers, False for 0, other values are passed on unchanged
        :param innerGhostLayers: same convention as ghostLayers, only used together with sliceObj
        """
        if ghostLayers is True:
            ghostLayers = self.defaultGhostLayers
        elif ghostLayers is False:
            ghostLayers = 0
        if innerGhostLayers is True:
            innerGhostLayers = self.defaultGhostLayers
        elif innerGhostLayers is False:
            innerGhostLayers = 0

        prefix = self.GPU_DATA_PREFIX if gpu else ""
        if sliceObj is not None:
            yield from slicedBlockIteration(self.blocks, sliceObj, innerGhostLayers, ghostLayers,
                                            self.dim, prefix)
        else:
            yield from blockIteration(self.blocks, ghostLayers, self.dim, prefix)

    def gatherArray(self, name, sliceObj=None, allGather=False):
        """
        Generator yielding the arrays produced by wlb.field.gatherGenerator for the given data.

        :param name: name of the array to gather
        :param sliceObj: tuple of slices selecting the region to gather, defaults to the full
                         range in each of the ``dim`` coordinates
        :param allGather: forwarded to wlb.field.gatherGenerator
        """
        if sliceObj is None:
            sliceObj = tuple([slice(None, None, None)] * self.dim)
        if self.dim == 2:
            # waLBerla always works with three coordinates, so an entry for z is appended
            # NOTE(review): 0.5 presumably addresses the single z layer in relative coordinates - confirm
            sliceObj += (0.5,)
        indexDimensions = self.fields[name].indexDimensions
        for array in wlb.field.gatherGenerator(self.blocks, name, sliceObj, allGather):
            yield self._normalizeArrShape(array, indexDimensions)

    def _normalizeArrShape(self, arr, indexDimensions):
        """Drops the trailing axis for fields without index dimension and the third axis for 2D scenarios."""
        if indexDimensions == 0:
            arr = arr[..., 0]
        if self.dim == 2:
            arr = arr[:, :, 0]
        return arr

    def runKernel(self, kernelFunc, *args, **kwargs):
        """
        Runs a kernel on every local block.

        The field arguments of the kernel are looked up in the block storage (GPU data if the
        kernel backend is 'gpucuda', CPU data otherwise) and passed as keyword arguments.

        :param kernelFunc: compiled kernel, has to provide 'ast.backend' and 'parameters'
        :param args: positional arguments forwarded to the kernel
        :param kwargs: additional keyword arguments forwarded to the kernel,
                       they take precedence over the automatically collected field arrays
        """
        if kernelFunc.ast.backend == 'gpucuda':
            nameMap = self._fieldNameToGpuDataName
            toArray = wlb.cuda.toGpuArray
        else:
            nameMap = self._fieldNameToCpuDataName
            toArray = wlb.field.toArray
        dataUsedInKernel = [(nameMap[p.fieldName], self.fields[p.fieldName])
                            for p in kernelFunc.parameters if p.isFieldPtrArgument]

        for block in self.blocks:
            fieldArgs = {}
            for dataName, f in dataUsedInKernel:
                arr = toArray(block[dataName], withGhostLayers=[True, True, self.dim == 3])
                arr = self._normalizeArrShape(arr, f.indexDimensions)
                fieldArgs[f.name] = arr
            fieldArgs.update(kwargs)
            kernelFunc(*args, **fieldArgs)

    def toCpu(self, name):
        """Transfers the data ``name`` from GPU to CPU, using the custom transfer function if one is registered."""
        if name in self._customDataTransferFunctions:
            transferFunc = self._customDataTransferFunctions[name][1]  # gpuToCpuTransferFunc
            for block in self.blocks:
                transferFunc(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name)

    def toGpu(self, name):
        """Transfers the data ``name`` from CPU to GPU, using the custom transfer function if one is registered."""
        if name in self._customDataTransferFunctions:
            transferFunc = self._customDataTransferFunctions[name][0]  # cpuToGpuTransferFunc
            for block in self.blocks:
                transferFunc(block[self.GPU_DATA_PREFIX + name], block[name])
        else:
            wlb.cuda.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name)

    def allToCpu(self):
        """Transfers all arrays that exist on CPU and GPU, and all custom data with transfer functions, to the CPU."""
        for cpuName, gpuName in self._cpuGpuPairs:
            wlb.cuda.copyFieldToCpu(self.blocks, gpuName, cpuName)
        for name in self._customDataTransferFunctions:
            self.toCpu(name)

    def allToGpu(self):
        """Transfers all arrays that exist on CPU and GPU, and all custom data with transfer functions, to the GPU."""
        for cpuName, gpuName in self._cpuGpuPairs:
            wlb.cuda.copyFieldToGpu(self.blocks, gpuName, cpuName)
        for name in self._customDataTransferFunctions:
            self.toGpu(name)

    def synchronizationFunctionCPU(self, names, stencil=None, buffered=True, **kwargs):
        """Returns a communication scheme for the CPU versions of the given arrays (kwargs are ignored)."""
        return self._synchronizationFunction(names, stencil, buffered, 'cpu')

    def synchronizationFunctionGPU(self, names, stencil=None, buffered=True, **kwargs):
        """Returns a communication scheme for the GPU versions of the given arrays (kwargs are ignored)."""
        return self._synchronizationFunction(names, stencil, buffered, 'gpu')

    def _synchronizationFunction(self, names, stencil, buffered, target):
        """
        Creates a waLBerla uniform communication scheme with one packing object per array.

        :param names: a single data name or a sequence of data names
        :param stencil: stencil name passed to the scheme, defaults to 'D3Q27' in 3D and 'D2Q9' in 2D
        :param buffered: selects the buffered scheme with pack infos, otherwise the direct scheme
                         with MPI datatype infos
        :param target: 'cpu' or 'gpu'
        :raises ValueError: if target is neither 'cpu' nor 'gpu'
        """
        if stencil is None:
            stencil = 'D3Q27' if self.dim == 3 else 'D2Q9'

        # a single name (strings have a length as well) is treated as a list with one entry
        if not hasattr(names, '__len__') or isinstance(names, str):
            names = [names]

        if target == 'cpu':
            createPacking = wlb.field.createPackInfo if buffered else wlb.field.createMPIDatatypeInfo
        elif target == 'gpu':
            createPacking = wlb.cuda.createPackInfo if buffered else wlb.cuda.createMPIDatatypeInfo
            names = [self.GPU_DATA_PREFIX + name for name in names]
        else:
            raise ValueError("Unknown target '%s', expected 'cpu' or 'gpu'" % (target,))

        createScheme = wlb.createUniformBufferedScheme if buffered else wlb.createUniformDirectScheme
        syncFunction = createScheme(self.blocks, stencil)
        for name in names:
            syncFunction.addDataToCommunicate(createPacking(self.blocks, name))
        return syncFunction

    def reduceFloatSequence(self, sequence, operation, allReduce=False):
        """
        Reduces a sequence of floating point values using waLBerla's MPI wrappers.

        :param sequence: values to reduce
        :param operation: 'sum', 'min' or 'max' (case insensitive)
        :param allReduce: if True wlb.mpi.allreduceReal is used, otherwise wlb.mpi.reduceReal
        :return: result converted to a numpy array
        """
        reduceFunc = wlb.mpi.allreduceReal if allReduce else wlb.mpi.reduceReal
        return np.array(reduceFunc(sequence, self._reduceMap[operation.lower()]))

    def reduceIntSequence(self, sequence, operation, allReduce=False):
        """
        Reduces a sequence of integer values using waLBerla's MPI wrappers.

        :param sequence: values to reduce
        :param operation: 'sum', 'min' or 'max' (case insensitive)
        :param allReduce: if True wlb.mpi.allreduceInt is used, otherwise wlb.mpi.reduceInt
        :return: result converted to a numpy array
        """
        reduceFunc = wlb.mpi.allreduceInt if allReduce else wlb.mpi.reduceInt
        return np.array(reduceFunc(sequence, self._reduceMap[operation.lower()]))

    def vtkWriter(self, fileName, dataNames, ghostLayers=False):
        """
        Creates a waLBerla VTK output object with one cell data writer per array.

        :param fileName: file name passed to wlb.vtk.makeOutput
        :param dataNames: names of the arrays to write
        :param ghostLayers: False for no ghost layers, True for the smallest ghost layer count
                            of the given arrays, other values are passed on unchanged
        :return: the VTK output object
        """
        if ghostLayers is False:
            ghostLayers = 0
        elif ghostLayers is True:
            ghostLayers = min(self.ghostLayersOfField(n) for n in dataNames)

        output = wlb.vtk.makeOutput(self.blocks, fileName, ghostLayers=ghostLayers)
        for n in dataNames:
            output.addCellDataWriter(wlb.field.createVTKWriter(self.blocks, n))
        return output

    def vtkWriterFlags(self, fileName, dataName, masksToName, ghostLayers=False):
        """
        Creates a waLBerla VTK output object with one binarization writer per mask.

        :param fileName: file name passed to wlb.vtk.makeOutput
        :param dataName: name of the array the masks are applied to
        :param masksToName: dictionary mapping mask to the name used for the written data
        :param ghostLayers: False for no ghost layers, True for the ghost layer count of the
                            array, other values are passed on unchanged
        :return: the VTK output object
        """
        if ghostLayers is False:
            ghostLayers = 0
        elif ghostLayers is True:
            ghostLayers = self.ghostLayersOfField(dataName)

        output = wlb.vtk.makeOutput(self.blocks, fileName, ghostLayers=ghostLayers)
        for mask, name in masksToName.items():
            w = wlb.field.createBinarizationVTKWriter(self.blocks, dataName, mask, name)
            output.addCellDataWriter(w)
        return output