From 51bdeacceab790f094620eb4702c23c1dfdd220c Mon Sep 17 00:00:00 2001
From: Nils Kohl <nils.kohl@fau.de>
Date: Tue, 30 Jun 2020 16:06:55 +0200
Subject: [PATCH] Implemented unknown sender communication.

See issue #121 and MR !297

Allows to set the receiver info in the GenericBufferSystem
to a number of sender ranks, without specifying the exact
ranks themselves.

Implementation optimized for minimal code duplication.
Functionality was implemented by extending IProbe-based
communication.
---
 src/core/mpi/BufferSystem.h            |  3 +-
 src/core/mpi/BufferSystem.impl.h       | 46 ++++++++++++++++++++++----
 src/core/mpi/BufferSystemHelper.h      |  6 ++--
 src/core/mpi/BufferSystemHelper.impl.h | 22 ++++++++++--
 4 files changed, 64 insertions(+), 13 deletions(-)

diff --git a/src/core/mpi/BufferSystem.h b/src/core/mpi/BufferSystem.h
index e37c3a76a..b40fa75f5 100644
--- a/src/core/mpi/BufferSystem.h
+++ b/src/core/mpi/BufferSystem.h
@@ -233,12 +233,13 @@ protected:
    internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T>         knownSizeComm_;
    internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T>       unknownSizeComm_;
    internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_;
+   internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeAndSenderCommIProbe_;
    internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T>             noMPIComm_;
    internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> *        currentComm_;  //< after receiver setup, this points to unknown- or knownSizeComm_
 
    bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating
    bool communicationRunning_; //< indicates if a communication step is currently running
-
+   bool senderKnown_;          //< if false, the sender ranks are unknown before receiving
 
    /// Info about the message to be received from a certain rank:
    /// information holds the buffer and, if known, the message size
diff --git a/src/core/mpi/BufferSystem.impl.h b/src/core/mpi/BufferSystem.impl.h
index 242df47c8..c7f5656d5 100644
--- a/src/core/mpi/BufferSystem.impl.h
+++ b/src/core/mpi/BufferSystem.impl.h
@@ -23,6 +23,7 @@
 #include "core/mpi/MPIManager.h"
 #include "core/debug/CheckFunctions.h"
 
+#include <algorithm>
 
 namespace walberla {
 namespace mpi {
@@ -128,10 +129,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator,
    : knownSizeComm_  ( communicator, tag ),
      unknownSizeComm_( communicator, tag ),
      unknownSizeCommIProbe_( communicator, tag ),
+     unknownSizeAndSenderCommIProbe_( communicator, tag, false ),
      noMPIComm_( communicator, tag ),
      currentComm_    ( nullptr ),
      sizeChangesEverytime_( true ),
-     communicationRunning_( false )
+     communicationRunning_( false ),
+     senderKnown_( true )
 {
 }
 
@@ -140,10 +143,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
    : knownSizeComm_  ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
+     unknownSizeAndSenderCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag(), false ),
      noMPIComm_      ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      currentComm_ ( nullptr ),
      sizeChangesEverytime_( other.sizeChangesEverytime_ ),
      communicationRunning_( other.communicationRunning_ ),
+     senderKnown_( true ),
      recvInfos_( other.recvInfos_ ),
      sendInfos_( other.sendInfos_ )
 {
@@ -154,6 +159,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
       currentComm_ = &unknownSizeComm_;
    else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
       currentComm_ = &unknownSizeCommIProbe_;
+   else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ )
+      currentComm_ = &unknownSizeAndSenderCommIProbe_;
    else if ( other.currentComm_ == &other.noMPIComm_ )
       currentComm_ = &noMPIComm_;
    else
@@ -167,6 +174,7 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
 
    sizeChangesEverytime_ = other.sizeChangesEverytime_;
    communicationRunning_ = other.communicationRunning_;
+   senderKnown_ = other.senderKnown_;
    recvInfos_ = other.recvInfos_;
    sendInfos_ = other.sendInfos_;
 
@@ -176,6 +184,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
       currentComm_ = &unknownSizeComm_;
    else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
       currentComm_ = &unknownSizeCommIProbe_;
+   else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ )
+      currentComm_ = &unknownSizeAndSenderCommIProbe_;
    else if ( other.currentComm_ == &other.noMPIComm_ )
       currentComm_ = &noMPIComm_;
    else
