Skip to content
Snippets Groups Projects
Commit dd95c69d authored by Martin Bauer's avatar Martin Bauer
Browse files

DataHandling: Custom data support

parent 0e2c793a
No related merge requests found
...@@ -15,6 +15,7 @@ try: ...@@ -15,6 +15,7 @@ try:
except ImportError: except ImportError:
gpuarray = None gpuarray = None
class DataHandling(ABC): class DataHandling(ABC):
""" """
Manages the storage of arrays and maps them to a symbolic field. Manages the storage of arrays and maps them to a symbolic field.
...@@ -37,7 +38,7 @@ class DataHandling(ABC): ...@@ -37,7 +38,7 @@ class DataHandling(ABC):
"""Dimension of the domain, either 2 or 3""" """Dimension of the domain, either 2 or 3"""
@abstractmethod @abstractmethod
def add(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False): def addArray(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False):
""" """
Adds a (possibly distributed) array to the handling that can be accessed using the given name. Adds a (possibly distributed) array to the handling that can be accessed using the given name.
For each array a symbolic field is available via the 'fields' dictionary For each array a symbolic field is available via the 'fields' dictionary
...@@ -63,7 +64,7 @@ class DataHandling(ABC): ...@@ -63,7 +64,7 @@ class DataHandling(ABC):
""" """
@abstractmethod @abstractmethod
def addLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False): def addArrayLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False):
""" """
Adds an array with the same parameters (number of ghost layers, fSize, dtype) as existing array Adds an array with the same parameters (number of ghost layers, fSize, dtype) as existing array
:param name: name of new array :param name: name of new array
...@@ -73,13 +74,26 @@ class DataHandling(ABC): ...@@ -73,13 +74,26 @@ class DataHandling(ABC):
:param gpu: see 'add' method :param gpu: see 'add' method
""" """
@abstractmethod
def addCustomData(self, name, cpuCreationFunction,
gpuCreationFunction=None, cpuToGpuTransferFunc=None, gpuToCpuTransferFunc=None):
"""
Adds custom (non-array) data to domain
:param name: name to access data
:param cpuCreationFunction: function returning a new instance of the data that should be stored
:param gpuCreationFunction: optional, function returning a new instance, stored on GPU
:param cpuToGpuTransferFunc: function that transfers cpu to gpu version, getting two parameters (gpuInstance, cpuInstance)
:param gpuToCpuTransferFunc: function that transfers gpu to cpu version, getting two parameters (gpuInstance, cpuInstance)
:return:
"""
@property @property
@abstractmethod @abstractmethod
def fields(self): def fields(self):
"""Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels""" """Dictionary mapping data name to symbolic pystencils field - use this to create pystencils kernels"""
@abstractmethod @abstractmethod
def access(self, name, sliceObj=None, innerGhostLayers=None, outerGhostLayers=0): def accessArray(self, name, sliceObj=None, innerGhostLayers=None, outerGhostLayers=0):
""" """
Generator yielding locally stored sub-arrays together with information about their place in the global domain Generator yielding locally stored sub-arrays together with information about their place in the global domain
...@@ -91,7 +105,13 @@ class DataHandling(ABC): ...@@ -91,7 +105,13 @@ class DataHandling(ABC):
""" """
@abstractmethod @abstractmethod
def gather(self, name, sliceObj=None, allGather=False): def accessCustomData(self, name):
"""
Similar to accessArray, however for custom data no slicing and ghost layer info is necessary/available
"""
@abstractmethod
def gatherArray(self, name, sliceObj=None, allGather=False):
""" """
Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies Gathers part of the domain on a local process. Whenever possible use 'access' instead, since this method copies
the distributed data to a single process which is inefficient and may exhaust the available memory the distributed data to a single process which is inefficient and may exhaust the available memory
...@@ -182,6 +202,10 @@ class SerialDataHandling(DataHandling): ...@@ -182,6 +202,10 @@ class SerialDataHandling(DataHandling):
self._fields = DotDict() self._fields = DotDict()
self.cpuArrays = DotDict() self.cpuArrays = DotDict()
self.gpuArrays = DotDict() self.gpuArrays = DotDict()
self.customDataCpu = DotDict()
self.customDataGpu = DotDict()
self._customDataTransferFunctions = {}
if periodicity is None or periodicity is False: if periodicity is None or periodicity is False:
periodicity = [False] * self.dim periodicity = [False] * self.dim
if periodicity is True: if periodicity is True:
...@@ -198,7 +222,7 @@ class SerialDataHandling(DataHandling): ...@@ -198,7 +222,7 @@ class SerialDataHandling(DataHandling):
def fields(self): def fields(self):
return self._fields return self._fields
def add(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False): def addArray(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False):
if ghostLayers is None: if ghostLayers is None:
ghostLayers = self.defaultGhostLayers ghostLayers = self.defaultGhostLayers
if layout is None: if layout is None:
...@@ -239,13 +263,24 @@ class SerialDataHandling(DataHandling): ...@@ -239,13 +263,24 @@ class SerialDataHandling(DataHandling):
self.fields[name] = Field.createFixedSize(latexName, shape=kwargs['shape'], indexDimensions=indexDimensions, self.fields[name] = Field.createFixedSize(latexName, shape=kwargs['shape'], indexDimensions=indexDimensions,
dtype=kwargs['dtype'], layout=kwargs['order']) dtype=kwargs['dtype'], layout=kwargs['order'])
def addCustomData(self, name, cpuCreationFunction,
gpuCreationFunction=None, cpuToGpuTransferFunc=None, gpuToCpuTransferFunc=None):
assert name not in self.cpuArrays
assert name not in self.customDataCpu
self.customDataCpu[name] = cpuCreationFunction()
if gpuCreationFunction:
self.customDataGpu[name] = gpuCreationFunction()
if cpuToGpuTransferFunc is None or gpuToCpuTransferFunc is None:
raise ValueError("For GPU data, both transfer functions have to be specified")
self._customDataTransferFunctions[name] = (cpuToGpuTransferFunc, gpuToCpuTransferFunc)
def hasData(self, name): def hasData(self, name):
return name in self.fields return name in self.fields
def addLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False): def addArrayLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False):
self.add(name,latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField]) self.addArray(name, latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField])
def access(self, name, sliceObj=None, outerGhostLayers='all', **kwargs): def accessArray(self, name, sliceObj=None, outerGhostLayers='all', **kwargs):
if outerGhostLayers == 'all': if outerGhostLayers == 'all':
outerGhostLayers = self._fieldInformation[name]['ghostLayers'] outerGhostLayers = self._fieldInformation[name]['ghostLayers']
...@@ -260,7 +295,10 @@ class SerialDataHandling(DataHandling): ...@@ -260,7 +295,10 @@ class SerialDataHandling(DataHandling):
sliceObj = normalizeSlice(sliceObj, arr.shape[:self.dim]) sliceObj = normalizeSlice(sliceObj, arr.shape[:self.dim])
yield arr[sliceObj], BlockIterationInfo(None, tuple(s.start for s in sliceObj), sliceObj) yield arr[sliceObj], BlockIterationInfo(None, tuple(s.start for s in sliceObj), sliceObj)
def gather(self, name, sliceObj=None, **kwargs): def accessCustomData(self, name):
yield self.customDataCpu[name], ((0,0,0)[:self.dim], self._domainSize)
def gatherArray(self, name, sliceObj=None, **kwargs):
with self.accessWrapper(name): with self.accessWrapper(name):
gls = self._fieldInformation[name]['ghostLayers'] gls = self._fieldInformation[name]['ghostLayers']
arr = self.cpuArrays[name] arr = self.cpuArrays[name]
...@@ -277,18 +315,26 @@ class SerialDataHandling(DataHandling): ...@@ -277,18 +315,26 @@ class SerialDataHandling(DataHandling):
self.gpuArrays[name1], self.gpuArrays[name2] = self.gpuArrays[name2], self.gpuArrays[name1] self.gpuArrays[name1], self.gpuArrays[name2] = self.gpuArrays[name2], self.gpuArrays[name1]
def allToCpu(self): def allToCpu(self):
for name in self.cpuArrays.keys() & self.gpuArrays.keys(): for name in (self.cpuArrays.keys() & self.gpuArrays.keys()) | self._customDataTransferFunctions.keys():
self.toCpu(name) self.toCpu(name)
def allToGpu(self): def allToGpu(self):
for name in self.cpuArrays.keys() & self.gpuArrays.keys(): for name in (self.cpuArrays.keys() & self.gpuArrays.keys()) | self._customDataTransferFunctions.keys():
self.toGpu(name) self.toGpu(name)
def toCpu(self, name): def toCpu(self, name):
self.gpuArrays[name].get(self.cpuArrays[name]) if name in self._customDataTransferFunctions:
transferFunc = self._customDataTransferFunctions[name][1]
transferFunc(self.customDataGpu[name], self.customDataCpu[name])
else:
self.gpuArrays[name].get(self.cpuArrays[name])
def toGpu(self, name): def toGpu(self, name):
self.gpuArrays[name].set(self.cpuArrays[name]) if name in self._customDataTransferFunctions:
transferFunc = self._customDataTransferFunctions[name][0]
transferFunc(self.customDataGpu[name], self.customDataCpu[name])
else:
self.gpuArrays[name].set(self.cpuArrays[name])
def synchronizationFunctionCPU(self, names, stencilName=None, **kwargs): def synchronizationFunctionCPU(self, names, stencilName=None, **kwargs):
return self._synchronizationFunctor(names, stencilName, 'cpu') return self._synchronizationFunctor(names, stencilName, 'cpu')
......
...@@ -30,6 +30,7 @@ class ParallelDataHandling(DataHandling): ...@@ -30,6 +30,7 @@ class ParallelDataHandling(DataHandling):
self._dim = dim self._dim = dim
self._fieldInformation = {} self._fieldInformation = {}
self._cpuGpuPairs = [] self._cpuGpuPairs = []
self._customDataTransferFunctions = {}
if self._dim == 2: if self._dim == 2:
assert self.blocks.getDomainCellBB().size[2] == 1 assert self.blocks.getDomainCellBB().size[2] == 1
...@@ -41,7 +42,16 @@ class ParallelDataHandling(DataHandling): ...@@ -41,7 +42,16 @@ class ParallelDataHandling(DataHandling):
def fields(self): def fields(self):
return self._fields return self._fields
def add(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False): def addCustomData(self, name, cpuCreationFunction,
gpuCreationFunction=None, cpuToGpuTransferFunc=None, gpuToCpuTransferFunc=None):
self.blocks.addBlockData(name, cpuCreationFunction)
if gpuCreationFunction:
self.blocks.addBlockData(self.GPU_DATA_PREFIX + name, gpuCreationFunction)
if cpuToGpuTransferFunc is None or gpuToCpuTransferFunc is None:
raise ValueError("For GPU data, both transfer functions have to be specified")
self._customDataTransferFunctions[name] = (cpuToGpuTransferFunc, gpuToCpuTransferFunc)
def addArray(self, name, fSize=1, dtype=np.float64, latexName=None, ghostLayers=None, layout=None, cpu=True, gpu=False):
if ghostLayers is None: if ghostLayers is None:
ghostLayers = self.defaultGhostLayers ghostLayers = self.defaultGhostLayers
if layout is None: if layout is None:
...@@ -85,8 +95,8 @@ class ParallelDataHandling(DataHandling): ...@@ -85,8 +95,8 @@ class ParallelDataHandling(DataHandling):
def hasData(self, name): def hasData(self, name):
return name in self._fields return name in self._fields
def addLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False): def addArrayLike(self, name, nameOfTemplateField, latexName=None, cpu=True, gpu=False):
self.add(name,latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField]) self.addArray(name, latexName=latexName, cpu=cpu, gpu=gpu, **self._fieldInformation[nameOfTemplateField])
def swap(self, name1, name2, gpu=False): def swap(self, name1, name2, gpu=False):
if gpu: if gpu:
...@@ -95,7 +105,7 @@ class ParallelDataHandling(DataHandling): ...@@ -95,7 +105,7 @@ class ParallelDataHandling(DataHandling):
for block in self.blocks: for block in self.blocks:
block[name1].swapDataPointers(block[name2]) block[name1].swapDataPointers(block[name2])
def access(self, name, sliceObj=None, innerGhostLayers='all', outerGhostLayers='all'): def accessArray(self, name, sliceObj=None, innerGhostLayers='all', outerGhostLayers='all'):
fieldInfo = self._fieldInformation[name] fieldInfo = self._fieldInformation[name]
with self.accessWrapper(name): with self.accessWrapper(name):
if innerGhostLayers is 'all': if innerGhostLayers is 'all':
...@@ -111,7 +121,16 @@ class ParallelDataHandling(DataHandling): ...@@ -111,7 +121,16 @@ class ParallelDataHandling(DataHandling):
arr = arr[:, :, 0] arr = arr[:, :, 0]
yield arr, iterInfo yield arr, iterInfo
def gather(self, name, sliceObj=None, allGather=False): def accessCustomData(self, name):
with self.accessWrapper(name):
for block in self.blocks:
data = block[name]
cellBB = self.blocks.getBlockCellBB(block)
min = cellBB.min[:self.dim]
max = tuple(e + 1 for e in cellBB.max[:self.dim])
yield data, (min, max)
def gatherArray(self, name, sliceObj=None, allGather=False):
with self.accessWrapper(name): with self.accessWrapper(name):
if sliceObj is None: if sliceObj is None:
sliceObj = makeSlice[:, :, :] sliceObj = makeSlice[:, :, :]
...@@ -123,18 +142,32 @@ class ParallelDataHandling(DataHandling): ...@@ -123,18 +142,32 @@ class ParallelDataHandling(DataHandling):
yield array yield array
def toCpu(self, name): def toCpu(self, name):
wlb.cuda.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name) if name in self._customDataTransferFunctions:
transferFunc = self._customDataTransferFunctions[name][1]
for block in self.blocks:
transferFunc(block[self.GPU_DATA_PREFIX + name], block[name])
else:
wlb.cuda.copyFieldToCpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
def toGpu(self, name): def toGpu(self, name):
wlb.cuda.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name) if name in self._customDataTransferFunctions:
transferFunc = self._customDataTransferFunctions[name][0]
for block in self.blocks:
transferFunc(block[self.GPU_DATA_PREFIX + name], block[name])
else:
wlb.cuda.copyFieldToGpu(self.blocks, self.GPU_DATA_PREFIX + name, name)
def allToCpu(self): def allToCpu(self):
for cpuName, gpuName in self._cpuGpuPairs: for cpuName, gpuName in self._cpuGpuPairs:
wlb.cuda.copyFieldToCpu(self.blocks, gpuName, cpuName) wlb.cuda.copyFieldToCpu(self.blocks, gpuName, cpuName)
for name in self._customDataTransferFunctions.keys():
self.toCpu(name)
def allToGpu(self): def allToGpu(self):
for cpuName, gpuName in self._cpuGpuPairs: for cpuName, gpuName in self._cpuGpuPairs:
wlb.cuda.copyFieldToGpu(self.blocks, gpuName, cpuName) wlb.cuda.copyFieldToGpu(self.blocks, gpuName, cpuName)
for name in self._customDataTransferFunctions.keys():
self.toGpu(name)
def synchronizationFunctionCPU(self, names, stencil=None, buffered=True, **kwargs): def synchronizationFunctionCPU(self, names, stencil=None, buffered=True, **kwargs):
return self._synchronizationFunction(names, stencil, buffered, 'cpu') return self._synchronizationFunction(names, stencil, buffered, 'cpu')
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment