Commit 51bdeacc authored by Nils Kohl's avatar Nils Kohl 🌝
Browse files

Implemented unknown sender communication.

See issue #121 and MR !297

Allows to set the receiver info in the GenericBufferSystem
to a number of sender ranks, without specifying the exact
ranks themselves.

Implementation optimized for minimal code duplication.
Functionality was implemented by extending IProbe-based
communication.
parent 13760c48
...@@ -233,12 +233,13 @@ protected: ...@@ -233,12 +233,13 @@ protected:
internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_; internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_;
internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_; internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_; internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_;
internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeAndSenderCommIProbe_;
internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_; internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_;
internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_ internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_
bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating
bool communicationRunning_; //< indicates if a communication step is currently running bool communicationRunning_; //< indicates if a communication step is currently running
bool senderKnown_; //< if false, the sender ranks are unknown before receiving
/// Info about the message to be received from a certain rank: /// Info about the message to be received from a certain rank:
/// information holds the buffer and, if known, the message size /// information holds the buffer and, if known, the message size
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "core/mpi/MPIManager.h" #include "core/mpi/MPIManager.h"
#include "core/debug/CheckFunctions.h" #include "core/debug/CheckFunctions.h"
#include <algorithm>
namespace walberla { namespace walberla {
namespace mpi { namespace mpi {
...@@ -128,10 +129,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, ...@@ -128,10 +129,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator,
: knownSizeComm_ ( communicator, tag ), : knownSizeComm_ ( communicator, tag ),
unknownSizeComm_( communicator, tag ), unknownSizeComm_( communicator, tag ),
unknownSizeCommIProbe_( communicator, tag ), unknownSizeCommIProbe_( communicator, tag ),
unknownSizeAndSenderCommIProbe_( communicator, tag, false ),
noMPIComm_( communicator, tag ), noMPIComm_( communicator, tag ),
currentComm_ ( nullptr ), currentComm_ ( nullptr ),
sizeChangesEverytime_( true ), sizeChangesEverytime_( true ),
communicationRunning_( false ) communicationRunning_( false ),
senderKnown_( true )
{ {
} }
...@@ -140,10 +143,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth ...@@ -140,10 +143,12 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
: knownSizeComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), : knownSizeComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeAndSenderCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag(), false ),
noMPIComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ), noMPIComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
currentComm_ ( nullptr ), currentComm_ ( nullptr ),
sizeChangesEverytime_( other.sizeChangesEverytime_ ), sizeChangesEverytime_( other.sizeChangesEverytime_ ),
communicationRunning_( other.communicationRunning_ ), communicationRunning_( other.communicationRunning_ ),
senderKnown_( true ),
recvInfos_( other.recvInfos_ ), recvInfos_( other.recvInfos_ ),
sendInfos_( other.sendInfos_ ) sendInfos_( other.sendInfos_ )
{ {
...@@ -154,6 +159,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth ...@@ -154,6 +159,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
currentComm_ = &unknownSizeComm_; currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ ) else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_; currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ )
currentComm_ = &unknownSizeAndSenderCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ ) else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_; currentComm_ = &noMPIComm_;
else else
...@@ -167,6 +174,7 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene ...@@ -167,6 +174,7 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
sizeChangesEverytime_ = other.sizeChangesEverytime_; sizeChangesEverytime_ = other.sizeChangesEverytime_;
communicationRunning_ = other.communicationRunning_; communicationRunning_ = other.communicationRunning_;
senderKnown_ = other.senderKnown_;
recvInfos_ = other.recvInfos_; recvInfos_ = other.recvInfos_;
sendInfos_ = other.sendInfos_; sendInfos_ = other.sendInfos_;
...@@ -176,6 +184,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene ...@@ -176,6 +184,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
currentComm_ = &unknownSizeComm_; currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ ) else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_; currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.unknownSizeAndSenderCommIProbe_ )
currentComm_ = &unknownSizeAndSenderCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ ) else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_; currentComm_ = &noMPIComm_;
else else
...@@ -228,7 +238,20 @@ void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::set<MPIRank> & ran ...@@ -228,7 +238,20 @@ void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::set<MPIRank> & ran
template< typename Rb, typename Sb> template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const int numReceives ) void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const int numReceives )
{ {
WALBERLA_ABORT("NOT IMPLEMENTED!"); WALBERLA_ASSERT( ! communicationRunning_ );
recvInfos_.clear();
for ( MPIRank sender = 0; sender < numReceives; sender++ )
{
recvInfos_[ - 1 - sender ].size = INVALID_SIZE;
}
// "any sender"-communication is only supported via IProbe
useIProbe( true );
sizeChangesEverytime_ = true;
senderKnown_ = false;
setCommunicationType( false );
} }
...@@ -526,12 +549,21 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize ) ...@@ -526,12 +549,21 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
WALBERLA_MPI_SECTION() WALBERLA_MPI_SECTION()
{ {
if( knownSize ) if ( senderKnown_ )
currentComm_ = &knownSizeComm_; {
else if ( useIProbe_ ) if (knownSize)
currentComm_ = &unknownSizeCommIProbe_; currentComm_ = &knownSizeComm_;
else if (useIProbe_)
currentComm_ = &unknownSizeCommIProbe_;
else
currentComm_ = &unknownSizeComm_;
}
else else
currentComm_ = &unknownSizeComm_; {
WALBERLA_CHECK( useIProbe_, "Unknown sender communication is currently only supported with IProbe-based "
"communication." )
currentComm_ = &unknownSizeAndSenderCommIProbe_;
}
} }
} }
......
...@@ -167,8 +167,9 @@ namespace internal { ...@@ -167,8 +167,9 @@ namespace internal {
public: public:
using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo; using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo;
UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 ) UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0, bool senderKnown = true )
: AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false) {} : AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false),
senderKnown_( senderKnown ) {}
virtual ~UnknownSizeCommunicationIProbe() {} virtual ~UnknownSizeCommunicationIProbe() {}
...@@ -184,6 +185,7 @@ namespace internal { ...@@ -184,6 +185,7 @@ namespace internal {
bool sending_; bool sending_;
bool receiving_; bool receiving_;
int pendingReceives_; int pendingReceives_;
bool senderKnown_;
std::vector<MPI_Request> sendRequests_; std::vector<MPI_Request> sendRequests_;
}; };
......
...@@ -352,6 +352,7 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, ...@@ -352,6 +352,7 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank,
//====================================================================================================================== //======================================================================================================================
// //
// Unknown Size Communication (IProbe method) // Unknown Size Communication (IProbe method)
// Can also be used if there are unknown sender ranks.
// //
//====================================================================================================================== //======================================================================================================================
...@@ -438,15 +439,29 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI ...@@ -438,15 +439,29 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI
if (recvInfo.size != INVALID_SIZE) continue; if (recvInfo.size != INVALID_SIZE) continue;
const MPIRank iprobeSender = senderKnown_ ? sender : MPI_ANY_SOURCE;
int probeFlag; int probeFlag;
MPI_Status probeStatus; MPI_Status probeStatus;
MPI_Iprobe( sender, MPI_Iprobe( iprobeSender,
this->tag_, this->tag_,
this->communicator_, this->communicator_,
&probeFlag, &probeFlag,
&probeStatus); &probeStatus);
if (probeFlag) if (probeFlag)
{ {
const MPIRank actualSender = probeStatus.MPI_SOURCE;
if ( !senderKnown_ )
{
recvInfos.erase( sender );
recvInfo = recvInfos[actualSender];
}
else
{
WALBERLA_ASSERT_EQUAL( sender, actualSender );
}
int count = 0; int count = 0;
MPI_Get_count( &probeStatus, MPI_BYTE, &count ); MPI_Get_count( &probeStatus, MPI_BYTE, &count );
//WALBERLA_LOG_DEVEL("received " << count << " from " << sender); //WALBERLA_LOG_DEVEL("received " << count << " from " << sender);
...@@ -457,14 +472,14 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI ...@@ -457,14 +472,14 @@ MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPI
MPI_Recv( recvInfo.buffer.ptr(), // where to store received size MPI_Recv( recvInfo.buffer.ptr(), // where to store received size
count, // size of expected message count, // size of expected message
MPI_BYTE, // type MPI_BYTE, // type
sender, // rank of sender process actualSender, // rank of sender process
this->tag_, // message tag this->tag_, // message tag
this->communicator_, // communicator this->communicator_, // communicator
&recvStatus // request, needed for wait &recvStatus // request, needed for wait
); );
--pendingReceives_; --pendingReceives_;
return sender; return actualSender;
} }
} }
} }
...@@ -539,6 +554,7 @@ MPIRank NoMPICommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, Receiv ...@@ -539,6 +554,7 @@ MPIRank NoMPICommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank, Receiv
} // namespace internal } // namespace internal
} // namespace mpi } // namespace mpi
} // namespace walberla } // namespace walberla
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment