//====================================================================================================================== // // This file is part of waLBerla. waLBerla is free software: you can // redistribute it and/or modify it under the terms of the GNU General Public // License as published by the Free Software Foundation, either version 3 of // the License, or (at your option) any later version. // // waLBerla is distributed in the hope that it will be useful, but WITHOUT // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License // for more details. // // You should have received a copy of the GNU General Public License along // with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>. // //! \file BufferSystem.h //! \ingroup core //! \author Martin Bauer <martin.bauer@fau.de> //! \brief Header file for BufferSystem // //====================================================================================================================== #pragma once #include "BufferSystemHelper.h" #include "core/DataTypes.h" #include "core/debug/Debug.h" #include <boost/range/counting_range.hpp> #include <map> #include <set> #include <vector> namespace walberla { namespace mpi { class OpenMPBufferSystem; //********************************************************************************************************************** /*! Manages MPI Communication with a set of known communication partners. * * Features / Restrictions: * - usable in every case where normal MPI_Send, MPI_Recv is needed * - communication partners have to be known, message size not necessarily * - unknown message sizes possible ( -> automatic extra message to exchange sizes ) * - Implemented with non-blocking MPI calls, and MPI_Waitany to process a message as soon * as it was received while still waiting for other messages * * * \ingroup mpi * * * Example: * -------- * \code BufferSystem bs ( MPI_COMM_WORLD ); // we assume every process has exactly two neighbors ( periodic ) // and the neighbor ranks are stored in 'neighborRank1' and 'neighborRank2' bs.sendBuffer( neighborRank1 ) << 42; bs.sendBuffer( neighborRank2 ) << 4242; // We expect to receive the same amount as we send: bs.setReceiverInfoFromSendBufferState( true, false ); bs.sendAll(); for ( auto i = bs.begin(); i != bs.end(); ++i ) { cout << "Message received from " << i.rank() << endl; int messageContent; i.buffer() >> messageContent; cout << "Received " << messageContent << endl; } \endcode * * * Usage: * ------ * * 1. Setup: * - define receive information using setReceiverInfo() or setReceiverInfoFromSendBufferState() * With these functions one defines the processes that we receive from and optionally the size * of the received messages. If the message sizes are unknown, they have to be communicated first. * One also defines if the sizes stay constant or if they change in each communication step * (size message is then sent before every content message) * - The receiver and send information can be changed, if no communication is currently running. * * 2. Communication Step: * - Optionally call scheduleReceives() -> starts communication step and causes MPI_IRecv's to be called. * This is also automatically called on first send operation. * - fill send buffers * - call send() for each buffer after filling the buffer, or call sendAll() after filling all buffers. * The send*() functions return immediately, internally MPI_ISend's are called. * The send*() functions start the communication step if it was not already started by scheduleReceives() * - Receiving: iterate over incoming messages using begin() and end(). Internally a MPI_Waitany is executed that * returns as soon as a single message was received. Then this message can be processed while waiting * for the other messages. * \attention Even if the current process does not receive anything, call begin() and end() * to finish the communication step * - When iteration has reached the end the communication step is finished. * - when communication has finished all Send- and RecvBuffers are automatically cleared * * When running multiple BufferSystems concurrently different MPI tags have to be used * for the systems: the tag can be passed in the constructor. * */ //********************************************************************************************************************** class BufferSystem { public: class iterator; //**Construction and Destruction************************************************************************************* /*!\name Constructors */ //@{ explicit BufferSystem( const MPI_Comm & communicator, int tag = 0 ); BufferSystem( const BufferSystem & other ); BufferSystem & operator=( const BufferSystem & other ); ~BufferSystem() {} //@} //******************************************************************************************************************* //** Receiver Registration *********************************************************************************** /*! \name Receiver Registration */ //@{ template<typename RankIter> void setReceiverInfo( RankIter begin, RankIter end, bool changingSize ); template<typename Range> void setReceiverInfo( const Range & range, bool changingSize ); void setReceiverInfo( const std::set<MPIRank> & ranksToRecvFrom, bool changingSize ); void setReceiverInfo( const std::map<MPIRank,MPISize> & ranksToRecvFrom ); void setReceiverInfoFromSendBufferState( bool useSizeFromSendBuffers, bool changingSize ); void sizeHasChanged( bool alwaysChangingSize = false ); //@} //******************************************************************************************************************* //** Executing Communication Step ********************************************************************************* /*! \name Executing Communication Step */ //@{ void scheduleReceives() { startCommunication(); } SendBuffer & sendBuffer ( MPIRank rank ); SendBuffer & sendBuffer ( uint_t rank ) { return sendBuffer( int_c( rank ) ); } inline size_t size() const; void sendAll(); void send( MPIRank rank ); iterator begin() { WALBERLA_ASSERT( communicationRunning_); return iterator( *this, true ); } iterator end() { return iterator( *this, false); } //@} //******************************************************************************************************************* //** Iterator ************************************************************************************************ /*! \name Iterator */ //@{ class iterator { public: MPIRank rank() { return currentSenderRank_; } RecvBuffer & buffer() { return *currentRecvBuffer_; } void operator++(); bool operator==( const iterator & other ); bool operator!=( const iterator & other ); private: iterator( BufferSystem & bufferSystem, bool begin ); BufferSystem & bufferSystem_; RecvBuffer * currentRecvBuffer_; MPIRank currentSenderRank_; friend class BufferSystem; }; friend class iterator; //@} //******************************************************************************************************************* //** Status Queries ****************************************************************************************** /*! \name Status Queries */ //@{ bool isSizeCommunicatedInNextStep() const { return (currentComm_ == &unknownSizeComm_); } bool isCommunciationRunning() const { return communicationRunning_; } bool isReceiverInformationSet() const { return currentComm_ != NULL; } //@} //******************************************************************************************************************* int64_t getBytesSent() const { return bytesSent_; } int64_t getBytesReceived() const { return bytesReceived_; } //* Rank Ranges ************************************************************************************************* /*! \name Rank Ranges */ //@{ typedef boost::counting_iterator<MPIRank> RankCountIter; typedef boost::iterator_range< RankCountIter > RankRange; static RankRange noRanks(); static RankRange allRanks(); static RankRange allRanksButRoot(); static RankRange onlyRoot(); static RankRange onlyRank( MPIRank includedRank ); //@} //******************************************************************************************************************* protected: friend class OpenMPBufferSystem; void startCommunication(); void endCommunication(); RecvBuffer * waitForNext( MPIRank & fromRank ); void setCommunicationType( const bool knownSize ); internal::KnownSizeCommunication knownSizeComm_; internal::UnknownSizeCommunication unknownSizeComm_; internal::NoMPICommunication noMPIComm_; internal::AbstractCommunication * currentComm_; //< after receiver setup, this points to unknown- or knownSizeComm_ bool sizeChangesEverytime_; //< if set to true, the receiveSizeUnknown_ is set to true before communicating bool communicationRunning_; //< indicates if a communication step is currently running /// Info about the message to be received from a certain rank: /// information holds the buffer and, if known, the message size std::map<MPIRank, internal::AbstractCommunication::ReceiveInfo> recvInfos_; struct SendInfo { SendInfo() : alreadySent(false) {} SendBuffer buffer; bool alreadySent; }; std::map<MPIRank, SendInfo> sendInfos_; //stores tags of running communications in debug mode to ensure that //each concurrently running communication uses different tags static std::set<int> activeTags_; int64_t bytesSent_ = 0; ///< number of bytes sent during last communication int64_t bytesReceived_ = 0; ///< number of bytes received during last communication }; //====================================================================================================================== // // Template function definitions // //====================================================================================================================== template<typename Range> void BufferSystem::setReceiverInfo( const Range & range, bool changingSize ) { setReceiverInfo( range.begin(), range.end(), changingSize ); } template<typename RankIter> void BufferSystem::setReceiverInfo( RankIter rankBegin, RankIter rankEnd, bool changingSize ) { WALBERLA_ASSERT( ! communicationRunning_ ); recvInfos_.clear(); for ( auto it = rankBegin; it != rankEnd; ++it ) { const MPIRank sender = *it; recvInfos_[ sender ].size = INVALID_SIZE; } sizeChangesEverytime_ = changingSize; setCommunicationType( false ); } inline size_t BufferSystem::size() const { size_t sum = 0; for( auto iter = sendInfos_.begin(); iter != sendInfos_.end(); ++iter ) { sum += iter->second.buffer.size(); } return sum; } } // namespace mpi } // namespace walberla