Commit 7ca4d272 authored by Sebastian Eibl's avatar Sebastian Eibl

Merge branch 'ProbeVsExtraMessage' into 'master'

added IProbe communication for varying size communication

See merge request !198
parents 29572961 2c27ccff
Pipeline #14882 failed with stages
in 808 minutes and 30 seconds
......@@ -8,6 +8,7 @@ add_subdirectory( NonUniformGrid )
add_subdirectory( MotionSingleHeavySphere )
add_subdirectory( PeriodicGranularGas )
add_subdirectory( PoiseuilleChannel )
add_subdirectory( ProbeVsExtraMessage )
add_subdirectory( SchaeferTurek )
add_subdirectory( UniformGrid )
add_subdirectory( UniformGridGPU )
waLBerla_add_executable ( NAME ProbeVsExtraMessage
DEPENDS core postprocessing stencil )
//======================================================================================================================
//
// This file is part of waLBerla. waLBerla is free software: you can
// redistribute it and/or modify it under the terms of the GNU General Public
// License as published by the Free Software Foundation, either version 3 of
// the License, or (at your option) any later version.
//
// waLBerla is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
//! \file ProbeVsExtraMessage.h
//! \author Martin Bauer <martin.bauer@fau.de>
//! \author Sebastian Eibl <sebastian.eibl@fau.de>
//! \brief Micro Benchmark, measuring time for different variable sized communications
//
//======================================================================================================================
#include "core/DataTypes.h"
#include "core/Environment.h"
#include "core/math/Vector3.h"
#include "core/mpi/BufferSystem.h"
#include "core/mpi/MPIManager.h"
#include "core/timing/TimingPool.h"
#include "postprocessing/sqlite/SQLite.h"
#include "stencil/D3Q27.h"
#include "stencil/D3Q19.h"
#include "stencil/D3Q7.h"
#include <array>
#include <iostream>
#include <sstream>
namespace walberla {
class MPIInfo
{
public:
MPIInfo( const Vector3<uint_t>& procs, const Vector3<bool>& periodicity );
int getNeighborRank(const stencil::Direction& dir);
private:
Vector3<uint_t> procs_;
Vector3<bool> periodicity_;
Vector3<int> pos_;
};
MPIInfo::MPIInfo( const Vector3<uint_t>& procs, const Vector3<bool>& periodicity )
: procs_(procs)
, periodicity_(periodicity)
{
mpi::MPIManager::instance()->createCartesianComm(procs[0], procs[1], procs[2], periodicity[0], periodicity[1], periodicity[2]);
mpi::MPIManager::instance()->cartesianCoord(pos_.data());
}
int MPIInfo::getNeighborRank( const stencil::Direction& dir )
{
auto neighborCoord = pos_;
neighborCoord[0] += stencil::cx[dir];
neighborCoord[1] += stencil::cy[dir];
neighborCoord[2] += stencil::cz[dir];
if (neighborCoord[0] < 0) return -1;
if (neighborCoord[1] < 0) return -1;
if (neighborCoord[2] < 0) return -1;
if (neighborCoord[0] >= int_c(procs_[0])) return -1;
if (neighborCoord[1] >= int_c(procs_[1])) return -1;
if (neighborCoord[2] >= int_c(procs_[2])) return -1;
return mpi::MPIManager::instance()->cartesianRank(uint_c(neighborCoord[0]), uint_c(neighborCoord[1]), uint_c(neighborCoord[2]));
}
template <typename Stencil>
void communicate( MPIInfo& mpiInfo,
const uint_t iterations,
const uint_t messageSize,
const bool iProbe,
WcTimingPool& tp)
{
std::vector<char> sendBuf(messageSize);
std::vector<char> recvBuf(messageSize);
WcTimer& timer = tp[iProbe ? "IProbe" : "twoMessage"];
mpi::BufferSystem bs( mpi::MPIManager::instance()->comm() );
bs.useIProbe(iProbe);
for( uint_t i =0; i < iterations; ++i )
{
timer.start();
for (auto dirIt = Stencil::beginNoCenter(); dirIt != Stencil::end(); ++dirIt)
{
auto recvRank = mpiInfo.getNeighborRank( *dirIt );
if (recvRank == -1) continue;
bs.sendBuffer(recvRank) << sendBuf;
WALBERLA_ASSERT_EQUAL(bs.sendBuffer(recvRank).size(), messageSize + sizeof(size_t));
}
bs.setReceiverInfoFromSendBufferState(false, true);
bs.sendAll();
for( auto it = bs.begin(); it != bs.end(); ++it )
{
WALBERLA_ASSERT_EQUAL(it.buffer().size(), messageSize + sizeof(size_t));
it.buffer() >> recvBuf;
WALBERLA_ASSERT_EQUAL(recvBuf.size(), messageSize);
WALBERLA_ASSERT(it.buffer().isEmpty());
}
timer.end();
}
}
int main( int argc, char ** argv )
{
mpi::Environment mpiEnv( argc, argv );
if ( argc != 10 )
{
WALBERLA_ROOT_SECTION()
{
std::cout << "Usage ./probeVsExtraMessage x y z px py pz iterations messageSize stencil " << std::endl;
std::cout << std::endl;
std::cout << "x: number of processes in x direction" << std::endl;
std::cout << "y: number of processes in y direction" << std::endl;
std::cout << "z: number of processes in z direction" << std::endl;
std::cout << "px: periodic in x direction?" << std::endl;
std::cout << "py: periodic in y direction?" << std::endl;
std::cout << "pz: periodic in z direction?" << std::endl;
std::cout << "iterations: number of communications per case" << std::endl;
std::cout << "messageSize: size of the SendBuffer in bytes" << std::endl;
std::cout << "stencil: communication stencil (D3Q27, D3Q19, D3Q7)" << std::endl;
}
return EXIT_FAILURE;
}
Vector3<uint_t> procs;
procs[0] = std::stoul(argv[1]);
procs[1] = std::stoul(argv[2]);
procs[2] = std::stoul(argv[3]);
WALBERLA_CHECK_EQUAL(procs[0]*procs[1]*procs[2], mpi::MPIManager::instance()->numProcesses());
Vector3<bool> periodicity;
periodicity[0] = std::stoul(argv[4]);
periodicity[1] = std::stoul(argv[5]);
periodicity[2] = std::stoul(argv[6]);
uint_t iterations = std::stoul(argv[7]);
uint_t messageSize = std::stoul(argv[8]);
std::string stencil = argv[9];
WALBERLA_CHECK_EQUAL(stencil, "D3Q27", "only D3Q27 is supported!");
WALBERLA_LOG_DEVEL_VAR_ON_ROOT(procs);
WALBERLA_LOG_DEVEL_VAR_ON_ROOT(periodicity);
WALBERLA_LOG_DEVEL_VAR_ON_ROOT(iterations);
WALBERLA_LOG_DEVEL_VAR_ON_ROOT(messageSize);
WALBERLA_LOG_DEVEL_VAR_ON_ROOT(stencil);
MPIInfo mpiInfo(procs, periodicity);
WcTimingPool tp;
WALBERLA_MPI_BARRIER();
if (stencil == "D3Q27")
{
communicate<stencil::D3Q27>(mpiInfo, iterations, messageSize, false, tp);
communicate<stencil::D3Q27>(mpiInfo, iterations, messageSize, true, tp);
} else if (stencil == "D3Q19")
{
communicate<stencil::D3Q19>(mpiInfo, iterations, messageSize, false, tp);
communicate<stencil::D3Q19>(mpiInfo, iterations, messageSize, true, tp);
} else if (stencil == "D3Q7")
{
communicate<stencil::D3Q7>(mpiInfo, iterations, messageSize, false, tp);
communicate<stencil::D3Q7>(mpiInfo, iterations, messageSize, true, tp);
} else
{
WALBERLA_ABORT("stencil not supported: " << stencil);
}
WALBERLA_LOG_INFO_ON_ROOT(tp);
WALBERLA_ROOT_SECTION()
{
std::map< std::string, walberla::int64_t > integerProperties;
std::map< std::string, double > realProperties;
std::map< std::string, std::string > stringProperties;
integerProperties["procs_x"] = int64_c(procs[0]);
integerProperties["procs_y"] = int64_c(procs[1]);
integerProperties["procs_z"] = int64_c(procs[2]);
integerProperties["priodicity_x"] = int64_c(periodicity[0]);
integerProperties["priodicity_y"] = int64_c(periodicity[1]);
integerProperties["priodicity_z"] = int64_c(periodicity[2]);
integerProperties["iterations"] = int64_c(iterations);
integerProperties["messageSize"] = int64_c(messageSize);
stringProperties["stencil"] = stencil;
auto runId = postprocessing::storeRunInSqliteDB( "ProbeVsTwoMessages.sqlite", integerProperties, stringProperties, realProperties );
postprocessing::storeTimingPoolInSqliteDB( "ProbeVsTwoMessages.sqlite", runId, tp, "Timings" );
}
return 0;
}
} // namespace walberla
int main( int argc, char* argv[] )
{
return walberla::main( argc, argv );
}
......@@ -151,6 +151,7 @@ public:
inline const Matrix2 getInverse() const;
inline bool isSingular() const;
inline bool isSymmetric() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -173,6 +173,7 @@ public:
inline const Matrix3 getCholesky() const;
template< typename Other > inline const Vector3<HIGH> solve( const Vector3<Other> &rhs ) const;
inline Type trace() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -159,6 +159,7 @@ public:
inline Length length() const;
inline Type sqrLength() const;
inline Vector2<Length> getNormalized() const;
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -165,6 +165,7 @@ public:
inline Vector3<Length> getNormalized() const;
inline Vector3<Length> getNormalizedOrZero() const;
inline void reset();
inline Type* data() {return v_;}
//@}
//*******************************************************************************************************************
......
......@@ -194,6 +194,9 @@ public:
//@}
//*******************************************************************************************************************
void useIProbe(const bool use) { useIProbe_ = use; }
bool isIProbedUsed() const { return useIProbe_; }
///Bytes sent during the current or last communication
int64_t getBytesSent() const { return bytesSent_; }
///Bytes received during the current or last communication
......@@ -226,10 +229,11 @@ protected:
RecvBuffer_T * waitForNext( MPIRank & fromRank );
void setCommunicationType( const bool knownSize );
internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_;
internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_;
internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_
internal::KnownSizeCommunication<RecvBuffer_T, SendBuffer_T> knownSizeComm_;
internal::UnknownSizeCommunication<RecvBuffer_T, SendBuffer_T> unknownSizeComm_;
internal::UnknownSizeCommunicationIProbe<RecvBuffer_T, SendBuffer_T> unknownSizeCommIProbe_;
internal::NoMPICommunication<RecvBuffer_T, SendBuffer_T> noMPIComm_;
internal::AbstractCommunication<RecvBuffer_T, SendBuffer_T> * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_
bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating
bool communicationRunning_; //< indicates if a communication step is currently running
......@@ -251,6 +255,8 @@ protected:
//each concurrently running communication uses different tags
static std::set<int> activeTags_;
bool useIProbe_ = false; ///< switch betwenn IProbe and two message communication for varying size communication
int64_t bytesSent_ = 0; ///< number of bytes sent during last communication
int64_t bytesReceived_ = 0; ///< number of bytes received during last communication
......
......@@ -127,6 +127,7 @@ template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, int tag )
: knownSizeComm_ ( communicator, tag ),
unknownSizeComm_( communicator, tag ),
unknownSizeCommIProbe_( communicator, tag ),
noMPIComm_( communicator, tag ),
currentComm_ ( nullptr ),
sizeChangesEverytime_( true ),
......@@ -138,6 +139,7 @@ template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &other )
: knownSizeComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
noMPIComm_ ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
currentComm_ ( nullptr ),
sizeChangesEverytime_( other.sizeChangesEverytime_ ),
......@@ -150,6 +152,8 @@ GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &oth
currentComm_ = &knownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeComm_ )
currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_;
else
......@@ -170,6 +174,8 @@ GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const Gene
currentComm_ = &knownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeComm_ )
currentComm_ = &unknownSizeComm_;
else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else if ( other.currentComm_ == &other.noMPIComm_ )
currentComm_ = &noMPIComm_;
else
......@@ -508,6 +514,8 @@ void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
{
if( knownSize )
currentComm_ = &knownSizeComm_;
else if ( useIProbe_ )
currentComm_ = &unknownSizeCommIProbe_;
else
currentComm_ = &unknownSizeComm_;
}
......
......@@ -161,6 +161,34 @@ namespace internal {
};
template< typename RecvBuffer_T, typename SendBuffer_T>
class UnknownSizeCommunicationIProbe : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
{
public:
using typename AbstractCommunication<RecvBuffer_T, SendBuffer_T>::ReceiveInfo;
UnknownSizeCommunicationIProbe( const MPI_Comm & communicator, int tag = 0 )
: AbstractCommunication<RecvBuffer_T, SendBuffer_T>( communicator, tag ), sending_(false), receiving_(false) {}
virtual ~UnknownSizeCommunicationIProbe() {}
virtual void send( MPIRank receiver, const SendBuffer_T & sendBuffer );
virtual void waitForSends();
virtual void scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos );
/// size field of recvInfos can be invalid, is filled in with the actual message size
virtual MPIRank waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos );
private:
bool sending_;
bool receiving_;
int pendingReceives_;
std::vector<MPI_Request> sendRequests_;
};
template< typename RecvBuffer_T, typename SendBuffer_T>
class NoMPICommunication : public AbstractCommunication<RecvBuffer_T, SendBuffer_T>
{
......
......@@ -349,6 +349,133 @@ MPIRank UnknownSizeCommunication<Rb, Sb>::waitForNextReceive( std::map<MPIRank,
//======================================================================================================================
//
// Unknown Size Communication (IProbe method)
//
//======================================================================================================================
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::send( MPIRank receiver, const Sb & sendBuffer )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
if ( ! sending_ )
sending_ = true;
// Send content message
sendRequests_.push_back( MPI_REQUEST_NULL );
MPI_Request & contentMsgReq = sendRequests_.back();
//WALBERLA_LOG_DEVEL("sending " << sendBuffer.size() << " to " << receiver);
MPI_Isend(sendBuffer.ptr(), // pointer to size buffer
int_c( sendBuffer.size() ), // send one size
MPI_BYTE, // type
receiver, // receiver rank
this->tag_, // message tag
this->communicator_, // communicator
& contentMsgReq // request needed for wait
);
}
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::waitForSends()
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
sending_ = false;
if ( sendRequests_.empty() )
return;
MPI_Waitall( int_c( sendRequests_.size() ),
&sendRequests_[0],
MPI_STATUSES_IGNORE );
sendRequests_.clear();
}
template< typename Rb, typename Sb>
void UnknownSizeCommunicationIProbe<Rb, Sb>::scheduleReceives( std::map<MPIRank, ReceiveInfo> & recvInfos )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
receiving_ = true;
pendingReceives_ = int_c(recvInfos.size());
for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
{
ReceiveInfo & recvInfo = it->second;
recvInfo.size = INVALID_SIZE;
}
}
template< typename Rb, typename Sb>
MPIRank UnknownSizeCommunicationIProbe<Rb, Sb>::waitForNextReceive( std::map<MPIRank, ReceiveInfo> & recvInfos )
{
WALBERLA_NON_MPI_SECTION() { WALBERLA_ASSERT( false ); }
WALBERLA_ASSERT( receiving_ );
if( pendingReceives_ < 1 ) {
receiving_ = false;
return INVALID_RANK;
}
// On first entry in this function:
// - receive request vector contains requests for size messages
// - all ReceiveInfo's have sizeReceived=false
while( true ) // this loops as long as size messages are received
{
for( auto it = recvInfos.begin(); it != recvInfos.end(); ++it )
{
const MPIRank sender = it->first;
ReceiveInfo & recvInfo = it->second;
if (recvInfo.size != INVALID_SIZE) continue;
int probeFlag;
MPI_Status probeStatus;
MPI_Iprobe( sender,
this->tag_,
this->communicator_,
&probeFlag,
&probeStatus);
if (probeFlag)
{
int count = 0;
MPI_Get_count( &probeStatus, MPI_BYTE, &count );
//WALBERLA_LOG_DEVEL("received " << count << " from " << sender);
recvInfo.size = count;
recvInfo.buffer.resize( uint_c( recvInfo.size ) );
MPI_Status recvStatus;
MPI_Recv( recvInfo.buffer.ptr(), // where to store received size
count, // size of expected message
MPI_BYTE, // type
sender, // rank of sender process
this->tag_, // message tag
this->communicator_, // communicator
&recvStatus // request, needed for wait
);
--pendingReceives_;
return sender;
}
}
}
WALBERLA_ASSERT( false );
return INVALID_RANK; //cannot happen - only to prevent compiler warnings
}
//======================================================================================================================
//
// NoMPI Communication
......
......@@ -28,6 +28,8 @@
#include "WcPolicy.h"
#include "core/DataTypes.h"
#include <iomanip>
#include <iostream>
#include <limits>
......@@ -442,6 +444,24 @@ inline void Timer<TP>::merge( const Timer<TP> & other )
//**********************************************************************************************************************
//======================================================================================================================
//
// OSTREAM OVERLOAD
//
//======================================================================================================================
template< typename TP > // Timing policy
std::ostream & operator<< ( std::ostream & os, const Timer<TP> & timer )
{
os << std::fixed << std::setprecision(3) <<
"average: " << timer.average() <<
" | min: " << timer.min() <<
" | max: " << timer.max() <<
" | variance: " << timer.variance();
return os;
}
} // namespace timing
typedef timing::Timer<timing::CpuPolicy> CpuTimer;
......
......@@ -148,9 +148,6 @@ waLBerla_execute_test( NAME SetReductionTest27 COMMAND $<TARGET_FILE:SetReductio
waLBerla_compile_test( FILES mpi/ProbeVsExtraMessage.cpp DEPENDS postprocessing)
##############
# selectable #
##############
......
......@@ -119,7 +119,7 @@ void symmetricCommunication()
* Every process sends a message as big as his rank number
* to the neighboring processes (1D , periodic boundary)
*/
void asymmetricCommunication()
void asymmetricCommunication(const bool useIProbe)
{
auto mpiManager = MPIManager::instance();
......@@ -132,6 +132,7 @@ void asymmetricCommunication()
BufferSystem bs ( MPI_COMM_WORLD );
bs.useIProbe(useIProbe);
// Set receiver information
std::set<int> receiveFrom;
......@@ -186,7 +187,7 @@ void asymmetricCommunication()
// like asymmetricCommunication, but the message size is a random value
// that changes every communication step
void timeVaryingCommunication()
void timeVaryingCommunication(const bool useIProbe)
{
auto mpiManager = MPIManager::instance();
......@@ -198,6 +199,7 @@ void timeVaryingCommunication()
WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 );
BufferSystem bs ( MPI_COMM_WORLD );
bs.useIProbe(useIProbe);
// artificial special case: no message from root
bs.sendBuffer( rightNeighbor );
......@@ -257,7 +259,7 @@ void timeVaryingCommunication()
* i.e. rank 1 sends a "1" once, rank 2 sends a message containing two "2"'s ...
*/
void gatherUsingAsymmetricCommunication()
void gatherUsingAsymmetricCommunication(const bool useIProbe)
{
int rank = MPIManager::instance()->worldRank();
int numProcesses = MPIManager::instance()->numProcesses();
......@@ -267,6 +269,7 @@ void gatherUsingAsymmetricCommunication()
const int TAG=42;
BufferSystem bs (MPI_COMM_WORLD, TAG );
bs.useIProbe(useIProbe);
if ( rank ==0 )
......@@ -381,13 +384,16 @@ int main(int argc, char**argv)
symmetricCommunication();
WALBERLA_LOG_INFO_ON_ROOT("Testing Asymmetric Communication...");
asymmetricCommunication();
asymmetricCommunication(false);
asymmetricCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing time-varying Communication...");
timeVaryingCommunication();
timeVaryingCommunication(false);
timeVaryingCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing Gather Operation...");
gatherUsingAsymmetricCommunication();
gatherUsingAsymmetricCommunication(false);
gatherUsingAsymmetricCommunication(true);
WALBERLA_LOG_INFO_ON_ROOT("Testing self-send...");
selfSend();
......
//======================================================================================================================
//
// This file is part of waLBerla. waLBerla is free software: you can
// redistribute it and/or modify it under the terms of the GNU General Public
// License as published by the Free Software Foundation, either version 3 of
// the License, or (at your option) any later version.
//
// waLBerla is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
//! \file ProbeVsExtraMessage.h
//! \author Martin Bauer <martin.bauer@fau.de>
//! \brief Micro Benchmark, measuring time for different variable sized communications
//
//======================================================================================================================
#include "core/debug/TestSubsystem.h"
#include "core/Environment.h"
#include "core/mpi/MPIManager.h"
#include "core/math/Random.h"
#include "core/timing/TimingPool.h"
#include "postprocessing/sqlite/SQLite.h"
#include <iostream>
#include <sstream>
namespace walberla {
int getProcToReceiveFrom()
{
auto mpi = MPIManager::instance();
int res = mpi->worldRank() -1;
if ( res < 0 )
res = mpi->numProcesses() -1;
return res;
}
int getProcToSendTo()
{
auto mpi = MPIManager::instance();
int res = mpi->worldRank() +1;
if ( res == mpi->numProcesses() )
res = 0;
return res;
}
int getRandomMessageSize( uint_t maxMessageSize )
{
return int_c( math::intRandom( maxMessageSize / 3, maxMessageSize-1 ) );
}
void iprobeVersion( uint_t iterations, uint_t maxMessageSize, WcTimingPool & timingPool )
{
char * sendBuf = new char[ maxMessageSize ];
char * recvBuf = new char[ maxMessageSize ];
auto & timer = timingPool["iprobe"];
for( uint_t i =0; i < iterations; ++i )
{
int messageSize = getRandomMessageSize( maxMessageSize );
timer.start();
MPI_Request sendRequest;
MPI_Isend( sendBuf, messageSize, MPI_BYTE, getProcToSendTo(), 0, MPI_COMM_WORLD, &sendRequest );
MPI_Status probeStatus;
MPI_Probe( getProcToReceiveFrom(), 0, MPI_COMM_WORLD, &probeStatus );
int count = 0;
MPI_Get_count( &probeStatus, MPI_BYTE, &count );
MPI_Recv( recvBuf, count, MPI_BYTE, getProcToReceiveFrom(), 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE );
MPI_Waitall( 1, &sendRequest, MPI_STATUSES_IGNORE );
timer.end();
}
delete [] sendBuf;
delete [] recvBuf;
}