00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <config.h>
00022 #include <plugin/slave/replication_schema.h>
00023 #include <drizzled/execute.h>
00024 #include <drizzled/sql/result_set.h>
00025 #include <string>
00026 #include <vector>
00027 #include <boost/lexical_cast.hpp>
00028
00029 using namespace std;
00030 using namespace drizzled;
00031 using namespace boost;
00032
00033 namespace slave
00034 {
00035
00036 bool ReplicationSchema::create()
00037 {
00038 vector<string> sql;
00039
00040 sql.push_back("COMMIT");
00041 sql.push_back("CREATE SCHEMA IF NOT EXISTS `sys_replication` REPLICATE=FALSE");
00042
00043 if (not executeSQL(sql))
00044 return false;
00045
00046
00047
00048
00049
00050 sql.clear();
00051 sql.push_back("COMMIT");
00052 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`io_state` ("
00053 " `status` VARCHAR(20) NOT NULL,"
00054 " `error_msg` VARCHAR(250))"
00055 " COMMENT = 'VERSION 1.0'");
00056
00057 if (not executeSQL(sql))
00058 return false;
00059
00060 sql.clear();
00061 sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`io_state`");
00062
00063 {
00064 sql::ResultSet result_set(1);
00065 Execute execute(*(_session.get()), true);
00066 execute.run(sql[0], result_set);
00067 result_set.next();
00068 string count= result_set.getString(0);
00069
00070
00071 if (count == "0")
00072 {
00073 sql.clear();
00074 sql.push_back("INSERT INTO `sys_replication`.`io_state` (`status`)"
00075 " VALUES ('STOPPED')");
00076 if (not executeSQL(sql))
00077 return false;
00078 }
00079 }
00080
00081
00082
00083
00084
00085 sql.clear();
00086 sql.push_back("COMMIT");
00087 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`applier_state`"
00088 " (`last_applied_commit_id` BIGINT NOT NULL PRIMARY KEY,"
00089 " `status` VARCHAR(20) NOT NULL,"
00090 " `error_msg` VARCHAR(250))"
00091 " COMMENT = 'VERSION 1.0'");
00092
00093 if (not executeSQL(sql))
00094 return false;
00095
00096 sql.clear();
00097 sql.push_back("SELECT COUNT(*) FROM `sys_replication`.`applier_state`");
00098
00099 {
00100 sql::ResultSet result_set(1);
00101 Execute execute(*(_session.get()), true);
00102 execute.run(sql[0], result_set);
00103 result_set.next();
00104 string count= result_set.getString(0);
00105
00106
00107 if (count == "0")
00108 {
00109 sql.clear();
00110 sql.push_back("INSERT INTO `sys_replication`.`applier_state`"
00111 " (`last_applied_commit_id`, `status`)"
00112 " VALUES (0, 'STOPPED')");
00113 if (not executeSQL(sql))
00114 return false;
00115 }
00116 }
00117
00118
00119
00120
00121
00122 sql.clear();
00123 sql.push_back("COMMIT");
00124 sql.push_back("CREATE TABLE IF NOT EXISTS `sys_replication`.`queue`"
00125 " (`trx_id` BIGINT NOT NULL, `seg_id` INT NOT NULL,"
00126 " `commit_order` BIGINT, `msg` BLOB,"
00127 " PRIMARY KEY(`trx_id`, `seg_id`))"
00128 " COMMENT = 'VERSION 1.0'");
00129 if (not executeSQL(sql))
00130 return false;
00131
00132 return true;
00133 }
00134
00135 bool ReplicationSchema::setInitialMaxCommitId(uint64_t value)
00136 {
00137 vector<string> sql;
00138
00139 sql.push_back("UPDATE `sys_replication`.`applier_state`"
00140 " SET `last_applied_commit_id` = "
00141 + lexical_cast<string>(value));
00142
00143 return executeSQL(sql);
00144 }
00145
00146 }