00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 #ifndef __PION_TCP_STREAM_HEADER__
00011 #define __PION_TCP_STREAM_HEADER__
00012
00013 #include <cstring>
00014 #include <istream>
00015 #include <streambuf>
00016 #include <boost/bind.hpp>
00017 #include <boost/thread/mutex.hpp>
00018 #include <boost/thread/condition.hpp>
00019 #include <pion/config.hpp>
00020 #include <pion/tcp/connection.hpp>
00021
00022
00023 namespace pion {
00024 namespace tcp {
00025
00026
00032 class stream_buffer
00033 : public std::basic_streambuf<char, std::char_traits<char> >
00034 {
00035 public:
00036
00037
00038 typedef char char_type;
00039 typedef std::char_traits<char>::int_type int_type;
00040 typedef std::char_traits<char>::off_type off_type;
00041 typedef std::char_traits<char>::pos_type pos_type;
00042 typedef std::char_traits<char> traits_type;
00043
00044
00045 enum {
00046 PUT_BACK_MAX = 10,
00047 WRITE_BUFFER_SIZE = 8192
00048 };
00049
00050
00056 explicit stream_buffer(tcp::connection_ptr& conn_ptr)
00057 : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->get_read_buffer().c_array())
00058 {
00059 setup_buffers();
00060 }
00061
00068 explicit stream_buffer(boost::asio::io_service& io_service,
00069 const bool ssl_flag = false)
00070 : m_conn_ptr(new connection(io_service, ssl_flag)),
00071 m_read_buf(m_conn_ptr->get_read_buffer().c_array())
00072 {
00073 setup_buffers();
00074 }
00075
00082 stream_buffer(boost::asio::io_service& io_service,
00083 connection::ssl_context_type& ssl_context)
00084 : m_conn_ptr(new connection(io_service, ssl_context)),
00085 m_read_buf(m_conn_ptr->get_read_buffer().c_array())
00086 {
00087 setup_buffers();
00088 }
00089
00091 virtual ~stream_buffer() { sync(); }
00092
00094 connection& get_connection(void) { return *m_conn_ptr; }
00095
00097 const connection& get_connection(void) const { return *m_conn_ptr; }
00098
00099
00100 protected:
00101
00103 inline void setup_buffers(void) {
00104
00105 setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX);
00106
00107 setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1));
00108 }
00109
00115 inline int_type flush_output(void) {
00116 const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase());
00117 int_type bytes_sent = 0;
00118 if (bytes_to_send > 0) {
00119 boost::mutex::scoped_lock async_lock(m_async_mutex);
00120 m_bytes_transferred = 0;
00121 m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send),
00122 boost::bind(&stream_buffer::operation_finished, this,
00123 boost::asio::placeholders::error,
00124 boost::asio::placeholders::bytes_transferred));
00125 m_async_done.wait(async_lock);
00126 bytes_sent = m_bytes_transferred;
00127 pbump(-bytes_sent);
00128 if (m_async_error)
00129 bytes_sent = traits_type::eof();
00130 }
00131 return bytes_sent;
00132 }
00133
00139 virtual int_type underflow(void) {
00140
00141 if (gptr() < egptr())
00142 return traits_type::to_int_type(*gptr());
00143
00144
00145 std::streamsize put_back_num = std::streamsize(gptr() - eback());
00146 if (put_back_num > PUT_BACK_MAX)
00147 put_back_num = PUT_BACK_MAX;
00148
00149
00150 if (put_back_num > 0)
00151 memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num);
00152
00153
00154
00155
00156 boost::mutex::scoped_lock async_lock(m_async_mutex);
00157 m_bytes_transferred = 0;
00158 m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX,
00159 connection::READ_BUFFER_SIZE-PUT_BACK_MAX),
00160 boost::bind(&stream_buffer::operation_finished, this,
00161 boost::asio::placeholders::error,
00162 boost::asio::placeholders::bytes_transferred));
00163 m_async_done.wait(async_lock);
00164 if (m_async_error)
00165 return traits_type::eof();
00166
00167
00168 setg(m_read_buf+(PUT_BACK_MAX-put_back_num),
00169 m_read_buf+PUT_BACK_MAX,
00170 m_read_buf+PUT_BACK_MAX+m_bytes_transferred);
00171
00172
00173 return traits_type::to_int_type(*gptr());
00174 }
00175
00182 virtual int_type overflow(int_type c) {
00183 if (! traits_type::eq_int_type(c, traits_type::eof())) {
00184
00185
00186
00187 *pptr() = c;
00188 pbump(1);
00189 }
00190
00191 return ((flush_output() == traits_type::eof())
00192 ? traits_type::eof() : traits_type::not_eof(c));
00193 }
00194
00203 virtual std::streamsize xsputn(const char_type *s, std::streamsize n) {
00204 const std::streamsize bytes_available = std::streamsize(epptr() - pptr());
00205 std::streamsize bytes_sent = 0;
00206 if (bytes_available >= n) {
00207
00208 memcpy(pptr(), s, n);
00209 pbump(n);
00210 bytes_sent = n;
00211 } else {
00212
00213 if (bytes_available > 0) {
00214
00215 memcpy(pptr(), s, bytes_available);
00216 pbump(bytes_available);
00217 }
00218
00219 if (flush_output() == traits_type::eof())
00220 return 0;
00221 if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) {
00222
00223
00224 boost::mutex::scoped_lock async_lock(m_async_mutex);
00225 m_bytes_transferred = 0;
00226 m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available,
00227 n-bytes_available),
00228 boost::bind(&stream_buffer::operation_finished, this,
00229 boost::asio::placeholders::error,
00230 boost::asio::placeholders::bytes_transferred));
00231 m_async_done.wait(async_lock);
00232 bytes_sent = bytes_available + m_bytes_transferred;
00233 } else {
00234
00235
00236 memcpy(pbase(), s+bytes_available, n-bytes_available);
00237 pbump(n-bytes_available);
00238 bytes_sent = n;
00239 }
00240 }
00241 return bytes_sent;
00242 }
00243
00252 virtual std::streamsize xsgetn(char_type *s, std::streamsize n) {
00253 std::streamsize bytes_remaining = n;
00254 while (bytes_remaining > 0) {
00255 const std::streamsize bytes_available = std::streamsize(egptr() - gptr());
00256 const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining)
00257 ? bytes_remaining : bytes_available);
00258
00259 if (bytes_next_read > 0) {
00260 memcpy(s, gptr(), bytes_next_read);
00261 gbump(bytes_next_read);
00262 bytes_remaining -= bytes_next_read;
00263 s += bytes_next_read;
00264 }
00265 if (bytes_remaining > 0) {
00266
00267 if (traits_type::eq_int_type(underflow(), traits_type::eof()))
00268 break;
00269 }
00270 }
00271 return(n-bytes_remaining);
00272 }
00273
00279 virtual int_type sync(void) {
00280 return ((flush_output() == traits_type::eof()) ? -1 : 0);
00281 }
00282
00283
00284 private:
00285
00287 inline void operation_finished(const boost::system::error_code& error_code,
00288 std::size_t bytes_transferred)
00289 {
00290 boost::mutex::scoped_lock async_lock(m_async_mutex);
00291 m_async_error = error_code;
00292 m_bytes_transferred = bytes_transferred;
00293 m_async_done.notify_one();
00294 }
00295
00296
00298 tcp::connection_ptr m_conn_ptr;
00299
00301 boost::mutex m_async_mutex;
00302
00304 boost::condition m_async_done;
00305
00307 boost::system::error_code m_async_error;
00308
00310 std::size_t m_bytes_transferred;
00311
00313 char_type * m_read_buf;
00314
00316 char_type m_write_buf[WRITE_BUFFER_SIZE];
00317 };
00318
00319
00323 class stream
00324 : public std::basic_iostream<char, std::char_traits<char> >
00325 {
00326 public:
00327
00328
00329 typedef char char_type;
00330 typedef std::char_traits<char>::int_type int_type;
00331 typedef std::char_traits<char>::off_type off_type;
00332 typedef std::char_traits<char>::pos_type pos_type;
00333 typedef std::char_traits<char> traits_type;
00334
00335
00341 explicit stream(tcp::connection_ptr& conn_ptr)
00342 : m_tcp_buf(conn_ptr)
00343 #ifdef _MSC_VER
00344 , std::basic_iostream<char, std::char_traits<char> >(NULL)
00345 #endif
00346 {
00347
00348 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
00349 }
00350
00357 explicit stream(boost::asio::io_service& io_service,
00358 const bool ssl_flag = false)
00359 : m_tcp_buf(io_service, ssl_flag)
00360 #ifdef _MSC_VER
00361 , std::basic_iostream<char, std::char_traits<char> >(NULL)
00362 #endif
00363 {
00364
00365 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
00366 }
00367
00374 stream(boost::asio::io_service& io_service,
00375 connection::ssl_context_type& ssl_context)
00376 : m_tcp_buf(io_service, ssl_context)
00377 #ifdef _MSC_VER
00378 , std::basic_iostream<char, std::char_traits<char> >(NULL)
00379 #endif
00380 {
00381
00382 std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
00383 }
00384
00393 inline boost::system::error_code accept(boost::asio::ip::tcp::acceptor& tcp_acceptor)
00394 {
00395 boost::system::error_code ec = m_tcp_buf.get_connection().accept(tcp_acceptor);
00396 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_server();
00397 return ec;
00398 }
00399
00408 inline boost::system::error_code connect(boost::asio::ip::tcp::endpoint& tcp_endpoint)
00409 {
00410 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
00411 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
00412 return ec;
00413 }
00414
00424 inline boost::system::error_code connect(const boost::asio::ip::address& remote_addr,
00425 const unsigned int remote_port)
00426 {
00427 boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port);
00428 boost::system::error_code ec = m_tcp_buf.get_connection().connect(tcp_endpoint);
00429 if (! ec && get_ssl_flag()) ec = m_tcp_buf.get_connection().handshake_client();
00430 return ec;
00431 }
00432
00434 inline void close(void) { m_tcp_buf.get_connection().close(); }
00435
00436
00437
00438
00440
00441
00442
00444 inline bool is_open(void) const { return m_tcp_buf.get_connection().is_open(); }
00445
00447 inline bool get_ssl_flag(void) const { return m_tcp_buf.get_connection().get_ssl_flag(); }
00448
00450 inline boost::asio::ip::address get_remote_ip(void) const {
00451 return m_tcp_buf.get_connection().get_remote_ip();
00452 }
00453
00455 stream_buffer *rdbuf(void) { return &m_tcp_buf; }
00456
00457
00458 private:
00459
00461 stream_buffer m_tcp_buf;
00462 };
00463
00464
00465 }
00466 }
00467
00468 #endif