BufferSystem.impl.h 21.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//======================================================================================================================
//
//  This file is part of waLBerla. waLBerla is free software: you can 
//  redistribute it and/or modify it under the terms of the GNU General Public
//  License as published by the Free Software Foundation, either version 3 of 
//  the License, or (at your option) any later version.
//  
//  waLBerla is distributed in the hope that it will be useful, but WITHOUT 
//  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
//  FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License 
//  for more details.
//  
//  You should have received a copy of the GNU General Public License along
//  with waLBerla (see COPYING.txt). If not, see <http://www.gnu.org/licenses/>.
//
16
//! \file GenericBufferSystem.cpp
17
18
19
20
21
22
23
//! \ingroup core
//! \author Martin Bauer <martin.bauer@fau.de>
//
//======================================================================================================================

#include "core/logging/Logging.h"
#include "core/mpi/MPIManager.h"
24
#include "core/debug/CheckFunctions.h"
25
26
27
28
29
30


namespace walberla {
namespace mpi {


31
32
template< typename Rb, typename Sb>
std::set<int> GenericBufferSystem<Rb, Sb>::activeTags_;
33

34
35
36
37
38
39
40
41

//======================================================================================================================
//
//  Iterator
//
//======================================================================================================================


42
43
template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::iterator::iterator( GenericBufferSystem<Rb, Sb> & bufferSystem, bool begin )
44
    : bufferSystem_( bufferSystem), currentRecvBuffer_( nullptr ), currentSenderRank_( -1 )
45
46
47
48
49
{
   if ( begin ) // init iterator
      ++(*this);
}

50
51
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::iterator::operator++()
52
53
54
55
{
   currentRecvBuffer_ = bufferSystem_.waitForNext( currentSenderRank_ );
   if ( ! currentRecvBuffer_ ) {
      WALBERLA_ASSERT_EQUAL( currentSenderRank_, -1 );
56
57
58
   } else
   {
      bufferSystem_.bytesReceived_ += currentRecvBuffer_->size() * sizeof(RecvBuffer::ElementType);
59
      bufferSystem_.numberOfReceives_ += 1;
60
61
62
   }
}

63
64
template< typename Rb, typename Sb>
bool GenericBufferSystem<Rb, Sb>::iterator::operator==( const typename GenericBufferSystem<Rb, Sb>::iterator & other )
65
66
67
68
69
70
71
{
   // only equality checks with end iterators are allowed
   WALBERLA_ASSERT( other.currentSenderRank_ == -1 || currentSenderRank_ == -1 );

   return ( currentSenderRank_ == other.currentSenderRank_ );
}

72
73
template< typename Rb, typename Sb>
bool GenericBufferSystem<Rb, Sb>::iterator::operator!=( const typename GenericBufferSystem<Rb, Sb>::iterator & other )
74
75
76
77
78
79
80
81
{
   // only equality checks with end iterators are allowed
   WALBERLA_ASSERT( other.currentSenderRank_ == -1 || currentSenderRank_ == -1 );

   return ( currentSenderRank_ != other.currentSenderRank_ );
}


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
template< typename Rb, typename Sb>
template<typename Range>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const Range & range, bool changingSize )
{
   setReceiverInfo( range.begin(), range.end(), changingSize );
}

template< typename Rb, typename Sb>
template<typename RankIter>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( RankIter rankBegin, RankIter rankEnd, bool changingSize )
{
   WALBERLA_ASSERT( ! communicationRunning_ );

   recvInfos_.clear();
   for ( auto it = rankBegin; it != rankEnd; ++it )
   {
      const MPIRank sender = *it;
      recvInfos_[ sender ].size = INVALID_SIZE;
   }

   sizeChangesEverytime_ = changingSize;
   setCommunicationType( false );
}

template< typename Rb, typename Sb>
inline size_t GenericBufferSystem<Rb, Sb>::size() const
{
   size_t sum = 0;
   for( auto iter = sendInfos_.begin(); iter != sendInfos_.end(); ++iter )
   {
      sum += iter->second.buffer.size();
   }
   return sum;
}


118
119
120
121
122
123
124
125


//======================================================================================================================
//
//  Constructors
//
//======================================================================================================================

126
127
template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const MPI_Comm & communicator, int tag )
128
129
   : knownSizeComm_  ( communicator, tag ),
     unknownSizeComm_( communicator, tag ),
