00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022 #include <plugin/slave/queue_consumer.h>
00023 #include <drizzled/message/transaction.pb.h>
00024 #include <drizzled/message/statement_transform.h>
00025 #include <drizzled/sql/result_set.h>
00026 #include <drizzled/execute.h>
00027 #include <string>
00028 #include <vector>
00029 #include <boost/thread.hpp>
00030 #include <boost/lexical_cast.hpp>
00031 #include <google/protobuf/text_format.h>
00032
00033 using namespace std;
00034 using namespace drizzled;
00035
00036 namespace slave
00037 {
00038
00039 bool QueueConsumer::init()
00040 {
00041 setApplierState("", true);
00042 return true;
00043 }
00044
00045
00046 void QueueConsumer::shutdown()
00047 {
00048 setApplierState(getErrorMessage(), false);
00049 }
00050
00051
00052 bool QueueConsumer::process()
00053 {
00054 TrxIdList completedTransactionIds;
00055
00056 getListOfCompletedTransactions(completedTransactionIds);
00057
00058 for (size_t x= 0; x < completedTransactionIds.size(); x++)
00059 {
00060 string commit_id;
00061 uint64_t trx_id= completedTransactionIds[x];
00062
00063 vector<string> aggregate_sql;
00064 vector<string> segmented_sql;
00065
00066 message::Transaction transaction;
00067 uint32_t segment_id= 1;
00068
00069 while (getMessage(transaction, commit_id, trx_id, segment_id++))
00070 {
00071 convertToSQL(transaction, aggregate_sql, segmented_sql);
00072 transaction.Clear();
00073 }
00074
00075
00076
00077
00078
00079
00080 assert((not commit_id.empty()) && (commit_id != "0"));
00081 assert(segmented_sql.empty());
00082
00083 if (not aggregate_sql.empty())
00084 {
00085
00086
00087
00088 vector<string>::iterator agg_iter;
00089 for (agg_iter= aggregate_sql.begin(); agg_iter != aggregate_sql.end(); ++agg_iter)
00090 {
00091 string &sql= *agg_iter;
00092 string::iterator si= sql.begin();
00093 for (; si != sql.end(); ++si)
00094 {
00095 if (*si == '\"')
00096 {
00097 si= sql.insert(si, '\\');
00098 ++si;
00099 }
00100 else if (*si == '\\')
00101 {
00102 si= sql.insert(si, '\\');
00103 ++si;
00104 si= sql.insert(si, '\\');
00105 ++si;
00106 si= sql.insert(si, '\\');
00107 ++si;
00108 }
00109 else if (*si == ';')
00110 {
00111 si= sql.insert(si, '\\');
00112 ++si;
00113 }
00114 }
00115 }
00116 }
00117
00118 if (not executeSQLWithCommitId(aggregate_sql, commit_id))
00119 {
00120 return false;
00121 }
00122
00123 if (not deleteFromQueue(trx_id))
00124 {
00125 return false;
00126 }
00127 }
00128
00129 return true;
00130 }
00131
00132
00133 bool QueueConsumer::getMessage(message::Transaction &transaction,
00134 string &commit_id,
00135 uint64_t trx_id,
00136 uint32_t segment_id)
00137 {
00138 string sql("SELECT `msg`, `commit_order` FROM `sys_replication`.`queue`"
00139 " WHERE `trx_id` = ");
00140 sql.append(boost::lexical_cast<string>(trx_id));
00141 sql.append(" AND `seg_id` = ", 16);
00142 sql.append(boost::lexical_cast<string>(segment_id));
00143
00144 sql::ResultSet result_set(2);
00145 Execute execute(*(_session.get()), true);
00146
00147 execute.run(sql, result_set);
00148
00149 assert(result_set.getMetaData().getColumnCount() == 2);
00150
00151
00152 uint32_t found_rows= 0;
00153 while (result_set.next())
00154 {
00155 string msg= result_set.getString(0);
00156 string com_id= result_set.getString(1);
00157
00158 if ((msg == "") || (found_rows == 1))
00159 break;
00160
00161
00162 assert(result_set.isNull(0) == false);
00163 assert(result_set.isNull(1) == false);
00164
00165 google::protobuf::TextFormat::ParseFromString(msg, &transaction);
00166
00167 commit_id= com_id;
00168 found_rows++;
00169 }
00170
00171 if (found_rows == 0)
00172 return false;
00173
00174 return true;
00175 }
00176
00177 bool QueueConsumer::getListOfCompletedTransactions(TrxIdList &list)
00178 {
00179 Execute execute(*(_session.get()), true);
00180
00181 string sql("SELECT `trx_id` FROM `sys_replication`.`queue`"
00182 " WHERE `commit_order` IS NOT NULL AND `commit_order` > 0"
00183 " ORDER BY `commit_order` ASC");
00184
00185
00186 sql::ResultSet result_set(1);
00187
00188 execute.run(sql, result_set);
00189
00190 assert(result_set.getMetaData().getColumnCount() == 1);
00191
00192 while (result_set.next())
00193 {
00194 assert(result_set.isNull(0) == false);
00195 string value= result_set.getString(0);
00196
00197
00198 if (value != "")
00199 {
00200 list.push_back(boost::lexical_cast<uint64_t>(result_set.getString(0)));
00201 }
00202 }
00203
00204 return true;
00205 }
00206
00207
00208 bool QueueConsumer::convertToSQL(const message::Transaction &transaction,
00209 vector<string> &aggregate_sql,
00210 vector<string> &segmented_sql)
00211 {
00212 if (transaction.has_event())
00213 return true;
00214
00215 size_t num_statements= transaction.statement_size();
00216
00217
00218
00219
00220
00221
00222
00223
00224 for (size_t idx= 0; idx < num_statements; idx++)
00225 {
00226 const message::Statement &statement= transaction.statement(idx);
00227
00228
00229 if (statement.type() == message::Statement::ROLLBACK)
00230 {
00231 assert(idx == (num_statements - 1));
00232 aggregate_sql.clear();
00233 segmented_sql.clear();
00234 break;
00235 }
00236
00237 switch (statement.type())
00238 {
00239
00240 case message::Statement::TRUNCATE_TABLE:
00241 case message::Statement::CREATE_SCHEMA:
00242 case message::Statement::ALTER_SCHEMA:
00243 case message::Statement::DROP_SCHEMA:
00244 case message::Statement::CREATE_TABLE:
00245 case message::Statement::ALTER_TABLE:
00246 case message::Statement::DROP_TABLE:
00247 case message::Statement::RAW_SQL:
00248 {
00249 segmented_sql.push_back("COMMIT");
00250 break;
00251 }
00252
00253
00254 case message::Statement::ROLLBACK_STATEMENT:
00255 {
00256 segmented_sql.clear();
00257 continue;
00258 }
00259
00260 default:
00261 {
00262 break;
00263 }
00264 }
00265
00266 if (message::transformStatementToSql(statement, segmented_sql,
00267 message::DRIZZLE, true))
00268 {
00269 return false;
00270 }
00271
00272 if (isEndStatement(statement))
00273 {
00274 aggregate_sql.insert(aggregate_sql.end(),
00275 segmented_sql.begin(),
00276 segmented_sql.end());
00277 segmented_sql.clear();
00278 }
00279 }
00280
00281 return true;
00282 }
00283
00284
00285 bool QueueConsumer::isEndStatement(const message::Statement &statement)
00286 {
00287 switch (statement.type())
00288 {
00289 case (message::Statement::INSERT):
00290 {
00291 const message::InsertData &data= statement.insert_data();
00292 if (not data.end_segment())
00293 return false;
00294 break;
00295 }
00296 case (message::Statement::UPDATE):
00297 {
00298 const message::UpdateData &data= statement.update_data();
00299 if (not data.end_segment())
00300 return false;
00301 break;
00302 }
00303 case (message::Statement::DELETE):
00304 {
00305 const message::DeleteData &data= statement.delete_data();
00306 if (not data.end_segment())
00307 return false;
00308 break;
00309 }
00310 default:
00311 return true;
00312 }
00313 return true;
00314 }
00315
00316
00317 void QueueConsumer::setApplierState(const string &err_msg, bool status)
00318 {
00319 vector<string> statements;
00320 string sql;
00321 string msg(err_msg);
00322
00323 if (not status)
00324 {
00325 sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'STOPPED'";
00326 }
00327 else
00328 {
00329 sql= "UPDATE `sys_replication`.`applier_state` SET `status` = 'RUNNING'";
00330 }
00331
00332 sql.append(", `error_msg` = '", 17);
00333
00334
00335 string::iterator it;
00336 for (it= msg.begin(); it != msg.end(); ++it)
00337 {
00338 if (*it == '\'')
00339 {
00340 it= msg.insert(it, '\'');
00341 ++it;
00342 }
00343 else if (*it == ';')
00344 {
00345 it= msg.insert(it, '\\');
00346 ++it;
00347 }
00348 }
00349
00350 sql.append(msg);
00351 sql.append("'", 1);
00352
00353 statements.push_back(sql);
00354 executeSQL(statements);
00355 }
00356
00357
00358 bool QueueConsumer::executeSQLWithCommitId(vector<string> &sql,
00359 const string &commit_id)
00360 {
00361 string tmp("UPDATE `sys_replication`.`applier_state`"
00362 " SET `last_applied_commit_id` = ");
00363 tmp.append(commit_id);
00364 sql.push_back(tmp);
00365
00366 return executeSQL(sql);
00367 }
00368
00369
00370 bool QueueConsumer::deleteFromQueue(uint64_t trx_id)
00371 {
00372 string sql("DELETE FROM `sys_replication`.`queue` WHERE `trx_id` = ");
00373 sql.append(boost::lexical_cast<std::string>(trx_id));
00374
00375 vector<string> sql_vect;
00376 sql_vect.push_back(sql);
00377
00378 return executeSQL(sql_vect);
00379 }
00380
00381 }