Index  Source Files  Annotated Class List  Alphabetical Class List  Class Hierarchy  Graphical Class Hierarchy 

FIX::ThreadedSocketConnection Class Reference

Encapsulates a socket file descriptor (multi-threaded). More...

#include <ThreadedSocketConnection.h>

Inheritance diagram for FIX::ThreadedSocketConnection:
Inheritance graph
[legend]
Collaboration diagram for FIX::ThreadedSocketConnection:
Collaboration graph
[legend]

List of all members.

Public Types

typedef std::set< SessionID > Sessions

Public Member Functions

 ThreadedSocketConnection (int s, Sessions sessions, Application &application, Log *pLog)
 ThreadedSocketConnection (const SessionID &, int s, const std::string &address, short port, Application &, Log *pLog)
virtual ~ThreadedSocketConnection ()
Session * getSession () const
int getSocket () const
bool connect ()
void disconnect ()
bool read ()

Private Member Functions

bool readMessage (std::string &msg) throw ( SocketRecvFailed )
void processStream ()
bool send (const std::string &)
bool setSession (const std::string &msg)

Private Attributes

int m_socket
char m_buffer [BUFSIZ]
std::string m_address
int m_port
Application & m_application
Log * m_pLog
Parser m_parser
Sessions m_sessions
Session * m_pSession
bool m_disconnect
fd_set m_fds

Detailed Description

Encapsulates a socket file descriptor (multi-threaded).

Definition at line 44 of file ThreadedSocketConnection.h.


Member Typedef Documentation

typedef std::set< SessionID > FIX::ThreadedSocketConnection::Sessions

Definition at line 47 of file ThreadedSocketConnection.h.


Constructor & Destructor Documentation

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( int  s,
Sessions  sessions,
Application &  application,
Log *  pLog 
)

Definition at line 36 of file ThreadedSocketConnection.cpp.

00037 : m_socket( s ), m_application( application ), m_pLog( pLog ),
00038   m_sessions( sessions ), m_pSession( 0 ),
00039   m_disconnect( false )
00040 {
00041   FD_ZERO( &m_fds );
00042   FD_SET( m_socket, &m_fds );
00043 }

FIX::ThreadedSocketConnection::ThreadedSocketConnection ( const SessionID &  sessionID,
int  s,
const std::string &  address,
short  port,
Application &  application,
Log *  pLog 
)

Definition at line 46 of file ThreadedSocketConnection.cpp.

00049   : m_socket( s ), m_address( address ), m_port( port ),
00050     m_application( application ), m_pLog( m_pLog ),
00051     m_pSession( Session::lookupSession( sessionID ) ),
00052     m_disconnect( false )
00053 {
00054   FD_ZERO( &m_fds );
00055   FD_SET( m_socket, &m_fds );
00056   if ( m_pSession ) m_pSession->setResponder( this );
00057 }

FIX::ThreadedSocketConnection::~ThreadedSocketConnection (  )  [virtual]

Definition at line 59 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), m_pSession, FIX::Session::setResponder(), and FIX::Session::unregisterSession().

00060 {
00061   if ( m_pSession )
00062   {
00063     m_pSession->setResponder( 0 );
00064     Session::unregisterSession( m_pSession->getSessionID() );
00065   }
00066 }


Member Function Documentation

bool FIX::ThreadedSocketConnection::connect (  ) 
void FIX::ThreadedSocketConnection::disconnect (  )  [virtual]
Session* FIX::ThreadedSocketConnection::getSession (  )  const [inline]

Definition at line 55 of file ThreadedSocketConnection.h.

References m_pSession.

Referenced by FIX::ThreadedSocketInitiator::socketThread().

00055 { return m_pSession; }

int FIX::ThreadedSocketConnection::getSocket (  )  const [inline]
void FIX::ThreadedSocketConnection::processStream (  )  [private]

Definition at line 154 of file ThreadedSocketConnection.cpp.

References disconnect(), FIX::Session::isLoggedOn(), m_pSession, FIX::Session::next(), QF_STACK_POP, QF_STACK_PUSH, readMessage(), setSession(), and FIX::TYPE::UtcTimeStamp.

Referenced by read().

00155 { QF_STACK_PUSH(ThreadedSocketConnection::processStream)
00156 
00157   std::string msg;
00158   while( readMessage(msg) )
00159   {
00160     if ( !m_pSession )
00161     {
00162       if ( !setSession( msg ) )
00163       { disconnect(); continue; }
00164     }
00165     try
00166     {
00167       m_pSession->next( msg, UtcTimeStamp() );
00168     }
00169     catch( InvalidMessage& )
00170     {
00171       if( !m_pSession->isLoggedOn() )
00172       {
00173         disconnect();
00174         return;
00175       }
00176     }
00177   }
00178 
00179   QF_STACK_POP
00180 }

bool FIX::ThreadedSocketConnection::read (  ) 

Definition at line 89 of file ThreadedSocketConnection.cpp.

References FIX::Parser::addToStream(), disconnect(), FIX::Session::disconnect(), FIX::Session::getLog(), m_buffer, m_disconnect, m_fds, m_parser, m_pSession, m_socket, FIX::Session::next(), FIX::Log::onEvent(), processStream(), QF_STACK_POP, and QF_STACK_PUSH.

Referenced by FIX::ThreadedSocketAcceptor::socketConnectionThread(), and FIX::ThreadedSocketInitiator::socketThread().

