00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifdef _MSC_VER
00021 #include "stdafx.h"
00022 #else
00023 #include "config.h"
00024 #endif
00025 #include "CallStack.h"
00026
00027 #ifdef HAVE_ODBC
00028
00029 #ifndef SQLLEN
00030 #define SQLLEN SQLINTEGER
00031 #endif
00032
00033 #include "OdbcStore.h"
00034 #include "SessionID.h"
00035 #include "SessionSettings.h"
00036 #include "FieldConvertors.h"
00037 #include "Parser.h"
00038 #include "Utility.h"
00039 #include "strptime.h"
00040 #include <fstream>
00041
00042 namespace FIX
00043 {
00044
00045 const std::string OdbcStoreFactory::DEFAULT_USER = "sa";
00046 const std::string OdbcStoreFactory::DEFAULT_PASSWORD = "";
00047 const std::string OdbcStoreFactory::DEFAULT_CONNECTION_STRING
00048 = "DATABASE=quickfix;DRIVER={SQL Server};SERVER=(local);";
00049
00050 OdbcStore::OdbcStore
00051 ( const SessionID& s, const std::string& user, const std::string& password,
00052 const std::string& connectionString )
00053 : m_sessionID( s )
00054 {
00055 m_pConnection = new OdbcConnection( user, password, connectionString );
00056 populateCache();
00057 }
00058
00059 OdbcStore::~OdbcStore()
00060 {
00061 delete m_pConnection;
00062 }
00063
00064 void OdbcStore::populateCache()
00065 { QF_STACK_PUSH(OdbcStore::populateCache)
00066
00067 std::stringstream queryString;
00068
00069 queryString << "SELECT creation_time, incoming_seqnum, outgoing_seqnum FROM sessions WHERE "
00070 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00071 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00072 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00073 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00074
00075 OdbcQuery query( queryString.str() );
00076
00077 if( !m_pConnection->execute(query) )
00078 throw ConfigError( "Unable to connect to database" );
00079
00080 int rows = 0;
00081 while( query.fetch() )
00082 {
00083 rows++;
00084 if( rows > 1 )
00085 throw ConfigError( "Multiple entries found for session in database" );
00086
00087 SQL_TIMESTAMP_STRUCT creationTime;
00088 SQLLEN creationTimeLength;
00089 SQLGetData( query.statement(), 1, SQL_C_TYPE_TIMESTAMP, &creationTime, 0, &creationTimeLength );
00090 SQLLEN incomingSeqNum;
00091 SQLLEN incomingSeqNumLength;
00092 SQLGetData( query.statement(), 2, SQL_C_SLONG, &incomingSeqNum, 0, &incomingSeqNumLength );
00093
00094 SQLLEN outgoingSeqNum;
00095 SQLLEN outgoingSeqNumLength;
00096 SQLGetData( query.statement(), 3, SQL_C_SLONG, &outgoingSeqNum, 0, &outgoingSeqNumLength );
00097
00098 UtcTimeStamp time;
00099 time.setYMD( creationTime.year, creationTime.month, creationTime.day );
00100 time.setHMS( creationTime.hour, creationTime.minute, creationTime.second, creationTime.fraction );
00101 m_cache.setCreationTime( time );
00102 m_cache.setNextTargetMsgSeqNum( incomingSeqNum );
00103 m_cache.setNextSenderMsgSeqNum( outgoingSeqNum );
00104 }
00105 query.close();
00106
00107 if( rows == 0 )
00108 {
00109 UtcTimeStamp time = m_cache.getCreationTime();
00110 char sqlTime[ 20 ];
00111 int year, month, day, hour, minute, second, millis;
00112 time.getYMD (year, month, day);
00113 time.getHMS (hour, minute, second, millis);
00114 STRING_SPRINTF (sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00115 year, month, day, hour, minute, second);
00116 std::stringstream queryString2;
00117 queryString2 << "INSERT INTO sessions (beginstring, sendercompid, targetcompid, session_qualifier,"
00118 << "creation_time, incoming_seqnum, outgoing_seqnum) VALUES("
00119 << "'" << m_sessionID.getBeginString().getValue() << "',"
00120 << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00121 << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00122 << "'" << m_sessionID.getSessionQualifier() << "',"
00123 << "{ts '" << sqlTime << "'},"
00124 << m_cache.getNextTargetMsgSeqNum() << ","
00125 << m_cache.getNextSenderMsgSeqNum() << ")";
00126
00127 OdbcQuery query2( queryString2.str() );
00128 if( !m_pConnection->execute(query2) )
00129 throw ConfigError( "Unable to create session in database" );
00130 }
00131
00132 QF_STACK_POP
00133 }
00134
00135 MessageStore* OdbcStoreFactory::create( const SessionID& s )
00136 { QF_STACK_PUSH(OdbcStoreFactory::create)
00137
00138 if( m_useSettings )
00139 return create( s, m_settings.get(s) );
00140 else if( m_useDictionary )
00141 return create( s, m_dictionary );
00142 else
00143 return new OdbcStore( s, m_user, m_password, m_connectionString );
00144
00145 QF_STACK_POP
00146 }
00147
00148 void OdbcStoreFactory::destroy( MessageStore* pStore )
00149 { QF_STACK_PUSH(OdbcStoreFactory::destroy)
00150 delete pStore;
00151 QF_STACK_POP
00152 }
00153
00154 MessageStore* OdbcStoreFactory::create( const SessionID& s, const Dictionary& settings )
00155 { QF_STACK_PUSH(OdbcStoreFactory::create)
00156
00157 std::string user = DEFAULT_USER;
00158 std::string password = DEFAULT_PASSWORD;
00159 std::string connectionString = DEFAULT_CONNECTION_STRING;
00160
00161 try { user = settings.getString( ODBC_STORE_USER ); }
00162 catch( ConfigError& ) {}
00163
00164 try { password = settings.getString( ODBC_STORE_PASSWORD ); }
00165 catch( ConfigError& ) {}
00166
00167 try { connectionString = settings.getString( ODBC_STORE_CONNECTION_STRING ); }
00168 catch( ConfigError& ) {}
00169
00170 return new OdbcStore( s, user, password, connectionString );
00171
00172 QF_STACK_POP
00173 }
00174
00175 bool OdbcStore::set( int msgSeqNum, const std::string& msg )
00176 throw ( IOException )
00177 { QF_STACK_PUSH(OdbcStore::set)
00178
00179 std::string msgCopy = msg;
00180 string_replace( "'", "''", msgCopy );
00181
00182 std::stringstream queryString;
00183 queryString << "INSERT INTO messages "
00184 << "(beginstring, sendercompid, targetcompid, session_qualifier, msgseqnum, message) "
00185 << "VALUES ("
00186 << "'" << m_sessionID.getBeginString().getValue() << "',"
00187 << "'" << m_sessionID.getSenderCompID().getValue() << "',"
00188 << "'" << m_sessionID.getTargetCompID().getValue() << "',"
00189 << "'" << m_sessionID.getSessionQualifier() << "',"
00190 << msgSeqNum << ","
00191 << "'" << msgCopy << "')";
00192
00193 OdbcQuery query( queryString.str() );
00194 if( !m_pConnection->execute(query) )
00195 {
00196 query.close();
00197 std::stringstream queryString2;
00198 queryString2 << "UPDATE messages SET message='" << msgCopy << "' WHERE "
00199 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00200 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00201 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00202 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00203 << "msgseqnum=" << msgSeqNum;
00204 OdbcQuery query2( queryString2.str() );
00205 if( !m_pConnection->execute(query2) )
00206 query2.throwException();
00207 }
00208 return true;
00209
00210 QF_STACK_POP
00211 }
00212
00213 void OdbcStore::get( int begin, int end,
00214 std::vector < std::string > & result ) const
00215 throw ( IOException )
00216 { QF_STACK_PUSH(OdbcStore::get)
00217
00218 result.clear();
00219 std::stringstream queryString;
00220 queryString << "SELECT message FROM messages WHERE "
00221 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00222 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00223 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00224 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "' and "
00225 << "msgseqnum>=" << begin << " and " << "msgseqnum<=" << end << " "
00226 << "ORDER BY msgseqnum";
00227
00228 OdbcQuery query( queryString.str() );
00229
00230 if( !m_pConnection->execute(query) )
00231 query.throwException();
00232
00233 while( query.fetch() )
00234 {
00235 std::string message;
00236 SQLVARCHAR messageBuffer[4096];
00237 SQLLEN messageLength;
00238
00239 while( odbcSuccess(SQLGetData( query.statement(), 1, SQL_C_CHAR, &messageBuffer, 4095, &messageLength)) )
00240 {
00241 messageBuffer[messageLength] = 0;
00242 message += (char*)messageBuffer;
00243 }
00244
00245 result.push_back( message );
00246 }
00247
00248 QF_STACK_POP
00249 }
00250
00251 int OdbcStore::getNextSenderMsgSeqNum() const throw ( IOException )
00252 { QF_STACK_PUSH(OdbcStore::getNextSenderMsgSeqNum)
00253 return m_cache.getNextSenderMsgSeqNum();
00254 QF_STACK_POP
00255 }
00256
00257 int OdbcStore::getNextTargetMsgSeqNum() const throw ( IOException )
00258 { QF_STACK_PUSH(OdbcStore::getNextTargetMsgSeqNum)
00259 return m_cache.getNextTargetMsgSeqNum();
00260 QF_STACK_POP
00261 }
00262
00263 void OdbcStore::setNextSenderMsgSeqNum( int value ) throw ( IOException )
00264 { QF_STACK_PUSH(OdbcStore::setNextSenderMsgSeqNum)
00265
00266 std::stringstream queryString;
00267 queryString << "UPDATE sessions SET outgoing_seqnum=" << value << " WHERE "
00268 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00269 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00270 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00271 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00272 OdbcQuery query( queryString.str() );
00273 if( !m_pConnection->execute(query) )
00274 query.throwException();
00275 m_cache.setNextSenderMsgSeqNum( value );
00276
00277 QF_STACK_POP
00278 }
00279
00280 void OdbcStore::setNextTargetMsgSeqNum( int value ) throw ( IOException )
00281 { QF_STACK_PUSH(OdbcStore::setNextTargetMsgSeqNum)
00282
00283 std::stringstream queryString;
00284 queryString << "UPDATE sessions SET incoming_seqnum=" << value << " WHERE "
00285 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00286 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00287 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00288 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00289
00290 OdbcQuery query( queryString.str() );
00291 if( !m_pConnection->execute(query) )
00292 query.throwException();
00293
00294 m_cache.setNextTargetMsgSeqNum( value );
00295
00296 QF_STACK_POP
00297 }
00298
00299 void OdbcStore::incrNextSenderMsgSeqNum() throw ( IOException )
00300 { QF_STACK_PUSH(OdbcStore::incrNextSenderMsgSeqNum)
00301 m_cache.incrNextSenderMsgSeqNum();
00302 setNextSenderMsgSeqNum( m_cache.getNextSenderMsgSeqNum() );
00303 QF_STACK_POP
00304 }
00305
00306 void OdbcStore::incrNextTargetMsgSeqNum() throw ( IOException )
00307 { QF_STACK_PUSH(OdbcStore::incrNextTargetMsgSeqNum)
00308 m_cache.incrNextTargetMsgSeqNum();
00309 setNextTargetMsgSeqNum( m_cache.getNextTargetMsgSeqNum() );
00310 QF_STACK_POP
00311 }
00312
00313 UtcTimeStamp OdbcStore::getCreationTime() const throw ( IOException )
00314 { QF_STACK_PUSH(OdbcStore::getCreationTime)
00315 return m_cache.getCreationTime();
00316 QF_STACK_POP
00317 }
00318
00319 void OdbcStore::reset() throw ( IOException )
00320 { QF_STACK_PUSH(OdbcStore::reset)
00321
00322 std::stringstream queryString;
00323 queryString << "DELETE FROM messages WHERE "
00324 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00325 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00326 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00327 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00328
00329 OdbcQuery query( queryString.str() );
00330 if( !m_pConnection->execute(query) )
00331 query.throwException();
00332 query.close();
00333
00334 m_cache.reset();
00335 UtcTimeStamp time = m_cache.getCreationTime();
00336
00337 int year, month, day, hour, minute, second, millis;
00338 time.getYMD( year, month, day );
00339 time.getHMS( hour, minute, second, millis );
00340
00341 char sqlTime[ 20 ];
00342 STRING_SPRINTF( sqlTime, "%d-%02d-%02d %02d:%02d:%02d",
00343 year, month, day, hour, minute, second );
00344
00345 std::stringstream queryString2;
00346 queryString2 << "UPDATE sessions SET creation_time={ts '" << sqlTime << "'}, "
00347 << "incoming_seqnum=" << m_cache.getNextTargetMsgSeqNum() << ", "
00348 << "outgoing_seqnum=" << m_cache.getNextSenderMsgSeqNum() << " WHERE "
00349 << "beginstring=" << "'" << m_sessionID.getBeginString().getValue() << "' and "
00350 << "sendercompid=" << "'" << m_sessionID.getSenderCompID().getValue() << "' and "
00351 << "targetcompid=" << "'" << m_sessionID.getTargetCompID().getValue() << "' and "
00352 << "session_qualifier=" << "'" << m_sessionID.getSessionQualifier() << "'";
00353
00354 OdbcQuery query2( queryString2.str() );
00355 if( !m_pConnection->execute(query2) )
00356 query2.throwException();
00357
00358 QF_STACK_POP
00359 }
00360
00361 void OdbcStore::refresh() throw ( IOException )
00362 { QF_STACK_PUSH(OdbcStore::refresh)
00363
00364 m_cache.reset();
00365 populateCache();
00366
00367 QF_STACK_POP
00368 }
00369
00370 }
00371
00372 #endif