Drizzled Public API Documentation

queue_producer.cc

00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2011 David Shrewsbury
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00021 #include <config.h>
00022 #include <plugin/slave/queue_producer.h>
00023 #include <drizzled/errmsg_print.h>
00024 #include <drizzled/sql/result_set.h>
00025 #include <drizzled/execute.h>
00026 #include <drizzled/gettext.h>
00027 #include <drizzled/message/transaction.pb.h>
00028 #include <boost/lexical_cast.hpp>
00029 #include <google/protobuf/text_format.h>
00030 
00031 using namespace std;
00032 using namespace drizzled;
00033 
00034 namespace slave
00035 {
00036 
00037 QueueProducer::~QueueProducer()
00038 {
00039   if (_is_connected)
00040     closeConnection();
00041 }
00042 
00043 bool QueueProducer::init()
00044 {
00045   setIOState("", true);
00046   return reconnect(true);
00047 }
00048 
00049 bool QueueProducer::process()
00050 {
00051   if (_saved_max_commit_id == 0)
00052   {
00053     if (not queryForMaxCommitId(&_saved_max_commit_id))
00054     {
00055       if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
00056       {
00057         if (reconnect(false))
00058         {
00059           return true;    /* reconnect successful, try again */
00060         }
00061         else
00062         {
00063           _last_error_message= "Master offline";
00064           return false;   /* reconnect failed, shutdown the thread */
00065         }
00066       }
00067       else
00068       {
00069         return false;     /* unrecoverable error, shutdown the thread */
00070       }
00071     }
00072   }
00073 
00074   /* Keep getting events until caught up */
00075   enum drizzled::error_t err;
00076   while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
00077   {}
00078 
00079   if (err == ER_YES)  /* We encountered an error */
00080   {
00081     if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
00082     {
00083       if (reconnect(false))
00084       {
00085         return true;    /* reconnect successful, try again */
00086       }
00087       else
00088       {
00089         _last_error_message= "Master offline";
00090         return false;   /* reconnect failed, shutdown the thread */
00091       }
00092     }
00093     else
00094     {
00095       return false;     /* unrecoverable error, shutdown the thread */
00096     }
00097   }
00098 
00099   return true;
00100 }
00101 
00102 void QueueProducer::shutdown()
00103 {
00104   setIOState(_last_error_message, false);
00105   if (_is_connected)
00106     closeConnection();
00107 }
00108 
00109 bool QueueProducer::reconnect(bool initial_connection)
00110 {
00111   if (not initial_connection)
00112   {
00113     errmsg_printf(error::ERROR, _("Lost connection to master. Reconnecting."));
00114   }
00115 
00116   _is_connected= false;
00117   _last_return= DRIZZLE_RETURN_OK;
00118   _last_error_message.clear();
00119   boost::posix_time::seconds duration(_seconds_between_reconnects);
00120 
00121   uint32_t attempts= 1;
00122 
00123   while (not openConnection())
00124   {
00125     if (attempts++ == _max_reconnects)
00126       break;
00127     boost::this_thread::sleep(duration);
00128   }
00129 
00130   return _is_connected;
00131 }
00132 
00133 bool QueueProducer::openConnection()
00134 {
00135   if (drizzle_create(&_drizzle) == NULL)
00136   {
00137     _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
00138     _last_error_message= "Replication slave: ";
00139     _last_error_message.append(drizzle_error(&_drizzle));
00140     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00141     return false;
00142   }
00143   
00144   if (drizzle_con_create(&_drizzle, &_connection) == NULL)
00145   {
00146     _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
00147     _last_error_message= "Replication slave: ";
00148     _last_error_message.append(drizzle_error(&_drizzle));
00149     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00150     return false;
00151   }
00152   
00153   drizzle_con_set_tcp(&_connection, _master_host.c_str(), _master_port);
00154   drizzle_con_set_auth(&_connection, _master_user.c_str(), _master_pass.c_str());
00155 
00156   drizzle_return_t ret= drizzle_con_connect(&_connection);
00157 
00158   if (ret != DRIZZLE_RETURN_OK)
00159   {
00160     _last_return= ret;
00161     _last_error_message= "Replication slave: ";
00162     _last_error_message.append(drizzle_error(&_drizzle));
00163     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00164     return false;
00165   }
00166   
00167   _is_connected= true;
00168 
00169   return true;
00170 }
00171 
00172 bool QueueProducer::closeConnection()
00173 {
00174   drizzle_return_t ret;
00175   drizzle_result_st result;
00176 
00177   _is_connected= false;
00178 
00179   if (drizzle_quit(&_connection, &result, &ret) == NULL)
00180   {
00181     _last_return= ret;
00182     drizzle_result_free(&result);
00183     return false;
00184   }
00185 
00186   drizzle_result_free(&result);
00187 
00188   return true;
00189 }
00190 
00191 bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
00192 {
00193   /*
00194    * This SQL will get the maximum commit_id value we have pulled over from
00195    * the master. We query two tables because either the queue will be empty,
00196    * in which case the last_applied_commit_id will be the value we want, or
00197    * we have yet to drain the queue,  we get the maximum value still in
00198    * the queue.
00199    */
00200   string sql("SELECT MAX(x.cid) FROM"
00201              " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
00202              "  UNION ALL SELECT `last_applied_commit_id` AS cid"
00203              "  FROM `sys_replication`.`applier_state`) AS x");
00204 
00205   sql::ResultSet result_set(1);
00206   Execute execute(*(_session.get()), true);
00207   execute.run(sql, result_set);
00208   assert(result_set.getMetaData().getColumnCount() == 1);
00209 
00210   /* Really should only be 1 returned row */
00211   uint32_t found_rows= 0;
00212   while (result_set.next())
00213   {
00214     string value= result_set.getString(0);
00215 
00216     if ((value == "") || (found_rows == 1))
00217       break;
00218 
00219     assert(result_set.isNull(0) == false);
00220     *max_commit_id= boost::lexical_cast<uint64_t>(value);
00221     found_rows++;
00222   }
00223 
00224   if (found_rows == 0)
00225   {
00226     _last_error_message= "Could not determine last committed transaction.";
00227     return false;
00228   }
00229 
00230   return true;
00231 }
00232 
00233 bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
00234                                       vector<uint64_t> &list)
00235 {
00236   (void)list;
00237   string sql("SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
00238              " WHERE `commit_id` > ");
00239   sql.append(boost::lexical_cast<string>(max_commit_id));
00240   sql.append(" ORDER BY `commit_id` LIMIT 25");
00241 
00242   drizzle_return_t ret;
00243   drizzle_result_st result;
00244   drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
00245   
00246   if (ret != DRIZZLE_RETURN_OK)
00247   {
00248     _last_return= ret;
00249     _last_error_message= "Replication slave: ";
00250     _last_error_message.append(drizzle_error(&_drizzle));
00251     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00252     drizzle_result_free(&result);
00253     return false;
00254   }
00255 
00256   ret= drizzle_result_buffer(&result);
00257 
00258   if (ret != DRIZZLE_RETURN_OK)
00259   {
00260     _last_return= ret;
00261     _last_error_message= "Replication slave: ";
00262     _last_error_message.append(drizzle_error(&_drizzle));
00263     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00264     drizzle_result_free(&result);
00265     return false;
00266   }
00267 
00268   drizzle_row_t row;
00269 
00270   while ((row= drizzle_row_next(&result)) != NULL)
00271   {
00272     if (row[0])
00273     {
00274       list.push_back(boost::lexical_cast<uint32_t>(row[0]));
00275     }
00276     else
00277     {
00278       _last_return= ret;
00279       _last_error_message= "Replication slave: Unexpected NULL for trx id";
00280       errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00281       drizzle_result_free(&result);
00282       return false;
00283     }
00284   }
00285 
00286   drizzle_result_free(&result);
00287   return true;
00288 }
00289 
00290 
00291 bool QueueProducer::queueInsert(const char *trx_id,
00292                                 const char *seg_id,
00293                                 const char *commit_id,
00294                                 const char *msg,
00295                                 const char *msg_length)
00296 {
00297   message::Transaction message;
00298 
00299   message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
00300 
00301   /*
00302    * The SQL to insert our results into the local queue.
00303    */
00304   string sql= "INSERT INTO `sys_replication`.`queue`"
00305               " (`trx_id`, `seg_id`, `commit_order`, `msg`) VALUES (";
00306   sql.append(trx_id);
00307   sql.append(", ", 2);
00308   sql.append(seg_id);
00309   sql.append(", ", 2);
00310   sql.append(commit_id);
00311   sql.append(", '", 3);
00312 
00313   /*
00314    * Ideally we would store the Transaction message in binary form, as it
00315    * it stored on the master and tranferred to the slave. However, we are
00316    * inserting using drizzle::Execute which doesn't really handle binary
00317    * data. Until that is changed, we store as plain text.
00318    */
00319   string message_text;
00320   google::protobuf::TextFormat::PrintToString(message, &message_text);  
00321 
00322   /*
00323    * Execution using drizzled::Execute requires some special escaping.
00324    */
00325   string::iterator it= message_text.begin();
00326   for (; it != message_text.end(); ++it)
00327   {
00328     if (*it == '\"')
00329     {
00330       it= message_text.insert(it, '\\');
00331       ++it;
00332     }
00333     else if (*it == '\'')
00334     {
00335       it= message_text.insert(it, '\\');
00336       ++it;
00337       it= message_text.insert(it, '\\');
00338       ++it;
00339     }
00340     else if (*it == '\\')
00341     {
00342       it= message_text.insert(it, '\\');
00343       ++it;
00344       it= message_text.insert(it, '\\');
00345       ++it;
00346       it= message_text.insert(it, '\\');
00347       ++it;
00348     }
00349     else if (*it == ';')
00350     {
00351       it= message_text.insert(it, '\\');
00352       ++it;  /* advance back to the semicolon */
00353     }
00354   }
00355 
00356   sql.append(message_text);
00357   sql.append("')", 2);
00358 
00359   vector<string> statements;
00360   statements.push_back(sql);
00361 
00362   if (not executeSQL(statements))
00363   {
00364     markInErrorState();
00365     return false;
00366   }
00367 
00368   uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
00369   if (tmp_commit_id > _saved_max_commit_id)
00370     _saved_max_commit_id= tmp_commit_id;
00371 
00372   return true;
00373 }
00374 
00375 
00376 enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
00377 {
00378   vector<uint64_t> trx_id_list;
00379 
00380   if (not queryForTrxIdList(max_commit_id, trx_id_list))
00381     return ER_YES;
00382 
00383   if (trx_id_list.size() == 0)    /* nothing to get from the master */
00384   {
00385     return ER_NO;
00386   }
00387 
00388   /*
00389    * The SQL to pull everything we need from the master.
00390    */
00391   string sql= "SELECT `id`, `segid`, `commit_id`, `message`, `message_len` "
00392               " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
00393 
00394   for (size_t x= 0; x < trx_id_list.size(); x++)
00395   {
00396     if (x > 0)
00397       sql.append(", ", 2);
00398     sql.append(boost::lexical_cast<string>(trx_id_list[x]));
00399   }
00400 
00401   sql.append(")", 1);
00402 
00403   drizzle_return_t ret;
00404   drizzle_result_st result;
00405   drizzle_query_str(&_connection, &result, sql.c_str(), &ret);
00406   
00407   if (ret != DRIZZLE_RETURN_OK)
00408   {
00409     _last_return= ret;
00410     _last_error_message= "Replication slave: ";
00411     _last_error_message.append(drizzle_error(&_drizzle));
00412     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00413     drizzle_result_free(&result);
00414     return ER_YES;
00415   }
00416 
00417   /* TODO: Investigate 1-row-at-a-time buffering */
00418 
00419   ret= drizzle_result_buffer(&result);
00420 
00421   if (ret != DRIZZLE_RETURN_OK)
00422   {
00423     _last_return= ret;
00424     _last_error_message= "Replication slave: ";
00425     _last_error_message.append(drizzle_error(&_drizzle));
00426     errmsg_printf(error::ERROR, _("%s"), _last_error_message.c_str());
00427     drizzle_result_free(&result);
00428     return ER_YES;
00429   }
00430 
00431   drizzle_row_t row;
00432 
00433   while ((row= drizzle_row_next(&result)) != NULL)
00434   {
00435     if (not queueInsert(row[0], row[1], row[2], row[3], row[4]))
00436     {
00437       errmsg_printf(error::ERROR,
00438                     _("Replication slave: Unable to insert into queue."));
00439       drizzle_result_free(&result);
00440       return ER_YES;
00441     }
00442   }
00443 
00444   drizzle_result_free(&result);
00445 
00446   return EE_OK;
00447 }
00448 
00449 
00450 void QueueProducer::setIOState(const string &err_msg, bool status)
00451 {
00452   vector<string> statements;
00453   string sql;
00454   string msg(err_msg);
00455 
00456   if (not status)
00457   {
00458     sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
00459   }
00460   else
00461   {
00462     sql= "UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
00463   }
00464   
00465   sql.append(", `error_msg` = '", 17);
00466 
00467   /* Escape embedded quotes and statement terminators */
00468   string::iterator it;
00469   for (it= msg.begin(); it != msg.end(); ++it)
00470   {
00471     if (*it == '\'')
00472     {
00473       it= msg.insert(it, '\'');
00474       ++it;  /* advance back to the quote */
00475     }
00476     else if (*it == ';')
00477     {
00478       it= msg.insert(it, '\\');
00479       ++it;  /* advance back to the semicolon */
00480     }
00481   }
00482   
00483   sql.append(msg);
00484   sql.append("'", 1);
00485 
00486   statements.push_back(sql);
00487   executeSQL(statements);
00488 }
00489 
00490 } /* namespace slave */