ThreadedSocketInitiator.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #include "ThreadedSocketInitiator.h"
00028 #include "Session.h"
00029 #include "Settings.h"
00030
00031 namespace FIX
00032 {
00033 ThreadedSocketInitiator::ThreadedSocketInitiator(
00034 Application& application,
00035 MessageStoreFactory& factory,
00036 const SessionSettings& settings ) throw( ConfigError )
00037 : Initiator( application, factory, settings ),
00038 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
00039 m_sendBufSize( 0 ), m_rcvBufSize( 0 )
00040 {
00041 socket_init();
00042 }
00043
00044 ThreadedSocketInitiator::ThreadedSocketInitiator(
00045 Application& application,
00046 MessageStoreFactory& factory,
00047 const SessionSettings& settings,
00048 LogFactory& logFactory ) throw( ConfigError )
00049 : Initiator( application, factory, settings, logFactory ),
00050 m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
00051 m_sendBufSize( 0 ), m_rcvBufSize( 0 )
00052 {
00053 socket_init();
00054 }
00055
00056 ThreadedSocketInitiator::~ThreadedSocketInitiator()
00057 {
00058 socket_term();
00059 }
00060
00061 void ThreadedSocketInitiator::onConfigure( const SessionSettings& s )
00062 throw ( ConfigError )
00063 { QF_STACK_PUSH(ThreadedSocketInitiator::onConfigure)
00064
00065 try { m_reconnectInterval = s.get().getLong( "ReconnectInterval" ); }
00066 catch ( std::exception& ) {}
00067 if( s.get().has( SOCKET_NODELAY ) )
00068 m_noDelay = s.get().getBool( SOCKET_NODELAY );
00069 if( s.get().has( SOCKET_SEND_BUFFER_SIZE ) )
00070 m_sendBufSize = s.get().getLong( SOCKET_SEND_BUFFER_SIZE );
00071 if( s.get().has( SOCKET_RECEIVE_BUFFER_SIZE ) )
00072 m_rcvBufSize = s.get().getLong( SOCKET_RECEIVE_BUFFER_SIZE );
00073
00074 QF_STACK_POP
00075 }
00076
00077 void ThreadedSocketInitiator::onInitialize( const SessionSettings& s )
00078 throw ( RuntimeError )
00079 { QF_STACK_PUSH(ThreadedSocketInitiator::onInitialize)
00080 QF_STACK_POP
00081 }
00082
00083 void ThreadedSocketInitiator::onStart()
00084 { QF_STACK_PUSH(ThreadedSocketInitiator::onStart)
00085
00086 while ( !isStopped() )
00087 {
00088 time_t now;
00089 ::time( &now );
00090
00091 if ( (now - m_lastConnect) >= m_reconnectInterval )
00092 {
00093 Locker l( m_mutex );
00094 connect();
00095 m_lastConnect = now;
00096 }
00097
00098 process_sleep( 1 );
00099 }
00100
00101 QF_STACK_POP
00102 }
00103
00104 bool ThreadedSocketInitiator::onPoll( double timeout )
00105 { QF_STACK_PUSH(ThreadedSocketInitiator::onPoll)
00106
00107 return false;
00108
00109 QF_STACK_POP
00110 }
00111
00112 void ThreadedSocketInitiator::onStop()
00113 { QF_STACK_PUSH(ThreadedSocketInitiator::onStop)
00114
00115 SocketToThread threads;
00116 SocketToThread::iterator i;
00117
00118 {
00119 Locker l(m_mutex);
00120
00121 time_t start = 0;
00122 time_t now = 0;
00123
00124 ::time( &start );
00125 while ( isLoggedOn() )
00126 {
00127 if( ::time(&now) -5 >= start )
00128 break;
00129 }
00130
00131 threads = m_threads;
00132 m_threads.clear();
00133 }
00134
00135 for ( i = threads.begin(); i != threads.end(); ++i )
00136 socket_close( i->first );
00137
00138 for ( i = threads.begin(); i != threads.end(); ++i )
00139 thread_join( i->second );
00140 threads.clear();
00141
00142 QF_STACK_POP
00143 }
00144
00145 void ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
00146 { QF_STACK_PUSH(ThreadedSocketInitiator::doConnect)
00147
00148 try
00149 {
00150 Session* session = Session::lookupSession( s );
00151 if( !session->isSessionTime(UtcTimeStamp()) ) return;
00152
00153 Log* log = session->getLog();
00154
00155 std::string address;
00156 short port = 0;
00157 getHost( s, d, address, port );
00158
00159 int socket = socket_createConnector();
00160 if( m_noDelay )
00161 socket_setsockopt( socket, TCP_NODELAY );
00162 if( m_sendBufSize )
00163 socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
00164 if( m_rcvBufSize )
00165 socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );
00166
00167 setPending( s );
00168 log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) );
00169
00170 ThreadedSocketConnection* pConnection =
00171 new ThreadedSocketConnection( s, socket, address, port, getApplication(), getLog() );
00172
00173 ThreadPair* pair = new ThreadPair( this, pConnection );
00174
00175 {
00176 Locker l( m_mutex );
00177 thread_id thread;
00178 if ( thread_spawn( &socketThread, pair, thread ) )
00179 {
00180 addThread( socket, thread );
00181 }
00182 else
00183 {
00184 delete pair;
00185 pConnection->disconnect();
00186 delete pConnection;
00187 setDisconnected( s );
00188 }
00189 }
00190 }
00191 catch ( std::exception& ) {}
00192
00193 QF_STACK_POP
00194 }
00195
00196 void ThreadedSocketInitiator::addThread( int s, thread_id t )
00197 { QF_STACK_PUSH(ThreadedSocketInitiator::addThread)
00198
00199 Locker l(m_mutex);
00200
00201 m_threads[ s ] = t;
00202 QF_STACK_POP
00203 }
00204
00205 void ThreadedSocketInitiator::removeThread( int s )
00206 { QF_STACK_PUSH(ThreadedSocketInitiator::removeThread)
00207
00208 Locker l(m_mutex);
00209 SocketToThread::iterator i = m_threads.find( s );
00210
00211 if ( i != m_threads.end() )
00212 {
00213 thread_detach( i->second );
00214 m_threads.erase( i );
00215 }
00216
00217 QF_STACK_POP
00218 }
00219
00220 THREAD_PROC ThreadedSocketInitiator::socketThread( void* p )
00221 { QF_STACK_TRY
00222 QF_STACK_PUSH(ThreadedSocketInitiator::socketThread)
00223
00224 ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );
00225
00226 ThreadedSocketInitiator* pInitiator = pair->first;
00227 ThreadedSocketConnection* pConnection = pair->second;
00228 FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
00229 FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
00230 int socket = pConnection->getSocket();
00231 delete pair;
00232
00233 pInitiator->lock();
00234
00235 if( !pConnection->connect() )
00236 {
00237 pInitiator->getLog()->onEvent( "Connection failed" );
00238 pConnection->disconnect();
00239 delete pConnection;
00240 pInitiator->removeThread( socket );
00241 pInitiator->setDisconnected( sessionID );
00242 return 0;
00243 }
00244
00245 pInitiator->setConnected( sessionID );
00246 pInitiator->getLog()->onEvent( "Connection succeeded" );
00247
00248 pSession->next();
00249
00250 while ( pConnection->read() ) {}
00251
00252 delete pConnection;
00253 if( !pInitiator->isStopped() )
00254 pInitiator->removeThread( socket );
00255
00256 pInitiator->setDisconnected( sessionID );
00257 return 0;
00258
00259 QF_STACK_POP
00260 QF_STACK_CATCH
00261 }
00262
00263 void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
00264 std::string& address, short& port )
00265 { QF_STACK_PUSH(ThreadedSocketInitiator::getHost)
00266
00267 int num = 0;
00268 SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
00269 if ( i != m_sessionToHostNum.end() ) num = i->second;
00270
00271 std::stringstream hostStream;
00272 hostStream << SOCKET_CONNECT_HOST << num;
00273 std::string hostString = hostStream.str();
00274
00275 std::stringstream portStream;
00276 std::string portString = portStream.str();
00277 portStream << SOCKET_CONNECT_PORT << num;
00278
00279 if( d.has(hostString) && d.has(portString) )
00280 {
00281 address = d.getString( hostString );
00282 port = ( short ) d.getLong( portString );
00283 }
00284 else
00285 {
00286 num = 0;
00287 address = d.getString( SOCKET_CONNECT_HOST );
00288 port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
00289 }
00290
00291 m_sessionToHostNum[ s ] = ++num;
00292
00293 QF_STACK_POP
00294 }
00295
00296 }