websocketpp  0.5.1
C++/Boost Asio based websocket client/server library
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Pages
connection.hpp
1 /*
2  * Copyright (c) 2014, Peter Thorson. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  * * Redistributions of source code must retain the above copyright
7  * notice, this list of conditions and the following disclaimer.
8  * * Redistributions in binary form must reproduce the above copyright
9  * notice, this list of conditions and the following disclaimer in the
10  * documentation and/or other materials provided with the distribution.
11  * * Neither the name of the WebSocket++ Project nor the
12  * names of its contributors may be used to endorse or promote products
13  * derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27 
28 #ifndef WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
29 #define WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
30 
31 #include <websocketpp/transport/iostream/base.hpp>
32 
33 #include <websocketpp/transport/base/connection.hpp>
34 
35 #include <websocketpp/logger/levels.hpp>
36 
37 #include <websocketpp/common/connection_hdl.hpp>
38 #include <websocketpp/common/memory.hpp>
39 #include <websocketpp/common/platforms.hpp>
40 
41 #include <algorithm>
42 #include <iostream>
43 #include <sstream>
44 #include <string>
45 #include <vector>
46 
47 namespace websocketpp {
48 namespace transport {
49 namespace iostream {
50 
/// Placeholder timer type used by the iostream transport.
/**
 * The only operation it offers, cancel(), has an empty body.
 */
