FreeFOAM The Cross-Platform CFD Toolkit
mpiIPstreamImpl.C
Go to the documentation of this file.
1 /*---------------------------------------------------------------------------*\
2  ========= |
3  \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
4  \\ / O peration |
5  \\ / A nd | Copyright (C) 1991-2010 OpenCFD Ltd.
6  \\/ M anipulation |
7 -------------------------------------------------------------------------------
8 License
9  This file is part of OpenFOAM.
10 
11  OpenFOAM is free software: you can redistribute it and/or modify it
12  under the terms of the GNU General Public License as published by
13  the Free Software Foundation, either version 3 of the License, or
14  (at your option) any later version.
15 
16  OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
19  for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with OpenFOAM. If not, see <http://www.gnu.org/licenses/>.
23 
24 Description
25  Read token and binary block from mpiIPstreamImpl
26 
27 \*---------------------------------------------------------------------------*/
28 
29 #include "mpi.h"
30 
31 #include "mpiIPstreamImpl.H"
32 #include "mpiPstreamGlobals.H"
33 
35 #include <OpenFOAM/Pstream.H>
36 
37 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
38 
39 // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * //
40 
// Static registration for mpiIPstreamImpl inside namespace Foam.
// Two project macros are invoked here; their definitions are not part of this
// listing (listing line 34, probably an include, is absent from the extract).
// NOTE(review): presumably the first macro defines the class type name and a
// debug switch initialised to 0, and the second registers mpiIPstreamImpl in
// the IPstreamImpl run-time selection table keyed as dictionary -- confirm
// against the macro definitions.
41 namespace Foam
42 {
43 
44 defineTypeNameAndDebug(mpiIPstreamImpl, 0);
45 addToRunTimeSelectionTable(IPstreamImpl, mpiIPstreamImpl, dictionary);
46 
47 }
48 
49 
50 // * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * //
51 
// Set up a receive buffer and read one incoming message into it.
//
// NOTE(review): the line carrying the return type and qualified function name
// (listing line 52) is missing from this extract, as are listing lines 77 and
// 82 (the opener and the terminator of the error report below). The error
// string names the function mpiIPstreamImpl::mpiIPstreamImpl and spells the
// third parameter as const int, whereas the parameter list here declares it
// as int& -- confirm which is intended.
//
// Parameters:
//   commsType    communication mode, forwarded unchanged to read()
//   bufSize      requested buffer size; zero means probe the message for it
//   fromProcNo   sending processor index, mapped through Pstream::procID()
//   messageSize  output: overwritten with the value returned by read()
//   buf          receive buffer; resized here only when bufSize is zero
53 (
54  const PstreamImpl::commsTypes commsType,
55  const label bufSize,
56  int& fromProcNo,
57  label& messageSize,
58  List<char>& buf
59 )
60 {
61  MPI_Status status;
62 
63  // If the buffer size is not specified, probe the incoming message
64  // and set it
65  if (!bufSize)
66  {
    // MPI_Probe waits for a matching message without receiving it; the
    // status is then queried for the length in MPI_BYTE units.
    // NOTE(review): MPI_Get_count writes through an int pointer, so this
    // relies on label being int -- confirm.
67  MPI_Probe(Pstream::procID(fromProcNo), Pstream::msgType(), MPI_COMM_WORLD, &status);
68  MPI_Get_count(&status, MPI_BYTE, &messageSize);
69 
70  buf.setSize(messageSize);
71  }
72 
    // NOTE(review): when bufSize is non-zero, buf is not resized in this
    // function; the receive uses whatever size buf already has -- confirm the
    // caller pre-sizes it.
73  messageSize = read(commsType, fromProcNo, buf.begin(), buf.size());
74 
    // A zero return from read() is reported as a failed read.
75  if (!messageSize)
76  {
78  (
79  "mpiIPstreamImpl::mpiIPstreamImpl(const commsTypes commsType, const label bufSize, "
80  "const int fromProcNo, label& messageSize, List<char>& buf)"
81  ) << "read failed"
83  }
84 }
85 
86 
// Receive a message from another processor into a caller-supplied buffer.
//
// NOTE(review): the line carrying the return type and qualified function name
// (listing line 87) is missing from this extract; the error strings below name
// the function mpiIPstreamImpl::read. Listing lines 113, 118, 131, 138, 161,
// 166, 171, 177 and 182 are also absent (error-report openers/terminators and
// one statement in the non-blocking branch).
//
// Parameters:
//   commsType   selects blocking/scheduled, nonBlocking, or unsupported path
//   fromProcNo  sending processor index, mapped through Pstream::procID()
//   buf         destination buffer
//   bufSize     capacity of buf, passed to MPI as the receive count
//
// Returns:
//   blocking/scheduled: the received length as reported by MPI_Get_count in
//                       MPI_BYTE units, or 0 if MPI_Recv returned non-zero
//   nonBlocking:        1 once the receive has been started, or 0 if
//                       MPI_Irecv returned non-zero
//   any other type:     0
88 (
89  const PstreamImpl::commsTypes commsType,
90  const int fromProcNo,
91  char* buf,
92  const std::streamsize bufSize
93 )
94 {
95  if (commsType == PstreamImpl::blocking || commsType == PstreamImpl::scheduled)
96  {
97  MPI_Status status;
98 
    // A non-zero return code from MPI_Recv is treated as failure.
99  if
100  (
101  MPI_Recv
102  (
103  buf,
104  bufSize,
105  MPI_PACKED,
106  Pstream::procID(fromProcNo),
107  Pstream::msgType(),
108  MPI_COMM_WORLD,
109  &status
110  )
111  )
112  {
114  (
115  "mpiIPstreamImpl::read"
116  "(const int fromProcNo, char* buf, std::streamsize bufSize)"
117  ) << "MPI_Recv cannot receive incoming message"
119 
120  return 0;
121  }
122 
123 
124  // Check size of message read
125 
    // NOTE(review): the receive datatype above is MPI_PACKED while the
    // count is queried as MPI_BYTE; MPI_Get_count also expects an int
    // pointer, so this relies on label being int -- confirm.
126  label messageSize;
127  MPI_Get_count(&status, MPI_BYTE, &messageSize);
128 
    // This comparison runs after the receive has already completed.
129  if (messageSize > bufSize)
130  {
132  (
133  "mpiIPstreamImpl::read"
134  "(const int fromProcNo, char* buf, std::streamsize bufSize)"
135  ) << "buffer (" << label(bufSize)
136  << ") not large enough for incoming message ("
137  << messageSize << ')'
139  }
140 
141  return messageSize;
142  }
143  else if (commsType == PstreamImpl::nonBlocking)
144  {
145  MPI_Request request;
146 
    // Start the receive without waiting for completion; a non-zero
    // return code from MPI_Irecv is treated as failure.
147  if
148  (
149  MPI_Irecv
150  (
151  buf,
152  bufSize,
153  MPI_PACKED,
154  Pstream::procID(fromProcNo),
155  Pstream::msgType(),
156  MPI_COMM_WORLD,
157  &request
158  )
159  )
160  {
    // NOTE(review): the message text says MPI_Recv although the call
    // that failed is MPI_Irecv.
162  (
163  "mpiIPstreamImpl::read"
164  "(const int fromProcNo, char* buf, std::streamsize bufSize)"
165  ) << "MPI_Recv cannot start non-blocking receive"
167 
168  return 0;
169  }
170 
    // NOTE(review): listing line 171 is missing here; presumably it
    // records request for later completion checks -- confirm against the
    // complete file.
172 
    // The value 1 only signals that the receive was started; it is not a
    // byte count.
173  return 1;
174  }
175  else
176  {
178  (
179  "mpiIPstreamImpl::read"
180  "(const int fromProcNo, char* buf, std::streamsize bufSize)"
181  ) << "Unsupported communications type " << commsType
183 
184  return 0;
185  }
186 }
187 
188 
// Wait for outstanding requests via MPI_Waitall and report an error if the
// call returns non-zero.
//
// NOTE(review): the function header (listing line 189) is missing from this
// extract; the error string below names it mpiIPstreamImpl::waitRequests().
// Listing lines 191, 197, 198, 203 and 209 are also absent, which includes the
// condition guarding the inner block, the count and request-array arguments
// of MPI_Waitall, the error-report opener, and one statement after the check.
190 {
192  {
    // Per-request statuses are discarded (MPI_STATUSES_IGNORE); only the
    // overall return code is inspected.
193  if
194  (
195  MPI_Waitall
196  (
199  MPI_STATUSES_IGNORE
200  )
201  )
202  {
    // The report is terminated with endl here, unlike the reports in the
    // other functions of this listing whose terminator lines are absent.
204  (
205  "mpiIPstreamImpl::waitRequests()"
206  ) << "MPI_Waitall returned with error" << endl;
207  }
208 
    // NOTE(review): listing line 209 is missing here; presumably it resets
    // the stored request list -- confirm against the complete file.
210  }
211 }
212 
213 
// Test whether a previously started request has completed.
//
// NOTE(review): the function header (listing line 214) is missing from this
// extract; the error string below names it
// mpiIPstreamImpl::finishedRequest(const label), and the message text refers
// to an index i. Listing lines 216, 218, 222, 226 and 232 are also absent,
// which includes the guard condition, the error-report opener and terminator,
// the count printed in the message, and the request argument of MPI_Test.
//
// Returns: true when the flag written by MPI_Test is non-zero.
215 {
217  {
    // Reached when the guard on the missing listing line 216 holds;
    // presumably a range check of i against the stored requests -- confirm.
    // NOTE(review): the message says send requests although this is the
    // receive-side implementation.
219  (
220  "mpiIPstreamImpl::finishedRequest(const label)"
221  ) << "There are "
223  << " outstanding send requests and you are asking for i=" << i
224  << nl
225  << "Maybe you are mixing blocking/non-blocking comms?"
227  }
228 
    // The status of the tested request is discarded (MPI_STATUS_IGNORE).
229  int flag;
230  MPI_Test
231  (
233  &flag,
234  MPI_STATUS_IGNORE
235  );
236 
237  return flag != 0;
238 }
239 
240 
241 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
242 
243 // ************************ vim: set sw=4 sts=4 et: ************************ //