Drizzled Public API Documentation

queue_consumer.cc

00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2011 David Shrewsbury
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00021 #include <config.h>
00022 #include <plugin/slave/queue_consumer.h>
00023 #include <drizzled/message/transaction.pb.h>
00024 #include <drizzled/message/statement_transform.h>
00025 #include <drizzled/sql/result_set.h>
00026 #include <drizzled/execute.h>
00027 #include <string>
00028 #include <vector>
00029 #include <boost/thread.hpp>
00030 #include <boost/lexical_cast.hpp>
00031 #include <google/protobuf/text_format.h>
00032 
00033 using namespace std;
00034 using namespace drizzled;
00035 
00036 namespace slave
00037 {
00038 
00039 bool QueueConsumer::init()
00040 {
00041   setApplierState("", true);
00042   return true;
00043 }
00044 
00045 
00046 void QueueConsumer::shutdown()
00047 {
00048   setApplierState(getErrorMessage(), false);
00049 }
00050 
00051 
00052 bool QueueConsumer::process()
00053 {
00054   TrxIdList completedTransactionIds;
00055 
00056   getListOfCompletedTransactions(completedTransactionIds);
00057 
00058   for (size_t x= 0; x < completedTransactionIds.size(); x++)
00059   {
00060     string commit_id;
00061     uint64_t trx_id= completedTransactionIds[x];
00062 
00063     vector<string> aggregate_sql;  /* final SQL to execute */
00064     vector<string> segmented_sql;  /* carryover from segmented statements */
00065 
00066     message::Transaction transaction;
00067     uint32_t segment_id= 1;
00068 
00069     while (getMessage(transaction, commit_id, trx_id, segment_id++))
00070     {
00071       convertToSQL(transaction, aggregate_sql, segmented_sql);
00072       transaction.Clear();
00073     }
00074 
00075     /*
00076      * The last message in a transaction should always have a commit_id
00077      * value larger than 0, though other messages of the same transaction
00078      * will have commit_id = 0.
00079      */
00080     assert((not commit_id.empty()) && (commit_id != "0"));
00081     assert(segmented_sql.empty());
00082 
00083     if (not aggregate_sql.empty())
00084     {
00085       /*
00086        * Execution using drizzled::Execute requires some special escaping.
00087        */
00088       vector<string>::iterator agg_iter;
00089       for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
00090       {
00091         string &sql= *agg_iter;
00092         string::iterator si= sql.begin();
00093         for (; si != sql.end(); ++si)
00094         {
00095           if (*si == '\"')
00096           {
00097             si= sql.insert(si, '\\');
00098             ++si;
00099           }
00100           else if (*si == '\\')
00101           {
00102             si= sql.insert(si, '\\');
00103             ++si;
00104             si= sql.insert(si, '\\');
00105             ++si;
00106             si= sql.insert(si, '\\');
00107             ++si;
00108           }
00109           else if (*si == ';')
00110           {
00111             si= sql.insert(si, '\\');
00112             ++si;  /* advance back to the semicolon */
00113           }
00114         }
00115       }
00116     }
00117 
00118     if (not executeSQLWithCommitId(aggregate_sql, commit_id))
00119     {
00120       return false;
00121     }
00122 
00123     if (not deleteFromQueue(trx_id))
00124     {
00125       return false;
00126     }
00127   }
00128 
00129   return true;
00130 }
00131 
00132 
00133 bool QueueConsumer::getMessage(message::Transaction &transaction,
00134                               string &commit_id,
00135                               uint64_t trx_id,
00136                               uint32_t segment_id)
00137 {
00138   string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
00139              " WHERE `trx_id` = ");
00140   sql.append(boost::lexical_cast<string>(trx_id));
00141   sql.append(" AND `seg_id` = ", 16);
00142   sql.append(boost::lexical_cast<string>(segment_id));
00143 
00144   sql::ResultSet result_set(2);
00145   Execute execute(*(_session.get()), true);
00146   
00147   execute.run(sql, result_set);
00148   
00149   assert(result_set.getMetaData().getColumnCount() == 2);
00150 
00151   /* Really should only be 1 returned row */
00152   uint32_t found_rows= 0;
00153   while (result_set.next())
00154   {
00155     string msg= result_set.getString(0);
00156     string com_id= result_set.getString(1);
00157 
00158     if ((msg == "") || (found_rows == 1))
00159       break;
00160 
00161     /* Neither column should be NULL */
00162     assert(result_set.isNull(0) == false);
00163     assert(result_set.isNull(1) == false);
00164 
00165     google::protobuf::TextFormat::ParseFromString(msg, &transaction);
00166 
00167     commit_id= com_id;
00168     found_rows++;
00169   }
00170 
00171   if (found_rows == 0)
00172     return false;
00173   
00174   return true;
00175 }
00176 
00177 bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
00178 {
00179   Execute execute(*(_session.get()), true);
00180   
00181   string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
00182              " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
00183              " ORDER BY `commit_order` ASC");
00184   
00185   /* ResultSet size must match column count */
00186   sql::ResultSet result_set(1);
00187 
00188   execute.run(sql, result_set);
00189 
00190   assert(result_set.getMetaData().getColumnCount() == 1);
00191 
00192   while (result_set.next())
00193   {
00194     assert(result_set.isNull(0) == false);
00195     string value= result_set.getString(0);
00196     
00197     /* empty string returned when no more results */
00198     if (value != "")
00199     {
00200       list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
00201     }
00202   }
00203   
00204   return true;
00205 }
00206 
00207 
00208 bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
00209                                 vector<string> &aggregate_sql,
00210                                 vector<string> &segmented_sql)
00211 {
00212   if (transaction.has_event())
00213     return true;
00214 
00215   size_t num_statements= transaction.statement_size();
00216 
00217   /*
00218    * Loop through all Statement messages within this Transaction and
00219    * convert each to equivalent SQL statements. Complete Statements will
00220    * be appended to aggregate_sql, while segmented Statements will remain
00221    * in segmented_sql to be appended to until completed, or rolled back.
00222    */
00223 
00224   for (size_t idx= 0; idx < num_statements; idx++)
00225   {
00226     const message::Statement &statement= transaction.statement(idx);
00227     
00228     /* We won't bother with executing a rolled back transaction */
00229     if (statement.type() == message::Statement::ROLLBACK)
00230     {
00231       assert(idx == (num_statements - 1));  /* should be the final Statement */
00232       aggregate_sql.clear();
00233       segmented_sql.clear();
00234       break;
00235     }
00236 
00237     switch (statement.type())
00238     {
00239       /* DDL cannot be in a transaction, so precede with a COMMIT */
00240       case message::Statement::TRUNCATE_TABLE:
00241       case message::Statement::CREATE_SCHEMA:
00242       case message::Statement::ALTER_SCHEMA:
00243       case message::Statement::DROP_SCHEMA:
00244       case message::Statement::CREATE_TABLE:
00245       case message::Statement::ALTER_TABLE:
00246       case message::Statement::DROP_TABLE:
00247       case message::Statement::RAW_SQL:  /* currently ALTER TABLE or RENAME */
00248       {
00249         segmented_sql.push_back("COMMIT");
00250         break;
00251       }
00252 
00253       /* Cancel any ongoing statement */
00254       case message::Statement::ROLLBACK_STATEMENT:
00255       {
00256         segmented_sql.clear();
00257         continue;
00258       }
00259       
00260       default:
00261       {
00262         break;
00263       }
00264     }
00265 
00266     if (message::transformStatementToSql(statement, segmented_sql,
00267                                          message::DRIZZLE, true))
00268     {
00269       return false;
00270     }
00271 
00272     if (isEndStatement(statement))
00273     {
00274       aggregate_sql.insert(aggregate_sql.end(),
00275                            segmented_sql.begin(),
00276                            segmented_sql.end());
00277       segmented_sql.clear();
00278     }
00279   }
00280 
00281   return true;
00282 }
00283 
00284 
00285 bool QueueConsumer::isEndStatement(const message::Statement &statement)
00286 {
00287   switch (statement.type())
00288   {
00289     case (message::Statement::INSERT):
00290     {
00291       const message::InsertData &data= statement.insert_data();
00292       if (not data.end_segment())
00293         return false;
00294       break;
00295     }
00296     case (message::Statement::UPDATE):
00297     {
00298       const message::UpdateData &data= statement.update_data();
00299       if (not data.end_segment())
00300         return false;
00301       break;
00302     }
00303     case (message::Statement::DELETE):
00304     {
00305       const message::DeleteData &data= statement.delete_data();
00306       if (not data.end_segment())
00307         return false;
00308       break;
00309     }
00310     default:
00311       return true;
00312   }
00313   return true;
00314 }
00315 
00316 
00317 void QueueConsumer::setApplierState(const string &err_msg, bool status)
00318 {
00319   vector<string> statements;
00320   string sql;
00321   string msg(err_msg);
00322 
00323   if (not status)
00324   {
00325     sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
00326   }
00327   else
00328   {
00329     sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
00330   }
00331   
00332   sql.append(", `error_msg` = '", 17);
00333 
00334   /* Escape embedded quotes and statement terminators */
00335   string::iterator it;
00336   for (it= msg.begin(); it != msg.end(); ++it)
00337   {
00338     if (*it == '\'')
00339     {
00340       it= msg.insert(it, '\'');
00341       ++it;  /* advance back to the quote */
00342     }
00343     else if (*it == ';')
00344     {
00345       it= msg.insert(it, '\\');
00346       ++it;  /* advance back to the semicolon */
00347     }
00348   }
00349   
00350   sql.append(msg);
00351   sql.append("'", 1);
00352 
00353   statements.push_back(sql);
00354   executeSQL(statements);
00355 }
00356 
00357 
00358 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
00359                                            const string &commit_id)
00360 {
00361   string tmp("UPDATE `sys_replication`.`applier_state`"
00362              " SET `last_applied_commit_id` = ");
00363   tmp.append(commit_id);
00364   sql.push_back(tmp);
00365   
00366   return executeSQL(sql);
00367 }
00368 
00369 
00370 bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
00371 {
00372   string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
00373   sql.append(boost::lexical_cast<std::string>(trx_id));
00374 
00375   vector<string> sql_vect;
00376   sql_vect.push_back(sql);
00377 
00378   return executeSQL(sql_vect);
00379 }
00380 
00381 } /* namespace slave */