Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // Questions? Contact Michael A. Heroux (maherou@sandia.gov)
37 //
38 // ************************************************************************
39 // @HEADER
40 
41 #include "Tpetra_Distributor.hpp"
42 #include "Teuchos_StandardParameterEntryValidators.hpp"
43 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
44 
45 
46 namespace Tpetra {
47  namespace Details {
48  std::string
49  DistributorSendTypeEnumToString (EDistributorSendType sendType)
50  {
51  if (sendType == DISTRIBUTOR_ISEND) {
52  return "Isend";
53  }
54  else if (sendType == DISTRIBUTOR_RSEND) {
55  return "Rsend";
56  }
57  else if (sendType == DISTRIBUTOR_SEND) {
58  return "Send";
59  }
60  else if (sendType == DISTRIBUTOR_SSEND) {
61  return "Ssend";
62  }
63  else {
64  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
65  "EDistributorSendType enum value " << sendType << ".");
66  }
67  }
68 
69  std::string
70  DistributorHowInitializedEnumToString (EDistributorHowInitialized how)
71  {
72  switch (how) {
73  case Details::DISTRIBUTOR_NOT_INITIALIZED:
74  return "Not initialized yet";
75  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
76  return "By createFromSends";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
78  return "By createFromRecvs";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
80  return "By createReverseDistributor";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
82  return "By copy constructor";
83  default:
84  return "INVALID";
85  }
86  }
87  } // namespace Details
88 
89  Array<std::string>
90  distributorSendTypes ()
91  {
92  Array<std::string> sendTypes;
93  sendTypes.push_back ("Isend");
94  sendTypes.push_back ("Rsend");
95  sendTypes.push_back ("Send");
96  sendTypes.push_back ("Ssend");
97  return sendTypes;
98  }
99 
100  // We set default values of Distributor's Boolean parameters here,
101  // in this one place. That way, if we want to change the default
102  // value of a parameter, we don't have to search the whole file to
103  // ensure a consistent setting.
104  namespace {
105  // Default value of the "Debug" parameter.
106  const bool tpetraDistributorDebugDefault = false;
107  // Default value of the "Barrier between receives and sends" parameter.
108  const bool barrierBetween_default = false;
109  // Default value of the "Use distinct tags" parameter.
110  const bool useDistinctTags_default = true;
111  // Default value of the "Enable MPI CUDA RDMA support" parameter.
112 #ifdef TPETRA_ENABLE_MPI_CUDA_RDMA
113  const bool enable_cuda_rdma_default = true;
114 #else
115  const bool enable_cuda_rdma_default = false;
116 #endif
117  } // namespace (anonymous)
118 
119  int Distributor::getTag (const int pathTag) const {
120  return useDistinctTags_ ? pathTag : comm_->getTag ();
121  }
122 
123 
124 #ifdef TPETRA_DISTRIBUTOR_TIMERS
125  void Distributor::makeTimers () {
126  const std::string name_doPosts3 = "Tpetra::Distributor: doPosts(3)";
127  const std::string name_doPosts4 = "Tpetra::Distributor: doPosts(4)";
128  const std::string name_doWaits = "Tpetra::Distributor: doWaits";
129  const std::string name_doPosts3_recvs = "Tpetra::Distributor: doPosts(3): recvs";
130  const std::string name_doPosts4_recvs = "Tpetra::Distributor: doPosts(4): recvs";
131  const std::string name_doPosts3_barrier = "Tpetra::Distributor: doPosts(3): barrier";
132  const std::string name_doPosts4_barrier = "Tpetra::Distributor: doPosts(4): barrier";
133  const std::string name_doPosts3_sends = "Tpetra::Distributor: doPosts(3): sends";
134  const std::string name_doPosts4_sends = "Tpetra::Distributor: doPosts(4): sends";
135 
136  timer_doPosts3_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3);
137  timer_doPosts4_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4);
138  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (name_doWaits);
139  timer_doPosts3_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_recvs);
140  timer_doPosts4_recvs_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_recvs);
141  timer_doPosts3_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_barrier);
142  timer_doPosts4_barrier_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_barrier);
143  timer_doPosts3_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts3_sends);
144  timer_doPosts4_sends_ = Teuchos::TimeMonitor::getNewTimer (name_doPosts4_sends);
145  }
146 #endif // TPETRA_DISTRIBUTOR_TIMERS
147 
148  void
149  Distributor::init (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
150  const Teuchos::RCP<Teuchos::FancyOStream>& out,
151  const Teuchos::RCP<Teuchos::ParameterList>& plist)
152  {
153  this->out_ = out.is_null () ?
154  Teuchos::getFancyOStream (Teuchos::rcpFromRef (std::cerr)) : out;
155  if (! plist.is_null ()) {
156  this->setParameterList (plist);
157  }
158 
159 #ifdef TPETRA_DISTRIBUTOR_TIMERS
160  makeTimers ();
161 #endif // TPETRA_DISTRIBUTOR_TIMERS
162 
163  if (debug_) {
164  TEUCHOS_TEST_FOR_EXCEPTION
165  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
166  "is true but out_ (pointer to the output stream) is NULL. Please "
167  "report this bug to the Tpetra developers.");
168  Teuchos::OSTab tab (out_);
169  std::ostringstream os;
170  os << comm_->getRank ()
171  << ": Distributor ctor done" << std::endl;
172  *out_ << os.str ();
173  }
174  }
175 
176  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
177  : comm_ (comm)
178  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
179  , sendType_ (Details::DISTRIBUTOR_SEND)
180  , barrierBetween_ (barrierBetween_default)
181  , debug_ (tpetraDistributorDebugDefault)
182  , enable_cuda_rdma_ (enable_cuda_rdma_default)
183  , numExports_ (0)
184  , selfMessage_ (false)
185  , numSends_ (0)
186  , maxSendLength_ (0)
187  , numReceives_ (0)
188  , totalReceiveLength_ (0)
189  , lastRoundBytesSend_ (0)
190  , lastRoundBytesRecv_ (0)
191  , useDistinctTags_ (useDistinctTags_default)
192  {
193  init (comm, Teuchos::null, Teuchos::null);
194  }
195 
196  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
197  const Teuchos::RCP<Teuchos::FancyOStream>& out)
198  : comm_ (comm)
199  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
200  , sendType_ (Details::DISTRIBUTOR_SEND)
201  , barrierBetween_ (barrierBetween_default)
202  , debug_ (tpetraDistributorDebugDefault)
203  , enable_cuda_rdma_ (enable_cuda_rdma_default)
204  , numExports_ (0)
205  , selfMessage_ (false)
206  , numSends_ (0)
207  , maxSendLength_ (0)
208  , numReceives_ (0)
209  , totalReceiveLength_ (0)
210  , lastRoundBytesSend_ (0)
211  , lastRoundBytesRecv_ (0)
212  , useDistinctTags_ (useDistinctTags_default)
213  {
214  init (comm, out, Teuchos::null);
215  }
216 
217  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
218  const Teuchos::RCP<Teuchos::ParameterList>& plist)
219  : comm_ (comm)
220  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
221  , sendType_ (Details::DISTRIBUTOR_SEND)
222  , barrierBetween_ (barrierBetween_default)
223  , debug_ (tpetraDistributorDebugDefault)
224  , enable_cuda_rdma_ (enable_cuda_rdma_default)
225  , numExports_ (0)
226  , selfMessage_ (false)
227  , numSends_ (0)
228  , maxSendLength_ (0)
229  , numReceives_ (0)
230  , totalReceiveLength_ (0)
231  , lastRoundBytesSend_ (0)
232  , lastRoundBytesRecv_ (0)
233  , useDistinctTags_ (useDistinctTags_default)
234  {
235  init (comm, Teuchos::null, plist);
236  }
237 
238  Distributor::Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
239  const Teuchos::RCP<Teuchos::FancyOStream>& out,
240  const Teuchos::RCP<Teuchos::ParameterList>& plist)
241  : comm_ (comm)
242  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
243  , sendType_ (Details::DISTRIBUTOR_SEND)
244  , barrierBetween_ (barrierBetween_default)
245  , debug_ (tpetraDistributorDebugDefault)
246  , enable_cuda_rdma_ (enable_cuda_rdma_default)
247  , numExports_ (0)
248  , selfMessage_ (false)
249  , numSends_ (0)
250  , maxSendLength_ (0)
251  , numReceives_ (0)
252  , totalReceiveLength_ (0)
253  , lastRoundBytesSend_ (0)
254  , lastRoundBytesRecv_ (0)
255  , useDistinctTags_ (useDistinctTags_default)
256  {
257  init (comm, out, plist);
258  }
259 
260  Distributor::Distributor (const Distributor & distributor)
261  : comm_ (distributor.comm_)
262  , out_ (distributor.out_)
263  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
264  , sendType_ (distributor.sendType_)
265  , barrierBetween_ (distributor.barrierBetween_)
266  , debug_ (distributor.debug_)
267  , enable_cuda_rdma_ (distributor.enable_cuda_rdma_)
268  , numExports_ (distributor.numExports_)
269  , selfMessage_ (distributor.selfMessage_)
270  , numSends_ (distributor.numSends_)
271  , imagesTo_ (distributor.imagesTo_)
272  , startsTo_ (distributor.startsTo_)
273  , lengthsTo_ (distributor.lengthsTo_)
274  , maxSendLength_ (distributor.maxSendLength_)
275  , indicesTo_ (distributor.indicesTo_)
276  , numReceives_ (distributor.numReceives_)
277  , totalReceiveLength_ (distributor.totalReceiveLength_)
278  , lengthsFrom_ (distributor.lengthsFrom_)
279  , imagesFrom_ (distributor.imagesFrom_)
280  , startsFrom_ (distributor.startsFrom_)
281  , indicesFrom_ (distributor.indicesFrom_)
282  , reverseDistributor_ (distributor.reverseDistributor_)
283  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
284  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
285  , useDistinctTags_ (distributor.useDistinctTags_)
286  {
287  using Teuchos::ParameterList;
288  using Teuchos::parameterList;
289  using Teuchos::RCP;
290  using Teuchos::rcp;
291 
292  // Clone the right-hand side's ParameterList, so that this object's list
293  // is decoupled from the right-hand side's list. We don't need to
294  // do validation, since the right-hand side already has validated
295  // its parameters, so just call setMyParamList(). Note that this
296  // won't work if the right-hand side doesn't have a list set yet,
297  // so we first check for null.
298  RCP<const ParameterList> rhsList = distributor.getParameterList ();
299  if (! rhsList.is_null ()) {
300  this->setMyParamList (parameterList (* rhsList));
301  }
302 
303 #ifdef TPETRA_DISTRIBUTOR_TIMERS
304  makeTimers ();
305 #endif // TPETRA_DISTRIBUTOR_TIMERS
306 
307  if (debug_) {
308  TEUCHOS_TEST_FOR_EXCEPTION
309  (out_.is_null (), std::logic_error, "Tpetra::Distributor::init: debug_ "
310  "is true but out_ (pointer to the output stream) is NULL. Please "
311  "report this bug to the Tpetra developers.");
312  Teuchos::OSTab tab (out_);
313  std::ostringstream os;
314  os << comm_->getRank ()
315  << ": Distributor copy ctor done" << std::endl;
316  *out_ << os.str ();
317  }
318  }
319 
320  void Distributor::swap (Distributor& rhs) {
321  using Teuchos::ParameterList;
322  using Teuchos::parameterList;
323  using Teuchos::RCP;
324 
325  std::swap (comm_, rhs.comm_);
326  std::swap (out_, rhs.out_);
327  std::swap (howInitialized_, rhs.howInitialized_);
328  std::swap (sendType_, rhs.sendType_);
329  std::swap (barrierBetween_, rhs.barrierBetween_);
330  std::swap (debug_, rhs.debug_);
331  std::swap (enable_cuda_rdma_, rhs.enable_cuda_rdma_);
332  std::swap (numExports_, rhs.numExports_);
333  std::swap (selfMessage_, rhs.selfMessage_);
334  std::swap (numSends_, rhs.numSends_);
335  std::swap (imagesTo_, rhs.imagesTo_);
336  std::swap (startsTo_, rhs.startsTo_);
337  std::swap (lengthsTo_, rhs.lengthsTo_);
338  std::swap (maxSendLength_, rhs.maxSendLength_);
339  std::swap (indicesTo_, rhs.indicesTo_);
340  std::swap (numReceives_, rhs.numReceives_);
341  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
342  std::swap (lengthsFrom_, rhs.lengthsFrom_);
343  std::swap (imagesFrom_, rhs.imagesFrom_);
344  std::swap (startsFrom_, rhs.startsFrom_);
345  std::swap (indicesFrom_, rhs.indicesFrom_);
346  std::swap (reverseDistributor_, rhs.reverseDistributor_);
347  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
348  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
349  std::swap (useDistinctTags_, rhs.useDistinctTags_);
350 
351  // Swap parameter lists. If they are the same object, make a deep
352  // copy first, so that modifying one won't modify the other one.
353  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
354  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
355  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
356  rhsList = parameterList (*rhsList);
357  }
358  if (! rhsList.is_null ()) {
359  this->setMyParamList (rhsList);
360  }
361  if (! lhsList.is_null ()) {
362  rhs.setMyParamList (lhsList);
363  }
364 
365  // We don't need to swap timers, because all instances of
366  // Distributor use the same timers.
367  }
368 
369  Distributor::~Distributor()
370  {
371  // We shouldn't have any outstanding communication requests at
372  // this point.
373  TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
374  "Tpetra::Distributor: Destructor called with " << requests_.size()
375  << " outstanding posts (unfulfilled communication requests). There "
376  "should be none at this point. Please report this bug to the Tpetra "
377  "developers.");
378  }
379 
380  void
381  Distributor::setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
382  {
383  using Teuchos::FancyOStream;
384  using Teuchos::getIntegralValue;
385  using Teuchos::includesVerbLevel;
386  using Teuchos::OSTab;
387  using Teuchos::ParameterList;
388  using Teuchos::parameterList;
389  using Teuchos::RCP;
390  using std::endl;
391 
392  RCP<const ParameterList> validParams = getValidParameters ();
393  plist->validateParametersAndSetDefaults (*validParams);
394 
395  const bool barrierBetween =
396  plist->get<bool> ("Barrier between receives and sends");
397  const Details::EDistributorSendType sendType =
398  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
399  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
400  const bool debug = plist->get<bool> ("Debug");
401  const bool enable_cuda_rdma = plist->get<bool> ("Enable MPI CUDA RDMA support");
402 
403  // We check this property explicitly, since we haven't yet learned
404  // how to make a validator that can cross-check properties.
405  // Later, turn this into a validator so that it can be embedded in
406  // the valid ParameterList and used in Optika.
407  TEUCHOS_TEST_FOR_EXCEPTION(
408  ! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
409  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
410  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
411  "between receives and sends." << endl << "This is invalid; you must "
412  "include the barrier if you use ready sends." << endl << "Ready sends "
413  "require that their corresponding receives have already been posted, "
414  "and the only way to guarantee that in general is with a barrier.");
415 
416  // Now that we've validated the input list, save the results.
417  sendType_ = sendType;
418  barrierBetween_ = barrierBetween;
419  useDistinctTags_ = useDistinctTags;
420  debug_ = debug;
421  enable_cuda_rdma_ = enable_cuda_rdma;
422 
423  // ParameterListAcceptor semantics require pointer identity of the
424  // sublist passed to setParameterList(), so we save the pointer.
425  this->setMyParamList (plist);
426  }
427 
428  Teuchos::RCP<const Teuchos::ParameterList>
429  Distributor::getValidParameters () const
430  {
431  using Teuchos::Array;
432  using Teuchos::ParameterList;
433  using Teuchos::parameterList;
434  using Teuchos::RCP;
435  using Teuchos::setStringToIntegralParameter;
436 
437  const bool barrierBetween = barrierBetween_default;
438  const bool useDistinctTags = useDistinctTags_default;
439  const bool debug = tpetraDistributorDebugDefault;
440  const bool enable_cuda_rdma = enable_cuda_rdma_default;
441 
442  Array<std::string> sendTypes = distributorSendTypes ();
443  const std::string defaultSendType ("Send");
444  Array<Details::EDistributorSendType> sendTypeEnums;
445  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
446  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
447  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
448  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
449 
450  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
451  plist->set ("Barrier between receives and sends", barrierBetween,
452  "Whether to execute a barrier between receives and sends in do"
453  "[Reverse]Posts(). Required for correctness when \"Send type\""
454  "=\"Rsend\", otherwise correct but not recommended.");
455  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
456  defaultSendType, "When using MPI, the variant of send to use in "
457  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
458  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
459  "MPI message tags for different code paths.");
460  plist->set ("Debug", debug, "Whether to print copious debugging output on "
461  "all processes.");
462  plist->set ("Enable MPI CUDA RDMA support", enable_cuda_rdma,
463  "Whether to enable RDMA support for MPI communication between "
464  "CUDA GPUs. Only enable this if you know for sure your MPI "
465  "library supports it.");
466 
467  // mfh 24 Dec 2015: Tpetra no longer inherits from
468  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
469  // sublist. However, we retain the "VerboseObject" sublist
470  // anyway, for backwards compatibility (otherwise the above
471  // validation would fail with an invalid parameter name, should
472  // the user still want to provide this list).
473  Teuchos::setupVerboseObjectSublist (&*plist);
474  return Teuchos::rcp_const_cast<const ParameterList> (plist);
475  }
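  // A minimal usage sketch of these parameters, assuming "comm" is an
  // already-created Teuchos::RCP<const Teuchos::Comm<int> >.  The parameter
  // names and the constructor are the ones defined in this file; parameters
  // not set explicitly are filled in with the defaults established above:
  //
  //   Teuchos::RCP<Teuchos::ParameterList> plist =
  //     Teuchos::parameterList ("Tpetra::Distributor");
  //   plist->set ("Send type", std::string ("Isend"));
  //   plist->set ("Barrier between receives and sends", false);
  //   Tpetra::Distributor distributor (comm, plist);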
476 
477 
478  size_t Distributor::getTotalReceiveLength() const
479  { return totalReceiveLength_; }
480 
481  size_t Distributor::getNumReceives() const
482  { return numReceives_; }
483 
484  bool Distributor::hasSelfMessage() const
485  { return selfMessage_; }
486 
487  size_t Distributor::getNumSends() const
488  { return numSends_; }
489 
490  size_t Distributor::getMaxSendLength() const
491  { return maxSendLength_; }
492 
493  Teuchos::ArrayView<const int> Distributor::getImagesFrom() const
494  { return imagesFrom_; }
495 
496  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
497  { return lengthsFrom_; }
498 
499  Teuchos::ArrayView<const int> Distributor::getImagesTo() const
500  { return imagesTo_; }
501 
502  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
503  { return lengthsTo_; }
504 
505  Teuchos::RCP<Distributor>
506  Distributor::getReverse() const {
507  if (reverseDistributor_.is_null ()) {
508  createReverseDistributor ();
509  }
510  return reverseDistributor_;
511  }
512 
513 
514  void
515  Distributor::createReverseDistributor() const
516  {
517  reverseDistributor_ = Teuchos::rcp (new Distributor (comm_));
518 
519  // The total length of all the sends of this Distributor. We
520  // calculate it because it's the total length of all the receives
521  // of the reverse Distributor.
522  size_t totalSendLength =
523  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
524 
525  // The maximum length of any of the receives of this Distributor.
526  // We calculate it because it's the maximum length of any of the
527  // sends of the reverse Distributor.
528  size_t maxReceiveLength = 0;
529  const int myImageID = comm_->getRank();
530  for (size_t i=0; i < numReceives_; ++i) {
531  if (imagesFrom_[i] != myImageID) {
532  // Don't count receives for messages sent by myself to myself.
533  if (lengthsFrom_[i] > maxReceiveLength) {
534  maxReceiveLength = lengthsFrom_[i];
535  }
536  }
537  }
538 
539  // Initialize all of reverseDistributor's data members. This
540  // mainly just involves flipping "send" and "receive," or the
541  // equivalent "to" and "from."
542  reverseDistributor_->lengthsTo_ = lengthsFrom_;
543  reverseDistributor_->imagesTo_ = imagesFrom_;
544  reverseDistributor_->indicesTo_ = indicesFrom_;
545  reverseDistributor_->startsTo_ = startsFrom_;
546  reverseDistributor_->lengthsFrom_ = lengthsTo_;
547  reverseDistributor_->imagesFrom_ = imagesTo_;
548  reverseDistributor_->indicesFrom_ = indicesTo_;
549  reverseDistributor_->startsFrom_ = startsTo_;
550  reverseDistributor_->numSends_ = numReceives_;
551  reverseDistributor_->numReceives_ = numSends_;
552  reverseDistributor_->selfMessage_ = selfMessage_;
553  reverseDistributor_->maxSendLength_ = maxReceiveLength;
554  reverseDistributor_->totalReceiveLength_ = totalSendLength;
555  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
556 
557  // Note: technically, this Distributor is its reverse Distributor's reverse
558  // Distributor, but we do not set that up here, since leaving it unset gives
559  // us an opportunity to test that the reverse Distributor is an inverse
560  // operation w.r.t. the value semantics of Distributors.  (numExports_ was not copied.)
561  }
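  // A minimal sketch of the forward/reverse relationship, assuming "plan" is a
  // Distributor that has already been initialized (e.g., by createFromSends()):
  //
  //   Teuchos::RCP<Tpetra::Distributor> reverse = plan.getReverse ();
  //   // reverse->getNumSends ()    == plan.getNumReceives ()
  //   // reverse->getNumReceives () == plan.getNumSends ()
  //   // reverse->getImagesTo ()    is plan.getImagesFrom ()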
562 
563 
564  void Distributor::doWaits() {
565  using Teuchos::Array;
566  using Teuchos::CommRequest;
567  using Teuchos::FancyOStream;
568  using Teuchos::includesVerbLevel;
569  using Teuchos::is_null;
570  using Teuchos::OSTab;
571  using Teuchos::RCP;
572  using Teuchos::waitAll;
573  using std::endl;
574 
575  Teuchos::OSTab tab (out_);
576 
577 #ifdef TPETRA_DISTRIBUTOR_TIMERS
578  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
579 #endif // TPETRA_DISTRIBUTOR_TIMERS
580 
581  const int myRank = comm_->getRank ();
582 
583  if (debug_) {
584  std::ostringstream os;
585  os << myRank << ": doWaits: # reqs = "
586  << requests_.size () << endl;
587  *out_ << os.str ();
588  }
589 
590  if (requests_.size() > 0) {
591  waitAll (*comm_, requests_());
592 
593 #ifdef HAVE_TEUCHOS_DEBUG
594  // Make sure that waitAll() nulled out all the requests.
595  for (Array<RCP<CommRequest<int> > >::const_iterator it = requests_.begin();
596  it != requests_.end(); ++it)
597  {
598  TEUCHOS_TEST_FOR_EXCEPTION( ! is_null (*it), std::runtime_error,
599  Teuchos::typeName(*this) << "::doWaits(): Communication requests "
600  "should all be null after calling Teuchos::waitAll() on them, but "
601  "at least one request is not null.");
602  }
603 #endif // HAVE_TEUCHOS_DEBUG
604  // Restore the invariant that requests_.size() is the number of
605  // outstanding nonblocking communication requests.
606  requests_.resize (0);
607  }
608 
609 #ifdef HAVE_TEUCHOS_DEBUG
610  {
611  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
612  int globalSizeNonzero = 0;
613  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
614  localSizeNonzero,
615  Teuchos::outArg (globalSizeNonzero));
616  TEUCHOS_TEST_FOR_EXCEPTION(
617  globalSizeNonzero != 0, std::runtime_error,
618  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
619  "a nonzero number of outstanding posts. There should be none at this "
620  "point. Please report this bug to the Tpetra developers.");
621  }
622 #endif // HAVE_TEUCHOS_DEBUG
623 
624  if (debug_) {
625  std::ostringstream os;
626  os << myRank << ": doWaits done" << endl;
627  *out_ << os.str ();
628  }
629  }
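  // A minimal sketch of how doWaits() is meant to pair with the templated
  // doPosts() methods declared in Tpetra_Distributor.hpp.  Here "plan" is an
  // initialized Distributor, and "exports", "numPackets", and "imports" are
  // caller-provided buffers (names are illustrative only):
  //
  //   plan.doPosts (exports, numPackets, imports);  // post nonblocking ops
  //   // ... overlap local computation here ...
  //   plan.doWaits ();                              // complete the posts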
630 
631  void Distributor::doReverseWaits() {
632  // call doWaits() on the reverse Distributor, if it exists
633  if (! reverseDistributor_.is_null()) {
634  reverseDistributor_->doWaits();
635  }
636  }
637 
638  std::string Distributor::description () const {
639  std::ostringstream out;
640 
641  out << "\"Tpetra::Distributor\": {";
642  const std::string label = this->getObjectLabel ();
643  if (label != "") {
644  out << "Label: " << label << ", ";
645  }
646  out << "How initialized: "
647  << Details::DistributorHowInitializedEnumToString (howInitialized_)
648  << ", Parameters: {"
649  << "Send type: "
650  << DistributorSendTypeEnumToString (sendType_)
651  << ", Barrier between receives and sends: "
652  << (barrierBetween_ ? "true" : "false")
653  << ", Use distinct tags: "
654  << (useDistinctTags_ ? "true" : "false")
655  << ", Debug: " << (debug_ ? "true" : "false")
656  << ", Enable MPI CUDA RDMA support: "
657  << (enable_cuda_rdma_ ? "true" : "false")
658  << "}}";
659  return out.str ();
660  }
661 
662  void
663  Distributor::describe (Teuchos::FancyOStream &out,
664  const Teuchos::EVerbosityLevel verbLevel) const
665  {
666  using std::endl;
667  using std::setw;
668  using Teuchos::VERB_DEFAULT;
669  using Teuchos::VERB_NONE;
670  using Teuchos::VERB_LOW;
671  using Teuchos::VERB_MEDIUM;
672  using Teuchos::VERB_HIGH;
673  using Teuchos::VERB_EXTREME;
674  Teuchos::EVerbosityLevel vl = verbLevel;
675  if (vl == VERB_DEFAULT) vl = VERB_LOW;
676  const int myImageID = comm_->getRank();
677  const int numImages = comm_->getSize();
678  Teuchos::OSTab tab (out);
679 
680  if (vl == VERB_NONE) {
681  return;
682  } else {
683  if (myImageID == 0) {
684  // VERB_LOW and higher prints description() (on Proc 0 only).
685  // We quote the class name because it contains colons:
686  // quoting makes the output valid YAML.
687  out << "\"Tpetra::Distributor\":" << endl;
688  Teuchos::OSTab tab2 (out);
689  const std::string label = this->getObjectLabel ();
690  if (label != "") {
691  out << "Label: " << label << endl;
692  }
693  out << "How initialized: "
694  << Details::DistributorHowInitializedEnumToString (howInitialized_)
695  << endl << "Parameters: " << endl;
696  {
697  Teuchos::OSTab tab3 (out);
698  out << "\"Send type\": "
699  << DistributorSendTypeEnumToString (sendType_) << endl
700  << "\"Barrier between receives and sends\": "
701  << (barrierBetween_ ? "true" : "false") << endl;
702  out << "\"Use distinct tags\": "
703  << (useDistinctTags_ ? "true" : "false") << endl;
704  out << "\"Debug\": " << (debug_ ? "true" : "false") << endl;
705  out << "\"Enable MPI CUDA RDMA support\": " <<
706  (enable_cuda_rdma_ ? "true" : "false") << endl;
707  }
708  }
709  if (vl == VERB_LOW) {
710  return;
711  } else {
712  Teuchos::OSTab tab2 (out);
713  // vl > VERB_LOW lets each image print its data. We assume
714  // that all images can print to the given output stream, and
715  // execute barriers to make it more likely that the output
716  // will be in the right order.
717  for (int imageCtr = 0; imageCtr < numImages; ++imageCtr) {
718  if (myImageID == imageCtr) {
719  if (myImageID == 0) {
720  out << "Number of processes: " << numImages << endl;
721  }
722  out << "Process: " << myImageID << endl;
723  Teuchos::OSTab tab3 (out);
724  out << "selfMessage: " << hasSelfMessage () << endl;
725  out << "numSends: " << getNumSends () << endl;
726  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
727  out << "imagesTo: " << toString (imagesTo_) << endl;
728  out << "lengthsTo: " << toString (lengthsTo_) << endl;
729  out << "maxSendLength: " << getMaxSendLength () << endl;
730  }
731  if (vl == VERB_EXTREME) {
732  out << "startsTo: " << toString (startsTo_) << endl;
733  out << "indicesTo: " << toString (indicesTo_) << endl;
734  }
735  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
736  out << "numReceives: " << getNumReceives () << endl;
737  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
738  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
739  out << "startsFrom: " << toString (startsFrom_) << endl;
740  out << "imagesFrom: " << toString (imagesFrom_) << endl;
741  }
742  // Last output is a flush; it leaves a space and also
743  // helps synchronize output.
744  out << std::flush;
745  } // if it's my image's turn to print
746  // Execute barriers to give output time to synchronize.
747  // One barrier generally isn't enough.
748  comm_->barrier();
749  comm_->barrier();
750  comm_->barrier();
751  } // for each image
752  }
753  }
754  }
755 
756  void
757  Distributor::computeReceives ()
758  {
759  using Teuchos::Array;
760  using Teuchos::as;
761  using Teuchos::CommStatus;
762  using Teuchos::CommRequest;
763  using Teuchos::ireceive;
764  using Teuchos::RCP;
765  using Teuchos::rcp;
766  using Teuchos::REDUCE_SUM;
767  using Teuchos::receive;
768  using Teuchos::reduce;
769  using Teuchos::scatter;
770  using Teuchos::send;
771  using Teuchos::waitAll;
772  using std::endl;
773 
774  Teuchos::OSTab tab (out_);
775  const int myRank = comm_->getRank();
776  const int numProcs = comm_->getSize();
777 
778  // MPI tag for nonblocking receives and blocking sends in this method.
779  const int pathTag = 2;
780  const int tag = this->getTag (pathTag);
781 
782  if (debug_) {
783  std::ostringstream os;
784  os << myRank << ": computeReceives: "
785  "{selfMessage_: " << (selfMessage_ ? "true" : "false")
786  << ", tag: " << tag << "}" << endl;
787  *out_ << os.str ();
788  }
789 
790  // toNodesFromMe[i] == the number of messages sent by this process
791  // to process i. The data in numSends_, imagesTo_, and lengthsTo_
792  // concern the contiguous sends. Therefore, each process will be
793  // listed in imagesTo_ at most once, and so toNodesFromMe[i] will
794  // either be 0 or 1.
795  {
796  Array<int> toNodesFromMe (numProcs, 0);
797 #ifdef HAVE_TEUCHOS_DEBUG
798  bool counting_error = false;
799 #endif // HAVE_TEUCHOS_DEBUG
800  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
801 #ifdef HAVE_TEUCHOS_DEBUG
802  if (toNodesFromMe[imagesTo_[i]] != 0) {
803  counting_error = true;
804  }
805 #endif // HAVE_TEUCHOS_DEBUG
806  toNodesFromMe[imagesTo_[i]] = 1;
807  }
808 #ifdef HAVE_TEUCHOS_DEBUG
809  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
810  "Tpetra::Distributor::computeReceives: There was an error on at least "
811  "one process in counting the number of messages sent by that process to "
812  "the other processes. Please report this bug to the Tpetra developers.",
813  *comm_);
814 #endif // HAVE_TEUCHOS_DEBUG
815 
816  if (debug_) {
817  std::ostringstream os;
818  os << myRank << ": computeReceives: Calling reduce and scatter" << endl;
819  *out_ << os.str ();
820  }
821 
822  // Compute the number of receives that this process needs to
823  // post. The number of receives includes any self sends (i.e.,
824  // messages sent by this process to itself).
825  //
826  // (We will use numReceives_ below to post exactly that
827  // number of receives, with MPI_ANY_SOURCE as the sending rank.
828  // This will tell us from which processes this process expects
829  // to receive, and how many packets of data we expect to receive
830  // from each process.)
831  //
832  // toNodesFromMe[i] is the number of messages sent by this
833  // process to process i. Compute the sum (elementwise) of all
834  // the toNodesFromMe arrays on all processes in the
835  // communicator. If the array x is that sum, then if this
836  // process has rank j, x[j] is the number of messages sent
837  // to process j, that is, the number of receives on process j
838  // (including any messages sent by process j to itself).
839  //
840  // Yes, this requires storing and operating on an array of
841  // length P, where P is the number of processes in the
842  // communicator. Epetra does this too. Avoiding this O(P)
843  // memory bottleneck would require some research.
844  //
845  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
846  // implement this O(P) memory algorithm.
847  //
848  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
849  // process (0) from toNodesFromMe, to numRecvsOnEachProc.
850  // Then, scatter the latter, so that each process p gets
851  // numRecvsOnEachProc[p].
852  //
853  // 2. Like #1, but use MPI_Reduce_scatter instead of
854  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
855  // optimized to reduce the number of messages, but
856  // MPI_Reduce_scatter is more general than we need (it
857  // allows the equivalent of MPI_Scatterv). See Bug 6336.
858  //
859  // 3. Do an all-reduce on toNodesFromMe, and let my process
860  // (with rank myRank) get numReceives_ from
861  // toNodesFromMe[myRank]. The HPCCG miniapp uses the
862  // all-reduce method.
863  //
864  // Approaches 1 and 3 have the same critical path length.
865  // However, #3 moves more data. This is because the final
866  // result is just one integer, but #3 moves a whole array of
867  // results to all the processes. This is why we use Approach 1
868  // here.
869  //
870  // mfh 12 Apr 2013: See discussion in createFromSends() about
871  // how we could use this communication to propagate an error
872  // flag for "free" in a release build.
873 
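  // A small worked example of this counting scheme (illustrative only): on 3
  // processes, suppose process 0 sends to {1}, process 1 sends to {0, 2}, and
  // process 2 sends to {1}.  Then toNodesFromMe is [0,1,0] on process 0,
  // [1,0,1] on process 1, and [0,1,0] on process 2.  The elementwise sum is
  // [1,2,1], so after the reduce + scatter below, processes 0 and 2 each
  // expect one receive and process 1 expects two.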
874  const int root = 0; // rank of root process of the reduction
875  Array<int> numRecvsOnEachProc; // temp; only needed on root
876  if (myRank == root) {
877  numRecvsOnEachProc.resize (numProcs);
878  }
879  int numReceivesAsInt = 0; // output
880  reduce<int, int> (toNodesFromMe.getRawPtr (),
881  numRecvsOnEachProc.getRawPtr (),
882  numProcs, REDUCE_SUM, root, *comm_);
883  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
884  &numReceivesAsInt, 1, root, *comm_);
885  numReceives_ = static_cast<size_t> (numReceivesAsInt);
886  }
887 
888  // Now we know numReceives_, which is this process' number of
889  // receives. Allocate the lengthsFrom_ and imagesFrom_ arrays
890  // with this number of entries.
891  lengthsFrom_.assign (numReceives_, 0);
892  imagesFrom_.assign (numReceives_, 0);
893 
894  //
895  // Ask (via nonblocking receive) each process from which we are
896  // receiving how many packets we should expect from it in the
897  // communication pattern.
898  //
899 
900  // At this point, numReceives_ includes any self message that
901  // there may be. At the end of this routine, we'll subtract off
902  // the self message (if there is one) from numReceives_. In this
903  // routine, we don't need to receive a message from ourselves in
904  // order to figure out our lengthsFrom_ and source process ID; we
905  // can just ask ourselves directly. Thus, the actual number of
906  // nonblocking receives we post here does not include the self
907  // message.
908  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
909 
910  // Teuchos' wrapper for nonblocking receives requires receive
911  // buffers that it knows won't go away. This is why we use RCPs,
912  // one RCP per nonblocking receive request. They get allocated in
913  // the loop below.
914  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
915  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
916  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
917 
918  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
919  // (receive data from any process).
920 #ifdef HAVE_MPI
921  const int anySourceProc = MPI_ANY_SOURCE;
922 #else
923  const int anySourceProc = -1;
924 #endif
925 
926  if (debug_) {
927  std::ostringstream os;
928  os << myRank << ": computeReceives: Posting "
929  << actualNumReceives << " irecvs" << endl;
930  *out_ << os.str ();
931  }
932 
933  // Post the (nonblocking) receives.
934  for (size_t i = 0; i < actualNumReceives; ++i) {
935  // Once the receive completes, we can ask the corresponding
936  // CommStatus object (output by wait()) for the sending process'
937  // ID (which we'll assign to imagesFrom_[i] -- don't forget to
938  // do that!).
939  lengthsFromBuffers[i].resize (1);
940  lengthsFromBuffers[i][0] = as<size_t> (0);
941  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc, tag, *comm_);
942  if (debug_) {
943  std::ostringstream os;
944  os << myRank << ": computeReceives: "
945  "Posted any-proc irecv w/ specified tag " << tag << endl;
946  *out_ << os.str ();
947  }
948  }
949 
950  if (debug_) {
951  std::ostringstream os;
952  os << myRank << ": computeReceives: "
953  "posting " << numSends_ << " sends" << endl;
954  *out_ << os.str ();
955  }
956  // Post the sends: Tell each process to which we are sending how
957  // many packets it should expect from us in the communication
958  // pattern. We could use nonblocking sends here, as long as we do
959  // a waitAll() on all the sends and receives at once.
960  //
961  // We assume that numSends_ and selfMessage_ have already been
962  // set. The value of numSends_ (my process' number of sends) does
963  // not include any message that it might send to itself.
964  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
965  if (imagesTo_[i] != myRank) {
966  // Send a message to imagesTo_[i], telling that process that
967  // this communication pattern will send that process
968  // lengthsTo_[i] blocks of packets.
969  const size_t* const lengthsTo_i = &lengthsTo_[i];
970  send<int, size_t> (lengthsTo_i, 1, as<int> (imagesTo_[i]), tag, *comm_);
971  if (debug_) {
972  std::ostringstream os;
973  os << myRank << ": computeReceives: "
974  "Posted send to Proc " << imagesTo_[i] << " w/ specified tag "
975  << tag << endl;
976  *out_ << os.str ();
977  }
978  }
979  else {
980  // We don't need a send in the self-message case. If this
981  // process will send a message to itself in the communication
982  // pattern, then the last element of lengthsFrom_ and
983  // imagesFrom_ corresponds to the self-message. Of course
984  // this process knows how long the message is, and the process
985  // ID is its own process ID.
986  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
987  imagesFrom_[numReceives_-1] = myRank;
988  }
989  }
990 
991  if (debug_) {
992  std::ostringstream os;
993  os << myRank << ": computeReceives: waitAll on "
994  << requests.size () << " requests" << endl;
995  *out_ << os.str ();
996  }
997  //
998  // Wait on all the receives. When they arrive, check the status
999  // output of wait() for the receiving process ID, unpack the
1000  // request buffers into lengthsFrom_, and set imagesFrom_ from the
1001  // status.
1002  //
1003  waitAll (*comm_, requests (), statuses ());
1004  for (size_t i = 0; i < actualNumReceives; ++i) {
1005  lengthsFrom_[i] = *lengthsFromBuffers[i];
1006  imagesFrom_[i] = statuses[i]->getSourceRank ();
1007  }
1008 
1009  // Sort the imagesFrom_ array, and apply the same permutation to
1010  // lengthsFrom_. This ensures that imagesFrom_[i] and
1011  // lengthsFrom_[i] refer to the same thing.
1012  sort2 (imagesFrom_.begin(), imagesFrom_.end(), lengthsFrom_.begin());
1013 
1014  // Compute indicesFrom_
1015  totalReceiveLength_ = std::accumulate (lengthsFrom_.begin(), lengthsFrom_.end(), 0);
1016  indicesFrom_.clear ();
1017  indicesFrom_.reserve (totalReceiveLength_);
1018  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1019  indicesFrom_.push_back(i);
1020  }
1021 
1022  startsFrom_.clear ();
1023  startsFrom_.reserve (numReceives_);
1024  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1025  startsFrom_.push_back(j);
1026  j += lengthsFrom_[i];
1027  }
1028 
1029  if (selfMessage_) {
1030  --numReceives_;
1031  }
1032 
1033  if (debug_) {
1034  std::ostringstream os;
1035  os << myRank << ": computeReceives: done" << endl;
1036  *out_ << os.str ();
1037  }
1038  }
1039 
1040  size_t
1041  Distributor::createFromSends (const Teuchos::ArrayView<const int> &exportNodeIDs)
1042  {
1043  using Teuchos::outArg;
1044  using Teuchos::REDUCE_MAX;
1045  using Teuchos::reduceAll;
1046  using std::endl;
1047 
1048  Teuchos::OSTab tab (out_);
1049 
1050  numExports_ = exportNodeIDs.size();
1051 
1052  const int myImageID = comm_->getRank();
1053  const int numImages = comm_->getSize();
1054  if (debug_) {
1055  std::ostringstream os;
1056  os << myImageID << ": createFromSends" << endl;
1057  *out_ << os.str ();
1058  }
1059 
1060  // exportNodeIDs tells us the communication pattern for this
1061  // distributor. It dictates the way that the export data will be
1062  // interpreted in doPosts(). We want to perform at most one
1063  // send per process in doPosts; this is for two reasons:
1064  // * minimize latency / overhead in the comm routines (nice)
1065  // * match the number of receives and sends between processes
1066  // (necessary)
1067  //
1068  // Teuchos::Comm requires that the data for a send are contiguous
1069  // in a send buffer. Therefore, if the data in the send buffer
1070  // for doPosts() are not contiguous, they will need to be copied
1071  // into a contiguous buffer. The user has specified this
1072  // noncontiguous pattern and we can't do anything about it.
1073  // However, if they do not provide an efficient pattern, we will
1074  // warn them if one of the following compile-time options has been
1075  // set:
1076  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1077  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1078  //
1079  // If the data are contiguous, then we can post the sends in situ
1080  // (i.e., without needing to copy them into a send buffer).
1081  //
1082  // Determine contiguity. There are a number of ways to do this:
1083  // * If the export IDs are sorted, then all exports to a
1084  // particular node must be contiguous. This is what Epetra does.
1085  // * If the export ID of the current export already has been
1086  // listed, then the previous listing should correspond to the
1087  // same export. This tests contiguity, but not sortedness.
1088  //
1089  // Both of these tests require O(n), where n is the number of
1090  // exports. However, the latter will positively identify a greater
1091  // portion of contiguous patterns. We use the latter method.
1092  //
1093  // Check to see if values are grouped by images without gaps
1094  // If so, indices_to -> 0.
1095 
1096  // Set up data structures for quick traversal of arrays.
1097  // This contains the number of sends for each process ID.
1098  //
1099  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1100  // that create an array of length the number of processes in the
1101  // communicator (plus one). Given how this code uses this array,
1102  // it should be straightforward to replace it with a hash table or
1103  // some other more space-efficient data structure. In practice,
1104  // most of the entries of starts should be zero for a sufficiently
1105  // large process count, unless the communication pattern is dense.
1106  // Note that it's important to be able to iterate through keys (i
1107  // for which starts[i] is nonzero) in increasing order.
1108  Teuchos::Array<size_t> starts (numImages + 1, 0);
1109 
1110  // numActive is the number of sends that are not Null
1111  size_t numActive = 0;
1112  int needSendBuff = 0; // Boolean
1113 
1114 #ifdef HAVE_TPETRA_DEBUG
1115  int badID = -1; // only used in a debug build
1116 #endif // HAVE_TPETRA_DEBUG
1117  for (size_t i = 0; i < numExports_; ++i) {
1118  const int exportID = exportNodeIDs[i];
1119  if (exportID >= numImages) {
1120 #ifdef HAVE_TPETRA_DEBUG
1121  badID = myImageID;
1122 #endif // HAVE_TPETRA_DEBUG
1123  break;
1124  }
1125  else if (exportID >= 0) {
1126  // exportID is a valid process ID. Increment the number of
1127  // messages this process will send to that process.
1128  ++starts[exportID];
1129 
1130  // If we're sending more than one message to process exportID,
1131  // then it is possible that the data are not contiguous.
1132  // Check by seeing if the previous process ID in the list
1133  // (exportNodeIDs[i-1]) is the same. It's safe to use i-1,
1134  // because if starts[exportID] > 1, then i must be > 1 (since
1135  // the starts array was filled with zeros initially).
1136 
1137  // null entries break continuity.
1138  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1139  if (needSendBuff==0 && starts[exportID] > 1 && exportID != exportNodeIDs[i-1]) {
1140  needSendBuff = 1;
1141  }
1142  ++numActive;
1143  }
1144  }
1145 
1146 #ifdef HAVE_TPETRA_DEBUG
1147  // Test whether any process in the communicator got an invalid
1148  // process ID. If badID != -1 on this process, then it equals
1149  // this process' rank. The max of all badID over all processes is
1150  // the max rank which has an invalid process ID.
1151  {
1152  int gbl_badID;
1153  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1154  TEUCHOS_TEST_FOR_EXCEPTION(gbl_badID >= 0, std::runtime_error,
1155  Teuchos::typeName(*this) << "::createFromSends(): Process " << gbl_badID
1156  << ", perhaps among other processes, got a bad send process ID.");
1157  }
1158 #else
1159  // FIXME (mfh 12 Apr 2013, 15 Jul 2015) Rather than simply
1160  // ignoring this information, we should think about how to pass it
1161  // along so that all the processes find out about it. In a
1162  // release build with efficiency warnings turned off, the next
1163  // collective communication happens in computeReceives(). We
1164  // could figure out how to encode the error flag in that
1165  // operation, for example by adding an extra entry to the
1166  // collective's output array that encodes the error condition (0
1167  // on all processes if no error, else 1 on any process with the
1168  // error, so that the sum will produce a nonzero value if any
1169  // process had an error). I'll defer this change for now and
1170  // recommend instead that people with troubles try a debug build.
1171 #endif // HAVE_TPETRA_DEBUG
1172 
1173 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1174  {
1175  int global_needSendBuff;
1176  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1177  outArg (global_needSendBuff));
1178  TPETRA_EFFICIENCY_WARNING(
1179  global_needSendBuff != 0, std::runtime_error,
1180  "::createFromSends: Grouping export IDs together by process rank often "
1181  "improves performance.");
1182  }
1183 #endif
1184 
1185  // Determine from the caller's data whether or not the current
1186  // process should send (a) message(s) to itself.
1187  if (starts[myImageID] != 0) {
1188  selfMessage_ = true;
1189  }
1190  else {
1191  selfMessage_ = false;
1192  }
1193 
1194 #ifdef HAVE_TEUCHOS_DEBUG
1195  bool index_neq_numActive = false;
1196  bool send_neq_numSends = false;
1197 #endif
1198  if (! needSendBuff) {
1199  // grouped by image, no send buffer or indicesTo_ needed
1200  numSends_ = 0;
1201  // Count total number of sends, i.e., total number of images to
1202  // which we are sending. This includes myself, if applicable.
1203  for (int i = 0; i < numImages; ++i) {
1204  if (starts[i]) {
1205  ++numSends_;
1206  }
1207  }
1208 
1209  // Not only do we not need these, but we must clear them, as
1210  // empty status of indicesTo is a flag used later.
1211  indicesTo_.resize(0);
1212  // Size these to numSends_; note, at the moment, numSends_
1213  // includes self sends. Set their values to zeros.
1214  imagesTo_.assign(numSends_,0);
1215  startsTo_.assign(numSends_,0);
1216  lengthsTo_.assign(numSends_,0);
1217 
1218  // set startsTo to the offset for each send (i.e., each image ID)
1219  // set imagesTo to the image ID for each send
1220  // In interpreting this code, remember that we are assuming contiguity;
1221  // that is why index skips through the ranks.
1222  {
1223  size_t index = 0, nodeIndex = 0;
1224  for (size_t i = 0; i < numSends_; ++i) {
1225  while (exportNodeIDs[nodeIndex] < 0) {
1226  ++nodeIndex; // skip all negative node IDs
1227  }
1228  startsTo_[i] = nodeIndex;
1229  int imageID = exportNodeIDs[nodeIndex];
1230  imagesTo_[i] = imageID;
1231  index += starts[imageID];
1232  nodeIndex += starts[imageID];
1233  }
1234 #ifdef HAVE_TEUCHOS_DEBUG
1235  if (index != numActive) {
1236  index_neq_numActive = true;
1237  }
1238 #endif
1239  }
1240  // sort the startsTo and image IDs together, in ascending order, according
1241  // to image IDs
1242  if (numSends_ > 0) {
1243  sort2(imagesTo_.begin(), imagesTo_.end(), startsTo_.begin());
1244  }
1245  // compute the maximum send length
1246  maxSendLength_ = 0;
1247  for (size_t i = 0; i < numSends_; ++i) {
1248  int imageID = imagesTo_[i];
1249  lengthsTo_[i] = starts[imageID];
1250  if ((imageID != myImageID) && (lengthsTo_[i] > maxSendLength_)) {
1251  maxSendLength_ = lengthsTo_[i];
1252  }
1253  }
1254  }
1255  else {
1256  // not grouped by image, need send buffer and indicesTo_
1257 
1258  // starts[i] is the number of sends to node i
1259  // numActive equals number of sends total, \sum_i starts[i]
1260 
1261  // this loop starts at starts[1], so explicitly check starts[0]
1262  if (starts[0] == 0 ) {
1263  numSends_ = 0;
1264  }
1265  else {
1266  numSends_ = 1;
1267  }
1268  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1269  im1=starts.begin();
1270  i != starts.end(); ++i)
1271  {
1272  if (*i != 0) ++numSends_;
1273  *i += *im1;
1274  im1 = i;
1275  }
1276  // starts[i] now contains the number of exports to nodes 0 through i
1277 
1278  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1279  i=starts.rbegin()+1;
1280  i != starts.rend(); ++i)
1281  {
1282  *ip1 = *i;
1283  ip1 = i;
1284  }
1285  starts[0] = 0;
1286  // starts[i] now contains the number of exports to nodes 0 through
1287  // i-1, i.e., all nodes before node i
1288 
1289  indicesTo_.resize(numActive);
1290 
1291  for (size_t i = 0; i < numExports_; ++i) {
1292  if (exportNodeIDs[i] >= 0) {
1293  // record the offset to the sendBuffer for this export
1294  indicesTo_[starts[exportNodeIDs[i]]] = i;
1295  // now increment the offset for this node
1296  ++starts[exportNodeIDs[i]];
1297  }
1298  }
1299  // our send buffer will contain the export data for each of the nodes
1300  // we communicate with, in order by node id
1301  // sendBuffer = {node_0_data, node_1_data, ..., node_np-1_data}
1302  // indicesTo now maps each export to the location in our send buffer
1303  // associated with the export
1304  // data for export i located at sendBuffer[indicesTo[i]]
1305  //
1306  // starts[i] once again contains the number of exports to
1307  // nodes 0 through i
1308  for (int node = numImages-1; node != 0; --node) {
1309  starts[node] = starts[node-1];
1310  }
1311  starts.front() = 0;
1312  starts[numImages] = numActive;
1313  //
1314  // starts[node] once again contains the number of exports to
1315  // nodes 0 through node-1
1316  // i.e., the start of my data in the sendBuffer
1317 
1318  // This contains invalid data at nodes we don't care about; that is okay.
1319  imagesTo_.resize(numSends_);
1320  startsTo_.resize(numSends_);
1321  lengthsTo_.resize(numSends_);
1322 
1323  // for each group of sends/exports, record the destination node,
1324  // the length, and the offset for this send into the
1325  // send buffer (startsTo_)
1326  maxSendLength_ = 0;
1327  size_t snd = 0;
1328  for (int node = 0; node < numImages; ++node ) {
1329  if (starts[node+1] != starts[node]) {
1330  lengthsTo_[snd] = starts[node+1] - starts[node];
1331  startsTo_[snd] = starts[node];
1332  // record max length for all off-node sends
1333  if ((node != myImageID) && (lengthsTo_[snd] > maxSendLength_)) {
1334  maxSendLength_ = lengthsTo_[snd];
1335  }
1336  imagesTo_[snd] = node;
1337  ++snd;
1338  }
1339  }
1340 #ifdef HAVE_TEUCHOS_DEBUG
1341  if (snd != numSends_) {
1342  send_neq_numSends = true;
1343  }
1344 #endif
1345  }
1346 #ifdef HAVE_TEUCHOS_DEBUG
1347  SHARED_TEST_FOR_EXCEPTION(index_neq_numActive, std::logic_error,
1348  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1349  SHARED_TEST_FOR_EXCEPTION(send_neq_numSends, std::logic_error,
1350  "Tpetra::Distributor::createFromSends: logic error. Please notify the Tpetra team.",*comm_);
1351 #endif
1352 
1353  if (selfMessage_) --numSends_;
1354 
1355  // Invert map to see what msgs are received and what length
1356  computeReceives();
1357 
1358  if (debug_) {
1359  std::ostringstream os;
1360  os << myImageID << ": createFromSends: done" << endl;
1361  *out_ << os.str ();
1362  }
1363 
1364  // createFromRecvs() calls createFromSends(), but will set
1365  // howInitialized_ again after calling createFromSends().
1366  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1367 
1368  return totalReceiveLength_;
1369  }
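  // A minimal usage sketch, assuming "comm" is a Teuchos::RCP<const
  // Teuchos::Comm<int> > and "exportProcIDs" is a caller-provided
  // Teuchos::Array<int> whose i-th entry is the rank to which export i goes:
  //
  //   Tpetra::Distributor plan (comm);
  //   const size_t numImports = plan.createFromSends (exportProcIDs ());
  //   // numImports equals plan.getTotalReceiveLength (), the total number of
  //   // values this process will receive when the plan is executed.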
1370 
1371 } // namespace Tpetra
Namespace Tpetra contains the class and methods constituting the Tpetra library.
size_t getNumReceives() const
The number of processes from which we will receive data.
std::string description() const
A simple one-line description of this object.
ArrayView< const int > getImagesTo() const
Ranks of the processes to which this process will send values.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
Implementation details of Tpetra.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
size_t createFromSends(const ArrayView< const int > &exportNodeIDs)
Set up Distributor using list of process ranks to which this process will send.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
virtual ~Distributor()
Destructor (virtual for memory safety).
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
ArrayView< const int > getImagesFrom() const
Ranks of the processes sending values to this process.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Print the object with some verbosity level to an FancyOStream.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
RCP< Distributor > getReverse() const
A reverse communication plan Distributor.
EDistributorSendType
The type of MPI send that Distributor should use.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.