From 630fce6d0dcab28e51fa844a0dc33f99d0a5265d Mon Sep 17 00:00:00 2001
From: Sebastian Eibl <sebastian.eibl@fau.de>
Date: Wed, 10 Apr 2019 14:50:02 +0200
Subject: [PATCH] added IProbe communication for varying size communication

---
 src/core/math/Matrix2.h                |   1 +
 src/core/math/Matrix3.h                |   1 +
 src/core/math/Vector2.h                |   1 +
 src/core/math/Vector3.h                |   1 +
 src/core/mpi/BufferSystem.h            |  14 ++-
 src/core/mpi/BufferSystem.impl.h       |   8 ++
 src/core/mpi/BufferSystemHelper.h      |  28 ++++++
 src/core/mpi/BufferSystemHelper.impl.h | 127 +++++++++++++++++++++++++
 tests/core/mpi/BufferSystemTest.cpp    |  18 ++--
 9 files changed, 189 insertions(+), 10 deletions(-)

diff --git a/src/core/math/Matrix2.h b/src/core/math/Matrix2.h
index 3dae71c58..8e0504359 100644
--- a/src/core/math/Matrix2.h
+++ b/src/core/math/Matrix2.h
@@ -151,6 +151,7 @@ public:
    inline const Matrix2       getInverse()                               const;
    inline bool                isSingular()                               const;
    inline bool                isSymmetric()                              const;
+   inline Type*               data()                                     {return v_;} ///< returns v_ as a mutable raw pointer (non-const access only, no const overload)
    //@}
    //*******************************************************************************************************************
 
diff --git a/src/core/math/Matrix3.h b/src/core/math/Matrix3.h
index 85efdb583..311a04120 100644
--- a/src/core/math/Matrix3.h
+++ b/src/core/math/Matrix3.h
@@ -173,6 +173,7 @@ public:
                               inline const Matrix3       getCholesky()                              const;
    template< typename Other > inline const Vector3<HIGH> solve( const Vector3<Other> &rhs )         const;
                               inline Type                trace()                                    const;
+                              inline Type*               data()                                     {return v_;} ///< returns v_ as a mutable raw pointer (non-const access only, no const overload)
    //@}
    //*******************************************************************************************************************
 
diff --git a/src/core/math/Vector2.h b/src/core/math/Vector2.h
index b09038329..c9b9dba4b 100644
--- a/src/core/math/Vector2.h
+++ b/src/core/math/Vector2.h
@@ -159,6 +159,7 @@ public:
    inline Length          length()                       const;
    inline Type            sqrLength()                    const;
    inline Vector2<Length> getNormalized()                const;
+   inline Type*           data()                         {return v_;} ///< returns v_ as a mutable raw pointer (non-const access only, no const overload)
    //@}
    //*******************************************************************************************************************
 
diff --git a/src/core/math/Vector3.h b/src/core/math/Vector3.h
index eb0e84816..c322c1380 100644
--- a/src/core/math/Vector3.h
+++ b/src/core/math/Vector3.h
@@ -165,6 +165,7 @@ public:
    inline Vector3<Length> getNormalized()                const;
    inline Vector3<Length> getNormalizedOrZero()          const;
    inline void            reset();
+   inline Type*           data()                         {return v_;} ///< returns v_ as a mutable raw pointer (non-const access only, no const overload)
    //@}
    //*******************************************************************************************************************
 
diff --git a/src/core/mpi/BufferSystem.h b/src/core/mpi/BufferSystem.h
index 6a0048b2c..e4ba192e5 100644
--- a/src/core/mpi/BufferSystem.h
+++ b/src/core/mpi/BufferSystem.h
@@ -194,6 +194,9 @@ public:
    //@}
    //*******************************************************************************************************************
 
+   void useIProbe(const bool use) { useIProbe_ = use; } ///< true: use the MPI_Iprobe based implementation for messages of unknown size (flag is read in setCommunicationType)
+   bool isIProbedUsed() const { return useIProbe_; }    ///< returns the flag set by useIProbe(), false by default
+
    ///Bytes sent during the current or last communication
    int64_t getBytesSent() const { return bytesSent_; }
    ///Bytes received during the current or last communication
@@ -226,10 +229,11 @@ protected:
    RecvBuffer_T * waitForNext( MPIRank & fromRank );
    void setCommunicationType( const bool knownSize );
 
