Drizzled Public API Documentation

transaction_services.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2008 Sun Microsystems, Inc.
5  * Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; version 2 of the License.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
50 #include <config.h>
51 #include <drizzled/current_session.h>
52 #include <drizzled/error.h>
53 #include <drizzled/gettext.h>
54 #include <drizzled/probes.h>
55 #include <drizzled/sql_parse.h>
56 #include <drizzled/session.h>
57 #include <drizzled/session/times.h>
58 #include <drizzled/sql_base.h>
59 #include <drizzled/replication_services.h>
60 #include <drizzled/transaction_services.h>
61 #include <drizzled/transaction_context.h>
62 #include <drizzled/message/transaction.pb.h>
64 #include <drizzled/resource_context.h>
65 #include <drizzled/lock.h>
66 #include <drizzled/item/int.h>
67 #include <drizzled/item/empty_string.h>
68 #include <drizzled/field/epoch.h>
69 #include <drizzled/plugin/client.h>
70 #include <drizzled/plugin/monitored_in_transaction.h>
71 #include <drizzled/plugin/transactional_storage_engine.h>
72 #include <drizzled/plugin/xa_resource_manager.h>
73 #include <drizzled/plugin/xa_storage_engine.h>
74 #include <drizzled/internal/my_sys.h>
75 #include <drizzled/statistics_variables.h>
76 #include <drizzled/system_variables.h>
77 #include <drizzled/session/transactions.h>
78 
79 #include <vector>
80 #include <algorithm>
81 #include <functional>
82 #include <google/protobuf/repeated_field.h>
83 
84 using namespace std;
85 using namespace google;
86 
87 namespace drizzled {
88 
307 static plugin::XaStorageEngine& xa_storage_engine()
308 {
309  static plugin::XaStorageEngine& engine= static_cast<plugin::XaStorageEngine&>(*plugin::StorageEngine::findByName("InnoDB"));
310  return engine;
311 }
312 
313 void TransactionServices::registerResourceForStatement(Session& session,
316 {
317  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
318  {
319  /*
320  * Now we automatically register this resource manager for the
321  * normal transaction. This is fine because a statement
322  * transaction registration should always enlist the resource
323  * in the normal transaction which contains the statement
324  * transaction.
325  */
326  registerResourceForTransaction(session, monitored, engine);
327  }
328 
329  TransactionContext& trans= session.transaction.stmt;
330  ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
331 
332  if (resource_context.isStarted())
333  return; /* already registered, return */
334 
335  assert(monitored->participatesInSqlTransaction());
336  assert(not monitored->participatesInXaTransaction());
337 
338  resource_context.setMonitored(monitored);
339  resource_context.setTransactionalStorageEngine(engine);
340  trans.registerResource(&resource_context);
341  trans.no_2pc= true;
342 }
343 
344 void TransactionServices::registerResourceForStatement(Session& session,
347  plugin::XaResourceManager *resource_manager)
348 {
349  if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
350  {
351  /*
352  * Now we automatically register this resource manager for the
353  * normal transaction. This is fine because a statement
354  * transaction registration should always enlist the resource
355  * in the normal transaction which contains the statement
356  * transaction.
357  */
358  registerResourceForTransaction(session, monitored, engine, resource_manager);
359  }
360 
361  TransactionContext& trans= session.transaction.stmt;
362  ResourceContext& resource_context= session.getResourceContext(*monitored, 0);
363 
364  if (resource_context.isStarted())
365  return; /* already registered, return */
366 
367  assert(monitored->participatesInXaTransaction());
368  assert(monitored->participatesInSqlTransaction());
369 
370  resource_context.setMonitored(monitored);
371  resource_context.setTransactionalStorageEngine(engine);
372  resource_context.setXaResourceManager(resource_manager);
373  trans.registerResource(&resource_context);
374 }
375 
376 void TransactionServices::registerResourceForTransaction(Session& session,
379 {
380  TransactionContext& trans= session.transaction.all;
381  ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
382 
383  if (resource_context.isStarted())
384  return; /* already registered, return */
385 
386  session.server_status|= SERVER_STATUS_IN_TRANS;
387 
388  trans.registerResource(&resource_context);
389 
390  assert(monitored->participatesInSqlTransaction());
391  assert(not monitored->participatesInXaTransaction());
392 
393  resource_context.setMonitored(monitored);
394  resource_context.setTransactionalStorageEngine(engine);
395  trans.no_2pc= true;
396 
397  if (session.transaction.xid_state.xid.is_null())
398  session.transaction.xid_state.xid.set(session.getQueryId());
399 
400  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
401  if (not session.getResourceContext(*monitored, 0).isStarted())
402  registerResourceForStatement(session, monitored, engine);
403 }
404 
405 void TransactionServices::registerResourceForTransaction(Session& session,
408  plugin::XaResourceManager *resource_manager)
409 {
410  TransactionContext *trans= &session.transaction.all;
411  ResourceContext& resource_context= session.getResourceContext(*monitored, 1);
412 
413  if (resource_context.isStarted())
414  return; /* already registered, return */
415 
416  session.server_status|= SERVER_STATUS_IN_TRANS;
417 
418  trans->registerResource(&resource_context);
419 
420  assert(monitored->participatesInSqlTransaction());
421 
422  resource_context.setMonitored(monitored);
423  resource_context.setXaResourceManager(resource_manager);
424  resource_context.setTransactionalStorageEngine(engine);
425  trans->no_2pc= true;
426 
427  if (session.transaction.xid_state.xid.is_null())
428  session.transaction.xid_state.xid.set(session.getQueryId());
429 
430  engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
431 
432  /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
433  if (! session.getResourceContext(*monitored, 0).isStarted())
434  registerResourceForStatement(session, monitored, engine, resource_manager);
435 }
436 
437 void TransactionServices::allocateNewTransactionId()
438 {
439  if (! ReplicationServices::isActive())
440  {
441  return;
442  }
443 
444  Session *my_session= current_session;
445  uint64_t xa_id= xa_storage_engine().getNewTransactionId(my_session);
446  my_session->setXaId(xa_id);
447 }
448 
449 uint64_t TransactionServices::getCurrentTransactionId(Session& session)
450 {
451  if (session.getXaId() == 0)
452  {
453  session.setXaId(xa_storage_engine().getNewTransactionId(&session));
454  }
455 
456  return session.getXaId();
457 }
458 
459 int TransactionServices::commitTransaction(Session& session,
460  bool normal_transaction)
461 {
462  int error= 0, cookie= 0;
463  /*
464  'all' means that this is either an explicit commit issued by
465  user, or an implicit commit issued by a DDL.
466  */
467  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
468  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
469 
470  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
471 
472  /*
473  We must not commit the normal transaction if a statement
474  transaction is pending. Otherwise statement transaction
475  flags will not get propagated to its normal transaction's
476  counterpart.
477  */
478  assert(session.transaction.stmt.getResourceContexts().empty() ||
479  trans == &session.transaction.stmt);
480 
481  if (resource_contexts.empty() == false)
482  {
483  if (is_real_trans && session.wait_if_global_read_lock(false, false))
484  {
485  rollbackTransaction(session, normal_transaction);
486  return 1;
487  }
488 
489  /*
490  * If replication is on, we do a PREPARE on the resource managers, push the
491  * Transaction message across the replication stream, and then COMMIT if the
492  * replication stream returned successfully.
493  */
494  if (shouldConstructMessages())
495  {
496  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
497  {
498  if (error)
499  break;
500  /*
501  Do not call two-phase commit if this particular
502  transaction is read-only. This allows for simpler
503  implementation in engines that are always read-only.
504  */
505  if (! resource_context->hasModifiedData())
506  continue;
507 
508  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
509 
510  if (resource->participatesInXaTransaction())
511  {
512  if (int err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction))
513  {
514  my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
515  error= 1;
516  }
517  else
518  {
519  session.status_var.ha_prepare_count++;
520  }
521  }
522  }
523  if (error == 0 && is_real_trans)
524  {
525  /*
526  * Push the constructed Transaction messages across to
527  * replicators and appliers.
528  */
529  error= commitTransactionMessage(session);
530  }
531  if (error)
532  {
533  rollbackTransaction(session, normal_transaction);
534  error= 1;
535  goto end;
536  }
537  }
538  error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
539 end:
540  if (is_real_trans)
541  session.startWaitingGlobalReadLock();
542  }
543  return error;
544 }
545 
550 int TransactionServices::commitPhaseOne(Session& session,
551  bool normal_transaction)
552 {
553  int error=0;
554  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
555  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
556 
557  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
558  bool all= normal_transaction;
559 
560  /* If we're in autocommit then we have a real transaction to commit
561  (except if it's BEGIN)
562  */
563  if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
564  all= true;
565 
566  if (resource_contexts.empty() == false)
567  {
568  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
569  {
570  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
571 
572  if (resource->participatesInXaTransaction())
573  {
574  if (int err= resource_context->getXaResourceManager()->xaCommit(&session, all))
575  {
576  my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
577  error= 1;
578  }
579  else if (normal_transaction)
580  {
581  session.status_var.ha_commit_count++;
582  }
583  }
584  else if (resource->participatesInSqlTransaction())
585  {
586  if (int err= resource_context->getTransactionalStorageEngine()->commit(&session, all))
587  {
588  my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
589  error= 1;
590  }
591  else if (normal_transaction)
592  {
593  session.status_var.ha_commit_count++;
594  }
595  }
596  resource_context->reset(); /* keep it conveniently zero-filled */
597  }
598 
599  if (is_real_trans)
600  session.transaction.xid_state.xid.set_null();
601 
602  if (normal_transaction)
603  {
604  session.variables.tx_isolation= session.session_tx_isolation;
605  session.transaction.cleanup();
606  }
607  }
608  trans->reset();
609  return error;
610 }
611 
612 int TransactionServices::rollbackTransaction(Session& session,
613  bool normal_transaction)
614 {
615  int error= 0;
616  TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
617  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
618 
619  bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
620  bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
621 
622  /*
623  We must not rollback the normal transaction if a statement
624  transaction is pending.
625  */
626  assert(session.transaction.stmt.getResourceContexts().empty() || trans == &session.transaction.stmt);
627 
628  if (resource_contexts.empty() == false)
629  {
630  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
631  {
632  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
633 
634  if (resource->participatesInXaTransaction())
635  {
636  if (int err= resource_context->getXaResourceManager()->xaRollback(&session, all))
637  {
638  my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
639  error= 1;
640  }
641  else if (normal_transaction)
642  {
643  session.status_var.ha_rollback_count++;
644  }
645  }
646  else if (resource->participatesInSqlTransaction())
647  {
648  if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, all))
649  {
650  my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
651  error= 1;
652  }
653  else if (normal_transaction)
654  {
655  session.status_var.ha_rollback_count++;
656  }
657  }
658  resource_context->reset(); /* keep it conveniently zero-filled */
659  }
660 
661  /*
662  * We need to signal the ROLLBACK to ReplicationServices here
663  * BEFORE we set the transaction ID to NULL. This is because
664  * if a bulk segment was sent to replicators, we need to send
665  * a rollback statement with the corresponding transaction ID
666  * to rollback.
667  */
668  if (all)
669  rollbackTransactionMessage(session);
670  else
671  rollbackStatementMessage(session);
672 
673  if (is_real_trans)
674  session.transaction.xid_state.xid.set_null();
675  if (normal_transaction)
676  {
677  session.variables.tx_isolation=session.session_tx_isolation;
678  session.transaction.cleanup();
679  }
680  }
681  if (normal_transaction)
682  session.transaction_rollback_request= false;
683 
684  /*
685  * If a non-transactional table was updated, warn the user
686  */
687  if (is_real_trans &&
688  session.transaction.all.hasModifiedNonTransData() &&
689  session.getKilled() != Session::KILL_CONNECTION)
690  {
691  push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
692  ER_WARNING_NOT_COMPLETE_ROLLBACK,
693  ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
694  }
695  trans->reset();
696  return error;
697 }
698 
699 int TransactionServices::autocommitOrRollback(Session& session,
700  int error)
701 {
702  /* One GPB Statement message per SQL statement */
703  message::Statement *statement= session.getStatementMessage();
704  if ((statement != NULL) && (! error))
705  finalizeStatementMessage(*statement, session);
706 
707  if (session.transaction.stmt.getResourceContexts().empty() == false)
708  {
709  TransactionContext *trans = &session.transaction.stmt;
710  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
711  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
712  {
713  resource_context->getTransactionalStorageEngine()->endStatement(&session);
714  }
715 
716  if (! error)
717  {
718  if (commitTransaction(session, false))
719  error= 1;
720  }
721  else
722  {
723  (void) rollbackTransaction(session, false);
724  if (session.transaction_rollback_request)
725  {
726  (void) rollbackTransaction(session, true);
727  session.server_status&= ~SERVER_STATUS_IN_TRANS;
728  }
729  }
730 
731  session.variables.tx_isolation= session.session_tx_isolation;
732  }
733 
734  return error;
735 }
736 
737 struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
738 {
739  result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
740  {
741  /* The below is perfectly fine, since we're simply comparing addresses for the underlying
742  * resources aren't the same... */
743  return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
744  }
745 };
746 
747 int TransactionServices::rollbackToSavepoint(Session& session,
748  NamedSavepoint &sv)
749 {
750  int error= 0;
751  TransactionContext *trans= &session.transaction.all;
752  TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
753  TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
754 
755  trans->no_2pc= false;
756  /*
757  rolling back to savepoint in all storage engines that were part of the
758  transaction when the savepoint was set
759  */
760  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, sv_resource_contexts)
761  {
762  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
763 
764  if (resource->participatesInSqlTransaction())
765  {
766  if (int err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv))
767  {
768  my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
769  error= 1;
770  }
771  else
772  {
773  session.status_var.ha_savepoint_rollback_count++;
774  }
775  }
776  trans->no_2pc|= not resource->participatesInXaTransaction();
777  }
778  /*
779  rolling back the transaction in all storage engines that were not part of
780  the transaction when the savepoint was set
781  */
782  {
783  TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
784  TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
785  TransactionContext::ResourceContexts set_difference_contexts;
786 
787  /*
788  * Bug #542299: segfault during set_difference() below. copy<>() requires pre-allocation
789  * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
790  * here
791  */
792  set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
793 
794  sort(sorted_tran_resource_contexts.begin(),
795  sorted_tran_resource_contexts.end(),
796  ResourceContextCompare());
797  sort(sorted_sv_resource_contexts.begin(),
798  sorted_sv_resource_contexts.end(),
799  ResourceContextCompare());
800  set_difference(sorted_tran_resource_contexts.begin(),
801  sorted_tran_resource_contexts.end(),
802  sorted_sv_resource_contexts.begin(),
803  sorted_sv_resource_contexts.end(),
804  set_difference_contexts.begin(),
805  ResourceContextCompare());
806  /*
807  * set_difference_contexts now contains all resource contexts
808  * which are in the transaction context but were NOT in the
809  * savepoint's resource contexts.
810  */
811 
812  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, set_difference_contexts)
813  {
814  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
815 
816  if (resource->participatesInSqlTransaction())
817  {
818  if (int err= resource_context->getTransactionalStorageEngine()->rollback(&session, true))
819  {
820  my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
821  error= 1;
822  }
823  else
824  {
825  session.status_var.ha_rollback_count++;
826  }
827  }
828  resource_context->reset(); /* keep it conveniently zero-filled */
829  }
830  }
831  trans->setResourceContexts(sv_resource_contexts);
832 
833  if (shouldConstructMessages())
834  {
835  cleanupTransactionMessage(getActiveTransactionMessage(session), session);
836  message::Transaction *savepoint_transaction= sv.getTransactionMessage();
837  if (savepoint_transaction != NULL)
838  {
839  /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup.
840  Upon commit the savepoint_transaction_copy will be cleaned up by a call to
841  cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
842  up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
843  */
844  message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
845  uint32_t num_statements = savepoint_transaction_copy->statement_size();
846  if (num_statements == 0)
847  {
848  session.setStatementMessage(NULL);
849  }
850  else
851  {
852  session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
853  }
854  session.setTransactionMessage(savepoint_transaction_copy);
855  }
856  }
857 
858  return error;
859 }
860 
867 int TransactionServices::setSavepoint(Session& session,
868  NamedSavepoint &sv)
869 {
870  int error= 0;
871  TransactionContext *trans= &session.transaction.all;
872  TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
873 
874  if (resource_contexts.empty() == false)
875  {
876  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
877  {
878  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
879 
880  if (resource->participatesInSqlTransaction())
881  {
882  if (int err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv))
883  {
884  my_error(ER_GET_ERRNO, MYF(0), err);
885  error= 1;
886  }
887  else
888  {
889  session.status_var.ha_savepoint_count++;
890  }
891  }
892  }
893  }
894  /*
895  Remember the list of registered storage engines.
896  */
897  sv.setResourceContexts(resource_contexts);
898 
899  if (shouldConstructMessages())
900  {
901  message::Transaction *transaction= session.getTransactionMessage();
902 
903  if (transaction != NULL)
904  {
905  message::Transaction *transaction_savepoint=
906  new message::Transaction(*transaction);
907  sv.setTransactionMessage(transaction_savepoint);
908  }
909  }
910 
911  return error;
912 }
913 
914 int TransactionServices::releaseSavepoint(Session& session,
915  NamedSavepoint &sv)
916 {
917  int error= 0;
918 
919  TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
920 
921  BOOST_FOREACH(TransactionContext::ResourceContexts::reference resource_context, resource_contexts)
922  {
923  plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
924 
925  if (resource->participatesInSqlTransaction())
926  {
927  if (int err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv))
928  {
929  my_error(ER_GET_ERRNO, MYF(0), err);
930  error= 1;
931  }
932  }
933  }
934 
935  return error;
936 }
937 
938 bool TransactionServices::shouldConstructMessages()
939 {
940  return ReplicationServices::isActive();
941 }
942 
943 message::Transaction *TransactionServices::getActiveTransactionMessage(Session& session,
944  bool should_inc_trx_id)
945 {
946  message::Transaction *transaction= session.getTransactionMessage();
947 
948  if (unlikely(transaction == NULL))
949  {
950  /*
951  * Allocate and initialize a new transaction message
952  * for this Session object. Session is responsible for
953  * deleting transaction message when done with it.
954  */
955  transaction= new message::Transaction();
956  initTransactionMessage(*transaction, session, should_inc_trx_id);
957  session.setTransactionMessage(transaction);
958  }
959  return transaction;
960 }
961 
962 void TransactionServices::initTransactionMessage(message::Transaction &transaction,
963  Session& session,
964  bool should_inc_trx_id)
965 {
966  message::TransactionContext *trx= transaction.mutable_transaction_context();
967  trx->set_server_id(session.getServerId());
968 
969  if (should_inc_trx_id)
970  {
971  trx->set_transaction_id(getCurrentTransactionId(session));
972  session.setXaId(0);
973  }
974  else
975  {
976  /* trx and seg id will get set properly elsewhere */
977  trx->set_transaction_id(0);
978  }
979 
980  trx->set_start_timestamp(session.times.getCurrentTimestamp());
981 
982  /* segment info may get set elsewhere as needed */
983  transaction.set_segment_id(1);
984  transaction.set_end_segment(true);
985 }
986 
987 void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
988  const Session& session)
989 {
990  message::TransactionContext *trx= transaction.mutable_transaction_context();
991  trx->set_end_timestamp(session.times.getCurrentTimestamp());
992 }
993 
994 void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
995  Session& session)
996 {
997  delete transaction;
998  session.setStatementMessage(NULL);
999  session.setTransactionMessage(NULL);
1000  session.setXaId(0);
1001 }
1002 
1003 int TransactionServices::commitTransactionMessage(Session& session)
1004 {
1005  if (! ReplicationServices::isActive())
1006  return 0;
1007 
1008  /*
1009  * If no Transaction message was ever created, then no data modification
1010  * occurred inside the transaction, so nothing to do.
1011  */
1012  if (session.getTransactionMessage() == NULL)
1013  return 0;
1014 
1015  /* If there is an active statement message, finalize it. */
1016  message::Statement *statement= session.getStatementMessage();
1017 
1018  if (statement != NULL)
1019  {
1020  finalizeStatementMessage(*statement, session);
1021  }
1022 
1023  message::Transaction* transaction= getActiveTransactionMessage(session);
1024 
1025  /*
1026  * It is possible that we could have a Transaction without any Statements
1027  * if we had created a Statement but had to roll it back due to it failing
1028  * mid-execution, and no subsequent Statements were added to the Transaction
1029  * message. In this case, we simply clean up the message and not push it.
1030  */
1031  if (transaction->statement_size() == 0)
1032  {
1033  cleanupTransactionMessage(transaction, session);
1034  return 0;
1035  }
1036 
1037  finalizeTransactionMessage(*transaction, session);
1038 
1039  plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, *transaction);
1040 
1041  cleanupTransactionMessage(transaction, session);
1042 
1043  return static_cast<int>(result);
1044 }
1045 
1046 void TransactionServices::initStatementMessage(message::Statement &statement,
1047  message::Statement::Type type,
1048  const Session& session)
1049 {
1050  statement.set_type(type);
1051  statement.set_start_timestamp(session.times.getCurrentTimestamp());
1052 
1053  if (session.variables.replicate_query)
1054  statement.set_sql(session.getQueryString()->c_str());
1055 }
1056 
1057 void TransactionServices::finalizeStatementMessage(message::Statement &statement,
1058  Session& session)
1059 {
1060  statement.set_end_timestamp(session.times.getCurrentTimestamp());
1061  session.setStatementMessage(NULL);
1062 }
1063 
1064 void TransactionServices::rollbackTransactionMessage(Session& session)
1065 {
1066  if (! ReplicationServices::isActive())
1067  return;
1068 
1069  message::Transaction *transaction= getActiveTransactionMessage(session);
1070 
1071  /*
1072  * OK, so there are two situations that we need to deal with here:
1073  *
1074  * 1) We receive an instruction to ROLLBACK the current transaction
1075  * and the currently-stored Transaction message is *self-contained*,
1076  * meaning that no Statement messages in the Transaction message
1077  * contain a message having its segment_id member greater than 1. If
1078  * no non-segment ID 1 members are found, we can simply clear the
1079  * current Transaction message and remove it from memory.
1080  *
1081  * 2) If the Transaction message does indeed have a non-end segment, that
1082  * means that a bulk update/delete/insert Transaction message segment
1083  * has previously been sent over the wire to replicators. In this case,
1084  * we need to package a Transaction with a Statement message of type
1085  * ROLLBACK to indicate to replicators that previously-transmitted
1086  * messages must be un-applied.
1087  */
1088  if (unlikely(message::transactionContainsBulkSegment(*transaction)))
1089  {
1090  /* Remember the transaction ID so we can re-use it */
1091  uint64_t trx_id= transaction->transaction_context().transaction_id();
1092  uint32_t seg_id= transaction->segment_id();
1093 
1094  /*
1095  * Clear the transaction, create a Rollback statement message,
1096  * attach it to the transaction, and push it to replicators.
1097  */
1098  transaction->Clear();
1099  initTransactionMessage(*transaction, session, false);
1100 
1101  /* Set the transaction ID to match the previous messages */
1102  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1103  transaction->set_segment_id(seg_id);
1104  transaction->set_end_segment(true);
1105 
1106  message::Statement *statement= transaction->add_statement();
1107 
1108  initStatementMessage(*statement, message::Statement::ROLLBACK, session);
1109  finalizeStatementMessage(*statement, session);
1110 
1111  finalizeTransactionMessage(*transaction, session);
1112 
1113  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1114  }
1115 
1116  cleanupTransactionMessage(transaction, session);
1117 }
1118 
1119 void TransactionServices::rollbackStatementMessage(Session& session)
1120 {
1121  if (! ReplicationServices::isActive())
1122  return;
1123 
1124  message::Statement *current_statement= session.getStatementMessage();
1125 
1126  /* If we never added a Statement message, nothing to undo. */
1127  if (current_statement == NULL)
1128  return;
1129 
1130  /*
1131  * If the Statement has been segmented, then we've already pushed a portion
1132  * of this Statement's row changes through the replication stream and we
1133  * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
1134  * delete the current Statement message.
1135  */
1136  bool is_segmented= false;
1137 
1138  switch (current_statement->type())
1139  {
1140  case message::Statement::INSERT:
1141  if (current_statement->insert_data().segment_id() > 1)
1142  is_segmented= true;
1143  break;
1144 
1145  case message::Statement::UPDATE:
1146  if (current_statement->update_data().segment_id() > 1)
1147  is_segmented= true;
1148  break;
1149 
1150  case message::Statement::DELETE:
1151  if (current_statement->delete_data().segment_id() > 1)
1152  is_segmented= true;
1153  break;
1154 
1155  default:
1156  break;
1157  }
1158 
1159  /*
1160  * Remove the Statement message we've been working with (same as
1161  * current_statement).
1162  */
1163  message::Transaction *transaction= getActiveTransactionMessage(session);
1164  google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
1165  statements_in_txn= transaction->mutable_statement();
1166  statements_in_txn->RemoveLast();
1167  session.setStatementMessage(NULL);
1168 
1169  /*
1170  * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
1171  * an indicator to cancel the previous Statement message which should have
1172  * had its end_segment attribute set to false.
1173  */
1174  if (is_segmented)
1175  {
1176  current_statement= transaction->add_statement();
1177  initStatementMessage(*current_statement,
1178  message::Statement::ROLLBACK_STATEMENT,
1179  session);
1180  finalizeStatementMessage(*current_statement, session);
1181  }
1182 }
1183 
1184 message::Transaction *TransactionServices::segmentTransactionMessage(Session& session,
1185  message::Transaction *transaction)
1186 {
1187  uint64_t trx_id= transaction->transaction_context().transaction_id();
1188  uint32_t seg_id= transaction->segment_id();
1189 
1190  transaction->set_end_segment(false);
1191  commitTransactionMessage(session);
1192  transaction= getActiveTransactionMessage(session, false);
1193 
1194  /* Set the transaction ID to match the previous messages */
1195  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1196  transaction->set_segment_id(seg_id + 1);
1197  transaction->set_end_segment(true);
1198 
1199  return transaction;
1200 }
1201 
1202 message::Statement &TransactionServices::getInsertStatement(Session& session,
1203  Table &table,
1204  uint32_t *next_segment_id)
1205 {
1206  message::Statement *statement= session.getStatementMessage();
1207  message::Transaction *transaction= NULL;
1208 
1209  /*
1210  * If statement is NULL, this is a new statement.
1211  * If statement is NOT NULL, this a continuation of the same statement.
1212  * This is because autocommitOrRollback() finalizes the statement so that
1213  * we guarantee only one Statement message per statement (i.e., we no longer
1214  * share a single GPB message for multiple statements).
1215  */
1216  if (statement == NULL)
1217  {
1218  transaction= getActiveTransactionMessage(session);
1219 
1220  if (static_cast<size_t>(transaction->ByteSize()) >=
1221  transaction_message_threshold)
1222  {
1223  transaction= segmentTransactionMessage(session, transaction);
1224  }
1225 
1226  statement= transaction->add_statement();
1227  setInsertHeader(*statement, session, table);
1228  session.setStatementMessage(statement);
1229  }
1230  else
1231  {
1232  transaction= getActiveTransactionMessage(session);
1233 
1234  /*
1235  * If we've passed our threshold for the statement size (possible for
1236  * a bulk insert), we'll finalize the Statement and Transaction (doing
1237  * the Transaction will keep it from getting huge).
1238  */
1239  if (static_cast<size_t>(transaction->ByteSize()) >=
1240  transaction_message_threshold)
1241  {
1242  /* Remember the transaction ID so we can re-use it */
1243  uint64_t trx_id= transaction->transaction_context().transaction_id();
1244  uint32_t seg_id= transaction->segment_id();
1245 
1246  message::InsertData *current_data= statement->mutable_insert_data();
1247 
1248  /* Caller should use this value when adding a new record */
1249  *next_segment_id= current_data->segment_id() + 1;
1250 
1251  current_data->set_end_segment(false);
1252  transaction->set_end_segment(false);
1253 
1254  /*
1255  * Send the trx message to replicators after finalizing the
1256  * statement and transaction. This will also set the Transaction
1257  * and Statement objects in Session to NULL.
1258  */
1259  commitTransactionMessage(session);
1260 
1261  /*
1262  * Statement and Transaction should now be NULL, so new ones will get
1263  * created. We reuse the transaction id since we are segmenting
1264  * one transaction.
1265  */
1266  transaction= getActiveTransactionMessage(session, false);
1267  assert(transaction != NULL);
1268 
1269  statement= transaction->add_statement();
1270  setInsertHeader(*statement, session, table);
1271  session.setStatementMessage(statement);
1272 
1273  /* Set the transaction ID to match the previous messages */
1274  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1275  transaction->set_segment_id(seg_id + 1);
1276  transaction->set_end_segment(true);
1277  }
1278  else
1279  {
1280  /*
1281  * Continuation of the same statement. Carry forward the existing
1282  * segment id.
1283  */
1284  const message::InsertData &current_data= statement->insert_data();
1285  *next_segment_id= current_data.segment_id();
1286  }
1287  }
1288 
1289  return *statement;
1290 }
1291 
1292 void TransactionServices::setInsertHeader(message::Statement &statement,
1293  const Session& session,
1294  Table &table)
1295 {
1296  initStatementMessage(statement, message::Statement::INSERT, session);
1297 
1298  /*
1299  * Now we construct the specialized InsertHeader message inside
1300  * the generalized message::Statement container...
1301  */
1302  /* Set up the insert header */
1303  message::InsertHeader *header= statement.mutable_insert_header();
1304  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1305 
1306  table_metadata->set_schema_name(table.getShare()->getSchemaName());
1307  table_metadata->set_table_name(table.getShare()->getTableName());
1308 
1309  Field **table_fields= table.getFields();
1310 
1311  message::FieldMetadata *field_metadata;
1312 
1313  /* We will read all the table's fields... */
1314  table.setReadSet();
1315 
1316  while (Field* current_field= *table_fields++)
1317  {
1318  field_metadata= header->add_field_metadata();
1319  field_metadata->set_name(current_field->field_name);
1320  field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1321  }
1322 }
1323 
1324 bool TransactionServices::insertRecord(Session& session,
1325  Table &table)
1326 {
1327  if (! ReplicationServices::isActive())
1328  return false;
1329 
1330  if (not table.getShare()->is_replicated())
1331  return false;
1332 
1341  if (not table.getShare()->hasPrimaryKey())
1342  {
1343  my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
1344  return true;
1345  }
1346 
1347  uint32_t next_segment_id= 1;
1348  message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
1349 
1350  message::InsertData *data= statement.mutable_insert_data();
1351  data->set_segment_id(next_segment_id);
1352  data->set_end_segment(true);
1353  message::InsertRecord *record= data->add_record();
1354 
1355  Field *current_field;
1356  Field **table_fields= table.getFields();
1357 
1358  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1359  string_value->set_charset(system_charset_info);
1360 
1361  /* We will read all the table's fields... */
1362  table.setReadSet();
1363 
1364  while ((current_field= *table_fields++) != NULL)
1365  {
1366  if (current_field->is_null())
1367  {
1368  record->add_is_null(true);
1369  record->add_insert_value("", 0);
1370  }
1371  else
1372  {
1373  string_value= current_field->val_str_internal(string_value);
1374  record->add_is_null(false);
1375  record->add_insert_value(string_value->c_ptr(), string_value->length());
1376  string_value->free();
1377  }
1378  }
1379  return false;
1380 }
1381 
1382 message::Statement &TransactionServices::getUpdateStatement(Session& session,
1383  Table &table,
1384  const unsigned char *old_record,
1385  const unsigned char *new_record,
1386  uint32_t *next_segment_id)
1387 {
1388  message::Statement *statement= session.getStatementMessage();
1389  message::Transaction *transaction= NULL;
1390 
1391  /*
1392  * If statement is NULL, this is a new statement.
1393  * If statement is NOT NULL, this a continuation of the same statement.
1394  * This is because autocommitOrRollback() finalizes the statement so that
1395  * we guarantee only one Statement message per statement (i.e., we no longer
1396  * share a single GPB message for multiple statements).
1397  */
1398  if (statement == NULL)
1399  {
1400  transaction= getActiveTransactionMessage(session);
1401 
1402  if (static_cast<size_t>(transaction->ByteSize()) >=
1403  transaction_message_threshold)
1404  {
1405  transaction= segmentTransactionMessage(session, transaction);
1406  }
1407 
1408  statement= transaction->add_statement();
1409  setUpdateHeader(*statement, session, table, old_record, new_record);
1410  session.setStatementMessage(statement);
1411  }
1412  else
1413  {
1414  transaction= getActiveTransactionMessage(session);
1415 
1416  /*
1417  * If we've passed our threshold for the statement size (possible for
1418  * a bulk insert), we'll finalize the Statement and Transaction (doing
1419  * the Transaction will keep it from getting huge).
1420  */
1421  if (static_cast<size_t>(transaction->ByteSize()) >=
1422  transaction_message_threshold)
1423  {
1424  /* Remember the transaction ID so we can re-use it */
1425  uint64_t trx_id= transaction->transaction_context().transaction_id();
1426  uint32_t seg_id= transaction->segment_id();
1427 
1428  message::UpdateData *current_data= statement->mutable_update_data();
1429 
1430  /* Caller should use this value when adding a new record */
1431  *next_segment_id= current_data->segment_id() + 1;
1432 
1433  current_data->set_end_segment(false);
1434  transaction->set_end_segment(false);
1435 
1436  /*
1437  * Send the trx message to replicators after finalizing the
1438  * statement and transaction. This will also set the Transaction
1439  * and Statement objects in Session to NULL.
1440  */
1441  commitTransactionMessage(session);
1442 
1443  /*
1444  * Statement and Transaction should now be NULL, so new ones will get
1445  * created. We reuse the transaction id since we are segmenting
1446  * one transaction.
1447  */
1448  transaction= getActiveTransactionMessage(session, false);
1449  assert(transaction != NULL);
1450 
1451  statement= transaction->add_statement();
1452  setUpdateHeader(*statement, session, table, old_record, new_record);
1453  session.setStatementMessage(statement);
1454 
1455  /* Set the transaction ID to match the previous messages */
1456  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1457  transaction->set_segment_id(seg_id + 1);
1458  transaction->set_end_segment(true);
1459  }
1460  else
1461  {
1462  /*
1463  * Continuation of the same statement. Carry forward the existing
1464  * segment id.
1465  */
1466  const message::UpdateData &current_data= statement->update_data();
1467  *next_segment_id= current_data.segment_id();
1468  }
1469  }
1470 
1471  return *statement;
1472 }
1473 
1474 void TransactionServices::setUpdateHeader(message::Statement &statement,
1475  const Session& session,
1476  Table &table,
1477  const unsigned char *old_record,
1478  const unsigned char *new_record)
1479 {
1480  initStatementMessage(statement, message::Statement::UPDATE, session);
1481 
1482  /*
1483  * Now we construct the specialized UpdateHeader message inside
1484  * the generalized message::Statement container...
1485  */
1486  /* Set up the update header */
1487  message::UpdateHeader *header= statement.mutable_update_header();
1488  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1489 
1490  table_metadata->set_schema_name(table.getShare()->getSchemaName());
1491  table_metadata->set_table_name(table.getShare()->getTableName());
1492 
1493  Field *current_field;
1494  Field **table_fields= table.getFields();
1495 
1496  message::FieldMetadata *field_metadata;
1497 
1498  /* We will read all the table's fields... */
1499  table.setReadSet();
1500 
1501  while ((current_field= *table_fields++) != NULL)
1502  {
1503  /*
1504  * We add the "key field metadata" -- i.e. the fields which is
1505  * the primary key for the table.
1506  */
1507  if (table.getShare()->fieldInPrimaryKey(current_field))
1508  {
1509  field_metadata= header->add_key_field_metadata();
1510  field_metadata->set_name(current_field->field_name);
1511  field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1512  }
1513 
1514  if (isFieldUpdated(current_field, table, old_record, new_record))
1515  {
1516  /* Field is changed from old to new */
1517  field_metadata= header->add_set_field_metadata();
1518  field_metadata->set_name(current_field->field_name);
1519  field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1520  }
1521  }
1522 }
1523 
1524 void TransactionServices::updateRecord(Session& session,
1525  Table &table,
1526  const unsigned char *old_record,
1527  const unsigned char *new_record)
1528 {
1529  if (! ReplicationServices::isActive())
1530  return;
1531 
1532  if (not table.getShare()->is_replicated())
1533  return;
1534 
1535  uint32_t next_segment_id= 1;
1536  message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
1537 
1538  message::UpdateData *data= statement.mutable_update_data();
1539  data->set_segment_id(next_segment_id);
1540  data->set_end_segment(true);
1541  message::UpdateRecord *record= data->add_record();
1542 
1543  Field *current_field;
1544  Field **table_fields= table.getFields();
1545  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1546  string_value->set_charset(system_charset_info);
1547 
1548  while ((current_field= *table_fields++) != NULL)
1549  {
1550  /*
1551  * Here, we add the SET field values. We used to do this in the setUpdateHeader() method,
1552  * but then realized that an UPDATE statement could potentially have different values for
1553  * the SET field. For instance, imagine this SQL scenario:
1554  *
1555  * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
1556  * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
1557  * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
1558  *
1559  * We will generate two UpdateRecord messages with different set_value byte arrays.
1560  */
1561  if (isFieldUpdated(current_field, table, old_record, new_record))
1562  {
1563  /* Store the original "read bit" for this field */
1564  bool is_read_set= current_field->isReadSet();
1565 
1566  /* We need to mark that we will "read" this field... */
1567  table.setReadSet(current_field->position());
1568 
1569  /* Read the string value of this field's contents */
1570  string_value= current_field->val_str_internal(string_value);
1571 
1572  /*
1573  * Reset the read bit after reading field to its original state. This
1574  * prevents the field from being included in the WHERE clause
1575  */
1576  current_field->setReadSet(is_read_set);
1577 
1578  if (current_field->is_null())
1579  {
1580  record->add_is_null(true);
1581  record->add_after_value("", 0);
1582  }
1583  else
1584  {
1585  record->add_is_null(false);
1586  record->add_after_value(string_value->c_ptr(), string_value->length());
1587  }
1588  string_value->free();
1589  }
1590 
1591  /*
1592  * Add the WHERE clause values now...for now, this means the
1593  * primary key field value. Replication only supports tables
1594  * with a primary key.
1595  */
1596  if (table.getShare()->fieldInPrimaryKey(current_field))
1597  {
1603  string_value= current_field->val_str_internal(string_value,
1604  old_record +
1605  current_field->offset(const_cast<unsigned char *>(new_record)));
1606  record->add_key_value(string_value->c_ptr(), string_value->length());
1607  string_value->free();
1608  }
1609 
1610  }
1611 }
1612 
1613 bool TransactionServices::isFieldUpdated(Field *current_field,
1614  Table &table,
1615  const unsigned char *old_record,
1616  const unsigned char *new_record)
1617 {
1618  /*
1619  * The below really should be moved into the Field API and Record API. But for now
1620  * we do this crazy pointer fiddling to figure out if the current field
1621  * has been updated in the supplied record raw byte pointers.
1622  */
1623  const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1624  const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
1625 
1626  uint32_t field_length= current_field->pack_length();
1628  bool old_value_is_null= current_field->is_null_in_record(old_record);
1629  bool new_value_is_null= current_field->is_null_in_record(new_record);
1630 
1631  bool isUpdated= false;
1632  if (old_value_is_null != new_value_is_null)
1633  {
1634  if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
1635  {
1636  isUpdated= true;
1637  }
1638  else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
1639  {
1640  isUpdated= true;
1641  }
1642  }
1643 
1644  if (! isUpdated)
1645  {
1646  if (memcmp(old_ptr, new_ptr, field_length) != 0)
1647  {
1648  isUpdated= true;
1649  }
1650  }
1651  return isUpdated;
1652 }
1653 
1654 message::Statement &TransactionServices::getDeleteStatement(Session& session,
1655  Table &table,
1656  uint32_t *next_segment_id)
1657 {
1658  message::Statement *statement= session.getStatementMessage();
1659  message::Transaction *transaction= NULL;
1660 
1661  /*
1662  * If statement is NULL, this is a new statement.
1663  * If statement is NOT NULL, this a continuation of the same statement.
1664  * This is because autocommitOrRollback() finalizes the statement so that
1665  * we guarantee only one Statement message per statement (i.e., we no longer
1666  * share a single GPB message for multiple statements).
1667  */
1668  if (statement == NULL)
1669  {
1670  transaction= getActiveTransactionMessage(session);
1671 
1672  if (static_cast<size_t>(transaction->ByteSize()) >=
1673  transaction_message_threshold)
1674  {
1675  transaction= segmentTransactionMessage(session, transaction);
1676  }
1677 
1678  statement= transaction->add_statement();
1679  setDeleteHeader(*statement, session, table);
1680  session.setStatementMessage(statement);
1681  }
1682  else
1683  {
1684  transaction= getActiveTransactionMessage(session);
1685 
1686  /*
1687  * If we've passed our threshold for the statement size (possible for
1688  * a bulk insert), we'll finalize the Statement and Transaction (doing
1689  * the Transaction will keep it from getting huge).
1690  */
1691  if (static_cast<size_t>(transaction->ByteSize()) >=
1692  transaction_message_threshold)
1693  {
1694  /* Remember the transaction ID so we can re-use it */
1695  uint64_t trx_id= transaction->transaction_context().transaction_id();
1696  uint32_t seg_id= transaction->segment_id();
1697 
1698  message::DeleteData *current_data= statement->mutable_delete_data();
1699 
1700  /* Caller should use this value when adding a new record */
1701  *next_segment_id= current_data->segment_id() + 1;
1702 
1703  current_data->set_end_segment(false);
1704  transaction->set_end_segment(false);
1705 
1706  /*
1707  * Send the trx message to replicators after finalizing the
1708  * statement and transaction. This will also set the Transaction
1709  * and Statement objects in Session to NULL.
1710  */
1711  commitTransactionMessage(session);
1712 
1713  /*
1714  * Statement and Transaction should now be NULL, so new ones will get
1715  * created. We reuse the transaction id since we are segmenting
1716  * one transaction.
1717  */
1718  transaction= getActiveTransactionMessage(session, false);
1719  assert(transaction != NULL);
1720 
1721  statement= transaction->add_statement();
1722  setDeleteHeader(*statement, session, table);
1723  session.setStatementMessage(statement);
1724 
1725  /* Set the transaction ID to match the previous messages */
1726  transaction->mutable_transaction_context()->set_transaction_id(trx_id);
1727  transaction->set_segment_id(seg_id + 1);
1728  transaction->set_end_segment(true);
1729  }
1730  else
1731  {
1732  /*
1733  * Continuation of the same statement. Carry forward the existing
1734  * segment id.
1735  */
1736  const message::DeleteData &current_data= statement->delete_data();
1737  *next_segment_id= current_data.segment_id();
1738  }
1739  }
1740 
1741  return *statement;
1742 }
1743 
1744 void TransactionServices::setDeleteHeader(message::Statement &statement,
1745  const Session& session,
1746  Table &table)
1747 {
1748  initStatementMessage(statement, message::Statement::DELETE, session);
1749 
1750  /*
1751  * Now we construct the specialized DeleteHeader message inside
1752  * the generalized message::Statement container...
1753  */
1754  message::DeleteHeader *header= statement.mutable_delete_header();
1755  message::TableMetadata *table_metadata= header->mutable_table_metadata();
1756 
1757  table_metadata->set_schema_name(table.getShare()->getSchemaName());
1758  table_metadata->set_table_name(table.getShare()->getTableName());
1759 
1760  Field *current_field;
1761  Field **table_fields= table.getFields();
1762 
1763  message::FieldMetadata *field_metadata;
1764 
1765  while ((current_field= *table_fields++) != NULL)
1766  {
1767  /*
1768  * Add the WHERE clause values now...for now, this means the
1769  * primary key field value. Replication only supports tables
1770  * with a primary key.
1771  */
1772  if (table.getShare()->fieldInPrimaryKey(current_field))
1773  {
1774  field_metadata= header->add_key_field_metadata();
1775  field_metadata->set_name(current_field->field_name);
1776  field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
1777  }
1778  }
1779 }
1780 
1781 void TransactionServices::deleteRecord(Session& session,
1782  Table &table,
1783  bool use_update_record)
1784 {
1785  if (! ReplicationServices::isActive())
1786  return;
1787 
1788  if (not table.getShare()->is_replicated())
1789  return;
1790 
1791  uint32_t next_segment_id= 1;
1792  message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
1793 
1794  message::DeleteData *data= statement.mutable_delete_data();
1795  data->set_segment_id(next_segment_id);
1796  data->set_end_segment(true);
1797  message::DeleteRecord *record= data->add_record();
1798 
1799  Field *current_field;
1800  Field **table_fields= table.getFields();
1801  String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
1802  string_value->set_charset(system_charset_info);
1803 
1804  while ((current_field= *table_fields++) != NULL)
1805  {
1806  /*
1807  * Add the WHERE clause values now...for now, this means the
1808  * primary key field value. Replication only supports tables
1809  * with a primary key.
1810  */
1811  if (table.getShare()->fieldInPrimaryKey(current_field))
1812  {
1813  if (use_update_record)
1814  {
1815  /*
1816  * Temporarily point to the update record to get its value.
1817  * This is pretty much a hack in order to get the PK value from
1818  * the update record rather than the insert record. Field::val_str()
1819  * should not change anything in Field::ptr, so this should be safe.
1820  * We are careful not to change anything in old_ptr.
1821  */
1822  const unsigned char *old_ptr= current_field->ptr;
1823  current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
1824  string_value= current_field->val_str_internal(string_value);
1825  current_field->ptr= const_cast<unsigned char *>(old_ptr);
1826  }
1827  else
1828  {
1829  string_value= current_field->val_str_internal(string_value);
1833  }
1834  record->add_key_value(string_value->c_ptr(), string_value->length());
1835  string_value->free();
1836  }
1837  }
1838 }
1839 
1840 void TransactionServices::createTable(Session& session,
1841  const message::Table &table)
1842 {
1843  if (not ReplicationServices::isActive())
1844  return;
1845 
1846  if (not message::is_replicated(table))
1847  return;
1848 
1849  message::Transaction *transaction= getActiveTransactionMessage(session);
1850  message::Statement *statement= transaction->add_statement();
1851 
1852  initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
1853 
1854  /*
1855  * Construct the specialized CreateTableStatement message and attach
1856  * it to the generic Statement message
1857  */
1858  message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
1859  message::Table *new_table_message= create_table_statement->mutable_table();
1860  *new_table_message= table;
1861 
1862  finalizeStatementMessage(*statement, session);
1863 
1864  finalizeTransactionMessage(*transaction, session);
1865 
1866  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1867 
1868  cleanupTransactionMessage(transaction, session);
1869 
1870 }
1871 
1872 void TransactionServices::createSchema(Session& session,
1873  const message::Schema &schema)
1874 {
1875  if (! ReplicationServices::isActive())
1876  return;
1877 
1878  if (not message::is_replicated(schema))
1879  return;
1880 
1881  message::Transaction *transaction= getActiveTransactionMessage(session);
1882  message::Statement *statement= transaction->add_statement();
1883 
1884  initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
1885 
1886  /*
1887  * Construct the specialized CreateSchemaStatement message and attach
1888  * it to the generic Statement message
1889  */
1890  message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
1891  message::Schema *new_schema_message= create_schema_statement->mutable_schema();
1892  *new_schema_message= schema;
1893 
1894  finalizeStatementMessage(*statement, session);
1895 
1896  finalizeTransactionMessage(*transaction, session);
1897 
1898  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1899 
1900  cleanupTransactionMessage(transaction, session);
1901 
1902 }
1903 
1904 void TransactionServices::dropSchema(Session& session,
1905  const identifier::Schema& identifier,
1907 {
1908  if (not ReplicationServices::isActive())
1909  return;
1910 
1911  if (not message::is_replicated(schema))
1912  return;
1913 
1914  message::Transaction *transaction= getActiveTransactionMessage(session);
1915  message::Statement *statement= transaction->add_statement();
1916 
1917  initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
1918 
1919  /*
1920  * Construct the specialized DropSchemaStatement message and attach
1921  * it to the generic Statement message
1922  */
1923  message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
1924 
1925  drop_schema_statement->set_schema_name(identifier.getSchemaName());
1926 
1927  finalizeStatementMessage(*statement, session);
1928 
1929  finalizeTransactionMessage(*transaction, session);
1930 
1931  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1932 
1933  cleanupTransactionMessage(transaction, session);
1934 }
1935 
1936 void TransactionServices::alterSchema(Session& session,
1937  const message::Schema &old_schema,
1938  const message::Schema &new_schema)
1939 {
1940  if (! ReplicationServices::isActive())
1941  return;
1942 
1943  if (not message::is_replicated(old_schema))
1944  return;
1945 
1946  message::Transaction *transaction= getActiveTransactionMessage(session);
1947  message::Statement *statement= transaction->add_statement();
1948 
1949  initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
1950 
1951  /*
1952  * Construct the specialized AlterSchemaStatement message and attach
1953  * it to the generic Statement message
1954  */
1955  message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
1956 
1957  message::Schema *before= alter_schema_statement->mutable_before();
1958  message::Schema *after= alter_schema_statement->mutable_after();
1959 
1960  *before= old_schema;
1961  *after= new_schema;
1962 
1963  finalizeStatementMessage(*statement, session);
1964 
1965  finalizeTransactionMessage(*transaction, session);
1966 
1967  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
1968 
1969  cleanupTransactionMessage(transaction, session);
1970 }
1971 
1972 void TransactionServices::dropTable(Session& session,
1973  const identifier::Table& identifier,
1975  bool if_exists)
1976 {
1977  if (! ReplicationServices::isActive())
1978  return;
1979 
1980  if (not message::is_replicated(table))
1981  return;
1982 
1983  message::Transaction *transaction= getActiveTransactionMessage(session);
1984  message::Statement *statement= transaction->add_statement();
1985 
1986  initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
1987 
1988  /*
1989  * Construct the specialized DropTableStatement message and attach
1990  * it to the generic Statement message
1991  */
1992  message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
1993 
1994  drop_table_statement->set_if_exists_clause(if_exists);
1995 
1996  message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
1997 
1998  table_metadata->set_schema_name(identifier.getSchemaName());
1999  table_metadata->set_table_name(identifier.getTableName());
2000 
2001  finalizeStatementMessage(*statement, session);
2002 
2003  finalizeTransactionMessage(*transaction, session);
2004 
2005  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2006 
2007  cleanupTransactionMessage(transaction, session);
2008 }
2009 
2010 void TransactionServices::truncateTable(Session& session, Table &table)
2011 {
2012  if (! ReplicationServices::isActive())
2013  return;
2014 
2015  if (not table.getShare()->is_replicated())
2016  return;
2017 
2018  message::Transaction *transaction= getActiveTransactionMessage(session);
2019  message::Statement *statement= transaction->add_statement();
2020 
2021  initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
2022 
2023  /*
2024  * Construct the specialized TruncateTableStatement message and attach
2025  * it to the generic Statement message
2026  */
2027  message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
2028  message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
2029 
2030  table_metadata->set_schema_name(table.getShare()->getSchemaName());
2031  table_metadata->set_table_name(table.getShare()->getTableName());
2032 
2033  finalizeStatementMessage(*statement, session);
2034 
2035  finalizeTransactionMessage(*transaction, session);
2036 
2037  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2038 
2039  cleanupTransactionMessage(transaction, session);
2040 }
2041 
2042 void TransactionServices::rawStatement(Session& session,
2043  const string &query,
2044  const string &schema)
2045 {
2046  if (! ReplicationServices::isActive())
2047  return;
2048 
2049  message::Transaction *transaction= getActiveTransactionMessage(session);
2050  message::Statement *statement= transaction->add_statement();
2051 
2052  initStatementMessage(*statement, message::Statement::RAW_SQL, session);
2053  statement->set_sql(query);
2054  if (not schema.empty())
2055  statement->set_raw_sql_schema(schema);
2056  finalizeStatementMessage(*statement, session);
2057 
2058  finalizeTransactionMessage(*transaction, session);
2059 
2060  (void) ReplicationServices::pushTransactionMessage(session, *transaction);
2061 
2062  cleanupTransactionMessage(transaction, session);
2063 }
2064 
2065 int TransactionServices::sendEvent(Session& session, const message::Event &event)
2066 {
2067  if (not ReplicationServices::isActive())
2068  return 0;
2069  message::Transaction transaction;
2070 
2071  // set server id, start timestamp
2072  initTransactionMessage(transaction, session, true);
2073 
2074  // set end timestamp
2075  finalizeTransactionMessage(transaction, session);
2076 
2077  message::Event *trx_event= transaction.mutable_event();
2078  trx_event->CopyFrom(event);
2079  plugin::ReplicationReturnCode result= ReplicationServices::pushTransactionMessage(session, transaction);
2080  return result;
2081 }
2082 
2083 bool TransactionServices::sendStartupEvent(Session& session)
2084 {
2085  message::Event event;
2086  event.set_type(message::Event::STARTUP);
2087  return not sendEvent(session, event);
2088 }
2089 
2090 bool TransactionServices::sendShutdownEvent(Session& session)
2091 {
2092  message::Event event;
2093  event.set_type(message::Event::SHUTDOWN);
2094  return not sendEvent(session, event);
2095 }
2096 
2097 } /* namespace drizzled */