42 #ifndef TPETRA_DISTRIBUTOR_HPP 43 #define TPETRA_DISTRIBUTOR_HPP 46 #include <Teuchos_as.hpp> 47 #include <Teuchos_Describable.hpp> 48 #include <Teuchos_ParameterListAcceptorDefaultBase.hpp> 49 #include <Teuchos_VerboseObject.hpp> 60 #ifdef TPETRA_DISTRIBUTOR_TIMERS 61 # undef TPETRA_DISTRIBUTOR_TIMERS 62 #endif // TPETRA_DISTRIBUTOR_TIMERS 64 #include "KokkosCompat_View.hpp" 65 #include "Kokkos_Core.hpp" 66 #include "Kokkos_TeuchosCommAdapters.hpp" 95 DISTRIBUTOR_NOT_INITIALIZED,
96 DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS,
97 DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS,
98 DISTRIBUTOR_INITIALIZED_BY_REVERSE,
99 DISTRIBUTOR_INITIALIZED_BY_COPY
187 public Teuchos::Describable,
188 public Teuchos::ParameterListAcceptorDefaultBase {
// Construct a Distributor from a communicator only.
201 explicit Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm);
// Construct from a communicator and an output stream ('out').
214 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
215 const Teuchos::RCP<Teuchos::FancyOStream>& out);
// Construct from a communicator and a parameter list ('plist').
230 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
231 const Teuchos::RCP<Teuchos::ParameterList>& plist);
// Construct from a communicator, an output stream, and a parameter list.
// These are the same three arguments accepted by the private init() helper.
249 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
250 const Teuchos::RCP<Teuchos::FancyOStream>& out,
251 const Teuchos::RCP<Teuchos::ParameterList>& plist);
// Accept a (non-const) parameter list for this object.
// NOTE(review): presumably the ParameterListAcceptor override — confirm.
274 void setParameterList (
const Teuchos::RCP<Teuchos::ParameterList>& plist);
// Return a read-only parameter list; by its name, the list of valid
// parameters with defaults (definition not shown here — confirm).
280 Teuchos::RCP<const Teuchos::ParameterList> getValidParameters ()
const;
// Set up the communication plan from the list of destination process ranks
// (one entry per export).  Returns a size_t.
// NOTE(review): presumably the number of imports this process will
// receive — confirm against the definition.
305 size_t createFromSends (
const ArrayView<const int>& exportNodeIDs);
// Set up the plan from the receive side: the caller supplies the remote IDs
// and the ranks that own them; exportIDs / exportNodeIDs are output arrays
// (taken by non-const reference).
340 template <
class Ordinal>
342 createFromRecvs (
const ArrayView<const Ordinal>& remoteIDs,
343 const ArrayView<const int>& remoteNodeIDs,
344 Array<Ordinal>& exportIDs,
345 Array<int>& exportNodeIDs);
// Const accessors describing the current communication plan.  Their
// definitions are not part of this excerpt; the notes below are inferred
// from the names and from how the like-named data members are used in
// doPosts() — confirm against the definitions.

// Presumably the number of processes this process receives from.
354 size_t getNumReceives()
const;
// Presumably the number of processes this process sends to.
359 size_t getNumSends()
const;
// Presumably whether this process sends a message to itself.
362 bool hasSelfMessage()
const;
// Presumably the largest single send length (cf. maxSendLength_).
365 size_t getMaxSendLength()
const;
// Presumably the total number of values received (cf. totalReceiveLength_).
368 size_t getTotalReceiveLength()
const;
// Presumably a view of the ranks received from (cf. imagesFrom_).
374 ArrayView<const int> getImagesFrom()
const;
// Presumably a view of the ranks sent to (cf. imagesTo_).
380 ArrayView<const int> getImagesTo()
const;
// Presumably a view of the per-process receive lengths (cf. lengthsFrom_).
389 ArrayView<const size_t> getLengthsFrom()
const;
// Presumably a view of the per-process send lengths (cf. lengthsTo_).
398 ArrayView<const size_t> getLengthsTo()
const;
405 return howInitialized_;
// Return a reference-counted Distributor.  By its name, this is the
// reverse-communication plan; NOTE(review): presumably backed by the mutable
// reverseDistributor_ member — confirm against the definition.
422 RCP<Distributor> getReverse()
const;
// Blocking exchange taking the export and import buffers as ArrayViews of
// Packet.  The definition further down wraps both buffers in non-owning
// ArrayRCPs and forwards to doPosts().
448 template <
class Packet>
450 doPostsAndWaits (
const ArrayView<const Packet> &exports,
452 const ArrayView<Packet> &imports);
// Blocking exchange with a variable number of packets per LID: the two
// size_t arrays give the packet count for each export LID and each
// import LID respectively.
475 template <
class Packet>
477 doPostsAndWaits (
const ArrayView<const Packet> &exports,
478 const ArrayView<size_t> &numExportPacketsPerLID,
479 const ArrayView<Packet> &imports,
480 const ArrayView<size_t> &numImportPacketsPerLID);
// Post the receives and sends for an exchange.  Buffers are ArrayRCPs
// because the definition takes persisting views of them for the
// nonblocking requests stored in requests_.
506 template <
class Packet>
508 doPosts (
const ArrayRCP<const Packet> &exports,
510 const ArrayRCP<Packet> &imports);
// Same, with a variable number of packets per LID given by the two
// size_t arrays.
530 template <
class Packet>
532 doPosts (
const ArrayRCP<const Packet> &exports,
533 const ArrayView<size_t> &numExportPacketsPerLID,
534 const ArrayRCP<Packet> &imports,
535 const ArrayView<size_t> &numImportPacketsPerLID);
// Reverse-mode counterparts of doPostsAndWaits() / doPosts().  The
// doReversePosts() definitions further down forward to
// reverseDistributor_->doPosts(), creating the reverse Distributor on
// first use, and throw std::runtime_error if indicesTo_ is non-empty.

// Reverse-mode blocking exchange, ArrayView buffers.
549 template <
class Packet>
551 doReversePostsAndWaits (
const ArrayView<const Packet> &exports,
553 const ArrayView<Packet> &imports);
// Reverse-mode blocking exchange with per-LID packet counts.
559 template <
class Packet>
561 doReversePostsAndWaits (
const ArrayView<const Packet> &exports,
562 const ArrayView<size_t> &numExportPacketsPerLID,
563 const ArrayView<Packet> &imports,
564 const ArrayView<size_t> &numImportPacketsPerLID);
// Reverse-mode posting, ArrayRCP buffers.
570 template <
class Packet>
572 doReversePosts (
const ArrayRCP<const Packet> &exports,
574 const ArrayRCP<Packet> &imports);
// Reverse-mode posting with per-LID packet counts.
580 template <
class Packet>
582 doReversePosts (
const ArrayRCP<const Packet> &exports,
583 const ArrayView<size_t> &numExportPacketsPerLID,
584 const ArrayRCP<Packet> &imports,
585 const ArrayView<size_t> &numImportPacketsPerLID);
// NOTE(review): presumably waits on the requests posted by
// doReversePosts() — definition not shown here; confirm.
593 void doReverseWaits ();
// Kokkos::View overloads.  The enable_if return type restricts each
// template to the case where both ExpView and ImpView satisfy
// Kokkos::Impl::is_view.

// Overload taking the export and import buffers as Kokkos views.
615 template <
class ExpView,
class ImpView>
616 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
618 const ExpView &exports,
620 const ImpView &imports);
// Blocking exchange on Kokkos views with per-LID packet counts.
643 template <
class ExpView,
class ImpView>
644 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
645 doPostsAndWaits (
const ExpView &exports,
646 const ArrayView<size_t> &numExportPacketsPerLID,
647 const ImpView &imports,
648 const ArrayView<size_t> &numImportPacketsPerLID);
// Kokkos::View overloads of doPosts(); enabled only when both template
// arguments satisfy Kokkos::Impl::is_view.

// Post an exchange whose buffers are Kokkos views.
674 template <
class ExpView,
class ImpView>
675 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
676 doPosts (
const ExpView &exports,
678 const ImpView &imports);
// Same, with per-LID packet counts.
698 template <
class ExpView,
class ImpView>
699 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
700 doPosts (
const ExpView &exports,
701 const ArrayView<size_t> &numExportPacketsPerLID,
702 const ImpView &imports,
703 const ArrayView<size_t> &numImportPacketsPerLID);
// Kokkos::View overloads of the reverse-mode operations; each is enabled
// only when both template arguments satisfy Kokkos::Impl::is_view.