-   internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T>   knownSizeComm_;
-   internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
-   internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T>       noMPIComm_;
-   internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> *  currentComm_;  //< after receiver setup, this points to unknown- or knownSizeComm_
+   internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T>         knownSizeComm_;
+   internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T>       unknownSizeComm_;
+   internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_; //< used instead of unknownSizeComm_ if useIProbe_ is set
+   internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T>             noMPIComm_;
+   internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> *        currentComm_;  //< nullptr after construction; after receiver setup this points to one of the four communication objects above
 
    bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating
    bool communicationRunning_; //< indicates if a communication step is currently running
@@ -251,6 +255,8 @@ protected:
    //each concurrently running communication uses different tags
    static std::set<int> activeTags_;
 
+   bool useIProbe_ = false; ///< switch between IProbe and two message communication for varying size communication
+
    int64_t bytesSent_        = 0; ///< number of bytes sent during last communication
    int64_t bytesReceived_    = 0; ///< number of bytes received during last communication
 
diff --git a/src/core/mpi/BufferSystem.impl.h b/src/core/mpi/BufferSystem.impl.h
index 240f49a0c..8a4185dd5 100644
--- a/src/core/mpi/BufferSystem.impl.h
+++ b/src/core/mpi/BufferSystem.impl.h
@@ -127,6 +127,7 @@ template< typename Rb, typename Sb>
 GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, int tag )
    : knownSizeComm_  ( communicator, tag ),
      unknownSizeComm_( communicator, tag ),
+     unknownSizeCommIProbe_( communicator, tag ),
      noMPIComm_( communicator, tag ),
      currentComm_    ( nullptr ),
      sizeChangesEverytime_( true ),
@@ -138,6 +139,7 @@ template< typename Rb, typename Sb>
 GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &other )
    : knownSizeComm_  ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
+     unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      noMPIComm_      ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
      currentComm_ ( nullptr ),
      sizeChangesEverytime_( other.sizeChangesEverytime_ ),
@@ -150,6 +152,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
       currentComm_ = &knownSizeComm_;
    else if ( other.currentComm_ == &other.unknownSizeComm_ )
       currentComm_ = &unknownSizeComm_;
+   else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
+      currentComm_ = &unknownSizeCommIProbe_;
    else if ( other.currentComm_ == &other.noMPIComm_ )
       currentComm_ = &noMPIComm_;
    else
@@ -170,6 +174,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
       currentComm_ = &knownSizeComm_;
    else if ( other.currentComm_ == &other.unknownSizeComm_ )
       currentComm_ = &unknownSizeComm_;
+   else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
+      currentComm_ = &unknownSizeCommIProbe_;
    else if ( other.currentComm_ == &other.noMPIComm_ )
       currentComm_ = &noMPIComm_;
    else
@@ -508,6 +514,8 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
    {
       if( knownSize )
          currentComm_ = &knownSizeComm_;
+      else if ( useIProbe_ )
+         currentComm_ = &unknownSizeCommIProbe_;
       else
          currentComm_ = &unknownSizeComm_;
    }
diff --git a/src/core/mpi/BufferSystemHelper.h b/src/core/mpi/BufferSystemHelper.h
index e143f6b10..661e46358 100644
--- a/src/core/mpi/BufferSystemHelper.h
+++ b/src/core/mpi/BufferSystemHelper.h
@@ -161,6 +161,34 @@ namespace internal {
    };
 
 