struct timer {
    /// Cancel the timer (does nothing).
    void cancel() {
    }
};
56 
57 template <typename config>
58 class connection : public lib::enable_shared_from_this< connection<config> > {
59 public:
63  typedef lib::shared_ptr<type> ptr;
64 
66  typedef typename config::concurrency_type concurrency_type;
68  typedef typename config::alog_type alog_type;
70  typedef typename config::elog_type elog_type;
71 
72  // Concurrency policy types
73  typedef typename concurrency_type::scoped_lock_type scoped_lock_type;
74  typedef typename concurrency_type::mutex_type mutex_type;
75 
76  typedef lib::shared_ptr<timer> timer_ptr;
77 
78  explicit connection(bool is_server, alog_type & alog, elog_type & elog)
79  : m_output_stream(NULL)
80  , m_reading(false)
81  , m_is_server(is_server)
82  , m_is_secure(false)
83  , m_alog(alog)
84  , m_elog(elog)
85  , m_remote_endpoint("iostream transport")
86  {
87  m_alog.write(log::alevel::devel,"iostream con transport constructor");
88  }
89 
91  ptr get_shared() {
92  return type::shared_from_this();
93  }
94 
96 
102  void register_ostream(std::ostream * o) {
103  // TODO: lock transport state?
104  scoped_lock_type lock(m_read_mutex);
105  m_output_stream = o;
106  }
107 
109 
127  friend std::istream & operator>> (std::istream & in, type & t) {
128  // this serializes calls to external read.
129  scoped_lock_type lock(t.m_read_mutex);
130 
131  t.read(in);
132 
133  return in;
134  }
135 
137 
150  size_t read_some(char const * buf, size_t len) {
151  // this serializes calls to external read.
152  scoped_lock_type lock(m_read_mutex);
153 
154  return this->read_some_impl(buf,len);
155  }
156 
158 
173  size_t read_all(char const * buf, size_t len) {
174  // this serializes calls to external read.
175  scoped_lock_type lock(m_read_mutex);
176 
177  size_t total_read = 0;
178  size_t temp_read = 0;
179 
180  do {
181  temp_read = this->read_some_impl(buf+total_read,len-total_read);
182  total_read += temp_read;
183  } while (temp_read != 0 && total_read < len);
184 
185  return total_read;
186  }
187 
189 
193  size_t readsome(char const * buf, size_t len) {
194  return this->read_some(buf,len);
195  }
196 
198 
204  void eof() {
205  // this serializes calls to external read.
206  scoped_lock_type lock(m_read_mutex);
207 
208  if (m_reading) {
209  complete_read(make_error_code(transport::error::eof));
210  }
211  }
212 
214 
220  void fatal_error() {
221  // this serializes calls to external read.
222  scoped_lock_type lock(m_read_mutex);
223 
224  if (m_reading) {
225  complete_read(make_error_code(transport::error::pass_through));
226  }
227  }
228 
230 
242  void set_secure(bool value) {
243  m_is_secure = value;
244  }
245 
247 
256  bool is_secure() const {
257  return m_is_secure;
258  }
259 
261 
274  void set_remote_endpoint(std::string value) {
275  m_remote_endpoint = value;
276  }
277 
279 
290  std::string get_remote_endpoint() const {
291  return m_remote_endpoint;
292  }
293 
295 
299  return m_connection_hdl;
300  }
301 
303 
312  timer_ptr set_timer(long, timer_handler) {
313  return timer_ptr();
314  }
315 
317 
331  m_write_handler = h;
332  }
333 
335 
351  m_shutdown_handler = h;
352  }
353 protected:
355 
360  void init(init_handler handler) {
361  m_alog.write(log::alevel::devel,"iostream connection init");
362  handler(lib::error_code());
363  }
364 
366 
389  void async_read_at_least(size_t num_bytes, char *buf, size_t len,
390  read_handler handler)
391  {
392  std::stringstream s;
393  s << "iostream_con async_read_at_least: " << num_bytes;
394  m_alog.write(log::alevel::devel,s.str());
395 
396  if (num_bytes > len) {
397  handler(make_error_code(error::invalid_num_bytes),size_t(0));
398  return;
399  }
400 
401  if (m_reading == true) {
402  handler(make_error_code(error::double_read),size_t(0));
403  return;
404  }
405 
406  if (num_bytes == 0 || len == 0) {
407  handler(lib::error_code(),size_t(0));
408  return;
409  }
410 
411  m_buf = buf;
412  m_len = len;
413  m_bytes_needed = num_bytes;
414  m_read_handler = handler;
415  m_cursor = 0;
416  m_reading = true;
417  }
418 
420 
437  void async_write(char const * buf, size_t len, transport::write_handler
438  handler)
439  {
440  m_alog.write(log::alevel::devel,"iostream_con async_write");
441  // TODO: lock transport state?
442 
443  lib::error_code ec;
444 
445  if (m_output_stream) {
446  m_output_stream->write(buf,len);
447 
448  if (m_output_stream->bad()) {
449  ec = make_error_code(error::bad_stream);
450  }
451  } else if (m_write_handler) {
452  ec = m_write_handler(m_connection_hdl, buf, len);
453  } else {
454  ec = make_error_code(error::output_stream_required);
455  }
456 
457  handler(ec);
458  }
459 
461 
477  void async_write(std::vector<buffer> const & bufs, transport::write_handler
478  handler)
479  {
480  m_alog.write(log::alevel::devel,"iostream_con async_write buffer list");
481  // TODO: lock transport state?
482 
483  lib::error_code ec;
484 
485  if (m_output_stream) {
486  std::vector<buffer>::const_iterator it;
487  for (it = bufs.begin(); it != bufs.end(); it++) {
488  m_output_stream->write((*it).buf,(*it).len);
489 
490  if (m_output_stream->bad()) {
491  ec = make_error_code(error::bad_stream);
492  break;
493  }
494  }
495  } else if (m_write_handler) {
496  std::vector<buffer>::const_iterator it;
497  for (it = bufs.begin(); it != bufs.end(); it++) {
498  ec = m_write_handler(m_connection_hdl, (*it).buf, (*it).len);
499  if (ec) {break;}
500  }
501 
502  } else {
503  ec = make_error_code(error::output_stream_required);
504  }
505 
506  handler(ec);
507  }
508 
510 
514  m_connection_hdl = hdl;
515  }
516 
518 
528  lib::error_code dispatch(dispatch_handler handler) {
529  handler();
530  return lib::error_code();
531  }
532 
534 
542  lib::error_code ec;
543 
544  if (m_shutdown_handler) {
545  ec = m_shutdown_handler(m_connection_hdl);
546  }
547 
548  handler(ec);
549  }
550 private:
551  void read(std::istream &in) {
552  m_alog.write(log::alevel::devel,"iostream_con read");
553 
554  while (in.good()) {
555  if (!m_reading) {
556  m_elog.write(log::elevel::devel,"write while not reading");
557  break;
558  }
559 
560  in.read(m_buf+m_cursor,static_cast<std::streamsize>(m_len-m_cursor));
561 
562  if (in.gcount() == 0) {
563  m_elog.write(log::elevel::devel,"read zero bytes");
564  break;
565  }
566 
567  m_cursor += static_cast<size_t>(in.gcount());
568 
569  // TODO: error handling
570  if (in.bad()) {
571  m_reading = false;
572  complete_read(make_error_code(error::bad_stream));
573  }
574 
575  if (m_cursor >= m_bytes_needed) {
576  m_reading = false;
577  complete_read(lib::error_code());
578  }
579  }
580  }
581 
582  size_t read_some_impl(char const * buf, size_t len) {
583  m_alog.write(log::alevel::devel,"iostream_con read_some");
584 
585  if (!m_reading) {
586  m_elog.write(log::elevel::devel,"write while not reading");
587  return 0;
588  }
589 
590  size_t bytes_to_copy = (std::min)(len,m_len-m_cursor);
591 
592  std::copy(buf,buf+bytes_to_copy,m_buf+m_cursor);
593 
594  m_cursor += bytes_to_copy;
595 
596  if (m_cursor >= m_bytes_needed) {
597  complete_read(lib::error_code());
598  }
599 
600  return bytes_to_copy;
601  }
602 
604 
619  void complete_read(lib::error_code const & ec) {
620  m_reading = false;
621 
622  read_handler handler = m_read_handler;
623  m_read_handler = read_handler();
624 
625  handler(ec,m_cursor);
626  }
627 
628  // Read space (Protected by m_read_mutex)
629  char * m_buf;
630  size_t m_len;
631  size_t m_bytes_needed;
632  read_handler m_read_handler;
633  size_t m_cursor;
634 
635  // transport resources
636  std::ostream * m_output_stream;
637  connection_hdl m_connection_hdl;
638  write_handler m_write_handler;
639  shutdown_handler m_shutdown_handler;
640 
641  bool m_reading;
642  bool const m_is_server;
643  bool m_is_secure;
644  alog_type & m_alog;
645  elog_type & m_elog;
646  std::string m_remote_endpoint;
647 
648  // This lock ensures that only one thread can edit read data for this
649  // connection. This is a very coarse lock that is basically locked all the
650  // time. The nature of the connection is such that it cannot be
651  // parallelized, the locking is here to prevent intra-connection concurrency
652  // in order to allow inter-connection concurrency.
653  mutex_type m_read_mutex;
654 };
655 
656 
657 } // namespace iostream
658 } // namespace transport
659 } // namespace websocketpp
660 
661 #endif // WEBSOCKETPP_TRANSPORT_IOSTREAM_CON_HPP
void register_ostream(std::ostream *o)
Register a std::ostream with the transport for writing output.
Definition: connection.hpp:102
uint16_t value
The type of a close code value.
Definition: close.hpp:49
void set_shutdown_handler(shutdown_handler h)
Sets the shutdown handler.
Definition: connection.hpp:350
void set_handle(connection_hdl hdl)
Set Connection Handle.
Definition: connection.hpp:513
friend std::istream & operator>>(std::istream &in, type &t)
Overloaded stream input operator.
Definition: connection.hpp:127
void async_write(std::vector< buffer > const &bufs, transport::write_handler handler)
Asynchronous Transport Write (scatter-gather)
Definition: connection.hpp:477
lib::error_code dispatch(dispatch_handler handler)
Call given handler back within the transport's event system (if present)
Definition: connection.hpp:528
void set_remote_endpoint(std::string value)
Set human readable remote endpoint address.
Definition: connection.hpp:274
lib::function< void(lib::error_code const &)> write_handler
The type and signature of the callback passed to the write method.
Definition: connection.hpp:123
static level const devel
Low level debugging information (warning: very chatty)
Definition: levels.hpp:63
lib::weak_ptr< void > connection_hdl
A handle to uniquely identify a connection.
bool is_secure() const
Tests whether or not the underlying transport is secure.
Definition: connection.hpp:256
void async_shutdown(transport::shutdown_handler handler)
Perform cleanup on socket shutdown_handler.
Definition: connection.hpp:541
size_t readsome(char const *buf, size_t len)
Manual input supply (DEPRECATED)
Definition: connection.hpp:193
async_read called while another async_read was in progress
Definition: base.hpp:62
lib::function< void(lib::error_code const &, size_t)> read_handler
The type and signature of the callback passed to the read method.
Definition: connection.hpp:120
size_t read_all(char const *buf, size_t len)
Manual input supply (read all)
Definition: connection.hpp:173
underlying transport pass through
Definition: connection.hpp:153
std::string get_remote_endpoint() const
Get human readable remote endpoint address.
Definition: connection.hpp:290
static level const devel
Development messages (warning: very chatty)
Definition: levels.hpp:141
lib::function< lib::error_code(connection_hdl)> shutdown_handler
Definition: base.hpp:49
void async_write(char const *buf, size_t len, transport::write_handler handler)
Asynchronous Transport Write.
Definition: connection.hpp:437
size_t read_some(char const *buf, size_t len)
Manual input supply (read some)
Definition: connection.hpp:150
lib::function< void()> dispatch_handler
The type and signature of the callback passed to the dispatch method.
Definition: connection.hpp:135
async_read_at_least call requested more bytes than buffer can store
Definition: base.hpp:59
config::alog_type alog_type
Type of this transport's access logging policy.
Definition: connection.hpp:68
lib::function< void(lib::error_code const &)> timer_handler
The type and signature of the callback passed to the timer method.
Definition: connection.hpp:126
lib::function< void(lib::error_code const &)> shutdown_handler
The type and signature of the callback passed to the shutdown method.
Definition: connection.hpp:129
Namespace for the WebSocket++ project.
Definition: base64.hpp:41
config::elog_type elog_type
Type of this transport's error logging policy.
Definition: connection.hpp:70
lib::function< void(lib::error_code const &)> init_handler
The type and signature of the callback passed to the init hook.
Definition: connection.hpp:117
connection< config > type
Type of this connection transport component.
Definition: connection.hpp:61
ptr get_shared()
Get a shared pointer to this component.
Definition: connection.hpp:91
void set_write_handler(write_handler h)
Sets the write handler.
Definition: connection.hpp:330
void init(init_handler handler)
Initialize the connection transport.
Definition: connection.hpp:360
connection_hdl get_handle() const
Get the connection handle.
Definition: connection.hpp:298
void set_secure(bool value)
Set whether or not this connection is secure.
Definition: connection.hpp:242
timer_ptr set_timer(long, timer_handler)
Call back a function after a period of time.
Definition: connection.hpp:312
void fatal_error()
Signal transport error.
Definition: connection.hpp:220
void async_read_at_least(size_t num_bytes, char *buf, size_t len, read_handler handler)
Initiate an async_read for at least num_bytes bytes into buf.
Definition: connection.hpp:389
config::concurrency_type concurrency_type
transport concurrency policy
Definition: connection.hpp:66
lib::function< lib::error_code(connection_hdl, char const *, size_t)> write_handler
The type and signature of the callback used by iostream transport to write.
Definition: base.hpp:45
lib::shared_ptr< type > ptr
Type of a shared pointer to this connection transport component.
Definition: connection.hpp:63