// Reverse-mode blocking exchange on Kokkos views.
709 template <
class ExpView,
class ImpView>
710 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
711 doReversePostsAndWaits (
const ExpView &exports,
713 const ImpView &imports);
// Reverse-mode blocking exchange with per-LID packet counts.
719 template <
class ExpView,
class ImpView>
720 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
721 doReversePostsAndWaits (
const ExpView &exports,
722 const ArrayView<size_t> &numExportPacketsPerLID,
723 const ImpView &imports,
724 const ArrayView<size_t> &numImportPacketsPerLID);
// Reverse-mode posting on Kokkos views.
730 template <
class ExpView,
class ImpView>
731 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
732 doReversePosts (
const ExpView &exports,
734 const ImpView &imports);
// Reverse-mode posting with per-LID packet counts.
740 template <
class ExpView,
class ImpView>
741 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
742 doReversePosts (
const ExpView &exports,
743 const ArrayView<size_t> &numExportPacketsPerLID,
744 const ImpView &imports,
745 const ArrayView<size_t> &numImportPacketsPerLID);
751 bytes_sent = lastRoundBytesSend_;
752 bytes_recvd = lastRoundBytesRecv_;
// Return a std::string describing this object.
// NOTE(review): presumably the Teuchos::Describable override — confirm.
761 std::string description()
const;
// Write a description of this object to 'out'; verbLevel defaults to
// Teuchos::Describable::verbLevel_default.
764 void describe (Teuchos::FancyOStream &out,
const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default)
const;
// Communicator used for getRank() and for every send/receive in doPosts().
770 RCP<const Comm<int> > comm_;
// Output stream; doPosts() indents it with a Teuchos::OSTab.
773 Teuchos::RCP<Teuchos::FancyOStream> out_;
// Read by doPosts() as 'doBarrier'; ready sends require it to be true.
785 bool barrierBetween_;
// When false, the Kokkos-view doPostsAndWaits() overloads go through host
// mirror views if the export view is not host-accessible.
800 bool enable_cuda_rdma_;
// Destination rank of each send block.
834 Array<int> imagesTo_;
// Starting offset of each send block (scaled by the packet count when
// indexing the export buffer in the fixed-size path).
844 Array<size_t> startsTo_;
// Length of each send block.
851 Array<size_t> lengthsTo_;
// Sizes the temporary send buffer in the "slow" path of doPosts(3 args).
856 size_t maxSendLength_;
// Export indices for each send; when empty, doPosts() takes the "fast"
// path and sends contiguous slices of the export buffer directly.
873 Array<size_t> indicesTo_;
// Multiplied by the packet count to get the minimum required size of the
// import buffer in doPosts(3 args).
892 size_t totalReceiveLength_;
// Length of each receive block.
899 Array<size_t> lengthsFrom_;
// Source rank of each receive block.
906 Array<int> imagesFrom_;
// Not referenced by the code visible in this excerpt.
913 Array<size_t> startsFrom_;
// Not referenced by the code visible in this excerpt.
921 Array<size_t> indicesFrom_;
// Outstanding nonblocking requests; must be empty on entry to
// doPosts() / doPostsAndWaits().
929 Array<RCP<Teuchos::CommRequest<int> > > requests_;
// Created on first use by doReversePosts() via createReverseDistributor();
// mutable so that const methods can create it.
935 mutable RCP<Distributor> reverseDistributor_;
// Bytes in the export buffer of the most recent *PostsAndWaits call.
939 size_t lastRoundBytesSend_;
// Bytes in the import buffer of the most recent *PostsAndWaits call.
942 size_t lastRoundBytesRecv_;
// Timers used by doPosts() when TPETRA_DISTRIBUTOR_TIMERS is defined.
944 #ifdef TPETRA_DISTRIBUTOR_TIMERS 945 Teuchos::RCP<Teuchos::Time> timer_doPosts3_;
946 Teuchos::RCP<Teuchos::Time> timer_doPosts4_;
947 Teuchos::RCP<Teuchos::Time> timer_doWaits_;
948 Teuchos::RCP<Teuchos::Time> timer_doPosts3_recvs_;
949 Teuchos::RCP<Teuchos::Time> timer_doPosts4_recvs_;
950 Teuchos::RCP<Teuchos::Time> timer_doPosts3_barrier_;
951 Teuchos::RCP<Teuchos::Time> timer_doPosts4_barrier_;
952 Teuchos::RCP<Teuchos::Time> timer_doPosts3_sends_;
953 Teuchos::RCP<Teuchos::Time> timer_doPosts4_sends_;
957 #endif // TPETRA_DISTRIBUTOR_TIMERS 970 bool useDistinctTags_;
// Map a code-path identifier to the message tag to use.  doPosts(3 args)
// passes 0 and doPosts(4 args) passes 1.
976 int getTag (
const int pathTag)
const;
// Shared initialisation taking the communicator, output stream and
// parameter list (the same three arguments as the widest constructor).
996 init (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
997 const Teuchos::RCP<Teuchos::FancyOStream>& out,
998 const Teuchos::RCP<Teuchos::ParameterList>& plist);
// NOTE(review): presumably derives the receive-side plan from the
// send-side plan — definition not shown here; confirm.
1010 void computeReceives ();
// Given the import IDs and the ranks that own them, fill the output arrays
// exportIDs / exportNodeIDs (taken by non-const reference).
1024 template <
class Ordinal>
1025 void computeSends (
const ArrayView<const Ordinal> &importIDs,
1026 const ArrayView<const int> &importNodeIDs,
1027 Array<Ordinal> &exportIDs,
1028 Array<int> &exportNodeIDs);
// Called by doReversePosts() when reverseDistributor_ is null; const
// because reverseDistributor_ is mutable.
1031 void createReverseDistributor()
const;
// doPostsAndWaits, ArrayView / fixed packet count overload.
// Rejects the call if nonblocking requests are still outstanding, wraps the
// caller's buffers in non-owning ArrayRCPs, forwards to doPosts(), and
// records the byte counts of this round.
1036 template <
class Packet>
1040 const ArrayView<Packet>& imports)
1042 using Teuchos::arcp;
1043 using Teuchos::ArrayRCP;
1044 typedef typename ArrayRCP<const Packet>::size_type size_type;
// Throws std::runtime_error if earlier posts have not completed.
1046 TEUCHOS_TEST_FOR_EXCEPTION(
1047 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1048 "doPostsAndWaits(3 args): There are " << requests_.size () <<
1049 " outstanding nonblocking messages pending. It is incorrect to call " 1050 "this method with posts outstanding.");
// Non-owning wrapper (final argument 'false'): the caller keeps ownership
// of the export storage.
1062 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (),
1063 static_cast<size_type
> (0),
1064 exports.size(),
false);
// The import buffer is wrapped the same way, inline.
1077 doPosts (exportsArcp,
1079 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false));
// Statistics: whole-buffer sizes in bytes for this round.
1082 lastRoundBytesSend_ = exports.size () *
sizeof (Packet);
1083 lastRoundBytesRecv_ = imports.size () *
sizeof (Packet);
// doPostsAndWaits, ArrayView / per-LID packet count overload.
// Same structure as the fixed-count overload: reject outstanding requests,
// wrap the buffers in non-owning ArrayRCPs, forward to doPosts(4 args), and
// record the byte counts of this round.
1086 template <
class Packet>
1089 const ArrayView<size_t> &numExportPacketsPerLID,
1090 const ArrayView<Packet> &imports,
1091 const ArrayView<size_t> &numImportPacketsPerLID)
1093 using Teuchos::arcp;
1094 using Teuchos::ArrayRCP;
// Throws std::runtime_error if earlier posts have not completed.
1096 TEUCHOS_TEST_FOR_EXCEPTION(
1097 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1098 "doPostsAndWaits: There are " << requests_.size () <<
" outstanding " 1099 "nonblocking messages pending. It is incorrect to call doPostsAndWaits " 1100 "with posts outstanding.");
// Non-owning wrapper (final argument 'false') around the export storage.
1113 typedef typename ArrayRCP<const Packet>::size_type size_type;
1114 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (),
1115 static_cast<size_type
> (0),
1116 exports.size (),
false);
// The import buffer is wrapped the same way, inline.
1121 doPosts (exportsArcp,
1122 numExportPacketsPerLID,
1123 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false),
1124 numImportPacketsPerLID);
// Statistics: whole-buffer sizes in bytes for this round.
1127 lastRoundBytesSend_ = exports.size () *
sizeof (Packet);
1128 lastRoundBytesRecv_ = imports.size () *
sizeof (Packet);
// doPosts, ArrayRCP / fixed packet count overload.
//
// Posts one nonblocking receive per remote source into consecutive slices
// of 'imports', then sends each block of 'exports' to its destination
// using the configured send type, and finally copies the self-message (if
// any) locally.  Nonblocking requests are appended to requests_.
//
// Throws std::logic_error if ready sends are requested without a barrier,
// if requests_ is non-empty on entry, or if a receive slice would run past
// the end of 'imports'; throws std::invalid_argument if 'imports' is too
// small for totalReceiveLength_ * numPackets entries.
1132 template <
class Packet>
1136 const ArrayRCP<Packet>& imports)
1138 using Teuchos::Array;
1140 using Teuchos::FancyOStream;
1141 using Teuchos::includesVerbLevel;
1142 using Teuchos::ireceive;
1143 using Teuchos::isend;
1144 using Teuchos::OSTab;
1145 using Teuchos::readySend;
1146 using Teuchos::send;
1147 using Teuchos::ssend;
1148 using Teuchos::TypeNameTraits;
1149 using Teuchos::typeName;
1151 typedef Array<size_t>::size_type size_type;
1153 Teuchos::OSTab tab (out_);
1155 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1156 Teuchos::TimeMonitor timeMon (*timer_doPosts3_);
1157 #endif // TPETRA_DISTRIBUTOR_TIMERS 1162 const bool doBarrier = barrierBetween_;
// A ready send is only legal once the matching receive has been posted,
// hence the barrier requirement.
1190 TEUCHOS_TEST_FOR_EXCEPTION(
1191 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
1192 "Tpetra::Distributor::doPosts(3 args): Ready-send version requires a " 1193 "barrier between posting receives and posting ready sends. This should " 1194 "have been checked before. " 1195 "Please report this bug to the Tpetra developers.");
1197 const int myImageID = comm_->getRank ();
1198 size_t selfReceiveOffset = 0;
// The import buffer must hold every expected packet.
1206 const size_t totalNumImportPackets = totalReceiveLength_ * numPackets;
1207 TEUCHOS_TEST_FOR_EXCEPTION(
1208 static_cast<size_t> (imports.size ()) < totalNumImportPackets,
1209 std::invalid_argument,
"Tpetra::Distributor::doPosts(3 args): The " 1210 "'imports' array must have enough entries to hold the expected number " 1211 "of import packets. imports.size() = " << imports.size () <<
" < " 1212 "totalNumImportPackets = " << totalNumImportPackets <<
".");
// Path tag 0 identifies this (3-argument) code path to getTag().
1219 const int pathTag = 0;
1220 const int tag = this->getTag (pathTag);
1223 TEUCHOS_TEST_FOR_EXCEPTION(
1224 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 1225 "doPosts(3 args): Process " << myImageID <<
": requests_.size() = " 1226 << requests_.size () <<
" != 0.");
1227 std::ostringstream os;
1228 os << myImageID <<
": doPosts(3," 1229 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
// Receive blocks include the self-message, if there is one.
1246 const size_type actualNumReceives = as<size_type> (numReceives_) +
1247 as<size_type> (selfMessage_ ? 1 : 0);
1248 requests_.resize (0);
1256 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1257 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3_recvs_);
1258 #endif // TPETRA_DISTRIBUTOR_TIMERS 1260 size_t curBufOffset = 0;
// Post receives: block i lands at [curBufOffset, curBufOffset + curBufLen).
1261 for (size_type i = 0; i < actualNumReceives; ++i) {
1262 const size_t curBufLen = lengthsFrom_[i] * numPackets;
1263 if (imagesFrom_[i] != myImageID) {
1271 TEUCHOS_TEST_FOR_EXCEPTION(
1272 curBufOffset + curBufLen > static_cast<size_t> (imports.size ()),
1273 std::logic_error,
"Tpetra::Distributor::doPosts(3 args): Exceeded " 1274 "size of 'imports' array in packing loop on Process " << myImageID
1275 <<
". imports.size() = " << imports.size () <<
" < offset + length" 1276 " = " << (curBufOffset + curBufLen) <<
".");
1278 ArrayRCP<Packet> recvBuf =
1279 imports.persistingView (curBufOffset, curBufLen);
1280 requests_.push_back (ireceive<int, Packet> (recvBuf, imagesFrom_[i],
1283 std::ostringstream os;
1284 os << myImageID <<
": doPosts(3," 1285 << (indicesTo_.empty () ?
"fast" :
"slow") <<
"): " 1286 <<
"Posted irecv from Proc " << imagesFrom_[i] <<
" with " 1287 "specified tag " << tag << endl;
// Remember where the self-message goes; it is copied locally at the end.
1292 selfReceiveOffset = curBufOffset;
1294 curBufOffset += curBufLen;
1299 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1300 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3_barrier_);
1301 #endif // TPETRA_DISTRIBUTOR_TIMERS 1310 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1311 Teuchos::TimeMonitor timeMonSends (*timer_doPosts3_sends_);
1312 #endif // TPETRA_DISTRIBUTOR_TIMERS 1319 size_t numBlocks = numSends_ + selfMessage_;
// Find the first destination whose rank is not below this process's rank;
// the send loops start from that block index.
1320 size_t imageIndex = 0;
1321 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
1324 if (imageIndex == numBlocks) {
1329 size_t selfIndex = 0;
// "Fast" path: exports are already grouped by destination, so each block
// is sent straight out of the export buffer.
1331 if (indicesTo_.empty()) {
1333 std::ostringstream os;
1334 os << myImageID <<
": doPosts(3,fast): posting sends" << endl;
1340 for (
size_t i = 0; i < numBlocks; ++i) {
1341 size_t p = i + imageIndex;
1342 if (p > (numBlocks - 1)) {
1346 if (imagesTo_[p] != myImageID) {
1347 ArrayView<const Packet> tmpSend =
1348 exports.view (startsTo_[p]*numPackets, lengthsTo_[p]*numPackets);
1350 if (sendType == Details::DISTRIBUTOR_SEND) {
1351 send<int, Packet> (tmpSend.getRawPtr (),
1352 as<int> (tmpSend.size ()),
1353 imagesTo_[p], tag, *comm_);
1355 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1356 ArrayRCP<const Packet> tmpSendBuf =
1357 exports.persistingView (startsTo_[p] * numPackets,
1358 lengthsTo_[p] * numPackets);
1359 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1362 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1363 readySend<int, Packet> (tmpSend.getRawPtr (),
1364 as<int> (tmpSend.size ()),
1365 imagesTo_[p], tag, *comm_);
1367 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1368 ssend<int, Packet> (tmpSend.getRawPtr (),
1369 as<int> (tmpSend.size ()),
1370 imagesTo_[p], tag, *comm_);
1372 TEUCHOS_TEST_FOR_EXCEPTION(
1373 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 1374 "Invalid send type. We should never get here. " 1375 "Please report this bug to the Tpetra developers.");
// Report the rank that was actually sent to: the send used block index p,
// not the loop counter i.
1379 std::ostringstream os;
1380 os << myImageID <<
": doPosts(3,fast): " 1381 <<
"Posted send to Proc " << imagesTo_[p]
1382 <<
" w/ specified tag " << tag << endl;
// Self-message: plain local copy from exports into imports.
1399 std::copy (exports.begin()+startsTo_[selfNum]*numPackets,
1400 exports.begin()+startsTo_[selfNum]*numPackets+lengthsTo_[selfNum]*numPackets,
1401 imports.begin()+selfReceiveOffset);
1404 std::ostringstream os;
1405 os << myImageID <<
": doPosts(3,fast) done" << endl;
// "Slow" path: exports are not grouped by destination, so each block is
// first gathered through indicesTo_ into a temporary send buffer.
1411 std::ostringstream os;
1412 os << myImageID <<
": doPosts(3,slow): posting sends" << endl;
1418 ArrayRCP<Packet> sendArray (maxSendLength_ * numPackets);
// The single send buffer is reused for every destination, which is why
// nonblocking sends are rejected on this path.
1420 TEUCHOS_TEST_FOR_EXCEPTION(
1421 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
1422 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 1423 <<
"doesn't currently work with nonblocking sends.");
1425 for (
size_t i = 0; i < numBlocks; ++i) {
1426 size_t p = i + imageIndex;
1427 if (p > (numBlocks - 1)) {
1431 if (imagesTo_[p] != myImageID) {
1432 typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
1433 size_t sendArrayOffset = 0;
1434 size_t j = startsTo_[p];
// Gather the packets of every LID in block p into sendArray.
1435 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
1436 srcBegin = exports.begin() + indicesTo_[j]*numPackets;
1437 srcEnd = srcBegin + numPackets;
1438 std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
1439 sendArrayOffset += numPackets;
1441 ArrayView<const Packet> tmpSend =
1442 sendArray.view (0, lengthsTo_[p]*numPackets);
1444 if (sendType == Details::DISTRIBUTOR_SEND) {
1445 send<int, Packet> (tmpSend.getRawPtr (),
1446 as<int> (tmpSend.size ()),
1447 imagesTo_[p], tag, *comm_);
1449 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1450 ArrayRCP<const Packet> tmpSendBuf =
1451 sendArray.persistingView (0, lengthsTo_[p] * numPackets);
1452 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1455 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1456 readySend<int, Packet> (tmpSend.getRawPtr (),
1457 as<int> (tmpSend.size ()),
1458 imagesTo_[p], tag, *comm_);
1460 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1461 ssend<int, Packet> (tmpSend.getRawPtr (),
1462 as<int> (tmpSend.size ()),
1463 imagesTo_[p], tag, *comm_);
1466 TEUCHOS_TEST_FOR_EXCEPTION(
1467 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 1468 "Invalid send type. We should never get here. " 1469 "Please report this bug to the Tpetra developers.");
// Report the rank that was actually sent to: the send used block index p,
// not the loop counter i.
1473 std::ostringstream os;
1474 os << myImageID <<
": doPosts(3,slow): " 1475 <<
"Posted send to Proc " << imagesTo_[p]
1476 <<
" w/ specified tag " << tag << endl;
1482 selfIndex = startsTo_[p];
// Self-message: copy LID by LID through indicesTo_ into the import slot
// reserved while posting receives.
1487 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
1488 std::copy (exports.begin()+indicesTo_[selfIndex]*numPackets,
1489 exports.begin()+indicesTo_[selfIndex]*numPackets + numPackets,
1490 imports.begin() + selfReceiveOffset);
1492 selfReceiveOffset += numPackets;
1496 std::ostringstream os;
1497 os << myImageID <<
": doPosts(3,slow) done" << endl;
// doPosts, ArrayRCP / per-LID packet count overload.
//
// Like the fixed-count overload, but the number of packets for each LID
// comes from numExportPacketsPerLID / numImportPacketsPerLID, so all
// offsets and lengths are accumulated from those arrays.  Receives and
// sends carrying zero packets are skipped.
//
// Throws std::logic_error if ready sends are requested without a barrier,
// if requests_ is non-empty on entry, or if nonblocking sends are used on
// the "slow" (send buffer) path; in debug builds throws std::runtime_error
// if 'imports' is too small.
1503 template <
class Packet>
1506 const ArrayView<size_t>& numExportPacketsPerLID,
1507 const ArrayRCP<Packet>& imports,
1508 const ArrayView<size_t>& numImportPacketsPerLID)
1510 using Teuchos::Array;
1512 using Teuchos::ireceive;
1513 using Teuchos::isend;
1514 using Teuchos::readySend;
1515 using Teuchos::send;
1516 using Teuchos::ssend;
1517 using Teuchos::TypeNameTraits;
1518 #ifdef HAVE_TEUCHOS_DEBUG 1519 using Teuchos::OSTab;
1520 #endif // HAVE_TEUCHOS_DEBUG 1522 typedef Array<size_t>::size_type size_type;
1524 Teuchos::OSTab tab (out_);
1526 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1527 Teuchos::TimeMonitor timeMon (*timer_doPosts4_);
1528 #endif // TPETRA_DISTRIBUTOR_TIMERS 1533 const bool doBarrier = barrierBetween_;
// A ready send is only legal once the matching receive has been posted,
// hence the barrier requirement.
1559 TEUCHOS_TEST_FOR_EXCEPTION(
1560 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
1561 "Tpetra::Distributor::doPosts(4 args): Ready-send version requires a " 1562 "barrier between posting receives and posting ready sends. This should " 1563 "have been checked before. " 1564 "Please report this bug to the Tpetra developers.");
1566 const int myImageID = comm_->getRank ();
1567 size_t selfReceiveOffset = 0;
// Debug-only check that the import buffer can hold the sum of all
// per-LID import counts.
1569 #ifdef HAVE_TEUCHOS_DEBUG 1571 size_t totalNumImportPackets = 0;
1572 for (
int ii = 0; ii < numImportPacketsPerLID.size(); ++ii) {
1573 totalNumImportPackets += numImportPacketsPerLID[ii];
1575 TEUCHOS_TEST_FOR_EXCEPTION(
1576 static_cast<size_t> (imports.size ()) < totalNumImportPackets,
1577 std::runtime_error,
"Tpetra::Distributor::doPosts(4 args): The 'imports' " 1578 "array must have enough entries to hold the expected number of import " 1579 "packets. imports.size() = " << imports.size() <<
" < " 1580 "totalNumImportPackets = " << totalNumImportPackets <<
".");
1581 #endif // HAVE_TEUCHOS_DEBUG 1588 const int pathTag = 1;
// Path tag 1 identifies this (4-argument) code path to getTag().
1589 const int tag = this->getTag (pathTag);
1592 TEUCHOS_TEST_FOR_EXCEPTION(
1593 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 1594 "doPosts(4 args): Process " << myImageID <<
": requests_.size() = " 1595 << requests_.size () <<
" != 0.");
1596 std::ostringstream os;
1597 os << myImageID <<
": doPosts(4," 1598 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
// Receive blocks include the self-message, if there is one.
1615 const size_type actualNumReceives = as<size_type> (numReceives_) +
1616 as<size_type> (selfMessage_ ? 1 : 0);
1617 requests_.resize (0);
1625 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1626 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4_recvs_);
1627 #endif // TPETRA_DISTRIBUTOR_TIMERS 1629 size_t curBufferOffset = 0;
1630 size_t curLIDoffset = 0;
// Post receives: the size of block i is the sum of the per-LID counts of
// the lengthsFrom_[i] LIDs that belong to it.
1631 for (size_type i = 0; i < actualNumReceives; ++i) {
1632 size_t totalPacketsFrom_i = 0;
1633 for (
size_t j = 0; j < lengthsFrom_[i]; ++j) {
1634 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
1636 curLIDoffset += lengthsFrom_[i];
// Empty messages are not posted at all.
1637 if (imagesFrom_[i] != myImageID && totalPacketsFrom_i) {
1646 ArrayRCP<Packet> recvBuf =
1647 imports.persistingView (curBufferOffset, totalPacketsFrom_i);
1648 requests_.push_back (ireceive<int, Packet> (recvBuf, imagesFrom_[i],
// Remember where the self-message goes; it is copied locally at the end.
1652 selfReceiveOffset = curBufferOffset;
1654 curBufferOffset += totalPacketsFrom_i;
1659 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1660 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4_barrier_);
1661 #endif // TPETRA_DISTRIBUTOR_TIMERS 1670 #ifdef TPETRA_DISTRIBUTOR_TIMERS 1671 Teuchos::TimeMonitor timeMonSends (*timer_doPosts4_sends_);
1672 #endif // TPETRA_DISTRIBUTOR_TIMERS 1676 Array<size_t> sendPacketOffsets(numSends_,0), packetsPerSend(numSends_,0);
// Prefix-sum the per-LID export counts into a per-block offset and size,
// tracking the largest block for the slow-path send buffer.
// NOTE(review): these arrays have numSends_ entries but are indexed below
// with p up to numBlocks - 1 == numSends_ + selfMessage_ - 1; confirm this
// cannot go out of range when a self-message is present.
1677 size_t maxNumPackets = 0;
1678 size_t curPKToffset = 0;
1679 for (
size_t pp=0; pp<numSends_; ++pp) {
1680 sendPacketOffsets[pp] = curPKToffset;
1681 size_t numPackets = 0;
1682 for (
size_t j=startsTo_[pp]; j<startsTo_[pp]+lengthsTo_[pp]; ++j) {
1683 numPackets += numExportPacketsPerLID[j];
1685 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
1686 packetsPerSend[pp] = numPackets;
1687 curPKToffset += numPackets;
1692 size_t numBlocks = numSends_+ selfMessage_;
// Find the first destination whose rank is not below this process's rank;
// the send loops start from that block index.
1693 size_t imageIndex = 0;
1694 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
1697 if (imageIndex == numBlocks) {
1702 size_t selfIndex = 0;
// "Fast" path: exports are already grouped by destination.
1704 if (indicesTo_.empty()) {
1706 std::ostringstream os;
1707 os << myImageID <<
": doPosts(4,fast): posting sends" << endl;
1713 for (
size_t i = 0; i < numBlocks; ++i) {
1714 size_t p = i + imageIndex;
1715 if (p > (numBlocks - 1)) {
// Skip the self-message and any empty block.
1719 if (imagesTo_[p] != myImageID && packetsPerSend[p] > 0) {
1720 ArrayView<const Packet> tmpSend =
1721 exports.view (sendPacketOffsets[p], packetsPerSend[p]);
1723 if (sendType == Details::DISTRIBUTOR_SEND) {
1724 send<int, Packet> (tmpSend.getRawPtr (),
1725 as<int> (tmpSend.size ()),
1726 imagesTo_[p], tag, *comm_);
1728 else if (sendType == Details::DISTRIBUTOR_RSEND) {
1729 readySend<int, Packet> (tmpSend.getRawPtr (),
1730 as<int> (tmpSend.size ()),
1731 imagesTo_[p], tag, *comm_);
1733 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1734 ArrayRCP<const Packet> tmpSendBuf =
1735 exports.persistingView (sendPacketOffsets[p], packetsPerSend[p]);
1736 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1739 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1740 ssend<int, Packet> (tmpSend.getRawPtr (),
1741 as<int> (tmpSend.size ()),
1742 imagesTo_[p], tag, *comm_);
1745 TEUCHOS_TEST_FOR_EXCEPTION(
1746 true, std::logic_error,
"Tpetra::Distributor::doPosts(4 args): " 1747 "Invalid send type. We should never get here. Please report " 1748 "this bug to the Tpetra developers.");
// Self-message: plain local copy from exports into imports.
1757 std::copy (exports.begin()+sendPacketOffsets[selfNum],
1758 exports.begin()+sendPacketOffsets[selfNum]+packetsPerSend[selfNum],
1759 imports.begin()+selfReceiveOffset);
1762 std::ostringstream os;
1763 os << myImageID <<
": doPosts(4,fast) done" << endl;
// "Slow" path: gather each block through per-LID offsets into a temporary
// send buffer sized for the largest block.
1769 std::ostringstream os;
1770 os << myImageID <<
": doPosts(4,slow): posting sends" << endl;
1775 ArrayRCP<Packet> sendArray (maxNumPackets);
// The single send buffer is reused for every destination, so nonblocking
// sends are rejected here.  The message names this overload (4 args).
1777 TEUCHOS_TEST_FOR_EXCEPTION(
1778 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
1779 "Tpetra::Distributor::doPosts(4 args): The \"send buffer\" " 1780 "code path may not necessarily work with nonblocking sends.");
// indicesOffsets[j] is the start of LID j's packets in 'exports'
// (running sum of the per-LID export counts).
1782 Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
1784 for (
int j=0; j<numExportPacketsPerLID.size(); ++j) {
1785 indicesOffsets[j] = ioffset;
1786 ioffset += numExportPacketsPerLID[j];
1789 for (
size_t i = 0; i < numBlocks; ++i) {
1790 size_t p = i + imageIndex;
1791 if (p > (numBlocks - 1)) {
1795 if (imagesTo_[p] != myImageID) {
1796 typename ArrayView<const Packet>::iterator srcBegin, srcEnd;
1797 size_t sendArrayOffset = 0;
1798 size_t j = startsTo_[p];
1799 size_t numPacketsTo_p = 0;
// Gather the packets of every LID in block p into sendArray.
1800 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
1801 srcBegin = exports.begin() + indicesOffsets[j];
1802 srcEnd = srcBegin + numExportPacketsPerLID[j];
1803 numPacketsTo_p += numExportPacketsPerLID[j];
1804 std::copy (srcBegin, srcEnd, sendArray.begin()+sendArrayOffset);
1805 sendArrayOffset += numExportPacketsPerLID[j];
// Empty blocks are not sent.
1807 if (numPacketsTo_p > 0) {
1808 ArrayView<const Packet> tmpSend =
1809 sendArray.view (0, numPacketsTo_p);
1811 if (sendType == Details::DISTRIBUTOR_RSEND) {
1812 readySend<int, Packet> (tmpSend.getRawPtr (),
1813 as<int> (tmpSend.size ()),
1814 imagesTo_[p], tag, *comm_);
1816 else if (sendType == Details::DISTRIBUTOR_ISEND) {
1817 ArrayRCP<const Packet> tmpSendBuf =
1818 sendArray.persistingView (0, numPacketsTo_p);
1819 requests_.push_back (isend<int, Packet> (tmpSendBuf, imagesTo_[p],
1822 else if (sendType == Details::DISTRIBUTOR_SSEND) {
1823 ssend<int, Packet> (tmpSend.getRawPtr (),
1824 as<int> (tmpSend.size ()),
1825 imagesTo_[p], tag, *comm_);
1828 send<int, Packet> (tmpSend.getRawPtr (),
1829 as<int> (tmpSend.size ()),
1830 imagesTo_[p], tag, *comm_);
1836 selfIndex = startsTo_[p];
// Self-message: copy LID by LID into the import slot reserved while
// posting receives.
1841 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
1842 std::copy (exports.begin()+indicesOffsets[selfIndex],
1843 exports.begin()+indicesOffsets[selfIndex]+numExportPacketsPerLID[selfIndex],
1844 imports.begin() + selfReceiveOffset);
1845 selfReceiveOffset += numExportPacketsPerLID[selfIndex];
1850 std::ostringstream os;
1851 os << myImageID <<
": doPosts(4,slow) done" << endl;
// doReversePostsAndWaits, ArrayView / fixed packet count overload.
// Wraps the caller's buffers in non-owning ArrayRCPs, forwards to
// doReversePosts(), and records the byte counts of this round.
1857 template <
class Packet>
1861 const ArrayView<Packet>& imports)
// Non-owning wrapper (final argument 'false') around the export storage.
1875 typedef typename ArrayRCP<const Packet>::size_type size_type;
1876 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr(), as<size_type> (0),
1877 exports.size(),
false);
// The import buffer is wrapped the same way, inline.
1882 doReversePosts (exportsArcp,
1884 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false));
// Statistics: whole-buffer sizes in bytes for this round.
1887 lastRoundBytesSend_ = exports.size() *
sizeof(Packet);
1888 lastRoundBytesRecv_ = imports.size() *
sizeof(Packet);
// doReversePostsAndWaits, ArrayView / per-LID packet count overload.
// Rejects the call if nonblocking requests are still outstanding, wraps the
// buffers in non-owning ArrayRCPs, forwards to doReversePosts(4 args), and
// records the byte counts of this round.
1891 template <
class Packet>
1894 const ArrayView<size_t> &numExportPacketsPerLID,
1895 const ArrayView<Packet> &imports,
1896 const ArrayView<size_t> &numImportPacketsPerLID)
1899 using Teuchos::arcp;
1900 using Teuchos::ArrayRCP;
// Throws std::runtime_error if earlier posts have not completed.
1902 TEUCHOS_TEST_FOR_EXCEPTION(
1903 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 1904 "doReversePostsAndWaits(4 args): There are " << requests_.size ()
1905 <<
" outstanding nonblocking messages pending. It is incorrect to call " 1906 "this method with posts outstanding.");
// Non-owning wrapper (final argument 'false') around the export storage.
1919 typedef typename ArrayRCP<const Packet>::size_type size_type;
1920 ArrayRCP<const Packet> exportsArcp (exports.getRawPtr (), as<size_type> (0),
1921 exports.size (),
false);
// The import buffer is wrapped the same way, inline.
1922 doReversePosts (exportsArcp,
1923 numExportPacketsPerLID,
1924 arcp<Packet> (imports.getRawPtr (), 0, imports.size (),
false),
1925 numImportPacketsPerLID);
// Statistics: whole-buffer sizes in bytes for this round.
1928 lastRoundBytesSend_ = exports.size() *
sizeof(Packet);
1929 lastRoundBytesRecv_ = imports.size() *
sizeof(Packet);
// doReversePosts, ArrayRCP / fixed packet count overload.
// Reverse communication is delegated to a separate Distributor object that
// is created on first use.
1932 template <
class Packet>
1936 const ArrayRCP<Packet>& imports)
// Throws std::runtime_error when indicesTo_ is non-empty, i.e. when the
// forward plan is not blocked by process.
1939 TEUCHOS_TEST_FOR_EXCEPTION(
1940 ! indicesTo_.empty (), std::runtime_error,
1941 "Tpetra::Distributor::doReversePosts(3 args): Can only do reverse " 1942 "communication when original data are blocked by process.");
// Lazily build the reverse plan.
1943 if (reverseDistributor_.is_null ()) {
1944 createReverseDistributor ();
// The reverse plan's ordinary doPosts() performs the exchange.
1946 reverseDistributor_->doPosts (exports, numPackets, imports);
// doReversePosts, ArrayRCP / per-LID packet count overload.
// Reverse communication is delegated to a separate Distributor object that
// is created on first use.
1949 template <
class Packet>
1952 const ArrayView<size_t>& numExportPacketsPerLID,
1953 const ArrayRCP<Packet>& imports,
1954 const ArrayView<size_t>& numImportPacketsPerLID)
// Throws std::runtime_error when indicesTo_ is non-empty, i.e. when the
// forward plan is not blocked by process.  The message names this
// overload (4 args) so it can be told apart from the fixed-count one.
1957 TEUCHOS_TEST_FOR_EXCEPTION(
1958 ! indicesTo_.empty (), std::runtime_error,
1959 "Tpetra::Distributor::doReversePosts(4 args): Can only do reverse " 1960 "communication when original data are blocked by process.");
// Lazily build the reverse plan.
1961 if (reverseDistributor_.is_null ()) {
1962 createReverseDistributor ();
// The reverse plan's ordinary doPosts(4 args) performs the exchange.
1964 reverseDistributor_->doPosts (exports, numExportPacketsPerLID,
1965 imports, numImportPacketsPerLID);
// doPostsAndWaits, Kokkos::View / fixed packet count overload.
// Enabled only when both template arguments satisfy Kokkos::Impl::is_view.
1968 template <
class ExpView,
class ImpView>
1969 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
1971 doPostsAndWaits (
const ExpView &exports,
1973 const ImpView &imports)
1982 typedef ExpView exports_view;
1983 typedef ImpView imports_view;
// Compile-time check: can host code access the export view's memory space?
1984 const bool can_access_from_host =
1985 Kokkos::Impl::VerifyExecutionCanAccessMemorySpace< Kokkos::HostSpace,
1986 typename exports_view::memory_space >::value;
// Without CUDA RDMA, data that is not host-accessible goes through host
// mirror views, and the call re-enters doPostsAndWaits on those mirrors.
1987 if (! enable_cuda_rdma_ && ! can_access_from_host) {
1988 typename exports_view::HostMirror host_exports =
1989 Kokkos::create_mirror_view (exports);
1990 typename imports_view::HostMirror host_imports =
1991 Kokkos::create_mirror_view (imports);
1993 doPostsAndWaits (Kokkos::Compat::create_const_view (host_exports),
// Throws std::runtime_error if earlier posts have not completed.
2000 TEUCHOS_TEST_FOR_EXCEPTION(
2001 requests_.size () != 0, std::runtime_error,
"Tpetra::Distributor::" 2002 "doPostsAndWaits(3 args): There are " << requests_.size () <<
2003 " outstanding nonblocking messages pending. It is incorrect to call " 2004 "this method with posts outstanding.");
// Direct path: post on the views as given.
2006 doPosts (exports, numPackets, imports);
// doPostsAndWaits, Kokkos::View / per-LID packet count overload.
// Enabled only when both template arguments satisfy Kokkos::Impl::is_view.
2010 template <
class ExpView,
class ImpView>
2011 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2013 doPostsAndWaits (
const ExpView &exports,
2014 const ArrayView<size_t> &numExportPacketsPerLID,
2015 const ImpView &imports,
2016 const ArrayView<size_t> &numImportPacketsPerLID)
2024 typedef ExpView exports_view;
2025 typedef ImpView imports_view;
// Without CUDA RDMA, non-host views go through host mirror views and the
// call re-enters doPostsAndWaits on those mirrors.
// NOTE(review): this overload tests exports_view::is_hostspace, whereas the
// fixed-count overload uses VerifyExecutionCanAccessMemorySpace — confirm
// the two conditions are meant to differ.
2026 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2027 typename exports_view::HostMirror host_exports =
2028 Kokkos::create_mirror_view(exports);
2029 typename imports_view::HostMirror host_imports =
2030 Kokkos::create_mirror_view(imports);
2032 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
2033 numExportPacketsPerLID,
2035 numImportPacketsPerLID);
// Throws std::runtime_error if earlier posts have not completed.
2040 TEUCHOS_TEST_FOR_EXCEPTION(
2041 requests_.size () != 0, std::runtime_error,
2042 "Tpetra::Distributor::doPostsAndWaits(4 args): There are " 2043 << requests_.size () <<
" outstanding nonblocking messages pending. " 2044 "It is incorrect to call this method with posts outstanding.");
// Direct path: post on the views as given.
2046 doPosts (exports, numExportPacketsPerLID, imports, numImportPacketsPerLID);
// Post the receives and sends of the forward plan for a fixed number of
// packets per LID (numPackets), for Kokkos::View buffers. The enable_if
// return type restricts this overload to Kokkos views.
//
// Nonblocking receives (and nonblocking sends, when the send type is
// DISTRIBUTOR_ISEND) are appended to requests_. A message from this
// process to itself is not sent; it is copied from exports into imports
// with deep_copy_offset.
//
// Two send strategies are visible below, chosen by indicesTo_.empty():
//   - "fast": each destination's data is a contiguous slice of exports.
//   - "slow": each destination's data is first gathered into a scratch
//     buffer (sendArray) using indicesTo_.
//
// Error checks visible in this block:
//   - std::logic_error if a ready-send is requested without a barrier,
//     if requests_ is nonempty, if the send type is unrecognized, or if
//     DISTRIBUTOR_ISEND is used on the "slow" path;
//   - std::runtime_error if imports has fewer than
//     totalReceiveLength_ * numPackets entries.
2051 template <
class ExpView,
class ImpView>
2052 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2054 doPosts (
const ExpView &exports,
2056 const ImpView &imports)
2058 using Teuchos::Array;
2060 using Teuchos::FancyOStream;
2061 using Teuchos::includesVerbLevel;
2062 using Teuchos::ireceive;
2063 using Teuchos::isend;
2064 using Teuchos::OSTab;
2065 using Teuchos::readySend;
2066 using Teuchos::send;
2067 using Teuchos::ssend;
2068 using Teuchos::TypeNameTraits;
2069 using Teuchos::typeName;
2071 using Kokkos::Compat::create_const_view;
2072 using Kokkos::Compat::create_view;
2073 using Kokkos::Compat::subview_offset;
2074 using Kokkos::Compat::deep_copy_offset;
2075 typedef Array<size_t>::size_type size_type;
2076 typedef ExpView exports_view_type;
2077 typedef ImpView imports_view_type;
2079 Teuchos::OSTab tab (out_);
2081 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2082 Teuchos::TimeMonitor timeMon (*timer_doPosts3_);
2083 #endif // TPETRA_DISTRIBUTOR_TIMERS 2088 const bool doBarrier = barrierBetween_;
// A ready-send is only accepted together with a barrier (see message text).
2116 TEUCHOS_TEST_FOR_EXCEPTION(
2117 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier, std::logic_error,
2118 "Tpetra::Distributor::doPosts(3 args): Ready-send version requires a " 2119 "barrier between posting receives and posting ready sends. This should " 2120 "have been checked before. " 2121 "Please report this bug to the Tpetra developers.");
2123 const int myImageID = comm_->getRank ();
2124 size_t selfReceiveOffset = 0;
// imports must be able to hold every incoming packet.
2127 const size_t totalNumImportPackets = totalReceiveLength_ * numPackets;
2128 TEUCHOS_TEST_FOR_EXCEPTION(
2129 static_cast<size_t> (imports.dimension_0 ()) < totalNumImportPackets,
2130 std::runtime_error,
"Tpetra::Distributor::doPosts(3 args): The 'imports' " 2131 "array must have enough entries to hold the expected number of import " 2132 "packets. imports.dimension_0() = " << imports.dimension_0 () <<
" < " 2133 "totalNumImportPackets = " << totalNumImportPackets <<
".");
// This overload asks getTag for the tag of path 0.
2140 const int pathTag = 0;
2141 const int tag = this->getTag (pathTag);
// No requests may be pending when posting starts.
2144 TEUCHOS_TEST_FOR_EXCEPTION(
2145 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 2146 "doPosts(3 args): Process " << myImageID <<
": requests_.size() = " 2147 << requests_.size () <<
" != 0.");
2148 std::ostringstream os;
2149 os << myImageID <<
": doPosts(3," 2150 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
// The self-message, if present, is counted as one additional receive.
2167 const size_type actualNumReceives = as<size_type> (numReceives_) +
2168 as<size_type> (selfMessage_ ? 1 : 0);
2169 requests_.resize (0);
2177 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2178 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts3_recvs_);
2179 #endif // TPETRA_DISTRIBUTOR_TIMERS 2181 size_t curBufferOffset = 0;
// Walk the sources in order. Each one owns the next
// lengthsFrom_[i]*numPackets entries of imports. Remote sources get a
// nonblocking receive into that slice; selfReceiveOffset records where the
// self-message belongs (presumably in the else branch, which is not
// visible in this excerpt).
2182 for (size_type i = 0; i < actualNumReceives; ++i) {
2183 if (imagesFrom_[i] != myImageID) {
2191 imports_view_type recvBuf =
2192 subview_offset (imports, curBufferOffset, lengthsFrom_[i]*numPackets);
2193 requests_.push_back (ireceive<int> (recvBuf, imagesFrom_[i],
2196 std::ostringstream os;
2197 os << myImageID <<
": doPosts(3," 2198 << (indicesTo_.empty () ?
"fast" :
"slow") <<
"): " 2199 <<
"Posted irecv from Proc " << imagesFrom_[i] <<
" with " 2200 "specified tag " << tag << endl;
2205 selfReceiveOffset = curBufferOffset;
2207 curBufferOffset += lengthsFrom_[i]*numPackets;
2212 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2213 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts3_barrier_);
2214 #endif // TPETRA_DISTRIBUTOR_TIMERS 2223 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2224 Teuchos::TimeMonitor timeMonSends (*timer_doPosts3_sends_);
// numBlocks counts the self-message as well. imageIndex scans past the
// destinations whose rank is below myImageID; the sends below start at
// that position (p = i + imageIndex).
// NOTE(review): the loop body and the wrap-around adjustment of p are not
// visible in this excerpt.
2225 #endif // TPETRA_DISTRIBUTOR_TIMERS 2232 size_t numBlocks = numSends_ + selfMessage_;
2233 size_t imageIndex = 0;
2234 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
2237 if (imageIndex == numBlocks) {
2242 size_t selfIndex = 0;
// "Fast" path: indicesTo_ is empty, so the data for destination p is the
// slice of exports that starts at startsTo_[p]*numPackets and has
// lengthsTo_[p]*numPackets entries.
2244 if (indicesTo_.empty()) {
2246 std::ostringstream os;
2247 os << myImageID <<
": doPosts(3,fast): posting sends" << endl;
2253 for (
size_t i = 0; i < numBlocks; ++i) {
2254 size_t p = i + imageIndex;
2255 if (p > (numBlocks - 1)) {
2259 if (imagesTo_[p] != myImageID) {
2260 exports_view_type tmpSend = subview_offset(
2261 exports, startsTo_[p]*numPackets, lengthsTo_[p]*numPackets);
// Dispatch on the configured send type. Only the nonblocking send adds an
// entry to requests_.
2263 if (sendType == Details::DISTRIBUTOR_SEND) {
2265 as<int> (tmpSend.size ()),
2266 imagesTo_[p], tag, *comm_);
2268 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2269 exports_view_type tmpSendBuf =
2270 subview_offset (exports, startsTo_[p] * numPackets,
2271 lengthsTo_[p] * numPackets);
2272 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2275 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2276 readySend<int> (tmpSend,
2277 as<int> (tmpSend.size ()),
2278 imagesTo_[p], tag, *comm_);
2280 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2281 ssend<int> (tmpSend,
2282 as<int> (tmpSend.size ()),
2283 imagesTo_[p], tag, *comm_);
2285 TEUCHOS_TEST_FOR_EXCEPTION(
2286 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 2287 "Invalid send type. We should never get here. " 2288 "Please report this bug to the Tpetra developers.");
// NOTE(review): this message prints imagesTo_[i], while the send above
// targets imagesTo_[p]; the two differ whenever imageIndex != 0 --
// confirm which index is intended.
2292 std::ostringstream os;
2293 os << myImageID <<
": doPosts(3,fast): " 2294 <<
"Posted send to Proc " << imagesTo_[i]
2295 <<
" w/ specified tag " << tag << endl;
// Self-message: copy this process's own slice of exports straight into
// imports at the offset recorded while posting receives.
2312 deep_copy_offset(imports, exports, selfReceiveOffset,
2313 startsTo_[selfNum]*numPackets,
2314 lengthsTo_[selfNum]*numPackets);
2317 std::ostringstream os;
2318 os << myImageID <<
": doPosts(3,fast) done" << endl;
// "Slow" path: indicesTo_ is nonempty, so each destination's packets are
// gathered into a scratch view before being sent.
2324 std::ostringstream os;
2325 os << myImageID <<
": doPosts(3,slow): posting sends" << endl;
// Scratch send buffer with the same value type, layout, device and memory
// traits as ExpView, sized maxSendLength_ * numPackets.
2331 typedef typename ExpView::non_const_value_type Packet;
2332 typedef typename ExpView::array_layout Layout;
2333 typedef typename ExpView::device_type Device;
2334 typedef typename ExpView::memory_traits Mem;
2335 Kokkos::View<Packet*,Layout,Device,Mem> sendArray (
"sendArray",
2336 maxSendLength_ * numPackets);
// Nonblocking sends are rejected on this path (see message text).
2338 TEUCHOS_TEST_FOR_EXCEPTION(
2339 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
2340 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 2341 "doesn't currently work with nonblocking sends.");
2343 for (
size_t i = 0; i < numBlocks; ++i) {
2344 size_t p = i + imageIndex;
2345 if (p > (numBlocks - 1)) {
2349 if (imagesTo_[p] != myImageID) {
// Gather: for each of the lengthsTo_[p] LIDs going to destination p, copy
// its numPackets packets from exports (at indicesTo_[j]*numPackets) to the
// next free position in sendArray.
2350 size_t sendArrayOffset = 0;
2351 size_t j = startsTo_[p];
2352 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
2353 deep_copy_offset(sendArray, exports, sendArrayOffset,
2354 indicesTo_[j]*numPackets, numPackets);
2355 sendArrayOffset += numPackets;
2358 subview_offset(sendArray,
size_t(0), lengthsTo_[p]*numPackets);
2360 if (sendType == Details::DISTRIBUTOR_SEND) {
2362 as<int> (tmpSend.size ()),
2363 imagesTo_[p], tag, *comm_);
// NOTE(review): the check above throws when sendType is
// DISTRIBUTOR_ISEND, so this branch looks unreachable.
2365 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2366 exports_view_type tmpSendBuf =
2367 subview_offset (sendArray,
size_t(0), lengthsTo_[p] * numPackets);
2368 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2371 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2372 readySend<int> (tmpSend,
2373 as<int> (tmpSend.size ()),
2374 imagesTo_[p], tag, *comm_);
2376 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2377 ssend<int> (tmpSend,
2378 as<int> (tmpSend.size ()),
2379 imagesTo_[p], tag, *comm_);
2382 TEUCHOS_TEST_FOR_EXCEPTION(
2383 true, std::logic_error,
"Tpetra::Distributor::doPosts(3 args): " 2384 "Invalid send type. We should never get here. " 2385 "Please report this bug to the Tpetra developers.");
// NOTE(review): as on the fast path, the message prints imagesTo_[i]
// although the send used imagesTo_[p] -- confirm.
2389 std::ostringstream os;
2390 os << myImageID <<
": doPosts(3,slow): " 2391 <<
"Posted send to Proc " << imagesTo_[i]
2392 <<
" w/ specified tag " << tag << endl;
// For the self-message, remember where its LIDs start in indicesTo_.
2398 selfIndex = startsTo_[p];
// Self-message: copy numPackets packets per LID from exports into imports.
// NOTE(review): the statement that advances selfIndex is not visible in
// this excerpt.
2403 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
2404 deep_copy_offset(imports, exports, selfReceiveOffset,
2405 indicesTo_[selfIndex]*numPackets, numPackets);
2407 selfReceiveOffset += numPackets;
2411 std::ostringstream os;
2412 os << myImageID <<
": doPosts(3,slow) done" << endl;
// Post the receives and sends of the forward plan when the number of
// packets varies per LID, for Kokkos::View buffers. The enable_if return
// type restricts this overload to Kokkos views.
//
// numExportPacketsPerLID[j] is the packet count of export LID j, and
// numImportPacketsPerLID[j] that of import LID j. Messages whose total
// packet count is zero are not posted (see the "&& totalPacketsFrom_i" and
// "packetsPerSend[p] > 0" conditions below).
//
// Nonblocking receives (and nonblocking sends, when the send type is
// DISTRIBUTOR_ISEND) are appended to requests_. A message from this
// process to itself is copied from exports into imports with
// deep_copy_offset rather than sent.
//
// Error checks visible in this block:
//   - std::logic_error if a ready-send is requested without a barrier,
//     if requests_ is nonempty, if the send type is unrecognized, or if
//     DISTRIBUTOR_ISEND is used on the "slow" (send buffer) path;
//   - in HAVE_TEUCHOS_DEBUG builds, std::runtime_error if imports is
//     smaller than the sum of numImportPacketsPerLID.
2418 template <
class ExpView,
class ImpView>
2419 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2421 doPosts (
const ExpView &exports,
2422 const ArrayView<size_t> &numExportPacketsPerLID,
2423 const ImpView &imports,
2424 const ArrayView<size_t> &numImportPacketsPerLID)
2426 using Teuchos::Array;
2428 using Teuchos::ireceive;
2429 using Teuchos::isend;
2430 using Teuchos::readySend;
2431 using Teuchos::send;
2432 using Teuchos::ssend;
2433 using Teuchos::TypeNameTraits;
2434 #ifdef HAVE_TEUCHOS_DEBUG 2435 using Teuchos::OSTab;
2436 #endif // HAVE_TEUCHOS_DEBUG 2438 using Kokkos::Compat::create_const_view;
2439 using Kokkos::Compat::create_view;
2440 using Kokkos::Compat::subview_offset;
2441 using Kokkos::Compat::deep_copy_offset;
2442 typedef Array<size_t>::size_type size_type;
2443 typedef ExpView exports_view_type;
2444 typedef ImpView imports_view_type;
2446 Teuchos::OSTab tab (out_);
2448 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2449 Teuchos::TimeMonitor timeMon (*timer_doPosts4_);
2450 #endif // TPETRA_DISTRIBUTOR_TIMERS 2455 const bool doBarrier = barrierBetween_;
// A ready-send is only accepted together with a barrier (see message text).
2481 TEUCHOS_TEST_FOR_EXCEPTION(
2482 sendType == Details::DISTRIBUTOR_RSEND && ! doBarrier,
2483 std::logic_error,
"Tpetra::Distributor::doPosts(4 args): Ready-send " 2484 "version requires a barrier between posting receives and posting ready " 2485 "sends. This should have been checked before. " 2486 "Please report this bug to the Tpetra developers.");
2488 const int myImageID = comm_->getRank ();
2489 size_t selfReceiveOffset = 0;
// Debug-only capacity check: imports must hold the sum of all per-LID
// import packet counts.
2491 #ifdef HAVE_TEUCHOS_DEBUG 2493 size_t totalNumImportPackets = 0;
2494 for (size_type ii = 0; ii < numImportPacketsPerLID.size (); ++ii) {
2495 totalNumImportPackets += numImportPacketsPerLID[ii];
2497 TEUCHOS_TEST_FOR_EXCEPTION(
2498 imports.dimension_0 () < totalNumImportPackets, std::runtime_error,
2499 "Tpetra::Distributor::doPosts(4 args): The 'imports' array must have " 2500 "enough entries to hold the expected number of import packets. " 2501 "imports.dimension_0() = " << imports.dimension_0 () <<
" < " 2502 "totalNumImportPackets = " << totalNumImportPackets <<
".");
// This overload asks getTag for the tag of path 1.
2503 #endif // HAVE_TEUCHOS_DEBUG 2510 const int pathTag = 1;
2511 const int tag = this->getTag (pathTag);
// No requests may be pending when posting starts.
2514 TEUCHOS_TEST_FOR_EXCEPTION(
2515 requests_.size () != 0, std::logic_error,
"Tpetra::Distributor::" 2516 "doPosts(4 args): Process " << myImageID <<
": requests_.size () = " 2517 << requests_.size () <<
" != 0.");
2518 std::ostringstream os;
2519 os << myImageID <<
": doPosts(4," 2520 << (indicesTo_.empty () ?
"fast" :
"slow") <<
")" << endl;
// The self-message, if present, is counted as one additional receive.
2537 const size_type actualNumReceives = as<size_type> (numReceives_) +
2538 as<size_type> (selfMessage_ ? 1 : 0);
2539 requests_.resize (0);
2547 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2548 Teuchos::TimeMonitor timeMonRecvs (*timer_doPosts4_recvs_);
2549 #endif // TPETRA_DISTRIBUTOR_TIMERS 2551 size_t curBufferOffset = 0;
2552 size_t curLIDoffset = 0;
// For each source, sum the packet counts of its lengthsFrom_[i] LIDs to get
// the size of its slice of imports. A nonblocking receive is posted only
// for remote sources with a nonzero total. selfReceiveOffset records where
// the self-message belongs (presumably in the else branch, which is not
// visible in this excerpt).
2553 for (size_type i = 0; i < actualNumReceives; ++i) {
2554 size_t totalPacketsFrom_i = 0;
2555 for (
size_t j = 0; j < lengthsFrom_[i]; ++j) {
2556 totalPacketsFrom_i += numImportPacketsPerLID[curLIDoffset+j];
2558 curLIDoffset += lengthsFrom_[i];
2559 if (imagesFrom_[i] != myImageID && totalPacketsFrom_i) {
2568 imports_view_type recvBuf =
2569 subview_offset (imports, curBufferOffset, totalPacketsFrom_i);
2570 requests_.push_back (ireceive<int> (recvBuf, imagesFrom_[i],
2574 selfReceiveOffset = curBufferOffset;
2576 curBufferOffset += totalPacketsFrom_i;
2581 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2582 Teuchos::TimeMonitor timeMonBarrier (*timer_doPosts4_barrier_);
2583 #endif // TPETRA_DISTRIBUTOR_TIMERS 2592 #ifdef TPETRA_DISTRIBUTOR_TIMERS 2593 Teuchos::TimeMonitor timeMonSends (*timer_doPosts4_sends_);
// Per-send bookkeeping: the starting offset into exports and the packet
// count of each of the numSends_ sends, plus the largest packet count
// (used to size the scratch buffer on the slow path).
// NOTE(review): both arrays have numSends_ entries, but they are indexed
// below by p, which ranges over numSends_ + selfMessage_ blocks -- confirm
// the bounds when a self-message is present.
2594 #endif // TPETRA_DISTRIBUTOR_TIMERS 2598 Array<size_t> sendPacketOffsets(numSends_,0), packetsPerSend(numSends_,0);
2599 size_t maxNumPackets = 0;
2600 size_t curPKToffset = 0;
2601 for (
size_t pp=0; pp<numSends_; ++pp) {
2602 sendPacketOffsets[pp] = curPKToffset;
2603 size_t numPackets = 0;
2604 for (
size_t j=startsTo_[pp]; j<startsTo_[pp]+lengthsTo_[pp]; ++j) {
2605 numPackets += numExportPacketsPerLID[j];
2607 if (numPackets > maxNumPackets) maxNumPackets = numPackets;
2608 packetsPerSend[pp] = numPackets;
2609 curPKToffset += numPackets;
// numBlocks counts the self-message as well. imageIndex scans past the
// destinations whose rank is below myImageID; the sends below start at
// that position (p = i + imageIndex).
// NOTE(review): the loop body and the wrap-around adjustment of p are not
// visible in this excerpt.
2614 size_t numBlocks = numSends_+ selfMessage_;
2615 size_t imageIndex = 0;
2616 while ((imageIndex < numBlocks) && (imagesTo_[imageIndex] < myImageID)) {
2619 if (imageIndex == numBlocks) {
2624 size_t selfIndex = 0;
// "Fast" path: indicesTo_ is empty, so the data for destination p is the
// slice of exports at sendPacketOffsets[p] with packetsPerSend[p] entries.
2626 if (indicesTo_.empty()) {
2628 std::ostringstream os;
2629 os << myImageID <<
": doPosts(4,fast): posting sends" << endl;
2635 for (
size_t i = 0; i < numBlocks; ++i) {
2636 size_t p = i + imageIndex;
2637 if (p > (numBlocks - 1)) {
2641 if (imagesTo_[p] != myImageID && packetsPerSend[p] > 0) {
2642 exports_view_type tmpSend =
2643 subview_offset(exports, sendPacketOffsets[p], packetsPerSend[p]);
// Dispatch on the configured send type. Only the nonblocking send adds an
// entry to requests_.
2645 if (sendType == Details::DISTRIBUTOR_SEND) {
2647 as<int> (tmpSend.size ()),
2648 imagesTo_[p], tag, *comm_);
2650 else if (sendType == Details::DISTRIBUTOR_RSEND) {
2651 readySend<int> (tmpSend,
2652 as<int> (tmpSend.size ()),
2653 imagesTo_[p], tag, *comm_);
2655 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2656 exports_view_type tmpSendBuf =
2657 subview_offset (exports, sendPacketOffsets[p], packetsPerSend[p]);
2658 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2661 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2662 ssend<int> (tmpSend,
2663 as<int> (tmpSend.size ()),
2664 imagesTo_[p], tag, *comm_);
2667 TEUCHOS_TEST_FOR_EXCEPTION(
2668 true, std::logic_error,
"Tpetra::Distributor::doPosts(4 args): " 2669 "Invalid send type. We should never get here. " 2670 "Please report this bug to the Tpetra developers.");
// Self-message: copy this process's own slice of exports straight into
// imports at the offset recorded while posting receives.
2679 deep_copy_offset(imports, exports, selfReceiveOffset,
2680 sendPacketOffsets[selfNum], packetsPerSend[selfNum]);
2683 std::ostringstream os;
2684 os << myImageID <<
": doPosts(4,fast) done" << endl;
// "Slow" path: indicesTo_ is nonempty, so each destination's packets are
// gathered into a scratch view before being sent.
2690 std::ostringstream os;
2691 os << myImageID <<
": doPosts(4,slow): posting sends" << endl;
// Scratch send buffer with the same value type, layout, device and memory
// traits as ExpView, sized for the largest single send.
2696 typedef typename ExpView::non_const_value_type Packet;
2697 typedef typename ExpView::array_layout Layout;
2698 typedef typename ExpView::device_type Device;
2699 typedef typename ExpView::memory_traits Mem;
2700 Kokkos::View<Packet*,Layout,Device,Mem> sendArray (
"sendArray", maxNumPackets);
// Nonblocking sends are rejected on this path.
// NOTE(review): the message text says "doPosts(3 args)" although this is
// the four-argument overload -- likely a copy/paste slip.
2702 TEUCHOS_TEST_FOR_EXCEPTION(
2703 sendType == Details::DISTRIBUTOR_ISEND, std::logic_error,
2704 "Tpetra::Distributor::doPosts(3 args): The \"send buffer\" code path " 2705 "may not necessarily work with nonblocking sends.");
// indicesOffsets[j] is the running sum of the export packet counts before
// LID j, as accumulated in ioffset.
// NOTE(review): the declaration of ioffset is not visible in this excerpt,
// and the loop compares a signed int index against size().
2707 Array<size_t> indicesOffsets (numExportPacketsPerLID.size(), 0);
2709 for (
int j=0; j<numExportPacketsPerLID.size(); ++j) {
2710 indicesOffsets[j] = ioffset;
2711 ioffset += numExportPacketsPerLID[j];
2714 for (
size_t i = 0; i < numBlocks; ++i) {
2715 size_t p = i + imageIndex;
2716 if (p > (numBlocks - 1)) {
2720 if (imagesTo_[p] != myImageID) {
// Gather the packets of each LID bound for destination p into sendArray.
// NOTE(review): numPacketsTo_p is initialized to 0 and no visible statement
// in this loop adds to it (only sendArrayOffset is advanced), yet it guards
// and sizes the send below -- confirm that it is accumulated.
2721 size_t sendArrayOffset = 0;
2722 size_t j = startsTo_[p];
2723 size_t numPacketsTo_p = 0;
2724 for (
size_t k = 0; k < lengthsTo_[p]; ++k, ++j) {
2725 deep_copy_offset(sendArray, exports, sendArrayOffset,
2726 indicesOffsets[j], numExportPacketsPerLID[j]);
2727 sendArrayOffset += numExportPacketsPerLID[j];
2729 if (numPacketsTo_p > 0) {
2731 subview_offset(sendArray,
size_t(0), numPacketsTo_p);
2733 if (sendType == Details::DISTRIBUTOR_RSEND) {
2734 readySend<int> (tmpSend,
2735 as<int> (tmpSend.size ()),
2736 imagesTo_[p], tag, *comm_);
// NOTE(review): the check above throws when sendType is
// DISTRIBUTOR_ISEND, so this branch looks unreachable.
2738 else if (sendType == Details::DISTRIBUTOR_ISEND) {
2739 exports_view_type tmpSendBuf =
2740 subview_offset (sendArray,
size_t(0), numPacketsTo_p);
2741 requests_.push_back (isend<int> (tmpSendBuf, imagesTo_[p],
2744 else if (sendType == Details::DISTRIBUTOR_SSEND) {
2745 ssend<int> (tmpSend,
2746 as<int> (tmpSend.size ()),
2747 imagesTo_[p], tag, *comm_);
2751 as<int> (tmpSend.size ()),
2752 imagesTo_[p], tag, *comm_);
// For the self-message, remember where its LIDs start.
2758 selfIndex = startsTo_[p];
// Self-message: copy each LID's packets from exports into imports.
// NOTE(review): the statement that advances selfIndex is not visible in
// this excerpt.
2763 for (
size_t k = 0; k < lengthsTo_[selfNum]; ++k) {
2764 deep_copy_offset(imports, exports, selfReceiveOffset,
2765 indicesOffsets[selfIndex],
2766 numExportPacketsPerLID[selfIndex]);
2767 selfReceiveOffset += numExportPacketsPerLID[selfIndex];
2772 std::ostringstream os;
2773 os << myImageID <<
": doPosts(4,slow) done" << endl;
// Reverse communication with a fixed number of packets per LID, for
// Kokkos::View buffers. The enable_if return type restricts this overload
// to ExpView / ImpView types for which Kokkos::Impl::is_view is true.
2779 template <
class ExpView,
class ImpView>
2780 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2782 doReversePostsAndWaits (
const ExpView &exports,
2784 const ImpView &imports)
2792 typedef ExpView exports_view;
2793 typedef ImpView imports_view;
// Host-staging path: taken when enable_cuda_rdma_ is false and the exports
// view is not in host space. Host mirrors of both buffers are created and
// the communication is dispatched on them.
// NOTE(review): this path calls doPostsAndWaits (the forward plan), whereas
// the direct path below calls doReversePosts -- confirm that the forward
// call is intended here.
2794 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2795 typename exports_view::HostMirror host_exports =
2796 Kokkos::create_mirror_view(exports);
2797 typename imports_view::HostMirror host_imports =
2798 Kokkos::create_mirror_view(imports);
2800 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
// Direct path: post the reverse-plan messages on the original views.
2807 doReversePosts (exports, numPackets, imports);
// Reverse communication with a variable number of packets per LID, for
// Kokkos::View buffers. The enable_if return type restricts this overload
// to ExpView / ImpView types for which Kokkos::Impl::is_view is true.
//
// On the direct path, a std::runtime_error is raised if requests_ is not
// empty on entry, i.e. if earlier nonblocking posts are still outstanding.
2811 template <
class ExpView,
class ImpView>
2812 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2814 doReversePostsAndWaits (
const ExpView &exports,
2815 const ArrayView<size_t> &numExportPacketsPerLID,
2816 const ImpView &imports,
2817 const ArrayView<size_t> &numImportPacketsPerLID)
2825 typedef ExpView exports_view;
2826 typedef ImpView imports_view;
// Host-staging path: taken when enable_cuda_rdma_ is false and the exports
// view is not in host space. Host mirrors of both buffers are created and
// the communication is dispatched on them.
// NOTE(review): this path calls doPostsAndWaits (the forward plan), whereas
// the direct path below calls doReversePosts -- confirm that the forward
// call is intended here.
2827 if (!enable_cuda_rdma_ && !exports_view::is_hostspace) {
2828 typename exports_view::HostMirror host_exports =
2829 Kokkos::create_mirror_view(exports);
2830 typename imports_view::HostMirror host_imports =
2831 Kokkos::create_mirror_view(imports);
2833 doPostsAndWaits(Kokkos::Compat::create_const_view(host_exports),
2834 numExportPacketsPerLID,
2836 numImportPacketsPerLID);
// Direct path: refuse to start while earlier requests are still pending.
2841 TEUCHOS_TEST_FOR_EXCEPTION(requests_.size() != 0, std::runtime_error,
2842 "Tpetra::Distributor::doReversePostsAndWaits(4 args): There are " 2843 << requests_.size() <<
" outstanding nonblocking messages pending. It " 2844 "is incorrect to call this method with posts outstanding.");
// Post the reverse-plan messages through the four-argument doReversePosts.
2846 doReversePosts (exports, numExportPacketsPerLID, imports,
2847 numImportPacketsPerLID);
2851 template <
class ExpView,
class ImpView>
2852 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2854 doReversePosts (
const ExpView &exports,
2856 const ImpView &imports)
2859 TEUCHOS_TEST_FOR_EXCEPTION(
2860 ! indicesTo_.empty (), std::runtime_error,
2861 "Tpetra::Distributor::doReversePosts(3 args): Can only do " 2862 "reverse communication when original data are blocked by process.");
2863 if (reverseDistributor_.is_null ()) {
2864 createReverseDistributor ();
2866 reverseDistributor_->doPosts (exports, numPackets, imports);
2869 template <
class ExpView,
class ImpView>
2870 typename std::enable_if<(Kokkos::Impl::is_view<ExpView>::value && Kokkos::Impl::is_view<ImpView>::value)>::type
2872 doReversePosts (
const ExpView &exports,
2873 const ArrayView<size_t> &numExportPacketsPerLID,
2874 const ImpView &imports,
2875 const ArrayView<size_t> &numImportPacketsPerLID)
2878 TEUCHOS_TEST_FOR_EXCEPTION(
2879 ! indicesTo_.empty (), std::runtime_error,
2880 "Tpetra::Distributor::doReversePosts(3 args): Can only do " 2881 "reverse communication when original data are blocked by process.");
2882 if (reverseDistributor_.is_null ()) {
2883 createReverseDistributor ();
2885 reverseDistributor_->doPosts (exports, numExportPacketsPerLID,
2886 imports, numImportPacketsPerLID);
// Given the IDs this process wants to receive (importIDs) and the ranks
// that own them (importNodeIDs), fill exportIDs / exportNodeIDs from the
// (ID, rank) pairs that arrive through a temporary plan (tempPlan) set up
// with createFromSends(importNodeIDs).
//
// \param importIDs     [in]  IDs this process will receive.
// \param importNodeIDs [in]  Rank associated with each entry of importIDs;
//                            must have the same size as importIDs.
// \param exportIDs     [out] Resized to the number of exports and filled
//                            with the first entry of each received pair.
// \param exportNodeIDs [out] Resized likewise and filled with the second
//                            entry of each received pair.
//
// Throws std::invalid_argument if the two input arrays differ in size, and
// std::logic_error on the internal consistency failures checked below.
2889 template <
class OrdinalType>
2891 computeSends (
const ArrayView<const OrdinalType> & importIDs,
2892 const ArrayView<const int> & importNodeIDs,
2893 Array<OrdinalType> & exportIDs,
2894 Array<int> & exportNodeIDs)
2904 typedef typename ArrayView<const OrdinalType>::size_type size_type;
2906 Teuchos::OSTab tab (out_);
2907 const int myRank = comm_->getRank ();
2909 std::ostringstream os;
2910 os << myRank <<
": computeSends" << endl;
// The two input arrays are parallel, so they must have equal length.
2914 TEUCHOS_TEST_FOR_EXCEPTION(
2915 importIDs.size () != importNodeIDs.size (), std::invalid_argument,
2916 "Tpetra::Distributor::computeSends: On Process " << myRank <<
": " 2917 "importNodeIDs.size() = " << importNodeIDs.size ()
2918 <<
" != importIDs.size() = " << importIDs.size () <<
".");
// Pack one (ID, my rank) pair per import into a flat size_t array:
// entry 2*i holds the ID and entry 2*i+1 holds this process's rank.
2920 const size_type numImports = importNodeIDs.size ();
2921 Array<size_t> importObjs (2*numImports);
2923 for (size_type i = 0; i < numImports; ++i) {
2924 importObjs[2*i] =
static_cast<size_t> (importIDs[i]);
2925 importObjs[2*i+1] =
static_cast<size_t> (myRank);
2933 std::ostringstream os;
2934 os << myRank <<
": computeSends: tempPlan.createFromSends" << endl;
// Set up the temporary plan from the owning ranks; its return value is used
// as the number of exports. The check that follows catches a value that
// turns negative when converted to size_type.
2940 const size_t numExportsAsSizeT = tempPlan.
createFromSends (importNodeIDs);
2941 const size_type numExports =
static_cast<size_type
> (numExportsAsSizeT);
2942 TEUCHOS_TEST_FOR_EXCEPTION(
2943 numExports < 0, std::logic_error,
"Tpetra::Distributor::computeSends: " 2944 "tempPlan.createFromSends() returned numExports = " << numExportsAsSizeT
2945 <<
" as a size_t, which overflows to " << numExports <<
" when cast to " 2946 << Teuchos::TypeNameTraits<size_type>::name () <<
". " 2947 "Please report this bug to the Tpetra developers.");
2948 TEUCHOS_TEST_FOR_EXCEPTION(
2950 std::logic_error,
"Tpetra::Distributor::computeSends: tempPlan.getTotal" 2952 "Exports = " << numExports <<
". Please report this bug to the " 2953 "Tpetra developers.");
// Size the outputs only when there is something to export.
2955 if (numExports > 0) {
2956 exportIDs.resize (numExports);
2957 exportNodeIDs.resize (numExports);
// IDs travel as size_t, so OrdinalType must not be wider than size_t
// (see message text).
2968 TEUCHOS_TEST_FOR_EXCEPTION(
2969 sizeof (
size_t) <
sizeof (OrdinalType), std::logic_error,
2970 "Tpetra::Distributor::computeSends: sizeof(size_t) = " <<
sizeof(
size_t)
2971 <<
" < sizeof(" << Teuchos::TypeNameTraits<OrdinalType>::name () <<
") = " 2972 <<
sizeof (OrdinalType) <<
". This violates an assumption of the " 2973 "method. It's not hard to work around (just use Array<OrdinalType> as " 2974 "the export buffer, not Array<size_t>), but we haven't done that yet. " 2975 "Please report this bug to the Tpetra developers.");
2977 TEUCHOS_TEST_FOR_EXCEPTION(
2980 "Tpetra::Distributor::computeSends: tempPlan.getTotalReceiveLength() = " 2982 <<
". Please report this bug to the Tpetra developers.");
2986 std::ostringstream os;
2987 os << myRank <<
": computeSends: tempPlan.doPostsAndWaits" << endl;
// Unpack the received pairs: entry 2*i is the ID and entry 2*i+1 the rank.
// NOTE(review): the declaration and filling of exportObjs are not visible
// in this excerpt.
2993 for (size_type i = 0; i < numExports; ++i) {
2994 exportIDs[i] =
static_cast<OrdinalType
> (exportObjs[2*i]);
2995 exportNodeIDs[i] =
static_cast<int> (exportObjs[2*i+1]);
2999 std::ostringstream os;
3000 os << myRank <<
": computeSends done" << endl;
// Set up the Distributor from the receive side: given the remote IDs this
// process needs and the ranks that own them, compute what this process must
// export (computeSends), build the plan from those exports
// (createFromSends), and record that the plan was initialized by
// createFromRecvs.
//
// \param remoteImageIDs [in]  Rank associated with each remote ID; must
//                             have the same size as remoteIDs.
// \param exportGIDs     [out] Filled by computeSends.
// \param exportNodeIDs  [out] Filled by computeSends.
//
// The size check is done in one of two ways. With HAVE_TPETRA_DEBUG, the
// result is combined across processes with reduceAll and a
// std::runtime_error reports the maximum offending rank. Otherwise, the
// check is local and throws std::invalid_argument.
3005 template <
class OrdinalType>
3008 const ArrayView<const int> &remoteImageIDs,
3009 Array<OrdinalType> &exportGIDs,
3010 Array<int> &exportNodeIDs)
3014 Teuchos::OSTab tab (out_);
3015 const int myRank = comm_->getRank();
3018 *out_ << myRank <<
": createFromRecvs" << endl;
// Debug build: errProc is this rank if the sizes disagree, else -1; the
// REDUCE_MAX reduction yields -1 only if no process saw a mismatch.
3021 #ifdef HAVE_TPETRA_DEBUG 3022 using Teuchos::outArg;
3023 using Teuchos::reduceAll;
3028 (remoteIDs.size () != remoteImageIDs.size ()) ? myRank : -1;
3029 int maxErrProc = -1;
3030 reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX, errProc, outArg (maxErrProc));
3031 TEUCHOS_TEST_FOR_EXCEPTION(maxErrProc != -1, std::runtime_error,
3032 Teuchos::typeName (*
this) <<
"::createFromRecvs(): lists of remote IDs " 3033 "and remote process IDs must have the same size on all participating " 3034 "processes. Maximum process ID with error: " << maxErrProc <<
".");
// Non-debug build: purely local size check, no communication.
3035 #else // NOT HAVE_TPETRA_DEBUG 3038 TEUCHOS_TEST_FOR_EXCEPTION(
3039 remoteIDs.size () != remoteImageIDs.size (), std::invalid_argument,
3040 Teuchos::typeName (*
this) <<
"::createFromRecvs<" <<
3041 Teuchos::TypeNameTraits<OrdinalType>::name () <<
">(): On Process " <<
3042 myRank <<
": remoteIDs.size() = " << remoteIDs.size () <<
" != " 3043 "remoteImageIDs.size() = " << remoteImageIDs.size () <<
".");
3044 #endif // HAVE_TPETRA_DEBUG 3046 computeSends (remoteIDs, remoteImageIDs, exportGIDs, exportNodeIDs);
// Build the plan from the export ranks just computed.
3048 const size_t numProcsSendingToMe = createFromSends (exportNodeIDs ());
// Diagnostic summary written to std::cerr.
// NOTE(review): the condition guarding this output is not visible in this
// excerpt.
3055 std::ostringstream os;
3056 os <<
"Proc " << myRank <<
": {numProcsSendingToMe: " 3057 << numProcsSendingToMe <<
", remoteImageIDs.size(): " 3058 << remoteImageIDs.size () <<
", selfMessage_: " 3059 << (selfMessage_ ?
"true" :
"false") <<
"}" << std::endl;
3060 std::cerr << os.str ();
3064 *out_ << myRank <<
": createFromRecvs done" << endl;
// Record how this Distributor was set up.
3067 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS;
3072 #endif // TPETRA_DISTRIBUTOR_HPP Namespace Tpetra contains the class and methods constituting the Tpetra library.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
void doPosts(const ArrayRCP< const Packet > &exports, size_t numPackets, const ArrayRCP< Packet > &imports)
Post the data for a forward plan, but do not execute the waits yet.
void deep_copy(const LittleBlock< ST2, LO > &dst, const LittleBlock< ST1, LO > &src, typename std::enable_if< std::is_convertible< ST1, ST2 >::value &&!std::is_const< ST2 >::value, int >::type *=NULL)
Copy the LittleBlock src into the LittleBlock dst.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
Implementation details of Tpetra.
Details::EDistributorHowInitialized howInitialized() const
Return an enum indicating whether and how a Distributor was initialized.
void doPostsAndWaits(const ArrayView< const Packet > &exports, size_t numPackets, const ArrayView< Packet > &imports)
Execute the (forward) communication plan.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
size_t createFromSends(const ArrayView< const int > &exportNodeIDs)
Set up Distributor using list of process ranks to which this process will send.
void doReversePosts(const ArrayRCP< const Packet > &exports, size_t numPackets, const ArrayRCP< Packet > &imports)
Post the data for a reverse plan, but do not execute the waits yet.
void doReversePostsAndWaits(const ArrayView< const Packet > &exports, size_t numPackets, const ArrayView< Packet > &imports)
Execute the reverse communication plan.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
Stand-alone utility functions and macros.
void getLastDoStatistics(size_t &bytes_sent, size_t &bytes_recvd) const
Information on the last call to do/doReverse.
Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void createFromRecvs(const ArrayView< const Ordinal > &remoteIDs, const ArrayView< const int > &remoteNodeIDs, Array< Ordinal > &exportIDs, Array< int > &exportNodeIDs)
Set up Distributor using list of process ranks from which to receive.
EDistributorSendType
The type of MPI send that Distributor should use.