+   template< typename RecvBuffer_T, typename SendBuffer_T>
+   class UnknownSizeCommunicationIProbe : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
+   {  // communication for messages of unknown size: only the content message is sent, the receiver queries its size via MPI_Iprobe
+   public:
+      using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo;
+      /// pendingReceives_ starts at zero so that it is never read uninitialized; scheduleReceives() sets the real count
+      UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 )
+           :  AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false), pendingReceives_(0) {}
+
+      virtual ~UnknownSizeCommunicationIProbe() {}
+
+      virtual void send( MPIRank receiver, const SendBuffer_T & sendBuffer ); ///< starts a non-blocking send of the buffer content
+      virtual void waitForSends();                                            ///< blocks until all started sends have completed
+      /// posts no MPI receive, only counts the expected messages and marks their sizes as invalid
+      virtual void scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos );
+
+      /// size field of recvInfos can be invalid, is filled in with the actual message size
+      virtual MPIRank waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos );
+
+   private:
+      bool sending_;         ///< set by send(), cleared by waitForSends()
+      bool receiving_;       ///< set by scheduleReceives(), cleared by waitForNextReceive() when nothing is pending
+      int  pendingReceives_; ///< number of scheduled messages that have not been received yet
+
+      std::vector<MPI_Request> sendRequests_; ///< requests of the sends started since the last waitForSends()
+   };
+
+
    template< typename RecvBuffer_T, typename SendBuffer_T>
    class NoMPICommunication : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
    {
diff --git a/src/core/mpi/BufferSystemHelper.impl.h b/src/core/mpi/BufferSystemHelper.impl.h
index f982aab68..fc2c6a5ae 100644
--- a/src/core/mpi/BufferSystemHelper.impl.h
+++ b/src/core/mpi/BufferSystemHelper.impl.h
@@ -349,6 +349,133 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank,
 
 
 
+//======================================================================================================================
+//
+//  Unknown Size Communication (IProbe method)
+//
+//======================================================================================================================
+
+template< typename Rb, typename Sb>
+void UnknownSizeCommunicationIProbe<Rb, Sb>::send( MPIRank receiver, const Sb & sendBuffer )
+{
+   WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
+   // Purpose: start a non-blocking send of the content of sendBuffer to rank receiver.
+   if ( ! sending_ )
+      sending_ = true;
+
+   // No separate size message is sent: the receiving side obtains the size via MPI_Iprobe / MPI_Get_count.
+   // Send content message
+
+   sendRequests_.push_back( MPI_REQUEST_NULL );
+   MPI_Request & contentMsgReq = sendRequests_.back(); // MPI_Isend stores the request handle in this new vector entry
+
+   //WALBERLA_LOG_DEVEL("sending " << sendBuffer.size() << " to " << receiver);
+
+   MPI_Isend(sendBuffer.ptr(),           // pointer to the data to send
+             int_c( sendBuffer.size() ), // number of MPI_BYTE elements to send
+             MPI_BYTE,                   // type
+             receiver,                   // receiver rank
+             this->tag_,                 // message tag
+             this->communicator_,        // communicator
+             & contentMsgReq             // request needed for wait (see waitForSends)
+             );
+}
+
+template< typename Rb, typename Sb>
+void UnknownSizeCommunicationIProbe<Rb, Sb>::waitForSends()
+{
+   WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
+   // Purpose: block until all sends started by send() have completed, then drop their requests.
+   sending_ = false;
+
+   if ( sendRequests_.empty() )
+      return; // nothing was sent; also avoids taking &sendRequests_[0] of an empty vector below
+
+   MPI_Waitall( int_c( sendRequests_.size() ),
+                &sendRequests_[0],
+                MPI_STATUSES_IGNORE );
+
+   sendRequests_.clear();
+}
+
+template< typename Rb, typename Sb>
+void UnknownSizeCommunicationIProbe<Rb, Sb>::scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos )
+{
+   WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
+   // Purpose: prepare a receive phase. No MPI receive is posted here; messages are fetched in waitForNextReceive().
+   receiving_ = true;
+
+   pendingReceives_ = int_c(recvInfos.size()); // one message is expected per entry of recvInfos
+   for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
+   {
+      ReceiveInfo & recvInfo = it->second;
+      recvInfo.size = INVALID_SIZE; // marks the entry as "not received yet", tested in waitForNextReceive()
+   }
+}
+
+
+template< typename Rb, typename Sb>
+MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos )
+{
+   WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
+   // Purpose: receive the next available message and return its sender, or INVALID_RANK if no message is pending.
+   WALBERLA_ASSERT( receiving_ );
+
+   if( pendingReceives_ < 1 ) {
+      receiving_ = false;
+      return INVALID_RANK;
+   }
+
+   // Poll every sender whose message has not been received yet (size == INVALID_SIZE):
+   //  - MPI_Iprobe tells whether a message from that sender is available, without receiving it
+   //  - if one is available, its size is queried, the buffer is resized and the message is received
+   while( true ) // busy loop: the sweep over all senders is repeated until one message could be received
+   {
+      for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
+      {
+         const MPIRank sender   = it->first;
+         ReceiveInfo & recvInfo = it->second;
+
+         if (recvInfo.size != INVALID_SIZE) continue; // message of this sender was already received
+
+         int probeFlag;
+         MPI_Status probeStatus;
+         MPI_Iprobe( sender,
+                     this->tag_,
+                     this->communicator_,
+                     &probeFlag,
+                     &probeStatus);
+         if (probeFlag)
+         {
+            int count = 0;
+            MPI_Get_count( &probeStatus, MPI_BYTE, &count ); // size of the probed message in MPI_BYTE elements
+            //WALBERLA_LOG_DEVEL("received " << count << " from " << sender);
+            recvInfo.size = count;
+            recvInfo.buffer.resize( uint_c( recvInfo.size ) );
+
+            MPI_Status recvStatus;
+            MPI_Recv( recvInfo.buffer.ptr(),      // where to store the received message
+                      count,                      // size of expected message
+                      MPI_BYTE,                   // type
+                      sender,                     // rank of sender process
+                      this->tag_,                 // message tag
+                      this->communicator_,        // communicator
+                      &recvStatus                 // status of the receive, not evaluated
+                      );
+
+            --pendingReceives_;
+            return sender;
+         }
+      }
+   }
+
+   WALBERLA_ASSERT( false );
+   return INVALID_RANK; //cannot happen - only to prevent compiler warnings
+}
+
+
+
+
 //======================================================================================================================
 //
 //  NoMPI Communication
