p2pcommunicator.hh
1 /*
2  Copyright 2015 IRIS AS
3 
4  This file is part of the Open Porous Media project (OPM).
5 
6  OPM is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  OPM is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with OPM. If not, see <http://www.gnu.org/licenses/>.
18 */
19 #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20 #define DUNE_COMMUNICATOR_HEADER_INCLUDED
21 
22 #include <algorithm>
23 #include <vector>
24 #include <set>
25 #include <map>
26 
27 #include <dune/common/version.hh>
28 
29 #if DUNE_VERSION_NEWER(DUNE_COMMON,2,3)
30 #include <dune/common/parallel/mpihelper.hh>
31 #include <dune/common/parallel/collectivecommunication.hh>
32 
33 // the following implementation is only available in case MPI is available
34 #if HAVE_MPI
35 #include <dune/common/parallel/mpicollectivecommunication.hh>
36 #endif
37 #else // DUNE_COMMON 2.2
38 #include <dune/common/mpihelper.hh>
39 #include <dune/common/collectivecommunication.hh>
40 
41 // the following implementation is only available in case MPI is available
42 #if HAVE_MPI
43 #include <dune/common/mpicollectivecommunication.hh>
44 #endif
45 #endif // #if DUNE_VERSION_NEWER()
46 
47 
48 namespace Dune
49 {
/** \brief Growable byte buffer used to serialize values for message exchange.
 *
 *  Values are appended with write() and extracted again, in the same order,
 *  with read(). Both operations copy the raw bytes of the object, so only
 *  types that can safely be copied byte by byte are suitable.
 */
struct SimpleMessageBuffer
{
  typedef std::vector< char > BufferType;

  // storage and read position are mutable because read() and buffer() are
  // const members that nevertheless advance the position / hand out a
  // writable pointer
  mutable BufferType buffer_;
  // multiplier applied to the required size when write() has to grow the buffer
  const double factor_;
  // offset of the next byte that read() will extract
  mutable size_t pos_;

public:
  /** \brief constructor taking memory reserve estimation factor
   *         (default is 1.1, i.e. 10% overestimation)
   *
   *  The read position starts at the beginning of the (empty) buffer, so a
   *  freshly constructed buffer can be read from right after writing to it.
   */
  SimpleMessageBuffer( const double factor = 1.1 )
    : buffer_(), factor_( factor ), pos_( 0 )
  {
  }

  /** \brief clear the buffer and reset the read position */
  void clear()
  {
    buffer_.clear();
    resetReadPosition();
  }

  /** \brief reset read position of buffer to beginning */
  void resetReadPosition()
  {
    pos_ = 0;
  }

  /** \brief return size of buffer in bytes */
  size_t size() const
  {
    return buffer_.size();
  }

  /** \brief reserve memory for 'size' entries */
  void reserve( const size_t size )
  {
    buffer_.reserve( size );
  }

  /** \brief resize buffer to 'size' entries */
  void resize( const size_t size )
  {
    buffer_.resize( size );
  }

  /** \brief append the bytes of value to the end of the buffer
   *
   *  \param value  object whose sizeof(T) bytes are copied into the buffer
   */
  template <class T>
  void write( const T& value )
  {
    const size_t bytes  = sizeof( T );
    const size_t offset = buffer_.size();
    const size_t needed = offset + bytes;

    // grow with some overestimation (factor_) to avoid reallocating on
    // every single write
    if( buffer_.capacity() < needed )
    {
      reserve( size_t(factor_ * needed) );
    }

    // make room for the new bytes, then copy them behind the old content
    buffer_.resize( needed );
    std::copy_n( reinterpret_cast<const char *> (&value), bytes, buffer_.data() + offset );
  }

  /** \brief extract the next sizeof(T) bytes from the buffer into value
   *
   *  \param value  object that is overwritten byte by byte; afterwards the
   *                read position is advanced by sizeof(T)
   */
  template <class T>
  void read( T& value ) const
  {
    const size_t bytes = sizeof( T );
    // reading beyond the stored data is a programming error
    assert( pos_ + bytes <= buffer_.size() );
    std::copy_n( buffer_.data() + pos_, bytes, reinterpret_cast<char *> (&value) );
    pos_ += bytes;
  }

  /** \brief return pointer to buffer and size for use with MPI functions */
  std::pair< char* , int > buffer() const
  {
    return std::make_pair( buffer_.data(), int(buffer_.size()) );
  }
};
121 
123  template < class MsgBuffer >
124  class Point2PointCommunicator : public CollectiveCommunication< MPIHelper::MPICommunicator >
125  {
126  public:
128  typedef MPIHelper::MPICommunicator MPICommunicator ;
129 
131  typedef MsgBuffer MessageBufferType ;
132 
133  protected:
134  typedef CollectiveCommunication< MPICommunicator > BaseType;
136 
137  // starting message tag
138  static const int messagetag = 234;
139 
140  typedef std::map< int, int > linkage_t;
141  typedef std::vector< int > vector_t;
142 
143  linkage_t sendLinkage_ ;
144  linkage_t recvLinkage_ ;
145 
146  vector_t sendDest_ ;
147  vector_t recvSource_ ;
148 
149  mutable vector_t _recvBufferSizes;
150  mutable bool _recvBufferSizesComputed;
151 
152  public :
153  using BaseType :: rank;
154  using BaseType :: size;
155 
156  /* \brief data handle interface that needs to be implemented for use with some of
157  * the exchange methods */
159  {
160  protected:
161  DataHandleInterface () {}
162  public:
163  virtual ~DataHandleInterface () {}
164  virtual void pack( const int link, MessageBufferType& os ) = 0 ;
165  virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
166  // should contain work that could be done between send and receive
167  virtual void localComputation () {}
168  };
169 
170  public:
172  Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
173  : BaseType( mpiComm ) { removeLinkage(); }
174 
176  Point2PointCommunicator( const BaseType& comm ) : BaseType( comm ) { removeLinkage(); }
177 
178 
180  inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
181 
183  inline int sendLinks () const { return sendLinkage_.size(); }
184 
186  inline int recvLinks () const { return recvLinkage_.size(); }
187 
189  const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
190 
192  inline int sendLink (const int rank) const
193  {
194  assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
195  return (* sendLinkage_.find (rank)).second ;
196  }
197 
199  inline int recvLink (const int rank) const
200  {
201  assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
202  return (* recvLinkage_.find (rank)).second ;
203  }
204 
206  const std::vector< int > &sendDest () const { return sendDest_; }
208  const std::vector< int > &recvSource () const { return recvSource_; }
209 
211  inline void removeLinkage () ;
212 
214  virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
215 
217  virtual void exchange ( DataHandleInterface& ) const;
218 
222  virtual void exchangeCached ( DataHandleInterface& ) const;
223 
224  protected:
225  inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
226 
227  // return new tag number for the exchange messages
228  static int getMessageTag( const unsigned int increment )
229  {
230  static int tag = messagetag + 2 ;
231  // increase tag counter
232  const int retTag = tag;
233  tag += increment ;
234  // the MPI standard guaratees only up to 2^15-1
235  if( tag >= 32767 )
236  {
237  // reset tag to initial value
238  tag = messagetag + 2 ;
239  }
240  return retTag;
241  }
242 
243  // return new tag number for the exchange messages
244  static int getMessageTag()
245  {
246  return getMessageTag( 1 );
247  }
248  };
249 
250 } // namespace Dune
251 
252 // include inline implementation
253 #include "p2pcommunicator_impl.hh"
254 
255 #endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e. 10% overestimation)
Definition: p2pcommunicator.hh:60
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:612
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:116
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e. be safely copyable byte by byte, without internal pointers etc.)
Definition: p2pcommunicator.hh:106
void resize(const size_t size)
resize buffer to 'size' entries
Definition: p2pcommunicator.hh:80
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e. be safely copyable byte by byte, without internal pointers etc.)
Definition: p2pcommunicator.hh:87
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:172
Holds the implementation of the CpGrid as a pimpl.
Definition: OpmParserIncludes.hpp:42
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:199
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:206
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:71
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:131
void clear()
clear the buffer
Definition: p2pcommunicator.hh:67
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:595
Definition: p2pcommunicator.hh:158
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:186
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:183
Point-2-Point communicator for exchanging messages between processes.
Definition: p2pcommunicator.hh:124
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:128
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from ...
Definition: p2pcommunicator_impl.hh:59
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:208
Definition: p2pcommunicator.hh:50
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:69
void reserve(const size_t size)
reserve memory for 'size' entries
Definition: p2pcommunicator.hh:74
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:189
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:192
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:176