# FIX::SocketConnection Class Reference
Encapsulates a socket file descriptor (single-threaded).
#include <SocketConnection.h>
Public Types | |
typedef std::set< SessionID > | Sessions |
Public Member Functions | |
SocketConnection (int s, Sessions sessions, SocketMonitor *pMonitor) | |
SocketConnection (SocketInitiator &, const SessionID &, int, SocketMonitor *) | |
virtual | ~SocketConnection () |
int | getSocket () const |
Session * | getSession () const |
bool | read (SocketConnector &s) |
bool | read (SocketAcceptor &, SocketServer &) |
bool | processQueue () |
void | signal () |
void | unsignal () |
void | onTimeout () |
Private Types | |
typedef std::deque < std::string, ALLOCATOR < std::string > > | Queue |
Private Member Functions | |
bool | isValidSession () |
void | readFromSocket () throw ( SocketRecvFailed ) |
bool | readMessage (std::string &msg) |
void | readMessages (SocketMonitor &s) |
bool | send (const std::string &) |
void | disconnect () |
Private Attributes | |
int | m_socket |
char | m_buffer [BUFSIZ] |
Parser | m_parser |
Queue | m_sendQueue |
unsigned | m_sendLength |
Sessions | m_sessions |
Session * | m_pSession |
SocketMonitor * | m_pMonitor |
Mutex | m_mutex |
fd_set | m_fds |
Encapsulates a socket file descriptor (single-threaded).
Definition at line 45 of file SocketConnection.h.
typedef std::deque<std::string, ALLOCATOR<std::string> > FIX::SocketConnection::Queue [private] |
Definition at line 79 of file SocketConnection.h.
typedef std::set<SessionID> FIX::SocketConnection::Sessions |
Definition at line 48 of file SocketConnection.h.
FIX::SocketConnection::SocketConnection ( int s, Sessions sessions, SocketMonitor * pMonitor )
Definition at line 36 of file SocketConnection.cpp.
References m_fds, and m_socket.
00038 : m_socket( s ), m_sendLength( 0 ), 00039 m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor ) 00040 { 00041 FD_ZERO( &m_fds ); 00042 FD_SET( m_socket, &m_fds ); 00043 }
FIX::SocketConnection::SocketConnection ( SocketInitiator & i, const SessionID & sessionID, int s, SocketMonitor * pMonitor )
Definition at line 45 of file SocketConnection.cpp.
References m_fds, m_sessions, and m_socket.
00048 : m_socket( s ), m_sendLength( 0 ), 00049 m_pSession( i.getSession( sessionID, *this ) ), 00050 m_pMonitor( pMonitor ) 00051 { 00052 FD_ZERO( &m_fds ); 00053 FD_SET( m_socket, &m_fds ); 00054 m_sessions.insert( sessionID ); 00055 }
FIX::SocketConnection::~SocketConnection ( ) [virtual]
Definition at line 57 of file SocketConnection.cpp.
References FIX::Session::getSessionID(), m_pSession, and FIX::Session::unregisterSession().
00058 { 00059 if ( m_pSession ) 00060 Session::unregisterSession( m_pSession->getSessionID() ); 00061 }
void FIX::SocketConnection::disconnect ( ) [private, virtual]
Implements FIX::Responder.
Definition at line 107 of file SocketConnection.cpp.
References FIX::SocketMonitor::drop(), m_pMonitor, m_socket, QF_STACK_POP, and QF_STACK_PUSH.
00108 { QF_STACK_PUSH(SocketConnection::disconnect) 00109 00110 if ( m_pMonitor ) 00111 m_pMonitor->drop( m_socket ); 00112 00113 QF_STACK_POP 00114 }
Session* FIX::SocketConnection::getSession ( ) const [inline]
Definition at line 55 of file SocketConnection.h.
References m_pSession.
Referenced by FIX::SocketInitiator::onConnect(), FIX::SocketInitiator::onDisconnect(), and FIX::SocketAcceptor::onDisconnect().
00055 { return m_pSession; }
int FIX::SocketConnection::getSocket ( ) const [inline]
bool FIX::SocketConnection::isValidSession ( ) [private]
Definition at line 188 of file SocketConnection.cpp.
References FIX::Session::getSessionID(), FIX::Session::isSessionRegistered(), m_pSession, m_sessions, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by read().
00189 { QF_STACK_PUSH(SocketConnection::isValidSession) 00190 00191 if( m_pSession == 0 ) 00192 return false; 00193 SessionID sessionID = m_pSession->getSessionID(); 00194 if( Session::isSessionRegistered(sessionID) ) 00195 return false; 00196 return !( m_sessions.find(sessionID) == m_sessions.end() ); 00197 00198 QF_STACK_POP 00199 }
void FIX::SocketConnection::onTimeout ( )
Definition at line 244 of file SocketConnection.cpp.
References m_pSession, FIX::Session::next(), QF_STACK_POP, and QF_STACK_PUSH.
Referenced by FIX::SocketInitiator::onConnect().
00245 { QF_STACK_PUSH(SocketConnection::onTimeout) 00246 if ( m_pSession ) m_pSession->next(); 00247 QF_STACK_POP 00248 }
bool FIX::SocketConnection::processQueue ( )
Definition at line 76 of file SocketConnection.cpp.
References m_fds, m_mutex, m_sendLength, m_sendQueue, m_socket, QF_STACK_POP, QF_STACK_PUSH, FIX::Queue< T >::size(), and FIX::socket_send().
Referenced by FIX::SocketInitiator::onWrite(), FIX::SocketAcceptor::onWrite(), and send().
00077 { QF_STACK_PUSH(SocketConnection::processQueue) 00078 00079 Locker l( m_mutex ); 00080 00081 if( !m_sendQueue.size() ) return true; 00082 00083 struct timeval timeout = { 0, 0 }; 00084 fd_set writeset = m_fds; 00085 if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 ) 00086 return false; 00087 00088 const std::string& msg = m_sendQueue.front(); 00089 00090 int result = socket_send 00091 ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength ); 00092 00093 if( result > 0 ) 00094 m_sendLength += result; 00095 00096 if( m_sendLength == msg.length() ) 00097 { 00098 m_sendLength = 0; 00099 m_sendQueue.pop_front(); 00100 } 00101 00102 return !m_sendQueue.size(); 00103 00104 QF_STACK_POP 00105 }
bool FIX::SocketConnection::read ( SocketAcceptor & a, SocketServer & s )
Definition at line 136 of file SocketConnection.cpp.
References FIX::SocketMonitor::drop(), FIX::Session::getLog(), FIX::Acceptor::getLog(), FIX::SocketServer::getMonitor(), FIX::Acceptor::getSession(), FIX::Session::getSessionID(), isValidSession(), FIX::Session::lookupSession(), m_pSession, m_socket, FIX::Session::next(), FIX::Log::onEvent(), FIX::Log::onIncoming(), QF_STACK_POP, QF_STACK_PUSH, read(), readFromSocket(), readMessage(), readMessages(), FIX::Session::registerSession(), and FIX::TYPE::UtcTimeStamp.
00137 { QF_STACK_PUSH(SocketConnection::read) 00138 00139 std::string msg; 00140 try 00141 { 00142 readFromSocket(); 00143 00144 if ( !m_pSession ) 00145 { 00146 if ( !readMessage( msg ) ) return false; 00147 m_pSession = Session::lookupSession( msg, true ); 00148 if( !isValidSession() ) 00149 { 00150 m_pSession = 0; 00151 if( a.getLog() ) 00152 { 00153 a.getLog()->onEvent( "Session not found for incoming message: " + msg ); 00154 a.getLog()->onIncoming( msg ); 00155 } 00156 } 00157 if( m_pSession ) 00158 m_pSession = a.getSession( msg, *this ); 00159 if( m_pSession ) 00160 m_pSession->next( msg, UtcTimeStamp() ); 00161 if( !m_pSession ) 00162 { 00163 s.getMonitor().drop( m_socket ); 00164 return false; 00165 } 00166 00167 Session::registerSession( m_pSession->getSessionID() ); 00168 } 00169 00170 readMessages( s.getMonitor() ); 00171 return true; 00172 } 00173 catch ( SocketRecvFailed& e ) 00174 { 00175 if( m_pSession ) 00176 m_pSession->getLog()->onEvent( e.what() ); 00177 s.getMonitor().drop( m_socket ); 00178 } 00179 catch ( InvalidMessage& ) 00180 { 00181 s.getMonitor().drop( m_socket ); 00182 } 00183 return false; 00184 00185 QF_STACK_POP 00186 }
bool FIX::SocketConnection::read ( SocketConnector & s )
Definition at line 116 of file SocketConnection.cpp.
References FIX::Session::getLog(), FIX::SocketConnector::getMonitor(), m_pSession, FIX::Log::onEvent(), QF_STACK_POP, QF_STACK_PUSH, readFromSocket(), and readMessages().
Referenced by FIX::SocketInitiator::onData(), FIX::SocketAcceptor::onData(), and read().
00117 { QF_STACK_PUSH(SocketConnection::read) 00118 00119 if ( !m_pSession ) return false; 00120 00121 try 00122 { 00123 readFromSocket(); 00124 readMessages( s.getMonitor() ); 00125 } 00126 catch( SocketRecvFailed& e ) 00127 { 00128 m_pSession->getLog()->onEvent( e.what() ); 00129 return false; 00130 } 00131 return true; 00132 00133 QF_STACK_POP 00134 }
void FIX::SocketConnection::readFromSocket ( ) throw ( SocketRecvFailed ) [private]
Definition at line 201 of file SocketConnection.cpp.
References FIX::Parser::addToStream(), m_buffer, m_parser, m_socket, QF_STACK_POP, and QF_STACK_PUSH.
Referenced by read().
00203 { QF_STACK_PUSH(SocketConnection::readFromSocket) 00204 00205 int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 ); 00206 if( size <= 0 ) throw SocketRecvFailed( size ); 00207 m_parser.addToStream( m_buffer, size ); 00208 00209 QF_STACK_POP 00210 }
bool FIX::SocketConnection::readMessage ( std::string & msg ) [private]
Definition at line 212 of file SocketConnection.cpp.
References m_parser, QF_STACK_POP, QF_STACK_PUSH, and FIX::Parser::readFixMessage().
Referenced by read(), and readMessages().
00213 { QF_STACK_PUSH(SocketConnection::readMessage) 00214 00215 try 00216 { 00217 return m_parser.readFixMessage( msg ); 00218 } 00219 catch ( MessageParseError& ) {} 00220 return true; 00221 00222 QF_STACK_POP 00223 }
void FIX::SocketConnection::readMessages ( SocketMonitor & s ) [private]
Definition at line 225 of file SocketConnection.cpp.
References FIX::SocketMonitor::drop(), FIX::Session::isLoggedOn(), m_pSession, m_socket, FIX::Session::next(), readMessage(), and FIX::TYPE::UtcTimeStamp.
Referenced by read().
00226 { 00227 if( !m_pSession ) return; 00228 00229 std::string msg; 00230 while( readMessage( msg ) ) 00231 { 00232 try 00233 { 00234 m_pSession->next( msg, UtcTimeStamp() ); 00235 } 00236 catch ( InvalidMessage& ) 00237 { 00238 if( !m_pSession->isLoggedOn() ) 00239 s.drop( m_socket ); 00240 } 00241 } 00242 }
bool FIX::SocketConnection::send ( const std::string & msg ) [private, virtual]
Implements FIX::Responder.
Definition at line 63 of file SocketConnection.cpp.
References m_mutex, m_sendQueue, processQueue(), QF_STACK_POP, QF_STACK_PUSH, and signal().
00064 { QF_STACK_PUSH(SocketConnection::send) 00065 00066 Locker l( m_mutex ); 00067 00068 m_sendQueue.push_back( msg ); 00069 processQueue(); 00070 signal(); 00071 return true; 00072 00073 QF_STACK_POP 00074 }
void FIX::SocketConnection::signal ( ) [inline]
Definition at line 61 of file SocketConnection.h.
References m_mutex, m_pMonitor, m_sendQueue, m_socket, FIX::SocketMonitor::signal(), and FIX::Queue< T >::size().
Referenced by send().
00062 { 00063 Locker l( m_mutex ); 00064 if( m_sendQueue.size() == 1 ) 00065 m_pMonitor->signal( m_socket ); 00066 }
void FIX::SocketConnection::unsignal ( ) [inline]
Definition at line 68 of file SocketConnection.h.
References m_mutex, m_pMonitor, m_sendQueue, m_socket, FIX::Queue< T >::size(), and FIX::SocketMonitor::unsignal().
Referenced by FIX::SocketInitiator::onWrite(), and FIX::SocketAcceptor::onWrite().
00069 { 00070 Locker l( m_mutex ); 00071 if( m_sendQueue.size() == 0 ) 00072 m_pMonitor->unsignal( m_socket ); 00073 }
char FIX::SocketConnection::m_buffer[BUFSIZ] [private] |
Definition at line 89 of file SocketConnection.h.
Referenced by readFromSocket().
fd_set FIX::SocketConnection::m_fds [private] |
Definition at line 98 of file SocketConnection.h.
Referenced by processQueue(), and SocketConnection().
Mutex FIX::SocketConnection::m_mutex [private] |
Definition at line 97 of file SocketConnection.h.
Referenced by processQueue(), send(), signal(), and unsignal().
Parser FIX::SocketConnection::m_parser [private] |
Definition at line 91 of file SocketConnection.h.
Referenced by readFromSocket(), and readMessage().
SocketMonitor* FIX::SocketConnection::m_pMonitor [private] |
Definition at line 96 of file SocketConnection.h.
Referenced by disconnect(), signal(), and unsignal().
Session* FIX::SocketConnection::m_pSession [private] |
Definition at line 95 of file SocketConnection.h.
Referenced by getSession(), isValidSession(), onTimeout(), read(), readMessages(), and ~SocketConnection().
unsigned FIX::SocketConnection::m_sendLength [private] |
Definition at line 93 of file SocketConnection.h.
Referenced by processQueue().
Queue FIX::SocketConnection::m_sendQueue [private] |
Definition at line 92 of file SocketConnection.h.
Referenced by processQueue(), send(), signal(), and unsignal().
Sessions FIX::SocketConnection::m_sessions [private] |
Definition at line 94 of file SocketConnection.h.
Referenced by isValidSession(), and SocketConnection().
int FIX::SocketConnection::m_socket [private] |
Definition at line 88 of file SocketConnection.h.
Referenced by disconnect(), getSocket(), processQueue(), read(), readFromSocket(), readMessages(), signal(), SocketConnection(), and unsignal().