# FIX::SocketMonitor Class Reference
Monitors events on a collection of sockets. More...
#include <SocketMonitor.h>
Classes | |
class | Strategy |
Public Member Functions | |
| SocketMonitor (int timeout=0) |
virtual | ~SocketMonitor () |
bool | addConnect (int socket) |
bool | addRead (int socket) |
bool | addWrite (int socket) |
bool | drop (int socket) |
void | signal (int socket) |
void | unsignal (int socket) |
void | block (Strategy &strategy, bool poll=0, double timeout=0.0) |
int | numSockets () |
Private Types | |
typedef std::set< int > | Sockets |
typedef std::queue< int > | Queue |
Private Member Functions | |
void | setsockopt () |
bool | bind () |
bool | listen () |
void | buildSet (const Sockets &, fd_set &) |
timeval * | getTimeval (bool poll, double timeout) |
bool | sleepIfEmpty (bool poll) |
void | processReadSet (Strategy &, fd_set &) |
void | processWriteSet (Strategy &, fd_set &) |
void | processExceptSet (Strategy &, fd_set &) |
Private Attributes | |
int | m_timeout |
timeval | m_timeval |
clock_t | m_ticks |
int | m_signal |
int | m_interrupt |
Sockets | m_connectSockets |
Sockets | m_readSockets |
Sockets | m_writeSockets |
Queue | m_dropped |
Monitors events on a collection of sockets.
Definition at line 47 of file SocketMonitor.h.
typedef std::queue< int > FIX::SocketMonitor::Queue [private] |
Definition at line 68 of file SocketMonitor.h.
typedef std::set< int > FIX::SocketMonitor::Sockets [private] |
Definition at line 67 of file SocketMonitor.h.
FIX::SocketMonitor::SocketMonitor ( int timeout = 0 ) |
Definition at line 36 of file SocketMonitor.cpp.
References m_interrupt, m_readSockets, m_signal, m_ticks, m_timeval, FIX::socket_createpair(), FIX::socket_init(), and FIX::socket_setnonblock().
00037 : m_timeout( timeout ) 00038 { 00039 socket_init(); 00040 00041 std::pair<int, int> sockets = socket_createpair(); 00042 m_signal = sockets.first; 00043 m_interrupt = sockets.second; 00044 socket_setnonblock( m_signal ); 00045 socket_setnonblock( m_interrupt ); 00046 m_readSockets.insert( m_interrupt ); 00047 00048 m_timeval.tv_sec = 0; 00049 m_timeval.tv_usec = 0; 00050 #ifndef SELECT_DECREMENTS_TIME 00051 m_ticks = clock(); 00052 #endif 00053 }
FIX::SocketMonitor::~SocketMonitor ( ) [virtual] |
Definition at line 55 of file SocketMonitor.cpp.
References m_readSockets, m_signal, FIX::socket_close(), and FIX::socket_term().
00056 { 00057 Sockets::iterator i; 00058 for ( i = m_readSockets.begin(); i != m_readSockets.end(); ++i ) { 00059 socket_close( *i ); 00060 } 00061 00062 socket_close( m_signal ); 00063 socket_term(); 00064 }
bool FIX::SocketMonitor::addConnect ( int socket ) |
Definition at line 66 of file SocketMonitor.cpp.
References m_connectSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().
Referenced by FIX::SocketServer::accept(), and FIX::SocketConnector::connect().
00067 { QF_STACK_PUSH(SocketMonitor::addConnect) 00068 00069 socket_setnonblock( s ); 00070 Sockets::iterator i = m_connectSockets.find( s ); 00071 if( i != m_connectSockets.end() ) return false; 00072 00073 m_connectSockets.insert( s ); 00074 return true; 00075 00076 QF_STACK_POP 00077 }
bool FIX::SocketMonitor::addRead ( int socket ) |
Definition at line 79 of file SocketMonitor.cpp.
References m_readSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().
00080 { QF_STACK_PUSH(SocketMonitor::addRead) 00081 00082 socket_setnonblock( s ); 00083 Sockets::iterator i = m_readSockets.find( s ); 00084 if( i != m_readSockets.end() ) return false; 00085 00086 m_readSockets.insert( s ); 00087 return true; 00088 00089 QF_STACK_POP 00090 }
bool FIX::SocketMonitor::addWrite ( int socket ) |
Definition at line 92 of file SocketMonitor.cpp.
References m_readSockets, m_writeSockets, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_setnonblock().
Referenced by processReadSet().
00093 { QF_STACK_PUSH(SocketMonitor::addWrite) 00094 00095 if( m_readSockets.find(s) == m_readSockets.end() ) 00096 return false; 00097 00098 socket_setnonblock( s ); 00099 Sockets::iterator i = m_writeSockets.find( s ); 00100 if( i != m_writeSockets.end() ) return false; 00101 00102 m_writeSockets.insert( s ); 00103 return true; 00104 00105 QF_STACK_POP 00106 }
bool FIX::SocketMonitor::bind ( ) [private] |
void FIX::SocketMonitor::block ( Strategy & strategy, bool poll = 0, double timeout = 0.0 ) |
Definition at line 204 of file SocketMonitor.cpp.
References buildSet(), getTimeval(), m_connectSockets, m_dropped, m_readSockets, m_writeSockets, FIX::SocketMonitor::Strategy::onError(), FIX::SocketMonitor::Strategy::onTimeout(), FIX::Queue< T >::pop(), processExceptSet(), processReadSet(), processWriteSet(), QF_STACK_POP, QF_STACK_PUSH, FIX::Queue< T >::size(), and sleepIfEmpty().
Referenced by FIX::SocketServer::block(), and FIX::SocketConnector::block().
00205 { QF_STACK_PUSH(SocketMonitor::block) 00206 00207 while ( m_dropped.size() ) 00208 { 00209 strategy.onError( *this, m_dropped.front() ); 00210 m_dropped.pop(); 00211 if ( m_dropped.size() == 0 ) 00212 return ; 00213 } 00214 00215 fd_set readSet; 00216 FD_ZERO( &readSet ); 00217 buildSet( m_readSockets, readSet ); 00218 fd_set writeSet; 00219 FD_ZERO( &writeSet ); 00220 buildSet( m_connectSockets, writeSet ); 00221 buildSet( m_writeSockets, writeSet ); 00222 fd_set exceptSet; 00223 FD_ZERO( &exceptSet ); 00224 buildSet( m_connectSockets, exceptSet ); 00225 00226 if ( sleepIfEmpty(poll) ) 00227 { 00228 strategy.onTimeout( *this ); 00229 return; 00230 } 00231 00232 int result = select( FD_SETSIZE, &readSet, &writeSet, &exceptSet, getTimeval(poll, timeout) ); 00233 00234 if ( result == 0 ) 00235 { 00236 strategy.onTimeout( *this ); 00237 return; 00238 } 00239 else if ( result > 0 ) 00240 { 00241 processExceptSet( strategy, exceptSet ); 00242 processWriteSet( strategy, writeSet ); 00243 processReadSet( strategy, readSet ); 00244 } 00245 else 00246 { 00247 strategy.onError( *this ); 00248 } 00249 00250 QF_STACK_POP 00251 }
void FIX::SocketMonitor::buildSet ( const Sockets & sockets, fd_set & watchSet ) [private] |
Definition at line 363 of file SocketMonitor.cpp.
References QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00364 { QF_STACK_PUSH(SocketMonitor::buildSet) 00365 00366 Sockets::const_iterator iter; 00367 for ( iter = sockets.begin(); iter != sockets.end(); ++iter ) { 00368 FD_SET( *iter, &watchSet ); 00369 } 00370 QF_STACK_POP 00371 }
bool FIX::SocketMonitor::drop ( int socket ) |
Definition at line 108 of file SocketMonitor.cpp.
References m_connectSockets, m_dropped, m_readSockets, m_writeSockets, FIX::Queue< T >::push(), QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_close().
Referenced by FIX::SocketConnection::disconnect(), FIX::HttpServer::onConnect(), FIX::ServerWrapper::onError(), FIX::SocketConnection::read(), and FIX::SocketConnection::readMessages().
00109 { QF_STACK_PUSH(SocketMonitor::drop) 00110 00111 Sockets::iterator i = m_readSockets.find( s ); 00112 Sockets::iterator j = m_writeSockets.find( s ); 00113 Sockets::iterator k = m_connectSockets.find( s ); 00114 00115 if ( i != m_readSockets.end() || 00116 j != m_writeSockets.end() || 00117 k != m_connectSockets.end() ) 00118 { 00119 socket_close( s ); 00120 m_readSockets.erase( s ); 00121 m_writeSockets.erase( s ); 00122 m_connectSockets.erase( s ); 00123 m_dropped.push( s ); 00124 return true; 00125 } 00126 return false; 00127 00128 QF_STACK_POP 00129 }
timeval * FIX::SocketMonitor::getTimeval ( bool poll, double timeout ) [inline, private] |
Definition at line 131 of file SocketMonitor.cpp.
References m_ticks, m_timeout, m_timeval, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00132 { QF_STACK_PUSH(SocketMonitor::getTimeval) 00133 00134 if ( poll ) 00135 { 00136 m_timeval.tv_sec = 0; 00137 m_timeval.tv_usec = 0; 00138 return &m_timeval; 00139 } 00140 00141 timeout = m_timeout; 00142 00143 if ( !timeout ) 00144 return 0; 00145 #ifdef SELECT_MODIFIES_TIMEVAL 00146 if ( !m_timeval.tv_sec && !m_timeval.tv_usec && timeout ) 00147 m_timeval.tv_sec = timeout; 00148 return &m_timeval; 00149 #else 00150 double elapsed = ( double ) ( clock() - m_ticks ) / ( double ) CLOCKS_PER_SEC; 00151 if ( elapsed >= timeout || elapsed == 0.0 ) 00152 { 00153 m_ticks = clock(); 00154 m_timeval.tv_sec = 0; 00155 m_timeval.tv_usec = (long)(timeout * 1000000); 00156 } 00157 else 00158 { 00159 m_timeval.tv_sec = 0; 00160 m_timeval.tv_usec = ( long ) ( ( timeout - elapsed ) * 1000000 ); 00161 } 00162 return &m_timeval; 00163 #endif 00164 00165 QF_STACK_POP 00166 }
bool FIX::SocketMonitor::listen ( ) [private] |
int FIX::SocketMonitor::numSockets ( ) [inline] |
Definition at line 63 of file SocketMonitor.h.
References m_readSockets.
Referenced by FIX::SocketServer::numConnections().
00064 { return m_readSockets.size() - 1; }
void FIX::SocketMonitor::processExceptSet ( Strategy & strategy, fd_set & exceptSet ) [private] |
Definition at line 339 of file SocketMonitor.cpp.
References m_connectSockets, FIX::SocketMonitor::Strategy::onError(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00340 { QF_STACK_PUSH(SocketMonitor::processExceptSet) 00341 00342 #ifdef _MSC_VER 00343 for ( unsigned i = 0; i < exceptSet.fd_count; ++i ) 00344 { 00345 int s = exceptSet.fd_array[ i ]; 00346 strategy.onError( *this, s ); 00347 } 00348 #else 00349 Sockets::iterator i; 00350 Sockets sockets = m_connectSockets; 00351 for ( i = sockets.begin(); i != sockets.end(); ++i ) 00352 { 00353 int s = *i; 00354 if ( !FD_ISSET( *i, &exceptSet ) ) 00355 continue; 00356 strategy.onError( *this, s ); 00357 } 00358 #endif 00359 00360 QF_STACK_POP 00361 }
void FIX::SocketMonitor::processReadSet ( Strategy & strategy, fd_set & readSet ) [private] |
Definition at line 253 of file SocketMonitor.cpp.
References addWrite(), m_interrupt, m_readSockets, FIX::SocketMonitor::Strategy::onEvent(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00254 { QF_STACK_PUSH(SocketMonitor::processReadSet) 00255 00256 #ifdef _MSC_VER 00257 for ( unsigned i = 0; i < readSet.fd_count; ++i ) 00258 { 00259 int s = readSet.fd_array[ i ]; 00260 if( s == m_interrupt ) 00261 { 00262 int socket = 0; 00263 recv( s, (char*)&socket, sizeof(socket), 0 ); 00264 addWrite( socket ); 00265 } 00266 else 00267 { 00268 strategy.onEvent( *this, s ); 00269 } 00270 } 00271 #else 00272 Sockets::iterator i; 00273 Sockets sockets = m_readSockets; 00274 for ( i = sockets.begin(); i != sockets.end(); ++i ) 00275 { 00276 int s = *i; 00277 if ( !FD_ISSET( *i, &readSet ) ) 00278 continue; 00279 if( s == m_interrupt ) 00280 { 00281 int socket = 0; 00282 recv( s, (char*)&socket, sizeof(socket), 0 ); 00283 addWrite( socket ); 00284 } 00285 else 00286 { 00287 strategy.onEvent( *this, s ); 00288 } 00289 } 00290 #endif 00291 00292 QF_STACK_POP 00293 }
void FIX::SocketMonitor::processWriteSet ( Strategy & strategy, fd_set & writeSet ) [private] |
Definition at line 295 of file SocketMonitor.cpp.
References m_connectSockets, m_readSockets, m_writeSockets, FIX::SocketMonitor::Strategy::onConnect(), FIX::SocketMonitor::Strategy::onWrite(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00296 { QF_STACK_PUSH(SocketMonitor::processWriteSet) 00297 00298 #ifdef _MSC_VER 00299 for ( unsigned i = 0; i < writeSet.fd_count; ++i ) 00300 { 00301 int s = writeSet.fd_array[ i ]; 00302 if( m_connectSockets.find(s) != m_connectSockets.end() ) 00303 { 00304 m_connectSockets.erase( s ); 00305 m_readSockets.insert( s ); 00306 strategy.onConnect( *this, s ); 00307 } 00308 else 00309 { 00310 strategy.onWrite( *this, s ); 00311 } 00312 } 00313 #else 00314 Sockets::iterator i; 00315 Sockets sockets = m_connectSockets; 00316 for( i = sockets.begin(); i != sockets.end(); ++i ) 00317 { 00318 int s = *i; 00319 if ( !FD_ISSET( *i, &writeSet ) ) 00320 continue; 00321 m_connectSockets.erase( s ); 00322 m_readSockets.insert( s ); 00323 strategy.onConnect( *this, s ); 00324 } 00325 00326 sockets = m_writeSockets; 00327 for( i = sockets.begin(); i != sockets.end(); ++i ) 00328 { 00329 int s = *i; 00330 if ( !FD_ISSET( *i, &writeSet ) ) 00331 continue; 00332 strategy.onWrite( *this, s ); 00333 } 00334 #endif 00335 00336 QF_STACK_POP 00337 }
void FIX::SocketMonitor::setsockopt ( ) [private] |
void FIX::SocketMonitor::signal ( int socket ) |
Definition at line 187 of file SocketMonitor.cpp.
References m_signal, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_send().
Referenced by FIX::SocketConnection::signal().
00188 { QF_STACK_PUSH(SocketMonitor::signal) 00189 socket_send( m_signal, (char*)&socket, sizeof(socket) ); 00190 QF_STACK_POP 00191 }
bool FIX::SocketMonitor::sleepIfEmpty ( bool poll ) [inline, private] |
Definition at line 168 of file SocketMonitor.cpp.
References m_connectSockets, m_readSockets, m_timeout, m_writeSockets, FIX::process_sleep(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by block().
00169 { QF_STACK_PUSH(SocketMonitor::sleepIfEmpty) 00170 00171 if( poll ) 00172 return false; 00173 00174 if ( m_readSockets.empty() && 00175 m_writeSockets.empty() && 00176 m_connectSockets.empty() ) 00177 { 00178 process_sleep( m_timeout ); 00179 return true; 00180 } 00181 else 00182 return false; 00183 00184 QF_STACK_POP 00185 }
void FIX::SocketMonitor::unsignal ( int socket ) |
Definition at line 193 of file SocketMonitor.cpp.
References m_writeSockets, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by FIX::SocketConnection::unsignal().
00194 { QF_STACK_PUSH(SocketMonitor::unsignal) 00195 00196 Sockets::iterator i = m_writeSockets.find( s ); 00197 if( i == m_writeSockets.end() ) return; 00198 00199 m_writeSockets.erase( s ); 00200 00201 QF_STACK_POP 00202 }
Sockets FIX::SocketMonitor::m_connectSockets [private] |
Definition at line 89 of file SocketMonitor.h.
Referenced by addConnect(), block(), drop(), processExceptSet(), processWriteSet(), and sleepIfEmpty().
Queue FIX::SocketMonitor::m_dropped [private] |
Definition at line 92 of file SocketMonitor.h.
int FIX::SocketMonitor::m_interrupt [private] |
Definition at line 88 of file SocketMonitor.h.
Referenced by processReadSet(), and SocketMonitor().
Sockets FIX::SocketMonitor::m_readSockets [private] |
Definition at line 90 of file SocketMonitor.h.
Referenced by addRead(), addWrite(), block(), drop(), numSockets(), processReadSet(), processWriteSet(), sleepIfEmpty(), SocketMonitor(), and ~SocketMonitor().
int FIX::SocketMonitor::m_signal [private] |
Definition at line 87 of file SocketMonitor.h.
Referenced by signal(), SocketMonitor(), and ~SocketMonitor().
clock_t FIX::SocketMonitor::m_ticks [private] |
Definition at line 84 of file SocketMonitor.h.
Referenced by getTimeval(), and SocketMonitor().
int FIX::SocketMonitor::m_timeout [private] |
Definition at line 81 of file SocketMonitor.h.
Referenced by getTimeval(), and sleepIfEmpty().
timeval FIX::SocketMonitor::m_timeval [private] |
Definition at line 82 of file SocketMonitor.h.
Referenced by getTimeval(), and SocketMonitor().
Sockets FIX::SocketMonitor::m_writeSockets [private] |
Definition at line 91 of file SocketMonitor.h.
Referenced by addWrite(), block(), drop(), processWriteSet(), sleepIfEmpty(), and unsignal().