// Last commit: Martin Bauer (17bafdf6)
// Forked from waLBerla / waLBerla (2119 commits behind the upstream repository at the time of capture)
// File: ProbeVsExtraMessage.cpp (6.57 KiB)
//======================================================================================================================
//
// This file is part of waLBerla. waLBerla is free software: you can
// redistribute it and/or modify it under the terms of the GNU General Public
// License as published by the Free Software Foundation, either version 3 of
// the License, or (at your option) any later version.
//
// waLBerla is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
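//! \file ProbeVsExtraMessage.cpp
//! \author Martin Bauer <martin.bauer@fau.de>
//! \brief Micro benchmark comparing three ways of receiving messages of varying size: probing for the message
//!        (MPI_Probe), announcing the size in an extra message, and receiving into a buffer of known maximum size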
//
//======================================================================================================================
#include "core/debug/TestSubsystem.h"
#include "core/Environment.h"
#include "core/mpi/MPIManager.h"
#include "core/math/Random.h"
#include "core/timing/TimingPool.h"
#include "postprocessing/sqlite/SQLite.h"

#include <cstdlib>
#include <iostream>
#include <limits>
#include <map>
#include <sstream>
#include <string>
#include <vector>
using namespace walberla;
/// Returns the rank this process receives from: its left neighbour in a ring spanning all processes.
/// Rank 0 wraps around to the last rank (numProcesses() - 1).
int getProcToReceiveFrom()
{
   const auto mpiManager = MPIManager::instance();
   const int  leftRank   = mpiManager->worldRank() - 1;
   return ( leftRank < 0 ) ? mpiManager->numProcesses() - 1 : leftRank;
}
/// Returns the rank this process sends to: its right neighbour in a ring spanning all processes.
/// The last rank wraps around to rank 0.
int getProcToSendTo()
{
   const auto mpiManager = MPIManager::instance();
   const int  rightRank  = mpiManager->worldRank() + 1;
   return ( rightRank == mpiManager->numProcesses() ) ? 0 : rightRank;
}
/// Draws a random message size via math::intRandom between maxMessageSize / 3 and maxMessageSize - 1
/// (bounds presumably inclusive - see math::intRandom), converted to int for use as an MPI count.
/// \param maxMessageSize  size of the communication buffers in bytes
/// \return 0 if maxMessageSize is 0, otherwise a random size that fits into the buffers
int getRandomMessageSize( uint_t maxMessageSize )
{
   // For maxMessageSize == 0 the upper bound "maxMessageSize - 1" would wrap around to the largest uint_t,
   // producing a size that overflows int and exceeds the (empty) buffers.
   if( maxMessageSize == 0 )
      return 0;

   return int_c( math::intRandom( maxMessageSize / 3, maxMessageSize - 1 ) );
}
//**********************************************************************************************************************
/*! Benchmark variant: receive a message of unknown size by probing for it first.
 *
 * In every iteration each process sends a message of random size (getRandomMessageSize) to its right ring
 * neighbour, blocks in MPI_Probe until the message of its left neighbour is available, queries the size with
 * MPI_Get_count and then receives it. The communication of each iteration is measured with the timer "iprobe"
 * of \p timingPool. Note: despite the timer name, the blocking MPI_Probe is used, not MPI_Iprobe.
 *
 * \param iterations      number of timed send/receive rounds
 * \param maxMessageSize  size of the send and receive buffers in bytes
 * \param timingPool      pool that receives the "iprobe" timings
 */
//**********************************************************************************************************************
void iprobeVersion( uint_t iterations, uint_t maxMessageSize, WcTimingPool & timingPool )
{
   // Owned by std::vector so the buffers are released even if something below throws.
   // Their contents are irrelevant for the benchmark.
   std::vector<char> sendBuf( maxMessageSize );
   std::vector<char> recvBuf( maxMessageSize );

   // Neighbour ranks are loop invariant; computing them here keeps the MPIManager lookups out of the timed region.
   const int sendTo      = getProcToSendTo();
   const int receiveFrom = getProcToReceiveFrom();

   auto & timer = timingPool["iprobe"];

   for( uint_t i = 0; i < iterations; ++i )
   {
      int messageSize = getRandomMessageSize( maxMessageSize );

      timer.start();

      MPI_Request sendRequest;
      MPI_Isend( sendBuf.data(), messageSize, MPI_BYTE, sendTo, 0, MPI_COMM_WORLD, &sendRequest );

      // Size of the incoming message is unknown: probe first, then receive exactly that many bytes.
      MPI_Status probeStatus;
      MPI_Probe( receiveFrom, 0, MPI_COMM_WORLD, &probeStatus );
      int count = 0;
      MPI_Get_count( &probeStatus, MPI_BYTE, &count );
      MPI_Recv( recvBuf.data(), count, MPI_BYTE, receiveFrom, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE );

      MPI_Waitall( 1, &sendRequest, MPI_STATUSES_IGNORE );

      timer.end();
   }
}
//**********************************************************************************************************************
/*! Benchmark variant: announce the size of each message in an extra message.
 *
 * In every iteration each process posts a receive for the size message of its left ring neighbour, sends its own
 * random size (tag TAG_SIZE_MSG) and content (tag TAG_CONTENT_MSG) to its right neighbour, waits for the size
 * message and then receives exactly that many bytes. The communication of each iteration is measured with the
 * timer "twoMessages" of \p timingPool.
 *
 * \param iterations      number of timed send/receive rounds
 * \param maxMessageSize  size of the send and receive buffers in bytes
 * \param timingPool      pool that receives the "twoMessages" timings
 */
//**********************************************************************************************************************
void twoMessageVersion( uint_t iterations, uint_t maxMessageSize, WcTimingPool & timingPool )
{
   // Owned by std::vector so the buffers are released even if something below throws.
   std::vector<char> sendBuf( maxMessageSize );
   std::vector<char> recvBuf( maxMessageSize );

   // Neighbour ranks are loop invariant; computing them here keeps the MPIManager lookups out of the timed region.
   const int sendTo      = getProcToSendTo();
   const int receiveFrom = getProcToReceiveFrom();

   auto & timer = timingPool["twoMessages"];

   const int TAG_SIZE_MSG    = 1;
   const int TAG_CONTENT_MSG = 0;

   int recvSize = 0;
   MPI_Request sendRequests[2];

   for( uint_t i = 0; i < iterations; ++i )
   {
      // Non-const on purpose: its address is handed to MPI_Isend, and it must stay alive until
      // sendRequests[0] has completed (MPI_Waitall at the end of this iteration).
      int sendSize = getRandomMessageSize( maxMessageSize );

      timer.start();

      MPI_Request recvSizeMsgRequest;
      MPI_Irecv( &recvSize, 1, MPI_INT, receiveFrom, TAG_SIZE_MSG, MPI_COMM_WORLD, &recvSizeMsgRequest );
      MPI_Isend( &sendSize, 1, MPI_INT, sendTo, TAG_SIZE_MSG, MPI_COMM_WORLD, &sendRequests[0] );
      MPI_Isend( sendBuf.data(), sendSize, MPI_BYTE, sendTo, TAG_CONTENT_MSG, MPI_COMM_WORLD, &sendRequests[1] );

      // the content size is only known once the size message has arrived
      MPI_Waitall( 1, &recvSizeMsgRequest, MPI_STATUSES_IGNORE );
      MPI_Recv( recvBuf.data(), recvSize, MPI_BYTE, receiveFrom, TAG_CONTENT_MSG, MPI_COMM_WORLD, MPI_STATUS_IGNORE );

      // both sends must finish before sendSize goes out of scope
      MPI_Waitall( 2, sendRequests, MPI_STATUSES_IGNORE );

      timer.end();
   }
}
//**********************************************************************************************************************
/*! Benchmark variant: the receiver knows an upper bound for the message size.
 *
 * In every iteration each process posts a receive of maxMessageSize bytes from its left ring neighbour, sends a
 * message of random size to its right neighbour, waits for the receive and queries the actual size with
 * MPI_Get_count. The communication of each iteration is measured with the timer "maxMessageSizeKnown" of
 * \p timingPool.
 *
 * \param iterations      number of timed send/receive rounds
 * \param maxMessageSize  size of the send and receive buffers in bytes; also the count of the posted receive
 * \param timingPool      pool that receives the "maxMessageSizeKnown" timings
 */
//**********************************************************************************************************************
void maxMessageSizeVersion( uint_t iterations, uint_t maxMessageSize, WcTimingPool & timingPool )
{
   // Owned by std::vector so the buffers are released even if something below throws.
   std::vector<char> sendBuf( maxMessageSize );
   std::vector<char> recvBuf( maxMessageSize );

   // Loop invariants, computed outside the timed region.
   const int sendTo       = getProcToSendTo();
   const int receiveFrom  = getProcToReceiveFrom();
   const int recvCapacity = int_c( maxMessageSize );

   auto & timer = timingPool["maxMessageSizeKnown"];

   for( uint_t i = 0; i < iterations; ++i )
   {
      int messageSize = getRandomMessageSize( maxMessageSize );

      timer.start();

      MPI_Request sendRequest;
      MPI_Request recvRequest;
      MPI_Irecv( recvBuf.data(), recvCapacity, MPI_BYTE, receiveFrom, 0, MPI_COMM_WORLD, &recvRequest );
      MPI_Isend( sendBuf.data(), messageSize, MPI_BYTE, sendTo, 0, MPI_COMM_WORLD, &sendRequest );

      MPI_Status status;
      MPI_Waitall( 1, &recvRequest, &status );

      // Querying the received size is part of what this variant measures; the value itself is not needed.
      int count = 0;
      MPI_Get_count( &status, MPI_BYTE, &count );
      (void) count;

      MPI_Waitall( 1, &sendRequest, MPI_STATUSES_IGNORE );

      timer.end();
   }
}
int main( int argc, char ** argv )
{
debug::enterTestMode();
mpi::Environment mpiEnv( argc, argv );
MPIManager::instance()->useWorldComm();
using namespace std;
WALBERLA_ROOT_SECTION() {
if ( argc != 3 && argc != 4 ) {
cerr << "Wrong number of arguments " << endl;
cerr << "Usage ./probeVsExtraMessage <iterations> <messageSize> " << endl;
}
}
uint_t iterations;
uint_t messageSize;
std::string arg1( argv[1] );
std::string arg2( argv[2] );
std::stringstream streamIterations ( arg1 );
std::stringstream streamMessageSize ( arg2 );
streamIterations >> iterations;
streamMessageSize >> messageSize;
WcTimingPool tp;
iprobeVersion ( iterations, messageSize, tp );
twoMessageVersion ( iterations, messageSize, tp );
maxMessageSizeVersion ( iterations, messageSize, tp );
WALBERLA_ROOT_SECTION() {
cout << tp << endl;
}
if( argc == 4) {
const auto reducedTimeloopTiming = tp.getReduced();
WALBERLA_ROOT_SECTION() {
std::string dbFile( argv[3] );
std::map<std::string, walberla::int64_t> integerProperties;
integerProperties["iterations"] = int64_c(iterations);
integerProperties["messageSize"] = int64_c(messageSize);
integerProperties["processes"] = int64_c(mpi::MPIManager::instance()->numProcesses());
postprocessing::SQLiteDB db( dbFile );
auto runid = db.storeRun( integerProperties, std::map<std::string, std::string>(), std::map<string, double>());
db.storeTimingPool( runid, *reducedTimeloopTiming, "timings" );
}
}
return 0;
}