130
     unknownSizeCommIProbe_( communicator, tag ),
131
     noMPIComm_( communicator, tag ),
132
     currentComm_    ( nullptr ),
133
134
135
136
137
     sizeChangesEverytime_( true ),
     communicationRunning_( false )
{
}

138
139
template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb>::GenericBufferSystem( const GenericBufferSystem &other )
140
141
   : knownSizeComm_  ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
     unknownSizeComm_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
142
     unknownSizeCommIProbe_( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
143
     noMPIComm_      ( other.knownSizeComm_.getCommunicator(), other.knownSizeComm_.getTag() ),
144
     currentComm_ ( nullptr ),
145
146
147
148
149
     sizeChangesEverytime_( other.sizeChangesEverytime_ ),
     communicationRunning_( other.communicationRunning_ ),
     recvInfos_( other.recvInfos_ ),
     sendInfos_( other.sendInfos_ )
{
150
   WALBERLA_ASSERT( !communicationRunning_, "Can't copy GenericBufferSystem while communication is running" );
151
152
153
154
   if( other.currentComm_ == &other.knownSizeComm_ )
      currentComm_ = &knownSizeComm_;
   else if ( other.currentComm_ == &other.unknownSizeComm_ )
      currentComm_ = &unknownSizeComm_;
155
156
   else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
      currentComm_ = &unknownSizeCommIProbe_;
157
158
159
   else if ( other.currentComm_ == &other.noMPIComm_ )
      currentComm_ = &noMPIComm_;
   else
160
      currentComm_ = nullptr; // receiver information not yet set
161
162
}

163
164
template< typename Rb, typename Sb>
GenericBufferSystem<Rb, Sb> & GenericBufferSystem<Rb, Sb>::operator=( const GenericBufferSystem<Rb, Sb> & other )
165
{
166
   WALBERLA_ASSERT( !communicationRunning_, "Can't copy GenericBufferSystem while communication is running" );
167
168
169
170
171
172
173
174
175
176

   sizeChangesEverytime_ = other.sizeChangesEverytime_;
   communicationRunning_ = other.communicationRunning_;
   recvInfos_ = other.recvInfos_;
   sendInfos_ = other.sendInfos_;

   if( other.currentComm_ == &other.knownSizeComm_ )
      currentComm_ = &knownSizeComm_;
   else if ( other.currentComm_ == &other.unknownSizeComm_ )
      currentComm_ = &unknownSizeComm_;
177
178
   else if ( other.currentComm_ == &other.unknownSizeCommIProbe_ )
      currentComm_ = &unknownSizeCommIProbe_;
179
180
181
   else if ( other.currentComm_ == &other.noMPIComm_ )
      currentComm_ = &noMPIComm_;
   else
182
      currentComm_ = nullptr; // receiver information not yet set
183
184
185
186

   return *this;
}

187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//======================================================================================================================
//
//  Receive Information Setup
//
//======================================================================================================================


//**********************************************************************************************************************
/*! Sets receiver information, when message sizes are unknown
*
* \param ranksToRecvFrom  set of all ranks where messages are received
* \param changingSize     true if the message size is different in each communication step.
*                         If false the message size is exchanged once and is expected to be constant
*                         If true the message size is exchanged before each communication step.
*                         The behavior can be changed later one using setReceiverInfo() or sizeHasChanged().
*/
//**********************************************************************************************************************
204
205
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::set<MPIRank> & ranksToRecvFrom, bool changingSize )
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
{
   WALBERLA_ASSERT( ! communicationRunning_ );

   recvInfos_.clear();
   for ( auto it = ranksToRecvFrom.begin(); it != ranksToRecvFrom.end(); ++it )
   {
      const MPIRank sender = *it;
      recvInfos_[ sender ].size = INVALID_SIZE;
   }

   sizeChangesEverytime_ = changingSize;
   setCommunicationType( false ); // no size information on first run -> UnknownSizeCommunication
}



