51 #include <drizzled/current_session.h>
52 #include <drizzled/error.h>
53 #include <drizzled/gettext.h>
54 #include <drizzled/probes.h>
55 #include <drizzled/sql_parse.h>
56 #include <drizzled/session.h>
57 #include <drizzled/session/times.h>
58 #include <drizzled/sql_base.h>
59 #include <drizzled/replication_services.h>
60 #include <drizzled/transaction_services.h>
61 #include <drizzled/transaction_context.h>
62 #include <drizzled/message/transaction.pb.h>
64 #include <drizzled/resource_context.h>
65 #include <drizzled/lock.h>
66 #include <drizzled/item/int.h>
67 #include <drizzled/item/empty_string.h>
68 #include <drizzled/field/epoch.h>
69 #include <drizzled/plugin/client.h>
70 #include <drizzled/plugin/monitored_in_transaction.h>
71 #include <drizzled/plugin/transactional_storage_engine.h>
72 #include <drizzled/plugin/xa_resource_manager.h>
73 #include <drizzled/plugin/xa_storage_engine.h>
74 #include <drizzled/internal/my_sys.h>
75 #include <drizzled/statistics_variables.h>
76 #include <drizzled/system_variables.h>
77 #include <drizzled/session/transactions.h>
82 #include <google/protobuf/repeated_field.h>
85 using namespace google;
307 static plugin::XaStorageEngine& xa_storage_engine()
309 static plugin::XaStorageEngine& engine=
static_cast<plugin::XaStorageEngine&
>(*plugin::StorageEngine::findByName(
"InnoDB"));
313 void TransactionServices::registerResourceForStatement(
Session& session,
317 if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
326 registerResourceForTransaction(session, monitored, engine);
330 ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
344 void TransactionServices::registerResourceForStatement(
Session& session,
349 if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
358 registerResourceForTransaction(session, monitored, engine, resource_manager);
362 ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
376 void TransactionServices::registerResourceForTransaction(
Session& session,
381 ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
386 session.server_status|= SERVER_STATUS_IN_TRANS;
397 if (session.transaction.xid_state.xid.is_null())
398 session.transaction.xid_state.xid.set(session.
getQueryId());
401 if (not session.getResourceContext(*monitored, 0).
isStarted())
402 registerResourceForStatement(session, monitored, engine);
405 void TransactionServices::registerResourceForTransaction(
Session& session,
411 ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
416 session.server_status|= SERVER_STATUS_IN_TRANS;
427 if (session.transaction.xid_state.xid.is_null())
428 session.transaction.xid_state.xid.set(session.
getQueryId());
430 engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
433 if (! session.getResourceContext(*monitored, 0).
isStarted())
434 registerResourceForStatement(session, monitored, engine, resource_manager);
437 void TransactionServices::allocateNewTransactionId()
439 if (! ReplicationServices::isActive())
444 Session *my_session= current_session;
445 uint64_t xa_id= xa_storage_engine().getNewTransactionId(my_session);
446 my_session->setXaId(xa_id);
449 uint64_t TransactionServices::getCurrentTransactionId(Session& session)
451 if (session.getXaId() == 0)
453 session.setXaId(xa_storage_engine().getNewTransactionId(&session));
456 return session.getXaId();
459 int TransactionServices::commitTransaction(
Session& session,
460 bool normal_transaction)
462 int error= 0, cookie= 0;
468 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
470 bool is_real_trans= normal_transaction || session.transaction.
all.getResourceContexts().empty();
478 assert(session.transaction.
stmt.getResourceContexts().empty() ||
479 trans == &session.transaction.
stmt);
481 if (resource_contexts.empty() ==
false)
483 if (is_real_trans && session.wait_if_global_read_lock(
false,
false))
485 rollbackTransaction(session, normal_transaction);
494 if (shouldConstructMessages())
496 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
505 if (! resource_context->hasModifiedData())
512 if (
int err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction))
514 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
519 session.status_var.ha_prepare_count++;
523 if (error == 0 && is_real_trans)
529 error= commitTransactionMessage(session);
533 rollbackTransaction(session, normal_transaction);
538 error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
541 session.startWaitingGlobalReadLock();
550 int TransactionServices::commitPhaseOne(
Session& session,
551 bool normal_transaction)
555 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
557 bool is_real_trans= normal_transaction || session.transaction.
all.getResourceContexts().empty();
558 bool all= normal_transaction;
563 if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
566 if (resource_contexts.empty() ==
false)
568 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
574 if (
int err= resource_context->getXaResourceManager()->xaCommit(&session, all))
576 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
579 else if (normal_transaction)
581 session.status_var.ha_commit_count++;
586 if (
int err= resource_context->getTransactionalStorageEngine()->commit(&session, all))
588 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
591 else if (normal_transaction)
593 session.status_var.ha_commit_count++;
596 resource_context->reset();
600 session.transaction.xid_state.xid.set_null();
602 if (normal_transaction)
604 session.
variables.tx_isolation= session.session_tx_isolation;
605 session.transaction.cleanup();
612 int TransactionServices::rollbackTransaction(
Session& session,
613 bool normal_transaction)
617 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
619 bool is_real_trans= normal_transaction || session.transaction.
all.getResourceContexts().empty();
620 bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
626 assert(session.transaction.
stmt.getResourceContexts().empty() || trans == &session.transaction.
stmt);
628 if (resource_contexts.empty() ==
false)
630 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
636 if (
int err= resource_context->getXaResourceManager()->xaRollback(&session, all))
638 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
641 else if (normal_transaction)
643 session.status_var.ha_rollback_count++;
648 if (
int err= resource_context->getTransactionalStorageEngine()->rollback(&session, all))
650 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
653 else if (normal_transaction)
655 session.status_var.ha_rollback_count++;
658 resource_context->reset();
669 rollbackTransactionMessage(session);
671 rollbackStatementMessage(session);
674 session.transaction.xid_state.xid.set_null();
675 if (normal_transaction)
677 session.
variables.tx_isolation=session.session_tx_isolation;
678 session.transaction.cleanup();
681 if (normal_transaction)
689 session.getKilled() != Session::KILL_CONNECTION)
691 push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
692 ER_WARNING_NOT_COMPLETE_ROLLBACK,
693 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
699 int TransactionServices::autocommitOrRollback(
Session& session,
704 if ((statement != NULL) && (! error))
705 finalizeStatementMessage(*statement, session);
707 if (session.transaction.
stmt.getResourceContexts().empty() ==
false)
710 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
711 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
713 resource_context->getTransactionalStorageEngine()->endStatement(&session);
718 if (commitTransaction(session,
false))
723 (void) rollbackTransaction(session,
false);
726 (void) rollbackTransaction(session,
true);
727 session.server_status&= ~SERVER_STATUS_IN_TRANS;
731 session.
variables.tx_isolation= session.session_tx_isolation;
747 int TransactionServices::rollbackToSavepoint(
Session& session,
752 TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
753 TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
755 trans->no_2pc=
false;
760 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, sv_resource_contexts)
766 if (
int err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv))
768 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
773 session.status_var.ha_savepoint_rollback_count++;
783 TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
784 TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
785 TransactionContext::ResourceContexts set_difference_contexts;
792 set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
794 sort(sorted_tran_resource_contexts.begin(),
795 sorted_tran_resource_contexts.end(),
796 ResourceContextCompare());
797 sort(sorted_sv_resource_contexts.begin(),
798 sorted_sv_resource_contexts.end(),
799 ResourceContextCompare());
800 set_difference(sorted_tran_resource_contexts.begin(),
801 sorted_tran_resource_contexts.end(),
802 sorted_sv_resource_contexts.begin(),
803 sorted_sv_resource_contexts.end(),
804 set_difference_contexts.begin(),
805 ResourceContextCompare());
812 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, set_difference_contexts)
814 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
816 if (resource->participatesInSqlTransaction())
818 if (
int err= resource_context->getTransactionalStorageEngine()->rollback(&session,
true))
820 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
825 session.status_var.ha_rollback_count++;
828 resource_context->reset();
831 trans->setResourceContexts(sv_resource_contexts);
833 if (shouldConstructMessages())
835 cleanupTransactionMessage(getActiveTransactionMessage(session), session);
836 message::Transaction *savepoint_transaction= sv.getTransactionMessage();
837 if (savepoint_transaction != NULL)
844 message::Transaction *savepoint_transaction_copy=
new message::Transaction(*sv.getTransactionMessage());
845 uint32_t num_statements = savepoint_transaction_copy->statement_size();
846 if (num_statements == 0)
852 session.
setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
867 int TransactionServices::setSavepoint(
Session& session,
872 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
874 if (resource_contexts.empty() ==
false)
876 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
882 if (
int err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv))
884 my_error(ER_GET_ERRNO, MYF(0), err);
889 session.status_var.ha_savepoint_count++;
897 sv.setResourceContexts(resource_contexts);
899 if (shouldConstructMessages())
903 if (transaction != NULL)
907 sv.setTransactionMessage(transaction_savepoint);
914 int TransactionServices::releaseSavepoint(
Session& session,
919 TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
921 BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
927 if (
int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv))
929 my_error(ER_GET_ERRNO, MYF(0), err);
938 bool TransactionServices::shouldConstructMessages()
940 return ReplicationServices::isActive();
944 bool should_inc_trx_id)
948 if (unlikely(transaction == NULL))
956 initTransactionMessage(*transaction, session, should_inc_trx_id);
964 bool should_inc_trx_id)
969 if (should_inc_trx_id)
971 trx->set_transaction_id(getCurrentTransactionId(session));
977 trx->set_transaction_id(0);
980 trx->set_start_timestamp(session.times.getCurrentTimestamp());
983 transaction.set_segment_id(1);
984 transaction.set_end_segment(
true);
991 trx->set_end_timestamp(session.times.getCurrentTimestamp());
1003 int TransactionServices::commitTransactionMessage(
Session& session)
1005 if (! ReplicationServices::isActive())
1018 if (statement != NULL)
1020 finalizeStatementMessage(*statement, session);
1031 if (transaction->statement_size() == 0)
1033 cleanupTransactionMessage(transaction, session);
1037 finalizeTransactionMessage(*transaction, session);
1039 plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, *transaction);
1041 cleanupTransactionMessage(transaction, session);
1043 return static_cast<int>(result);
1047 message::Statement::Type type,
1050 statement.set_type(type);
1051 statement.set_start_timestamp(session.times.getCurrentTimestamp());
1054 statement.set_sql(session.getQueryString()->c_str());
1060 statement.set_end_timestamp(session.times.getCurrentTimestamp());
1064 void TransactionServices::rollbackTransactionMessage(
Session& session)
1066 if (! ReplicationServices::isActive())
1088 if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1091 uint64_t trx_id= transaction->transaction_context().transaction_id();
1092 uint32_t seg_id= transaction->segment_id();
1098 transaction->Clear();
1099 initTransactionMessage(*transaction, session,
false);
1102 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1103 transaction->set_segment_id(seg_id);
1104 transaction->set_end_segment(
true);
1108 initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1109 finalizeStatementMessage(*statement, session);
1111 finalizeTransactionMessage(*transaction, session);
1113 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1116 cleanupTransactionMessage(transaction, session);
1119 void TransactionServices::rollbackStatementMessage(
Session& session)
1121 if (! ReplicationServices::isActive())
1127 if (current_statement == NULL)
1136 bool is_segmented=
false;
1138 switch (current_statement->type())
1140 case message::Statement::INSERT:
1141 if (current_statement->insert_data().segment_id() > 1)
1145 case message::Statement::UPDATE:
1146 if (current_statement->update_data().segment_id() > 1)
1150 case message::Statement::DELETE:
1151 if (current_statement->delete_data().segment_id() > 1)
1164 google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1165 statements_in_txn= transaction->mutable_statement();
1166 statements_in_txn->RemoveLast();
1176 current_statement= transaction->add_statement();
1177 initStatementMessage(*current_statement,
1178 message::Statement::ROLLBACK_STATEMENT,
1180 finalizeStatementMessage(*current_statement, session);
1187 uint64_t trx_id= transaction->transaction_context().transaction_id();
1188 uint32_t seg_id= transaction->segment_id();
1190 transaction->set_end_segment(
false);
1191 commitTransactionMessage(session);
1192 transaction= getActiveTransactionMessage(session,
false);
1195 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1196 transaction->set_segment_id(seg_id + 1);
1197 transaction->set_end_segment(
true);
1204 uint32_t *next_segment_id)
1216 if (statement == NULL)
1218 transaction= getActiveTransactionMessage(session);
1220 if (static_cast<size_t>(transaction->ByteSize()) >=
1221 transaction_message_threshold)
1223 transaction= segmentTransactionMessage(session, transaction);
1226 statement= transaction->add_statement();
1227 setInsertHeader(*statement, session, table);
1232 transaction= getActiveTransactionMessage(session);
1239 if (static_cast<size_t>(transaction->ByteSize()) >=
1240 transaction_message_threshold)
1243 uint64_t trx_id= transaction->transaction_context().transaction_id();
1244 uint32_t seg_id= transaction->segment_id();
1249 *next_segment_id= current_data->segment_id() + 1;
1251 current_data->set_end_segment(
false);
1252 transaction->set_end_segment(
false);
1259 commitTransactionMessage(session);
1266 transaction= getActiveTransactionMessage(session,
false);
1267 assert(transaction != NULL);
1269 statement= transaction->add_statement();
1270 setInsertHeader(*statement, session, table);
1274 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1275 transaction->set_segment_id(seg_id + 1);
1276 transaction->set_end_segment(
true);
1285 *next_segment_id= current_data.segment_id();
1296 initStatementMessage(statement, message::Statement::INSERT, session);
1306 table_metadata->set_schema_name(table.getShare()->getSchemaName());
1307 table_metadata->set_table_name(table.getShare()->getTableName());
1309 Field **table_fields= table.getFields();
1316 while (
Field* current_field= *table_fields++)
1318 field_metadata= header->add_field_metadata();
1319 field_metadata->set_name(current_field->field_name);
1320 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1324 bool TransactionServices::insertRecord(
Session& session,
1327 if (! ReplicationServices::isActive())
1330 if (not table.getShare()->is_replicated())
1341 if (not table.getShare()->hasPrimaryKey())
1343 my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1347 uint32_t next_segment_id= 1;
1351 data->set_segment_id(next_segment_id);
1352 data->set_end_segment(
true);
1355 Field *current_field;
1356 Field **table_fields= table.getFields();
1359 string_value->set_charset(system_charset_info);
1364 while ((current_field= *table_fields++) != NULL)
1366 if (current_field->is_null())
1368 record->add_is_null(
true);
1369 record->add_insert_value(
"", 0);
1373 string_value= current_field->val_str_internal(string_value);
1374 record->add_is_null(
false);
1375 record->add_insert_value(string_value->c_ptr(), string_value->length());
1376 string_value->free();
1384 const unsigned char *old_record,
1385 const unsigned char *new_record,
1386 uint32_t *next_segment_id)
1398 if (statement == NULL)
1400 transaction= getActiveTransactionMessage(session);
1402 if (static_cast<size_t>(transaction->ByteSize()) >=
1403 transaction_message_threshold)
1405 transaction= segmentTransactionMessage(session, transaction);
1408 statement= transaction->add_statement();
1409 setUpdateHeader(*statement, session, table, old_record, new_record);
1414 transaction= getActiveTransactionMessage(session);
1421 if (static_cast<size_t>(transaction->ByteSize()) >=
1422 transaction_message_threshold)
1425 uint64_t trx_id= transaction->transaction_context().transaction_id();
1426 uint32_t seg_id= transaction->segment_id();
1431 *next_segment_id= current_data->segment_id() + 1;
1433 current_data->set_end_segment(
false);
1434 transaction->set_end_segment(
false);
1441 commitTransactionMessage(session);
1448 transaction= getActiveTransactionMessage(session,
false);
1449 assert(transaction != NULL);
1451 statement= transaction->add_statement();
1452 setUpdateHeader(*statement, session, table, old_record, new_record);
1456 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1457 transaction->set_segment_id(seg_id + 1);
1458 transaction->set_end_segment(
true);
1467 *next_segment_id= current_data.segment_id();
1477 const unsigned char *old_record,
1478 const unsigned char *new_record)
1480 initStatementMessage(statement, message::Statement::UPDATE, session);
1490 table_metadata->set_schema_name(table.getShare()->getSchemaName());
1491 table_metadata->set_table_name(table.getShare()->getTableName());
1493 Field *current_field;
1494 Field **table_fields= table.getFields();
1501 while ((current_field= *table_fields++) != NULL)
1509 field_metadata= header->add_key_field_metadata();
1510 field_metadata->set_name(current_field->
field_name);
1511 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1514 if (isFieldUpdated(current_field, table, old_record, new_record))
1517 field_metadata= header->add_set_field_metadata();
1518 field_metadata->set_name(current_field->
field_name);
1519 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1524 void TransactionServices::updateRecord(
Session& session,
1526 const unsigned char *old_record,
1527 const unsigned char *new_record)
1529 if (! ReplicationServices::isActive())
1532 if (not table.getShare()->is_replicated())
1535 uint32_t next_segment_id= 1;
1536 message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1539 data->set_segment_id(next_segment_id);
1540 data->set_end_segment(
true);
1543 Field *current_field;
1544 Field **table_fields= table.getFields();
1546 string_value->set_charset(system_charset_info);
1548 while ((current_field= *table_fields++) != NULL)
1561 if (isFieldUpdated(current_field, table, old_record, new_record))
1564 bool is_read_set= current_field->isReadSet();
1567 table.setReadSet(current_field->position());
1570 string_value= current_field->val_str_internal(string_value);
1576 current_field->setReadSet(is_read_set);
1578 if (current_field->is_null())
1580 record->add_is_null(
true);
1581 record->add_after_value(
"", 0);
1585 record->add_is_null(
false);
1586 record->add_after_value(string_value->c_ptr(), string_value->length());
1588 string_value->free();
1603 string_value= current_field->val_str_internal(string_value,
1605 current_field->offset(const_cast<unsigned char *>(new_record)));
1606 record->add_key_value(string_value->c_ptr(), string_value->length());
1607 string_value->free();
1613 bool TransactionServices::isFieldUpdated(
Field *current_field,
1615 const unsigned char *old_record,
1616 const unsigned char *new_record)
1623 const unsigned char *old_ptr= (
const unsigned char *) old_record + (ptrdiff_t) (current_field->
ptr - table.getInsertRecord());
1624 const unsigned char *new_ptr= (
const unsigned char *) new_record + (ptrdiff_t) (current_field->
ptr - table.getInsertRecord());
1626 uint32_t field_length= current_field->
pack_length();
1628 bool old_value_is_null= current_field->is_null_in_record(old_record);
1629 bool new_value_is_null= current_field->is_null_in_record(new_record);
1631 bool isUpdated=
false;
1632 if (old_value_is_null != new_value_is_null)
1634 if ((old_value_is_null) && (! new_value_is_null))
1638 else if ((! old_value_is_null) && (new_value_is_null))
1646 if (memcmp(old_ptr, new_ptr, field_length) != 0)
1656 uint32_t *next_segment_id)
1668 if (statement == NULL)
1670 transaction= getActiveTransactionMessage(session);
1672 if (static_cast<size_t>(transaction->ByteSize()) >=
1673 transaction_message_threshold)
1675 transaction= segmentTransactionMessage(session, transaction);
1678 statement= transaction->add_statement();
1679 setDeleteHeader(*statement, session, table);
1684 transaction= getActiveTransactionMessage(session);
1691 if (static_cast<size_t>(transaction->ByteSize()) >=
1692 transaction_message_threshold)
1695 uint64_t trx_id= transaction->transaction_context().transaction_id();
1696 uint32_t seg_id= transaction->segment_id();
1701 *next_segment_id= current_data->segment_id() + 1;
1703 current_data->set_end_segment(
false);
1704 transaction->set_end_segment(
false);
1711 commitTransactionMessage(session);
1718 transaction= getActiveTransactionMessage(session,
false);
1719 assert(transaction != NULL);
1721 statement= transaction->add_statement();
1722 setDeleteHeader(*statement, session, table);
1726 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1727 transaction->set_segment_id(seg_id + 1);
1728 transaction->set_end_segment(
true);
1737 *next_segment_id= current_data.segment_id();
1748 initStatementMessage(statement, message::Statement::DELETE, session);
1757 table_metadata->set_schema_name(table.getShare()->getSchemaName());
1758 table_metadata->set_table_name(table.getShare()->getTableName());
1760 Field *current_field;
1761 Field **table_fields= table.getFields();
1765 while ((current_field= *table_fields++) != NULL)
1774 field_metadata= header->add_key_field_metadata();
1775 field_metadata->set_name(current_field->
field_name);
1776 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1781 void TransactionServices::deleteRecord(
Session& session,
1783 bool use_update_record)
1785 if (! ReplicationServices::isActive())
1788 if (not table.getShare()->is_replicated())
1791 uint32_t next_segment_id= 1;
1795 data->set_segment_id(next_segment_id);
1796 data->set_end_segment(
true);
1799 Field *current_field;
1800 Field **table_fields= table.getFields();
1802 string_value->set_charset(system_charset_info);
1804 while ((current_field= *table_fields++) != NULL)
1813 if (use_update_record)
1822 const unsigned char *old_ptr= current_field->
ptr;
1823 current_field->
ptr= table.getUpdateRecord() +
static_cast<ptrdiff_t
>(old_ptr - table.getInsertRecord());
1824 string_value= current_field->val_str_internal(string_value);
1825 current_field->
ptr=
const_cast<unsigned char *
>(old_ptr);
1829 string_value= current_field->val_str_internal(string_value);
1834 record->add_key_value(string_value->c_ptr(), string_value->length());
1835 string_value->free();
1840 void TransactionServices::createTable(
Session& session,
1843 if (not ReplicationServices::isActive())
1846 if (not message::is_replicated(table))
1852 initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1859 message::Table *new_table_message= create_table_statement->mutable_table();
1860 *new_table_message= table;
1862 finalizeStatementMessage(*statement, session);
1864 finalizeTransactionMessage(*transaction, session);
1866 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1868 cleanupTransactionMessage(transaction, session);
1872 void TransactionServices::createSchema(
Session& session,
1875 if (! ReplicationServices::isActive())
1878 if (not message::is_replicated(schema))
1884 initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1891 message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1892 *new_schema_message= schema;
1894 finalizeStatementMessage(*statement, session);
1896 finalizeTransactionMessage(*transaction, session);
1898 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1900 cleanupTransactionMessage(transaction, session);
1904 void TransactionServices::dropSchema(
Session& session,
1908 if (not ReplicationServices::isActive())
1911 if (not message::is_replicated(schema))
1917 initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1925 drop_schema_statement->set_schema_name(identifier.getSchemaName());
1927 finalizeStatementMessage(*statement, session);
1929 finalizeTransactionMessage(*transaction, session);
1931 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1933 cleanupTransactionMessage(transaction, session);
1936 void TransactionServices::alterSchema(
Session& session,
1940 if (! ReplicationServices::isActive())
1943 if (not message::is_replicated(old_schema))
1949 initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
1960 *before= old_schema;
1963 finalizeStatementMessage(*statement, session);
1965 finalizeTransactionMessage(*transaction, session);
1967 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1969 cleanupTransactionMessage(transaction, session);
1972 void TransactionServices::dropTable(
Session& session,
1977 if (! ReplicationServices::isActive())
1980 if (not message::is_replicated(table))
1986 initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
1994 drop_table_statement->set_if_exists_clause(if_exists);
1998 table_metadata->set_schema_name(identifier.getSchemaName());
1999 table_metadata->set_table_name(identifier.getTableName());
2001 finalizeStatementMessage(*statement, session);
2003 finalizeTransactionMessage(*transaction, session);
2005 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2007 cleanupTransactionMessage(transaction, session);
2012 if (! ReplicationServices::isActive())
2015 if (not table.getShare()->is_replicated())
2021 initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2030 table_metadata->set_schema_name(table.getShare()->getSchemaName());
2031 table_metadata->set_table_name(table.getShare()->getTableName());
2033 finalizeStatementMessage(*statement, session);
2035 finalizeTransactionMessage(*transaction, session);
2037 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2039 cleanupTransactionMessage(transaction, session);
2042 void TransactionServices::rawStatement(
Session& session,
2043 const string &query,
2044 const string &schema)
2046 if (! ReplicationServices::isActive())
2052 initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2053 statement->set_sql(query);
2054 if (not schema.empty())
2055 statement->set_raw_sql_schema(schema);
2056 finalizeStatementMessage(*statement, session);
2058 finalizeTransactionMessage(*transaction, session);
2060 (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2062 cleanupTransactionMessage(transaction, session);
2067 if (not ReplicationServices::isActive())
2072 initTransactionMessage(transaction, session,
true);
2075 finalizeTransactionMessage(transaction, session);
2078 trx_event->CopyFrom(event);
2079 plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, transaction);
2083 bool TransactionServices::sendStartupEvent(
Session& session)
2086 event.set_type(message::Event::STARTUP);
2087 return not sendEvent(session, event);
2090 bool TransactionServices::sendShutdownEvent(
Session& session)
2093 event.set_type(message::Event::SHUTDOWN);
2094 return not sendEvent(session, event);
plugin::MonitoredInTransaction * getMonitored() const
virtual uint32_t pack_length() const
message::Statement * getStatementMessage() const
bool fieldInPrimaryKey(Field *field) const
bool transaction_rollback_request
bool hasModifiedNonTransData() const
virtual bool participatesInXaTransaction() const =0
message::Transaction * getTransactionMessage() const
void setXaResourceManager(plugin::XaResourceManager *in_xa_resource_manager)
void setTransactionalStorageEngine(plugin::TransactionalStorageEngine *in_trx_storage_engine)
virtual bool participatesInSqlTransaction() const =0
void setTransactionMessage(message::Transaction *in_message)
query_id_t getQueryId() const
void setMonitored(plugin::MonitoredInTransaction *in_monitored)
uint32_t getServerId() const
drizzle_system_variables & variables
void setStatementMessage(message::Statement *in_message)
void registerResource(ResourceContext *resource)