diff --git a/tests/core/mpi/BufferSystemTest.cpp b/tests/core/mpi/BufferSystemTest.cpp
index d84d12da3..6de0e0ce4 100644
--- a/tests/core/mpi/BufferSystemTest.cpp
+++ b/tests/core/mpi/BufferSystemTest.cpp
@@ -119,7 +119,7 @@ void symmetricCommunication()
  * Every process sends a message as big as his rank number
  * to the neighboring processes (1D , periodic boundary)
  */
-void asymmetricCommunication()
+void asymmetricCommunication(const bool useIProbe)
 {
    auto mpiManager = MPIManager::instance();
 
@@ -132,6 +132,7 @@ void asymmetricCommunication()
 
 
    BufferSystem bs ( MPI_COMM_WORLD );
+   bs.useIProbe(useIProbe);
 
    // Set receiver information
    std::set<int> receiveFrom;
@@ -186,7 +187,7 @@ void asymmetricCommunication()
 
 // like asymmetricCommunication, but the message size is a random value
 // that changes every communication step
-void timeVaryingCommunication()
+void timeVaryingCommunication(const bool useIProbe)
 {
    auto mpiManager = MPIManager::instance();
 
@@ -198,6 +199,7 @@ void timeVaryingCommunication()
    WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 );
 
    BufferSystem bs ( MPI_COMM_WORLD );
+   bs.useIProbe(useIProbe);
 
    // artificial special case: no message from root
    bs.sendBuffer( rightNeighbor );
@@ -257,7 +259,7 @@ void timeVaryingCommunication()
  *    i.e. rank 1 sends a "1" once, rank 2 sends a message containing two "2"'s ...
  */
 
-void gatherUsingAsymmetricCommunication()
+void gatherUsingAsymmetricCommunication(const bool useIProbe)
 {
    int rank          = MPIManager::instance()->worldRank();
    int numProcesses  = MPIManager::instance()->numProcesses();
@@ -267,6 +269,7 @@ void gatherUsingAsymmetricCommunication()
    const int TAG=42;
 
    BufferSystem bs (MPI_COMM_WORLD, TAG );
+   bs.useIProbe(useIProbe);
 
 
    if ( rank ==0 )
@@ -381,13 +384,16 @@ int main(int argc, char**argv)
    symmetricCommunication();
 
    WALBERLA_LOG_INFO_ON_ROOT("Testing Asymmetric Communication...");
-   asymmetricCommunication();
+   asymmetricCommunication(false);
+   asymmetricCommunication(true);
 
    WALBERLA_LOG_INFO_ON_ROOT("Testing time-varying Communication...");
-   timeVaryingCommunication();
+   timeVaryingCommunication(false);
+   timeVaryingCommunication(true);
 
    WALBERLA_LOG_INFO_ON_ROOT("Testing Gather Operation...");
-   gatherUsingAsymmetricCommunication();
+   gatherUsingAsymmetricCommunication(false);
+   gatherUsingAsymmetricCommunication(true);
 
    WALBERLA_LOG_INFO_ON_ROOT("Testing self-send...");
    selfSend();
-- 
GitLab