222
223
224
225
226
227
228
229
230
231
232
233
234
235
//**********************************************************************************************************************
/*! Sets receiver information, when the number of receives is known but the ranks are unknown
*
* \param numReceives number of expected messages
*/
//**********************************************************************************************************************
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const int numReceives )
{
   WALBERLA_ABORT("NOT IMPLEMENTED!");
}



236
237
238
239
240
241
242
243
244
//**********************************************************************************************************************
/*! Sets receiver information, when message sizes are known
*
* \param ranksToRecvFrom  Map containing all ranks, where messages are received from, as keys
*                         and the message sizes as values.
*                         The message sizes are expected to be constant for all communication step until
*                         behavior is changed with setReceiverInfo*() or sizeHasChanged()
*/
//**********************************************************************************************************************
245
246
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setReceiverInfo( const std::map<MPIRank,MPISize> & ranksToRecvFrom )
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
{
   WALBERLA_ASSERT( ! communicationRunning_ );

   recvInfos_.clear();
   for ( auto it = ranksToRecvFrom.begin(); it != ranksToRecvFrom.end(); ++it )
   {
      const MPIRank sender       = it->first;
      const MPISize senderSize   = it->second;
      WALBERLA_ASSERT_GREATER( senderSize, 0 );
      recvInfos_[ sender ].size   = senderSize;
   }

   sizeChangesEverytime_ = false;
   setCommunicationType( true );
}



//**********************************************************************************************************************
/*! Sets receiver information, using SendBuffers (symmetric communication)
*
268
* Gives the GenericBufferSystem the information that messages are received from the same processes that we
269
270
271
272
273
274
275
276
277
278
279
280
281
282
* send to (i.e. from all ranks where SendBuffers were already filled )
* sendBuffer() has to be called before, and corresponding SendBuffers have to be filled.
*
*
* \param useSizeFromSendBuffers  If true, the sizes are expected to be known and equal to the size
*                                of the SendBuffers. SendBuffers with zero size are ignored.
*                                If false, all SendBuffers (also with zero size) are registered as ranks where
*                                messages are received from. The size is unknown, and communicated before.
*
* \param changingSize            if true the size is communicated before every communication step.
*                                if useSizeFromSendBuffer==true and changingSize==true, the size is not
*                                communicated in the first step but in all following steps.
*/
//**********************************************************************************************************************
283
284
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setReceiverInfoFromSendBufferState( bool useSizeFromSendBuffers, bool changingSize )
285
286
287
288
289
290
291
{
   WALBERLA_ASSERT( ! communicationRunning_ );

   recvInfos_.clear();
   for ( auto it = sendInfos_.begin(); it != sendInfos_.end(); ++it )
   {
      const MPIRank sender = it->first;
292
      const Sb & buffer = it->second.buffer;
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307

      if ( buffer.size() == 0 && useSizeFromSendBuffers )
         continue;

      recvInfos_[ sender ].size  = useSizeFromSendBuffers ? int_c( buffer.size() )  : INVALID_SIZE;
   }

   sizeChangesEverytime_ = changingSize;

   setCommunicationType( useSizeFromSendBuffers );
}



//**********************************************************************************************************************
308
/*! Notifies that GenericBufferSystem that message sizes have changed ( and optionally are changing in all following steps)
309
*
310
* Useful when setReceiverInfo was set such that GenericBufferSystem assumes constant message sizes for all steps.
311
312
313
314
315
316
* Can only be called if no communication is currently running.
*
* \param alwaysChangingSize  if true the message sizes is communicated in all following steps, if false
*                            only in the next step.
*/
//**********************************************************************************************************************
317
318
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::sizeHasChanged( bool alwaysChangingSize )
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
{
   WALBERLA_ASSERT( ! communicationRunning_ );

   sizeChangesEverytime_ = alwaysChangingSize;
   setCommunicationType( false );
}



