From f151c637285be929714002bb1d243bfdc434bfb6 Mon Sep 17 00:00:00 2001
From: Martin Bauer <martin.bauer@fau.de>
Date: Mon, 18 Jun 2018 13:59:40 +0200
Subject: [PATCH] Bugfixes & Improvements for MPI parallel runs

- fix for field collection and reduction operations
- support for aligned fields in parallel data handling
---
 datahandling/datahandling_interface.py |  5 +++--
 datahandling/parallel_datahandling.py  | 17 +++++++++++------
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/datahandling/datahandling_interface.py b/datahandling/datahandling_interface.py
index f8c67c9b0..606a0a5cf 100644
--- a/datahandling/datahandling_interface.py
+++ b/datahandling/datahandling_interface.py
@@ -282,7 +282,7 @@ class DataHandling(ABC):
         for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
             m = np.min(b[array_name])
             result = m if result is None else np.min(result, m)
-        return self.reduce_float_sequence([result], 'min')[0] if reduce else result
+        return self.reduce_float_sequence([result], 'min', all_reduce=True)[0] if reduce else result
 
     def max(self, array_name, slice_obj=None, ghost_layers=False, inner_ghost_layers=False, reduce=True):
         """Returns the maximum value inside the domain or slice of the domain.
@@ -297,7 +297,8 @@ class DataHandling(ABC):
         for b in self.iterate(slice_obj, ghost_layers=ghost_layers, inner_ghost_layers=inner_ghost_layers):
             m = np.max(b[array_name])
             result = m if result is None else np.max(result, m)
-        return self.reduce_float_sequence([result], 'max')[0] if reduce else result
+
+        return self.reduce_float_sequence([result], 'max', all_reduce=True)[0] if reduce else result
 
     def __str__(self):
         result = ""
diff --git a/datahandling/parallel_datahandling.py b/datahandling/parallel_datahandling.py
index 0c85e4a77..26cdc95a9 100644
--- a/datahandling/parallel_datahandling.py
+++ b/datahandling/parallel_datahandling.py
@@ -100,13 +100,14 @@ class ParallelDataHandling(DataHandling):
         if name in self.blocks[0] or self.GPU_DATA_PREFIX + name in self.blocks[0]:
             raise ValueError("Data with this name has already been added")
 
-        if alignment:
-            raise NotImplementedError("Aligned field allocated not yet supported in parallel data handling")
+        if alignment is False or alignment is None:
+            alignment = 0
 
         self._fieldInformation[name] = {'ghost_layers': ghost_layers,
                                         'values_per_cell': values_per_cell,
                                         'layout': layout,
-                                        'dtype': dtype}
+                                        'dtype': dtype,
+                                        'alignment': alignment}
 
         layout_map = {'fzyx': wlb.field.Layout.fzyx, 'zyxf': wlb.field.Layout.zyxf,
                       'f': wlb.field.Layout.fzyx,
@@ -114,8 +115,10 @@ class ParallelDataHandling(DataHandling):
 
         if cpu:
             wlb.field.addToStorage(self.blocks, name, dtype, fSize=values_per_cell, layout=layout_map[layout],
-                                   ghostLayers=ghost_layers)
+                                   ghostLayers=ghost_layers, alignment=alignment)
         if gpu:
+            if alignment != 0:
+                raise ValueError("Alignment for walberla GPU fields not yet supported")
             wlb.cuda.addGpuFieldToStorage(self.blocks, self.GPU_DATA_PREFIX + name, dtype, fSize=values_per_cell,
                                           usePitchedMem=False, ghostLayers=ghost_layers, layout=layout_map[layout])
 
@@ -305,13 +308,15 @@ class ParallelDataHandling(DataHandling):
         if all_reduce:
             return np.array(wlb.mpi.allreduceReal(sequence, self._reduce_map[operation.lower()]))
         else:
-            return np.array(wlb.mpi.reduceReal(sequence, self._reduce_map[operation.lower()]))
+            result = np.array(wlb.mpi.reduceReal(sequence, self._reduce_map[operation.lower()], 0))
+            return result if wlb.mpi.worldRank() == 0 else None
 
     def reduce_int_sequence(self, sequence, operation, all_reduce=False):
         if all_reduce:
             return np.array(wlb.mpi.allreduceInt(sequence, self._reduce_map[operation.lower()]))
         else:
-            return np.array(wlb.mpi.reduceInt(sequence, self._reduce_map[operation.lower()]))
+            result = np.array(wlb.mpi.reduceInt(sequence, self._reduce_map[operation.lower()], 0))
+            return result if wlb.mpi.worldRank() == 0 else None
 
     def create_vtk_writer(self, file_name, data_names, ghost_layers=False):
         if ghost_layers is False:
-- 
GitLab