00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00039 #include <config.h>
00040 #include <drizzled/gettext.h>
00041 #include <drizzled/plugin/transaction_applier.h>
00042 #include <drizzled/message/transaction.pb.h>
00043 #include <drizzled/plugin.h>
00044
00045 #include <drizzled/item/string.h>
00046 #include "filtered_replicator.h"
00047 #include <boost/program_options.hpp>
00048 #include <drizzled/module/option_map.h>
00049 #include <vector>
00050 #include <string>
00051 namespace po= boost::program_options;
00052 using namespace std;
00053 using namespace drizzled;
00054
00055 namespace drizzle_plugin
00056 {
00057
00058 static string sysvar_filtered_replicator_sch_filters;
00059 static string sysvar_filtered_replicator_tab_filters;
00060
00061 FilteredReplicator::FilteredReplicator(string name_arg,
00062 const std::string &sch_filter,
00063 const std::string &tab_filter,
00064 const std::string &sch_regex,
00065 const std::string &tab_regex) :
00066 plugin::TransactionReplicator(name_arg),
00067 schemas_to_filter(),
00068 tables_to_filter(),
00069 _sch_filter(sch_filter),
00070 _tab_filter(tab_filter),
00071 _sch_regex(sch_regex),
00072 _tab_regex(tab_regex),
00073 sch_re(NULL),
00074 tab_re(NULL)
00075 {
00076
00077
00078
00079
00080 if (not _sch_filter.empty())
00081 {
00082 populateFilter(_sch_filter, schemas_to_filter);
00083 }
00084
00085
00086
00087
00088
00089 if (not _tab_filter.empty())
00090 {
00091 populateFilter(_tab_filter, tables_to_filter);
00092 }
00093
00094
00095
00096
00097
00098 if (not _sch_regex.empty())
00099 {
00100 const char *error= NULL;
00101 int32_t error_offset= 0;
00102 sch_re= pcre_compile(_sch_regex.c_str(),
00103 0,
00104 &error,
00105 &error_offset,
00106 NULL);
00107 }
00108
00109
00110
00111
00112
00113 if (not _tab_regex.empty())
00114 {
00115 const char *error= NULL;
00116 int32_t error_offset= 0;
00117 tab_re= pcre_compile(_tab_regex.c_str(),
00118 0,
00119 &error,
00120 &error_offset,
00121 NULL);
00122 }
00123
00124 pthread_mutex_init(&sch_vector_lock, NULL);
00125 pthread_mutex_init(&tab_vector_lock, NULL);
00126 pthread_mutex_init(&sysvar_sch_lock, NULL);
00127 pthread_mutex_init(&sysvar_tab_lock, NULL);
00128 }
00129
00130 FilteredReplicator::~FilteredReplicator()
00131 {
00132 if (sch_re)
00133 {
00134 pcre_free(sch_re);
00135 }
00136 if (tab_re)
00137 {
00138 pcre_free(tab_re);
00139 }
00140
00141 pthread_mutex_destroy(&sch_vector_lock);
00142 pthread_mutex_destroy(&tab_vector_lock);
00143 pthread_mutex_destroy(&sysvar_sch_lock);
00144 pthread_mutex_destroy(&sysvar_tab_lock);
00145
00146 }
00147
00148 void FilteredReplicator::parseStatementTableMetadata(const message::Statement &in_statement,
00149 string &in_schema_name,
00150 string &in_table_name) const
00151 {
00152 switch (in_statement.type())
00153 {
00154 case message::Statement::INSERT:
00155 {
00156 const message::TableMetadata &metadata= in_statement.insert_header().table_metadata();
00157 in_schema_name.assign(metadata.schema_name());
00158 in_table_name.assign(metadata.table_name());
00159 break;
00160 }
00161 case message::Statement::UPDATE:
00162 {
00163 const message::TableMetadata &metadata= in_statement.update_header().table_metadata();
00164 in_schema_name.assign(metadata.schema_name());
00165 in_table_name.assign(metadata.table_name());
00166 break;
00167 }
00168 case message::Statement::DELETE:
00169 {
00170 const message::TableMetadata &metadata= in_statement.delete_header().table_metadata();
00171 in_schema_name.assign(metadata.schema_name());
00172 in_table_name.assign(metadata.table_name());
00173 break;
00174 }
00175 case message::Statement::CREATE_SCHEMA:
00176 {
00177 in_schema_name.assign(in_statement.create_schema_statement().schema().name());
00178 in_table_name.clear();
00179 break;
00180 }
00181 case message::Statement::ALTER_SCHEMA:
00182 {
00183 in_schema_name.assign(in_statement.alter_schema_statement().after().name());
00184 in_table_name.clear();
00185 break;
00186 }
00187 case message::Statement::DROP_SCHEMA:
00188 {
00189 in_schema_name.assign(in_statement.drop_schema_statement().schema_name());
00190 in_table_name.clear();
00191 break;
00192 }
00193 case message::Statement::CREATE_TABLE:
00194 {
00195 in_schema_name.assign(in_statement.create_table_statement().table().schema());
00196 in_table_name.assign(in_statement.create_table_statement().table().name());
00197 break;
00198 }
00199 case message::Statement::ALTER_TABLE:
00200 {
00201 in_schema_name.assign(in_statement.alter_table_statement().after().schema());
00202 in_table_name.assign(in_statement.alter_table_statement().after().name());
00203 break;
00204 }
00205 case message::Statement::DROP_TABLE:
00206 {
00207 const message::TableMetadata &metadata= in_statement.drop_table_statement().table_metadata();
00208 in_schema_name.assign(metadata.schema_name());
00209 in_table_name.assign(metadata.table_name());
00210 break;
00211 }
00212 default:
00213 {
00214
00215 in_schema_name.clear();
00216 in_table_name.clear();
00217 break;
00218 }
00219 }
00220 }
00221
00222 plugin::ReplicationReturnCode
00223 FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
00224 Session &in_session,
00225 message::Transaction &to_replicate)
00226 {
00227 string schema_name;
00228 string table_name;
00229
00230 size_t num_statements= to_replicate.statement_size();
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240 message::Transaction filtered_transaction;
00241
00242 for (size_t x= 0; x < num_statements; ++x)
00243 {
00244 schema_name.clear();
00245 table_name.clear();
00246
00247 const message::Statement &statement= to_replicate.statement(x);
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259 if (statement.type() == message::Statement::RAW_SQL)
00260 {
00261 parseQuery(statement.sql(), schema_name, table_name);
00262 }
00263 else
00264 {
00265 parseStatementTableMetadata(statement, schema_name, table_name);
00266 }
00267
00268
00269
00270
00271
00272
00273
00274 std::transform(schema_name.begin(), schema_name.end(),
00275 schema_name.begin(), ::tolower);
00276 std::transform(table_name.begin(), table_name.end(),
00277 table_name.begin(), ::tolower);
00278
00279 if (! isSchemaFiltered(schema_name) &&
00280 ! isTableFiltered(table_name))
00281 {
00282 message::Statement *s= filtered_transaction.add_statement();
00283 *s= statement;
00284 }
00285 }
00286
00287 if (filtered_transaction.statement_size() > 0)
00288 {
00289
00290
00291
00292
00293
00294 message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
00295 *tc= to_replicate.transaction_context();
00296 return in_applier->apply(in_session, filtered_transaction);
00297 }
00298 return plugin::SUCCESS;
00299 }
00300
00301 void FilteredReplicator::populateFilter(std::string input,
00302 std::vector<string> &filter)
00303 {
00304
00305
00306
00307
00308 std::transform(input.begin(), input.end(),
00309 input.begin(), ::tolower);
00310 string::size_type last_pos= input.find_first_not_of(',', 0);
00311 string::size_type pos= input.find_first_of(',', last_pos);
00312
00313 while (pos != string::npos || last_pos != string::npos)
00314 {
00315 filter.push_back(input.substr(last_pos, pos - last_pos));
00316 last_pos= input.find_first_not_of(',', pos);
00317 pos= input.find_first_of(',', last_pos);
00318 }
00319 }
00320
00321 bool FilteredReplicator::isSchemaFiltered(const string &schema_name)
00322 {
00323 pthread_mutex_lock(&sch_vector_lock);
00324 std::vector<string>::iterator it= find(schemas_to_filter.begin(),
00325 schemas_to_filter.end(),
00326 schema_name);
00327 if (it != schemas_to_filter.end())
00328 {
00329 pthread_mutex_unlock(&sch_vector_lock);
00330 return true;
00331 }
00332 pthread_mutex_unlock(&sch_vector_lock);
00333
00334
00335
00336
00337
00338
00339 if (not _sch_regex.empty())
00340 {
00341 int32_t result= pcre_exec(sch_re,
00342 NULL,
00343 schema_name.c_str(),
00344 schema_name.length(),
00345 0,
00346 0,
00347 NULL,
00348 0);
00349 if (result >= 0)
00350 {
00351 return true;
00352 }
00353 }
00354
00355 return false;
00356 }
00357
00358 bool FilteredReplicator::isTableFiltered(const string &table_name)
00359 {
00360 pthread_mutex_lock(&tab_vector_lock);
00361 std::vector<string>::iterator it= find(tables_to_filter.begin(),
00362 tables_to_filter.end(),
00363 table_name);
00364 if (it != tables_to_filter.end())
00365 {
00366 pthread_mutex_unlock(&tab_vector_lock);
00367 return true;
00368 }
00369 pthread_mutex_unlock(&tab_vector_lock);
00370
00371
00372
00373
00374
00375
00376 if (not _tab_regex.empty())
00377 {
00378 int32_t result= pcre_exec(tab_re,
00379 NULL,
00380 table_name.c_str(),
00381 table_name.length(),
00382 0,
00383 0,
00384 NULL,
00385 0);
00386 if (result >= 0)
00387 {
00388 return true;
00389 }
00390 }
00391
00392 return false;
00393 }
00394
00395 void FilteredReplicator::parseQuery(const string &sql,
00396 string &schema_name,
00397 string &table_name)
00398 {
00399
00400
00401
00402
00403 string::size_type pos= sql.find_first_of(' ', 0);
00404 string type= sql.substr(0, pos);
00405
00406
00407
00408
00409
00410 std::transform(type.begin(), type.end(),
00411 type.begin(), ::toupper);
00412
00413 if (type.compare("DROP") == 0)
00414 {
00415
00416
00417
00418
00419
00420
00421 pos= sql.find_first_of(' ', 11);
00422 string cmp_str= sql.substr(11, pos - 11);
00423 string target_name("");
00424 if (cmp_str.compare("IF") == 0)
00425 {
00426
00427 pos= sql.find_first_of(' ', 21);
00428 target_name.assign(sql.substr(21, pos - 21));
00429 }
00430 else
00431 {
00432 target_name.assign(cmp_str);
00433 }
00434
00435
00436
00437
00438
00439 pos= target_name.find_first_of('.', 0);
00440 if (pos != string::npos)
00441 {
00442
00443
00444
00445 schema_name.assign(target_name.substr(0, pos));
00446
00447
00448
00449 table_name.assign(target_name.substr(pos + 1));
00450 }
00451 else
00452 {
00453 table_name.assign(target_name);
00454 }
00455 }
00456 else if (type.compare("CREATE") == 0)
00457 {
00458
00459
00460
00461
00462
00463 pos= sql.find_first_of(' ', 13);
00464 string target_name= sql.substr(13, pos - 13);
00465
00466
00467
00468
00469
00470 pos= target_name.find_first_of('.', 0);
00471 if (pos != string::npos)
00472 {
00473
00474
00475
00476 schema_name.assign(target_name.substr(0, pos));
00477
00478
00479
00480 table_name.assign(target_name.substr(pos + 1));
00481 }
00482 else
00483 {
00484 table_name.assign(target_name);
00485 }
00486 }
00487 else
00488 {
00489
00490 return;
00491 }
00492 }
00493
00494 void FilteredReplicator::setSchemaFilter(const string &input)
00495 {
00496 pthread_mutex_lock(&sch_vector_lock);
00497 pthread_mutex_lock(&sysvar_sch_lock);
00498 _sch_filter.assign(input);
00499 schemas_to_filter.clear();
00500 populateFilter(_sch_filter, schemas_to_filter);
00501 pthread_mutex_unlock(&sysvar_sch_lock);
00502 pthread_mutex_unlock(&sch_vector_lock);
00503 }
00504
00505 void FilteredReplicator::setTableFilter(const string &input)
00506 {
00507 pthread_mutex_lock(&tab_vector_lock);
00508 pthread_mutex_lock(&sysvar_tab_lock);
00509 _tab_filter.assign(input);
00510 tables_to_filter.clear();
00511 populateFilter(_tab_filter, tables_to_filter);
00512 pthread_mutex_unlock(&sysvar_tab_lock);
00513 pthread_mutex_unlock(&tab_vector_lock);
00514 }
00515
00516 static FilteredReplicator *filtered_replicator= NULL;
00517
00518 static int filtered_schemas_validate(Session*, set_var *var)
00519 {
00520 const char *input= var->value->str_value.ptr();
00521 if (input == NULL)
00522 return 1;
00523
00524 if (input && filtered_replicator)
00525 {
00526 filtered_replicator->setSchemaFilter(input);
00527 return 0;
00528 }
00529 return 1;
00530 }
00531
00532
00533 static int filtered_tables_validate(Session*, set_var *var)
00534 {
00535 const char *input= var->value->str_value.ptr();
00536 if (input == NULL)
00537 return 1;
00538
00539 if (input && filtered_replicator)
00540 {
00541 filtered_replicator->setTableFilter(input);
00542 return 0;
00543 }
00544 return 1;
00545 }
00546
00547
00548 static int init(module::Context &context)
00549 {
00550 const module::option_map &vm= context.getOptions();
00551
00552 filtered_replicator= new FilteredReplicator("filtered_replicator",
00553 vm["filteredschemas"].as<string>(),
00554 vm["filteredtables"].as<string>(),
00555 vm["schemaregex"].as<string>(),
00556 vm["tableregex"].as<string>());
00557
00558 context.add(filtered_replicator);
00559 context.registerVariable(new sys_var_std_string("filteredschemas",
00560 sysvar_filtered_replicator_sch_filters,
00561 filtered_schemas_validate));
00562 context.registerVariable(new sys_var_std_string("filteredtables",
00563 sysvar_filtered_replicator_tab_filters,
00564 filtered_tables_validate));
00565
00566 context.registerVariable(new sys_var_const_string_val("schemaregex",
00567 vm["schemaregex"].as<string>()));
00568 context.registerVariable(new sys_var_const_string_val("tableregex",
00569 vm["tableregex"].as<string>()));
00570
00571 return 0;
00572 }
00573
00574 static void init_options(drizzled::module::option_context &context)
00575 {
00576 context("filteredschemas",
00577 po::value<string>(&sysvar_filtered_replicator_sch_filters)->default_value(""),
00578 N_("Comma-separated list of schemas to exclude"));
00579 context("filteredtables",
00580 po::value<string>(&sysvar_filtered_replicator_tab_filters)->default_value(""),
00581 N_("Comma-separated list of tables to exclude"));
00582 context("schemaregex",
00583 po::value<string>()->default_value(""),
00584 N_("Regular expression to apply to schemas to exclude"));
00585 context("tableregex",
00586 po::value<string>()->default_value(""),
00587 N_("Regular expression to apply to tables to exclude"));
00588 }
00589
00590 }
00591
00592 DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);