Drizzled Public API Documentation

create_replication.cc

00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2010 Brian Aker
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00021 #include <config.h>
00022 
00023 #include "read_replication.h"
00024 #include "create_replication.h"
00025 
00026 #ifdef UNIV_NONINL
00027 #include "dict0crea.ic"
00028 #endif
00029 
00030 #include "btr0pcur.h"
00031 #include "btr0btr.h"
00032 #include "page0page.h"
00033 #include "mach0data.h"
00034 #include "dict0boot.h"
00035 #include "dict0dict.h"
00036 #include "que0que.h"
00037 #include "row0ins.h"
00038 #include "row0mysql.h"
00039 #include "pars0pars.h"
00040 #include "trx0roll.h"
00041 #include "usr0sess.h"
00042 #include "ut0vec.h"
00043 #include "row0merge.h"
00044 #include "row0mysql.h"
00045 
00046 UNIV_INTERN ulint dict_create_sys_replication_log(void)
00047 {
00048   dict_table_t* table1;
00049   ulint error;
00050   trx_t *trx;
00051 
00052   mutex_enter(&(dict_sys->mutex));
00053 
00054   table1 = dict_table_get_low("SYS_REPLICATION_LOG");
00055 
00056   trx_sys_read_commit_id();
00057 
00058   if (table1) 
00059   {
00060     mutex_exit(&(dict_sys->mutex));
00061 
00062     return(DB_SUCCESS);
00063   }
00064 
00065   mutex_exit(&(dict_sys->mutex));
00066 
00067   trx= trx_allocate_for_mysql();
00068 
00069   trx->op_info= "creating replication sys table";
00070 
00071   row_mysql_lock_data_dictionary(trx);
00072 
00073   pars_info_t *info= pars_info_create();
00074 
00075 
00076   error = que_eval_sql(info,
00077                        "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
00078                        "BEGIN\n"
00079                        "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n" 
00080                        "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
00081                        "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
00082                        "END;\n"
00083                        , FALSE, trx);
00084 
00085 
00086 
00087   if (error != DB_SUCCESS)
00088   {
00089     fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
00090 
00091     ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
00092 
00093     fprintf(stderr,
00094             "InnoDB: creation failed\n"
00095             "InnoDB: tablespace is full\n"
00096             "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
00097 
00098     row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
00099 
00100     error = DB_MUST_GET_MORE_FILE_SPACE;
00101   }
00102 
00103   trx_commit_for_mysql(trx);
00104 
00105   row_mysql_unlock_data_dictionary(trx);
00106 
00107   trx_free_for_mysql(trx);
00108 
00109   return(error);
00110 }
00111 
00112 UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
00113 {
00114   std::string search_string(table_name);
00115   boost::algorithm::to_lower(search_string);
00116 
00117   if (search_string.compare("sys_replication_log") != 0)
00118     return -1;
00119 
00120   drizzled::message::Engine *engine= table_message->mutable_engine();
00121   engine->set_name("InnoDB");
00122   table_message->set_name("SYS_REPLICATION_LOG");
00123   table_message->set_schema("DATA_DICTIONARY");
00124   table_message->set_type(drizzled::message::Table::STANDARD);
00125   table_message->set_creation_timestamp(0);
00126   table_message->set_update_timestamp(0);
00127 
00128   drizzled::message::Table::TableOptions *options= table_message->mutable_options();
00129   options->set_collation_id(drizzled::my_charset_bin.number);
00130   options->set_collation(drizzled::my_charset_bin.name);
00131   options->set_dont_replicate(true);
00132 
00133   drizzled::message::Table::Field *field= table_message->add_field();
00134   field->set_name("ID");
00135   field->set_type(drizzled::message::Table::Field::BIGINT);
00136 
00137   field= table_message->add_field();
00138   field->set_name("SEGID");
00139   field->set_type(drizzled::message::Table::Field::INTEGER);
00140 
00141   field= table_message->add_field();
00142   field->set_name("COMMIT_ID");
00143   field->set_type(drizzled::message::Table::Field::BIGINT);
00144 
00145   field= table_message->add_field();
00146   field->set_name("END_TIMESTAMP");
00147   field->set_type(drizzled::message::Table::Field::BIGINT);
00148 
00149   field= table_message->add_field();
00150   field->set_name("MESSAGE_LEN");
00151   field->set_type(drizzled::message::Table::Field::INTEGER);
00152 
00153   field= table_message->add_field();
00154   field->set_name("MESSAGE");
00155   field->set_type(drizzled::message::Table::Field::BLOB);
00156   drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
00157   stropt->set_collation_id(drizzled::my_charset_bin.number);
00158   stropt->set_collation(drizzled::my_charset_bin.name);
00159 
00160   drizzled::message::Table::Index *index= table_message->add_indexes();
00161   index->set_name("PRIMARY");
00162   index->set_is_primary(true);
00163   index->set_is_unique(true);
00164   index->set_type(drizzled::message::Table::Index::BTREE);
00165   index->set_key_length(12);
00166   drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
00167   part->set_fieldnr(0);
00168   part->set_compare_length(8);
00169   part= index->add_index_part();
00170   part->set_fieldnr(1);
00171   part->set_compare_length(4);
00172 
00173   index= table_message->add_indexes();
00174   index->set_name("COMMIT_IDX");
00175   index->set_is_primary(false);
00176   index->set_is_unique(false);
00177   index->set_type(drizzled::message::Table::Index::BTREE);
00178   index->set_key_length(16);
00179   part= index->add_index_part();
00180   part->set_fieldnr(2);
00181   part->set_compare_length(8);
00182   part= index->add_index_part();
00183   part->set_fieldnr(0);
00184   part->set_compare_length(8);
00185 
00186   return 0;
00187 }
00188 
00189 extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t*  prebuilt);
00190 
00191 ulint insert_replication_message(const char *message, size_t size, 
00192                                  trx_t *trx, uint64_t trx_id, 
00193                                  uint64_t end_timestamp, bool is_end_segment, 
00194                                  uint32_t seg_id) 
00195 {
00196   ulint error;
00197   row_prebuilt_t* prebuilt; /* For reading rows */
00198   dict_table_t *table;
00199   que_thr_t*  thr;
00200   byte*  data;
00201 
00202   table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
00203 
00204   prebuilt = row_create_prebuilt(table);
00205 
00206   if (prebuilt->trx != trx) 
00207   {
00208     row_update_prebuilt_trx(prebuilt, trx);
00209   }
00210 
00211   /* DDL operations create table/drop table call
00212    * innobase_commit_low() which will commit the trx
00213    * that leaves the operation of committing to the
00214    * log in a new trx. If that is the case we need
00215    * to keep track and commit the trx later in this
00216    * function. 
00217    */ 
00218   bool is_started= true;
00219   if (trx->conc_state == TRX_NOT_STARTED)
00220   {
00221     is_started= false;
00222   }
00223 
00224   dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
00225   dfield_t *dfield;
00226 
00227   dfield = dtuple_get_nth_field(dtuple, 0);
00228   data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00229   row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
00230   dfield_set_data(dfield, data, 8);
00231 
00232   dfield = dtuple_get_nth_field(dtuple, 1);
00233 
00234   data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
00235   row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
00236   dfield_set_data(dfield, data, 4);
00237   
00238   uint64_t commit_id= 0;
00239   if (is_end_segment)
00240   {
00241     commit_id= trx_sys_commit_id.increment();
00242   } 
00243 
00244   dfield = dtuple_get_nth_field(dtuple, 2);
00245   data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00246   row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
00247   dfield_set_data(dfield, data, 8);
00248 
00249   dfield = dtuple_get_nth_field(dtuple, 3);
00250   data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
00251   row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
00252   dfield_set_data(dfield, data, 8);
00253 
00254   dfield = dtuple_get_nth_field(dtuple, 4);
00255   data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
00256   row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
00257   dfield_set_data(dfield, data, 4);
00258 
00259   dfield = dtuple_get_nth_field(dtuple, 5);
00260   dfield_set_data(dfield, message, size);
00261 
00262   ins_node_t* node    = prebuilt->ins_node;
00263 
00264   thr = que_fork_get_first_thr(prebuilt->ins_graph);
00265 
00266   if (prebuilt->sql_stat_start) {
00267     node->state = INS_NODE_SET_IX_LOCK;
00268     prebuilt->sql_stat_start = FALSE;
00269   } else {
00270     node->state = INS_NODE_ALLOC_ROW_ID;
00271   }
00272 
00273   que_thr_move_to_run_state_for_mysql(thr, trx);
00274 
00275 //run_again:
00276   thr->run_node = node;
00277   thr->prev_node = node;
00278 
00279   row_ins_step(thr);
00280 
00281   error = trx->error_state;
00282 
00283   que_thr_stop_for_mysql_no_error(thr, trx);
00284   row_prebuilt_free(prebuilt, FALSE);
00285 
00286   if (! is_started)
00287   {
00288     trx_commit_for_mysql(trx);
00289   }
00290 
00291   return error;
00292 }
00293 
00294 UNIV_INTERN read_replication_state_st *replication_read_init(void)
00295 {
00296   read_replication_state_st *state= new read_replication_state_st;
00297 
00298   mutex_enter(&(dict_sys->mutex));
00299 
00300   mtr_start(&state->mtr);
00301   state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
00302   state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
00303 
00304   mutex_exit(&(dict_sys->mutex));
00305 
00306   btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
00307 
00308   return state;
00309 }
00310 
00311 UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
00312 {
00313   btr_pcur_close(&state->pcur);
00314   mtr_commit(&state->mtr);
00315   delete state;
00316 }
00317 
00318 UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
00319 {
00320   struct read_replication_return_st ret;
00321   const rec_t *rec;
00322 
00323   btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
00324 
00325   rec= btr_pcur_get_rec(&state->pcur);
00326 
00327   while (btr_pcur_is_on_user_rec(&state->pcur))
00328   {
00329     const byte* field;
00330     ulint len;
00331 
00332     // Is the row deleted? If so go fetch the next
00333     if (rec_get_deleted_flag(rec, 0))
00334       continue;
00335 
00336     // Store transaction id
00337     field = rec_get_nth_field_old(rec, 0, &len);
00338     byte idbyte[8];
00339     convert_to_mysql_format(idbyte, field, 8);
00340     ret.id= *(uint64_t *)idbyte;
00341 
00342     // Store segment id
00343     field = rec_get_nth_field_old(rec, 1, &len);
00344     byte segbyte[4];
00345     convert_to_mysql_format(segbyte, field, 4);
00346     ret.seg_id= *(uint32_t *)segbyte;
00347 
00348     field = rec_get_nth_field_old(rec, 4, &len);
00349     byte commitbyte[8];
00350     convert_to_mysql_format(commitbyte, field, 8);
00351     ret.commit_id= *(uint64_t *)commitbyte;
00352 
00353     field = rec_get_nth_field_old(rec, 5, &len);
00354     byte timestampbyte[8];
00355     convert_to_mysql_format(timestampbyte, field, 8);
00356     ret.end_timestamp= *(uint64_t *)timestampbyte;
00357 
00358     // Handler message
00359     field = rec_get_nth_field_old(rec, 7, &len);
00360     ret.message= (char *)field;
00361     ret.message_length= len;
00362 
00363     // @todo double check that "field" will continue to be value past this
00364     // point.
00365     btr_pcur_store_position(&state->pcur, &state->mtr);
00366     mtr_commit(&state->mtr);
00367 
00368     mtr_start(&state->mtr);
00369 
00370     btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
00371 
00372     return ret;
00373   }
00374 
00375   /* end of index */
00376   memset(&ret, 0, sizeof(ret));
00377 
00378   return ret;
00379 }
00380 
00381 UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
00382 {
00383   byte *ptr;
00384   ptr = out + len;
00385 
00386   for (;;) {
00387     ptr--;
00388     *ptr = *in;
00389     if (ptr == out) {
00390       break;
00391     }
00392     in++;
00393   }
00394 
00395   out[len - 1] = (byte) (out[len - 1] ^ 128);
00396 
00397 }