//======================================================================================================================
//
//  This file is part of waLBerla. waLBerla is free software: you can
//  redistribute it and/or modify it under the terms of the GNU General Public
//  License as published by the Free Software Foundation, either version 3 of
//  the License, or (at your option) any later version.
//
//  waLBerla is distributed in the hope that it will be useful, but WITHOUT
//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
//  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
//  for more details.
//
//  You should have received a copy of the GNU General Public License along
//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
//! \file UniformGPUScheme.impl.h
//! \ingroup cuda
//! \author Martin Bauer
//
//======================================================================================================================

#include "cuda/ParallelStreams.h"

namespace walberla {
namespace cuda {
namespace communication {


   template< typename Stencil >
   UniformGPUScheme< Stencil >::UniformGPUScheme( weak_ptr< StructuredBlockForest > bf,
                                                  bool sendDirectlyFromGPU,
                                                  const int tag )
        : blockForest_( bf ),
          setupBeforeNextCommunication_( true ),
          communicationInProgress_( false ),
          sendFromGPU_( sendDirectlyFromGPU ),
          bufferSystemCPU_( mpi::MPIManager::instance()->comm(), tag ),
          bufferSystemGPU_( mpi::MPIManager::instance()->comm(), tag ),
          parallelSectionManager_( -1 )
   {}


   template< typename Stencil >
   void UniformGPUScheme< Stencil >::startCommunication( cudaStream_t stream )
   {
      WALBERLA_ASSERT( !communicationInProgress_ );
      auto forest = blockForest_.lock();

      // redo the setup if pack infos were added or the block structure changed since the last communication step
      auto currentBlockForestStamp = forest->getBlockForest().getModificationStamp();
      if( setupBeforeNextCommunication_ || currentBlockForestStamp != forestModificationStamp_ )
         setupCommunication();

      // Schedule Receives
      if( sendFromGPU_ )
         bufferSystemGPU_.scheduleReceives();
      else
         bufferSystemCPU_.scheduleReceives();

      if( !sendFromGPU_ )
         for( auto it : headers_ )
            bufferSystemGPU_.sendBuffer( it.first ).clear();

      // Start filling send buffers
      {
         auto parallelSection = parallelSectionManager_.parallelSection( stream );
         for( auto &iBlock : *forest )
         {
            auto block = dynamic_cast< Block * >( &iBlock );
            for( auto dir = Stencil::beginNoCenter(); dir != Stencil::end(); ++dir )
            {
               const auto neighborIdx = blockforest::getBlockNeighborhoodSectionIndex( *dir );
               if( block->getNeighborhoodSectionSize( neighborIdx ) == uint_t( 0 ))
                  continue;
               auto nProcess = mpi::MPIRank( block->getNeighborProcess( neighborIdx, uint_t( 0 )));

               for( auto &pi : packInfos_ )
               {
                  parallelSection.run( [&]( auto s ) {
                     auto size = pi->size( *dir, block );
                     auto gpuDataPtr = bufferSystemGPU_.sendBuffer( nProcess ).advanceNoResize( size );
                     WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );
                     pi->pack( *dir, gpuDataPtr, block, s );

                     if( !sendFromGPU_ )
                     {
                        auto cpuDataPtr = bufferSystemCPU_.sendBuffer( nProcess ).advanceNoResize( size );
                        WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr );
                        WALBERLA_CUDA_CHECK( cudaMemcpyAsync( cpuDataPtr, gpuDataPtr, size,
                                                              cudaMemcpyDeviceToHost, s ));
                     }
                  } );
               }
            }
         }
      }

      // wait for packing to finish
      cudaStreamSynchronize( stream );

      if( sendFromGPU_ )
         bufferSystemGPU_.sendAll();
      else
         bufferSystemCPU_.sendAll();

      communicationInProgress_ = true;
   }
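   //------------------------------------------------------------------------------------------------------------------
   // Note on the two send paths in startCommunication(): if sendFromGPU_ is set, the device buffers are handed to
   // MPI directly, which presumes a CUDA-aware MPI implementation. Otherwise every packed chunk is staged to a host
   // buffer with cudaMemcpyAsync first and bufferSystemCPU_ does the sending. In both cases the
   // cudaStreamSynchronize( stream ) before sendAll() is essential: MPI must not read the send buffers while packing
   // kernels or device-to-host copies are still in flight on the stream.
   //------------------------------------------------------------------------------------------------------------------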
   template< typename Stencil >
   void UniformGPUScheme< Stencil >::wait( cudaStream_t stream )
   {
      WALBERLA_ASSERT( communicationInProgress_ );

      auto forest = blockForest_.lock();

      if( sendFromGPU_ )
      {
         auto parallelSection = parallelSectionManager_.parallelSection( stream );
         for( auto recvInfo = bufferSystemGPU_.begin(); recvInfo != bufferSystemGPU_.end(); ++recvInfo )
         {
            recvInfo.buffer().clear();
            for( auto &header : headers_[recvInfo.rank()] )
            {
               auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId ));

               for( auto &pi : packInfos_ )
               {
                  auto size = pi->size( header.dir, block );
                  auto gpuDataPtr = recvInfo.buffer().advanceNoResize( size );
                  WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );
                  parallelSection.run( [&]( auto s ) {
                     pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, s );
                  } );
               }
            }
         }
      }
      else
      {
         auto parallelSection = parallelSectionManager_.parallelSection( stream );
         for( auto recvInfo = bufferSystemCPU_.begin(); recvInfo != bufferSystemCPU_.end(); ++recvInfo )
         {
            auto &gpuBuffer = bufferSystemGPU_.sendBuffer( recvInfo.rank());

            recvInfo.buffer().clear();
            gpuBuffer.clear();
            for( auto &header : headers_[recvInfo.rank()] )
            {
               auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId ));

               for( auto &pi : packInfos_ )
               {
                  auto size = pi->size( header.dir, block );
                  auto cpuDataPtr = recvInfo.buffer().advanceNoResize( size );
                  auto gpuDataPtr = gpuBuffer.advanceNoResize( size );
                  WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr );
                  WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );

                  parallelSection.run( [&]( auto s ) {
                     WALBERLA_CUDA_CHECK( cudaMemcpyAsync( gpuDataPtr, cpuDataPtr, size,
                                                           cudaMemcpyHostToDevice, s ));
                     pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, s );
                  } );
               }
            }
         }
      }

      communicationInProgress_ = false;
   }


   template< typename Stencil >
   void UniformGPUScheme< Stencil >::setupCommunication()
   {
      auto forest = blockForest_.lock();

      headers_.clear();

      std::map< mpi::MPIRank, mpi::MPISize > receiverInfo; // how many bytes to send to each neighbor

      mpi::BufferSystem headerExchangeBs( mpi::MPIManager::instance()->comm(), 123 );

      for( auto &iBlock : *forest )
      {
         auto block = dynamic_cast< Block * >( &iBlock );

         for( auto dir = Stencil::beginNoCenter(); dir != Stencil::end(); ++dir )
         {
            // skip if block has no neighbors in this direction
            const auto neighborIdx = blockforest::getBlockNeighborhoodSectionIndex( *dir );
            if( block->getNeighborhoodSectionSize( neighborIdx ) == uint_t( 0 ))
               continue;

            WALBERLA_ASSERT( block->neighborhoodSectionHasEquallySizedBlock( neighborIdx ),
                             "Works for uniform setups only" );
            WALBERLA_ASSERT_EQUAL( block->getNeighborhoodSectionSize( neighborIdx ), uint_t( 1 ),
                                   "Works for uniform setups only" );

            const BlockID &nBlockId = block->getNeighborId( neighborIdx, uint_t( 0 ));
            auto nProcess = mpi::MPIRank( block->getNeighborProcess( neighborIdx, uint_t( 0 )));

            for( auto &pi : packInfos_ )
               receiverInfo[nProcess] += mpi::MPISize( pi->size( *dir, block ));

            auto &headerBuffer = headerExchangeBs.sendBuffer( nProcess );
            nBlockId.toBuffer( headerBuffer );
            headerBuffer << *dir;
         }
      }

      headerExchangeBs.setReceiverInfoFromSendBufferState( false, true );
      headerExchangeBs.sendAll();
      for( auto recvIter = headerExchangeBs.begin(); recvIter != headerExchangeBs.end(); ++recvIter )
      {
         auto &headerVector = headers_[recvIter.rank()];
         auto &buffer = recvIter.buffer();
         while( buffer.size())
         {
            Header header;
            header.blockId.fromBuffer( buffer );
            buffer >> header.dir;
            headerVector.push_back( header );
         }
      }

      bufferSystemCPU_.setReceiverInfo( receiverInfo );
      bufferSystemGPU_.setReceiverInfo( receiverInfo );

      for( auto it : receiverInfo )
      {
         bufferSystemCPU_.sendBuffer( it.first ).resize( size_t( it.second ));
         bufferSystemGPU_.sendBuffer( it.first ).resize( size_t( it.second ));
      }

      forestModificationStamp_ = forest->getBlockForest().getModificationStamp();
      setupBeforeNextCommunication_ = false;
   }
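   //------------------------------------------------------------------------------------------------------------------
   // setupCommunication() runs two handshakes before the first communication step (and again whenever pack infos or
   // the block structure change):
   //  1. For each block/direction pair that crosses a process boundary, the neighbor's BlockID and the direction are
   //     sent over a separate header buffer system (tag 123). The receiver stores them in headers_, which fixes the
   //     order in which wait() later walks through an incoming buffer.
   //  2. The per-neighbor byte counts accumulated in receiverInfo size the CPU and GPU send buffers once up front,
   //     so the advanceNoResize() calls during packing never reallocate - a reallocation of the GPU buffer while
   //     asynchronous kernels still write into it would be invalid.
   //------------------------------------------------------------------------------------------------------------------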
   template< typename Stencil >
   void UniformGPUScheme< Stencil >::addPackInfo( const shared_ptr< GeneratedGPUPackInfo > &pi )
   {
      WALBERLA_ASSERT( !communicationInProgress_, "Cannot add pack info while communication is in progress" );
      packInfos_.push_back( pi );
      setupBeforeNextCommunication_ = true;
   }


} // namespace communication
} // namespace cuda
} // namespace walberla
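//======================================================================================================================
// Usage sketch (illustrative only, not part of this file): a minimal setup assuming a D3Q19 stencil and a
// hypothetical pack info "MyGPUPackInfo" implementing the GeneratedGPUPackInfo interface used above
// (size() / pack() / unpack()):
//
//   shared_ptr< StructuredBlockForest > blocks = /* e.g. blockforest::createUniformBlockGrid( ... ) */;
//
//   cuda::communication::UniformGPUScheme< stencil::D3Q19 > comm( blocks, /*sendDirectlyFromGPU=*/ false );
//   comm.addPackInfo( make_shared< MyGPUPackInfo >( /* field data ID, ... */ ) );
//
//   cudaStream_t stream;
//   WALBERLA_CUDA_CHECK( cudaStreamCreate( &stream ));
//
//   comm.startCommunication( stream );  // pack ghost-layer data and post the MPI sends/receives
//   /* ... run kernels on the block interiors here to overlap computation with communication ... */
//   comm.wait( stream );                // receive and unpack into the ghost layers
//======================================================================================================================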