libdap  Updated for version 3.18.2
XDRStreamMarshaller.cc
1 // XDRStreamMarshaller.cc
2 
3 // -*- mode: c++; c-basic-offset:4 -*-
4 
5 // This file is part of libdap, A C++ implementation of the OPeNDAP Data
6 // Access Protocol.
7 
8 // Copyright (c) 2002,2003,2016 OPeNDAP, Inc.
9 // Author: Patrick West <pwest@ucar.edu>
10 // James Gallagher <jgallagher@opendap.org>
11 //
12 // This library is free software; you can redistribute it and/or
13 // modify it under the terms of the GNU Lesser General Public
14 // License as published by the Free Software Foundation; either
15 // version 2.1 of the License, or (at your option) any later version.
16 //
17 // This library is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 // Lesser General Public License for more details.
21 //
22 // You should have received a copy of the GNU Lesser General Public
23 // License along with this library; if not, write to the Free Software
24 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25 //
26 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
27 
28 // (c) COPYRIGHT URI/MIT 1994-1999
29 // Please read the full copyright statement in the file COPYRIGHT_URI.
30 //
31 // Authors:
32 // pwest Patrick West <pwest@ucar.edu>
33 
34 
35 #include "config.h"
36 
37 #ifdef HAVE_PTHREAD_H
38 #include <pthread.h>
39 #endif
40 
41 #include <cassert>
42 
43 #include <iostream>
44 #include <sstream>
45 #include <iomanip>
46 
47 // #define DODS_DEBUG
48 
49 #include "XDRStreamMarshaller.h"
50 #ifdef USE_POSIX_THREADS
51 #include "MarshallerThread.h"
52 #endif
53 #include "Vector.h"
54 #include "XDRUtils.h"
55 #include "util.h"
56 
57 #include "debug.h"
58 
59 using namespace std;
60 
61 // Build this code so it does not use pthreads to write some kinds of
62 // data (see the put_vector() and put_vector_part() methods) in a child thread.
63 // #undef USE_POSIX_THREADS
64 
65 namespace libdap {
66 
67 char *XDRStreamMarshaller::d_buf = 0;
68 static const int XDR_DAP_BUFF_SIZE=256;
69 
70 
79 XDRStreamMarshaller::XDRStreamMarshaller(ostream &out) :
80  d_out(out), d_partial_put_byte_count(0), tm(0)
81 {
82  if (!d_buf) d_buf = (char *) malloc(XDR_DAP_BUFF_SIZE);
83  if (!d_buf) throw Error(internal_error, "Failed to allocate memory for data serialization.");
84 
85  xdrmem_create(&d_sink, d_buf, XDR_DAP_BUFF_SIZE, XDR_ENCODE);
86 
87 #ifdef USE_POSIX_THREADS
88  tm = new MarshallerThread;
89 #endif
90 }
91 
92 XDRStreamMarshaller::~XDRStreamMarshaller()
93 {
94  // Added this because when USE_POS... is not defined, 'tm' has no
95  // type, which the compiler complains about.
96 #ifdef USE_POSIX_THREADS
97  delete tm;
98 #endif
99  xdr_destroy(&d_sink);
100 }
101 
102 void XDRStreamMarshaller::put_byte(dods_byte val)
103 {
104  if (!xdr_setpos(&d_sink, 0))
105  throw Error("Network I/O Error. Could not send byte data - unable to set stream position.");
106 
107  if (!xdr_char(&d_sink, (char *) &val))
108  throw Error(
109  "Network I/O Error. Could not send byte data.");
110 
111  unsigned int bytes_written = xdr_getpos(&d_sink);
112  if (!bytes_written)
113  throw Error(
114  "Network I/O Error. Could not send byte data - unable to get stream position.");
115 
116 #ifdef USE_POSIX_THREADS
117  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
118 #endif
119 
120  d_out.write(d_buf, bytes_written);
121 }
122 
123 void XDRStreamMarshaller::put_int16(dods_int16 val)
124 {
125  if (!xdr_setpos(&d_sink, 0))
126  throw Error(
127  "Network I/O Error. Could not send int 16 data - unable to set stream position.");
128 
129  if (!XDR_INT16(&d_sink, &val))
130  throw Error(
131  "Network I/O Error. Could not send int 16 data.");
132 
133  unsigned int bytes_written = xdr_getpos(&d_sink);
134  if (!bytes_written)
135  throw Error(
136  "Network I/O Error. Could not send int 16 data - unable to get stream position.");
137 
138 #ifdef USE_POSIX_THREADS
139  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
140 #endif
141 
142  d_out.write(d_buf, bytes_written);
143 }
144 
145 void XDRStreamMarshaller::put_int32(dods_int32 val)
146 {
147  if (!xdr_setpos(&d_sink, 0))
148  throw Error(
149  "Network I/O Error. Could not send int 32 data - unable to set stream position.");
150 
151  if (!XDR_INT32(&d_sink, &val))
152  throw Error(
153  "Network I/O Error. Culd not read int 32 data.");
154 
155  unsigned int bytes_written = xdr_getpos(&d_sink);
156  if (!bytes_written)
157  throw Error(
158  "Network I/O Error. Could not send int 32 data - unable to get stream position.");
159 
160 #ifdef USE_POSIX_THREADS
161  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
162 #endif
163 
164  d_out.write(d_buf, bytes_written);
165 }
166 
167 void XDRStreamMarshaller::put_float32(dods_float32 val)
168 {
169  if (!xdr_setpos(&d_sink, 0))
170  throw Error(
171  "Network I/O Error. Could not send float 32 data - unable to set stream position.");
172 
173  if (!xdr_float(&d_sink, &val))
174  throw Error(
175  "Network I/O Error. Could not send float 32 data.");
176 
177  unsigned int bytes_written = xdr_getpos(&d_sink);
178  if (!bytes_written)
179  throw Error(
180  "Network I/O Error. Could not send float 32 data - unable to get stream position.");
181 
182 #ifdef USE_POSIX_THREADS
183  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
184 #endif
185 
186  d_out.write(d_buf, bytes_written);
187 }
188 
189 void XDRStreamMarshaller::put_float64(dods_float64 val)
190 {
191  if (!xdr_setpos(&d_sink, 0))
192  throw Error(
193  "Network I/O Error. Could not send float 64 data - unable to set stream position.");
194 
195  if (!xdr_double(&d_sink, &val))
196  throw Error(
197  "Network I/O Error. Could not send float 64 data.");
198 
199  unsigned int bytes_written = xdr_getpos(&d_sink);
200  if (!bytes_written)
201  throw Error(
202  "Network I/O Error. Could not send float 64 data - unable to get stream position.");
203 
204 #ifdef USE_POSIX_THREADS
205  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
206 #endif
207 
208  d_out.write(d_buf, bytes_written);
209 }
210 
211 void XDRStreamMarshaller::put_uint16(dods_uint16 val)
212 {
213  if (!xdr_setpos(&d_sink, 0))
214  throw Error(
215  "Network I/O Error. Could not send uint 16 data - unable to set stream position.");
216 
217  if (!XDR_UINT16(&d_sink, &val))
218  throw Error(
219  "Network I/O Error. Could not send uint 16 data.");
220 
221  unsigned int bytes_written = xdr_getpos(&d_sink);
222  if (!bytes_written)
223  throw Error(
224  "Network I/O Error. Could not send uint 16 data - unable to get stream position.");
225 
226 #ifdef USE_POSIX_THREADS
227  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
228 #endif
229 
230  d_out.write(d_buf, bytes_written);
231 }
232 
233 void XDRStreamMarshaller::put_uint32(dods_uint32 val)
234 {
235  if (!xdr_setpos(&d_sink, 0))
236  throw Error(
237  "Network I/O Error. Could not send uint 32 data - unable to set stream position.");
238 
239  if (!XDR_UINT32(&d_sink, &val))
240  throw Error(
241  "Network I/O Error. Could not send uint 32 data.");
242 
243  unsigned int bytes_written = xdr_getpos(&d_sink);
244  if (!bytes_written)
245  throw Error(
246  "Network I/O Error. Could not send uint 32 data - unable to get stream position.");
247 
248 #ifdef USE_POSIX_THREADS
249  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
250 #endif
251 
252  d_out.write(d_buf, bytes_written);
253 }
254 
255 void XDRStreamMarshaller::put_str(const string &val)
256 {
257  int size = val.length() + 8;
258 
259  XDR str_sink;
260  vector<char> str_buf(size);
261 
262  try {
263  xdrmem_create(&str_sink, &str_buf[0], size, XDR_ENCODE);
264 
265  if (!xdr_setpos(&str_sink, 0))
266  throw Error(
267  "Network I/O Error. Could not send string data - unable to set stream position.");
268 
269  const char *out_tmp = val.c_str();
270  if (!xdr_string(&str_sink, (char **) &out_tmp, size))
271  throw Error(
272  "Network I/O Error. Could not send string data.");
273 
274  unsigned int bytes_written = xdr_getpos(&str_sink);
275  if (!bytes_written)
276  throw Error(
277  "Network I/O Error. Could not send string data - unable to get stream position.");
278 
279 #ifdef USE_POSIX_THREADS
280  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
281 #endif
282 
283  d_out.write(&str_buf[0], bytes_written);
284 
285  xdr_destroy(&str_sink);
286  }
287  catch (...) {
288  xdr_destroy(&str_sink);
289  throw;
290  }
291 }
292 
293 void XDRStreamMarshaller::put_url(const string &val)
294 {
295  put_str(val);
296 }
297 
298 void XDRStreamMarshaller::put_opaque(char *val, unsigned int len)
299 {
300  if (len > XDR_DAP_BUFF_SIZE)
301  throw Error("Network I/O Error. Could not send opaque data - length of opaque data larger than allowed");
302 
303  if (!xdr_setpos(&d_sink, 0))
304  throw Error(
305  "Network I/O Error. Could not send opaque data - unable to set stream position.");
306 
307  if (!xdr_opaque(&d_sink, val, len))
308  throw Error(
309  "Network I/O Error. Could not send opaque data.");
310 
311  unsigned int bytes_written = xdr_getpos(&d_sink);
312  if (!bytes_written)
313  throw Error(
314  "Network I/O Error. Could not send opaque data - unable to get stream position.");
315 
316 #ifdef USE_POSIX_THREADS
317  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
318 #endif
319 
320  d_out.write(d_buf, bytes_written);
321 }
322 
323 void XDRStreamMarshaller::put_int(int val)
324 {
325  if (!xdr_setpos(&d_sink, 0))
326  throw Error(
327  "Network I/O Error. Could not send int data - unable to set stream position.");
328 
329  if (!xdr_int(&d_sink, &val))
330  throw Error(
331  "Network I/O Error(1). Could not send int data.");
332 
333  unsigned int bytes_written = xdr_getpos(&d_sink);
334  if (!bytes_written)
335  throw Error(
336  "Network I/O Error. Could not send int data - unable to get stream position.");
337 
338 #ifdef USE_POSIX_THREADS
339  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
340 #endif
341 
342  d_out.write(d_buf, bytes_written);
343 }
344 
345 void XDRStreamMarshaller::put_vector(char *val, int num, int width, Vector &vec)
346 {
347  put_vector(val, num, width, vec.var()->type());
348 }
349 
350 
359 {
360  put_int(num);
361  put_int(num);
362 
363  d_partial_put_byte_count = 0;
364 }
365 
373 {
374 #ifdef USE_POSIX_THREADS
375  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
376 #endif
377 
378  // Compute the trailing (padding) bytes
379 
380  // Note that the XDR standard pads values to 4 byte boundaries.
381  //unsigned int pad = (d_partial_put_byte_count % 4) == 0 ? 0: 4 - (d_partial_put_byte_count % 4);
382  unsigned int mod_4 = d_partial_put_byte_count & 0x03;
383  unsigned int pad = (mod_4 == 0) ? 0: 4 - mod_4;
384 
385  if (pad) {
386  vector<char> padding(4, 0); // 4 zeros
387 
388  d_out.write(&padding[0], pad);
389  if (d_out.fail()) throw Error("Network I/O Error. Could not send vector data padding");
390  }
391 }
392 
393 // Start of parallel I/O support. jhrg 8/19/15
394 void XDRStreamMarshaller::put_vector(char *val, int num, Vector &)
395 {
396  if (!val) throw InternalErr(__FILE__, __LINE__, "Could not send byte vector data. Buffer pointer is not set.");
397 
398  // write the number of members of the array being written and then set the position to 0
399  put_int(num);
400 
401  // this is the word boundary for writing xdr bytes in a vector.
402  const unsigned int add_to = 8;
403  // switch to memory on the heap since the thread will need to access it
404  // after this code returns.
405  char *byte_buf = new char[num + add_to];
406  XDR byte_sink;
407  try {
408  xdrmem_create(&byte_sink, byte_buf, num + add_to, XDR_ENCODE);
409  if (!xdr_setpos(&byte_sink, 0))
410  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
411 
412  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, num + add_to))
413  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
414 
415  unsigned int bytes_written = xdr_getpos(&byte_sink);
416  if (!bytes_written)
417  throw Error("Network I/O Error. Could not send byte vector data - unable to get stream position.");
418 
419 #ifdef USE_POSIX_THREADS
420  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
421  tm->increment_child_thread_count();
422  tm->start_thread(MarshallerThread::write_thread, d_out, byte_buf, bytes_written);
423  xdr_destroy(&byte_sink);
424 #else
425  d_out.write(byte_buf, bytes_written);
426  xdr_destroy(&byte_sink);
427  delete [] byte_buf;
428 #endif
429 
430  }
431  catch (...) {
432  DBG(cerr << "Caught an exception in put_vector_thread" << endl);
433  xdr_destroy(&byte_sink);
434  delete [] byte_buf;
435  throw;
436  }
437 }
438 
439 // private
450 void XDRStreamMarshaller::put_vector(char *val, unsigned int num, int width, Type type)
451 {
452  assert(val || num == 0);
453 
454  // write the number of array members being written, then set the position back to 0
455  put_int(num);
456 
457  if (num == 0)
458  return;
459 
460  int use_width = width;
461  if (use_width < 4) use_width = 4;
462 
463  // the size is the number of elements num times the width of each
464  // element, then add 4 bytes for the number of elements
465  int size = (num * use_width) + 4;
466 
467  // allocate enough memory for the elements
468  //vector<char> vec_buf(size);
469  char *vec_buf = new char[size];
470  XDR vec_sink;
471  try {
472  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
473 
474  // set the position of the sink to 0, we're starting at the beginning
475  if (!xdr_setpos(&vec_sink, 0))
476  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
477 
478  // write the array to the buffer
479  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
480  throw Error("Network I/O Error(2). Could not send vector data - unable to encode.");
481 
482  // how much was written to the buffer
483  unsigned int bytes_written = xdr_getpos(&vec_sink);
484  if (!bytes_written)
485  throw Error("Network I/O Error. Could not send vector data - unable to get stream position.");
486 
487 #ifdef USE_POSIX_THREADS
488  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
489  tm->increment_child_thread_count();
490  tm->start_thread(MarshallerThread::write_thread, d_out, vec_buf, bytes_written);
491  xdr_destroy(&vec_sink);
492 #else
493  d_out.write(vec_buf, bytes_written);
494  xdr_destroy(&vec_sink);
495  delete [] vec_buf;
496 #endif
497  }
498  catch (...) {
499  xdr_destroy(&vec_sink);
500  delete [] vec_buf;
501  throw;
502  }
503 }
504 
516 void XDRStreamMarshaller::put_vector_part(char *val, unsigned int num, int width, Type type)
517 {
518  if (width == 1) {
519  // Add space for the 4 bytes of length info and 4 bytes for padding, even though
520  // we will not send either of those.
521  const unsigned int add_to = 8;
522  unsigned int bufsiz = num + add_to;
523  //vector<char> byte_buf(bufsiz);
524  char *byte_buf = new char[bufsiz];
525  XDR byte_sink;
526  try {
527  xdrmem_create(&byte_sink, byte_buf, bufsiz, XDR_ENCODE);
528  if (!xdr_setpos(&byte_sink, 0))
529  throw Error("Network I/O Error. Could not send byte vector data - unable to set stream position.");
530 
531  if (!xdr_bytes(&byte_sink, (char **) &val, (unsigned int *) &num, bufsiz))
532  throw Error("Network I/O Error(2). Could not send byte vector data - unable to encode data.");
533 
534 #ifdef USE_POSIX_THREADS
535  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
536  tm->increment_child_thread_count();
537 
538  // Increment the element count so we can figure out about the padding in put_vector_last()
539  d_partial_put_byte_count += num;
540 
541  tm->start_thread(MarshallerThread::write_thread_part, d_out, byte_buf, num);
542  xdr_destroy(&byte_sink);
543 #else
544  // Only send the num bytes that follow the 4 bytes of length info - we skip the
545  // length info because it's already been sent and we don't send any trailing padding
546  // bytes in this method (see put_vector_last() for that).
547  d_out.write(byte_buf + 4, num);
548 
549  if (d_out.fail())
550  throw Error ("Network I/O Error. Could not send initial part of byte vector data");
551 
552  // Now increment the element count so we can figure out about the padding in put_vector_last()
553  d_partial_put_byte_count += num;
554 
555  xdr_destroy(&byte_sink);
556  delete [] byte_buf;
557 #endif
558  }
559  catch (...) {
560  xdr_destroy(&byte_sink);
561  delete [] byte_buf;
562  throw;
563  }
564  }
565  else {
566  int use_width = (width < 4) ? 4 : width;
567 
568  // the size is the number of elements num times the width of each
569  // element, then add 4 bytes for the (int) number of elements
570  int size = (num * use_width) + 4;
571 
572  // allocate enough memory for the elements
573  //vector<char> vec_buf(size);
574  char *vec_buf = new char[size];
575  XDR vec_sink;
576  try {
577  xdrmem_create(&vec_sink, vec_buf, size, XDR_ENCODE);
578 
579  // set the position of the sink to 0, we're starting at the beginning
580  if (!xdr_setpos(&vec_sink, 0))
581  throw Error("Network I/O Error. Could not send vector data - unable to set stream position.");
582 
583  // write the array to the buffer
584  if (!xdr_array(&vec_sink, (char **) &val, (unsigned int *) &num, size, width, XDRUtils::xdr_coder(type)))
585  throw Error("Network I/O Error(2). Could not send vector data -unable to encode data.");
586 
587 #ifdef USE_POSIX_THREADS
588  Locker lock(tm->get_mutex(), tm->get_cond(), tm->get_child_thread_count());
589  tm->increment_child_thread_count();
590 
591  // Increment the element count so we can figure out about the padding in put_vector_last()
592  d_partial_put_byte_count += (size - 4);
593  tm->start_thread(MarshallerThread::write_thread_part, d_out, vec_buf, size - 4);
594  xdr_destroy(&vec_sink);
595 #else
596  // write that much out to the output stream, skipping the length data that
597  // XDR writes since we have already written the length info using put_vector_start()
598  d_out.write(vec_buf + 4, size - 4);
599 
600  if (d_out.fail())
601  throw Error ("Network I/O Error. Could not send part of vector data");
602 
603  // Now increment the element count so we can figure out about the padding in put_vector_last()
604  d_partial_put_byte_count += (size - 4);
605 
606  xdr_destroy(&vec_sink);
607  delete [] vec_buf;
608 #endif
609  }
610  catch (...) {
611  xdr_destroy(&vec_sink);
612  delete [] vec_buf;
613  throw;
614  }
615  }
616 }
617 
618 void XDRStreamMarshaller::dump(ostream &strm) const
619 {
620  strm << DapIndent::LMarg << "XDRStreamMarshaller::dump - (" << (void *) this << ")" << endl;
621 }
622 
623 } // namespace libdap
624 
static void * write_thread(void *arg)
Holds a one-dimensional collection of DAP2 data types.
Definition: Vector.h:80
virtual void put_vector_part(char *val, unsigned int num, int width, Type type)
void start_thread(void *(*thread)(void *arg), std::ostream &out, char *byte_buf, unsigned int bytes_written)
STL namespace.
virtual void put_vector_start(int num)
Type
Identifies the data type.
Definition: Type.h:94
A class for software fault reporting.
Definition: InternalErr.h:64
virtual void dump(ostream &strm) const
dump the contents of this object to the specified ostream
virtual BaseType * var(const string &name="", bool exact_match=true, btp_stack *s=0)
Definition: Vector.cc:434
virtual Type type() const
Returns the type of the class instance.
Definition: BaseType.cc:310
static void * write_thread_part(void *arg)
static xdrproc_t xdr_coder(const Type &t)
Returns a function used to encode elements of an array.
Definition: XDRUtils.cc:145
A class for error processing.
Definition: Error.h:90