//======================================================================================================================
//
//  Step 1: Schedule Receives and ISends
//
//======================================================================================================================



//**********************************************************************************************************************
/*! Returns an existing SendBuffer, or creates a new one (only if !isCommunicationRunning() )
*
* \param rank  the rank where the buffer should be sent to
*/
//**********************************************************************************************************************
342
343
template< typename Rb, typename Sb>
Sb & GenericBufferSystem<Rb, Sb>::sendBuffer( MPIRank rank )
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
{
   return sendInfos_[rank].buffer;
}





//**********************************************************************************************************************
/*! Sends filled ( nonzero length) SendBuffers to their destination ranks
*
* If some of the SendBuffers have been sent manually before using send(int rank) they are skipped,
* only the remaining buffers are sent.
*
* If communication was not started before, it is started with this function.
*/
//**********************************************************************************************************************
361
362
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::sendAll()
363
364
365
366
367
368
369
370
371
372
373
{
   WALBERLA_ASSERT_NOT_NULLPTR( currentComm_ ); // call setReceiverInfo first!

   if ( !communicationRunning_ )
      startCommunication();

   for( auto iter = sendInfos_.begin(); iter != sendInfos_.end(); ++iter )
   {
      if ( ! iter->second.alreadySent )
      {
         if ( iter->second.buffer.size() > 0 )
374
         {
375
376
            bytesSent_     += iter->second.buffer.size() * sizeof(SendBuffer::ElementType);
            numberOfSends_ += 1;
377
            currentComm_->send( iter->first, iter->second.buffer );
378
         }
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393

         iter->second.alreadySent = true;
      }
   }
}


//**********************************************************************************************************************
/*! Sends a single SendBuffer to its destination rank.
*
* If SendBuffer is empty no message is sent.
*
* If communication was not started before, it is started with this function.
*/
//**********************************************************************************************************************
394
395
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::send( MPIRank rank )
396
397
398
399
400
401
402
403
404
405
406
{
   WALBERLA_ASSERT_NOT_NULLPTR( currentComm_ ); // call setReceiverInfo first!

   if ( !communicationRunning_ )
      startCommunication();

   auto iter = sendInfos_.find(rank);
   WALBERLA_ASSERT( iter != sendInfos_.end() );   // no send buffer was created for this rank
   WALBERLA_ASSERT( ! iter->second.alreadySent ); // this buffer has already been sent

   if ( iter->second.buffer.size() > 0 )
407
   {
408
409
      bytesSent_     += iter->second.buffer.size() * sizeof(SendBuffer::ElementType);
      numberOfSends_ += 1;
410
      currentComm_->send( rank, iter->second.buffer );
411
   }
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430

   iter->second.alreadySent = true;
}



//======================================================================================================================
//
//  Private Helper Functions
//
//======================================================================================================================


//**********************************************************************************************************************
/*! Starts communication
*
* - schedules receives and reserves space for MPI_Request vectors in the currentComm_ member
*/
//**********************************************************************************************************************
431
432
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::startCommunication()
433
{
434
435
436
437
438
   // Clear receive buffers
   for( auto iter = recvInfos_.begin(); iter != recvInfos_.end(); ++iter )  {
      iter->second.buffer.clear();
   }

439
   const auto tag = currentComm_->getTag();
440
441
442
   WALBERLA_CHECK_EQUAL(activeTags_.find(tag), activeTags_.end(),
                        "Another communication with the same MPI tag is currently in progress.");
   activeTags_.insert(tag);
443

444
   WALBERLA_CHECK( ! communicationRunning_ );
445
446
447

   currentComm_->scheduleReceives( recvInfos_ );
   communicationRunning_ = true;
448

449
450
451
452
453
   bytesSent_        = 0;
   bytesReceived_    = 0;

   numberOfSends_    = 0;
   numberOfReceives_ = 0;
454
455
456
457
458
459
460
461
462
463
464
465
}



//**********************************************************************************************************************
/*! Cleanup after communication
*
* - wait for sends to complete
* - clear buffers
* - manage sizeChangesEverytime
*/
//**********************************************************************************************************************
466
467
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::endCommunication()
468
{
469
   WALBERLA_CHECK( communicationRunning_ );
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
   currentComm_->waitForSends();

   // Clear send buffers
   for( auto iter = sendInfos_.begin(); iter != sendInfos_.end(); ++iter )
   {
      iter->second.alreadySent = false;
      iter->second.buffer.clear();
   }


   if( !sizeChangesEverytime_ )
      setCommunicationType( true );

   communicationRunning_ = false;

   activeTags_.erase( activeTags_.find( currentComm_->getTag() ) );
}



//**********************************************************************************************************************
/*! Helper function for iterator
*
*  See documentation of AbstractCommunication::waitForNext()
*/
//**********************************************************************************************************************
496
497
template< typename Rb, typename Sb>
Rb * GenericBufferSystem<Rb, Sb>::waitForNext( MPIRank & fromRank )
498
499
500
501
502
503
504
505
506
507
{
   WALBERLA_ASSERT( communicationRunning_ );

   fromRank = currentComm_->waitForNextReceive( recvInfos_ );

   if( fromRank >= 0 )
      return & ( recvInfos_[fromRank].buffer );
   else
   {
      endCommunication();
508
      return nullptr;
509
510
511
512
513
514
515
516
517
518
   }

}



//**********************************************************************************************************************
/*! Sets the communication type to known size, unknown size or NoMPI comm
*/
//**********************************************************************************************************************
519
520
template< typename Rb, typename Sb>
void GenericBufferSystem<Rb, Sb>::setCommunicationType( const bool knownSize )
521
522
523
524
525
526
527
528
529
530
{
   WALBERLA_NON_MPI_SECTION()
   {
      currentComm_ = &noMPIComm_;
   }

   WALBERLA_MPI_SECTION()
   {
      if( knownSize )
         currentComm_ = &knownSizeComm_;
531
532
      else if ( useIProbe_ )
         currentComm_ = &unknownSizeCommIProbe_;
533
534
535
536
537
538
539
540
541
542
543
544
      else
         currentComm_ = &unknownSizeComm_;
   }
}


//======================================================================================================================
//
//  Rank Ranges
//
//======================================================================================================================

545
546
template< typename Rb, typename Sb>
typename GenericBufferSystem<Rb, Sb>::RankRange GenericBufferSystem<Rb,Sb>::noRanks()
547
{
548
   return RankRange();
549
}
550
551
template< typename Rb, typename Sb>
typename GenericBufferSystem<Rb, Sb>::RankRange GenericBufferSystem<Rb,Sb>::allRanks()
552
553
{
   int numProcesses = MPIManager::instance()->numProcesses();
554
555
556
   RankRange r;
   std::generate_n( std::inserter(r, r.end()), numProcesses, [&]{ return MPIRank(r.size()); } );
   return r;
557
}
558
559
template< typename Rb, typename Sb>
typename GenericBufferSystem<Rb, Sb>::RankRange GenericBufferSystem<Rb,Sb>::allRanksButRoot()
560
561
{
   int numProcesses = MPIManager::instance()->numProcesses();
562
563
564
   RankRange r;
   std::generate_n( std::inserter(r, r.end()), numProcesses-1, [&]{ return MPIRank(r.size()+size_t(1)); } );
   return r;
565
}
566
567
template< typename Rb, typename Sb>
typename GenericBufferSystem<Rb, Sb>::RankRange GenericBufferSystem<Rb,Sb>::onlyRank( int includedRank )
568
569
{
   WALBERLA_ASSERT_LESS( includedRank, MPIManager::instance()->numProcesses() );
570
   return RankRange { includedRank };
571
572
}

573
574
template< typename Rb, typename Sb>
typename GenericBufferSystem<Rb, Sb>::RankRange GenericBufferSystem<Rb,Sb>::onlyRoot()
575
{
576
   return RankRange { 0 };
577
578
579
580
581
582
583
584
585
586
587
588
589
}



} // namespace mpi
} // namespace walberla