Skip to content
Snippets Groups Projects
Commit a0c86311 authored by Martin Bauer's avatar Martin Bauer
Browse files

CUDA Parallel Streams & UniformGridGPU Benchmark

- LB on GPU, Uniform Grid Benchmark app
- helper class to schedule tasks to multiple CUDA streams
parent a5c9cb2f
No related merge requests found
...@@ -10,3 +10,4 @@ add_subdirectory( PeriodicGranularGas ) ...@@ -10,3 +10,4 @@ add_subdirectory( PeriodicGranularGas )
add_subdirectory( PoiseuilleChannel ) add_subdirectory( PoiseuilleChannel )
add_subdirectory( SchaeferTurek ) add_subdirectory( SchaeferTurek )
add_subdirectory( UniformGrid ) add_subdirectory( UniformGrid )
add_subdirectory( UniformGridGPU )
# Parameter files (*.prm) of this benchmark are linked into the build directory
# (per the macro's name — presumably so the executable can be started from there; TODO confirm).
waLBerla_link_files_to_builddir( "*.prm" )
# Benchmark executable. Besides the hand-written driver UniformGridGPU.cpp the file list
# contains the generation script UniformGridGPU.gen.py, which registers the generated
# lattice model, LB kernel, pack info and boundary classes included by the driver.
# NOTE(review): the driver includes lbm/ and python_coupling/ headers, but neither module is
# listed under DEPENDS — presumably resolved transitively (e.g. via cuda); confirm.
waLBerla_add_executable ( NAME UniformGridBenchmarkGPU
FILES UniformGridGPU.cpp UniformGridGPU.gen.py
DEPENDS blockforest boundary core cuda domain_decomposition field geometry timeloop vtk )
#include "core/Environment.h"
#include "python_coupling/CreateConfig.h"
#include "blockforest/Initialization.h"
#include "lbm/field/PdfField.h"
#include "lbm/field/AddToStorage.h"
#include "field/FlagField.h"
#include "field/AddToStorage.h"
#include "lbm/communication/PdfFieldPackInfo.h"
#include "lbm/vtk/VTKOutput.h"
#include "lbm/PerformanceLogger.h"
#include "blockforest/communication/UniformBufferedScheme.h"
#include "timeloop/all.h"
#include "core/math/Random.h"
#include "geometry/all.h"
#include "cuda/HostFieldAllocator.h"
#include "cuda/communication/GPUPackInfo.h"
#include "cuda/ParallelStreams.h"
#include "core/timing/TimingPool.h"
#include "core/timing/RemainingTimeLogger.h"
#include "cuda/AddGPUFieldToStorage.h"
#include "cuda/communication/UniformGPUScheme.h"
#include "lbm/sweeps/CellwiseSweep.h"
#include "domain_decomposition/SharedSweep.h"
#include "UniformGridGPU_LatticeModel.h"
#include "UniformGridGPU_LbKernel.h"
#include "UniformGridGPU_PackInfo.h"
#include "UniformGridGPU_UBB.h"
#include "UniformGridGPU_NoSlip.h"
using namespace walberla;
// Lattice model class produced by the code generation script (UniformGridGPU_LatticeModel.h)
using LatticeModel_T = lbm::UniformGridGPU_LatticeModel;
// Stencils exported by the generated lattice model
using Stencil_T = LatticeModel_T::Stencil;
using CommunicationStencil_T = LatticeModel_T::CommunicationStencil;
// PDF field type used for the CPU copy of the distribution functions
using PdfField_T = lbm::PdfField<LatticeModel_T>;
// GPU communication scheme, parameterized with the communication stencil of the lattice model
using CommScheme_T = cuda::communication::UniformGPUScheme<CommunicationStencil_T>;
// Flag field (8 bit per cell) that is used to set up the boundary handling
using flag_t = walberla::uint8_t;
using FlagField_T = FlagField<flag_t>;
//*******************************************************************************************************************
/*!
 * Driver of the uniform grid LBM benchmark on GPUs.
 *
 * For every configuration yielded by python_coupling::configBegin()/configEnd() the following is done:
 *   - a uniform block grid is created from the configuration
 *   - the PDF field is created on the CPU and mirrored on the GPU, a flag field is created on the CPU
 *   - the generated UBB and NoSlip boundaries are filled from the flag field
 *   - one time step function is chosen (parameter "overlapCommunication"): either communication is overlapped
 *     with the inner LB kernel, or communication, boundaries and LB kernel are executed one after another
 *   - optional VTK output, a remaining time logger and a performance logger are registered
 *   - the time loop is run
 *
 * \param argc  number of command line arguments (forwarded to the MPI environment and the config iterator)
 * \param argv  command line arguments
 * \return      0
 */
//*******************************************************************************************************************
int main( int argc, char **argv )
{
   mpi::Environment env( argc, argv );

   // One complete benchmark run (setup + time loop) per configuration
   for( auto cfg = python_coupling::configBegin( argc, argv ); cfg != python_coupling::configEnd(); ++cfg )
   {
      auto config = *cfg;
      auto blocks = blockforest::createUniformBlockGridFromConfig( config );

      // Reading parameters
      auto parameters = config->getOneBlock( "Parameters" );
      const real_t omega = parameters.getParameter<real_t>( "omega", real_c( 1.4 ));
      const uint_t timesteps = parameters.getParameter<uint_t>( "timesteps", uint_c( 50 ));
      const Vector3<real_t> initialVelocity = parameters.getParameter< Vector3<real_t> >( "initialVelocity", Vector3<real_t>() );

      // Creating fields
      auto latticeModel = LatticeModel_T( omega );
      BlockDataID pdfFieldCpuID = lbm::addPdfFieldToStorage( blocks, "pdfs on CPU", latticeModel, initialVelocity, real_t(1), field::fzyx );
      BlockDataID pdfFieldGpuID = cuda::addGPUFieldToStorage<PdfField_T >( blocks, pdfFieldCpuID, "pdfs on GPU", true );
      BlockDataID flagFieldID = field::addFlagFieldToStorage< FlagField_T >( blocks, "flag field" );

      // Boundaries
      const FlagUID fluidFlagUID( "Fluid" );
      auto boundariesConfig = config->getOneBlock( "Boundaries" );
      geometry::initBoundaryHandling<FlagField_T>(*blocks, flagFieldID, boundariesConfig);
      geometry::setNonBoundaryCellsToDomain<FlagField_T>(*blocks, flagFieldID, fluidFlagUID);

      lbm::UniformGridGPU_UBB ubb(blocks, pdfFieldGpuID);
      lbm::UniformGridGPU_NoSlip noSlip(blocks, pdfFieldGpuID);
      //lbm::GeneratedFixedDensity pressure(blocks, pdfFieldGpuID);

      ubb.fillFromFlagField<FlagField_T>( blocks, flagFieldID, FlagUID("UBB"), fluidFlagUID );
      noSlip.fillFromFlagField<FlagField_T>( blocks, flagFieldID, FlagUID("NoSlip"), fluidFlagUID );
      //pressure.fillFromFlagField<FlagField_T>( blocks, flagFieldID, FlagUID("pressure"), fluidFlagUID );

      // Communication setup
      bool overlapCommunication = parameters.getParameter<bool>( "overlapCommunication", true );
      bool cudaEnabledMPI = parameters.getParameter<bool>( "cudaEnabledMPI", false );

      // cudaDeviceGetStreamPriorityRange writes the least priority to its first and the greatest priority
      // to its second argument
      int streamHighPriority = 0;
      int streamLowPriority = 0;
      WALBERLA_CUDA_CHECK( cudaDeviceGetStreamPriorityRange(&streamLowPriority, &streamHighPriority) );

      pystencils::UniformGridGPU_LbKernel lbKernel( pdfFieldGpuID, omega );
      lbKernel.setOuterPriority( streamHighPriority );

      CommScheme_T gpuComm( blocks, cudaEnabledMPI );
      gpuComm.addPackInfo( make_shared<pystencils::UniformGridGPU_PackInfo>( pdfFieldGpuID ));

      // Main stream (low priority) plus helper objects that create high priority side streams on demand
      auto defaultStream = cuda::StreamRAII::newPriorityStream( streamLowPriority );
      auto innerOuterStreams = cuda::ParallelStreams( streamHighPriority );
      auto boundaryOuterStreams = cuda::ParallelStreams( streamHighPriority );
      auto boundaryInnerStreams = cuda::ParallelStreams( streamHighPriority );

      // Time step with overlap: the first task of the section (inner part) is scheduled on the main stream,
      // the second task (communication + outer part) on a side stream of 'innerOuterStreams'
      auto overlapTimeStep = [&]()
      {
         auto innerOuterSection = innerOuterStreams.parallelSection( defaultStream );

         innerOuterSection.run([&]( auto innerStream )
         {
            for( auto &block: *blocks )
            {
               {
                  // the nested section is synchronized at the end of this scope,
                  // i.e. before the inner LB kernel is enqueued
                  auto p = boundaryInnerStreams.parallelSection( innerStream );
                  p.run( [&]( auto s ) { ubb.inner( &block, s ); } );
                  p.run( [&]( auto s ) { noSlip.inner( &block, s ); } );
               }
               lbKernel.inner( &block, innerStream );
            }
         });

         innerOuterSection.run([&]( auto outerStream )
         {
            gpuComm( outerStream );
            for( auto &block: *blocks )
            {
               {
                  // same pattern as above for the outer part of the boundaries
                  auto p = boundaryOuterStreams.parallelSection( outerStream );
                  p.run( [&]( auto s ) { ubb.outer( &block, s ); } );
                  p.run( [&]( auto s ) { noSlip.outer( &block, s ); } );
               }
               lbKernel.outer( &block, outerStream );
            }
         });
      };

      // Time step without overlap: communication first, then per block both boundaries
      // (in a parallel section) followed by the LB kernel
      auto boundaryStreams = cuda::ParallelStreams( streamHighPriority );
      auto normalTimeStep = [&]()
      {
         gpuComm();
         for( auto &block: *blocks )
         {
            {
               auto p = boundaryStreams.parallelSection( defaultStream );
               p.run( [&]( auto s ) { ubb( &block, s ); } );
               p.run( [&]( auto s ) { noSlip( &block, s ); } );
            }
            lbKernel( &block );
         }
      };

      SweepTimeloop timeLoop( blocks->getBlockStorage(), timesteps );
      std::function<void()> timeStep = overlapCommunication ? std::function<void()>( overlapTimeStep ) :
                                                              std::function<void()>( normalTimeStep );
      // The complete time step is executed in the "before function", the sweep itself does nothing
      timeLoop.add() << BeforeFunction( timeStep )
                     << Sweep( []( IBlock * ) {}, "time step" );

      // VTK
      uint_t vtkWriteFrequency = parameters.getParameter<uint_t>( "vtkWriteFrequency", 0 );
      if( vtkWriteFrequency > 0 )
      {
         auto vtkOutput = vtk::createVTKOutput_BlockData( *blocks, "vtk", vtkWriteFrequency, 0, false, "vtk_out",
                                                          "simulation_step", false, true, true, false, 0 );
         vtkOutput->addCellDataWriter(
                 make_shared<lbm::VelocityVTKWriter<LatticeModel_T> >( pdfFieldCpuID, "Velocity" ));
         vtkOutput->addCellDataWriter( make_shared<lbm::DensityVTKWriter<LatticeModel_T> >( pdfFieldCpuID, "Density" ));
         // the writers read the CPU field, so the GPU field is copied to the CPU field before writing
         vtkOutput->addBeforeFunction(
                 cuda::fieldCpyFunctor<PdfField_T, cuda::GPUField<real_t> >( blocks, pdfFieldCpuID, pdfFieldGpuID ));
         timeLoop.addFuncAfterTimeStep( vtk::writeFiles( vtkOutput ), "VTK Output" );
      }

      auto remainingTimeLoggerFrequency = parameters.getParameter< double >( "remainingTimeLoggerFrequency", 3.0 ); // in seconds
      timeLoop.addFuncAfterTimeStep( timing::RemainingTimeLogger( timeLoop.getNrOfTimeSteps(), remainingTimeLoggerFrequency ), "remaining time logger" );

      lbm::PerformanceLogger<FlagField_T> performanceLogger(blocks, flagFieldID, fluidFlagUID, 500);
      // registered under its own label (was a copy of the label of the remaining time logger)
      timeLoop.addFuncAfterTimeStep( performanceLogger, "performance logger" );

      timeLoop.run();
   }

   return 0;
}
\ No newline at end of file
import sympy as sp
from lbmpy_walberla import generate_lattice_model_files
from lbmpy.creationfunctions import create_lb_update_rule
from pystencils_walberla.sweep import Sweep
from lbmpy.boundaries import NoSlip, UBB
from lbmpy.creationfunctions import create_lb_method
from lbmpy_walberla.boundary import create_boundary_class
from pystencils_walberla.cmake_integration import codegen
# Floating point type of the generated kernels; also decides 'double_precision' below
dtype = 'float64'
# LB options
# This dictionary is passed as keyword arguments to create_lb_update_rule, create_lb_method and
# generate_lattice_model_files further down in this script.
options = {
'method': 'srt',
'stencil': 'D3Q19',
# the relaxation rate stays symbolic ("omega")
'relaxation_rate': sp.Symbol("omega"),
'field_name': 'pdfs',
'compressible': False,
'temporary_field_name': 'pdfs_tmp',
'optimization': {'cse_global': True,
'cse_pdfs': True,
# True exactly when dtype is 'float64'
'double_precision': dtype == 'float64'}
}
# GPU optimization options
# inner_opt is passed as 'optimization', outer_opt as 'outer_optimization' to
# Sweep.generate_inner_outer_kernel below.
inner_opt = {'gpu_indexing_params': {'block_size': (128, 1, 1)}, 'data_type': dtype}
# NOTE(review): 32*32*32 threads is more than a single CUDA block can hold — presumably the
# generator clips the block size to the iteration space of the outer slices; confirm.
outer_opt = {'gpu_indexing_params': {'block_size': (32, 32, 32)}, 'data_type': dtype}
def lb_assignments():
    """Create the LB update rule from the module-level ``options`` and return its ``all_assignments``."""
    return create_lb_update_rule(**options).all_assignments
def genBoundary():
    """Return the GPU boundary class created for a UBB boundary with velocity [0.05, 0, 0].

    The boundary is named "UniformGridGPU_UBB"; the LB method is created from the
    module-level ``options``.
    """
    ubb = UBB([0.05, 0, 0], dim=3, name="UniformGridGPU_UBB")
    lb_method = create_lb_method(**options)
    return create_boundary_class(ubb, lb_method, target='gpu')
def genNoSlip():
    """Return the GPU boundary class created for a NoSlip boundary named 'UniformGridGPU_NoSlip'.

    The LB method is created from the module-level ``options``.
    """
    no_slip = NoSlip(name='UniformGridGPU_NoSlip')
    lb_method = create_lb_method(**options)
    return create_boundary_class(no_slip, lb_method, target='gpu')
# Lattice model class
generate_lattice_model_files(class_name='UniformGridGPU_LatticeModel', **options)

# LB kernel, generated as inner/outer kernel pair; lb_assignments yields the assignments
# of the update rule created from 'options'
Sweep.generate_inner_outer_kernel(
    'UniformGridGPU_LbKernel',
    lb_assignments,
    target='gpu',
    temporary_fields=['pdfs_tmp'],
    field_swaps=[('pdfs', 'pdfs_tmp')],
    optimization=inner_opt,
    outer_optimization=outer_opt,
)

# Pack info for the GPU communication, based on the same assignments
Sweep.generate_pack_info('UniformGridGPU_PackInfo', lb_assignments, target='gpu')

# Boundary classes: header and CUDA source file per boundary
codegen.register(['UniformGridGPU_UBB.h', 'UniformGridGPU_UBB.cu'], genBoundary)
codegen.register(['UniformGridGPU_NoSlip.h', 'UniformGridGPU_NoSlip.cu'], genNoSlip)
Parameters
{
omega 1.8;
timesteps 10;
remainingTimeLoggerFrequency 3;
vtkWriteFrequency 0;
overlapCommunication true;
cudaEnabledMPI false;
}
DomainSetup
{
blocks < 1, 1, 1 >;
cellsPerBlock < 300, 300, 150 >;
periodic < 0, 0, 1 >;
}
Boundaries
{
Border { direction W; walldistance -1; flag NoSlip; }
Border { direction E; walldistance -1; flag NoSlip; }
Border { direction S; walldistance -1; flag NoSlip; }
Border { direction N; walldistance -1; flag UBB; }
}
Parameters
{
omega 1.8;
timesteps 2;
remainingTimeLoggerFrequency 3;
vtkWriteFrequency 0;
overlapCommunication false;
cudaEnabledMPI false;
}
DomainSetup
{
blocks < 1, 1, 1 >;
cellsPerBlock < 50, 20, 10 >;
periodic < 0, 0, 1 >;
}
Boundaries
{
Border { direction W; walldistance -1; flag NoSlip; }
Border { direction E; walldistance -1; flag NoSlip; }
Border { direction S; walldistance -1; flag NoSlip; }
Border { direction N; walldistance -1; flag UBB; }
}
...@@ -6,4 +6,4 @@ ...@@ -6,4 +6,4 @@
waLBerla_add_module( DEPENDS blockforest core communication domain_decomposition python_coupling field stencil BUILD_ONLY_IF_FOUND CUDA ) waLBerla_add_module( DEPENDS blockforest core communication domain_decomposition python_coupling field stencil BUILD_ONLY_IF_FOUND CUDA )
################################################################################################### ###################################################################################################
\ No newline at end of file \ No newline at end of file
...@@ -18,67 +18,97 @@ ...@@ -18,67 +18,97 @@
//! \author Martin Bauer <martin.bauer@fau.de> //! \author Martin Bauer <martin.bauer@fau.de>
// //
//====================================================================================================================== //======================================================================================================================
#pragma once
#include "ErrorChecking.h" #include "ErrorChecking.h"
namespace walberla { namespace walberla {
namespace cuda { namespace cuda {
class StreamRAII class StreamRAII
{ {
public: public:
~StreamRAII() { ~StreamRAII()
if( stream_ != 0 ) { {
WALBERLA_CUDA_CHECK( cudaStreamDestroy( stream_ )); if( stream_ != 0 ) {
} WALBERLA_CUDA_CHECK( cudaStreamDestroy( stream_ ));
} }
}
StreamRAII( StreamRAII && other) {
stream_ = other.stream_; StreamRAII( StreamRAII &&other )
other.stream_ = 0; {
} stream_ = other.stream_;
other.stream_ = 0;
StreamRAII(const StreamRAII&) = delete; }
void operator=( const StreamRAII &) = delete;
operator cudaStream_t() const { return stream_; } StreamRAII( const StreamRAII & ) = delete;
void operator=( const StreamRAII & ) = delete;
static StreamRAII defaultStream() {
StreamRAII result; operator cudaStream_t() const { return stream_; }
result.stream_ = 0;
return result;
} static StreamRAII defaultStream()
{
static StreamRAII newPriorityStream(int priority) { StreamRAII result;
StreamRAII result; result.stream_ = 0;
WALBERLA_CUDA_CHECK( cudaStreamCreateWithPriority( &result.stream_, cudaStreamDefault, priority )); return result;
return result; }
}
static StreamRAII newPriorityStream( int priority )
static StreamRAII newStream() { {
StreamRAII result; StreamRAII result;
WALBERLA_CUDA_CHECK( cudaStreamCreate( &result.stream_)); WALBERLA_CUDA_CHECK( cudaStreamCreateWithPriority( &result.stream_, cudaStreamDefault, priority ));
return result; return result;
} }
private: static StreamRAII newStream()
StreamRAII() {} {
cudaStream_t stream_; StreamRAII result;
}; WALBERLA_CUDA_CHECK( cudaStreamCreate( &result.stream_ ));
return result;
}
struct EventRAII
{ private:
explicit EventRAII() { WALBERLA_CUDA_CHECK( cudaEventCreate(&event) ); } StreamRAII() {}
~EventRAII() { WALBERLA_CUDA_CHECK( cudaEventDestroy(event) ); }
EventRAII(const EventRAII &) = delete; cudaStream_t stream_;
void operator=( const EventRAII &) = delete; };
operator cudaEvent_t() const { return event; }
cudaEvent_t event; class EventRAII
}; {
public:
explicit EventRAII()
{
event = cudaEvent_t();
WALBERLA_CUDA_CHECK( cudaEventCreate( &event ));
}
~EventRAII()
{
if( event != cudaEvent_t() )
{
WALBERLA_CUDA_CHECK( cudaEventDestroy( event ));
}
}
EventRAII( const EventRAII & ) = delete;
void operator=( const EventRAII & ) = delete;
EventRAII( EventRAII &&other )
{
event = other.event;
other.event = cudaEvent_t();
}
operator cudaEvent_t() const { return event; }
private:
cudaEvent_t event;
};
} // namespace cuda } // namespace cuda
......
//======================================================================================================================
//
// This file is part of waLBerla. waLBerla is free software: you can
// redistribute it and/or modify it under the terms of the GNU General Public
// License as published by the Free Software Foundation, either version 3 of
// the License, or (at your option) any later version.
//
// waLBerla is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
//! \file ParallelStreams.cpp
//! \ingroup cuda
//! \author Martin Bauer <martin.bauer@fau.de>
//
//======================================================================================================================
#include "cuda/ParallelStreams.h"
namespace walberla {
namespace cuda {
//*******************************************************************************************************************
/*!
 * Begins a parallel section on the given main stream.
 *
 * Creates the start event and records it on the main stream. Every stream that is selected later by next()
 * waits for this event, i.e. work of the section starts after the work already enqueued on the main stream.
 *
 * \param parent      ParallelStreams object that provides side streams and events
 * \param mainStream  stream that executes the first task of the section
 */
//*******************************************************************************************************************
ParallelSection::ParallelSection(ParallelStreams * parent, cudaStream_t mainStream)
   : parent_( parent ), mainStream_( mainStream ), counter_( 0 )
{
   // The start event is only recorded and waited on, its time stamp is never queried. Events created with
   // cudaEventDisableTiming are the recommended choice for use with cudaStreamWaitEvent.
   WALBERLA_CUDA_CHECK( cudaEventCreateWithFlags( &startEvent_, cudaEventDisableTiming ) );
   WALBERLA_CUDA_CHECK( cudaEventRecord( startEvent_, mainStream_ ) );
}
// Ends the parallel section: the streams used by the section are synchronized with each other
// via events (see synchronize()), afterwards the start event is destroyed.
// The order matters: synchronize() still records startEvent_.
ParallelSection::~ParallelSection()
{
synchronize();
WALBERLA_CUDA_CHECK( cudaEventDestroy(startEvent_) );
}
void ParallelSection::next()
{
if( counter_ > 0 ) {
WALBERLA_CUDA_CHECK( cudaEventRecord( parent_->events_[counter_ - 1], parent_->sideStreams_[counter_ - 1] ) );
}
else {
WALBERLA_CUDA_CHECK( cudaEventRecord( parent_->mainEvent_, mainStream_ ) );
}
++counter_;
parent_->ensureSize( counter_ );
WALBERLA_CUDA_CHECK( cudaStreamWaitEvent( stream(), startEvent_, 0 ));
}
void ParallelSection::run(const std::function<void( cudaStream_t)> & f)
{
f( stream() );
next();
}
void ParallelSection::synchronize()
{
for( uint_t i=0; i < counter_; ++i )
for( uint_t j=0; j < counter_; ++j )
{
if( i == j )
continue;
auto & event = i == 0 ? parent_->mainEvent_ : parent_->events_[i - 1];
cudaStream_t stream = j == 0 ? mainStream_ : parent_->sideStreams_[j - 1];
WALBERLA_CUDA_CHECK( cudaStreamWaitEvent( stream, event, 0 ));
}
WALBERLA_CUDA_CHECK( cudaEventRecord( startEvent_, mainStream_ ) );
}
//*******************************************************************************************************************
/*!
 * Returns the stream that belongs to the current counter value.
 *
 * \return the main stream while the counter is zero, otherwise side stream number (counter - 1) of the parent
 */
//*******************************************************************************************************************
cudaStream_t ParallelSection::stream()
{
   if( counter_ == 0 )
   {
      return mainStream_;
   }
   return parent_->sideStreams_[counter_ - 1];
}
//*******************************************************************************************************************
/*!
 * Stores the priority that is used when side streams are created. No side stream is created here,
 * they are created on demand in ensureSize().
 *
 * \param priority  priority passed to StreamRAII::newPriorityStream for every side stream
 */
//*******************************************************************************************************************
ParallelStreams::ParallelStreams( int priority )
   : streamPriority_( priority )
{}
// Creates a parallel section that uses 'stream' as main stream and the side streams of this object.
// NOTE(review): ParallelSection owns a CUDA event and has a user-defined destructor, but its copy
// operations are not deleted. Returning the temporary directly relies on the copy being elided;
// a copy that is actually executed would destroy the same event twice.
ParallelSection ParallelStreams::parallelSection( cudaStream_t stream ) {
return ParallelSection(this, stream);
}
//*******************************************************************************************************************
/*!
 * Makes sure that at least 'size' side streams exist. For every missing side stream a new stream with the
 * stored priority and a new event are appended; existing streams and events are left untouched.
 *
 * \param size  required number of side streams
 */
//*******************************************************************************************************************
void ParallelStreams::ensureSize( uint_t size )
{
   while( sideStreams_.size() < size )
   {
      sideStreams_.emplace_back( StreamRAII::newPriorityStream(streamPriority_));
      events_.emplace_back( EventRAII() );
   }
}
//*******************************************************************************************************************
/*!
 * Changes the priority used for side streams. All existing side streams and their events are dropped,
 * so that ensureSize() creates new ones with the new priority.
 *
 * \param priority  priority for side streams created from now on
 */
//*******************************************************************************************************************
void ParallelStreams::setStreamPriority( int priority )
{
   streamPriority_ = priority;

   // streams created with the old priority cannot be reused
   sideStreams_.clear();
   events_.clear();
}
} // namespace cuda
} // namespace walberla
\ No newline at end of file
//======================================================================================================================
//
// This file is part of waLBerla. waLBerla is free software: you can
// redistribute it and/or modify it under the terms of the GNU General Public
// License as published by the Free Software Foundation, either version 3 of
// the License, or (at your option) any later version.
//
// waLBerla is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
// for more details.
//
// You should have received a copy of the GNU General Public License along
// with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
//! \file ParallelStreams.h
//! \ingroup cuda
//! \author Martin Bauer <martin.bauer@fau.de>
//
//======================================================================================================================
#pragma once
#include "cuda/ErrorChecking.h"
#include "cuda/CudaRAII.h"
#include <vector>
namespace walberla {
namespace cuda {
class ParallelStreams;
// Scope object for a group of CUDA tasks that are distributed over a main stream and the side
// streams of a ParallelStreams object. Objects are created by ParallelStreams::parallelSection();
// the destructor synchronizes the used streams with each other via events.
//
// NOTE(review): the class owns a CUDA event (startEvent_) and has a user-defined destructor, but
// copy construction / assignment are not deleted — a copy would destroy the event twice.
// NOTE(review): std::function is used below although only <vector> is included directly in this
// header — presumably <functional> comes in through the other includes; confirm.
class ParallelSection
{
public:
// synchronizes the streams of the section and destroys the start event
~ParallelSection();
// calls f with the current stream, afterwards the section switches to the next stream
void run( const std::function<void( cudaStream_t )> &f );
private:
friend class ParallelStreams;
// records the start event on mainStream; only ParallelStreams creates sections
ParallelSection( ParallelStreams *parent, cudaStream_t mainStream );
// lets every used stream wait for the events of all other used streams
void synchronize();
// main stream while counter_ is zero, otherwise side stream number counter_ - 1
cudaStream_t stream();
// records the event of the current stream and advances to the next stream
void next();
// provider of side streams and events
ParallelStreams * parent_;
// stream that executes the first task of the section
cudaStream_t mainStream_;
// recorded on the main stream; every newly selected stream waits for it
cudaEvent_t startEvent_;
// number of finished run() calls, selects the current stream
uint_t counter_;
};
//*******************************************************************************************************************
/*!
* Helper class to run CUDA operations on parallel streams
*
* This class introduces "side streams" that overlap with one "main stream". In a parallel section, multiple
* kernels (or other CUDA operations) are scheduled to the streams. The first "run" is scheduled on the main
* stream, all subsequent operations are scheduled on the side streams. The passed priority affects only the
* side streams, which are created on demand. When the parallel section goes out of scope, the streams used by
* the section are synchronized with each other via CUDA events.
*
* Example:
*
* \code
* ParallelStreams streams;
* {
* // new scope for the parallel section
* ParallelSection sec = streams.parallelSection( mainCudaStream );
* sec.run([&] ( cudaStream_t stream ) {
* // first run: 'stream' is the main stream
* });
* sec.run([&] ( cudaStream_t stream ) {
* // every further run: 'stream' is one of the side streams
* });
* // after the parallel section goes out of scope the side streams are synchronized to the main stream
* }
*
* \endcode
*
*/
//*******************************************************************************************************************
class ParallelStreams
{
public:
// priority: priority of the side streams that are created on demand
ParallelStreams( int priority = 0 );
// starts a parallel section with 'stream' as main stream
ParallelSection parallelSection( cudaStream_t stream );
// changes the side stream priority; existing side streams and events are dropped
void setStreamPriority( int priority );
private:
friend class ParallelSection;
// creates side streams and events until at least 'size' of them exist
void ensureSize( uint_t size );
// side stream k and event k belong together
std::vector<StreamRAII> sideStreams_;
std::vector<EventRAII> events_;
// event that is recorded on the main stream of a section
EventRAII mainEvent_;
int streamPriority_;
};
} // namespace cuda
} // namespace walberla
\ No newline at end of file
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "cuda/CudaRAII.h" #include "cuda/CudaRAII.h"
#include "cuda/communication/GeneratedGPUPackInfo.h" #include "cuda/communication/GeneratedGPUPackInfo.h"
#include "cuda/communication/CustomMemoryBuffer.h" #include "cuda/communication/CustomMemoryBuffer.h"
#include "cuda/ParallelStreams.h"
#include <chrono> #include <chrono>
#include <thread> #include <thread>
...@@ -46,23 +47,21 @@ namespace communication { ...@@ -46,23 +47,21 @@ namespace communication {
{ {
public: public:
explicit UniformGPUScheme( weak_ptr_wrapper<StructuredBlockForest> bf, explicit UniformGPUScheme( weak_ptr_wrapper<StructuredBlockForest> bf,
const shared_ptr<cuda::EventRAII> & startWaitEvent,
bool sendDirectlyFromGPU = false, bool sendDirectlyFromGPU = false,
const int tag = 5432 ); const int tag = 5432 );
void addPackInfo( const shared_ptr<GeneratedGPUPackInfo> &pi ); void addPackInfo( const shared_ptr<GeneratedGPUPackInfo> &pi );
void startCommunication(); void startCommunication( cudaStream_t stream = 0);
void wait(); void wait( cudaStream_t stream = 0);
void operator()() { communicate(); } void operator()( cudaStream_t stream = 0 ) { communicate( stream ); }
inline void communicate() { startCommunication(); wait(); } inline void communicate( cudaStream_t stream = 0 ) { startCommunication(stream); wait(stream); }
private: private:
void setupCommunication(); void setupCommunication();
weak_ptr_wrapper<StructuredBlockForest> blockForest_; weak_ptr_wrapper<StructuredBlockForest> blockForest_;
shared_ptr<cuda::EventRAII> startWaitEvent_;
uint_t forestModificationStamp_; uint_t forestModificationStamp_;
bool setupBeforeNextCommunication_; bool setupBeforeNextCommunication_;
...@@ -76,7 +75,8 @@ namespace communication { ...@@ -76,7 +75,8 @@ namespace communication {
mpi::GenericBufferSystem<GpuBuffer_T, GpuBuffer_T> bufferSystemGPU_; mpi::GenericBufferSystem<GpuBuffer_T, GpuBuffer_T> bufferSystemGPU_;
std::vector<shared_ptr<GeneratedGPUPackInfo> > packInfos_; std::vector<shared_ptr<GeneratedGPUPackInfo> > packInfos_;
std::map<stencil::Direction, cuda::StreamRAII> streams_;
ParallelStreams parallelSectionManager_;
struct Header struct Header
{ {
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
// //
//====================================================================================================================== //======================================================================================================================
#include "cuda/ParallelStreams.h"
namespace walberla { namespace walberla {
namespace cuda { namespace cuda {
...@@ -27,26 +28,26 @@ namespace communication { ...@@ -27,26 +28,26 @@ namespace communication {
template<typename Stencil> template<typename Stencil>
UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockForest> bf, UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockForest> bf,
const shared_ptr <cuda::EventRAII> &startWaitEvent,
bool sendDirectlyFromGPU, bool sendDirectlyFromGPU,
const int tag ) const int tag )
: blockForest_( bf ), : blockForest_( bf ),
startWaitEvent_( startWaitEvent ),
setupBeforeNextCommunication_( true ), setupBeforeNextCommunication_( true ),
communicationInProgress_( false ), communicationInProgress_( false ),
sendFromGPU_( sendDirectlyFromGPU ), sendFromGPU_( sendDirectlyFromGPU ),
bufferSystemCPU_( mpi::MPIManager::instance()->comm(), tag ), bufferSystemCPU_( mpi::MPIManager::instance()->comm(), tag ),
bufferSystemGPU_( mpi::MPIManager::instance()->comm(), tag ) {} bufferSystemGPU_( mpi::MPIManager::instance()->comm(), tag ),
parallelSectionManager_( -1 )
{}
template<typename Stencil> template<typename Stencil>
void UniformGPUScheme<Stencil>::startCommunication() void UniformGPUScheme<Stencil>::startCommunication( cudaStream_t stream )
{ {
WALBERLA_ASSERT( !communicationInProgress_ ); WALBERLA_ASSERT( !communicationInProgress_ );
auto forest = blockForest_.lock(); auto forest = blockForest_.lock();
if( setupBeforeNextCommunication_ || auto currentBlockForestStamp = forest->getBlockForest().getModificationStamp();
forest->getBlockForest().getModificationStamp() != forestModificationStamp_ ) if( setupBeforeNextCommunication_ || currentBlockForestStamp != forestModificationStamp_ )
setupCommunication(); setupCommunication();
// Schedule Receives // Schedule Receives
...@@ -61,44 +62,40 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -61,44 +62,40 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
bufferSystemGPU_.sendBuffer( it.first ).clear(); bufferSystemGPU_.sendBuffer( it.first ).clear();
// Start filling send buffers // Start filling send buffers
for( auto &iBlock : *forest )
{ {
auto block = dynamic_cast< Block * >( &iBlock ); auto parallelSection = parallelSectionManager_.parallelSection( stream );
for( auto dir = Stencil::beginNoCenter(); dir != Stencil::end(); ++dir ) for( auto &iBlock : *forest )
{ {
const auto neighborIdx = blockforest::getBlockNeighborhoodSectionIndex( *dir ); auto block = dynamic_cast< Block * >( &iBlock );
if( block->getNeighborhoodSectionSize( neighborIdx ) == uint_t( 0 )) for( auto dir = Stencil::beginNoCenter(); dir != Stencil::end(); ++dir )
continue;
auto nProcess = mpi::MPIRank( block->getNeighborProcess( neighborIdx, uint_t( 0 )));
if( streams_.find( *dir ) == streams_.end() )
{
streams_.emplace( *dir, StreamRAII::newPriorityStream( -1 ));
}
auto &ci = streams_.at( *dir );
for( auto &pi : packInfos_ )
{ {
auto size = pi->size( *dir, block ); const auto neighborIdx = blockforest::getBlockNeighborhoodSectionIndex( *dir );
auto gpuDataPtr = bufferSystemGPU_.sendBuffer( nProcess ).advanceNoResize( size ); if( block->getNeighborhoodSectionSize( neighborIdx ) == uint_t( 0 ))
WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr ); continue;
WALBERLA_CUDA_CHECK( cudaStreamWaitEvent( ci, *startWaitEvent_, 0 )); auto nProcess = mpi::MPIRank( block->getNeighborProcess( neighborIdx, uint_t( 0 )));
pi->pack( *dir, gpuDataPtr, block, ci );
if( !sendFromGPU_ ) for( auto &pi : packInfos_ )
{ {
auto cpuDataPtr = bufferSystemCPU_.sendBuffer( nProcess ).advanceNoResize( size ); parallelSection.run([&](auto s) {
WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr ); auto size = pi->size( *dir, block );
WALBERLA_CUDA_CHECK( auto gpuDataPtr = bufferSystemGPU_.sendBuffer( nProcess ).advanceNoResize( size );
cudaMemcpyAsync( cpuDataPtr, gpuDataPtr, size, cudaMemcpyDeviceToHost, ci )); WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );
pi->pack( *dir, gpuDataPtr, block, s );
if( !sendFromGPU_ )
{
auto cpuDataPtr = bufferSystemCPU_.sendBuffer( nProcess ).advanceNoResize( size );
WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr );
WALBERLA_CUDA_CHECK( cudaMemcpyAsync( cpuDataPtr, gpuDataPtr, size, cudaMemcpyDeviceToHost, s ));
}
});
} }
} }
} }
} }
// Busy waiting for packing to finish - then send // wait for packing to finish
for( auto &ci : streams_ ) WALBERLA_CUDA_CHECK( cudaStreamSynchronize( ci.second )); cudaStreamSynchronize( stream );
if( sendFromGPU_ ) if( sendFromGPU_ )
bufferSystemGPU_.sendAll(); bufferSystemGPU_.sendAll();
...@@ -110,7 +107,7 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -110,7 +107,7 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
template<typename Stencil> template<typename Stencil>
void UniformGPUScheme<Stencil>::wait() void UniformGPUScheme<Stencil>::wait( cudaStream_t stream )
{ {
WALBERLA_ASSERT( communicationInProgress_ ); WALBERLA_ASSERT( communicationInProgress_ );
...@@ -118,11 +115,11 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -118,11 +115,11 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
if( sendFromGPU_ ) if( sendFromGPU_ )
{ {
auto parallelSection = parallelSectionManager_.parallelSection( stream );
for( auto recvInfo = bufferSystemGPU_.begin(); recvInfo != bufferSystemGPU_.end(); ++recvInfo ) for( auto recvInfo = bufferSystemGPU_.begin(); recvInfo != bufferSystemGPU_.end(); ++recvInfo )
{ {
for( auto &header : headers_[recvInfo.rank()] ) for( auto &header : headers_[recvInfo.rank()] )
{ {
auto &ci = streams_.at( header.dir );
auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId )); auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId ));
for( auto &pi : packInfos_ ) for( auto &pi : packInfos_ )
...@@ -130,13 +127,16 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -130,13 +127,16 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
auto size = pi->size( header.dir, block ); auto size = pi->size( header.dir, block );
auto gpuDataPtr = recvInfo.buffer().advanceNoResize( size ); auto gpuDataPtr = recvInfo.buffer().advanceNoResize( size );
WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr ); WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );
pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, ci ); parallelSection.run([&](auto s) {
pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, s );
});
} }
} }
} }
} }
else else
{ {
auto parallelSection = parallelSectionManager_.parallelSection( stream );
for( auto recvInfo = bufferSystemCPU_.begin(); recvInfo != bufferSystemCPU_.end(); ++recvInfo ) for( auto recvInfo = bufferSystemCPU_.begin(); recvInfo != bufferSystemCPU_.end(); ++recvInfo )
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
...@@ -145,7 +145,6 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -145,7 +145,6 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
gpuBuffer.clear(); gpuBuffer.clear();
for( auto &header : headers_[recvInfo.rank()] ) { for( auto &header : headers_[recvInfo.rank()] ) {
auto &ci = streams_.at( header.dir );
auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId )); auto block = dynamic_cast< Block * >( forest->getBlock( header.blockId ));
for( auto &pi : packInfos_ ) for( auto &pi : packInfos_ )
...@@ -156,17 +155,16 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo ...@@ -156,17 +155,16 @@ UniformGPUScheme<Stencil>::UniformGPUScheme( weak_ptr_wrapper <StructuredBlockFo
WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr ); WALBERLA_ASSERT_NOT_NULLPTR( cpuDataPtr );
WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr ); WALBERLA_ASSERT_NOT_NULLPTR( gpuDataPtr );
WALBERLA_CUDA_CHECK( cudaMemcpyAsync( gpuDataPtr, cpuDataPtr, size, parallelSection.run([&](auto s) {
cudaMemcpyHostToDevice, ci )); WALBERLA_CUDA_CHECK( cudaMemcpyAsync( gpuDataPtr, cpuDataPtr, size,
pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, ci ); cudaMemcpyHostToDevice, s ));
pi->unpack( stencil::inverseDir[header.dir], gpuDataPtr, block, s );
});
} }
} }
} }
} }
for( auto &ci : streams_ )
WALBERLA_CUDA_CHECK( cudaStreamSynchronize( ci.second ));
communicationInProgress_ = false; communicationInProgress_ = false;
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment