00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00050 #include <config.h>
00051 #include <drizzled/current_session.h>
00052 #include <drizzled/error.h>
00053 #include <drizzled/gettext.h>
00054 #include <drizzled/probes.h>
00055 #include <drizzled/sql_parse.h>
00056 #include <drizzled/session.h>
00057 #include <drizzled/sql_base.h>
00058 #include <drizzled/replication_services.h>
00059 #include <drizzled/transaction_services.h>
00060 #include <drizzled/transaction_context.h>
00061 #include <drizzled/message/transaction.pb.h>
00062 #include <drizzled/message/statement_transform.h>
00063 #include <drizzled/resource_context.h>
00064 #include <drizzled/lock.h>
00065 #include <drizzled/item/int.h>
00066 #include <drizzled/item/empty_string.h>
00067 #include <drizzled/field/epoch.h>
00068 #include <drizzled/plugin/client.h>
00069 #include <drizzled/plugin/monitored_in_transaction.h>
00070 #include <drizzled/plugin/transactional_storage_engine.h>
00071 #include <drizzled/plugin/xa_resource_manager.h>
00072 #include <drizzled/plugin/xa_storage_engine.h>
00073 #include <drizzled/internal/my_sys.h>
00074
00075 #include <vector>
00076 #include <algorithm>
00077 #include <functional>
00078 #include <google/protobuf/repeated_field.h>
00079
00080 using namespace std;
00081 using namespace google;
00082
00083 namespace drizzled
00084 {
00085
00303 TransactionServices::TransactionServices()
00304 {
00305 plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
00306 if (engine)
00307 {
00308 xa_storage_engine= (plugin::XaStorageEngine*)engine;
00309 }
00310 else
00311 {
00312 xa_storage_engine= NULL;
00313 }
00314 }
00315
00316 void TransactionServices::registerResourceForStatement(Session::reference session,
00317 plugin::MonitoredInTransaction *monitored,
00318 plugin::TransactionalStorageEngine *engine)
00319 {
00320 if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00321 {
00322
00323
00324
00325
00326
00327
00328
00329 registerResourceForTransaction(session, monitored, engine);
00330 }
00331
00332 TransactionContext *trans= &session.transaction.stmt;
00333 ResourceContext *resource_context= session.getResourceContext(monitored, 0);
00334
00335 if (resource_context->isStarted())
00336 return;
00337
00338 assert(monitored->participatesInSqlTransaction());
00339 assert(not monitored->participatesInXaTransaction());
00340
00341 resource_context->setMonitored(monitored);
00342 resource_context->setTransactionalStorageEngine(engine);
00343 trans->registerResource(resource_context);
00344
00345 trans->no_2pc|= true;
00346 }
00347
00348 void TransactionServices::registerResourceForStatement(Session::reference session,
00349 plugin::MonitoredInTransaction *monitored,
00350 plugin::TransactionalStorageEngine *engine,
00351 plugin::XaResourceManager *resource_manager)
00352 {
00353 if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00354 {
00355
00356
00357
00358
00359
00360
00361
00362 registerResourceForTransaction(session, monitored, engine, resource_manager);
00363 }
00364
00365 TransactionContext *trans= &session.transaction.stmt;
00366 ResourceContext *resource_context= session.getResourceContext(monitored, 0);
00367
00368 if (resource_context->isStarted())
00369 return;
00370
00371 assert(monitored->participatesInXaTransaction());
00372 assert(monitored->participatesInSqlTransaction());
00373
00374 resource_context->setMonitored(monitored);
00375 resource_context->setTransactionalStorageEngine(engine);
00376 resource_context->setXaResourceManager(resource_manager);
00377 trans->registerResource(resource_context);
00378
00379 trans->no_2pc|= false;
00380 }
00381
00382 void TransactionServices::registerResourceForTransaction(Session::reference session,
00383 plugin::MonitoredInTransaction *monitored,
00384 plugin::TransactionalStorageEngine *engine)
00385 {
00386 TransactionContext *trans= &session.transaction.all;
00387 ResourceContext *resource_context= session.getResourceContext(monitored, 1);
00388
00389 if (resource_context->isStarted())
00390 return;
00391
00392 session.server_status|= SERVER_STATUS_IN_TRANS;
00393
00394 trans->registerResource(resource_context);
00395
00396 assert(monitored->participatesInSqlTransaction());
00397 assert(not monitored->participatesInXaTransaction());
00398
00399 resource_context->setMonitored(monitored);
00400 resource_context->setTransactionalStorageEngine(engine);
00401 trans->no_2pc|= true;
00402
00403 if (session.transaction.xid_state.xid.is_null())
00404 session.transaction.xid_state.xid.set(session.getQueryId());
00405
00406
00407 if (! session.getResourceContext(monitored, 0)->isStarted())
00408 registerResourceForStatement(session, monitored, engine);
00409 }
00410
00411 void TransactionServices::registerResourceForTransaction(Session::reference session,
00412 plugin::MonitoredInTransaction *monitored,
00413 plugin::TransactionalStorageEngine *engine,
00414 plugin::XaResourceManager *resource_manager)
00415 {
00416 TransactionContext *trans= &session.transaction.all;
00417 ResourceContext *resource_context= session.getResourceContext(monitored, 1);
00418
00419 if (resource_context->isStarted())
00420 return;
00421
00422 session.server_status|= SERVER_STATUS_IN_TRANS;
00423
00424 trans->registerResource(resource_context);
00425
00426 assert(monitored->participatesInSqlTransaction());
00427
00428 resource_context->setMonitored(monitored);
00429 resource_context->setXaResourceManager(resource_manager);
00430 resource_context->setTransactionalStorageEngine(engine);
00431 trans->no_2pc|= true;
00432
00433 if (session.transaction.xid_state.xid.is_null())
00434 session.transaction.xid_state.xid.set(session.getQueryId());
00435
00436 engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
00437
00438
00439 if (! session.getResourceContext(monitored, 0)->isStarted())
00440 registerResourceForStatement(session, monitored, engine, resource_manager);
00441 }
00442
00443 void TransactionServices::allocateNewTransactionId()
00444 {
00445 ReplicationServices &replication_services= ReplicationServices::singleton();
00446 if (! replication_services.isActive())
00447 {
00448 return;
00449 }
00450
00451 Session *my_session= current_session;
00452 uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
00453 my_session->setXaId(xa_id);
00454 }
00455
00456 uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
00457 {
00458 if (session.getXaId() == 0)
00459 {
00460 session.setXaId(xa_storage_engine->getNewTransactionId(&session));
00461 }
00462
00463 return session.getXaId();
00464 }
00465
00466 int TransactionServices::commitTransaction(Session::reference session,
00467 bool normal_transaction)
00468 {
00469 int error= 0, cookie= 0;
00470
00471
00472
00473
00474 TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00475 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00476
00477 bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00478
00479
00480
00481
00482
00483
00484
00485 assert(session.transaction.stmt.getResourceContexts().empty() ||
00486 trans == &session.transaction.stmt);
00487
00488 if (resource_contexts.empty() == false)
00489 {
00490 if (is_real_trans && session.wait_if_global_read_lock(false, false))
00491 {
00492 rollbackTransaction(session, normal_transaction);
00493 return 1;
00494 }
00495
00496
00497
00498
00499
00500
00501 if (shouldConstructMessages())
00502 {
00503 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00504 it != resource_contexts.end() && ! error;
00505 ++it)
00506 {
00507 ResourceContext *resource_context= *it;
00508 int err;
00509
00510
00511
00512
00513
00514 if (! resource_context->hasModifiedData())
00515 continue;
00516
00517 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00518
00519 if (resource->participatesInXaTransaction())
00520 {
00521 if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
00522 {
00523 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00524 error= 1;
00525 }
00526 else
00527 {
00528 session.status_var.ha_prepare_count++;
00529 }
00530 }
00531 }
00532 if (error == 0 && is_real_trans)
00533 {
00534
00535
00536
00537
00538 error= commitTransactionMessage(session);
00539 }
00540 if (error)
00541 {
00542 rollbackTransaction(session, normal_transaction);
00543 error= 1;
00544 goto end;
00545 }
00546 }
00547 error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
00548 end:
00549 if (is_real_trans)
00550 session.startWaitingGlobalReadLock();
00551 }
00552 return error;
00553 }
00554
00559 int TransactionServices::commitPhaseOne(Session::reference session,
00560 bool normal_transaction)
00561 {
00562 int error=0;
00563 TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00564 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00565
00566 bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00567 bool all= normal_transaction;
00568
00569
00570
00571
00572 if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00573 all= true;
00574
00575 if (resource_contexts.empty() == false)
00576 {
00577 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00578 it != resource_contexts.end();
00579 ++it)
00580 {
00581 int err;
00582 ResourceContext *resource_context= *it;
00583
00584 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00585
00586 if (resource->participatesInXaTransaction())
00587 {
00588 if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
00589 {
00590 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00591 error= 1;
00592 }
00593 else if (normal_transaction)
00594 {
00595 session.status_var.ha_commit_count++;
00596 }
00597 }
00598 else if (resource->participatesInSqlTransaction())
00599 {
00600 if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
00601 {
00602 my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00603 error= 1;
00604 }
00605 else if (normal_transaction)
00606 {
00607 session.status_var.ha_commit_count++;
00608 }
00609 }
00610 resource_context->reset();
00611 }
00612
00613 if (is_real_trans)
00614 session.transaction.xid_state.xid.null();
00615
00616 if (normal_transaction)
00617 {
00618 session.variables.tx_isolation= session.session_tx_isolation;
00619 session.transaction.cleanup();
00620 }
00621 }
00622 trans->reset();
00623 return error;
00624 }
00625
00626 int TransactionServices::rollbackTransaction(Session::reference session,
00627 bool normal_transaction)
00628 {
00629 int error= 0;
00630 TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00631 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00632
00633 bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00634 bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
00635
00636
00637
00638
00639
00640 assert(session.transaction.stmt.getResourceContexts().empty() ||
00641 trans == &session.transaction.stmt);
00642
00643 if (resource_contexts.empty() == false)
00644 {
00645 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00646 it != resource_contexts.end();
00647 ++it)
00648 {
00649 int err;
00650 ResourceContext *resource_context= *it;
00651
00652 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00653
00654 if (resource->participatesInXaTransaction())
00655 {
00656 if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
00657 {
00658 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00659 error= 1;
00660 }
00661 else if (normal_transaction)
00662 {
00663 session.status_var.ha_rollback_count++;
00664 }
00665 }
00666 else if (resource->participatesInSqlTransaction())
00667 {
00668 if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
00669 {
00670 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00671 error= 1;
00672 }
00673 else if (normal_transaction)
00674 {
00675 session.status_var.ha_rollback_count++;
00676 }
00677 }
00678 resource_context->reset();
00679 }
00680
00681
00682
00683
00684
00685
00686
00687
00688 if (all)
00689 rollbackTransactionMessage(session);
00690 else
00691 rollbackStatementMessage(session);
00692
00693 if (is_real_trans)
00694 session.transaction.xid_state.xid.null();
00695 if (normal_transaction)
00696 {
00697 session.variables.tx_isolation=session.session_tx_isolation;
00698 session.transaction.cleanup();
00699 }
00700 }
00701 if (normal_transaction)
00702 session.transaction_rollback_request= false;
00703
00704
00705
00706
00707 if (is_real_trans &&
00708 session.transaction.all.hasModifiedNonTransData() &&
00709 session.getKilled() != Session::KILL_CONNECTION)
00710 {
00711 push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00712 ER_WARNING_NOT_COMPLETE_ROLLBACK,
00713 ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
00714 }
00715 trans->reset();
00716 return error;
00717 }
00718
00719 int TransactionServices::autocommitOrRollback(Session::reference session,
00720 int error)
00721 {
00722
00723 message::Statement *statement= session.getStatementMessage();
00724 if ((statement != NULL) && (! error))
00725 finalizeStatementMessage(*statement, session);
00726
00727 if (session.transaction.stmt.getResourceContexts().empty() == false)
00728 {
00729 TransactionContext *trans = &session.transaction.stmt;
00730 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00731 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00732 it != resource_contexts.end();
00733 ++it)
00734 {
00735 ResourceContext *resource_context= *it;
00736
00737 resource_context->getTransactionalStorageEngine()->endStatement(&session);
00738 }
00739
00740 if (! error)
00741 {
00742 if (commitTransaction(session, false))
00743 error= 1;
00744 }
00745 else
00746 {
00747 (void) rollbackTransaction(session, false);
00748 if (session.transaction_rollback_request)
00749 {
00750 (void) rollbackTransaction(session, true);
00751 session.server_status&= ~SERVER_STATUS_IN_TRANS;
00752 }
00753 }
00754
00755 session.variables.tx_isolation= session.session_tx_isolation;
00756 }
00757
00758 return error;
00759 }
00760
00761 struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
00762 {
00763 result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
00764 {
00765
00766
00767 return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
00768 }
00769 };
00770
00771 int TransactionServices::rollbackToSavepoint(Session::reference session,
00772 NamedSavepoint &sv)
00773 {
00774 int error= 0;
00775 TransactionContext *trans= &session.transaction.all;
00776 TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
00777 TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
00778
00779 trans->no_2pc= false;
00780
00781
00782
00783
00784 for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
00785 it != sv_resource_contexts.end();
00786 ++it)
00787 {
00788 int err;
00789 ResourceContext *resource_context= *it;
00790
00791 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00792
00793 if (resource->participatesInSqlTransaction())
00794 {
00795 if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
00796 {
00797 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00798 error= 1;
00799 }
00800 else
00801 {
00802 session.status_var.ha_savepoint_rollback_count++;
00803 }
00804 }
00805 trans->no_2pc|= not resource->participatesInXaTransaction();
00806 }
00807
00808
00809
00810
00811 {
00812 TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
00813 TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
00814 TransactionContext::ResourceContexts set_difference_contexts;
00815
00816
00817
00818
00819
00820
00821 set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
00822
00823 sort(sorted_tran_resource_contexts.begin(),
00824 sorted_tran_resource_contexts.end(),
00825 ResourceContextCompare());
00826 sort(sorted_sv_resource_contexts.begin(),
00827 sorted_sv_resource_contexts.end(),
00828 ResourceContextCompare());
00829 set_difference(sorted_tran_resource_contexts.begin(),
00830 sorted_tran_resource_contexts.end(),
00831 sorted_sv_resource_contexts.begin(),
00832 sorted_sv_resource_contexts.end(),
00833 set_difference_contexts.begin(),
00834 ResourceContextCompare());
00835
00836
00837
00838
00839
00840
00841 for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
00842 it != set_difference_contexts.end();
00843 ++it)
00844 {
00845 ResourceContext *resource_context= *it;
00846 int err;
00847
00848 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00849
00850 if (resource->participatesInSqlTransaction())
00851 {
00852 if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
00853 {
00854 my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00855 error= 1;
00856 }
00857 else
00858 {
00859 session.status_var.ha_rollback_count++;
00860 }
00861 }
00862 resource_context->reset();
00863 }
00864 }
00865 trans->setResourceContexts(sv_resource_contexts);
00866
00867 if (shouldConstructMessages())
00868 {
00869 cleanupTransactionMessage(getActiveTransactionMessage(session), session);
00870 message::Transaction *savepoint_transaction= sv.getTransactionMessage();
00871 if (savepoint_transaction != NULL)
00872 {
00873
00874
00875
00876
00877
00878 message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
00879 uint32_t num_statements = savepoint_transaction_copy->statement_size();
00880 if (num_statements == 0)
00881 {
00882 session.setStatementMessage(NULL);
00883 }
00884 else
00885 {
00886 session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));
00887 }
00888 session.setTransactionMessage(savepoint_transaction_copy);
00889 }
00890 }
00891
00892 return error;
00893 }
00894
00901 int TransactionServices::setSavepoint(Session::reference session,
00902 NamedSavepoint &sv)
00903 {
00904 int error= 0;
00905 TransactionContext *trans= &session.transaction.all;
00906 TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00907
00908 if (resource_contexts.empty() == false)
00909 {
00910 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00911 it != resource_contexts.end();
00912 ++it)
00913 {
00914 ResourceContext *resource_context= *it;
00915 int err;
00916
00917 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00918
00919 if (resource->participatesInSqlTransaction())
00920 {
00921 if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
00922 {
00923 my_error(ER_GET_ERRNO, MYF(0), err);
00924 error= 1;
00925 }
00926 else
00927 {
00928 session.status_var.ha_savepoint_count++;
00929 }
00930 }
00931 }
00932 }
00933
00934
00935
00936 sv.setResourceContexts(resource_contexts);
00937
00938 if (shouldConstructMessages())
00939 {
00940 message::Transaction *transaction= session.getTransactionMessage();
00941
00942 if (transaction != NULL)
00943 {
00944 message::Transaction *transaction_savepoint=
00945 new message::Transaction(*transaction);
00946 sv.setTransactionMessage(transaction_savepoint);
00947 }
00948 }
00949
00950 return error;
00951 }
00952
00953 int TransactionServices::releaseSavepoint(Session::reference session,
00954 NamedSavepoint &sv)
00955 {
00956 int error= 0;
00957
00958 TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
00959
00960 for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00961 it != resource_contexts.end();
00962 ++it)
00963 {
00964 int err;
00965 ResourceContext *resource_context= *it;
00966
00967 plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00968
00969 if (resource->participatesInSqlTransaction())
00970 {
00971 if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
00972 {
00973 my_error(ER_GET_ERRNO, MYF(0), err);
00974 error= 1;
00975 }
00976 }
00977 }
00978
00979 return error;
00980 }
00981
00982 bool TransactionServices::shouldConstructMessages()
00983 {
00984 ReplicationServices &replication_services= ReplicationServices::singleton();
00985 return replication_services.isActive();
00986 }
00987
00988 message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
00989 bool should_inc_trx_id)
00990 {
00991 message::Transaction *transaction= session.getTransactionMessage();
00992
00993 if (unlikely(transaction == NULL))
00994 {
00995
00996
00997
00998
00999
01000 transaction= new (nothrow) message::Transaction();
01001 initTransactionMessage(*transaction, session, should_inc_trx_id);
01002 session.setTransactionMessage(transaction);
01003 return transaction;
01004 }
01005 else
01006 return transaction;
01007 }
01008
01009 void TransactionServices::initTransactionMessage(message::Transaction &transaction,
01010 Session::reference session,
01011 bool should_inc_trx_id)
01012 {
01013 message::TransactionContext *trx= transaction.mutable_transaction_context();
01014 trx->set_server_id(session.getServerId());
01015
01016 if (should_inc_trx_id)
01017 {
01018 trx->set_transaction_id(getCurrentTransactionId(session));
01019 session.setXaId(0);
01020 }
01021 else
01022 {
01023
01024 trx->set_transaction_id(0);
01025 }
01026
01027 trx->set_start_timestamp(session.getCurrentTimestamp());
01028
01029
01030 transaction.set_segment_id(1);
01031 transaction.set_end_segment(true);
01032 }
01033
01034 void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
01035 Session::const_reference session)
01036 {
01037 message::TransactionContext *trx= transaction.mutable_transaction_context();
01038 trx->set_end_timestamp(session.getCurrentTimestamp());
01039 }
01040
01041 void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
01042 Session::reference session)
01043 {
01044 delete transaction;
01045 session.setStatementMessage(NULL);
01046 session.setTransactionMessage(NULL);
01047 session.setXaId(0);
01048 }
01049
01050 int TransactionServices::commitTransactionMessage(Session::reference session)
01051 {
01052 ReplicationServices &replication_services= ReplicationServices::singleton();
01053 if (! replication_services.isActive())
01054 return 0;
01055
01056
01057
01058
01059
01060 if (session.getTransactionMessage() == NULL)
01061 return 0;
01062
01063
01064 message::Statement *statement= session.getStatementMessage();
01065
01066 if (statement != NULL)
01067 {
01068 finalizeStatementMessage(*statement, session);
01069 }
01070
01071 message::Transaction* transaction= getActiveTransactionMessage(session);
01072
01073
01074
01075
01076
01077
01078
01079 if (transaction->statement_size() == 0)
01080 {
01081 cleanupTransactionMessage(transaction, session);
01082 return 0;
01083 }
01084
01085 finalizeTransactionMessage(*transaction, session);
01086
01087 plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
01088
01089 cleanupTransactionMessage(transaction, session);
01090
01091 return static_cast<int>(result);
01092 }
01093
01094 void TransactionServices::initStatementMessage(message::Statement &statement,
01095 message::Statement::Type type,
01096 Session::const_reference session)
01097 {
01098 statement.set_type(type);
01099 statement.set_start_timestamp(session.getCurrentTimestamp());
01100
01101 if (session.variables.replicate_query)
01102 statement.set_sql(session.getQueryString()->c_str());
01103 }
01104
01105 void TransactionServices::finalizeStatementMessage(message::Statement &statement,
01106 Session::reference session)
01107 {
01108 statement.set_end_timestamp(session.getCurrentTimestamp());
01109 session.setStatementMessage(NULL);
01110 }
01111
01112 void TransactionServices::rollbackTransactionMessage(Session::reference session)
01113 {
01114 ReplicationServices &replication_services= ReplicationServices::singleton();
01115 if (! replication_services.isActive())
01116 return;
01117
01118 message::Transaction *transaction= getActiveTransactionMessage(session);
01119
01120
01121
01122
01123
01124
01125
01126
01127
01128
01129
01130
01131
01132
01133
01134
01135
01136
01137 if (unlikely(message::transactionContainsBulkSegment(*transaction)))
01138 {
01139
01140 uint64_t trx_id= transaction->transaction_context().transaction_id();
01141 uint32_t seg_id= transaction->segment_id();
01142
01143
01144
01145
01146
01147 transaction->Clear();
01148 initTransactionMessage(*transaction, session, false);
01149
01150
01151 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01152 transaction->set_segment_id(seg_id);
01153 transaction->set_end_segment(true);
01154
01155 message::Statement *statement= transaction->add_statement();
01156
01157 initStatementMessage(*statement, message::Statement::ROLLBACK, session);
01158 finalizeStatementMessage(*statement, session);
01159
01160 finalizeTransactionMessage(*transaction, session);
01161
01162 (void) replication_services.pushTransactionMessage(session, *transaction);
01163 }
01164
01165 cleanupTransactionMessage(transaction, session);
01166 }
01167
01168 void TransactionServices::rollbackStatementMessage(Session::reference session)
01169 {
01170 ReplicationServices &replication_services= ReplicationServices::singleton();
01171 if (! replication_services.isActive())
01172 return;
01173
01174 message::Statement *current_statement= session.getStatementMessage();
01175
01176
01177 if (current_statement == NULL)
01178 return;
01179
01180
01181
01182
01183
01184
01185
01186 bool is_segmented= false;
01187
01188 switch (current_statement->type())
01189 {
01190 case message::Statement::INSERT:
01191 if (current_statement->insert_data().segment_id() > 1)
01192 is_segmented= true;
01193 break;
01194
01195 case message::Statement::UPDATE:
01196 if (current_statement->update_data().segment_id() > 1)
01197 is_segmented= true;
01198 break;
01199
01200 case message::Statement::DELETE:
01201 if (current_statement->delete_data().segment_id() > 1)
01202 is_segmented= true;
01203 break;
01204
01205 default:
01206 break;
01207 }
01208
01209
01210
01211
01212
01213 message::Transaction *transaction= getActiveTransactionMessage(session);
01214 google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
01215 statements_in_txn= transaction->mutable_statement();
01216 statements_in_txn->RemoveLast();
01217 session.setStatementMessage(NULL);
01218
01219
01220
01221
01222
01223
01224 if (is_segmented)
01225 {
01226 current_statement= transaction->add_statement();
01227 initStatementMessage(*current_statement,
01228 message::Statement::ROLLBACK_STATEMENT,
01229 session);
01230 finalizeStatementMessage(*current_statement, session);
01231 }
01232 }
01233
01234 message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
01235 message::Transaction *transaction)
01236 {
01237 uint64_t trx_id= transaction->transaction_context().transaction_id();
01238 uint32_t seg_id= transaction->segment_id();
01239
01240 transaction->set_end_segment(false);
01241 commitTransactionMessage(session);
01242 transaction= getActiveTransactionMessage(session, false);
01243
01244
01245 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01246 transaction->set_segment_id(seg_id + 1);
01247 transaction->set_end_segment(true);
01248
01249 return transaction;
01250 }
01251
01252 message::Statement &TransactionServices::getInsertStatement(Session::reference session,
01253 Table &table,
01254 uint32_t *next_segment_id)
01255 {
01256 message::Statement *statement= session.getStatementMessage();
01257 message::Transaction *transaction= NULL;
01258
01259
01260
01261
01262
01263
01264
01265
01266 if (statement == NULL)
01267 {
01268 transaction= getActiveTransactionMessage(session);
01269
01270 if (static_cast<size_t>(transaction->ByteSize()) >=
01271 transaction_message_threshold)
01272 {
01273 transaction= segmentTransactionMessage(session, transaction);
01274 }
01275
01276 statement= transaction->add_statement();
01277 setInsertHeader(*statement, session, table);
01278 session.setStatementMessage(statement);
01279 }
01280 else
01281 {
01282 transaction= getActiveTransactionMessage(session);
01283
01284
01285
01286
01287
01288
01289 if (static_cast<size_t>(transaction->ByteSize()) >=
01290 transaction_message_threshold)
01291 {
01292
01293 uint64_t trx_id= transaction->transaction_context().transaction_id();
01294 uint32_t seg_id= transaction->segment_id();
01295
01296 message::InsertData *current_data= statement->mutable_insert_data();
01297
01298
01299 *next_segment_id= current_data->segment_id() + 1;
01300
01301 current_data->set_end_segment(false);
01302 transaction->set_end_segment(false);
01303
01304
01305
01306
01307
01308
01309 commitTransactionMessage(session);
01310
01311
01312
01313
01314
01315
01316 transaction= getActiveTransactionMessage(session, false);
01317 assert(transaction != NULL);
01318
01319 statement= transaction->add_statement();
01320 setInsertHeader(*statement, session, table);
01321 session.setStatementMessage(statement);
01322
01323
01324 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01325 transaction->set_segment_id(seg_id + 1);
01326 transaction->set_end_segment(true);
01327 }
01328 else
01329 {
01330
01331
01332
01333
01334 const message::InsertData ¤t_data= statement->insert_data();
01335 *next_segment_id= current_data.segment_id();
01336 }
01337 }
01338
01339 return *statement;
01340 }
01341
01342 void TransactionServices::setInsertHeader(message::Statement &statement,
01343 Session::const_reference session,
01344 Table &table)
01345 {
01346 initStatementMessage(statement, message::Statement::INSERT, session);
01347
01348
01349
01350
01351
01352
01353 message::InsertHeader *header= statement.mutable_insert_header();
01354 message::TableMetadata *table_metadata= header->mutable_table_metadata();
01355
01356 string schema_name;
01357 (void) table.getShare()->getSchemaName(schema_name);
01358 string table_name;
01359 (void) table.getShare()->getTableName(table_name);
01360
01361 table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01362 table_metadata->set_table_name(table_name.c_str(), table_name.length());
01363
01364 Field *current_field;
01365 Field **table_fields= table.getFields();
01366
01367 message::FieldMetadata *field_metadata;
01368
01369
01370 table.setReadSet();
01371
01372 while ((current_field= *table_fields++) != NULL)
01373 {
01374 field_metadata= header->add_field_metadata();
01375 field_metadata->set_name(current_field->field_name);
01376 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01377 }
01378 }
01379
01380 bool TransactionServices::insertRecord(Session::reference session,
01381 Table &table)
01382 {
01383 ReplicationServices &replication_services= ReplicationServices::singleton();
01384 if (! replication_services.isActive())
01385 return false;
01386
01387 if (not table.getShare()->is_replicated())
01388 return false;
01389
01398 if (not table.getShare()->hasPrimaryKey())
01399 {
01400 my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
01401 return true;
01402 }
01403
01404 uint32_t next_segment_id= 1;
01405 message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
01406
01407 message::InsertData *data= statement.mutable_insert_data();
01408 data->set_segment_id(next_segment_id);
01409 data->set_end_segment(true);
01410 message::InsertRecord *record= data->add_record();
01411
01412 Field *current_field;
01413 Field **table_fields= table.getFields();
01414
01415 String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01416 string_value->set_charset(system_charset_info);
01417
01418
01419 table.setReadSet();
01420
01421 while ((current_field= *table_fields++) != NULL)
01422 {
01423 if (current_field->is_null())
01424 {
01425 record->add_is_null(true);
01426 record->add_insert_value("", 0);
01427 }
01428 else
01429 {
01430 string_value= current_field->val_str_internal(string_value);
01431 record->add_is_null(false);
01432 record->add_insert_value(string_value->c_ptr(), string_value->length());
01433 string_value->free();
01434 }
01435 }
01436 return false;
01437 }
01438
01439 message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
01440 Table &table,
01441 const unsigned char *old_record,
01442 const unsigned char *new_record,
01443 uint32_t *next_segment_id)
01444 {
01445 message::Statement *statement= session.getStatementMessage();
01446 message::Transaction *transaction= NULL;
01447
01448
01449
01450
01451
01452
01453
01454
01455 if (statement == NULL)
01456 {
01457 transaction= getActiveTransactionMessage(session);
01458
01459 if (static_cast<size_t>(transaction->ByteSize()) >=
01460 transaction_message_threshold)
01461 {
01462 transaction= segmentTransactionMessage(session, transaction);
01463 }
01464
01465 statement= transaction->add_statement();
01466 setUpdateHeader(*statement, session, table, old_record, new_record);
01467 session.setStatementMessage(statement);
01468 }
01469 else
01470 {
01471 transaction= getActiveTransactionMessage(session);
01472
01473
01474
01475
01476
01477
01478 if (static_cast<size_t>(transaction->ByteSize()) >=
01479 transaction_message_threshold)
01480 {
01481
01482 uint64_t trx_id= transaction->transaction_context().transaction_id();
01483 uint32_t seg_id= transaction->segment_id();
01484
01485 message::UpdateData *current_data= statement->mutable_update_data();
01486
01487
01488 *next_segment_id= current_data->segment_id() + 1;
01489
01490 current_data->set_end_segment(false);
01491 transaction->set_end_segment(false);
01492
01493
01494
01495
01496
01497
01498 commitTransactionMessage(session);
01499
01500
01501
01502
01503
01504
01505 transaction= getActiveTransactionMessage(session, false);
01506 assert(transaction != NULL);
01507
01508 statement= transaction->add_statement();
01509 setUpdateHeader(*statement, session, table, old_record, new_record);
01510 session.setStatementMessage(statement);
01511
01512
01513 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01514 transaction->set_segment_id(seg_id + 1);
01515 transaction->set_end_segment(true);
01516 }
01517 else
01518 {
01519
01520
01521
01522
01523 const message::UpdateData ¤t_data= statement->update_data();
01524 *next_segment_id= current_data.segment_id();
01525 }
01526 }
01527
01528 return *statement;
01529 }
01530
01531 void TransactionServices::setUpdateHeader(message::Statement &statement,
01532 Session::const_reference session,
01533 Table &table,
01534 const unsigned char *old_record,
01535 const unsigned char *new_record)
01536 {
01537 initStatementMessage(statement, message::Statement::UPDATE, session);
01538
01539
01540
01541
01542
01543
01544 message::UpdateHeader *header= statement.mutable_update_header();
01545 message::TableMetadata *table_metadata= header->mutable_table_metadata();
01546
01547 string schema_name;
01548 (void) table.getShare()->getSchemaName(schema_name);
01549 string table_name;
01550 (void) table.getShare()->getTableName(table_name);
01551
01552 table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01553 table_metadata->set_table_name(table_name.c_str(), table_name.length());
01554
01555 Field *current_field;
01556 Field **table_fields= table.getFields();
01557
01558 message::FieldMetadata *field_metadata;
01559
01560
01561 table.setReadSet();
01562
01563 while ((current_field= *table_fields++) != NULL)
01564 {
01565
01566
01567
01568
01569 if (table.getShare()->fieldInPrimaryKey(current_field))
01570 {
01571 field_metadata= header->add_key_field_metadata();
01572 field_metadata->set_name(current_field->field_name);
01573 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01574 }
01575
01576 if (isFieldUpdated(current_field, table, old_record, new_record))
01577 {
01578
01579 field_metadata= header->add_set_field_metadata();
01580 field_metadata->set_name(current_field->field_name);
01581 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01582 }
01583 }
01584 }
01585
01586 void TransactionServices::updateRecord(Session::reference session,
01587 Table &table,
01588 const unsigned char *old_record,
01589 const unsigned char *new_record)
01590 {
01591 ReplicationServices &replication_services= ReplicationServices::singleton();
01592 if (! replication_services.isActive())
01593 return;
01594
01595 if (not table.getShare()->is_replicated())
01596 return;
01597
01598 uint32_t next_segment_id= 1;
01599 message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
01600
01601 message::UpdateData *data= statement.mutable_update_data();
01602 data->set_segment_id(next_segment_id);
01603 data->set_end_segment(true);
01604 message::UpdateRecord *record= data->add_record();
01605
01606 Field *current_field;
01607 Field **table_fields= table.getFields();
01608 String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01609 string_value->set_charset(system_charset_info);
01610
01611 while ((current_field= *table_fields++) != NULL)
01612 {
01613
01614
01615
01616
01617
01618
01619
01620
01621
01622
01623
01624 if (isFieldUpdated(current_field, table, old_record, new_record))
01625 {
01626
01627 bool is_read_set= current_field->isReadSet();
01628
01629
01630 table.setReadSet(current_field->position());
01631
01632
01633 string_value= current_field->val_str_internal(string_value);
01634
01635
01636
01637
01638
01639 current_field->setReadSet(is_read_set);
01640
01641 if (current_field->is_null())
01642 {
01643 record->add_is_null(true);
01644 record->add_after_value("", 0);
01645 }
01646 else
01647 {
01648 record->add_is_null(false);
01649 record->add_after_value(string_value->c_ptr(), string_value->length());
01650 }
01651 string_value->free();
01652 }
01653
01654
01655
01656
01657
01658
01659 if (table.getShare()->fieldInPrimaryKey(current_field))
01660 {
01666 string_value= current_field->val_str_internal(string_value,
01667 old_record +
01668 current_field->offset(const_cast<unsigned char *>(new_record)));
01669 record->add_key_value(string_value->c_ptr(), string_value->length());
01670 string_value->free();
01671 }
01672
01673 }
01674 }
01675
01676 bool TransactionServices::isFieldUpdated(Field *current_field,
01677 Table &table,
01678 const unsigned char *old_record,
01679 const unsigned char *new_record)
01680 {
01681
01682
01683
01684
01685
01686 const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
01687 const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
01688
01689 uint32_t field_length= current_field->pack_length();
01691 bool old_value_is_null= current_field->is_null_in_record(old_record);
01692 bool new_value_is_null= current_field->is_null_in_record(new_record);
01693
01694 bool isUpdated= false;
01695 if (old_value_is_null != new_value_is_null)
01696 {
01697 if ((old_value_is_null) && (! new_value_is_null))
01698 {
01699 isUpdated= true;
01700 }
01701 else if ((! old_value_is_null) && (new_value_is_null))
01702 {
01703 isUpdated= true;
01704 }
01705 }
01706
01707 if (! isUpdated)
01708 {
01709 if (memcmp(old_ptr, new_ptr, field_length) != 0)
01710 {
01711 isUpdated= true;
01712 }
01713 }
01714 return isUpdated;
01715 }
01716
01717 message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
01718 Table &table,
01719 uint32_t *next_segment_id)
01720 {
01721 message::Statement *statement= session.getStatementMessage();
01722 message::Transaction *transaction= NULL;
01723
01724
01725
01726
01727
01728
01729
01730
01731 if (statement == NULL)
01732 {
01733 transaction= getActiveTransactionMessage(session);
01734
01735 if (static_cast<size_t>(transaction->ByteSize()) >=
01736 transaction_message_threshold)
01737 {
01738 transaction= segmentTransactionMessage(session, transaction);
01739 }
01740
01741 statement= transaction->add_statement();
01742 setDeleteHeader(*statement, session, table);
01743 session.setStatementMessage(statement);
01744 }
01745 else
01746 {
01747 transaction= getActiveTransactionMessage(session);
01748
01749
01750
01751
01752
01753
01754 if (static_cast<size_t>(transaction->ByteSize()) >=
01755 transaction_message_threshold)
01756 {
01757
01758 uint64_t trx_id= transaction->transaction_context().transaction_id();
01759 uint32_t seg_id= transaction->segment_id();
01760
01761 message::DeleteData *current_data= statement->mutable_delete_data();
01762
01763
01764 *next_segment_id= current_data->segment_id() + 1;
01765
01766 current_data->set_end_segment(false);
01767 transaction->set_end_segment(false);
01768
01769
01770
01771
01772
01773
01774 commitTransactionMessage(session);
01775
01776
01777
01778
01779
01780
01781 transaction= getActiveTransactionMessage(session, false);
01782 assert(transaction != NULL);
01783
01784 statement= transaction->add_statement();
01785 setDeleteHeader(*statement, session, table);
01786 session.setStatementMessage(statement);
01787
01788
01789 transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01790 transaction->set_segment_id(seg_id + 1);
01791 transaction->set_end_segment(true);
01792 }
01793 else
01794 {
01795
01796
01797
01798
01799 const message::DeleteData ¤t_data= statement->delete_data();
01800 *next_segment_id= current_data.segment_id();
01801 }
01802 }
01803
01804 return *statement;
01805 }
01806
01807 void TransactionServices::setDeleteHeader(message::Statement &statement,
01808 Session::const_reference session,
01809 Table &table)
01810 {
01811 initStatementMessage(statement, message::Statement::DELETE, session);
01812
01813
01814
01815
01816
01817 message::DeleteHeader *header= statement.mutable_delete_header();
01818 message::TableMetadata *table_metadata= header->mutable_table_metadata();
01819
01820 string schema_name;
01821 (void) table.getShare()->getSchemaName(schema_name);
01822 string table_name;
01823 (void) table.getShare()->getTableName(table_name);
01824
01825 table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01826 table_metadata->set_table_name(table_name.c_str(), table_name.length());
01827
01828 Field *current_field;
01829 Field **table_fields= table.getFields();
01830
01831 message::FieldMetadata *field_metadata;
01832
01833 while ((current_field= *table_fields++) != NULL)
01834 {
01835
01836
01837
01838
01839
01840 if (table.getShare()->fieldInPrimaryKey(current_field))
01841 {
01842 field_metadata= header->add_key_field_metadata();
01843 field_metadata->set_name(current_field->field_name);
01844 field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01845 }
01846 }
01847 }
01848
01849 void TransactionServices::deleteRecord(Session::reference session,
01850 Table &table,
01851 bool use_update_record)
01852 {
01853 ReplicationServices &replication_services= ReplicationServices::singleton();
01854 if (! replication_services.isActive())
01855 return;
01856
01857 if (not table.getShare()->is_replicated())
01858 return;
01859
01860 uint32_t next_segment_id= 1;
01861 message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
01862
01863 message::DeleteData *data= statement.mutable_delete_data();
01864 data->set_segment_id(next_segment_id);
01865 data->set_end_segment(true);
01866 message::DeleteRecord *record= data->add_record();
01867
01868 Field *current_field;
01869 Field **table_fields= table.getFields();
01870 String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01871 string_value->set_charset(system_charset_info);
01872
01873 while ((current_field= *table_fields++) != NULL)
01874 {
01875
01876
01877
01878
01879
01880 if (table.getShare()->fieldInPrimaryKey(current_field))
01881 {
01882 if (use_update_record)
01883 {
01884
01885
01886
01887
01888
01889
01890
01891 const unsigned char *old_ptr= current_field->ptr;
01892 current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
01893 string_value= current_field->val_str_internal(string_value);
01894 current_field->ptr= const_cast<unsigned char *>(old_ptr);
01895 }
01896 else
01897 {
01898 string_value= current_field->val_str_internal(string_value);
01902 }
01903 record->add_key_value(string_value->c_ptr(), string_value->length());
01904 string_value->free();
01905 }
01906 }
01907 }
01908
01909 void TransactionServices::createTable(Session::reference session,
01910 const message::Table &table)
01911 {
01912 ReplicationServices &replication_services= ReplicationServices::singleton();
01913 if (not replication_services.isActive())
01914 return;
01915
01916 if (not message::is_replicated(table))
01917 return;
01918
01919 message::Transaction *transaction= getActiveTransactionMessage(session);
01920 message::Statement *statement= transaction->add_statement();
01921
01922 initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
01923
01924
01925
01926
01927
01928 message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
01929 message::Table *new_table_message= create_table_statement->mutable_table();
01930 *new_table_message= table;
01931
01932 finalizeStatementMessage(*statement, session);
01933
01934 finalizeTransactionMessage(*transaction, session);
01935
01936 (void) replication_services.pushTransactionMessage(session, *transaction);
01937
01938 cleanupTransactionMessage(transaction, session);
01939
01940 }
01941
01942 void TransactionServices::createSchema(Session::reference session,
01943 const message::Schema &schema)
01944 {
01945 ReplicationServices &replication_services= ReplicationServices::singleton();
01946 if (! replication_services.isActive())
01947 return;
01948
01949 if (not message::is_replicated(schema))
01950 return;
01951
01952 message::Transaction *transaction= getActiveTransactionMessage(session);
01953 message::Statement *statement= transaction->add_statement();
01954
01955 initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
01956
01957
01958
01959
01960
01961 message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
01962 message::Schema *new_schema_message= create_schema_statement->mutable_schema();
01963 *new_schema_message= schema;
01964
01965 finalizeStatementMessage(*statement, session);
01966
01967 finalizeTransactionMessage(*transaction, session);
01968
01969 (void) replication_services.pushTransactionMessage(session, *transaction);
01970
01971 cleanupTransactionMessage(transaction, session);
01972
01973 }
01974
01975 void TransactionServices::dropSchema(Session::reference session,
01976 identifier::Schema::const_reference identifier,
01977 message::schema::const_reference schema)
01978 {
01979 ReplicationServices &replication_services= ReplicationServices::singleton();
01980 if (not replication_services.isActive())
01981 return;
01982
01983 if (not message::is_replicated(schema))
01984 return;
01985
01986 message::Transaction *transaction= getActiveTransactionMessage(session);
01987 message::Statement *statement= transaction->add_statement();
01988
01989 initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
01990
01991
01992
01993
01994
01995 message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
01996
01997 drop_schema_statement->set_schema_name(identifier.getSchemaName());
01998
01999 finalizeStatementMessage(*statement, session);
02000
02001 finalizeTransactionMessage(*transaction, session);
02002
02003 (void) replication_services.pushTransactionMessage(session, *transaction);
02004
02005 cleanupTransactionMessage(transaction, session);
02006 }
02007
02008 void TransactionServices::alterSchema(Session::reference session,
02009 const message::Schema &old_schema,
02010 const message::Schema &new_schema)
02011 {
02012 ReplicationServices &replication_services= ReplicationServices::singleton();
02013 if (! replication_services.isActive())
02014 return;
02015
02016 if (not message::is_replicated(old_schema))
02017 return;
02018
02019 message::Transaction *transaction= getActiveTransactionMessage(session);
02020 message::Statement *statement= transaction->add_statement();
02021
02022 initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
02023
02024
02025
02026
02027
02028 message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
02029
02030 message::Schema *before= alter_schema_statement->mutable_before();
02031 message::Schema *after= alter_schema_statement->mutable_after();
02032
02033 *before= old_schema;
02034 *after= new_schema;
02035
02036 finalizeStatementMessage(*statement, session);
02037
02038 finalizeTransactionMessage(*transaction, session);
02039
02040 (void) replication_services.pushTransactionMessage(session, *transaction);
02041
02042 cleanupTransactionMessage(transaction, session);
02043 }
02044
02045 void TransactionServices::dropTable(Session::reference session,
02046 identifier::Table::const_reference identifier,
02047 message::table::const_reference table,
02048 bool if_exists)
02049 {
02050 ReplicationServices &replication_services= ReplicationServices::singleton();
02051 if (! replication_services.isActive())
02052 return;
02053
02054 if (not message::is_replicated(table))
02055 return;
02056
02057 message::Transaction *transaction= getActiveTransactionMessage(session);
02058 message::Statement *statement= transaction->add_statement();
02059
02060 initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
02061
02062
02063
02064
02065
02066 message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
02067
02068 drop_table_statement->set_if_exists_clause(if_exists);
02069
02070 message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
02071
02072 table_metadata->set_schema_name(identifier.getSchemaName());
02073 table_metadata->set_table_name(identifier.getTableName());
02074
02075 finalizeStatementMessage(*statement, session);
02076
02077 finalizeTransactionMessage(*transaction, session);
02078
02079 (void) replication_services.pushTransactionMessage(session, *transaction);
02080
02081 cleanupTransactionMessage(transaction, session);
02082 }
02083
02084 void TransactionServices::truncateTable(Session::reference session,
02085 Table &table)
02086 {
02087 ReplicationServices &replication_services= ReplicationServices::singleton();
02088 if (! replication_services.isActive())
02089 return;
02090
02091 if (not table.getShare()->is_replicated())
02092 return;
02093
02094 message::Transaction *transaction= getActiveTransactionMessage(session);
02095 message::Statement *statement= transaction->add_statement();
02096
02097 initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
02098
02099
02100
02101
02102
02103 message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
02104 message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
02105
02106 string schema_name;
02107 (void) table.getShare()->getSchemaName(schema_name);
02108
02109 string table_name;
02110 (void) table.getShare()->getTableName(table_name);
02111
02112 table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
02113 table_metadata->set_table_name(table_name.c_str(), table_name.length());
02114
02115 finalizeStatementMessage(*statement, session);
02116
02117 finalizeTransactionMessage(*transaction, session);
02118
02119 (void) replication_services.pushTransactionMessage(session, *transaction);
02120
02121 cleanupTransactionMessage(transaction, session);
02122 }
02123
02124 void TransactionServices::rawStatement(Session::reference session,
02125 const string &query,
02126 const string &schema)
02127 {
02128 ReplicationServices &replication_services= ReplicationServices::singleton();
02129 if (! replication_services.isActive())
02130 return;
02131
02132 message::Transaction *transaction= getActiveTransactionMessage(session);
02133 message::Statement *statement= transaction->add_statement();
02134
02135 initStatementMessage(*statement, message::Statement::RAW_SQL, session);
02136 statement->set_sql(query);
02137 if (not schema.empty())
02138 statement->set_raw_sql_schema(schema);
02139 finalizeStatementMessage(*statement, session);
02140
02141 finalizeTransactionMessage(*transaction, session);
02142
02143 (void) replication_services.pushTransactionMessage(session, *transaction);
02144
02145 cleanupTransactionMessage(transaction, session);
02146 }
02147
02148 int TransactionServices::sendEvent(Session::reference session,
02149 const message::Event &event)
02150 {
02151 ReplicationServices &replication_services= ReplicationServices::singleton();
02152 if (! replication_services.isActive())
02153 return 0;
02154
02155 message::Transaction *transaction= new (nothrow) message::Transaction();
02156
02157
02158 initTransactionMessage(*transaction, session, true);
02159
02160
02161 finalizeTransactionMessage(*transaction, session);
02162
02163 message::Event *trx_event= transaction->mutable_event();
02164
02165 trx_event->CopyFrom(event);
02166
02167 plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
02168
02169 delete transaction;
02170
02171 return static_cast<int>(result);
02172 }
02173
02174 bool TransactionServices::sendStartupEvent(Session::reference session)
02175 {
02176 message::Event event;
02177 event.set_type(message::Event::STARTUP);
02178 if (sendEvent(session, event) != 0)
02179 return false;
02180 return true;
02181 }
02182
02183 bool TransactionServices::sendShutdownEvent(Session::reference session)
02184 {
02185 message::Event event;
02186 event.set_type(message::Event::SHUTDOWN);
02187 if (sendEvent(session, event) != 0)
02188 return false;
02189 return true;
02190 }
02191
02192 }