Drizzled Public API Documentation

transaction_services.cc

00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008 Sun Microsystems, Inc.
00005  *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
00006  *
00007  *  This program is free software; you can redistribute it and/or modify
00008  *  it under the terms of the GNU General Public License as published by
00009  *  the Free Software Foundation; version 2 of the License.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00050 #include <config.h>
00051 #include <drizzled/current_session.h>
00052 #include <drizzled/error.h>
00053 #include <drizzled/gettext.h>
00054 #include <drizzled/probes.h>
00055 #include <drizzled/sql_parse.h>
00056 #include <drizzled/session.h>
00057 #include <drizzled/sql_base.h>
00058 #include <drizzled/replication_services.h>
00059 #include <drizzled/transaction_services.h>
00060 #include <drizzled/transaction_context.h>
00061 #include <drizzled/message/transaction.pb.h>
00062 #include <drizzled/message/statement_transform.h>
00063 #include <drizzled/resource_context.h>
00064 #include <drizzled/lock.h>
00065 #include <drizzled/item/int.h>
00066 #include <drizzled/item/empty_string.h>
00067 #include <drizzled/field/epoch.h>
00068 #include <drizzled/plugin/client.h>
00069 #include <drizzled/plugin/monitored_in_transaction.h>
00070 #include <drizzled/plugin/transactional_storage_engine.h>
00071 #include <drizzled/plugin/xa_resource_manager.h>
00072 #include <drizzled/plugin/xa_storage_engine.h>
00073 #include <drizzled/internal/my_sys.h>
00074 
00075 #include <vector>
00076 #include <algorithm>
00077 #include <functional>
00078 #include <google/protobuf/repeated_field.h>
00079 
00080 using namespace std;
00081 using namespace google;
00082 
00083 namespace drizzled
00084 {
00085 
00303 TransactionServices::TransactionServices()
00304 {
00305   plugin::StorageEngine *engine= plugin::StorageEngine::findByName("InnoDB");
00306   if (engine)
00307   {
00308     xa_storage_engine= (plugin::XaStorageEngine*)engine; 
00309   }
00310   else 
00311   {
00312     xa_storage_engine= NULL;
00313   }
00314 }
00315 
00316 void TransactionServices::registerResourceForStatement(Session::reference session,
00317                                                        plugin::MonitoredInTransaction *monitored,
00318                                                        plugin::TransactionalStorageEngine *engine)
00319 {
00320   if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00321   {
00322     /* 
00323      * Now we automatically register this resource manager for the
00324      * normal transaction.  This is fine because a statement
00325      * transaction registration should always enlist the resource
00326      * in the normal transaction which contains the statement
00327      * transaction.
00328      */
00329     registerResourceForTransaction(session, monitored, engine);
00330   }
00331 
00332   TransactionContext *trans= &session.transaction.stmt;
00333   ResourceContext *resource_context= session.getResourceContext(monitored, 0);
00334 
00335   if (resource_context->isStarted())
00336     return; /* already registered, return */
00337 
00338   assert(monitored->participatesInSqlTransaction());
00339   assert(not monitored->participatesInXaTransaction());
00340 
00341   resource_context->setMonitored(monitored);
00342   resource_context->setTransactionalStorageEngine(engine);
00343   trans->registerResource(resource_context);
00344 
00345   trans->no_2pc|= true;
00346 }
00347 
00348 void TransactionServices::registerResourceForStatement(Session::reference session,
00349                                                        plugin::MonitoredInTransaction *monitored,
00350                                                        plugin::TransactionalStorageEngine *engine,
00351                                                        plugin::XaResourceManager *resource_manager)
00352 {
00353   if (session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00354   {
00355     /* 
00356      * Now we automatically register this resource manager for the
00357      * normal transaction.  This is fine because a statement
00358      * transaction registration should always enlist the resource
00359      * in the normal transaction which contains the statement
00360      * transaction.
00361      */
00362     registerResourceForTransaction(session, monitored, engine, resource_manager);
00363   }
00364 
00365   TransactionContext *trans= &session.transaction.stmt;
00366   ResourceContext *resource_context= session.getResourceContext(monitored, 0);
00367 
00368   if (resource_context->isStarted())
00369     return; /* already registered, return */
00370 
00371   assert(monitored->participatesInXaTransaction());
00372   assert(monitored->participatesInSqlTransaction());
00373 
00374   resource_context->setMonitored(monitored);
00375   resource_context->setTransactionalStorageEngine(engine);
00376   resource_context->setXaResourceManager(resource_manager);
00377   trans->registerResource(resource_context);
00378 
00379   trans->no_2pc|= false;
00380 }
00381 
00382 void TransactionServices::registerResourceForTransaction(Session::reference session,
00383                                                          plugin::MonitoredInTransaction *monitored,
00384                                                          plugin::TransactionalStorageEngine *engine)
00385 {
00386   TransactionContext *trans= &session.transaction.all;
00387   ResourceContext *resource_context= session.getResourceContext(monitored, 1);
00388 
00389   if (resource_context->isStarted())
00390     return; /* already registered, return */
00391 
00392   session.server_status|= SERVER_STATUS_IN_TRANS;
00393 
00394   trans->registerResource(resource_context);
00395 
00396   assert(monitored->participatesInSqlTransaction());
00397   assert(not monitored->participatesInXaTransaction());
00398 
00399   resource_context->setMonitored(monitored);
00400   resource_context->setTransactionalStorageEngine(engine);
00401   trans->no_2pc|= true;
00402 
00403   if (session.transaction.xid_state.xid.is_null())
00404     session.transaction.xid_state.xid.set(session.getQueryId());
00405 
00406   /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
00407   if (! session.getResourceContext(monitored, 0)->isStarted())
00408     registerResourceForStatement(session, monitored, engine);
00409 }
00410 
00411 void TransactionServices::registerResourceForTransaction(Session::reference session,
00412                                                          plugin::MonitoredInTransaction *monitored,
00413                                                          plugin::TransactionalStorageEngine *engine,
00414                                                          plugin::XaResourceManager *resource_manager)
00415 {
00416   TransactionContext *trans= &session.transaction.all;
00417   ResourceContext *resource_context= session.getResourceContext(monitored, 1);
00418 
00419   if (resource_context->isStarted())
00420     return; /* already registered, return */
00421 
00422   session.server_status|= SERVER_STATUS_IN_TRANS;
00423 
00424   trans->registerResource(resource_context);
00425 
00426   assert(monitored->participatesInSqlTransaction());
00427 
00428   resource_context->setMonitored(monitored);
00429   resource_context->setXaResourceManager(resource_manager);
00430   resource_context->setTransactionalStorageEngine(engine);
00431   trans->no_2pc|= true;
00432 
00433   if (session.transaction.xid_state.xid.is_null())
00434     session.transaction.xid_state.xid.set(session.getQueryId());
00435 
00436   engine->startTransaction(&session, START_TRANS_NO_OPTIONS);
00437 
00438   /* Only true if user is executing a BEGIN WORK/START TRANSACTION */
00439   if (! session.getResourceContext(monitored, 0)->isStarted())
00440     registerResourceForStatement(session, monitored, engine, resource_manager);
00441 }
00442 
00443 void TransactionServices::allocateNewTransactionId()
00444 {
00445   ReplicationServices &replication_services= ReplicationServices::singleton();
00446   if (! replication_services.isActive())
00447   {
00448     return;
00449   }
00450 
00451   Session *my_session= current_session;
00452   uint64_t xa_id= xa_storage_engine->getNewTransactionId(my_session);
00453   my_session->setXaId(xa_id);
00454 }
00455 
00456 uint64_t TransactionServices::getCurrentTransactionId(Session::reference session)
00457 {
00458   if (session.getXaId() == 0)
00459   {
00460     session.setXaId(xa_storage_engine->getNewTransactionId(&session)); 
00461   }
00462 
00463   return session.getXaId();
00464 }
00465 
00466 int TransactionServices::commitTransaction(Session::reference session,
00467                                            bool normal_transaction)
00468 {
00469   int error= 0, cookie= 0;
00470   /*
00471     'all' means that this is either an explicit commit issued by
00472     user, or an implicit commit issued by a DDL.
00473   */
00474   TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00475   TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00476 
00477   bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00478 
00479   /*
00480     We must not commit the normal transaction if a statement
00481     transaction is pending. Otherwise statement transaction
00482     flags will not get propagated to its normal transaction's
00483     counterpart.
00484   */
00485   assert(session.transaction.stmt.getResourceContexts().empty() ||
00486               trans == &session.transaction.stmt);
00487 
00488   if (resource_contexts.empty() == false)
00489   {
00490     if (is_real_trans && session.wait_if_global_read_lock(false, false))
00491     {
00492       rollbackTransaction(session, normal_transaction);
00493       return 1;
00494     }
00495 
00496     /*
00497      * If replication is on, we do a PREPARE on the resource managers, push the
00498      * Transaction message across the replication stream, and then COMMIT if the
00499      * replication stream returned successfully.
00500      */
00501     if (shouldConstructMessages())
00502     {
00503       for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00504            it != resource_contexts.end() && ! error;
00505            ++it)
00506       {
00507         ResourceContext *resource_context= *it;
00508         int err;
00509         /*
00510           Do not call two-phase commit if this particular
00511           transaction is read-only. This allows for simpler
00512           implementation in engines that are always read-only.
00513         */
00514         if (! resource_context->hasModifiedData())
00515           continue;
00516 
00517         plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00518 
00519         if (resource->participatesInXaTransaction())
00520         {
00521           if ((err= resource_context->getXaResourceManager()->xaPrepare(&session, normal_transaction)))
00522           {
00523             my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00524             error= 1;
00525           }
00526           else
00527           {
00528             session.status_var.ha_prepare_count++;
00529           }
00530         }
00531       }
00532       if (error == 0 && is_real_trans)
00533       {
00534         /*
00535          * Push the constructed Transaction messages across to
00536          * replicators and appliers.
00537          */
00538         error= commitTransactionMessage(session);
00539       }
00540       if (error)
00541       {
00542         rollbackTransaction(session, normal_transaction);
00543         error= 1;
00544         goto end;
00545       }
00546     }
00547     error= commitPhaseOne(session, normal_transaction) ? (cookie ? 2 : 1) : 0;
00548 end:
00549     if (is_real_trans)
00550       session.startWaitingGlobalReadLock();
00551   }
00552   return error;
00553 }
00554 
00559 int TransactionServices::commitPhaseOne(Session::reference session,
00560                                         bool normal_transaction)
00561 {
00562   int error=0;
00563   TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00564   TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00565 
00566   bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00567   bool all= normal_transaction;
00568 
00569   /* If we're in autocommit then we have a real transaction to commit
00570      (except if it's BEGIN)
00571   */
00572   if (! session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
00573     all= true;
00574 
00575   if (resource_contexts.empty() == false)
00576   {
00577     for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00578          it != resource_contexts.end();
00579          ++it)
00580     {
00581       int err;
00582       ResourceContext *resource_context= *it;
00583 
00584       plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00585 
00586       if (resource->participatesInXaTransaction())
00587       {
00588         if ((err= resource_context->getXaResourceManager()->xaCommit(&session, all)))
00589         {
00590           my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00591           error= 1;
00592         }
00593         else if (normal_transaction)
00594         {
00595           session.status_var.ha_commit_count++;
00596         }
00597       }
00598       else if (resource->participatesInSqlTransaction())
00599       {
00600         if ((err= resource_context->getTransactionalStorageEngine()->commit(&session, all)))
00601         {
00602           my_error(ER_ERROR_DURING_COMMIT, MYF(0), err);
00603           error= 1;
00604         }
00605         else if (normal_transaction)
00606         {
00607           session.status_var.ha_commit_count++;
00608         }
00609       }
00610       resource_context->reset(); /* keep it conveniently zero-filled */
00611     }
00612 
00613     if (is_real_trans)
00614       session.transaction.xid_state.xid.null();
00615 
00616     if (normal_transaction)
00617     {
00618       session.variables.tx_isolation= session.session_tx_isolation;
00619       session.transaction.cleanup();
00620     }
00621   }
00622   trans->reset();
00623   return error;
00624 }
00625 
00626 int TransactionServices::rollbackTransaction(Session::reference session,
00627                                              bool normal_transaction)
00628 {
00629   int error= 0;
00630   TransactionContext *trans= normal_transaction ? &session.transaction.all : &session.transaction.stmt;
00631   TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00632 
00633   bool is_real_trans= normal_transaction || session.transaction.all.getResourceContexts().empty();
00634   bool all = normal_transaction || !session_test_options(&session, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN);
00635 
00636   /*
00637     We must not rollback the normal transaction if a statement
00638     transaction is pending.
00639   */
00640   assert(session.transaction.stmt.getResourceContexts().empty() ||
00641               trans == &session.transaction.stmt);
00642 
00643   if (resource_contexts.empty() == false)
00644   {
00645     for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00646          it != resource_contexts.end();
00647          ++it)
00648     {
00649       int err;
00650       ResourceContext *resource_context= *it;
00651 
00652       plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00653 
00654       if (resource->participatesInXaTransaction())
00655       {
00656         if ((err= resource_context->getXaResourceManager()->xaRollback(&session, all)))
00657         {
00658           my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00659           error= 1;
00660         }
00661         else if (normal_transaction)
00662         {
00663           session.status_var.ha_rollback_count++;
00664         }
00665       }
00666       else if (resource->participatesInSqlTransaction())
00667       {
00668         if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, all)))
00669         {
00670           my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00671           error= 1;
00672         }
00673         else if (normal_transaction)
00674         {
00675           session.status_var.ha_rollback_count++;
00676         }
00677       }
00678       resource_context->reset(); /* keep it conveniently zero-filled */
00679     }
00680     
00681     /* 
00682      * We need to signal the ROLLBACK to ReplicationServices here
00683      * BEFORE we set the transaction ID to NULL.  This is because
00684      * if a bulk segment was sent to replicators, we need to send
00685      * a rollback statement with the corresponding transaction ID
00686      * to rollback.
00687      */
00688     if (all)
00689       rollbackTransactionMessage(session);
00690     else
00691       rollbackStatementMessage(session);
00692 
00693     if (is_real_trans)
00694       session.transaction.xid_state.xid.null();
00695     if (normal_transaction)
00696     {
00697       session.variables.tx_isolation=session.session_tx_isolation;
00698       session.transaction.cleanup();
00699     }
00700   }
00701   if (normal_transaction)
00702     session.transaction_rollback_request= false;
00703 
00704   /*
00705    * If a non-transactional table was updated, warn the user
00706    */
00707   if (is_real_trans &&
00708       session.transaction.all.hasModifiedNonTransData() &&
00709       session.getKilled() != Session::KILL_CONNECTION)
00710   {
00711     push_warning(&session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00712                  ER_WARNING_NOT_COMPLETE_ROLLBACK,
00713                  ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
00714   }
00715   trans->reset();
00716   return error;
00717 }
00718 
00719 int TransactionServices::autocommitOrRollback(Session::reference session,
00720                                               int error)
00721 {
00722   /* One GPB Statement message per SQL statement */
00723   message::Statement *statement= session.getStatementMessage();
00724   if ((statement != NULL) && (! error))
00725     finalizeStatementMessage(*statement, session);
00726 
00727   if (session.transaction.stmt.getResourceContexts().empty() == false)
00728   {
00729     TransactionContext *trans = &session.transaction.stmt;
00730     TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00731     for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00732          it != resource_contexts.end();
00733          ++it)
00734     {
00735       ResourceContext *resource_context= *it;
00736 
00737       resource_context->getTransactionalStorageEngine()->endStatement(&session);
00738     }
00739 
00740     if (! error)
00741     {
00742       if (commitTransaction(session, false))
00743         error= 1;
00744     }
00745     else
00746     {
00747       (void) rollbackTransaction(session, false);
00748       if (session.transaction_rollback_request)
00749       {
00750         (void) rollbackTransaction(session, true);
00751         session.server_status&= ~SERVER_STATUS_IN_TRANS;
00752       }
00753     }
00754 
00755     session.variables.tx_isolation= session.session_tx_isolation;
00756   }
00757 
00758   return error;
00759 }
00760 
00761 struct ResourceContextCompare : public std::binary_function<ResourceContext *, ResourceContext *, bool>
00762 {
00763   result_type operator()(const ResourceContext *lhs, const ResourceContext *rhs) const
00764   {
00765     /* The below is perfectly fine, since we're simply comparing addresses for the underlying
00766      * resources aren't the same... */
00767     return reinterpret_cast<uint64_t>(lhs->getMonitored()) < reinterpret_cast<uint64_t>(rhs->getMonitored());
00768   }
00769 };
00770 
00771 int TransactionServices::rollbackToSavepoint(Session::reference session,
00772                                              NamedSavepoint &sv)
00773 {
00774   int error= 0;
00775   TransactionContext *trans= &session.transaction.all;
00776   TransactionContext::ResourceContexts &tran_resource_contexts= trans->getResourceContexts();
00777   TransactionContext::ResourceContexts &sv_resource_contexts= sv.getResourceContexts();
00778 
00779   trans->no_2pc= false;
00780   /*
00781     rolling back to savepoint in all storage engines that were part of the
00782     transaction when the savepoint was set
00783   */
00784   for (TransactionContext::ResourceContexts::iterator it= sv_resource_contexts.begin();
00785        it != sv_resource_contexts.end();
00786        ++it)
00787   {
00788     int err;
00789     ResourceContext *resource_context= *it;
00790 
00791     plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00792 
00793     if (resource->participatesInSqlTransaction())
00794     {
00795       if ((err= resource_context->getTransactionalStorageEngine()->rollbackToSavepoint(&session, sv)))
00796       {
00797         my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00798         error= 1;
00799       }
00800       else
00801       {
00802         session.status_var.ha_savepoint_rollback_count++;
00803       }
00804     }
00805     trans->no_2pc|= not resource->participatesInXaTransaction();
00806   }
00807   /*
00808     rolling back the transaction in all storage engines that were not part of
00809     the transaction when the savepoint was set
00810   */
00811   {
00812     TransactionContext::ResourceContexts sorted_tran_resource_contexts(tran_resource_contexts);
00813     TransactionContext::ResourceContexts sorted_sv_resource_contexts(sv_resource_contexts);
00814     TransactionContext::ResourceContexts set_difference_contexts;
00815 
00816     /* 
00817      * Bug #542299: segfault during set_difference() below.  copy<>() requires pre-allocation
00818      * of all elements, including the target, which is why we pre-allocate the set_difference_contexts
00819      * here
00820      */
00821     set_difference_contexts.reserve(max(tran_resource_contexts.size(), sv_resource_contexts.size()));
00822 
00823     sort(sorted_tran_resource_contexts.begin(),
00824          sorted_tran_resource_contexts.end(),
00825          ResourceContextCompare());
00826     sort(sorted_sv_resource_contexts.begin(),
00827          sorted_sv_resource_contexts.end(),
00828          ResourceContextCompare());
00829     set_difference(sorted_tran_resource_contexts.begin(),
00830                    sorted_tran_resource_contexts.end(),
00831                    sorted_sv_resource_contexts.begin(),
00832                    sorted_sv_resource_contexts.end(),
00833                    set_difference_contexts.begin(),
00834                    ResourceContextCompare());
00835     /* 
00836      * set_difference_contexts now contains all resource contexts
00837      * which are in the transaction context but were NOT in the
00838      * savepoint's resource contexts.
00839      */
00840         
00841     for (TransactionContext::ResourceContexts::iterator it= set_difference_contexts.begin();
00842          it != set_difference_contexts.end();
00843          ++it)
00844     {
00845       ResourceContext *resource_context= *it;
00846       int err;
00847 
00848       plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00849 
00850       if (resource->participatesInSqlTransaction())
00851       {
00852         if ((err= resource_context->getTransactionalStorageEngine()->rollback(&session, !(0))))
00853         {
00854           my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
00855           error= 1;
00856         }
00857         else
00858         {
00859           session.status_var.ha_rollback_count++;
00860         }
00861       }
00862       resource_context->reset(); /* keep it conveniently zero-filled */
00863     }
00864   }
00865   trans->setResourceContexts(sv_resource_contexts);
00866 
00867   if (shouldConstructMessages())
00868   {
00869     cleanupTransactionMessage(getActiveTransactionMessage(session), session);
00870     message::Transaction *savepoint_transaction= sv.getTransactionMessage();
00871     if (savepoint_transaction != NULL)
00872     {
00873       /* Make a copy of the savepoint transaction, this is necessary to assure proper cleanup. 
00874          Upon commit the savepoint_transaction_copy will be cleaned up by a call to 
00875          cleanupTransactionMessage(). The Transaction message in NamedSavepoint will be cleaned
00876          up when the savepoint is cleaned up. This avoids calling delete twice on the Transaction.
00877       */ 
00878       message::Transaction *savepoint_transaction_copy= new message::Transaction(*sv.getTransactionMessage());
00879       uint32_t num_statements = savepoint_transaction_copy->statement_size();
00880       if (num_statements == 0)
00881       {    
00882         session.setStatementMessage(NULL);
00883       }    
00884       else 
00885       {
00886         session.setStatementMessage(savepoint_transaction_copy->mutable_statement(num_statements - 1));    
00887       }    
00888       session.setTransactionMessage(savepoint_transaction_copy);
00889     }
00890   }
00891 
00892   return error;
00893 }
00894 
00901 int TransactionServices::setSavepoint(Session::reference session,
00902                                       NamedSavepoint &sv)
00903 {
00904   int error= 0;
00905   TransactionContext *trans= &session.transaction.all;
00906   TransactionContext::ResourceContexts &resource_contexts= trans->getResourceContexts();
00907 
00908   if (resource_contexts.empty() == false)
00909   {
00910     for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00911          it != resource_contexts.end();
00912          ++it)
00913     {
00914       ResourceContext *resource_context= *it;
00915       int err;
00916 
00917       plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00918 
00919       if (resource->participatesInSqlTransaction())
00920       {
00921         if ((err= resource_context->getTransactionalStorageEngine()->setSavepoint(&session, sv)))
00922         {
00923           my_error(ER_GET_ERRNO, MYF(0), err);
00924           error= 1;
00925         }
00926         else
00927         {
00928           session.status_var.ha_savepoint_count++;
00929         }
00930       }
00931     }
00932   }
00933   /*
00934     Remember the list of registered storage engines.
00935   */
00936   sv.setResourceContexts(resource_contexts);
00937 
00938   if (shouldConstructMessages())
00939   {
00940     message::Transaction *transaction= session.getTransactionMessage();
00941                   
00942     if (transaction != NULL)
00943     {
00944       message::Transaction *transaction_savepoint= 
00945         new message::Transaction(*transaction);
00946       sv.setTransactionMessage(transaction_savepoint);
00947     }
00948   } 
00949 
00950   return error;
00951 }
00952 
00953 int TransactionServices::releaseSavepoint(Session::reference session,
00954                                           NamedSavepoint &sv)
00955 {
00956   int error= 0;
00957 
00958   TransactionContext::ResourceContexts &resource_contexts= sv.getResourceContexts();
00959 
00960   for (TransactionContext::ResourceContexts::iterator it= resource_contexts.begin();
00961        it != resource_contexts.end();
00962        ++it)
00963   {
00964     int err;
00965     ResourceContext *resource_context= *it;
00966 
00967     plugin::MonitoredInTransaction *resource= resource_context->getMonitored();
00968 
00969     if (resource->participatesInSqlTransaction())
00970     {
00971       if ((err= resource_context->getTransactionalStorageEngine()->releaseSavepoint(&session, sv)))
00972       {
00973         my_error(ER_GET_ERRNO, MYF(0), err);
00974         error= 1;
00975       }
00976     }
00977   }
00978   
00979   return error;
00980 }
00981 
00982 bool TransactionServices::shouldConstructMessages()
00983 {
00984   ReplicationServices &replication_services= ReplicationServices::singleton();
00985   return replication_services.isActive();
00986 }
00987 
00988 message::Transaction *TransactionServices::getActiveTransactionMessage(Session::reference session,
00989                                                                        bool should_inc_trx_id)
00990 {
00991   message::Transaction *transaction= session.getTransactionMessage();
00992 
00993   if (unlikely(transaction == NULL))
00994   {
00995     /* 
00996      * Allocate and initialize a new transaction message 
00997      * for this Session object.  Session is responsible for
00998      * deleting transaction message when done with it.
00999      */
01000     transaction= new (nothrow) message::Transaction();
01001     initTransactionMessage(*transaction, session, should_inc_trx_id);
01002     session.setTransactionMessage(transaction);
01003     return transaction;
01004   }
01005   else
01006     return transaction;
01007 }
01008 
01009 void TransactionServices::initTransactionMessage(message::Transaction &transaction,
01010                                                  Session::reference session,
01011                                                  bool should_inc_trx_id)
01012 {
01013   message::TransactionContext *trx= transaction.mutable_transaction_context();
01014   trx->set_server_id(session.getServerId());
01015 
01016   if (should_inc_trx_id)
01017   {
01018     trx->set_transaction_id(getCurrentTransactionId(session));
01019     session.setXaId(0);
01020   }
01021   else
01022   {
01023     /* trx and seg id will get set properly elsewhere */
01024     trx->set_transaction_id(0);
01025   }
01026 
01027   trx->set_start_timestamp(session.getCurrentTimestamp());
01028   
01029   /* segment info may get set elsewhere as needed */
01030   transaction.set_segment_id(1);
01031   transaction.set_end_segment(true);
01032 }
01033 
01034 void TransactionServices::finalizeTransactionMessage(message::Transaction &transaction,
01035                                                      Session::const_reference session)
01036 {
01037   message::TransactionContext *trx= transaction.mutable_transaction_context();
01038   trx->set_end_timestamp(session.getCurrentTimestamp());
01039 }
01040 
01041 void TransactionServices::cleanupTransactionMessage(message::Transaction *transaction,
01042                                                     Session::reference session)
01043 {
01044   delete transaction;
01045   session.setStatementMessage(NULL);
01046   session.setTransactionMessage(NULL);
01047   session.setXaId(0);
01048 }
01049 
01050 int TransactionServices::commitTransactionMessage(Session::reference session)
01051 {
01052   ReplicationServices &replication_services= ReplicationServices::singleton();
01053   if (! replication_services.isActive())
01054     return 0;
01055 
01056   /*
01057    * If no Transaction message was ever created, then no data modification
01058    * occurred inside the transaction, so nothing to do.
01059    */
01060   if (session.getTransactionMessage() == NULL)
01061     return 0;
01062   
01063   /* If there is an active statement message, finalize it. */
01064   message::Statement *statement= session.getStatementMessage();
01065 
01066   if (statement != NULL)
01067   {
01068     finalizeStatementMessage(*statement, session);
01069   }
01070 
01071   message::Transaction* transaction= getActiveTransactionMessage(session);
01072 
01073   /*
01074    * It is possible that we could have a Transaction without any Statements
01075    * if we had created a Statement but had to roll it back due to it failing
01076    * mid-execution, and no subsequent Statements were added to the Transaction
01077    * message. In this case, we simply clean up the message and not push it.
01078    */
01079   if (transaction->statement_size() == 0)
01080   {
01081     cleanupTransactionMessage(transaction, session);
01082     return 0;
01083   }
01084   
01085   finalizeTransactionMessage(*transaction, session);
01086   
01087   plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
01088 
01089   cleanupTransactionMessage(transaction, session);
01090 
01091   return static_cast<int>(result);
01092 }
01093 
01094 void TransactionServices::initStatementMessage(message::Statement &statement,
01095                                                message::Statement::Type type,
01096                                                Session::const_reference session)
01097 {
01098   statement.set_type(type);
01099   statement.set_start_timestamp(session.getCurrentTimestamp());
01100 
01101   if (session.variables.replicate_query)
01102     statement.set_sql(session.getQueryString()->c_str());
01103 }
01104 
01105 void TransactionServices::finalizeStatementMessage(message::Statement &statement,
01106                                                    Session::reference session)
01107 {
01108   statement.set_end_timestamp(session.getCurrentTimestamp());
01109   session.setStatementMessage(NULL);
01110 }
01111 
01112 void TransactionServices::rollbackTransactionMessage(Session::reference session)
01113 {
01114   ReplicationServices &replication_services= ReplicationServices::singleton();
01115   if (! replication_services.isActive())
01116     return;
01117   
01118   message::Transaction *transaction= getActiveTransactionMessage(session);
01119 
01120   /*
01121    * OK, so there are two situations that we need to deal with here:
01122    *
01123    * 1) We receive an instruction to ROLLBACK the current transaction
01124    *    and the currently-stored Transaction message is *self-contained*, 
01125    *    meaning that no Statement messages in the Transaction message
01126    *    contain a message having its segment_id member greater than 1.  If
01127    *    no non-segment ID 1 members are found, we can simply clear the
01128    *    current Transaction message and remove it from memory.
01129    *
01130    * 2) If the Transaction message does indeed have a non-end segment, that
01131    *    means that a bulk update/delete/insert Transaction message segment
01132    *    has previously been sent over the wire to replicators.  In this case, 
01133    *    we need to package a Transaction with a Statement message of type
01134    *    ROLLBACK to indicate to replicators that previously-transmitted
01135    *    messages must be un-applied.
01136    */
01137   if (unlikely(message::transactionContainsBulkSegment(*transaction)))
01138   {
01139     /* Remember the transaction ID so we can re-use it */
01140     uint64_t trx_id= transaction->transaction_context().transaction_id();
01141     uint32_t seg_id= transaction->segment_id();
01142 
01143     /*
01144      * Clear the transaction, create a Rollback statement message, 
01145      * attach it to the transaction, and push it to replicators.
01146      */
01147     transaction->Clear();
01148     initTransactionMessage(*transaction, session, false);
01149 
01150     /* Set the transaction ID to match the previous messages */
01151     transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01152     transaction->set_segment_id(seg_id);
01153     transaction->set_end_segment(true);
01154 
01155     message::Statement *statement= transaction->add_statement();
01156 
01157     initStatementMessage(*statement, message::Statement::ROLLBACK, session);
01158     finalizeStatementMessage(*statement, session);
01159 
01160     finalizeTransactionMessage(*transaction, session);
01161     
01162     (void) replication_services.pushTransactionMessage(session, *transaction);
01163   }
01164 
01165   cleanupTransactionMessage(transaction, session);
01166 }
01167 
01168 void TransactionServices::rollbackStatementMessage(Session::reference session)
01169 {
01170   ReplicationServices &replication_services= ReplicationServices::singleton();
01171   if (! replication_services.isActive())
01172     return;
01173 
01174   message::Statement *current_statement= session.getStatementMessage();
01175 
01176   /* If we never added a Statement message, nothing to undo. */
01177   if (current_statement == NULL)
01178     return;
01179 
01180   /*
01181    * If the Statement has been segmented, then we've already pushed a portion
01182    * of this Statement's row changes through the replication stream and we
01183    * need to send a ROLLBACK_STATEMENT message. Otherwise, we can simply
01184    * delete the current Statement message.
01185    */
01186   bool is_segmented= false;
01187 
01188   switch (current_statement->type())
01189   {
01190     case message::Statement::INSERT:
01191       if (current_statement->insert_data().segment_id() > 1)
01192         is_segmented= true;
01193       break;
01194 
01195     case message::Statement::UPDATE:
01196       if (current_statement->update_data().segment_id() > 1)
01197         is_segmented= true;
01198       break;
01199 
01200     case message::Statement::DELETE:
01201       if (current_statement->delete_data().segment_id() > 1)
01202         is_segmented= true;
01203       break;
01204 
01205     default:
01206       break;
01207   }
01208 
01209   /*
01210    * Remove the Statement message we've been working with (same as
01211    * current_statement).
01212    */
01213   message::Transaction *transaction= getActiveTransactionMessage(session);
01214   google::protobuf::RepeatedPtrField<message::Statement> *statements_in_txn;
01215   statements_in_txn= transaction->mutable_statement();
01216   statements_in_txn->RemoveLast();
01217   session.setStatementMessage(NULL);
01218   
01219   /*
01220    * Create the ROLLBACK_STATEMENT message, if we need to. This serves as
01221    * an indicator to cancel the previous Statement message which should have
01222    * had its end_segment attribute set to false.
01223    */
01224   if (is_segmented)
01225   {
01226     current_statement= transaction->add_statement();
01227     initStatementMessage(*current_statement,
01228                          message::Statement::ROLLBACK_STATEMENT,
01229                          session);
01230     finalizeStatementMessage(*current_statement, session);
01231   }
01232 }
01233 
01234 message::Transaction *TransactionServices::segmentTransactionMessage(Session::reference session,
01235                                                                      message::Transaction *transaction)
01236 {
01237   uint64_t trx_id= transaction->transaction_context().transaction_id();
01238   uint32_t seg_id= transaction->segment_id();
01239   
01240   transaction->set_end_segment(false);
01241   commitTransactionMessage(session);
01242   transaction= getActiveTransactionMessage(session, false);
01243   
01244   /* Set the transaction ID to match the previous messages */
01245   transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01246   transaction->set_segment_id(seg_id + 1);
01247   transaction->set_end_segment(true);
01248 
01249   return transaction;
01250 }
01251 
01252 message::Statement &TransactionServices::getInsertStatement(Session::reference session,
01253                                                             Table &table,
01254                                                             uint32_t *next_segment_id)
01255 {
01256   message::Statement *statement= session.getStatementMessage();
01257   message::Transaction *transaction= NULL;
01258   
01259   /*
01260    * If statement is NULL, this is a new statement.
01261    * If statement is NOT NULL, this a continuation of the same statement.
01262    * This is because autocommitOrRollback() finalizes the statement so that
01263    * we guarantee only one Statement message per statement (i.e., we no longer
01264    * share a single GPB message for multiple statements).
01265    */
01266   if (statement == NULL)
01267   {
01268     transaction= getActiveTransactionMessage(session);
01269 
01270     if (static_cast<size_t>(transaction->ByteSize()) >= 
01271         transaction_message_threshold)
01272     {
01273       transaction= segmentTransactionMessage(session, transaction);
01274     }
01275 
01276     statement= transaction->add_statement();
01277     setInsertHeader(*statement, session, table);
01278     session.setStatementMessage(statement);
01279   }
01280   else
01281   {
01282     transaction= getActiveTransactionMessage(session);
01283     
01284     /*
01285      * If we've passed our threshold for the statement size (possible for
01286      * a bulk insert), we'll finalize the Statement and Transaction (doing
01287      * the Transaction will keep it from getting huge).
01288      */
01289     if (static_cast<size_t>(transaction->ByteSize()) >= 
01290         transaction_message_threshold)
01291     {
01292       /* Remember the transaction ID so we can re-use it */
01293       uint64_t trx_id= transaction->transaction_context().transaction_id();
01294       uint32_t seg_id= transaction->segment_id();
01295       
01296       message::InsertData *current_data= statement->mutable_insert_data();
01297       
01298       /* Caller should use this value when adding a new record */
01299       *next_segment_id= current_data->segment_id() + 1;
01300       
01301       current_data->set_end_segment(false);
01302       transaction->set_end_segment(false);
01303       
01304       /* 
01305        * Send the trx message to replicators after finalizing the 
01306        * statement and transaction. This will also set the Transaction
01307        * and Statement objects in Session to NULL.
01308        */
01309       commitTransactionMessage(session);
01310       
01311       /*
01312        * Statement and Transaction should now be NULL, so new ones will get
01313        * created. We reuse the transaction id since we are segmenting
01314        * one transaction.
01315        */
01316       transaction= getActiveTransactionMessage(session, false);
01317       assert(transaction != NULL);
01318 
01319       statement= transaction->add_statement();
01320       setInsertHeader(*statement, session, table);
01321       session.setStatementMessage(statement);
01322             
01323       /* Set the transaction ID to match the previous messages */
01324       transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01325       transaction->set_segment_id(seg_id + 1);
01326       transaction->set_end_segment(true);
01327     }
01328     else
01329     {
01330       /*
01331        * Continuation of the same statement. Carry forward the existing
01332        * segment id.
01333        */
01334       const message::InsertData &current_data= statement->insert_data();
01335       *next_segment_id= current_data.segment_id();
01336     }
01337   }
01338   
01339   return *statement;
01340 }
01341 
01342 void TransactionServices::setInsertHeader(message::Statement &statement,
01343                                           Session::const_reference session,
01344                                           Table &table)
01345 {
01346   initStatementMessage(statement, message::Statement::INSERT, session);
01347 
01348   /* 
01349    * Now we construct the specialized InsertHeader message inside
01350    * the generalized message::Statement container...
01351    */
01352   /* Set up the insert header */
01353   message::InsertHeader *header= statement.mutable_insert_header();
01354   message::TableMetadata *table_metadata= header->mutable_table_metadata();
01355 
01356   string schema_name;
01357   (void) table.getShare()->getSchemaName(schema_name);
01358   string table_name;
01359   (void) table.getShare()->getTableName(table_name);
01360 
01361   table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01362   table_metadata->set_table_name(table_name.c_str(), table_name.length());
01363 
01364   Field *current_field;
01365   Field **table_fields= table.getFields();
01366 
01367   message::FieldMetadata *field_metadata;
01368 
01369   /* We will read all the table's fields... */
01370   table.setReadSet();
01371 
01372   while ((current_field= *table_fields++) != NULL) 
01373   {
01374     field_metadata= header->add_field_metadata();
01375     field_metadata->set_name(current_field->field_name);
01376     field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01377   }
01378 }
01379 
01380 bool TransactionServices::insertRecord(Session::reference session,
01381                                        Table &table)
01382 {
01383   ReplicationServices &replication_services= ReplicationServices::singleton();
01384   if (! replication_services.isActive())
01385     return false;
01386 
01387   if (not table.getShare()->is_replicated())
01388     return false;
01389 
01398   if (not table.getShare()->hasPrimaryKey())
01399   {
01400     my_error(ER_NO_PRIMARY_KEY_ON_REPLICATED_TABLE, MYF(0));
01401     return true;
01402   }
01403 
01404   uint32_t next_segment_id= 1;
01405   message::Statement &statement= getInsertStatement(session, table, &next_segment_id);
01406 
01407   message::InsertData *data= statement.mutable_insert_data();
01408   data->set_segment_id(next_segment_id);
01409   data->set_end_segment(true);
01410   message::InsertRecord *record= data->add_record();
01411 
01412   Field *current_field;
01413   Field **table_fields= table.getFields();
01414 
01415   String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01416   string_value->set_charset(system_charset_info);
01417 
01418   /* We will read all the table's fields... */
01419   table.setReadSet();
01420 
01421   while ((current_field= *table_fields++) != NULL) 
01422   {
01423     if (current_field->is_null())
01424     {
01425       record->add_is_null(true);
01426       record->add_insert_value("", 0);
01427     } 
01428     else 
01429     {
01430       string_value= current_field->val_str_internal(string_value);
01431       record->add_is_null(false);
01432       record->add_insert_value(string_value->c_ptr(), string_value->length());
01433       string_value->free();
01434     }
01435   }
01436   return false;
01437 }
01438 
01439 message::Statement &TransactionServices::getUpdateStatement(Session::reference session,
01440                                                             Table &table,
01441                                                             const unsigned char *old_record, 
01442                                                             const unsigned char *new_record,
01443                                                             uint32_t *next_segment_id)
01444 {
01445   message::Statement *statement= session.getStatementMessage();
01446   message::Transaction *transaction= NULL;
01447 
01448   /*
01449    * If statement is NULL, this is a new statement.
01450    * If statement is NOT NULL, this a continuation of the same statement.
01451    * This is because autocommitOrRollback() finalizes the statement so that
01452    * we guarantee only one Statement message per statement (i.e., we no longer
01453    * share a single GPB message for multiple statements).
01454    */
01455   if (statement == NULL)
01456   {
01457     transaction= getActiveTransactionMessage(session);
01458     
01459     if (static_cast<size_t>(transaction->ByteSize()) >= 
01460         transaction_message_threshold)
01461     {
01462       transaction= segmentTransactionMessage(session, transaction);
01463     }
01464     
01465     statement= transaction->add_statement();
01466     setUpdateHeader(*statement, session, table, old_record, new_record);
01467     session.setStatementMessage(statement);
01468   }
01469   else
01470   {
01471     transaction= getActiveTransactionMessage(session);
01472     
01473     /*
01474      * If we've passed our threshold for the statement size (possible for
01475      * a bulk insert), we'll finalize the Statement and Transaction (doing
01476      * the Transaction will keep it from getting huge).
01477      */
01478     if (static_cast<size_t>(transaction->ByteSize()) >= 
01479         transaction_message_threshold)
01480     {
01481       /* Remember the transaction ID so we can re-use it */
01482       uint64_t trx_id= transaction->transaction_context().transaction_id();
01483       uint32_t seg_id= transaction->segment_id();
01484       
01485       message::UpdateData *current_data= statement->mutable_update_data();
01486       
01487       /* Caller should use this value when adding a new record */
01488       *next_segment_id= current_data->segment_id() + 1;
01489       
01490       current_data->set_end_segment(false);
01491       transaction->set_end_segment(false);
01492       
01493       /* 
01494        * Send the trx message to replicators after finalizing the 
01495        * statement and transaction. This will also set the Transaction
01496        * and Statement objects in Session to NULL.
01497        */
01498       commitTransactionMessage(session);
01499       
01500       /*
01501        * Statement and Transaction should now be NULL, so new ones will get
01502        * created. We reuse the transaction id since we are segmenting
01503        * one transaction.
01504        */
01505       transaction= getActiveTransactionMessage(session, false);
01506       assert(transaction != NULL);
01507       
01508       statement= transaction->add_statement();
01509       setUpdateHeader(*statement, session, table, old_record, new_record);
01510       session.setStatementMessage(statement);
01511       
01512       /* Set the transaction ID to match the previous messages */
01513       transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01514       transaction->set_segment_id(seg_id + 1);
01515       transaction->set_end_segment(true);
01516     }
01517     else
01518     {
01519       /*
01520        * Continuation of the same statement. Carry forward the existing
01521        * segment id.
01522        */
01523       const message::UpdateData &current_data= statement->update_data();
01524       *next_segment_id= current_data.segment_id();
01525     }
01526   }
01527   
01528   return *statement;
01529 }
01530 
01531 void TransactionServices::setUpdateHeader(message::Statement &statement,
01532                                           Session::const_reference session,
01533                                           Table &table,
01534                                           const unsigned char *old_record, 
01535                                           const unsigned char *new_record)
01536 {
01537   initStatementMessage(statement, message::Statement::UPDATE, session);
01538 
01539   /* 
01540    * Now we construct the specialized UpdateHeader message inside
01541    * the generalized message::Statement container...
01542    */
01543   /* Set up the update header */
01544   message::UpdateHeader *header= statement.mutable_update_header();
01545   message::TableMetadata *table_metadata= header->mutable_table_metadata();
01546 
01547   string schema_name;
01548   (void) table.getShare()->getSchemaName(schema_name);
01549   string table_name;
01550   (void) table.getShare()->getTableName(table_name);
01551 
01552   table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01553   table_metadata->set_table_name(table_name.c_str(), table_name.length());
01554 
01555   Field *current_field;
01556   Field **table_fields= table.getFields();
01557 
01558   message::FieldMetadata *field_metadata;
01559 
01560   /* We will read all the table's fields... */
01561   table.setReadSet();
01562 
01563   while ((current_field= *table_fields++) != NULL) 
01564   {
01565     /*
01566      * We add the "key field metadata" -- i.e. the fields which is
01567      * the primary key for the table.
01568      */
01569     if (table.getShare()->fieldInPrimaryKey(current_field))
01570     {
01571       field_metadata= header->add_key_field_metadata();
01572       field_metadata->set_name(current_field->field_name);
01573       field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01574     }
01575 
01576     if (isFieldUpdated(current_field, table, old_record, new_record))
01577     {
01578       /* Field is changed from old to new */
01579       field_metadata= header->add_set_field_metadata();
01580       field_metadata->set_name(current_field->field_name);
01581       field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01582     }
01583   }
01584 }
01585 
01586 void TransactionServices::updateRecord(Session::reference session,
01587                                        Table &table, 
01588                                        const unsigned char *old_record, 
01589                                        const unsigned char *new_record)
01590 {
01591   ReplicationServices &replication_services= ReplicationServices::singleton();
01592   if (! replication_services.isActive())
01593     return;
01594 
01595   if (not table.getShare()->is_replicated())
01596     return;
01597 
01598   uint32_t next_segment_id= 1;
01599   message::Statement &statement= getUpdateStatement(session, table, old_record, new_record, &next_segment_id);
01600 
01601   message::UpdateData *data= statement.mutable_update_data();
01602   data->set_segment_id(next_segment_id);
01603   data->set_end_segment(true);
01604   message::UpdateRecord *record= data->add_record();
01605 
01606   Field *current_field;
01607   Field **table_fields= table.getFields();
01608   String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01609   string_value->set_charset(system_charset_info);
01610 
01611   while ((current_field= *table_fields++) != NULL) 
01612   {
01613     /*
01614      * Here, we add the SET field values.  We used to do this in the setUpdateHeader() method, 
01615      * but then realized that an UPDATE statement could potentially have different values for
01616      * the SET field.  For instance, imagine this SQL scenario:
01617      *
01618      * CREATE TABLE t1 (id INT NOT NULL PRIMARY KEY, count INT NOT NULL);
01619      * INSERT INTO t1 (id, counter) VALUES (1,1),(2,2),(3,3);
01620      * UPDATE t1 SET counter = counter + 1 WHERE id IN (1,2);
01621      *
01622      * We will generate two UpdateRecord messages with different set_value byte arrays.
01623      */
01624     if (isFieldUpdated(current_field, table, old_record, new_record))
01625     {
01626       /* Store the original "read bit" for this field */
01627       bool is_read_set= current_field->isReadSet();
01628 
01629       /* We need to mark that we will "read" this field... */
01630       table.setReadSet(current_field->position());
01631 
01632       /* Read the string value of this field's contents */
01633       string_value= current_field->val_str_internal(string_value);
01634 
01635       /* 
01636        * Reset the read bit after reading field to its original state.  This 
01637        * prevents the field from being included in the WHERE clause
01638        */
01639       current_field->setReadSet(is_read_set);
01640 
01641       if (current_field->is_null())
01642       {
01643         record->add_is_null(true);
01644         record->add_after_value("", 0);
01645       }
01646       else
01647       {
01648         record->add_is_null(false);
01649         record->add_after_value(string_value->c_ptr(), string_value->length());
01650       }
01651       string_value->free();
01652     }
01653 
01654     /* 
01655      * Add the WHERE clause values now...for now, this means the
01656      * primary key field value.  Replication only supports tables
01657      * with a primary key.
01658      */
01659     if (table.getShare()->fieldInPrimaryKey(current_field))
01660     {
01666       string_value= current_field->val_str_internal(string_value,
01667                                                     old_record + 
01668                                                     current_field->offset(const_cast<unsigned char *>(new_record)));
01669       record->add_key_value(string_value->c_ptr(), string_value->length());
01670       string_value->free();
01671     }
01672 
01673   }
01674 }
01675 
01676 bool TransactionServices::isFieldUpdated(Field *current_field,
01677                                          Table &table,
01678                                          const unsigned char *old_record,
01679                                          const unsigned char *new_record)
01680 {
01681   /*
01682    * The below really should be moved into the Field API and Record API.  But for now
01683    * we do this crazy pointer fiddling to figure out if the current field
01684    * has been updated in the supplied record raw byte pointers.
01685    */
01686   const unsigned char *old_ptr= (const unsigned char *) old_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
01687   const unsigned char *new_ptr= (const unsigned char *) new_record + (ptrdiff_t) (current_field->ptr - table.getInsertRecord());
01688 
01689   uint32_t field_length= current_field->pack_length(); 
01691   bool old_value_is_null= current_field->is_null_in_record(old_record);
01692   bool new_value_is_null= current_field->is_null_in_record(new_record);
01693 
01694   bool isUpdated= false;
01695   if (old_value_is_null != new_value_is_null)
01696   {
01697     if ((old_value_is_null) && (! new_value_is_null)) /* old value is NULL, new value is non NULL */
01698     {
01699       isUpdated= true;
01700     }
01701     else if ((! old_value_is_null) && (new_value_is_null)) /* old value is non NULL, new value is NULL */
01702     {
01703       isUpdated= true;
01704     }
01705   }
01706 
01707   if (! isUpdated)
01708   {
01709     if (memcmp(old_ptr, new_ptr, field_length) != 0)
01710     {
01711       isUpdated= true;
01712     }
01713   }
01714   return isUpdated;
01715 }  
01716 
01717 message::Statement &TransactionServices::getDeleteStatement(Session::reference session,
01718                                                             Table &table,
01719                                                             uint32_t *next_segment_id)
01720 {
01721   message::Statement *statement= session.getStatementMessage();
01722   message::Transaction *transaction= NULL;
01723 
01724   /*
01725    * If statement is NULL, this is a new statement.
01726    * If statement is NOT NULL, this a continuation of the same statement.
01727    * This is because autocommitOrRollback() finalizes the statement so that
01728    * we guarantee only one Statement message per statement (i.e., we no longer
01729    * share a single GPB message for multiple statements).
01730    */
01731   if (statement == NULL)
01732   {
01733     transaction= getActiveTransactionMessage(session);
01734     
01735     if (static_cast<size_t>(transaction->ByteSize()) >= 
01736         transaction_message_threshold)
01737     {
01738       transaction= segmentTransactionMessage(session, transaction);
01739     }
01740     
01741     statement= transaction->add_statement();
01742     setDeleteHeader(*statement, session, table);
01743     session.setStatementMessage(statement);
01744   }
01745   else
01746   {
01747     transaction= getActiveTransactionMessage(session);
01748     
01749     /*
01750      * If we've passed our threshold for the statement size (possible for
01751      * a bulk insert), we'll finalize the Statement and Transaction (doing
01752      * the Transaction will keep it from getting huge).
01753      */
01754     if (static_cast<size_t>(transaction->ByteSize()) >= 
01755         transaction_message_threshold)
01756     {
01757       /* Remember the transaction ID so we can re-use it */
01758       uint64_t trx_id= transaction->transaction_context().transaction_id();
01759       uint32_t seg_id= transaction->segment_id();
01760       
01761       message::DeleteData *current_data= statement->mutable_delete_data();
01762       
01763       /* Caller should use this value when adding a new record */
01764       *next_segment_id= current_data->segment_id() + 1;
01765       
01766       current_data->set_end_segment(false);
01767       transaction->set_end_segment(false);
01768       
01769       /* 
01770        * Send the trx message to replicators after finalizing the 
01771        * statement and transaction. This will also set the Transaction
01772        * and Statement objects in Session to NULL.
01773        */
01774       commitTransactionMessage(session);
01775       
01776       /*
01777        * Statement and Transaction should now be NULL, so new ones will get
01778        * created. We reuse the transaction id since we are segmenting
01779        * one transaction.
01780        */
01781       transaction= getActiveTransactionMessage(session, false);
01782       assert(transaction != NULL);
01783       
01784       statement= transaction->add_statement();
01785       setDeleteHeader(*statement, session, table);
01786       session.setStatementMessage(statement);
01787       
01788       /* Set the transaction ID to match the previous messages */
01789       transaction->mutable_transaction_context()->set_transaction_id(trx_id);
01790       transaction->set_segment_id(seg_id + 1);
01791       transaction->set_end_segment(true);
01792     }
01793     else
01794     {
01795       /*
01796        * Continuation of the same statement. Carry forward the existing
01797        * segment id.
01798        */
01799       const message::DeleteData &current_data= statement->delete_data();
01800       *next_segment_id= current_data.segment_id();
01801     }
01802   }
01803   
01804   return *statement;
01805 }
01806 
01807 void TransactionServices::setDeleteHeader(message::Statement &statement,
01808                                           Session::const_reference session,
01809                                           Table &table)
01810 {
01811   initStatementMessage(statement, message::Statement::DELETE, session);
01812 
01813   /* 
01814    * Now we construct the specialized DeleteHeader message inside
01815    * the generalized message::Statement container...
01816    */
01817   message::DeleteHeader *header= statement.mutable_delete_header();
01818   message::TableMetadata *table_metadata= header->mutable_table_metadata();
01819 
01820   string schema_name;
01821   (void) table.getShare()->getSchemaName(schema_name);
01822   string table_name;
01823   (void) table.getShare()->getTableName(table_name);
01824 
01825   table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
01826   table_metadata->set_table_name(table_name.c_str(), table_name.length());
01827 
01828   Field *current_field;
01829   Field **table_fields= table.getFields();
01830 
01831   message::FieldMetadata *field_metadata;
01832 
01833   while ((current_field= *table_fields++) != NULL) 
01834   {
01835     /* 
01836      * Add the WHERE clause values now...for now, this means the
01837      * primary key field value.  Replication only supports tables
01838      * with a primary key.
01839      */
01840     if (table.getShare()->fieldInPrimaryKey(current_field))
01841     {
01842       field_metadata= header->add_key_field_metadata();
01843       field_metadata->set_name(current_field->field_name);
01844       field_metadata->set_type(message::internalFieldTypeToFieldProtoType(current_field->type()));
01845     }
01846   }
01847 }
01848 
01849 void TransactionServices::deleteRecord(Session::reference session,
01850                                        Table &table,
01851                                        bool use_update_record)
01852 {
01853   ReplicationServices &replication_services= ReplicationServices::singleton();
01854   if (! replication_services.isActive())
01855     return;
01856 
01857   if (not table.getShare()->is_replicated())
01858     return;
01859 
01860   uint32_t next_segment_id= 1;
01861   message::Statement &statement= getDeleteStatement(session, table, &next_segment_id);
01862 
01863   message::DeleteData *data= statement.mutable_delete_data();
01864   data->set_segment_id(next_segment_id);
01865   data->set_end_segment(true);
01866   message::DeleteRecord *record= data->add_record();
01867 
01868   Field *current_field;
01869   Field **table_fields= table.getFields();
01870   String *string_value= new (session.mem_root) String(TransactionServices::DEFAULT_RECORD_SIZE);
01871   string_value->set_charset(system_charset_info);
01872 
01873   while ((current_field= *table_fields++) != NULL) 
01874   {
01875     /* 
01876      * Add the WHERE clause values now...for now, this means the
01877      * primary key field value.  Replication only supports tables
01878      * with a primary key.
01879      */
01880     if (table.getShare()->fieldInPrimaryKey(current_field))
01881     {
01882       if (use_update_record)
01883       {
01884         /*
01885          * Temporarily point to the update record to get its value.
01886          * This is pretty much a hack in order to get the PK value from
01887          * the update record rather than the insert record. Field::val_str()
01888          * should not change anything in Field::ptr, so this should be safe.
01889          * We are careful not to change anything in old_ptr.
01890          */
01891         const unsigned char *old_ptr= current_field->ptr;
01892         current_field->ptr= table.getUpdateRecord() + static_cast<ptrdiff_t>(old_ptr - table.getInsertRecord());
01893         string_value= current_field->val_str_internal(string_value);
01894         current_field->ptr= const_cast<unsigned char *>(old_ptr);
01895       }
01896       else
01897       {
01898         string_value= current_field->val_str_internal(string_value);
01902       }
01903       record->add_key_value(string_value->c_ptr(), string_value->length());
01904       string_value->free();
01905     }
01906   }
01907 }
01908 
01909 void TransactionServices::createTable(Session::reference session,
01910                                       const message::Table &table)
01911 {
01912   ReplicationServices &replication_services= ReplicationServices::singleton();
01913   if (not replication_services.isActive())
01914     return;
01915 
01916   if (not message::is_replicated(table))
01917     return;
01918 
01919   message::Transaction *transaction= getActiveTransactionMessage(session);
01920   message::Statement *statement= transaction->add_statement();
01921 
01922   initStatementMessage(*statement, message::Statement::CREATE_TABLE, session);
01923 
01924   /* 
01925    * Construct the specialized CreateTableStatement message and attach
01926    * it to the generic Statement message
01927    */
01928   message::CreateTableStatement *create_table_statement= statement->mutable_create_table_statement();
01929   message::Table *new_table_message= create_table_statement->mutable_table();
01930   *new_table_message= table;
01931 
01932   finalizeStatementMessage(*statement, session);
01933 
01934   finalizeTransactionMessage(*transaction, session);
01935   
01936   (void) replication_services.pushTransactionMessage(session, *transaction);
01937 
01938   cleanupTransactionMessage(transaction, session);
01939 
01940 }
01941 
01942 void TransactionServices::createSchema(Session::reference session,
01943                                        const message::Schema &schema)
01944 {
01945   ReplicationServices &replication_services= ReplicationServices::singleton();
01946   if (! replication_services.isActive())
01947     return;
01948 
01949   if (not message::is_replicated(schema))
01950     return;
01951 
01952   message::Transaction *transaction= getActiveTransactionMessage(session);
01953   message::Statement *statement= transaction->add_statement();
01954 
01955   initStatementMessage(*statement, message::Statement::CREATE_SCHEMA, session);
01956 
01957   /* 
01958    * Construct the specialized CreateSchemaStatement message and attach
01959    * it to the generic Statement message
01960    */
01961   message::CreateSchemaStatement *create_schema_statement= statement->mutable_create_schema_statement();
01962   message::Schema *new_schema_message= create_schema_statement->mutable_schema();
01963   *new_schema_message= schema;
01964 
01965   finalizeStatementMessage(*statement, session);
01966 
01967   finalizeTransactionMessage(*transaction, session);
01968   
01969   (void) replication_services.pushTransactionMessage(session, *transaction);
01970 
01971   cleanupTransactionMessage(transaction, session);
01972 
01973 }
01974 
01975 void TransactionServices::dropSchema(Session::reference session,
01976                                      identifier::Schema::const_reference identifier,
01977                                      message::schema::const_reference schema)
01978 {
01979   ReplicationServices &replication_services= ReplicationServices::singleton();
01980   if (not replication_services.isActive())
01981     return;
01982 
01983   if (not message::is_replicated(schema))
01984     return;
01985 
01986   message::Transaction *transaction= getActiveTransactionMessage(session);
01987   message::Statement *statement= transaction->add_statement();
01988 
01989   initStatementMessage(*statement, message::Statement::DROP_SCHEMA, session);
01990 
01991   /* 
01992    * Construct the specialized DropSchemaStatement message and attach
01993    * it to the generic Statement message
01994    */
01995   message::DropSchemaStatement *drop_schema_statement= statement->mutable_drop_schema_statement();
01996 
01997   drop_schema_statement->set_schema_name(identifier.getSchemaName());
01998 
01999   finalizeStatementMessage(*statement, session);
02000 
02001   finalizeTransactionMessage(*transaction, session);
02002   
02003   (void) replication_services.pushTransactionMessage(session, *transaction);
02004 
02005   cleanupTransactionMessage(transaction, session);
02006 }
02007 
02008 void TransactionServices::alterSchema(Session::reference session,
02009                                       const message::Schema &old_schema,
02010                                       const message::Schema &new_schema)
02011 {
02012   ReplicationServices &replication_services= ReplicationServices::singleton();
02013   if (! replication_services.isActive())
02014     return;
02015 
02016   if (not message::is_replicated(old_schema))
02017     return;
02018 
02019   message::Transaction *transaction= getActiveTransactionMessage(session);
02020   message::Statement *statement= transaction->add_statement();
02021 
02022   initStatementMessage(*statement, message::Statement::ALTER_SCHEMA, session);
02023 
02024   /* 
02025    * Construct the specialized AlterSchemaStatement message and attach
02026    * it to the generic Statement message
02027    */
02028   message::AlterSchemaStatement *alter_schema_statement= statement->mutable_alter_schema_statement();
02029 
02030   message::Schema *before= alter_schema_statement->mutable_before();
02031   message::Schema *after= alter_schema_statement->mutable_after();
02032 
02033   *before= old_schema;
02034   *after= new_schema;
02035 
02036   finalizeStatementMessage(*statement, session);
02037 
02038   finalizeTransactionMessage(*transaction, session);
02039   
02040   (void) replication_services.pushTransactionMessage(session, *transaction);
02041 
02042   cleanupTransactionMessage(transaction, session);
02043 }
02044 
02045 void TransactionServices::dropTable(Session::reference session,
02046                                     identifier::Table::const_reference identifier,
02047                                     message::table::const_reference table,
02048                                     bool if_exists)
02049 {
02050   ReplicationServices &replication_services= ReplicationServices::singleton();
02051   if (! replication_services.isActive())
02052     return;
02053 
02054   if (not message::is_replicated(table))
02055     return;
02056 
02057   message::Transaction *transaction= getActiveTransactionMessage(session);
02058   message::Statement *statement= transaction->add_statement();
02059 
02060   initStatementMessage(*statement, message::Statement::DROP_TABLE, session);
02061 
02062   /* 
02063    * Construct the specialized DropTableStatement message and attach
02064    * it to the generic Statement message
02065    */
02066   message::DropTableStatement *drop_table_statement= statement->mutable_drop_table_statement();
02067 
02068   drop_table_statement->set_if_exists_clause(if_exists);
02069 
02070   message::TableMetadata *table_metadata= drop_table_statement->mutable_table_metadata();
02071 
02072   table_metadata->set_schema_name(identifier.getSchemaName());
02073   table_metadata->set_table_name(identifier.getTableName());
02074 
02075   finalizeStatementMessage(*statement, session);
02076 
02077   finalizeTransactionMessage(*transaction, session);
02078   
02079   (void) replication_services.pushTransactionMessage(session, *transaction);
02080 
02081   cleanupTransactionMessage(transaction, session);
02082 }
02083 
02084 void TransactionServices::truncateTable(Session::reference session,
02085                                         Table &table)
02086 {
02087   ReplicationServices &replication_services= ReplicationServices::singleton();
02088   if (! replication_services.isActive())
02089     return;
02090 
02091   if (not table.getShare()->is_replicated())
02092     return;
02093 
02094   message::Transaction *transaction= getActiveTransactionMessage(session);
02095   message::Statement *statement= transaction->add_statement();
02096 
02097   initStatementMessage(*statement, message::Statement::TRUNCATE_TABLE, session);
02098 
02099   /* 
02100    * Construct the specialized TruncateTableStatement message and attach
02101    * it to the generic Statement message
02102    */
02103   message::TruncateTableStatement *truncate_statement= statement->mutable_truncate_table_statement();
02104   message::TableMetadata *table_metadata= truncate_statement->mutable_table_metadata();
02105 
02106   string schema_name;
02107   (void) table.getShare()->getSchemaName(schema_name);
02108 
02109   string table_name;
02110   (void) table.getShare()->getTableName(table_name);
02111 
02112   table_metadata->set_schema_name(schema_name.c_str(), schema_name.length());
02113   table_metadata->set_table_name(table_name.c_str(), table_name.length());
02114 
02115   finalizeStatementMessage(*statement, session);
02116 
02117   finalizeTransactionMessage(*transaction, session);
02118   
02119   (void) replication_services.pushTransactionMessage(session, *transaction);
02120 
02121   cleanupTransactionMessage(transaction, session);
02122 }
02123 
02124 void TransactionServices::rawStatement(Session::reference session,
02125                                        const string &query,
02126                                        const string &schema)
02127 {
02128   ReplicationServices &replication_services= ReplicationServices::singleton();
02129   if (! replication_services.isActive())
02130     return;
02131  
02132   message::Transaction *transaction= getActiveTransactionMessage(session);
02133   message::Statement *statement= transaction->add_statement();
02134 
02135   initStatementMessage(*statement, message::Statement::RAW_SQL, session);
02136   statement->set_sql(query);
02137   if (not schema.empty())
02138     statement->set_raw_sql_schema(schema);
02139   finalizeStatementMessage(*statement, session);
02140 
02141   finalizeTransactionMessage(*transaction, session);
02142   
02143   (void) replication_services.pushTransactionMessage(session, *transaction);
02144 
02145   cleanupTransactionMessage(transaction, session);
02146 }
02147 
02148 int TransactionServices::sendEvent(Session::reference session,
02149                                    const message::Event &event)
02150 {
02151   ReplicationServices &replication_services= ReplicationServices::singleton();
02152   if (! replication_services.isActive())
02153     return 0;
02154 
02155   message::Transaction *transaction= new (nothrow) message::Transaction();
02156 
02157   // set server id, start timestamp
02158   initTransactionMessage(*transaction, session, true);
02159 
02160   // set end timestamp
02161   finalizeTransactionMessage(*transaction, session);
02162 
02163   message::Event *trx_event= transaction->mutable_event();
02164 
02165   trx_event->CopyFrom(event);
02166 
02167   plugin::ReplicationReturnCode result= replication_services.pushTransactionMessage(session, *transaction);
02168 
02169   delete transaction;
02170 
02171   return static_cast<int>(result);
02172 }
02173 
02174 bool TransactionServices::sendStartupEvent(Session::reference session)
02175 {
02176   message::Event event;
02177   event.set_type(message::Event::STARTUP);
02178   if (sendEvent(session, event) != 0)
02179     return false;
02180   return true;
02181 }
02182 
02183 bool TransactionServices::sendShutdownEvent(Session::reference session)
02184 {
02185   message::Event event;
02186   event.set_type(message::Event::SHUTDOWN);
02187   if (sendEvent(session, event) != 0)
02188     return false;
02189   return true;
02190 }
02191 
02192 } /* namespace drizzled */