Drizzled Public API Documentation

net_serv.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2008 Sun Microsystems, Inc.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include <drizzled/current_session.h>
24 #include <drizzled/error.h>
25 #include <drizzled/session.h>
26 #include <drizzled/statistics_variables.h>
27 #include <drizzled/system_variables.h>
28 
29 #include <assert.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <signal.h>
34 #include <errno.h>
35 #include <sys/socket.h>
36 #include <sys/poll.h>
37 #include <zlib.h>
38 #include <algorithm>
39 
40 #include "errmsg.h"
41 #include "vio.h"
42 #include "net_serv.h"
43 
44 namespace drizzle_plugin {
45 
46 using namespace std;
47 using namespace drizzled;
48 
/*
 The following handles the differences between linking this file into the
 client and linking it into the server.

 An error is raised when a packet that is too big is found.
 The server can change the limit with the -O switch, but because the client
 normally cannot, the client should use a bigger max_allowed_packet.
*/
57 
/*
  Wire-format constants.

  NET_HEADER_SIZE   - bytes in front of every packet: a 3-byte length
                      followed by a 1-byte sequence number.
  COMP_HEADER_SIZE  - additional 3-byte length field that follows the standard
                      header when compression is in use.
  MAX_PACKET_LENGTH - largest payload a 3-byte length field can describe.
*/
#define NET_HEADER_SIZE 4 /* standard header size */
#define COMP_HEADER_SIZE 3 /* compression header extra size */

#define MAX_PACKET_LENGTH (256L*256L*256L-1)
63 
64 static bool net_write_buff(NET*, const void*, uint32_t len);
65 static int drizzleclient_net_real_write(NET *net, const unsigned char *packet, size_t len);
66 
69 void NET::init(int sock, uint32_t buffer_length)
70 {
71  vio= new Vio(sock);
72  max_packet= (uint32_t) buffer_length;
73  max_packet_size= max(buffer_length, drizzled::global_system_variables.max_allowed_packet);
74 
75  buff= (unsigned char*) malloc((size_t) max_packet + NET_HEADER_SIZE + COMP_HEADER_SIZE);
76  buff_end= buff + max_packet;
77  error_= 0;
78  pkt_nr= compress_pkt_nr= 0;
79  write_pos= read_pos= buff;
80  compress= 0;
81  where_b= remain_in_buf= 0;
82  last_errno= 0;
83  vio->fastsend();
84 }
85 
86 void NET::end()
87 {
88  free(buff);
89  buff= NULL;
90 }
91 
92 void NET::close()
93 {
94  drizzled::safe_delete(vio);
95 }
96 
97 bool NET::peer_addr(char *buf, size_t buflen, uint16_t& port)
98 {
99  return vio->peer_addr(buf, buflen, port);
100 }
101 
102 void NET::keepalive(bool flag)
103 {
104  vio->keepalive(flag);
105 }
106 
107 int NET::get_sd() const
108 {
109  return vio->get_fd();
110 }
111 
114 static bool drizzleclient_net_realloc(NET *net, size_t length)
115 {
116  if (length >= net->max_packet_size)
117  {
118  /* @todo: 1 and 2 codes are identical. */
119  net->error_= 3;
120  net->last_errno= ER_NET_PACKET_TOO_LARGE;
121  my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
122  return 1;
123  }
124  size_t pkt_length = (length + IO_SIZE - 1) & ~(IO_SIZE - 1);
125  /*
126  We must allocate some extra bytes for the end 0 and to be able to
127  read big compressed blocks
128  */
129  unsigned char* buff= (unsigned char*)realloc((char*) net->buff, pkt_length + NET_HEADER_SIZE + COMP_HEADER_SIZE);
130  net->buff=net->write_pos= buff;
131  net->buff_end= buff + (net->max_packet= (uint32_t) pkt_length);
132  return 0;
133 }
134 
135 bool NET::flush()
136 {
137  bool error= false;
138  if (buff != write_pos)
139  {
140  error= drizzleclient_net_real_write(this, buff, write_pos - buff) ? true : false;
141  write_pos= buff;
142  }
143  /* Sync packet number if using compression */
144  if (compress)
145  pkt_nr= compress_pkt_nr;
146  return error;
147 }
148 
149 
150 /*****************************************************************************
151  ** Write something to server/client buffer
152  *****************************************************************************/
153 
164 static bool
165 drizzleclient_net_write(NET* net, const void* packet0, size_t len)
166 {
167  const unsigned char* packet= reinterpret_cast<const unsigned char*>(packet0);
168  unsigned char buff[NET_HEADER_SIZE];
169  if (unlikely(!net->vio)) /* nowhere to write */
170  return 0;
171  /*
172  Big packets are handled by splitting them in packets of MAX_PACKET_LENGTH
173  length. The last packet is always a packet that is < MAX_PACKET_LENGTH.
174  (The last packet may even have a length of 0)
175  */
176  while (len >= MAX_PACKET_LENGTH)
177  {
178  const uint32_t z_size = MAX_PACKET_LENGTH;
179  int3store(buff, z_size);
180  buff[3]= (unsigned char) net->pkt_nr++;
181  if (net_write_buff(net, buff, NET_HEADER_SIZE) || net_write_buff(net, packet, z_size))
182  return 1;
183  packet += z_size;
184  len-= z_size;
185  }
186  /* Write last packet */
187  int3store(buff,len);
188  buff[3]= (unsigned char) net->pkt_nr++;
189  return net_write_buff(net, buff, NET_HEADER_SIZE) || net_write_buff(net, packet, len);
190 }
191 
219 static bool
220 drizzleclient_net_write_command(NET *net,unsigned char command,
221  const unsigned char *header, size_t head_len,
222  const unsigned char *packet, size_t len)
223 {
224  uint32_t length=len+1+head_len; /* 1 extra byte for command */
225  unsigned char buff[NET_HEADER_SIZE+1];
226  uint32_t header_size=NET_HEADER_SIZE+1;
227 
228  buff[4]=command; /* For first packet */
229 
230  if (length >= MAX_PACKET_LENGTH)
231  {
232  /* Take into account that we have the command in the first header */
233  len= MAX_PACKET_LENGTH - 1 - head_len;
234  do
235  {
236  int3store(buff, MAX_PACKET_LENGTH);
237  buff[3]= (unsigned char) net->pkt_nr++;
238  if (net_write_buff(net, buff, header_size) ||
239  net_write_buff(net, header, head_len) ||
240  net_write_buff(net, packet, len))
241  return(1);
242  packet+= len;
243  length-= MAX_PACKET_LENGTH;
244  len= MAX_PACKET_LENGTH;
245  head_len= 0;
246  header_size= NET_HEADER_SIZE;
247  } while (length >= MAX_PACKET_LENGTH);
248  len=length; /* Data left to be written */
249  }
250  int3store(buff,length);
251  buff[3]= (unsigned char) net->pkt_nr++;
252  return (net_write_buff(net, buff, header_size) ||
253  (head_len && net_write_buff(net, header, head_len)) ||
254  net_write_buff(net, packet, len) || net->flush());
255 }
256 
283 static bool
284 net_write_buff(NET* net, const void* packet0, uint32_t len)
285 {
286  const unsigned char* packet= reinterpret_cast<const unsigned char*>(packet0);
287  uint32_t left_length;
288  if (net->compress && net->max_packet > MAX_PACKET_LENGTH)
289  left_length= MAX_PACKET_LENGTH - (net->write_pos - net->buff);
290  else
291  left_length= (uint32_t) (net->buff_end - net->write_pos);
292 
293  if (len > left_length)
294  {
295  if (net->write_pos != net->buff)
296  {
297  /* Fill up already used packet and write it */
298  memcpy(net->write_pos,packet,left_length);
299  if (drizzleclient_net_real_write(net, net->buff,
300  (size_t) (net->write_pos - net->buff) + left_length))
301  return 1;
302  net->write_pos= net->buff;
303  packet+= left_length;
304  len-= left_length;
305  }
306  if (net->compress)
307  {
308  /*
309  We can't have bigger packets than 16M with compression
310  Because the uncompressed length is stored in 3 bytes
311  */
312  left_length= MAX_PACKET_LENGTH;
313  while (len > left_length)
314  {
315  if (drizzleclient_net_real_write(net, packet, left_length))
316  return 1;
317  packet+= left_length;
318  len-= left_length;
319  }
320  }
321  if (len > net->max_packet)
322  return drizzleclient_net_real_write(net, packet, len) ? 1 : 0;
323  /* Send out rest of the blocks as full sized blocks */
324  }
325  memcpy(net->write_pos,packet,len);
326  net->write_pos+= len;
327  return 0;
328 }
329 
330 
/*
 TODO: rewrite this to perform non-blocking writes. If a write cannot be made
 and we are in the server, yield to another process and come back later.
*/
343 static int
344 drizzleclient_net_real_write(NET *net, const unsigned char *packet, size_t len)
345 {
346  /* Backup of the original SO_RCVTIMEO timeout */
347 
348  if (net->error_ == 2)
349  return(-1); /* socket can't be used */
350 
351  if (net->compress)
352  {
353  const uint32_t header_length=NET_HEADER_SIZE+COMP_HEADER_SIZE;
354  unsigned char* b= (unsigned char*) malloc(len + NET_HEADER_SIZE + COMP_HEADER_SIZE);
355  memcpy(b+header_length,packet,len);
356 
357  size_t complen= len * 120 / 100 + 12;
358  unsigned char* compbuf= new unsigned char[complen];
359  uLongf tmp_complen= complen;
360  int res= compress((Bytef*) compbuf, &tmp_complen,
361  (Bytef*) (b+header_length),
362  len);
363  complen= tmp_complen;
364 
365  delete[] compbuf;
366 
367  if (res != Z_OK || complen >= len)
368  complen= 0;
369  else
370  {
371  size_t tmplen= complen;
372  complen= len;
373  len= tmplen;
374  }
375  int3store(&b[NET_HEADER_SIZE],complen);
376  int3store(b,len);
377  b[3]=(unsigned char) (net->compress_pkt_nr++);
378  len+= header_length;
379  packet= b;
380  }
381 
382  uint32_t retry_count= 0;
383  const unsigned char* pos= packet;
384  const unsigned char* end= pos + len;
385  /* Loop until we have read everything */
386  while (pos != end)
387  {
388  assert(pos);
389  // TODO - see bug comment below - will we crash now?
390  size_t length;
391  if ((long) (length= net->vio->write( pos, (size_t) (end-pos))) <= 0)
392  {
393  /*
394  * We could end up here with net->vio == NULL
395  * See LP bug#436685
396  * If that is the case, we exit the while loop
397  */
398  if (net->vio == NULL)
399  break;
400 
401  const bool interrupted= net->vio->should_retry();
402  /*
403  If we read 0, or we were interrupted this means that
404  we need to switch to blocking mode and wait until the timeout
405  on the socket kicks in.
406  */
407  if (interrupted || length == 0)
408  {
409  bool old_mode;
410  while (net->vio->blocking(true, &old_mode) < 0)
411  {
412  if (net->vio->should_retry() && retry_count++ < net->retry_count)
413  continue;
414  net->error_= 2; /* Close socket */
415  net->last_errno= ER_NET_PACKET_TOO_LARGE;
416  my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
417  goto end;
418  }
419  retry_count=0;
420  continue;
421  }
422  else
423  {
424  if (retry_count++ < net->retry_count)
425  continue;
426  }
427 
428  if (net->vio->get_errno() == EINTR)
429  {
430  continue;
431  }
432  net->error_= 2; /* Close socket */
433  net->last_errno= interrupted ? CR_NET_WRITE_INTERRUPTED : CR_NET_ERROR_ON_WRITE;
434  break;
435  }
436  pos+= length;
437 
438  /* If this is an error we may not have a current_session any more */
439  if (current_session)
440  current_session->status_var.bytes_sent+= length;
441  }
442 end:
443  if (net->compress)
444  free((char*)packet);
445 
446  return (int) (pos != end);
447 }
448 
449 
459 static uint32_t
460 my_real_read(NET *net, size_t *complen)
461 {
462  size_t length= 0;
463  uint32_t retry_count=0;
464  size_t len=packet_error;
465  uint32_t remain= net->compress ? NET_HEADER_SIZE+COMP_HEADER_SIZE : NET_HEADER_SIZE;
466 
467  *complen = 0;
468 
469  /* Read timeout is set in drizzleclient_net_set_read_timeout */
470 
471  unsigned char* pos = net->buff + net->where_b; /* net->packet -4 */
472 
473  for (uint32_t i= 0; i < 2 ; i++)
474  {
475  while (remain > 0)
476  {
477  /* First read is done with non blocking mode */
478  if ((long) (length= net->vio->read(pos, remain)) <= 0L)
479  {
480  if (net->vio == NULL)
481  goto end;
482 
483  const bool interrupted = net->vio->should_retry();
484 
485  if (interrupted)
486  { /* Probably in MIT threads */
487  if (retry_count++ < net->retry_count)
488  continue;
489  }
490  if (net->vio->get_errno() == EINTR)
491  {
492  continue;
493  }
494  len= packet_error;
495  net->error_= 2; /* Close socket */
496  net->last_errno= net->vio->was_interrupted() ? CR_NET_READ_INTERRUPTED : CR_NET_READ_ERROR;
497  goto end;
498  }
499  remain -= (uint32_t) length;
500  pos+= length;
501  current_session->status_var.bytes_received+= length;
502  }
503  if (i == 0)
504  { /* First parts is packet length */
505  uint32_t helping;
506 
507  if (net->buff[net->where_b + 3] != (unsigned char) net->pkt_nr)
508  {
509  len= packet_error;
510  /* Not a NET error on the client. XXX: why? */
511  my_error(ER_NET_PACKETS_OUT_OF_ORDER, MYF(0));
512  goto end;
513  }
514  net->compress_pkt_nr= ++net->pkt_nr;
515  if (net->compress)
516  {
517  /*
518  If the packet is compressed then complen > 0 and contains the
519  number of bytes in the uncompressed packet
520  */
521  *complen=uint3korr(&(net->buff[net->where_b + NET_HEADER_SIZE]));
522  }
523 
524  len=uint3korr(net->buff+net->where_b);
525  if (!len) /* End of big multi-packet */
526  goto end;
527  helping = max(len,*complen) + net->where_b;
528  /* The necessary size of net->buff */
529  if (helping >= net->max_packet)
530  {
531  if (drizzleclient_net_realloc(net,helping))
532  {
533  /* Clear the buffer so libdrizzle doesn't keep retrying */
534  while (len > 0)
535  {
536  length= read(net->vio->get_fd(), net->buff, min((size_t)net->max_packet, len));
537  assert((long)length > 0L);
538  len-= length;
539  }
540 
541  len= packet_error; /* Return error and close connection */
542  goto end;
543  }
544  }
545  pos=net->buff + net->where_b;
546  remain = (uint32_t) len;
547  }
548  }
549 
550 end:
551  return len;
552 }
553 
554 
571 static uint32_t
573 {
574  size_t len, complen;
575 
576  if (not net->compress)
577  {
578  len = my_real_read(net,&complen);
579  if (len == MAX_PACKET_LENGTH)
580  {
581  /* First packet of a multi-packet. Concatenate the packets */
582  uint32_t save_pos = net->where_b;
583  size_t total_length= 0;
584 
585  do
586  {
587  net->where_b += len;
588  total_length += len;
589  len = my_real_read(net,&complen);
590  } while (len == MAX_PACKET_LENGTH);
591 
592  if (len != packet_error)
593  {
594  len+= total_length;
595  }
596  net->where_b = save_pos;
597  }
598  net->read_pos = net->buff + net->where_b;
599 
600  if (len != packet_error)
601  net->read_pos[len]=0; /* Safeguard for drizzleclient_use_result */
602 
603  return len;
604  }
605  else
606  {
607  /* We are using the compressed protocol */
608 
609  uint32_t buf_length;
610  uint32_t start_of_packet;
611  uint32_t first_packet_offset;
612  uint32_t read_length, multi_byte_packet=0;
613 
614  if (net->remain_in_buf)
615  {
616  buf_length= net->buf_length; /* Data left in old packet */
617  first_packet_offset= start_of_packet= (net->buf_length -
618  net->remain_in_buf);
619  /* Restore the character that was overwritten by the end 0 */
620  net->buff[start_of_packet]= net->save_char;
621  }
622  else
623  {
624  /* reuse buffer, as there is nothing in it that we need */
625  buf_length= start_of_packet= first_packet_offset= 0;
626  }
627  for (;;)
628  {
629  uint32_t packet_len;
630 
631  if (buf_length - start_of_packet >= NET_HEADER_SIZE)
632  {
633  read_length = uint3korr(net->buff+start_of_packet);
634  if (!read_length)
635  {
636  /* End of multi-byte packet */
637  start_of_packet += NET_HEADER_SIZE;
638  break;
639  }
640  if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
641  {
642  if (multi_byte_packet)
643  {
644  /* Remove packet header for second packet */
645  memmove(net->buff + first_packet_offset + start_of_packet,
646  net->buff + first_packet_offset + start_of_packet +
647  NET_HEADER_SIZE,
648  buf_length - start_of_packet);
649  start_of_packet += read_length;
650  buf_length -= NET_HEADER_SIZE;
651  }
652  else
653  start_of_packet+= read_length + NET_HEADER_SIZE;
654 
655  if (read_length != MAX_PACKET_LENGTH) /* last package */
656  {
657  multi_byte_packet= 0; /* No last zero len packet */
658  break;
659  }
660  multi_byte_packet= NET_HEADER_SIZE;
661  /* Move data down to read next data packet after current one */
662  if (first_packet_offset)
663  {
664  memmove(net->buff,net->buff+first_packet_offset,
665  buf_length-first_packet_offset);
666  buf_length-=first_packet_offset;
667  start_of_packet -= first_packet_offset;
668  first_packet_offset=0;
669  }
670  continue;
671  }
672  }
673  /* Move data down to read next data packet after current one */
674  if (first_packet_offset)
675  {
676  memmove(net->buff,net->buff+first_packet_offset,
677  buf_length-first_packet_offset);
678  buf_length-=first_packet_offset;
679  start_of_packet -= first_packet_offset;
680  first_packet_offset=0;
681  }
682 
683  net->where_b=buf_length;
684  if ((packet_len = my_real_read(net,&complen)) == packet_error)
685  return packet_error;
686 
687  if (complen)
688  {
689  unsigned char * compbuf= (unsigned char *) malloc(complen);
690  if (compbuf != NULL)
691  {
692  uLongf tmp_complen= complen;
693  int error= uncompress((Bytef*) compbuf, &tmp_complen,
694  (Bytef*) (net->buff + net->where_b),
695  (uLong)packet_len);
696  complen= tmp_complen;
697 
698  if (error != Z_OK)
699  {
700  net->error_= 2; /* caller will close socket */
701  net->last_errno= CR_NET_UNCOMPRESS_ERROR;
702  }
703  else
704  {
705  memcpy((net->buff + net->where_b), compbuf, complen);
706  }
707  free(compbuf);
708  }
709  }
710  else
711  {
712  complen= packet_len;
713  }
714 
715  }
716  buf_length+= complen;
717 
718  net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
719  net->buf_length= buf_length;
720  net->remain_in_buf= (uint32_t) (buf_length - start_of_packet);
721  len = ((uint32_t) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE -
722  multi_byte_packet);
723  net->save_char= net->read_pos[len]; /* Must be saved */
724  net->read_pos[len]=0; /* Safeguard for drizzleclient_use_result */
725  }
726  return len;
727 }
728 
729 void NET::set_read_timeout(uint32_t timeout)
730 {
731  read_timeout_= timeout;
732 #ifndef __sun
733  if (vio)
734  vio->timeout(0, timeout);
735 #endif
736  return;
737 }
738 
739 void NET::set_write_timeout(uint32_t timeout)
740 {
741  write_timeout_= timeout;
742 #ifndef __sun
743  if (vio)
744  vio->timeout(1, timeout);
745 #endif
746  return;
747 }
748 
749 bool NET::write(const void* data, size_t size)
750 {
751  return drizzleclient_net_write(this, data, size);
752 }
753 
754 bool NET::write_command(unsigned char command, data_ref header, data_ref body)
755 {
756  return drizzleclient_net_write_command(this, command, header.data(), header.size(), body.data(), body.size());
757 }
758 
759 uint32_t NET::read()
760 {
761  return drizzleclient_net_read(this);
762 }
763 
764 } /* namespace drizzle_plugin */