00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <config.h>
00025 #include <sys/types.h>
00026 #include <sys/stat.h>
00027 #include <limits.h>
00028 #include <iostream>
00029 #include <string>
00030 #include <algorithm>
00031 #include <vector>
00032 #include <unistd.h>
00033 #include <drizzled/gettext.h>
00034 #include <drizzled/message/transaction.pb.h>
00035 #include <drizzled/message/statement_transform.h>
00036 #include "transaction_manager.h"
00037 #include "transaction_file_reader.h"
00038 #include "transaction_log_connection.h"
00039
00040 #include <google/protobuf/io/coded_stream.h>
00041 #include <google/protobuf/io/zero_copy_stream_impl.h>
00042
00043 #include <boost/program_options.hpp>
00044
00045 using namespace std;
00046 using namespace google;
00047 using namespace drizzled;
00048
00049 namespace po= boost::program_options;
00050
00051 static const char *replace_with_spaces= "\n\r";
00052
00053 static void printErrorMessage(enum message::TransformSqlError err)
00054 {
00055 switch (err)
00056 {
00057 case message::MISSING_HEADER:
00058 {
00059 cerr << "Data segment without a header\n";
00060 break;
00061 }
00062 case message::MISSING_DATA:
00063 {
00064 cerr << "Header segment without a data segment\n";
00065 break;
00066 }
00067 case message::UUID_MISMATCH:
00068 {
00069 cerr << "UUID on objects did not match\n";
00070 break;
00071 }
00072 default:
00073 {
00074 cerr << "Unhandled error\n";
00075 break;
00076 }
00077 }
00078 }
00079
00089 static bool printStatement(const message::Statement &statement, string &output)
00090 {
00091 vector<string> sql_strings;
00092 enum message::TransformSqlError err;
00093
00094 err= message::transformStatementToSql(statement,
00095 sql_strings,
00096 message::DRIZZLE,
00097 true );
00098
00099 if (err != message::NONE)
00100 {
00101 printErrorMessage(err);
00102 return false;
00103 }
00104
00105 for (vector<string>::iterator sql_string_iter= sql_strings.begin();
00106 sql_string_iter != sql_strings.end();
00107 ++sql_string_iter)
00108 {
00109 string &sql= *sql_string_iter;
00110
00111
00112
00113
00114
00115 {
00116 string::size_type found= sql.find_first_of(replace_with_spaces);
00117 while (found != string::npos)
00118 {
00119 sql[found]= ' ';
00120 found= sql.find_first_of(replace_with_spaces, found);
00121 }
00122 }
00123
00124
00125
00126
00127 {
00128 string::size_type found= sql.find_first_of('\0');
00129 while (found != string::npos)
00130 {
00131 sql[found]= '\\';
00132 sql.insert(found + 1, 1, '0');
00133 found= sql.find_first_of('\0', found);
00134 }
00135 }
00136
00137 output.append(sql + ";\n");
00138 }
00139
00140 return true;
00141 }
00142
00143 static bool isDDLStatement(const message::Statement &statement)
00144 {
00145 bool isDDL;
00146
00147 switch (statement.type())
00148 {
00149 case (message::Statement::TRUNCATE_TABLE):
00150 case (message::Statement::CREATE_SCHEMA):
00151 case (message::Statement::ALTER_SCHEMA):
00152 case (message::Statement::DROP_SCHEMA):
00153 case (message::Statement::CREATE_TABLE):
00154 case (message::Statement::ALTER_TABLE):
00155 case (message::Statement::DROP_TABLE):
00156 case (message::Statement::RAW_SQL):
00157 {
00158 isDDL= true;
00159 break;
00160 }
00161 default:
00162 {
00163 isDDL= false;
00164 break;
00165 }
00166 }
00167
00168 return isDDL;
00169 }
00170
00171 static bool isEndStatement(const message::Statement &statement)
00172 {
00173 switch (statement.type())
00174 {
00175 case (message::Statement::INSERT):
00176 {
00177 const message::InsertData &data= statement.insert_data();
00178 if (not data.end_segment())
00179 return false;
00180 break;
00181 }
00182 case (message::Statement::UPDATE):
00183 {
00184 const message::UpdateData &data= statement.update_data();
00185 if (not data.end_segment())
00186 return false;
00187 break;
00188 }
00189 case (message::Statement::DELETE):
00190 {
00191 const message::DeleteData &data= statement.delete_data();
00192 if (not data.end_segment())
00193 return false;
00194 break;
00195 }
00196 default:
00197 return true;
00198 }
00199 return true;
00200 }
00201
00202 static bool isEndTransaction(const message::Transaction &transaction)
00203 {
00204 const message::TransactionContext trx= transaction.transaction_context();
00205
00206 size_t num_statements= transaction.statement_size();
00207
00208
00209
00210
00211
00212 for (size_t x= 0; x < num_statements; ++x)
00213 {
00214 const message::Statement &statement= transaction.statement(x);
00215
00216 if (not isEndStatement(statement))
00217 return false;
00218 }
00219
00220 return true;
00221 }
00222
00223 static void printEvent(const message::Event &event)
00224 {
00225 switch (event.type())
00226 {
00227 case message::Event::STARTUP:
00228 {
00229 cout << "-- EVENT: Server startup\n";
00230 break;
00231 }
00232 case message::Event::SHUTDOWN:
00233 {
00234 cout << "-- EVENT: Server shutdown\n";
00235 break;
00236 }
00237 default:
00238 {
00239 cout << "-- EVENT: Unknown event\n";
00240 break;
00241 }
00242 }
00243 }
00244
00245 static void printTransactionSummary(const message::Transaction &transaction,
00246 bool ignore_events)
00247 {
00248 static uint64_t last_trx_id= 0;
00249 const message::TransactionContext trx= transaction.transaction_context();
00250
00251 if (last_trx_id != trx.transaction_id())
00252 cout << "\ntransaction_id = " << trx.transaction_id() << endl;
00253
00254 last_trx_id= trx.transaction_id();
00255
00256 if (transaction.has_event() && (not ignore_events))
00257 {
00258 cout << "\t";
00259 printEvent(transaction.event());
00260 }
00261
00262 size_t num_statements= transaction.statement_size();
00263 size_t x;
00264
00265 for (x= 0; x < num_statements; ++x)
00266 {
00267 const message::Statement &statement= transaction.statement(x);
00268
00269 switch (statement.type())
00270 {
00271 case (message::Statement::ROLLBACK):
00272 {
00273 cout << "\tROLLBACK\n";
00274 break;
00275 }
00276 case (message::Statement::INSERT):
00277 {
00278 const message::InsertHeader &header= statement.insert_header();
00279 const message::TableMetadata &meta= header.table_metadata();
00280 cout << "\tINSERT INTO `" << meta.table_name() << "`\n";
00281 break;
00282 }
00283 case (message::Statement::DELETE):
00284 {
00285 const message::DeleteHeader &header= statement.delete_header();
00286 const message::TableMetadata &meta= header.table_metadata();
00287 cout << "\tDELETE FROM `" << meta.table_name() << "`\n";
00288 break;
00289 }
00290 case (message::Statement::UPDATE):
00291 {
00292 const message::UpdateHeader &header= statement.update_header();
00293 const message::TableMetadata &meta= header.table_metadata();
00294 cout << "\tUPDATE `" << meta.table_name() << "`\n";
00295 break;
00296 }
00297 case (message::Statement::TRUNCATE_TABLE):
00298 {
00299 const message::TableMetadata &meta= statement.truncate_table_statement().table_metadata();
00300 cout << "\tTRUNCATE TABLE `" << meta.table_name() << "`\n";
00301 break;
00302 }
00303 case (message::Statement::CREATE_SCHEMA):
00304 {
00305 const message::Schema &schema= statement.create_schema_statement().schema();
00306 cout << "\tCREATE SCHEMA `" << schema.name() << "`\n";
00307 break;
00308 }
00309 case (message::Statement::ALTER_SCHEMA):
00310 {
00311 const message::Schema &schema= statement.alter_schema_statement().before();
00312 cout << "\tALTER SCHEMA `" << schema.name() << "`\n";
00313 break;
00314 }
00315 case (message::Statement::DROP_SCHEMA):
00316 {
00317 cout << "\tDROP SCHEMA `" << statement.drop_schema_statement().schema_name() << "`\n";
00318 break;
00319 }
00320 case (message::Statement::CREATE_TABLE):
00321 {
00322 const message::Table &table= statement.create_table_statement().table();
00323 cout << "\tCREATE TABLE `" << table.name() << "`\n";
00324 break;
00325 }
00326 case (message::Statement::ALTER_TABLE):
00327 {
00328 const message::Table &table= statement.alter_table_statement().before();
00329 cout << "\tALTER TABLE `" << table.name() << "`\n";
00330 break;
00331 }
00332 case (message::Statement::DROP_TABLE):
00333 {
00334 const message::TableMetadata &meta= statement.drop_table_statement().table_metadata();
00335 cout << "\tDROP TABLE `" << meta.table_name() << "`\n";
00336 break;
00337 }
00338 case (message::Statement::SET_VARIABLE):
00339 {
00340 const message::FieldMetadata &meta= statement.set_variable_statement().variable_metadata();
00341 cout << "\tSET VARIABLE " << meta.name() << "\n";
00342 break;
00343 }
00344 case (message::Statement::RAW_SQL):
00345 {
00346 cout << "\tRAW SQL\n";
00347 break;
00348 }
00349 case (message::Statement::ROLLBACK_STATEMENT):
00350 {
00351 cout << "\tROLLBACK STATEMENT\n";
00352 break;
00353 }
00354 default:
00355 cout << "\tUnhandled Statement Type\n";
00356 }
00357 }
00358 }
00359
00370 static bool printTransaction(const message::Transaction &transaction,
00371 bool ignore_events,
00372 bool print_as_raw)
00373 {
00374 static uint64_t last_trx_id= 0;
00375 bool should_commit= true;
00376 const message::TransactionContext trx= transaction.transaction_context();
00377
00378
00379
00380
00381 if (transaction.has_event())
00382 {
00383 last_trx_id= trx.transaction_id();
00384 if (not ignore_events)
00385 {
00386 if (print_as_raw)
00387 transaction.PrintDebugString();
00388 else
00389 printEvent(transaction.event());
00390 }
00391 return true;
00392 }
00393
00394 if (print_as_raw)
00395 {
00396 transaction.PrintDebugString();
00397 return true;
00398 }
00399
00400 size_t num_statements= transaction.statement_size();
00401 vector<string> cached_statement_sql;
00402
00403 for (size_t x= 0; x < num_statements; ++x)
00404 {
00405 const message::Statement &statement= transaction.statement(x);
00406
00407
00408 if (x == 0)
00409 {
00410
00411 if (trx.transaction_id() != last_trx_id)
00412 {
00413 if (isDDLStatement(statement))
00414 {
00415 cout << "SET AUTOCOMMIT=0;" << endl;
00416 }
00417 else
00418 {
00419 cout << "START TRANSACTION;" << endl;
00420 }
00421 }
00422
00423 last_trx_id= trx.transaction_id();
00424 }
00425
00426 if (should_commit)
00427 should_commit= isEndStatement(statement);
00428
00429
00430
00431
00432
00433
00434
00435 if (statement.type() == message::Statement::ROLLBACK)
00436 should_commit= false;
00437
00438 string output;
00439
00440 if (not printStatement(statement, output))
00441 {
00442 return false;
00443 }
00444
00445 if (isEndStatement(statement))
00446 {
00447
00448 if (cached_statement_sql.empty() &&
00449 (statement.type() != message::Statement::ROLLBACK_STATEMENT))
00450 {
00451 cout << output;
00452 }
00453
00454
00455 else if (statement.type() == message::Statement::ROLLBACK_STATEMENT)
00456 {
00457 cached_statement_sql.clear();
00458 cout << "-- Rollback statement\n";
00459 }
00460
00461
00462 else
00463 {
00464 for (size_t y= 0; y < cached_statement_sql.size(); y++)
00465 {
00466 cout << cached_statement_sql[y];
00467 }
00468 cached_statement_sql.clear();
00469 }
00470 }
00471
00472
00473
00474
00475
00476 else
00477 {
00478 cached_statement_sql.push_back(output);
00479 }
00480 }
00481
00482
00483
00484
00485
00486
00487
00488
00489 if (should_commit)
00490 cout << "COMMIT;" << endl;
00491
00492 return true;
00493 }
00494
00495 static bool processTransactionMessage(TransactionManager &trx_mgr,
00496 const message::Transaction &transaction,
00497 bool summarize,
00498 bool ignore_events,
00499 bool print_as_raw)
00500 {
00501 if (not isEndTransaction(transaction))
00502 {
00503 trx_mgr.store(transaction);
00504 }
00505 else
00506 {
00507 const message::TransactionContext trx= transaction.transaction_context();
00508 uint64_t transaction_id= trx.transaction_id();
00509
00510
00511
00512
00513
00514 if (trx_mgr.contains(transaction_id))
00515 {
00516 trx_mgr.store(transaction);
00517
00518 uint32_t size= trx_mgr.getTransactionBufferSize(transaction_id);
00519 uint32_t idx= 0;
00520
00521 while (idx != size)
00522 {
00523 message::Transaction new_trx;
00524 trx_mgr.getTransactionMessage(new_trx, transaction_id, idx);
00525 if (summarize)
00526 {
00527 printTransactionSummary(new_trx, ignore_events);
00528 }
00529 else
00530 {
00531 if (not printTransaction(new_trx, ignore_events, print_as_raw))
00532 {
00533 return false;
00534 }
00535 }
00536 idx++;
00537 }
00538
00539
00540 trx_mgr.remove(transaction_id);
00541 }
00542 else
00543 {
00544 if (summarize)
00545 {
00546 printTransactionSummary(transaction, ignore_events);
00547 }
00548 else
00549 {
00550 if (not printTransaction(transaction, ignore_events, print_as_raw))
00551 {
00552 return false;
00553 }
00554 }
00555 }
00556 }
00557
00558 return true;
00559 }
00560
00561 static void getTrxIdList(TransactionLogConnection *connection,
00562 const string &query_string,
00563 vector<uint64_t> &ordered_trx_id_list)
00564 {
00565 drizzle_result_st result;
00566
00567 connection->query(query_string, &result);
00568
00569 drizzle_row_t row;
00570 while ((row= drizzle_row_next(&result)))
00571 {
00572 if (row[0])
00573 {
00574 ordered_trx_id_list.push_back(boost::lexical_cast<uint64_t>(row[0]));
00575 }
00576 }
00577
00578 drizzle_result_free(&result);
00579 }
00580
00581 static int extractRowsForTrxIds(TransactionLogConnection *connection,
00582 const vector<uint64_t> &ordered_trx_id_list,
00583 bool summarize,
00584 bool ignore_events,
00585 bool print_as_raw)
00586 {
00587 for (size_t idx= 0; idx < ordered_trx_id_list.size(); idx++)
00588 {
00589 uint64_t trx_id= ordered_trx_id_list[idx];
00590
00591 string sql("SELECT MESSAGE, MESSAGE_LEN"
00592 " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00593 " WHERE ID = ");
00594 sql.append(boost::lexical_cast<string>(trx_id));
00595 sql.append(" ORDER BY SEGID ASC");
00596
00597 drizzle_result_st result;
00598 connection->query(sql, &result);
00599
00600 drizzle_row_t row;
00601 while ((row= drizzle_row_next(&result)))
00602 {
00603 char* data= (char*)row[0];
00604 uint64_t length= (row[1]) ? boost::lexical_cast<uint64_t>(row[1]) : 0;
00605
00606 message::Transaction transaction;
00607 TransactionManager trx_mgr;
00608
00609 transaction.ParseFromArray(data, length);
00610
00611 if (not processTransactionMessage(trx_mgr, transaction,
00612 summarize, ignore_events,
00613 print_as_raw))
00614 {
00615 drizzle_result_free(&result);
00616 return -1;
00617 }
00618 }
00619
00620 drizzle_result_free(&result);
00621 }
00622
00623 return 0;
00624 }
00625
00630 int main(int argc, char* argv[])
00631 {
00632 GOOGLE_PROTOBUF_VERIFY_VERSION;
00633 int opt_start_pos= 0;
00634 uint64_t opt_transaction_id= 0;
00635 uint64_t opt_start_transaction_id= 0;
00636 uint32_t opt_drizzle_port= 0;
00637 string current_user, opt_password, opt_protocol, current_host;
00638 bool use_drizzle_protocol= false;
00639
00640
00641
00642
00643 po::options_description desc("Program options");
00644 desc.add_options()
00645 ("help", _("Display help and exit"))
00646 ("use-innodb-replication-log", _("Read from the innodb transaction log"))
00647 ("user,u", po::value<string>(¤t_user)->default_value(""),
00648 _("User for login if not current user."))
00649 ("port,p", po::value<uint32_t>(&opt_drizzle_port)->default_value(0),
00650 _("Port number to use for connection."))
00651 ("password,P", po::value<string>(&opt_password)->default_value(""),
00652 _("Password to use when connecting to server"))
00653 ("protocol",po::value<string>(&opt_protocol)->default_value("mysql"),
00654 _("The protocol of connection (mysql or drizzle)."))
00655 ("checksum", _("Perform checksum"))
00656 ("ignore-events", _("Ignore event messages"))
00657 ("input-file", po::value< vector<string> >(), _("Transaction log file"))
00658 ("raw", _("Print raw Protobuf messages instead of SQL"))
00659 ("start-pos",
00660 po::value<int>(&opt_start_pos),
00661 _("Start reading from the given file position"))
00662 ("start-transaction-id",
00663 po::value<uint64_t>(&opt_start_transaction_id),
00664 _("Only output for the given transaction ID and later"))
00665 ("transaction-id",
00666 po::value<uint64_t>(&opt_transaction_id),
00667 _("Only output for the given transaction ID"))
00668 ("summarize", _("Summarize message contents"));
00669
00670
00671
00672
00673 po::positional_options_description pos;
00674 pos.add("input-file", 1);
00675
00676
00677
00678
00679 po::variables_map vm;
00680 po::store(po::command_line_parser(argc, argv).
00681 options(desc).positional(pos).run(), vm);
00682 po::notify(vm);
00683
00684 if (vm.count("help"))
00685 {
00686 cerr << desc << endl;
00687 return -1;
00688 }
00689
00690 if (not vm.count("input-file") && not vm.count("use-innodb-replication-log"))
00691 {
00692 cerr << desc << endl;
00693 return -1;
00694 }
00695
00696
00697
00698
00699
00700 if (vm.count("start-pos") && vm.count("transaction-id"))
00701 {
00702 cerr << _("Cannot use --start-pos and --transaction-id together\n");
00703 return -1;
00704 }
00705
00706 if (vm.count("summarize") && (vm.count("raw") || vm.count("transaction-id")))
00707 {
00708 cerr << _("Cannot use --summarize with either --raw or --transaction-id\n");
00709 return -1;
00710 }
00711
00712 bool do_checksum= vm.count("checksum") ? true : false;
00713 bool use_innodb_replication_log= vm.count("use-innodb-replication-log") ? true : false;
00714 bool ignore_events= vm.count("ignore-events") ? true : false;
00715 bool print_as_raw= vm.count("raw") ? true : false;
00716 bool summarize= vm.count("summarize") ? true : false;
00717
00718
00719
00720
00721
00722 if (use_innodb_replication_log)
00723 {
00724 TransactionLogConnection *connection;
00725 connection= new TransactionLogConnection(current_host, opt_drizzle_port,
00726 current_user, opt_password,
00727 use_drizzle_protocol);
00728
00729
00730 vector<uint64_t> ordered_trx_id_list;
00731 string query_string;
00732
00733 if (vm.count("transaction-id"))
00734 {
00735 ordered_trx_id_list.push_back(opt_transaction_id);
00736 }
00737 else if (vm.count("start-transaction-id"))
00738 {
00739 query_string.append("SELECT ID"
00740 " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00741 " WHERE COMMIT_ID > 0"
00742 " AND ID > ");
00743 query_string.append(boost::lexical_cast<string>(opt_start_transaction_id));
00744 query_string.append(" ORDER BY COMMIT_ID ASC");
00745
00746 getTrxIdList(connection, query_string, ordered_trx_id_list);
00747 }
00748 else
00749 {
00750 query_string.append("SELECT ID"
00751 " FROM DATA_DICTIONARY.SYS_REPLICATION_LOG"
00752 " WHERE COMMIT_ID > 0"
00753 " ORDER BY COMMIT_ID ASC");
00754
00755 getTrxIdList(connection, query_string, ordered_trx_id_list);
00756 }
00757
00758 if (extractRowsForTrxIds(connection, ordered_trx_id_list,
00759 summarize, ignore_events, print_as_raw))
00760 {
00761 return -1;
00762 }
00763 }
00764
00765
00766
00767
00768
00769 else
00770 {
00771 string filename= vm["input-file"].as< vector<string> >()[0];
00772
00773 TransactionFileReader fileReader;
00774
00775 if (not fileReader.openFile(filename, opt_start_pos))
00776 {
00777 cerr << fileReader.getErrorString() << endl;
00778 return -1;
00779 }
00780
00781 message::Transaction transaction;
00782 TransactionManager trx_mgr;
00783 uint32_t checksum= 0;
00784
00785 while (fileReader.getNextTransaction(transaction, &checksum))
00786 {
00787 const message::TransactionContext trx= transaction.transaction_context();
00788 uint64_t transaction_id= trx.transaction_id();
00789
00790
00791
00792
00793
00794 if (vm.count("transaction-id"))
00795 {
00796 if (opt_transaction_id == transaction_id)
00797 {
00798 if (not processTransactionMessage(trx_mgr, transaction, summarize,
00799 ignore_events, print_as_raw))
00800 {
00801 return -1;
00802 }
00803 }
00804 else
00805 {
00806 continue;
00807 }
00808 }
00809 else
00810 {
00811
00812
00813
00814 if (not processTransactionMessage(trx_mgr, transaction, summarize,
00815 ignore_events, print_as_raw))
00816 {
00817 return -1;
00818 }
00819 }
00820
00821 if (do_checksum)
00822 {
00823 uint32_t calculated= fileReader.checksumLastReadTransaction();
00824 if (checksum != calculated)
00825 {
00826 cerr << _("Checksum failed. Wanted ")
00827 << checksum
00828 << _(" got ")
00829 << calculated
00830 << endl;
00831 }
00832 }
00833 }
00834
00835 string error= fileReader.getErrorString();
00836
00837 if (error != "EOF")
00838 {
00839 cerr << error << endl;
00840 return -1;
00841 }
00842 }
00843
00844 return 0;
00845 }