//====================================================================================================================== // // This file is part of waLBerla. waLBerla is free software: you can // redistribute it and/or modify it under the terms of the GNU General Public // License as published by the Free Software Foundation, either version 3 of // the License, or (at your option) any later version. // // waLBerla is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License // for more details. // // You should have received a copy of the GNU General Public License along // with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>. // //! \file BufferSystemTest.cpp //! \ingroup core //! \author Martin Bauer <martin.bauer@fau.de> //! \brief Tests for BufferSystem: symmetric and asymmetric MPI communication tests // //====================================================================================================================== #include "core/Abort.h" #include "core/debug/TestSubsystem.h" #include "core/logging/Logging.h" #include "core/mpi/BufferSystem.h" #include "core/mpi/Environment.h" #include <boost/random/mersenne_twister.hpp> #include <boost/random/uniform_int.hpp> #include <boost/random/variate_generator.hpp> #include <boost/thread/thread.hpp> #include <cmath> #include <iostream> #include <set> using namespace walberla; using mpi::BufferSystem; typedef boost::mt19937 base_generator_type; /** * Utility function for sleeping a random time * used to simulate a variable process load */ void randomSleep( int maxTimeInMs = 20 ) { static base_generator_type generator(42u); static unsigned int counter =0; counter += 100; int rank = MPIManager::instance()->worldRank(); unsigned int seed = static_cast<unsigned int>(std::time(0)) + static_cast<unsigned int>(rank*1000) + counter; generator.seed(seed); boost::uniform_int<> uni_dist(0,maxTimeInMs); boost::variate_generator<base_generator_type&, boost::uniform_int<> > uni(generator, uni_dist); int sleepTime = uni(); boost::this_thread::sleep( boost::posix_time::milliseconds( sleepTime ) ); } /** * Every process sends a message containing its own rank * to the neighboring processes (1D , periodic boundary) */ void symmetricCommunication() { const int MSG_SIZE = 10; auto mpiManager = MPIManager::instance(); int numProcesses = mpiManager->numProcesses(); int rank = mpiManager->worldRank(); int leftNeighbor = (rank-1+numProcesses) % numProcesses; int rightNeighbor = (rank+1) % numProcesses; WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 ); BufferSystem bs ( MPI_COMM_WORLD ); // Pack Message to left neighbor containing own rank for( int i=0; i< MSG_SIZE; ++i ) bs.sendBuffer( leftNeighbor ) << rank; // Pack Message to right neighbor containing own rank for( int i=0; i< MSG_SIZE; ++i ) bs.sendBuffer( rightNeighbor ) << rank; bs.setReceiverInfoFromSendBufferState( true, false ); randomSleep(); bs.sendAll(); // In between we could do some computation randomSleep(); for( auto it = bs.begin(); it != bs.end(); ++it ) { WALBERLA_CHECK ( it.rank() == leftNeighbor || it.rank() == rightNeighbor ); WALBERLA_CHECK_EQUAL( it.buffer().size(), MSG_SIZE * sizeof(int) + MSG_SIZE * mpi::BUFFER_DEBUG_OVERHEAD ); int receivedVal = -1; it.buffer() >> receivedVal; WALBERLA_CHECK_EQUAL( receivedVal, it.rank() ); } } /** * Every process sends a message as big as his rank number * to the neighboring processes (1D , periodic boundary) */ void asymmetricCommunication() { auto mpiManager = MPIManager::instance(); int numProcesses = mpiManager->numProcesses(); int rank = mpiManager->worldRank(); int leftNeighbor = (rank-1+numProcesses) % numProcesses; int rightNeighbor = (rank+1) % numProcesses; WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 ); BufferSystem bs ( MPI_COMM_WORLD ); // Set receiver information std::set<int> receiveFrom; if ( leftNeighbor > 0 ) receiveFrom.insert( leftNeighbor ); if ( rightNeighbor > 0 ) receiveFrom.insert( rightNeighbor ); bs.setReceiverInfo( receiveFrom, false ); const uint_t NUM_STEPS = 3; for ( uint_t step = 0; step < NUM_STEPS; ++step ) { // Pack Messages to neighbors containing rank times rank value for( int i=0; i< rank; ++i ) bs.sendBuffer( leftNeighbor ) << rank; for( int i=0; i< rank; ++i ) bs.sendBuffer( rightNeighbor ) << rank; randomSleep(); bs.sendAll(); // In between we could do some computation randomSleep(); for( auto it = bs.begin(); it != bs.end(); ++it ) { if ( it.rank() == leftNeighbor ) { for( int i=0; i < leftNeighbor; ++i ) { int value = -1; it.buffer() >> value; WALBERLA_CHECK_EQUAL( value, leftNeighbor ); } } else if ( it.rank() == rightNeighbor ) { for( int i=0; i < rightNeighbor; ++i ) { int value = -1; it.buffer() >> value; WALBERLA_CHECK_EQUAL( value, rightNeighbor ); } } else WALBERLA_CHECK( false ); // unexpected sender WALBERLA_CHECK( it.buffer().isEmpty() ); } } } // like asymmetricCommunication, but the message size is a random value // that changes every communication step void timeVaryingCommunication() { auto mpiManager = MPIManager::instance(); int numProcesses = mpiManager->numProcesses(); int rank = mpiManager->worldRank(); int leftNeighbor = (rank-1+numProcesses) % numProcesses; int rightNeighbor = (rank+1) % numProcesses; WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 ); BufferSystem bs ( MPI_COMM_WORLD ); // artificial special case: no message from root bs.sendBuffer( rightNeighbor ); bs.sendBuffer( leftNeighbor ); bs.setReceiverInfoFromSendBufferState( false, true ); const uint_t NUM_STEPS = 5; for ( uint_t step = 1; step <= NUM_STEPS; ++step ) { for( uint_t i=0; i < std::max<uint_t>( uint_c(rank * leftNeighbor) * step % 17, 1ul); ++i ) bs.sendBuffer( leftNeighbor ) << i; bs.send( leftNeighbor ); for( uint_t i=0; i < std::max<uint_t>( uint_c(rank * rightNeighbor) * step % 17, 1ul); ++i ) bs.sendBuffer( rightNeighbor ) << i; bs.send( rightNeighbor ); WALBERLA_CHECK( bs.isCommunciationRunning() ); for( auto it = bs.begin(); it != bs.end(); ++it ) { if ( it.rank() == leftNeighbor ) { for( uint_t i=0; i < std::max<uint_t>( uint_c(rank * leftNeighbor) * step % 17, 1ul); ++i ) { uint_t value = 0; it.buffer() >> value; WALBERLA_CHECK_EQUAL( value, i ); } } else if ( it.rank() == rightNeighbor ) { for( uint_t i=0; i < std::max<uint_t>( uint_c(rank * rightNeighbor) * step % 17,1ul); ++i ) { uint_t value = 0; it.buffer() >> value; WALBERLA_CHECK_EQUAL( value, i ); } } else WALBERLA_CHECK( false ); // unexpected sender WALBERLA_CHECK( it.buffer().isEmpty() ); } WALBERLA_CHECK( ! bs.isCommunciationRunning() ); } } /** * Gathering using asymmetric communication * every process sends a message of size rank*sizeof(int) containing only its own rank to root process * i.e. rank 1 sends a "1" once, rank 2 sends a message containing two "2"'s ... */ void gatherUsingAsymmetricCommunication() { int rank = MPIManager::instance()->worldRank(); int numProcesses = MPIManager::instance()->numProcesses(); WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 ); const int TAG=42; BufferSystem bs (MPI_COMM_WORLD, TAG ); if ( rank ==0 ) bs.setReceiverInfo( BufferSystem::allRanksButRoot(), true ); else bs.setReceiverInfo( std::set<mpi::MPIRank>(), true ); if(rank > 0) { for( int i=0; i < rank; ++i ) bs.sendBuffer(0) << rank; } bs.sendAll(); randomSleep(); for( auto it = bs.begin(); it != bs.end(); ++it ) { WALBERLA_CHECK( rank == 0); // only root should receive something for( int i=0; i < it.rank(); ++i ) { int received = -1; it.buffer() >> received; WALBERLA_CHECK_EQUAL( received, it.rank() ); } } } void selfSend() { int rank = MPIManager::instance()->worldRank(); int numProcesses = MPIManager::instance()->numProcesses(); WALBERLA_CHECK_GREATER_EQUAL( numProcesses, 3 ); const int TAG=42; BufferSystem bs (MPI_COMM_WORLD, TAG ); if ( rank ==0 ) bs.setReceiverInfo( BufferSystem::allRanks(), true ); else bs.setReceiverInfo( std::set<mpi::MPIRank>(), true ); bs.sendBuffer(0) << rank; bs.sendAll(); randomSleep(); for( auto it = bs.begin(); it != bs.end(); ++it ) { WALBERLA_CHECK( rank == 0); // only root should receive something int received = -1; it.buffer() >> received; WALBERLA_CHECK_EQUAL( received, it.rank() ); } } int main(int argc, char**argv) { mpi::Environment mpiEnv( argc, argv ); debug::enterTestMode(); auto mpiManager = MPIManager::instance(); int numProcesses = mpiManager->numProcesses(); if(numProcesses <= 2) { WALBERLA_ABORT("This test has to be executed on at least 3 processes. Executed on " << numProcesses); return 1; } WALBERLA_ROOT_SECTION() { WALBERLA_LOG_INFO("Testing Symmetric Communication..." ); } symmetricCommunication(); WALBERLA_ROOT_SECTION() { WALBERLA_LOG_INFO("Testing Asymmetric Communication..."); } asymmetricCommunication(); WALBERLA_ROOT_SECTION() { WALBERLA_LOG_INFO("Testing time-varying Communication..."); } timeVaryingCommunication(); WALBERLA_ROOT_SECTION() { WALBERLA_LOG_INFO("Testing Gather Operation..."); } gatherUsingAsymmetricCommunication(); WALBERLA_ROOT_SECTION() { WALBERLA_LOG_INFO("Testing selfsend..."); } selfSend(); return EXIT_SUCCESS; }