Commit 630fce6d authored by Sebastian Eibl's avatar Sebastian Eibl

added IProbe communication for varying size communication

parent 71a7b79f
......@@ -151,6 +151,7 @@ public:
inline const Matrix2 getInverse() const;
inline bool isSingular() const;
inline bool isSymmetric() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -173,6 +173,7 @@ public:
inline const Matrix3 getCholesky() const;
template< typename Other > inline const Vector3<HIGH> solve( const Vector3<Other> &rhs ) const;
inline Type trace() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -159,6 +159,7 @@ public:
inline Length length() const;
inline Type sqrLength() const;
inline Vector2<Length> getNormalized() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -165,6 +165,7 @@ public:
inline Vector3<Length> getNormalized() const;
inline Vector3<Length> getNormalizedOrZero() const;
inline void reset();
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -194,6 +194,9 @@ public:
//@}
//*******************************************************************************************************************
void useIProbe(const bool use) { useIProbe_ = use; }
bool isIProbedUsed() const { return useIProbe_; }
///Bytes sent during the current or last communication
int64_t getBytesSent() const { return bytesSent_; }
///Bytes received during the current or last communication
......@@ -226,10 +229,11 @@ protected:
RecvBuffer_T * waitForNext( MPIRank & fromRank );
void setCommunicationType( const bool knownSize );
internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_;
internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_;
internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_
internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_;
internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_;
internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_;
internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_
bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating
bool communicationRunning_; //< indicates if a communication step is currently running
......@@ -251,6 +255,8 @@ protected:
//each concurrently running communication uses different tags
static std::set<int> activeTags_;
bool useIProbe_ = false; ///< switch betwenn IProbe and two message communication for varying size communication
int64_t bytesSent_ = 0; ///< number of bytes sent during last communication
int64_t bytesReceived_ = 0; ///< number of bytes received during last communication
......
......@@ -127,6 +127,7 @@ template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, int tag )
: knownSizeComm_ ( communicator, tag ),
unknownSizeComm_( communicator, tag ),
unknownSizeCommIProbe_( communicator, tag ),
noMPIComm_( communicator, tag ),
currentComm_ ( nullptr ),
sizeChangesEverytime_( true ),
......@@ -138,6 +139,7 @@ template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &other )
: knownSizeComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
noMPIComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
currentComm_ ( nullptr ),
sizeChangesEverytime_( other.sizeChangesEverytime_ ),
......@@ -150,6 +152,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
currentComm_ = &knownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeComm_ )
currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_;
else
......@@ -170,6 +174,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
currentComm_ = &knownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeComm_ )
currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_;
else
......@@ -508,6 +514,8 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
{
if( knownSize )
currentComm_ = &knownSizeComm_;
else if ( useIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else
currentComm_ = &unknownSizeComm_;
}
......
......@@ -161,6 +161,34 @@ namespace internal {
};
template< typename RecvBuffer_T, typename SendBuffer_T>
class UnknownSizeCommunicationIProbe : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
{
public:
using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo;
UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 )
: AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false) {}
virtual ~UnknownSizeCommunicationIProbe() {}
virtual void send( MPIRank receiver, const SendBuffer_T & sendBuffer );
virtual void waitForSends();
virtual void scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos );
/// size field of recvInfos can be invalid, is filled in with the actual message size
virtual MPIRank waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos );
private:
bool sending_;
bool receiving_;
int pendingReceives_;
std::vector<MPI_Request> sendRequests_;
};
template< typename RecvBuffer_T, typename SendBuffer_T>
class NoMPICommunication : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
{
......
......@@ -349,6 +349,133 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank,
//======================================================================================================================
//
// Unknown Size Communication (IProbe method)
//
//======================================================================================================================
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::send( MPIRank receiver, const Sb & sendBuffer )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
if ( ! sending_ )
sending_ = true;
// Send content message
sendRequests_.push_back( MPI_REQUEST_NULL );
MPI_Request & contentMsgReq = sendRequests_.back();
//WALBERLA_LOG_DEVEL("sending " << sendBuffer.size() << " to " << receiver);
MPI_Isend(sendBuffer.ptr(), // pointer to size buffer
int_c( sendBuffer.size() ), // send one size
MPI_BYTE, // type
receiver, // receiver rank
this->tag_, // message tag
this->communicator_, // communicator
& contentMsgReq // request needed for wait
);
}
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::waitForSends()
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
sending_ = false;
if ( sendRequests_.empty() )
return;
MPI_Waitall( int_c( sendRequests_.size() ),
&sendRequests_[0],
MPI_STATUSES_IGNORE );
sendRequests_.clear();
}
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
receiving_ = true;
pendingReceives_ = int_c(recvInfos.size());
for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
{
ReceiveInfo & recvInfo = it->second;
recvInfo.size = INVALID_SIZE;
}
}
template< typename Rb, typename Sb>
MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
WALBERLA_ASSERT( receiving_ );
if( pendingReceives_ < 1 ) {
receiving_ = false;
return INVALID_RANK;
}
// On first entry in this function:
// - receive request vector contains requests for size messages
// - all ReceiveInfo's have sizeReceived=false
while( true ) // this loops as long as size messages are received
{
for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
{
const MPIRank sender = it->first;
ReceiveInfo & recvInfo = it->second;
if (recvInfo.size != INVALID_SIZE) continue;
int probeFlag;
MPI_Status probeStatus;
MPI_Iprobe( sender,
this->tag_,
this->communicator_,
&probeFlag,
&probeStatus);
if (probeFlag)
{
int count = 0;
MPI_Get_count( &probeStatus, MPI_BYTE, &count );
//WALBERLA_LOG_DEVEL("received " << count << " from " << sender);
recvInfo.size = count;
recvInfo.buffer.resize( uint_c( recvInfo.size ) );
MPI_Status recvStatus;
MPI_Recv( recvInfo.buffer.ptr(), // where to store received size
count, // size of expected message
MPI_BYTE, // type
sender, // rank of sender process
this->tag_, // message tag
this->communicator_, // communicator
&recvStatus // request, needed for wait
);
--pendingReceives_;
return sender;
}
}
}
WALBERLA_ASSERT( false );
return INVALID_RANK; //cannot happen - only to prevent compiler warnings
}
//======================================================================================================================
//
// NoMPI Communication
......
......@@ -119,7 +119,7 @@ void symmetricCommunication()
* Every process sends a message as big as his rank number
* to the neighboring processes (1D , periodic boundary)
*/
void asymmetricCommunication()
void asymmetricCommunication(const bool useIProbe)
{
auto mpiManager = MPIManager::instance();
......@@ -132,6 +132,7 @@ void asymmetricCommunication()
BufferSystem bs ( MPI_COMM_WORLD );
bs.useIProbe(useIProbe);
// Set receiver information
std::set<int> receiveFrom;
......@@ -186,7 +187,7 @@ void asymmetricCommunication()
// like asymmetricCommunication, but the message size is a random value
// that changes every communication step
void timeVaryingCommunication()
void timeVaryingCommunication(const bool useIProbe)
{
auto mpiManager = MPIManager::instance();
......@@ -198,6 +199,7 @@ void timeVaryingCommunication()
WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 );
BufferSystem bs ( MPI_COMM_WORLD );
bs.useIProbe(useIProbe);
// artificial special case: no message from root
bs.sendBuffer( rightNeighbor );
......@@ -257,7 +259,7 @@ void timeVaryingCommunication()
* i.e. rank 1 sends a "1" once, rank 2 sends a message containing two "2"'s ...
*/
void gatherUsingAsymmetricCommunication()
void gatherUsingAsymmetricCommunication(const bool useIProbe)
{
int rank = MPIManager::instance()->worldRank();
int numProcesses = MPIManager::instance()->numProcesses();
......@@ -267,6 +269,7 @@ void gatherUsingAsymmetricCommunication()
const int TAG=42;
BufferSystem bs (MPI_COMM_WORLD, TAG );
bs.useIProbe(useIProbe);
if ( rank ==0 )
......@@ -381,13 +384,16 @@ int main(int argc, char**argv)
symmetricCommunication();
WALBERLA_LOG_INFO_ON_ROOT("Testing Asymmetric Communication...");
asymmetricCommunication();
asymmetricCommunication(false);
asymmetricCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing time-varying Communication...");
timeVaryingCommunication();
timeVaryingCommunication(false);
timeVaryingCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing Gather Operation...");
gatherUsingAsymmetricCommunication();
gatherUsingAsymmetricCommunication(false);
gatherUsingAsymmetricCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing self-send...");
selfSend();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment