Commit 330c43f6 authored by João Victor Tozatti Risso's avatar João Victor Tozatti Risso Committed by Martin Bauer

GPU communication

- parameter files for UniformGridGPU benchmark
- memcpy-based GPU PackInfo
parent 4b0c66e9
waLBerla_link_files_to_builddir( "*.prm" )
waLBerla_link_files_to_builddir( "simulation_setup" )
waLBerla_python_file_generates(UniformGridGPU.py
UniformGridGPU_LatticeModel.cpp UniformGridGPU_LatticeModel.h
......
#include "core/Environment.h"
#include "python_coupling/CreateConfig.h"
#include "python_coupling/PythonCallback.h"
#include "python_coupling/DictWrapper.h"
#include "blockforest/Initialization.h"
#include "lbm/field/PdfField.h"
#include "lbm/field/AddToStorage.h"
......@@ -28,6 +30,7 @@
#include "UniformGridGPU_PackInfo.h"
#include "UniformGridGPU_UBB.h"
#include "UniformGridGPU_NoSlip.h"
#include "UniformGridGPU_Communication.h"
using namespace walberla;
......@@ -80,11 +83,10 @@ int main( int argc, char **argv )
noSlip.fillFromFlagField<FlagField_T>( blocks, flagFieldID, FlagUID("NoSlip"), fluidFlagUID );
//pressure.fillFromFlagField<FlagField_T>( blocks, flagFieldID, FlagUID("pressure"), fluidFlagUID );
// Communication setup
bool overlapCommunication = parameters.getParameter<bool>( "overlapCommunication", true );
bool cudaEnabledMPI = parameters.getParameter<bool>( "cudaEnabledMPI", false );
int communicationScheme = parameters.getParameter<int>( "communicationScheme", (int) CommunicationSchemeType::UniformGPUScheme_Baseline );
int streamHighPriority = 0;
int streamLowPriority = 0;
......@@ -92,9 +94,10 @@ int main( int argc, char **argv )
pystencils::UniformGridGPU_LbKernel lbKernel( pdfFieldGpuID, omega );
lbKernel.setOuterPriority( streamHighPriority );
CommScheme_T gpuComm( blocks, cudaEnabledMPI );
gpuComm.addPackInfo( make_shared<pystencils::UniformGridGPU_PackInfo>( pdfFieldGpuID ));
//CommScheme_T gpuComm( blocks, cudaEnabledMPI );
//gpuComm.addPackInfo( make_shared<pystencils::UniformGridGPU_PackInfo>( pdfFieldGpuID ));
UniformGridGPU_Communication< CommunicationStencil_T, cuda::GPUField< double > >
gpuComm( blocks, pdfFieldGpuID, (CommunicationSchemeType) communicationScheme, cudaEnabledMPI );
auto defaultStream = cuda::StreamRAII::newPriorityStream( streamLowPriority );
auto innerOuterStreams = cuda::ParallelStreams( streamHighPriority );
......@@ -121,6 +124,7 @@ int main( int argc, char **argv )
innerOuterSection.run([&]( auto outerStream )
{
gpuComm( outerStream );
for( auto &block: *blocks )
{
{
......@@ -172,11 +176,42 @@ int main( int argc, char **argv )
auto remainingTimeLoggerFrequency = parameters.getParameter< double >( "remainingTimeLoggerFrequency", 3.0 ); // in seconds
timeLoop.addFuncAfterTimeStep( timing::RemainingTimeLogger( timeLoop.getNrOfTimeSteps(), remainingTimeLoggerFrequency ), "remaining time logger" );
/*
lbm::PerformanceLogger<FlagField_T> performanceLogger(blocks, flagFieldID, fluidFlagUID, 500);
timeLoop.addFuncAfterTimeStep( performanceLogger, "remaining time logger" );
timeLoop.run();
*/
auto performanceReportFrequency = parameters.getParameter< uint_t >( "performanceReportFrequency", 500 ); // in timesteps
lbm::PerformanceLogger<FlagField_T> performanceLogger(blocks, flagFieldID, fluidFlagUID, performanceReportFrequency);
timeLoop.addFuncAfterTimeStep([&performanceLogger] { performanceLogger(); }, "performance logger" );
timeLoop.run();
std::map< std::string, int > integerProperties;
std::map< std::string, double > realProperties;
std::map< std::string, std::string > stringProperties;
performanceLogger.logOverallResultsOnRoot();
performanceLogger.getBestResultsForSQLOnRoot(integerProperties, realProperties, stringProperties);
WALBERLA_ROOT_SECTION()
{
python_coupling::PythonCallback pythonCallbackResults ( "results_callback" );
if ( pythonCallbackResults.isCallable() )
{
pythonCallbackResults.data().exposeValue( "mlups_total", realProperties["MLUPS"] );
pythonCallbackResults.data().exposeValue( "mlups_process", realProperties["MLUPS_process"] );
pythonCallbackResults.data().exposeValue( "mflups_total", realProperties["MFLUPS"] );
pythonCallbackResults.data().exposeValue( "mflups_process", realProperties["MFLUPS_process"] );
// Call Python function to report results
pythonCallbackResults();
}
}
}
return 0;
}
\ No newline at end of file
}
#pragma once
#include "core/debug/Debug.h"
#include "blockforest/Block.h"
#include "blockforest/StructuredBlockForest.h"
#include "blockforest/communication/UniformBufferedScheme.h"
#include "cuda/communication/GPUPackInfo.h"
#include "cuda/communication/UniformGPUScheme.h"
#include "cuda/communication/MemcpyPackInfo.h"
#include "UniformGridGPU_PackInfo.h"
using namespace walberla;
// Selects how ghost-layer data of the GPU PDF field is exchanged.
// Values are explicit because the scheme is chosen via an integer
// configuration parameter ("communicationScheme") and cast to this enum.
enum CommunicationSchemeType {
    GPUPackInfo_Baseline = 0,       // CPU UniformBufferedScheme + GPUPackInfo, no explicit CUDA stream
    GPUPackInfo_Streams = 1,        // same as above, but a CUDA stream is set on the pack info
    UniformGPUScheme_Baseline = 2,  // GPU UniformGPUScheme + generated pack kernels
    UniformGPUScheme_Memcpy = 3     // GPU UniformGPUScheme + memcpy-based MemcpyPackInfo
};
template<typename StencilType, typename GPUFieldType >
class UniformGridGPU_Communication
{
public:
explicit UniformGridGPU_Communication(weak_ptr_wrapper<StructuredBlockForest> bf, const BlockDataID & bdId,
CommunicationSchemeType commSchemeType, bool cudaEnabledMPI = false)
: _commSchemeType(commSchemeType), _cpuCommunicationScheme(nullptr), _gpuPackInfo(nullptr),
_gpuCommunicationScheme(nullptr), _generatedPackInfo(nullptr)
{
switch(_commSchemeType)
{
case GPUPackInfo_Baseline:
_gpuPackInfo = make_shared< cuda::communication::GPUPackInfo< GPUFieldType > >( bdId );
_cpuCommunicationScheme = make_shared< blockforest::communication::UniformBufferedScheme< StencilType > >( bf );
_cpuCommunicationScheme->addPackInfo( _gpuPackInfo );
break;
case GPUPackInfo_Streams:
_gpuPackInfo = make_shared< cuda::communication::GPUPackInfo< GPUFieldType > >( bdId );
_cpuCommunicationScheme = make_shared< blockforest::communication::UniformBufferedScheme< StencilType > >( bf );
_cpuCommunicationScheme->addPackInfo( _gpuPackInfo );
break;
case UniformGPUScheme_Baseline:
_gpuCommunicationScheme = make_shared< cuda::communication::UniformGPUScheme< StencilType > >( bf, cudaEnabledMPI );
_generatedPackInfo = make_shared<pystencils::UniformGridGPU_PackInfo>( bdId );
_gpuCommunicationScheme->addPackInfo( _generatedPackInfo );
break;
case UniformGPUScheme_Memcpy:
_gpuCommunicationScheme = make_shared< cuda::communication::UniformGPUScheme< StencilType > >( bf, cudaEnabledMPI );
_memcpyPackInfo = make_shared< cuda::communication::MemcpyPackInfo< GPUFieldType > >( bdId );
_gpuCommunicationScheme->addPackInfo( _memcpyPackInfo );
break;
default:
WALBERLA_ABORT("Invalid GPU communication scheme specified!");
}
}
UniformGridGPU_Communication(UniformGridGPU_Communication &) = delete;
void operator()( cudaStream_t communicationStream = 0 )
{
startCommunication( communicationStream );
wait( communicationStream );
}
void startCommunication( cudaStream_t communicationStream )
{
switch( _commSchemeType )
{
case GPUPackInfo_Streams:
// Set communication stream to enable asynchronous operations
// in GPUPackInfo.
WALBERLA_ASSERT_NOT_NULLPTR( _gpuPackInfo );
_gpuPackInfo->setCommunicationStream( communicationStream );
// Start communication using UniformBufferedScheme
WALBERLA_ASSERT_NOT_NULLPTR( _cpuCommunicationScheme );
_cpuCommunicationScheme->startCommunication();
break;
case GPUPackInfo_Baseline:
// Start communication using UniformBufferedScheme
WALBERLA_ASSERT_NOT_NULLPTR( _cpuCommunicationScheme );
_cpuCommunicationScheme->startCommunication();
break;
case UniformGPUScheme_Baseline:
WALBERLA_ASSERT_NOT_NULLPTR( _gpuCommunicationScheme );
_gpuCommunicationScheme->startCommunication( communicationStream );
break;
case UniformGPUScheme_Memcpy:
WALBERLA_ASSERT_NOT_NULLPTR( _gpuCommunicationScheme );
_gpuCommunicationScheme->startCommunication( communicationStream );
break;
}
}
void wait( cudaStream_t communicationStream )
{
switch( _commSchemeType )
{
case GPUPackInfo_Baseline:
WALBERLA_ASSERT_NOT_NULLPTR( _cpuCommunicationScheme );
_cpuCommunicationScheme->wait();
break;
case GPUPackInfo_Streams:
WALBERLA_ASSERT_NOT_NULLPTR( _cpuCommunicationScheme );
_gpuPackInfo->setCommunicationStream( communicationStream );
_cpuCommunicationScheme->wait();
break;
case UniformGPUScheme_Baseline:
WALBERLA_ASSERT_NOT_NULLPTR( _gpuCommunicationScheme );
_gpuCommunicationScheme->wait( communicationStream );
break;
case UniformGPUScheme_Memcpy:
WALBERLA_ASSERT_NOT_NULLPTR( _gpuCommunicationScheme );
_gpuCommunicationScheme->wait( communicationStream );
break;
}
}
private:
CommunicationSchemeType _commSchemeType;
shared_ptr< blockforest::communication::UniformBufferedScheme< StencilType > > _cpuCommunicationScheme;
shared_ptr< cuda::communication::GPUPackInfo< GPUFieldType > > _gpuPackInfo;
shared_ptr< cuda::communication::UniformGPUScheme< StencilType > > _gpuCommunicationScheme;
shared_ptr< pystencils::UniformGridGPU_PackInfo > _generatedPackInfo;
shared_ptr< cuda::communication::MemcpyPackInfo< GPUFieldType > > _memcpyPackInfo;
};
# encoding: utf-8
import math
import waLBerla as wlb
# Block-size sweep used by the benchmarks: every multiple of 16 cells per
# block edge, from 16 up to and including 256.
MIN_CELLS_PER_BLOCK = 16
MAX_CELLS_PER_BLOCK = 256
INC_CELLS_PER_BLOCK = 16

# Range of block edge lengths
cells_per_block_interval = range(MIN_CELLS_PER_BLOCK, MAX_CELLS_PER_BLOCK + 1, INC_CELLS_PER_BLOCK)
# All block edge lengths: [16, 32, 48, ..., 256]
cells_per_block = list(cells_per_block_interval)

# Number of active MPI processes
num_processes = wlb.mpi.numProcesses()

# Toggle: overlap computation with communication
overlap_communication = [False, True]

# Toggle: MPI implementation supports buffers in GPU memory
cuda_enabled_mpi = [False, True]

# Communication schemes exercised by the benchmarks
communication_schemes = ['GPUPackInfo_Streams', 'UniformGPUScheme_Baseline', 'UniformGPUScheme_Memcpy']
def get_block_decomposition(block_decomposition, num_processes):
    """Compute a 3D block decomposition for ``num_processes`` MPI processes.

    Starting from a (1, 1, 1) decomposition, the factor 2 is applied
    log2(num_processes) times, cycling round-robin through the axes listed in
    ``block_decomposition`` (e.g. 'xy' doubles x, then y, then x, ...).
    Axis characters other than 'x'/'y'/'z' are ignored, as before.

    :param block_decomposition: sequence of axis characters 'x', 'y', 'z'
    :param num_processes: total number of MPI processes; must be a power of two
    :raises ValueError: if num_processes is not positive
    :returns: tuple (bx, by, bz) with bx * by * bz == num_processes
    """
    if num_processes <= 0:
        # mirrors the ValueError previously raised by math.log on n <= 0
        raise ValueError("num_processes must be positive")
    factors = {'x': 1, 'y': 1, 'z': 1}
    # bit_length() - 1 is the exact log2 for powers of two; unlike
    # int(math.log(n, 2)) it cannot be off by one due to float rounding.
    blocks_per_axis = num_processes.bit_length() - 1
    for i in range(blocks_per_axis):
        axis = block_decomposition[i % len(block_decomposition)]
        if axis in factors:
            factors[axis] *= 2
    bx, by, bz = factors['x'], factors['y'], factors['z']
    # A non-power-of-two process count cannot be reached by repeated doubling
    assert (bx * by * bz) == num_processes
    return (bx, by, bz)
# encoding: utf-8
import os
import pandas as pd
import waLBerla as wlb
import copy
from datetime import datetime
# Scheme name -> integer id, as understood by the C++ benchmark application
# (the "communicationScheme" configuration parameter)
CommunicationSchemeType = {
    'GPUPackInfo_Baseline': 0,
    'GPUPackInfo_Streams': 1,
    'UniformGPUScheme_Baseline': 2,
    'UniformGPUScheme_Memcpy': 3,
}

# Inverse mapping (id -> name), derived so the two can never drift apart
CommunicationSchemeName = {scheme_id: scheme_name
                           for scheme_name, scheme_id in CommunicationSchemeType.items()}

# Base configuration shared by all benchmark scenarios; each scenario
# deep-copies this dict and overrides individual entries.
BASE_CONFIG = {
    'DomainSetup': {
        'cellsPerBlock': (64, 64, 64),
        'blocks': (1, 1, 1),
        'nrOfProcesses': (1, 1, 1),
        'periodic': (0, 0, 1),
        'dx': 1.0
    },
    'Parameters': {
        'omega': 1.8,
        'timesteps': 1001,
        'remainingTimeLoggerFrequency': 250,
        'vtkWriteFrequency': 0,
        'overlapCommunication': False,
        'cudaEnabledMPI': False,
        'initialVelocity': (0, 0, 0),
        'performanceReportFrequency': 250,
        'communicationScheme': CommunicationSchemeType['UniformGPUScheme_Baseline'],
    },
    'Boundaries': {
        'Border': [
            {'direction': 'W', 'walldistance': -1, 'flag': 'NoSlip'},
            {'direction': 'E', 'walldistance': -1, 'flag': 'NoSlip'},
            {'direction': 'S', 'walldistance': -1, 'flag': 'NoSlip'},
            {'direction': 'N', 'walldistance': -1, 'flag': 'UBB'},
        ]
    }
}
class BenchmarkScenario:
    """Holds the configuration of one benchmark run and persists its results to CSV."""

    def __init__(self, testcase, decomposition_axes=None):
        self.testcase = testcase
        # Each scenario owns a private, mutable copy of the base configuration
        self.scenario_config = copy.deepcopy(BASE_CONFIG)
        self.decomposition_axes = decomposition_axes
        # Output CSV named after the testcase, timestamped at minute resolution
        stamp = datetime.now().replace(second=0, microsecond=0)
        self.output_filename = f'{self.testcase}_{stamp.strftime("%Y-%m-%d_%H-%M")}.csv'

    @wlb.member_callback
    def config(self, **kwargs):
        # Invoked by waLBerla to obtain the simulation configuration
        return self.scenario_config

    @wlb.member_callback
    def results_callback(self, **kwargs):
        # Invoked from the simulation with the measured performance figures
        domain = self.scenario_config.get('DomainSetup')
        params = self.scenario_config.get('Parameters')
        row = {}
        # Expand the (x, y, z) tuples into one column per axis
        for prefix, key in (('processes', 'nrOfProcesses'),
                            ('blocks', 'blocks'),
                            ('cellsPerBlock', 'cellsPerBlock')):
            for axis, value in zip('XYZ', domain.get(key)):
                row[prefix + axis] = value
        row.update({
            'cudaEnabledMPI': params.get('cudaEnabledMPI'),
            'overlapCommunication': params.get('overlapCommunication'),
            'domainDecomposition': self.decomposition_axes,
            'communicationScheme': CommunicationSchemeName[params.get('communicationScheme')],
            'mlupsTotal': kwargs.get('mlups_total'),
            'mlupsProcess': kwargs.get('mlups_process'),
            'mflupsTotal': kwargs.get('mflups_total'),
            'mflupsProcess': kwargs.get('mflups_process'),
        })
        self.save_data([row])

    def save_data(self, data):
        # Append to the CSV when it already exists, otherwise create it with a header
        frame = pd.DataFrame(data)
        file_exists = os.path.isfile(self.output_filename)
        frame.to_csv(self.output_filename, index=False,
                     mode='a' if file_exists else 'w',
                     header=not file_exists)
# encoding: utf-8
import itertools
import waLBerla as wlb
from base import get_block_decomposition, communication_schemes, overlap_communication, \
    cuda_enabled_mpi, cells_per_block, num_processes
from benchmark import BenchmarkScenario, CommunicationSchemeType

# Collects every scenario that should be executed in this simulation run
scenarios = wlb.ScenarioManager()

# Two-axis domain decompositions exercised by this testcase
#block_decompositions = itertools.combinations_with_replacement('xyz', r=2)
block_decompositions = ['xy', 'yz', 'xz']

testcase_name = "inter-node"

parameter_space = itertools.product(communication_schemes, overlap_communication,
                                    cuda_enabled_mpi, block_decompositions, cells_per_block)

for comm_scheme, is_communication_overlapped, is_cuda_enabled_mpi, decomposition_axes, num_cells_per_block in parameter_space:
    # CUDA-enabled MPI is only benchmarked with the UniformGPUScheme_Baseline scheme
    if is_cuda_enabled_mpi and comm_scheme != 'UniformGPUScheme_Baseline':
        continue
    # GPUPackInfo_Baseline does not overlap communication with computation
    if is_communication_overlapped and comm_scheme == 'GPUPackInfo_Baseline':
        continue

    # Derive the process/block layout from the chosen decomposition axes
    blocks = get_block_decomposition(decomposition_axes, num_processes)

    scenario = BenchmarkScenario(testcase=testcase_name, decomposition_axes=''.join(decomposition_axes))

    domain_setup = scenario.scenario_config['DomainSetup']
    domain_setup['cellsPerBlock'] = (num_cells_per_block,) * 3
    domain_setup['nrOfProcesses'] = blocks
    domain_setup['blocks'] = blocks

    params = scenario.scenario_config['Parameters']
    params['cudaEnabledMPI'] = is_cuda_enabled_mpi
    params['overlapCommunication'] = is_communication_overlapped
    params['communicationScheme'] = CommunicationSchemeType[comm_scheme]

    # Register the scenario for execution
    scenarios.add(scenario)
# encoding: utf-8
import itertools
import waLBerla as wlb
from base import get_block_decomposition, communication_schemes, overlap_communication, \
    cuda_enabled_mpi, cells_per_block, num_processes
from benchmark import BenchmarkScenario, CommunicationSchemeType

# Collects every scenario that should be executed in this simulation run
scenarios = wlb.ScenarioManager()

# Two-axis domain decompositions exercised by this testcase
#block_decompositions = itertools.combinations_with_replacement('xyz', r=2)
block_decompositions = ['xy', 'yz', 'zx']

testcase_name = "intra-node"

parameter_space = itertools.product(communication_schemes, overlap_communication,
                                    cuda_enabled_mpi, block_decompositions, cells_per_block)

for comm_scheme, is_communication_overlapped, is_cuda_enabled_mpi, decomposition_axes, num_cells_per_block in parameter_space:
    # CUDA-enabled MPI is only benchmarked with the UniformGPUScheme_Baseline scheme
    if is_cuda_enabled_mpi and comm_scheme != 'UniformGPUScheme_Baseline':
        continue
    # GPUPackInfo_Baseline does not overlap communication with computation
    if is_communication_overlapped and comm_scheme == 'GPUPackInfo_Baseline':
        continue

    # Derive the process/block layout from the chosen decomposition axes
    blocks = get_block_decomposition(decomposition_axes, num_processes)

    scenario = BenchmarkScenario(testcase=testcase_name, decomposition_axes=''.join(decomposition_axes))

    domain_setup = scenario.scenario_config['DomainSetup']
    domain_setup['cellsPerBlock'] = (num_cells_per_block,) * 3
    domain_setup['nrOfProcesses'] = blocks
    domain_setup['blocks'] = blocks

    params = scenario.scenario_config['Parameters']
    params['cudaEnabledMPI'] = is_cuda_enabled_mpi
    params['overlapCommunication'] = is_communication_overlapped
    params['communicationScheme'] = CommunicationSchemeType[comm_scheme]

    # Register the scenario for execution
    scenarios.add(scenario)
# encoding: utf-8
import itertools
import waLBerla as wlb
from base import cells_per_block, num_processes
from benchmark import BenchmarkScenario

# Collects every scenario that should be executed in this simulation run
scenarios = wlb.ScenarioManager()

testcase_name = "single-node"

# This testcase is only meaningful on exactly one MPI process
assert num_processes == 1

for num_cells_per_block in cells_per_block:
    scenario = BenchmarkScenario(testcase=testcase_name)
    # Cubic blocks with the given edge length
    scenario.scenario_config['DomainSetup']['cellsPerBlock'] = (num_cells_per_block,) * 3
    # Register the scenario for execution
    scenarios.add(scenario)
# encoding: utf-8
import itertools
import waLBerla as wlb
from base import get_block_decomposition, communication_schemes, overlap_communication, \
    cuda_enabled_mpi, num_processes
from benchmark import BenchmarkScenario, CommunicationSchemeType

# Collects every scenario that should be executed in this simulation run
scenarios = wlb.ScenarioManager()

# Three-axis domain decompositions exercised by this testcase
#block_decompositions = itertools.combinations_with_replacement('xyz', r=2)
block_decompositions = ['xyz', 'yzx', 'yxz', 'zyx']

# Strong scaling: the global domain (256^3 cells) is kept fixed
cells_per_block = [256,]

# On a single process there is nothing to overlap and no MPI traffic
if num_processes == 1:
    overlap_options, cuda_mpi_options = [False,], [False,]
else:
    overlap_options, cuda_mpi_options = overlap_communication, cuda_enabled_mpi

parameter_space = itertools.product(communication_schemes, overlap_options, cuda_mpi_options,
                                    block_decompositions, cells_per_block)

testcase_name = "strong-scaling"

for comm_scheme, is_communication_overlapped, is_cuda_enabled_mpi, decomposition_axes, num_cells_per_block in parameter_space:
    # CUDA-enabled MPI is only benchmarked with the UniformGPUScheme_Baseline scheme
    if is_cuda_enabled_mpi and comm_scheme != 'UniformGPUScheme_Baseline':
        continue
    # GPUPackInfo_Baseline does not overlap communication with computation
    if is_communication_overlapped and comm_scheme == 'GPUPackInfo_Baseline':
        continue

    # Derive the process/block layout from the chosen decomposition axes
    blocks = get_block_decomposition(decomposition_axes, num_processes)

    scenario = BenchmarkScenario(testcase=testcase_name, decomposition_axes=''.join(decomposition_axes))

    domain_setup = scenario.scenario_config['DomainSetup']
    # Divide the fixed global extent among the processes on each axis
    domain_setup['cellsPerBlock'] = tuple(num_cells_per_block // block for block in blocks)
    domain_setup['nrOfProcesses'] = blocks
    domain_setup['blocks'] = blocks

    params = scenario.scenario_config['Parameters']
    params['cudaEnabledMPI'] = is_cuda_enabled_mpi
    params['overlapCommunication'] = is_communication_overlapped
    params['communicationScheme'] = CommunicationSchemeType[comm_scheme]

    # Register the scenario for execution
    scenarios.add(scenario)
# encoding: utf-8
import itertools
import waLBerla as wlb
from base import get_block_decomposition, communication_schemes, overlap_communication, \
    cuda_enabled_mpi, num_processes
from benchmark import BenchmarkScenario, CommunicationSchemeType

# Collects every scenario that should be executed in this simulation run
scenarios = wlb.ScenarioManager()

# Three-axis domain decompositions exercised by this testcase
#block_decompositions = itertools.combinations_with_replacement('xyz', r=2)
block_decompositions = ['xyz', 'yzx', 'zyx', 'yxz']

# Weak scaling: every process keeps a block of this edge length
cells_per_block = [64, 128, 240, 256]

# On a single process there is nothing to overlap and no MPI traffic
if num_processes == 1:
    overlap_options, cuda_mpi_options = [False,], [False,]
else:
    overlap_options, cuda_mpi_options = overlap_communication, cuda_enabled_mpi

parameter_space = itertools.product(communication_schemes, overlap_options, cuda_mpi_options,
                                    block_decompositions, cells_per_block)

testcase_name = "weak-scaling"

for comm_scheme, is_communication_overlapped, is_cuda_enabled_mpi, decomposition_axes, num_cells_per_block in parameter_space:
    # CUDA-enabled MPI is only benchmarked with the UniformGPUScheme_Baseline scheme
    if is_cuda_enabled_mpi and comm_scheme != 'UniformGPUScheme_Baseline':
        continue
    # GPUPackInfo_Baseline does not overlap communication with computation
    if is_communication_overlapped and comm_scheme == 'GPUPackInfo_Baseline':
        continue

    # Derive the process/block layout from the chosen decomposition axes
    blocks = get_block_decomposition(decomposition_axes, num_processes)

    scenario = BenchmarkScenario(testcase=testcase_name, decomposition_axes=''.join(decomposition_axes))

    domain_setup = scenario.scenario_config['DomainSetup']
    domain_setup['cellsPerBlock'] = (num_cells_per_block,) * 3
    domain_setup['nrOfProcesses'] = blocks
    domain_setup['blocks'] = blocks

    params = scenario.scenario_config['Parameters']
    params['cudaEnabledMPI'] = is_cuda_enabled_mpi
    params['overlapCommunication'] = is_communication_overlapped
    params['communicationScheme'] = CommunicationSchemeType[comm_scheme]

    # Register the scenario for execution
    scenarios.add(scenario)
......@@ -57,42 +57,16 @@ class GPUPackInfo : public walberla::communication::UniformPackInfo
public:
typedef typename GPUField_T::value_type FieldType;
GPUPackInfo( const BlockDataID & bdId, cudaStream_t stream = 0 )
: bdId_( bdId ), communicateAllGhostLayers_( true ), numberOfGhostLayers_( 0 )
GPUPackInfo( const BlockDataID & bdId )
: bdId_( bdId ), communicateAllGhostLayers_( true ), numberOfGhostLayers_( 0 ),
copyAsync_( false ), communicationStream_( 0 )
{
streams_.push_back( stream );
copyAsync_ = (stream != 0);
}
GPUPackInfo( const BlockDataID & bdId, const uint_t numberOfGHostLayers, cudaStream_t stream = 0 )
: bdId_( bdId ), communicateAllGhostLayers_( false ), numberOfGhostLayers_( numberOfGHostLayers )
GPUPackInfo( const BlockDataID & bdId, const uint_t numberOfGHostLayers )
: bdId_( bdId ), communicateAllGhostLayers_( false ), numberOfGhostLayers_( numberOfGHostLayers ),
copyAsync_( false ), communicationStream_( 0 )
{
streams_.push_back( stream );
copyAsync_ = (stream != 0);
}
GPUPackInfo( const BlockDataID & bdId, std::vector< cudaStream_t > & streams )
: bdId_( bdId ), communicateAllGhostLayers_( true ), numberOfGhostLayers_( 0 ), streams_( streams )
{
copyAsync_ = true;
for( auto streamIt = streams_.begin(); streamIt != streams_.end(); ++streamIt )
if ( *streamIt == 0 )
{
copyAsync_ = false;
break;
}
}
GPUPackInfo( const BlockDataID & bdId, const uint_t numberOfGHostLayers, std::vector< cudaStream_t > & streams )
: bdId_( bdId ), communicateAllGhostLayers_( false ), numberOfGhostLayers_( numberOfGHostLayers ), streams_( streams )
{
copyAsync_ = true;
for( auto streamIt = streams_.begin(); streamIt != streams_.end(); ++streamIt )
if ( *streamIt == 0 )
{
copyAsync_ = false;
break;
}
}
virtual ~GPUPackInfo() {}
......@@ -104,18 +78,25 @@ public:
void communicateLocal(const IBlock * sender, IBlock * receiver, stencil::Direction dir);
void setCommunicationStream( cudaStream_t stream )
{
if ( stream != 0 )
{
copyAsync_ = true;
communicationStream_ = stream;
}
}
protected:
void packDataImpl(const IBlock * sender, stencil::Direction dir, mpi::SendBuffer & outBuffer) const;
uint_t numberOfGhostLayersToCommunicate( const GPUField_T * const field ) const;
inline cudaStream_t & getStream(stencil::Direction & dir) { return streams_[dir % streams_.size()]; }
const BlockDataID bdId_;
bool communicateAllGhostLayers_;
uint_t numberOfGhostLayers_;
std::vector< cudaStream_t > streams_;
bool copyAsync_;
cudaStream_t communicationStream_;
std::map< stencil::Direction, PinnedMemoryBuffer > pinnedRecvBuffers_;
mutable std::map< stencil::Direction, PinnedMemoryBuffer > pinnedSendBuffers_;
};
......@@ -145,7 +126,7 @@ void GPUPackInfo<GPUField_T>::unpackData(IBlock * receiver, stencil::Direction d
std::copy( bufPtr, static_cast< unsigned char * >( bufPtr + nrOfBytesToRead ), copyBufferPtr );
}
cudaStream_t & unpackStream = getStream(dir);
cudaStream_t & unpackStream = communicationStream_;
auto dstOffset = std::make_tuple( uint_c(fieldCi.xMin() + nrOfGhostLayers),
uint_c(fieldCi.yMin() + nrOfGhostLayers),
......@@ -201,7 +182,7 @@ void GPUPackInfo<GPUField_T>::communicateLocal(const IBlock * sender, IBlock * r
CellInterval sCi = field::getSliceBeforeGhostLayer( *sf, dir, nrOfGhostLayers, false );
CellInterval rCi = field::getGhostRegion( *rf, stencil::inverseDir[dir], nrOfGhostLayers, false );