dune-common  2.3.1
communicator.hh
Go to the documentation of this file.
1 // -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 // vi: set et ts=4 sw=2 sts=2:
3 // $Id$
4 #ifndef DUNE_COMMUNICATOR
5 #define DUNE_COMMUNICATOR
6 
7 #include "remoteindices.hh"
8 #include "interface.hh"
12 
13 #if HAVE_MPI
14 // MPI header
15 #include <mpi.h>
16 
17 namespace Dune
18 {
/**
 * @brief Flag for marking indexed data structures where data at
 * each index is of the same size.
 */
struct SizeOne {};
104 
/**
 * @brief Flag for marking indexed data structures where the data at
 * each index may consist of a varying number of entries.
 *
 * Used as the IndexedTypeFlag selecting the VariableSize
 * specialisations of the buffered communicator helpers.
 */
struct VariableSize
{};
112 
113 
119  template<class V>
120  struct CommPolicy
121  {
133  typedef V Type;
134 
140  typedef typename V::value_type IndexedType;
141 
147 
156  static const void* getAddress(const V& v, int index);
157 
163  static int getSize(const V&, int index);
164  };
165 
166  template<class K, int n> class FieldVector;
167 
168  template<class B, class A> class VariableBlockVector;
169 
170  template<class K, class A, int n>
172  {
174 
175  typedef typename Type::B IndexedType;
176 
178 
179  static const void* getAddress(const Type& v, int i);
180 
181  static int getSize(const Type& v, int i);
182  };
183 
188  {};
189 
193  template<class T>
195  {
197 
198  static const IndexedType& gather(const T& vec, std::size_t i);
199 
200  static void scatter(T& vec, const IndexedType& v, std::size_t i);
201 
202  };
203 
215  template<typename T>
216  class DatatypeCommunicator : public InterfaceBuilder
217  {
218  public:
219 
223  typedef T ParallelIndexSet;
224 
229 
233  typedef typename RemoteIndices::GlobalIndex GlobalIndex;
234 
238  typedef typename RemoteIndices::Attribute Attribute;
239 
243  typedef typename RemoteIndices::LocalIndex LocalIndex;
244 
248  DatatypeCommunicator();
249 
253  ~DatatypeCommunicator();
254 
281  template<class T1, class T2, class V>
282  void build(const RemoteIndices& remoteIndices, const T1& sourceFlags, V& sendData, const T2& destFlags, V& receiveData);
283 
287  void forward();
288 
292  void backward();
293 
297  void free();
298  private:
299  enum {
303  commTag_ = 234
304  };
305 
309  const RemoteIndices* remoteIndices_;
310 
311  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >
312  MessageTypeMap;
313 
317  MessageTypeMap messageTypes;
318 
322  void* data_;
323 
324  MPI_Request* requests_[2];
325 
329  bool created_;
330 
334  template<class V, bool FORWARD>
335  void createRequests(V& sendData, V& receiveData);
336 
340  template<class T1, class T2, class V, bool send>
341  void createDataTypes(const T1& source, const T2& destination, V& data);
342 
346  void sendRecv(MPI_Request* req);
347 
351  struct IndexedTypeInformation
352  {
358  void build(int i)
359  {
360  length = new int[i];
361  displ = new MPI_Aint[i];
362  size = i;
363  }
364 
368  void free()
369  {
370  delete[] length;
371  delete[] displ;
372  }
374  int* length;
376  MPI_Aint* displ;
382  int elements;
386  int size;
387  };
388 
394  template<class V>
395  struct MPIDatatypeInformation
396  {
401  MPIDatatypeInformation(const V& data) : data_(data)
402  {}
403 
409  void reserve(int proc, int size)
410  {
411  information_[proc].build(size);
412  }
419  void add(int proc, int local)
420  {
421  IndexedTypeInformation& info=information_[proc];
422  assert(info.elements<info.size);
423  MPI_Address( const_cast<void*>(CommPolicy<V>::getAddress(data_, local)),
424  info.displ+info.elements);
425  info.length[info.elements]=CommPolicy<V>::getSize(data_, local);
426  info.elements++;
427  }
428 
433  std::map<int,IndexedTypeInformation> information_;
437  const V& data_;
438 
439  };
440 
441  };
442 
453  {
454 
455  public:
460 
467  template<class Data, class Interface>
469  build(const Interface& interface);
470 
478  template<class Data, class Interface>
479  void build(const Data& source, const Data& target, const Interface& interface);
480 
509  template<class GatherScatter, class Data>
510  void forward(const Data& source, Data& dest);
511 
540  template<class GatherScatter, class Data>
541  void backward(Data& source, const Data& dest);
542 
568  template<class GatherScatter, class Data>
569  void forward(Data& data);
570 
596  template<class GatherScatter, class Data>
597  void backward(Data& data);
598 
602  void free();
603 
608 
609  private:
610 
614  typedef std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
615  InterfaceMap;
616 
617 
621  template<class Data, typename IndexedTypeFlag>
622  struct MessageSizeCalculator
623  {};
624 
629  template<class Data>
630  struct MessageSizeCalculator<Data,SizeOne>
631  {
638  inline int operator()(const InterfaceInformation& info) const;
647  inline int operator()(const Data& data, const InterfaceInformation& info) const;
648  };
649 
654  template<class Data>
655  struct MessageSizeCalculator<Data,VariableSize>
656  {
665  inline int operator()(const Data& data, const InterfaceInformation& info) const;
666  };
667 
671  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
672  struct MessageGatherer
673  {};
674 
679  template<class Data, class GatherScatter, bool send>
680  struct MessageGatherer<Data,GatherScatter,send,SizeOne>
681  {
683  typedef typename CommPolicy<Data>::IndexedType Type;
684 
689  typedef GatherScatter Gatherer;
690 
691  enum {
697  forward=send
698  };
699 
707  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
708  };
709 
714  template<class Data, class GatherScatter, bool send>
715  struct MessageGatherer<Data,GatherScatter,send,VariableSize>
716  {
718  typedef typename CommPolicy<Data>::IndexedType Type;
719 
724  typedef GatherScatter Gatherer;
725 
726  enum {
732  forward=send
733  };
734 
742  inline void operator()(const InterfaceMap& interface, const Data& data, Type* buffer, size_t bufferSize) const;
743  };
744 
748  template<class Data, class GatherScatter, bool send, typename IndexedTypeFlag>
749  struct MessageScatterer
750  {};
751 
756  template<class Data, class GatherScatter, bool send>
757  struct MessageScatterer<Data,GatherScatter,send,SizeOne>
758  {
760  typedef typename CommPolicy<Data>::IndexedType Type;
761 
766  typedef GatherScatter Scatterer;
767 
768  enum {
774  forward=send
775  };
776 
784  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
785  };
790  template<class Data, class GatherScatter, bool send>
791  struct MessageScatterer<Data,GatherScatter,send,VariableSize>
792  {
794  typedef typename CommPolicy<Data>::IndexedType Type;
795 
800  typedef GatherScatter Scatterer;
801 
802  enum {
808  forward=send
809  };
810 
818  inline void operator()(const InterfaceMap& interface, Data& data, Type* buffer, const int& proc) const;
819  };
820 
824  struct MessageInformation
825  {
827  MessageInformation()
828  : start_(0), size_(0)
829  {}
830 
838  MessageInformation(size_t start, size_t size)
839  : start_(start), size_(size)
840  {}
844  size_t start_;
848  size_t size_;
849  };
850 
857  typedef std::map<int,std::pair<MessageInformation,MessageInformation> >
858  InformationMap;
862  InformationMap messageInformation_;
866  char* buffers_[2];
870  size_t bufferSize_[2];
871 
872  enum {
876  commTag_
877  };
878 
882  std::map<int,std::pair<InterfaceInformation,InterfaceInformation> > interfaces_;
883 
884  MPI_Comm communicator_;
885 
889  template<class GatherScatter, bool FORWARD, class Data>
890  void sendRecv(const Data& source, Data& target);
891 
892  };
893 
894 #ifndef DOXYGEN
895 
896  template<class V>
897  inline const void* CommPolicy<V>::getAddress(const V& v, int index)
898  {
899  return &(v[index]);
900  }
901 
902  template<class V>
903  inline int CommPolicy<V>::getSize(const V& v, int index)
904  {
906  DUNE_UNUSED_PARAMETER(index);
907  return 1;
908  }
909 
910  template<class K, class A, int n>
911  inline const void* CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getAddress(const Type& v, int index)
912  {
913  return &(v[index][0]);
914  }
915 
916  template<class K, class A, int n>
917  inline int CommPolicy<VariableBlockVector<FieldVector<K, n>, A> >::getSize(const Type& v, int index)
918  {
919  return v[index].getsize();
920  }
921 
922  template<class T>
923  inline const typename CopyGatherScatter<T>::IndexedType& CopyGatherScatter<T>::gather(const T & vec, std::size_t i)
924  {
925  return vec[i];
926  }
927 
928  template<class T>
929  inline void CopyGatherScatter<T>::scatter(T& vec, const IndexedType& v, std::size_t i)
930  {
931  vec[i]=v;
932  }
933 
934  template<typename T>
935  DatatypeCommunicator<T>::DatatypeCommunicator()
936  : remoteIndices_(0), created_(false)
937  {
938  requests_[0]=0;
939  requests_[1]=0;
940  }
941 
942 
943 
944  template<typename T>
945  DatatypeCommunicator<T>::~DatatypeCommunicator()
946  {
947  free();
948  }
949 
950  template<typename T>
951  template<class T1, class T2, class V>
952  inline void DatatypeCommunicator<T>::build(const RemoteIndices& remoteIndices,
953  const T1& source, V& sendData,
954  const T2& destination, V& receiveData)
955  {
956  remoteIndices_ = &remoteIndices;
957  free();
958  createDataTypes<T1,T2,V,false>(source,destination, receiveData);
959  createDataTypes<T1,T2,V,true>(source,destination, sendData);
960  createRequests<V,true>(sendData, receiveData);
961  createRequests<V,false>(receiveData, sendData);
962  created_=true;
963  }
964 
965  template<typename T>
966  void DatatypeCommunicator<T>::free()
967  {
968  if(created_) {
969  delete[] requests_[0];
970  delete[] requests_[1];
971  typedef MessageTypeMap::iterator iterator;
972  typedef MessageTypeMap::const_iterator const_iterator;
973 
974  const const_iterator end=messageTypes.end();
975 
976  for(iterator process = messageTypes.begin(); process != end; ++process) {
977  MPI_Datatype *type = &(process->second.first);
978  int finalized=0;
979 #if MPI_2
980  MPI_Finalized(&finalized);
981 #endif
982  if(*type!=MPI_DATATYPE_NULL && !finalized)
983  MPI_Type_free(type);
984  type = &(process->second.second);
985  if(*type!=MPI_DATATYPE_NULL && !finalized)
986  MPI_Type_free(type);
987  }
988  messageTypes.clear();
989  created_=false;
990  }
991 
992  }
993 
994  template<typename T>
995  template<class T1, class T2, class V, bool send>
996  void DatatypeCommunicator<T>::createDataTypes(const T1& sourceFlags, const T2& destFlags, V& data)
997  {
998 
999  MPIDatatypeInformation<V> dataInfo(data);
1000  this->template buildInterface<RemoteIndices,T1,T2,MPIDatatypeInformation<V>,send>(*remoteIndices_,sourceFlags, destFlags, dataInfo);
1001 
1002  typedef typename RemoteIndices::RemoteIndexMap::const_iterator const_iterator;
1003  const const_iterator end=this->remoteIndices_->end();
1004 
1005  // Allocate MPI_Datatypes and deallocate memory for the type construction.
1006  for(const_iterator process=this->remoteIndices_->begin(); process != end; ++process) {
1007  IndexedTypeInformation& info=dataInfo.information_[process->first];
1008  // Shift the displacement
1009  MPI_Aint base;
1010  MPI_Address(const_cast<void *>(CommPolicy<V>::getAddress(data, 0)), &base);
1011 
1012  for(int i=0; i< info.elements; i++) {
1013  info.displ[i]-=base;
1014  }
1015 
1016  // Create data type
1017  MPI_Datatype* type = &( send ? messageTypes[process->first].first : messageTypes[process->first].second);
1018  MPI_Type_hindexed(info.elements, info.length, info.displ,
1019  MPITraits<typename CommPolicy<V>::IndexedType>::getType(),
1020  type);
1021  MPI_Type_commit(type);
1022  // Deallocate memory
1023  info.free();
1024  }
1025  }
1026 
1027  template<typename T>
1028  template<class V, bool createForward>
1029  void DatatypeCommunicator<T>::createRequests(V& sendData, V& receiveData)
1030  {
1031  typedef std::map<int,std::pair<MPI_Datatype,MPI_Datatype> >::const_iterator MapIterator;
1032  int rank;
1033  static int index = createForward ? 1 : 0;
1034  int noMessages = messageTypes.size();
1035  // allocate request handles
1036  requests_[index] = new MPI_Request[2*noMessages];
1037  const MapIterator end = messageTypes.end();
1038  int request=0;
1039  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1040 
1041  // Set up the requests for receiving first
1042  for(MapIterator process = messageTypes.begin(); process != end;
1043  ++process, ++request) {
1044  MPI_Datatype type = createForward ? process->second.second : process->second.first;
1045  void* address = const_cast<void*>(CommPolicy<V>::getAddress(receiveData,0));
1046  MPI_Recv_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1047  }
1048 
1049  // And now the send requests
1050 
1051  for(MapIterator process = messageTypes.begin(); process != end;
1052  ++process, ++request) {
1053  MPI_Datatype type = createForward ? process->second.first : process->second.second;
1054  void* address = const_cast<void*>(CommPolicy<V>::getAddress(sendData, 0));
1055  MPI_Ssend_init(address, 1, type, process->first, commTag_, this->remoteIndices_->communicator(), requests_[index]+request);
1056  }
1057  }
1058 
1059  template<typename T>
1060  void DatatypeCommunicator<T>::forward()
1061  {
1062  sendRecv(requests_[1]);
1063  }
1064 
1065  template<typename T>
1066  void DatatypeCommunicator<T>::backward()
1067  {
1068  sendRecv(requests_[0]);
1069  }
1070 
1071  template<typename T>
1072  void DatatypeCommunicator<T>::sendRecv(MPI_Request* requests)
1073  {
1074  int noMessages = messageTypes.size();
1075  // Start the receive calls first
1076  MPI_Startall(noMessages, requests);
1077  // Now the send calls
1078  MPI_Startall(noMessages, requests+noMessages);
1079 
1080  // Wait for completion of the communication send first then receive
1081  MPI_Status* status=new MPI_Status[2*noMessages];
1082  for(int i=0; i<2*noMessages; i++)
1083  status[i].MPI_ERROR=MPI_SUCCESS;
1084 
1085  int send = MPI_Waitall(noMessages, requests+noMessages, status+noMessages);
1086  int receive = MPI_Waitall(noMessages, requests, status);
1087 
1088  // Error checks
1089  int success=1, globalSuccess=0;
1090  if(send==MPI_ERR_IN_STATUS) {
1091  int rank;
1092  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1093  std::cerr<<rank<<": Error in sending :"<<std::endl;
1094  // Search for the error
1095  for(int i=noMessages; i< 2*noMessages; i++)
1096  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1097  char message[300];
1098  int messageLength;
1099  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1100  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1101  for(int i=0; i< messageLength; i++)
1102  std::cout<<message[i];
1103  }
1104  std::cerr<<std::endl;
1105  success=0;
1106  }
1107 
1108  if(receive==MPI_ERR_IN_STATUS) {
1109  int rank;
1110  MPI_Comm_rank(this->remoteIndices_->communicator(), &rank);
1111  std::cerr<<rank<<": Error in receiving!"<<std::endl;
1112  // Search for the error
1113  for(int i=0; i< noMessages; i++)
1114  if(status[i].MPI_ERROR!=MPI_SUCCESS) {
1115  char message[300];
1116  int messageLength;
1117  MPI_Error_string(status[i].MPI_ERROR, message, &messageLength);
1118  std::cerr<<" source="<<status[i].MPI_SOURCE<<" message: ";
1119  for(int i=0; i< messageLength; i++)
1120  std::cerr<<message[i];
1121  }
1122  std::cerr<<std::endl;
1123  success=0;
1124  }
1125 
1126  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, this->remoteIndices_->communicator());
1127 
1128  delete[] status;
1129 
1130  if(!globalSuccess)
1131  DUNE_THROW(CommunicationError, "A communication error occurred!");
1132 
1133  }
1134 
1136  {
1137  buffers_[0]=0;
1138  buffers_[1]=0;
1139  bufferSize_[0]=0;
1140  bufferSize_[1]=0;
1141  }
1142 
1143  template<class Data, class Interface>
1144  typename enable_if<is_same<SizeOne, typename CommPolicy<Data>::IndexedTypeFlag>::value, void>::type
1145  BufferedCommunicator::build(const Interface& interface)
1146  {
1147  interfaces_=interface.interfaces();
1148  communicator_=interface.communicator();
1149  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1150  ::const_iterator const_iterator;
1151  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1152  const const_iterator end = interfaces_.end();
1153  int lrank;
1154  MPI_Comm_rank(communicator_, &lrank);
1155 
1156  bufferSize_[0]=0;
1157  bufferSize_[1]=0;
1158 
1159  for(const_iterator interfacePair = interfaces_.begin();
1160  interfacePair != end; ++interfacePair) {
1161  int noSend = MessageSizeCalculator<Data,Flag>() (interfacePair->second.first);
1162  int noRecv = MessageSizeCalculator<Data,Flag>() (interfacePair->second.second);
1163  if (noSend + noRecv > 0)
1164  messageInformation_.insert(std::make_pair(interfacePair->first,
1165  std::make_pair(MessageInformation(bufferSize_[0],
1166  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1167  MessageInformation(bufferSize_[1],
1168  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1169  bufferSize_[0] += noSend;
1170  bufferSize_[1] += noRecv;
1171  }
1172 
1173  // allocate the buffers
1174  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1175  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1176 
1177  buffers_[0] = new char[bufferSize_[0]];
1178  buffers_[1] = new char[bufferSize_[1]];
1179  }
1180 
1181  template<class Data, class Interface>
1182  void BufferedCommunicator::build(const Data& source, const Data& dest, const Interface& interface)
1183  {
1184 
1185  interfaces_=interface.interfaces();
1186  communicator_=interface.communicator();
1187  typedef typename std::map<int,std::pair<InterfaceInformation,InterfaceInformation> >
1188  ::const_iterator const_iterator;
1189  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1190  const const_iterator end = interfaces_.end();
1191 
1192  bufferSize_[0]=0;
1193  bufferSize_[1]=0;
1194 
1195  for(const_iterator interfacePair = interfaces_.begin();
1196  interfacePair != end; ++interfacePair) {
1197  int noSend = MessageSizeCalculator<Data,Flag>() (source, interfacePair->second.first);
1198  int noRecv = MessageSizeCalculator<Data,Flag>() (dest, interfacePair->second.second);
1199  if (noSend + noRecv > 0)
1200  messageInformation_.insert(std::make_pair(interfacePair->first,
1201  std::make_pair(MessageInformation(bufferSize_[0],
1202  noSend*sizeof(typename CommPolicy<Data>::IndexedType)),
1203  MessageInformation(bufferSize_[1],
1204  noRecv*sizeof(typename CommPolicy<Data>::IndexedType)))));
1205  bufferSize_[0] += noSend;
1206  bufferSize_[1] += noRecv;
1207  }
1208 
1209  bufferSize_[0] *= sizeof(typename CommPolicy<Data>::IndexedType);
1210  bufferSize_[1] *= sizeof(typename CommPolicy<Data>::IndexedType);
1211  // allocate the buffers
1212  buffers_[0] = new char[bufferSize_[0]];
1213  buffers_[1] = new char[bufferSize_[1]];
1214  }
1215 
1216  inline void BufferedCommunicator::free()
1217  {
1218  messageInformation_.clear();
1219  if(buffers_[0])
1220  delete[] buffers_[0];
1221 
1222  if(buffers_[1])
1223  delete[] buffers_[1];
1224  buffers_[0]=buffers_[1]=0;
1225  }
1226 
1228  {
1229  free();
1230  }
1231 
1232  template<class Data>
1233  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1234  (const InterfaceInformation& info) const
1235  {
1236  return info.size();
1237  }
1238 
1239 
1240  template<class Data>
1241  inline int BufferedCommunicator::MessageSizeCalculator<Data,SizeOne>::operator()
1242  (const Data&, const InterfaceInformation& info) const
1243  {
1244  return operator()(info);
1245  }
1246 
1247 
1248  template<class Data>
1249  inline int BufferedCommunicator::MessageSizeCalculator<Data, VariableSize>::operator()
1250  (const Data& data, const InterfaceInformation& info) const
1251  {
1252  int entries=0;
1253 
1254  for(size_t i=0; i < info.size(); i++)
1255  entries += CommPolicy<Data>::getSize(data,info[i]);
1256 
1257  return entries;
1258  }
1259 
1260 
1261  template<class Data, class GatherScatter, bool FORWARD>
1262  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces,const Data& data, Type* buffer, size_t bufferSize) const
1263  {
1264  typedef typename InterfaceMap::const_iterator
1265  const_iterator;
1266 
1267  int rank;
1268  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1269  const const_iterator end = interfaces.end();
1270  size_t index=0;
1271 
1272  for(const_iterator interfacePair = interfaces.begin();
1273  interfacePair != end; ++interfacePair) {
1274  int size = forward ? interfacePair->second.first.size() :
1275  interfacePair->second.second.size();
1276 
1277  for(int i=0; i < size; i++) {
1278  int local = forward ? interfacePair->second.first[i] :
1279  interfacePair->second.second[i];
1280  for(std::size_t j=0; j < CommPolicy<Data>::getSize(data, local); j++, index++) {
1281 
1282 #ifdef DUNE_ISTL_WITH_CHECKING
1283  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1284 #endif
1285  buffer[index]=GatherScatter::gather(data, local, j);
1286  }
1287 
1288  }
1289  }
1290 
1291  }
1292 
1293 
1294  template<class Data, class GatherScatter, bool FORWARD>
1295  inline void BufferedCommunicator::MessageGatherer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, const Data& data, Type* buffer, size_t bufferSize) const
1296  {
1297  DUNE_UNUSED_PARAMETER(bufferSize);
1298  typedef typename InterfaceMap::const_iterator
1299  const_iterator;
1300  const const_iterator end = interfaces.end();
1301  size_t index = 0;
1302 
1303  int rank;
1304  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
1305 
1306  for(const_iterator interfacePair = interfaces.begin();
1307  interfacePair != end; ++interfacePair) {
1308  size_t size = FORWARD ? interfacePair->second.first.size() :
1309  interfacePair->second.second.size();
1310 
1311  for(size_t i=0; i < size; i++) {
1312 
1313 #ifdef DUNE_ISTL_WITH_CHECKING
1314  assert(bufferSize>=(index+1)*sizeof(typename CommPolicy<Data>::IndexedType));
1315 #endif
1316 
1317  buffer[index++] = GatherScatter::gather(data, FORWARD ? interfacePair->second.first[i] :
1318  interfacePair->second.second[i]);
1319  }
1320  }
1321 
1322  }
1323 
1324 
1325  template<class Data, class GatherScatter, bool FORWARD>
1326  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,VariableSize>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1327  {
1328  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1329  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1330 
1331  assert(infoPair!=interfaces.end());
1332 
1333  const Information& info = FORWARD ? infoPair->second.second :
1334  infoPair->second.first;
1335 
1336  for(size_t i=0, index=0; i < info.size(); i++) {
1337  for(size_t j=0; j < CommPolicy<Data>::getSize(data, info[i]); j++)
1338  GatherScatter::scatter(data, buffer[index++], info[i], j);
1339  }
1340  }
1341 
1342 
1343  template<class Data, class GatherScatter, bool FORWARD>
1344  inline void BufferedCommunicator::MessageScatterer<Data,GatherScatter,FORWARD,SizeOne>::operator()(const InterfaceMap& interfaces, Data& data, Type* buffer, const int& proc) const
1345  {
1346  typedef typename InterfaceMap::value_type::second_type::first_type Information;
1347  const typename InterfaceMap::const_iterator infoPair = interfaces.find(proc);
1348 
1349  assert(infoPair!=interfaces.end());
1350 
1351  const Information& info = FORWARD ? infoPair->second.second :
1352  infoPair->second.first;
1353 
1354  for(size_t i=0; i < info.size(); i++) {
1355  GatherScatter::scatter(data, buffer[i], info[i]);
1356  }
1357  }
1358 
1359 
1360  template<class GatherScatter,class Data>
1361  void BufferedCommunicator::forward(Data& data)
1362  {
1363  this->template sendRecv<GatherScatter,true>(data, data);
1364  }
1365 
1366 
1367  template<class GatherScatter, class Data>
1368  void BufferedCommunicator::backward(Data& data)
1369  {
1370  this->template sendRecv<GatherScatter,false>(data, data);
1371  }
1372 
1373 
1374  template<class GatherScatter, class Data>
1375  void BufferedCommunicator::forward(const Data& source, Data& dest)
1376  {
1377  this->template sendRecv<GatherScatter,true>(source, dest);
1378  }
1379 
1380 
1381  template<class GatherScatter, class Data>
1382  void BufferedCommunicator::backward(Data& source, const Data& dest)
1383  {
1384  this->template sendRecv<GatherScatter,false>(dest, source);
1385  }
1386 
1387 
1388  template<class GatherScatter, bool FORWARD, class Data>
1389  void BufferedCommunicator::sendRecv(const Data& source, Data& dest)
1390  {
1391  int rank, lrank;
1392 
1393  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
1394  MPI_Comm_rank(MPI_COMM_WORLD,&lrank);
1395 
1396  typedef typename CommPolicy<Data>::IndexedType Type;
1397  Type *sendBuffer, *recvBuffer;
1398  size_t sendBufferSize;
1399 #ifndef NDEBUG
1400  size_t recvBufferSize;
1401 #endif
1402 
1403  if(FORWARD) {
1404  sendBuffer = reinterpret_cast<Type*>(buffers_[0]);
1405  sendBufferSize = bufferSize_[0];
1406  recvBuffer = reinterpret_cast<Type*>(buffers_[1]);
1407 #ifndef NDEBUG
1408  recvBufferSize = bufferSize_[1];
1409 #endif
1410  }else{
1411  sendBuffer = reinterpret_cast<Type*>(buffers_[1]);
1412  sendBufferSize = bufferSize_[1];
1413  recvBuffer = reinterpret_cast<Type*>(buffers_[0]);
1414 #ifndef NDEBUG
1415  recvBufferSize = bufferSize_[0];
1416 #endif
1417  }
1418  typedef typename CommPolicy<Data>::IndexedTypeFlag Flag;
1419 
1420  MessageGatherer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, source, sendBuffer, sendBufferSize);
1421 
1422  MPI_Request* sendRequests = new MPI_Request[messageInformation_.size()];
1423  MPI_Request* recvRequests = new MPI_Request[messageInformation_.size()];
1424 
1425  // Setup receive first
1426  typedef typename InformationMap::const_iterator const_iterator;
1427 
1428  const const_iterator end = messageInformation_.end();
1429  size_t i=0;
1430  int* processMap = new int[messageInformation_.size()];
1431 
1432  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i) {
1433  processMap[i]=info->first;
1434  if(FORWARD) {
1435  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1436  Dune::dvverb<<rank<<": receiving "<<info->second.second.size_<<" from "<<info->first<<std::endl;
1437  MPI_Irecv(recvBuffer+info->second.second.start_, info->second.second.size_,
1438  MPI_BYTE, info->first, commTag_, communicator_,
1439  recvRequests+i);
1440  }else{
1441  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= recvBufferSize );
1442  Dune::dvverb<<rank<<": receiving "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1443  MPI_Irecv(recvBuffer+info->second.first.start_, info->second.first.size_,
1444  MPI_BYTE, info->first, commTag_, communicator_,
1445  recvRequests+i);
1446  }
1447  }
1448 
1449  // now the send requests
1450  i=0;
1451  for(const_iterator info = messageInformation_.begin(); info != end; ++info, ++i)
1452  if(FORWARD) {
1453  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= recvBufferSize );
1454  Dune::dvverb<<rank<<": sending "<<info->second.first.size_<<" to "<<info->first<<std::endl;
1455  assert(info->second.first.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.first.size_ <= sendBufferSize );
1456  MPI_Issend(sendBuffer+info->second.first.start_, info->second.first.size_,
1457  MPI_BYTE, info->first, commTag_, communicator_,
1458  sendRequests+i);
1459  }else{
1460  assert(info->second.second.start_*sizeof(typename CommPolicy<Data>::IndexedType)+info->second.second.size_ <= sendBufferSize );
1461  Dune::dvverb<<rank<<": sending "<<info->second.second.size_<<" to "<<info->first<<std::endl;
1462  MPI_Issend(sendBuffer+info->second.second.start_, info->second.second.size_,
1463  MPI_BYTE, info->first, commTag_, communicator_,
1464  sendRequests+i);
1465  }
1466 
1467  // Wait for completion of receive and immediately start scatter
1468  i=0;
1469  //int success = 1;
1470  int finished = MPI_UNDEFINED;
1471  MPI_Status status; //[messageInformation_.size()];
1472  //MPI_Waitall(messageInformation_.size(), recvRequests, status);
1473 
1474  for(i=0; i< messageInformation_.size(); i++) {
1475  status.MPI_ERROR=MPI_SUCCESS;
1476  MPI_Waitany(messageInformation_.size(), recvRequests, &finished, &status);
1477  assert(finished != MPI_UNDEFINED);
1478 
1479  if(status.MPI_ERROR==MPI_SUCCESS) {
1480  int& proc = processMap[finished];
1481  typename InformationMap::const_iterator infoIter = messageInformation_.find(proc);
1482  assert(infoIter != messageInformation_.end());
1483 
1484  MessageInformation info = (FORWARD) ? infoIter->second.second : infoIter->second.first;
1485  assert(info.start_+info.size_ <= recvBufferSize);
1486 
1487  MessageScatterer<Data,GatherScatter,FORWARD,Flag>() (interfaces_, dest, recvBuffer+info.start_, proc);
1488  }else{
1489  std::cerr<<rank<<": MPI_Error occurred while receiving message from "<<processMap[finished]<<std::endl;
1490  //success=0;
1491  }
1492  }
1493 
1494  MPI_Status recvStatus;
1495 
1496  // Wait for completion of sends
1497  for(i=0; i< messageInformation_.size(); i++)
1498  if(MPI_SUCCESS!=MPI_Wait(sendRequests+i, &recvStatus)) {
1499  std::cerr<<rank<<": MPI_Error occurred while sending message to "<<processMap[finished]<<std::endl;
1500  //success=0;
1501  }
1502  /*
1503  int globalSuccess;
1504  MPI_Allreduce(&success, &globalSuccess, 1, MPI_INT, MPI_MIN, interface_->communicator());
1505 
1506  if(!globalSuccess)
1507  DUNE_THROW(CommunicationError, "A communication error occurred!");
1508  */
1509  delete[] processMap;
1510  delete[] sendRequests;
1511  delete[] recvRequests;
1512 
1513  }
1514 
1515 #endif // DOXYGEN
1516 
1518 }
1519 
1520 #endif
1521 
1522 #endif
DVVerbType dvverb(std::cout)
stream for very verbose output.
Definition: stdstreams.hh:94
Traits for type conversions and type information.
Manager class for the mapping between local indices and globally unique indices.
Definition: indexset.hh:217
~BufferedCommunicator()
Destructor.
Dune namespace.
Definition: alignment.hh:13
A few common exception classes.
V Type
The type the policy is for.
Definition: communicator.hh:133
Enable typedef if condition is met.
Definition: typetraits.hh:328
The indices present on remote processes.
Definition: remoteindices.hh:47
Communication interface between remote and local indices.
Definition: interface.hh:207
Error thrown if there was a problem with the communication.
Definition: communicator.hh:187
LocalIndex::Attribute Attribute
The type of the attribute.
Definition: remoteindices.hh:220
std::size_t size_
The size of the buffer.
Definition: variablesizecommunicator.hh:132
void backward(Data &source, const Data &dest)
Communicate in the reverse direction, i.e. send from target to source.
ParallelIndexSet::LocalIndex LocalIndex
The type of the local index.
Definition: remoteindices.hh:215
V::value_type IndexedType
The type we get at each index with operator[].
Definition: communicator.hh:140
A communicator that uses buffers to gather and scatter the data to be sent or received.
Definition: communicator.hh:452
Provides classes for building the communication interface between remote indices. ...
Flag for marking indexed data structures where the data at each index may be a variable multiple of a...
Definition: communicator.hh:110
SizeOne IndexedTypeFlag
Whether the indexed type has variable size or there is always one value at each index.
Definition: communicator.hh:146
GatherScatter default implementation that just copies data.
Definition: communicator.hh:194
void forward(const Data &source, Data &dest)
Send from source to target.
Definition: communicator.hh:166
VariableBlockVector< FieldVector< K, n >, A > Type
Definition: communicator.hh:173
Standard Dune debug streams.
ParallelIndexSet::GlobalIndex GlobalIndex
The type of the global index.
Definition: remoteindices.hh:209
#define DUNE_THROW(E, m)
Definition: exceptions.hh:244
void free()
Free the allocated memory (i.e. buffers and message information).
Information describing an interface.
Definition: interface.hh:99
enable_if< is_same< SizeOne, typename CommPolicy< Data >::IndexedTypeFlag >::value, void >::type build(const Interface &interface)
Build the buffers and information for the communication process.
BufferedCommunicator()
Constructor.
Definition: communicator.hh:168
Flag for marking indexed data structures where data at each index is of the same size.
Definition: communicator.hh:102
static int getSize(const V &, int index)
Get the number of primitive elements at that index.
#define DUNE_UNUSED_PARAMETER(parm)
A macro to mark intentional unused function parameters with.
Definition: unused.hh:18
An index present on the local process.
Definition: localindex.hh:33
Default policy used for communicating an indexed type.
Definition: communicator.hh:120
CommPolicy< T >::IndexedType IndexedType
Definition: communicator.hh:196
Default exception class for I/O errors.
Definition: exceptions.hh:257
Classes describing a distributed indexset.
static const void * getAddress(const V &v, int index)
Get the address of entry at an index.
DataHandle & data_
Definition: variablesizecommunicator.hh:521
Base class of all classes representing a communication interface.
Definition: interface.hh:33