FreeFOAM The Cross-Platform CFD Toolkit
mapDistributeTemplates.C
Go to the documentation of this file.
/*---------------------------------------------------------------------------*\
  =========                 |
  \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\    /   O peration     |
    \\  /    A nd           | Copyright (C) 1991-2010 OpenCFD Ltd.
     \\/     M anipulation  |
-------------------------------------------------------------------------------
License
    This file is part of OpenFOAM.

    OpenFOAM is free software: you can redistribute it and/or modify it
    under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    for more details.

    You should have received a copy of the GNU General Public License
    along with OpenFOAM.  If not, see <http://www.gnu.org/licenses/>.

\*---------------------------------------------------------------------------*/
25 
26 #include <OpenFOAM/Pstream.H>
27 
28 // * * * * * * * * * * * * * Private Member Functions * * * * * * * * * * * //
29 
30 // Distribute list.
31 template<class T>
33 (
34  const Pstream::commsTypes commsType,
35  const List<labelPair>& schedule,
36  const label constructSize,
37  const labelListList& subMap,
38  const labelListList& constructMap,
39  List<T>& field
40 )
41 {
42  if (commsType == Pstream::blocking)
43  {
44  // Since buffered sending can reuse the field to collect the
45  // received data.
46 
47  // Send sub field to neighbour
48  for (label domain = 0; domain < Pstream::nProcs(); domain++)
49  {
50  const labelList& map = subMap[domain];
51 
52  if (domain != Pstream::myProcNo() && map.size())
53  {
54  OPstream toNbr(Pstream::blocking, domain);
55  toNbr << UIndirectList<T>(field, map);
56  }
57  }
58 
59  // Subset myself
60  const labelList& mySubMap = subMap[Pstream::myProcNo()];
61 
62  List<T> subField(mySubMap.size());
63  forAll(mySubMap, i)
64  {
65  subField[i] = field[mySubMap[i]];
66  }
67 
68  // Receive sub field from myself (subField)
69  const labelList& map = constructMap[Pstream::myProcNo()];
70 
71  field.setSize(constructSize);
72 
73  forAll(map, i)
74  {
75  field[map[i]] = subField[i];
76  }
77 
78  // Receive sub field from neighbour
79  for (label domain = 0; domain < Pstream::nProcs(); domain++)
80  {
81  const labelList& map = constructMap[domain];
82 
83  if (domain != Pstream::myProcNo() && map.size())
84  {
85  IPstream fromNbr(Pstream::blocking, domain);
86  List<T> subField(fromNbr);
87 
88  if (subField.size() != map.size())
89  {
91  (
92  "template<class T>\n"
93  "void mapDistribute::distribute\n"
94  "(\n"
95  " const Pstream::commsTypes commsType,\n"
96  " const List<labelPair>& schedule,\n"
97  " const label constructSize,\n"
98  " const labelListList& subMap,\n"
99  " const labelListList& constructMap,\n"
100  " List<T>& field\n"
101  ")\n"
102  ) << "Expected from processor " << domain
103  << " " << map.size() << " but received "
104  << subField.size() << " elements."
105  << abort(FatalError);
106  }
107 
108  forAll(map, i)
109  {
110  field[map[i]] = subField[i];
111  }
112  }
113  }
114  }
115  else if (commsType == Pstream::scheduled)
116  {
117  // Need to make sure I don't overwrite field with received data
118  // since the data might need to be sent to another processor. So
119  // allocate a new field for the results.
120  List<T> newField(constructSize);
121 
122  // Subset myself
123  UIndirectList<T> subField(field, subMap[Pstream::myProcNo()]);
124 
125  // Receive sub field from myself (subField)
126  const labelList& map = constructMap[Pstream::myProcNo()];
127 
128  forAll(map, i)
129  {
130  newField[map[i]] = subField[i];
131  }
132 
133  // Schedule will already have pruned 0-sized comms
134  forAll(schedule, i)
135  {
136  const labelPair& twoProcs = schedule[i];
137  label sendProc = twoProcs[0];
138  label recvProc = twoProcs[1];
139 
140  if (Pstream::myProcNo() == sendProc)
141  {
142  // I am sender. Send to recvProc.
143  OPstream toNbr(Pstream::scheduled, recvProc);
144  toNbr << UIndirectList<T>(field, subMap[recvProc]);
145  }
146  else
147  {
148  // I am receiver. Receive from sendProc.
149  IPstream fromNbr(Pstream::scheduled, sendProc);
150  List<T> subField(fromNbr);
151 
152  const labelList& map = constructMap[sendProc];
153 
154  if (subField.size() != map.size())
155  {
157  (
158  "template<class T>\n"
159  "void mapDistribute::distribute\n"
160  "(\n"
161  " const Pstream::commsTypes commsType,\n"
162  " const List<labelPair>& schedule,\n"
163  " const label constructSize,\n"
164  " const labelListList& subMap,\n"
165  " const labelListList& constructMap,\n"
166  " List<T>& field\n"
167  ")\n"
168  ) << "Expected from processor " << sendProc
169  << " " << map.size() << " but received "
170  << subField.size() << " elements."
171  << abort(FatalError);
172  }
173 
174  forAll(map, i)
175  {
176  newField[map[i]] = subField[i];
177  }
178  }
179  }
180  field.transfer(newField);
181  }
182  else if (commsType == Pstream::nonBlocking)
183  {
184  if (!contiguous<T>())
185  {
187  (
188  "template<class T>\n"
189  "void mapDistribute::distribute\n"
190  "(\n"
191  " const Pstream::commsTypes commsType,\n"
192  " const List<labelPair>& schedule,\n"
193  " const label constructSize,\n"
194  " const labelListList& subMap,\n"
195  " const labelListList& constructMap,\n"
196  " List<T>& field\n"
197  ")\n"
198  ) << "Non-blocking only supported for contiguous data."
199  << exit(FatalError);
200  }
201 
202  // Set up sends to neighbours
203 
204  List<List<T > > sendFields(Pstream::nProcs());
205 
206  for (label domain = 0; domain < Pstream::nProcs(); domain++)
207  {
208  const labelList& map = subMap[domain];
209 
210  if (domain != Pstream::myProcNo() && map.size())
211  {
212  List<T>& subField = sendFields[domain];
213  subField.setSize(map.size());
214  forAll(map, i)
215  {
216  subField[i] = field[map[i]];
217  }
218 
219  OPstream::write
220  (
221  Pstream::nonBlocking,
222  domain,
223  reinterpret_cast<const char*>(subField.begin()),
224  subField.size()*sizeof(T)
225  );
226  }
227  }
228 
229  // Set up receives from neighbours
230 
231  List<List<T > > recvFields(Pstream::nProcs());
232 
233  for (label domain = 0; domain < Pstream::nProcs(); domain++)
234  {
235  const labelList& map = constructMap[domain];
236 
237  if (domain != Pstream::myProcNo() && map.size())
238  {
239  recvFields[domain].setSize(map.size());
240  IPstream::read
241  (
242  Pstream::nonBlocking,
243  domain,
244  reinterpret_cast<char*>(recvFields[domain].begin()),
245  recvFields[domain].size()*sizeof(T)
246  );
247  }
248  }
249 
250 
251  // Set up 'send' to myself
252 
253  {
254  const labelList& map = subMap[Pstream::myProcNo()];
255 
256  List<T>& subField = sendFields[Pstream::myProcNo()];
257  subField.setSize(map.size());
258  forAll(map, i)
259  {
260  subField[i] = field[map[i]];
261  }
262  }
263 
264 
265  // Combine bits. Note that can reuse field storage
266 
267  field.setSize(constructSize);
268 
269 
270  // Receive sub field from myself (sendFields[Pstream::myProcNo()])
271  {
272  const labelList& map = constructMap[Pstream::myProcNo()];
273  const List<T>& subField = sendFields[Pstream::myProcNo()];
274 
275  forAll(map, i)
276  {
277  field[map[i]] = subField[i];
278  }
279  }
280 
281 
282  // Wait for all to finish
283 
284  OPstream::waitRequests();
285  IPstream::waitRequests();
286 
287  // Collect neighbour fields
288 
289  for (label domain = 0; domain < Pstream::nProcs(); domain++)
290  {
291  const labelList& map = constructMap[domain];
292 
293  if (domain != Pstream::myProcNo() && map.size())
294  {
295  if (recvFields[domain].size() != map.size())
296  {
298  (
299  "template<class T>\n"
300  "void mapDistribute::distribute\n"
301  "(\n"
302  " const Pstream::commsTypes commsType,\n"
303  " const List<labelPair>& schedule,\n"
304  " const label constructSize,\n"
305  " const labelListList& subMap,\n"
306  " const labelListList& constructMap,\n"
307  " List<T>& field\n"
308  ")\n"
309  ) << "Expected from processor " << domain
310  << " " << map.size() << " but received "
311  << recvFields[domain].size() << " elements."
312  << abort(FatalError);
313  }
314 
315  forAll(map, i)
316  {
317  field[map[i]] = recvFields[domain][i];
318  }
319  }
320  }
321  }
322  else
323  {
324  FatalErrorIn("mapDistribute::distribute(..)")
325  << "Unknown communication schedule " << commsType
326  << abort(FatalError);
327  }
328 }
329 
330 
331 // Distribute list.
332 template<class T, class CombineOp>
334 (
335  const Pstream::commsTypes commsType,
336  const List<labelPair>& schedule,
337  const label constructSize,
338  const labelListList& subMap,
339  const labelListList& constructMap,
340  List<T>& field,
341  const CombineOp& cop,
342  const T& nullValue
343 )
344 {
345  if (commsType == Pstream::blocking)
346  {
347  // Since buffered sending can reuse the field to collect the
348  // received data.
349 
350  // Send sub field to neighbour
351  for (label domain = 0; domain < Pstream::nProcs(); domain++)
352  {
353  const labelList& map = subMap[domain];
354 
355  if (domain != Pstream::myProcNo() && map.size())
356  {
357  OPstream toNbr(Pstream::blocking, domain);
358  toNbr << UIndirectList<T>(field, map);
359  }
360  }
361 
362  // Subset myself
363  const labelList& mySubMap = subMap[Pstream::myProcNo()];
364 
365  List<T> subField(mySubMap.size());
366  forAll(mySubMap, i)
367  {
368  subField[i] = field[mySubMap[i]];
369  }
370 
371  // Receive sub field from myself (subField)
372  const labelList& map = constructMap[Pstream::myProcNo()];
373 
374  field.setSize(constructSize);
375  field = nullValue;
376 
377  forAll(map, i)
378  {
379  cop(field[map[i]], subField[i]);
380  }
381 
382  // Receive sub field from neighbour
383  for (label domain = 0; domain < Pstream::nProcs(); domain++)
384  {
385  const labelList& map = constructMap[domain];
386 
387  if (domain != Pstream::myProcNo() && map.size())
388  {
389  IPstream fromNbr(Pstream::blocking, domain);
390  List<T> subField(fromNbr);
391 
392  if (subField.size() != map.size())
393  {
395  (
396  "template<class T>\n"
397  "void mapDistribute::distribute\n"
398  "(\n"
399  " const Pstream::commsTypes commsType,\n"
400  " const List<labelPair>& schedule,\n"
401  " const label constructSize,\n"
402  " const labelListList& subMap,\n"
403  " const labelListList& constructMap,\n"
404  " List<T>& field\n"
405  ")\n"
406  ) << "Expected from processor " << domain
407  << " " << map.size() << " but received "
408  << subField.size() << " elements."
409  << abort(FatalError);
410  }
411 
412  forAll(map, i)
413  {
414  cop(field[map[i]], subField[i]);
415  }
416  }
417  }
418  }
419  else if (commsType == Pstream::scheduled)
420  {
421  // Need to make sure I don't overwrite field with received data
422  // since the data might need to be sent to another processor. So
423  // allocate a new field for the results.
424  List<T> newField(constructSize, nullValue);
425 
426  // Subset myself
427  UIndirectList<T> subField(field, subMap[Pstream::myProcNo()]);
428 
429  // Receive sub field from myself (subField)
430  const labelList& map = constructMap[Pstream::myProcNo()];
431 
432  forAll(map, i)
433  {
434  cop(newField[map[i]], subField[i]);
435  }
436 
437  // Schedule will already have pruned 0-sized comms
438  forAll(schedule, i)
439  {
440  const labelPair& twoProcs = schedule[i];
441  label sendProc = twoProcs[0];
442  label recvProc = twoProcs[1];
443 
444  if (Pstream::myProcNo() == sendProc)
445  {
446  // I am sender. Send to recvProc.
447  OPstream toNbr(Pstream::scheduled, recvProc);
448  toNbr << UIndirectList<T>(field, subMap[recvProc]);
449  }
450  else
451  {
452  // I am receiver. Receive from sendProc.
453  IPstream fromNbr(Pstream::scheduled, sendProc);
454  List<T> subField(fromNbr);
455 
456  const labelList& map = constructMap[sendProc];
457 
458  if (subField.size() != map.size())
459  {
461  (
462  "template<class T>\n"
463  "void mapDistribute::distribute\n"
464  "(\n"
465  " const Pstream::commsTypes commsType,\n"
466  " const List<labelPair>& schedule,\n"
467  " const label constructSize,\n"
468  " const labelListList& subMap,\n"
469  " const labelListList& constructMap,\n"
470  " List<T>& field\n"
471  ")\n"
472  ) << "Expected from processor " << sendProc
473  << " " << map.size() << " but received "
474  << subField.size() << " elements."
475  << abort(FatalError);
476  }
477 
478  forAll(map, i)
479  {
480  cop(newField[map[i]], subField[i]);
481  }
482  }
483  }
484  field.transfer(newField);
485  }
486  else if (commsType == Pstream::nonBlocking)
487  {
488  if (!contiguous<T>())
489  {
491  (
492  "template<class T>\n"
493  "void mapDistribute::distribute\n"
494  "(\n"
495  " const Pstream::commsTypes commsType,\n"
496  " const List<labelPair>& schedule,\n"
497  " const label constructSize,\n"
498  " const labelListList& subMap,\n"
499  " const labelListList& constructMap,\n"
500  " List<T>& field\n"
501  ")\n"
502  ) << "Non-blocking only supported for contiguous data."
503  << exit(FatalError);
504  }
505 
506  // Set up sends to neighbours
507 
508  List<List<T > > sendFields(Pstream::nProcs());
509 
510  for (label domain = 0; domain < Pstream::nProcs(); domain++)
511  {
512  const labelList& map = subMap[domain];
513 
514  if (domain != Pstream::myProcNo() && map.size())
515  {
516  List<T>& subField = sendFields[domain];
517  subField.setSize(map.size());
518  forAll(map, i)
519  {
520  subField[i] = field[map[i]];
521  }
522 
523  OPstream::write
524  (
525  Pstream::nonBlocking,
526  domain,
527  reinterpret_cast<const char*>(subField.begin()),
528  subField.size()*sizeof(T)
529  );
530  }
531  }
532 
533  // Set up receives from neighbours
534 
535  List<List<T > > recvFields(Pstream::nProcs());
536 
537  for (label domain = 0; domain < Pstream::nProcs(); domain++)
538  {
539  const labelList& map = constructMap[domain];
540 
541  if (domain != Pstream::myProcNo() && map.size())
542  {
543  recvFields[domain].setSize(map.size());
544  IPstream::read
545  (
546  Pstream::nonBlocking,
547  domain,
548  reinterpret_cast<char*>(recvFields[domain].begin()),
549  recvFields[domain].size()*sizeof(T)
550  );
551  }
552  }
553 
554  // Set up 'send' to myself
555 
556  {
557  const labelList& map = subMap[Pstream::myProcNo()];
558 
559  List<T>& subField = sendFields[Pstream::myProcNo()];
560  subField.setSize(map.size());
561  forAll(map, i)
562  {
563  subField[i] = field[map[i]];
564  }
565  }
566 
567 
568  // Combine bits. Note that can reuse field storage
569 
570  field.setSize(constructSize);
571  field = nullValue;
572 
573  // Receive sub field from myself (subField)
574  {
575  const labelList& map = constructMap[Pstream::myProcNo()];
576  const List<T>& subField = sendFields[Pstream::myProcNo()];
577 
578  forAll(map, i)
579  {
580  cop(field[map[i]], subField[i]);
581  }
582  }
583 
584 
585  // Wait for all to finish
586 
587  OPstream::waitRequests();
588  IPstream::waitRequests();
589 
590  // Collect neighbour fields
591 
592  for (label domain = 0; domain < Pstream::nProcs(); domain++)
593  {
594  const labelList& map = constructMap[domain];
595 
596  if (domain != Pstream::myProcNo() && map.size())
597  {
598  if (recvFields[domain].size() != map.size())
599  {
601  (
602  "template<class T>\n"
603  "void mapDistribute::distribute\n"
604  "(\n"
605  " const Pstream::commsTypes commsType,\n"
606  " const List<labelPair>& schedule,\n"
607  " const label constructSize,\n"
608  " const labelListList& subMap,\n"
609  " const labelListList& constructMap,\n"
610  " List<T>& field\n"
611  ")\n"
612  ) << "Expected from processor " << domain
613  << " " << map.size() << " but received "
614  << recvFields[domain].size() << " elements."
615  << abort(FatalError);
616  }
617 
618  forAll(map, i)
619  {
620  cop(field[map[i]], recvFields[domain][i]);
621  }
622  }
623  }
624  }
625  else
626  {
627  FatalErrorIn("mapDistribute::distribute(..)")
628  << "Unknown communication schedule " << commsType
629  << abort(FatalError);
630  }
631 }
632 
633 
634 // ************************ vim: set sw=4 sts=4 et: ************************ //