00090 { QF_STACK_PUSH(ThreadedSocketConnection::read)
00091 
00092   struct timeval timeout = { 1, 0 };
00093   fd_set readset = m_fds;
00094 
00095   try
00096   {
00097     // Wait for input (1 second timeout)
00098     int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
00099 
00100     if( result > 0 ) // Something to read
00101     {
00102       // We can read without blocking
00103       int size = recv( m_socket, m_buffer, sizeof(m_buffer), 0 );
00104       if ( size <= 0 ) { throw SocketRecvFailed( size ); }
00105       m_parser.addToStream( m_buffer, size );
00106     }
00107     else if( result == 0 && m_pSession ) // Timeout
00108     {
00109       m_pSession->next();
00110     }
00111     else if( result < 0 ) // Error
00112     {
00113       throw SocketRecvFailed( result );
00114     }
00115 
00116     processStream();
00117     return true;
00118   }
00119   catch ( SocketRecvFailed& e )
00120   {
00121     if( m_disconnect )
00122       return false;
00123 
00124     if( m_pSession )
00125     {
00126       m_pSession->getLog()->onEvent( e.what() );
00127       m_pSession->disconnect();
00128     }
00129     else
00130     {
00131       disconnect();
00132     }
00133 
00134     return false;
00135   }
00136 
00137   QF_STACK_POP
00138 }

bool FIX::ThreadedSocketConnection::readMessage ( std::string &  msg  )  throw ( SocketRecvFailed ) [private]

Definition at line 140 of file ThreadedSocketConnection.cpp.

References QF_STACK_POP, and QF_STACK_PUSH.

Referenced by processStream().

00142 { QF_STACK_PUSH(ThreadedSocketConnection::readMessage)
00143 
00144   try
00145   {
00146     return m_parser.readFixMessage( msg );
00147   }
00148   catch ( MessageParseError& ) {}
00149   return true;
00150 
00151   QF_STACK_POP
00152 }

bool FIX::ThreadedSocketConnection::send ( const std::string &  msg  )  [private, virtual]

Implements FIX::Responder.

Definition at line 68 of file ThreadedSocketConnection.cpp.

References m_socket, QF_STACK_POP, QF_STACK_PUSH, and FIX::socket_send().

00069 { QF_STACK_PUSH(ThreadedSocketConnection::send)
00070   return socket_send( m_socket, msg.c_str(), msg.length() ) >= 0;
00071   QF_STACK_POP
00072 }

bool FIX::ThreadedSocketConnection::setSession ( const std::string &  msg  )  [private]

Definition at line 182 of file ThreadedSocketConnection.cpp.

References FIX::Session::getSessionID(), FIX::Session::isSessionRegistered(), FIX::Session::lookupSession(), m_pLog, m_pSession, m_sessions, FIX::Log::onEvent(), FIX::Log::onIncoming(), FIX::process_sleep(), QF_STACK_POP, QF_STACK_PUSH, FIX::Session::registerSession(), and FIX::Session::setResponder().

Referenced by processStream().

00183 { QF_STACK_PUSH(ThreadedSocketConnection::setSession)
00184 
00185   m_pSession = Session::lookupSession( msg, true );
00186   if ( !m_pSession ) 
00187   {
00188     if( m_pLog )
00189     {
00190       m_pLog->onEvent( "Session not found for incoming message: " + msg );
00191       m_pLog->onIncoming( msg );
00192     }
00193     return false;
00194   }
00195 
00196   SessionID sessionID = m_pSession->getSessionID();
00197   m_pSession = 0;
00198 
00199   // see if the session frees up within 5 seconds
00200   for( int i = 1; i <= 5; i++ )
00201   {
00202     if( !Session::isSessionRegistered( sessionID ) )
00203       m_pSession = Session::registerSession( sessionID );
00204     if( m_pSession ) break;
00205     process_sleep( 1 );
00206   }
00207 
00208   if ( !m_pSession ) 
00209     return false;
00210   if ( m_sessions.find(m_pSession->getSessionID()) == m_sessions.end() )
00211     return false;
00212 
00213   m_pSession->setResponder( this );
00214   return true;
00215 
00216   QF_STACK_POP
00217 }


Member Data Documentation

std::string FIX::ThreadedSocketConnection::m_address [private]

Definition at line 70 of file ThreadedSocketConnection.h.

Referenced by connect().

Application& FIX::ThreadedSocketConnection::m_application [private]

Definition at line 73 of file ThreadedSocketConnection.h.

char FIX::ThreadedSocketConnection::m_buffer[BUFSIZ] [private]

Definition at line 68 of file ThreadedSocketConnection.h.

Referenced by read().

bool FIX::ThreadedSocketConnection::m_disconnect [private]

Definition at line 78 of file ThreadedSocketConnection.h.

Referenced by disconnect(), and read().

fd_set FIX::ThreadedSocketConnection::m_fds [private]

Definition at line 79 of file ThreadedSocketConnection.h.

Referenced by read().

Parser FIX::ThreadedSocketConnection::m_parser [private]

Definition at line 75 of file ThreadedSocketConnection.h.

Referenced by read().

Log* FIX::ThreadedSocketConnection::m_pLog [private]

Definition at line 74 of file ThreadedSocketConnection.h.

Referenced by setSession().

int FIX::ThreadedSocketConnection::m_port [private]

Definition at line 71 of file ThreadedSocketConnection.h.

Referenced by connect().

Sessions FIX::ThreadedSocketConnection::m_sessions [private]

Definition at line 76 of file ThreadedSocketConnection.h.

Referenced by setSession().

int FIX::ThreadedSocketConnection::m_socket [private]

Definition at line 67 of file ThreadedSocketConnection.h.

Referenced by disconnect(), getSocket(), read(), and send().


The documentation for this class was generated from the following files:

ThreadedSocketConnection.h
ThreadedSocketConnection.cpp

Generated on Mon Apr 5 21:00:13 2010 for QuickFIX by doxygen 1.6.1 written by Dimitri van Heesch, © 1997-2001