@@ -228,7 +238,20 @@ void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::set<MPIRank> & ran
 template< typename Rb, typename Sb>
 void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const int numReceives )
 {
-   WALBERLA_ABORT("NOT IMPLEMENTED!");
+   WALBERLA_ASSERT( ! communicationRunning_ );
+
+   recvInfos_.clear();
+   for ( MPIRank sender = 0; sender < numReceives; sender++ )
+   {
+      recvInfos_[ - 1 - sender ].size = INVALID_SIZE;
+   }
+
+   // "any sender"-communication is only supported via IProbe
+   useIProbe( true );
+
+   sizeChangesEverytime_ = true;
+   senderKnown_          = false;
+   setCommunicationType( false );
 }
 
 
@@ -526,12 +549,21 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
 
    WALBERLA_MPI_SECTION()
    {
-      if( knownSize )
-         currentComm_ = &knownSizeComm_;
-      else if ( useIProbe_ )
-         currentComm_ = &unknownSizeCommIProbe_;
+      if ( senderKnown_ )
+      {
+         if (knownSize)
+            currentComm_ = &knownSizeComm_;
+         else if (useIProbe_)
+            currentComm_ = &unknownSizeCommIProbe_;
+         else
+            currentComm_ = &unknownSizeComm_;
+      }
       else
-         currentComm_ = &unknownSizeComm_;
+      {
+         WALBERLA_CHECK( useIProbe_, "Unknown sender communication is currently only supported with IProbe-based "
+                                     "communication." )
+         currentComm_ = &unknownSizeAndSenderCommIProbe_;
+      }
    }
 }
 
diff --git a/src/core/mpi/BufferSystemHelper.h b/src/core/mpi/BufferSystemHelper.h
index 661e46358..15405e432 100644
--- a/src/core/mpi/BufferSystemHelper.h
+++ b/src/core/mpi/BufferSystemHelper.h
@@ -167,8 +167,9 @@ namespace internal {
    public:
       using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo;
 
-      UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 )
-           :  AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false) {}
+      UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0, bool senderKnown = true )
+           :  AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false),
+              senderKnown_( senderKnown ) {}
 
       virtual ~UnknownSizeCommunicationIProbe() {}
 
@@ -184,6 +185,7 @@ namespace internal {
       bool sending_;
       bool receiving_;
       int  pendingReceives_;
+      bool senderKnown_;
 
       std::vector<MPI_Request> sendRequests_;
    };
diff --git a/src/core/mpi/BufferSystemHelper.impl.h b/src/core/mpi/BufferSystemHelper.impl.h
index fc2c6a5ae..93f89606c 100644
--- a/src/core/mpi/BufferSystemHelper.impl.h
+++ b/src/core/mpi/BufferSystemHelper.impl.h
@@ -352,6 +352,7 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank,
 //======================================================================================================================
 //
 //  Unknown Size Communication (IProbe method)
+//  Can also be used if there are unknown sender ranks.
 //
 //======================================================================================================================
 
@@ -438,15 +439,29 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI
 
          if (recvInfo.size != INVALID_SIZE) continue;
 
+         const MPIRank iprobeSender  = senderKnown_ ? sender : MPI_ANY_SOURCE;
+
          int probeFlag;
          MPI_Status probeStatus;
-         MPI_Iprobe( sender,
+         MPI_Iprobe( iprobeSender,
                      this->tag_,
                      this->communicator_,
                      &probeFlag,
                      &probeStatus);
          if (probeFlag)
          {
+            const MPIRank actualSender = probeStatus.MPI_SOURCE;
+
+            if ( !senderKnown_ )
+            {
+               recvInfos.erase( sender );
+               recvInfo = recvInfos[actualSender];
+            }
+            else
+            {
+               WALBERLA_ASSERT_EQUAL( sender, actualSender );
+            }
+
             int count = 0;
             MPI_Get_count( &probeStatus, MPI_BYTE, &count );
             //WALBERLA_LOG_DEVEL("received " << count << " from " << sender);
@@ -457,14 +472,14 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI
             MPI_Recv( recvInfo.buffer.ptr(),      // where to store received size
                       count,                      // size of expected message
                       MPI_BYTE,                   // type
-                      sender,                     // rank of sender process
+                      actualSender,               // rank of sender process
                       this->tag_,                 // message tag
                       this->communicator_,        // communicator
                       &recvStatus                 // request, needed for wait
                       );
 
             --pendingReceives_;
-            return sender;
+            return actualSender;
          }
       }
    }
@@ -539,6 +554,7 @@ MPIRank NoMPICommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, Receiv
 
 
 
+
 } // namespace internal
 } // namespace mpi
 } // namespace walberla
-- 
GitLab