Drizzled Public API Documentation

filtered_replicator.cc

Go to the documentation of this file.
00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00039 #include <config.h>
00040 #include <drizzled/gettext.h>
00041 #include <drizzled/plugin/transaction_applier.h>
00042 #include <drizzled/message/transaction.pb.h>
00043 #include <drizzled/plugin.h>
00044 
00045 #include <drizzled/item/string.h>
00046 #include "filtered_replicator.h"
00047 #include <boost/program_options.hpp>
00048 #include <drizzled/module/option_map.h>
00049 #include <vector>
00050 #include <string>
00051 namespace po= boost::program_options;
00052 using namespace std;
00053 using namespace drizzled;
00054 
00055 namespace drizzle_plugin
00056 {
00057 
/*
 * Backing storage for the "filteredschemas" and "filteredtables" options.
 * init_options() binds these strings to the option values and init() hands
 * them to the system variables of the same names.
 */
static std::string sysvar_filtered_replicator_sch_filters;
static std::string sysvar_filtered_replicator_tab_filters;
00060 
00061 FilteredReplicator::FilteredReplicator(string name_arg,
00062                                        const std::string &sch_filter,
00063                                        const std::string &tab_filter,
00064                                        const std::string &sch_regex,
00065                                        const std::string &tab_regex) :
00066   plugin::TransactionReplicator(name_arg),
00067   schemas_to_filter(),
00068   tables_to_filter(),
00069   _sch_filter(sch_filter),
00070   _tab_filter(tab_filter),
00071   _sch_regex(sch_regex),
00072   _tab_regex(tab_regex),
00073   sch_re(NULL),
00074   tab_re(NULL)
00075 {
00076   /* 
00077    * Add each of the specified schemas to the vector of schemas
00078    * to filter.
00079    */
00080   if (not _sch_filter.empty())
00081   {
00082     populateFilter(_sch_filter, schemas_to_filter);
00083   }
00084 
00085   /* 
00086    * Add each of the specified tables to the vector of tables
00087    * to filter.
00088    */
00089   if (not _tab_filter.empty())
00090   {
00091     populateFilter(_tab_filter, tables_to_filter);
00092   }
00093 
00094   /* 
00095    * Compile the regular expression for schema's to filter
00096    * if one is specified.
00097    */
00098   if (not _sch_regex.empty())
00099   {
00100     const char *error= NULL;
00101     int32_t error_offset= 0;
00102     sch_re= pcre_compile(_sch_regex.c_str(),
00103                          0,
00104                          &error,
00105                          &error_offset,
00106                          NULL);
00107   }
00108 
00109   /* 
00110    * Compile the regular expression for table's to filter
00111    * if one is specified.
00112    */
00113   if (not _tab_regex.empty())
00114   {
00115     const char *error= NULL;
00116     int32_t error_offset= 0;
00117     tab_re= pcre_compile(_tab_regex.c_str(),
00118                          0,
00119                          &error,
00120                          &error_offset,
00121                          NULL);
00122   }
00123 
00124   pthread_mutex_init(&sch_vector_lock, NULL);
00125   pthread_mutex_init(&tab_vector_lock, NULL);
00126   pthread_mutex_init(&sysvar_sch_lock, NULL);
00127   pthread_mutex_init(&sysvar_tab_lock, NULL);
00128 }
00129 
00130 FilteredReplicator::~FilteredReplicator()
00131 {
00132   if (sch_re)
00133   {
00134     pcre_free(sch_re);
00135   }
00136   if (tab_re)
00137   {
00138     pcre_free(tab_re);
00139   }
00140 
00141   pthread_mutex_destroy(&sch_vector_lock);
00142   pthread_mutex_destroy(&tab_vector_lock);
00143   pthread_mutex_destroy(&sysvar_sch_lock);
00144   pthread_mutex_destroy(&sysvar_tab_lock);
00145 
00146 }
00147 
00148 void FilteredReplicator::parseStatementTableMetadata(const message::Statement &in_statement,
00149                                                      string &in_schema_name,
00150                                                      string &in_table_name) const
00151 {
00152   switch (in_statement.type())
00153   {
00154     case message::Statement::INSERT:
00155     {
00156       const message::TableMetadata &metadata= in_statement.insert_header().table_metadata();
00157       in_schema_name.assign(metadata.schema_name());
00158       in_table_name.assign(metadata.table_name());
00159       break;
00160     }
00161     case message::Statement::UPDATE:
00162     {
00163       const message::TableMetadata &metadata= in_statement.update_header().table_metadata();
00164       in_schema_name.assign(metadata.schema_name());
00165       in_table_name.assign(metadata.table_name());
00166       break;
00167     }
00168     case message::Statement::DELETE:
00169     {
00170       const message::TableMetadata &metadata= in_statement.delete_header().table_metadata();
00171       in_schema_name.assign(metadata.schema_name());
00172       in_table_name.assign(metadata.table_name());
00173       break;
00174     }
00175     case message::Statement::CREATE_SCHEMA:
00176     {
00177       in_schema_name.assign(in_statement.create_schema_statement().schema().name());
00178       in_table_name.clear();
00179       break;
00180     }
00181     case message::Statement::ALTER_SCHEMA:
00182     {
00183       in_schema_name.assign(in_statement.alter_schema_statement().after().name());
00184       in_table_name.clear();
00185       break;
00186     }
00187     case message::Statement::DROP_SCHEMA:
00188     {
00189       in_schema_name.assign(in_statement.drop_schema_statement().schema_name());
00190       in_table_name.clear();
00191       break;
00192     }
00193     case message::Statement::CREATE_TABLE:
00194     {
00195       in_schema_name.assign(in_statement.create_table_statement().table().schema());
00196       in_table_name.assign(in_statement.create_table_statement().table().name());
00197       break;
00198     }
00199     case message::Statement::ALTER_TABLE:
00200     {
00201       in_schema_name.assign(in_statement.alter_table_statement().after().schema());
00202       in_table_name.assign(in_statement.alter_table_statement().after().name());
00203       break;
00204     }
00205     case message::Statement::DROP_TABLE:
00206     {
00207       const message::TableMetadata &metadata= in_statement.drop_table_statement().table_metadata();
00208       in_schema_name.assign(metadata.schema_name());
00209       in_table_name.assign(metadata.table_name());
00210       break;
00211     }
00212     default:
00213     {
00214       /* All other types have no schema and table information */
00215       in_schema_name.clear();
00216       in_table_name.clear();
00217       break;
00218     }
00219   }  
00220 }
00221 
00222 plugin::ReplicationReturnCode
00223 FilteredReplicator::replicate(plugin::TransactionApplier *in_applier,
00224                               Session &in_session,
00225                               message::Transaction &to_replicate)
00226 {
00227   string schema_name;
00228   string table_name;
00229 
00230   size_t num_statements= to_replicate.statement_size();
00231 
00232   /* 
00233    * We build a new transaction message containing only Statement
00234    * messages that have not been filtered.
00235    *
00236    * @todo A more efficient method would be to rework the pointers
00237    * that the to_replicate.statement() vector contains and remove
00238    * the statement pointers that are filtered...
00239    */
00240   message::Transaction filtered_transaction;
00241 
00242   for (size_t x= 0; x < num_statements; ++x)
00243   {
00244     schema_name.clear();
00245     table_name.clear();
00246 
00247     const message::Statement &statement= to_replicate.statement(x);
00248 
00249     /*
00250      * First, we check to see if the command consists of raw SQL. If so,
00251      * we need to parse this SQL and determine whether to filter the event
00252      * based on the information we obtain from the parsed SQL.
00253      * If not raw SQL, check if this event should be filtered or not
00254      * based on the schema and table names in the command message.
00255      *
00256      * The schema and table names are stored in TableMetadata headers
00257      * for most types of Statement messages.
00258      */
00259     if (statement.type() == message::Statement::RAW_SQL)
00260     {
00261       parseQuery(statement.sql(), schema_name, table_name);
00262     }
00263     else
00264     {
00265       parseStatementTableMetadata(statement, schema_name, table_name);
00266     }
00267 
00268     /*
00269      * Convert the schema name and table name strings to lowercase so that it
00270      * does not matter what case the table or schema name was specified in. We
00271      * also keep all entries in the vectors of schemas and tables to filter in
00272      * lowercase.
00273      */
00274     std::transform(schema_name.begin(), schema_name.end(),
00275                   schema_name.begin(), ::tolower);
00276     std::transform(table_name.begin(), table_name.end(),
00277                   table_name.begin(), ::tolower);
00278 
00279     if (! isSchemaFiltered(schema_name) &&
00280         ! isTableFiltered(table_name))
00281     {
00282       message::Statement *s= filtered_transaction.add_statement();
00283       *s= statement; /* copy contruct */
00284     }
00285   }
00286 
00287   if (filtered_transaction.statement_size() > 0)
00288   {
00289 
00290     /*
00291      * We can now simply call the applier's apply() method, passing
00292      * along the supplied command.
00293      */
00294     message::TransactionContext *tc= filtered_transaction.mutable_transaction_context();
00295     *tc= to_replicate.transaction_context(); /* copy construct */
00296     return in_applier->apply(in_session, filtered_transaction);
00297   }
00298   return plugin::SUCCESS;
00299 }
00300 
00301 void FilteredReplicator::populateFilter(std::string input,
00302                                         std::vector<string> &filter)
00303 {
00304   /*
00305    * Convert the input string to lowercase so that all entries in the vector
00306    * will be in lowercase.
00307    */
00308   std::transform(input.begin(), input.end(),
00309                  input.begin(), ::tolower);
00310   string::size_type last_pos= input.find_first_not_of(',', 0);
00311   string::size_type pos= input.find_first_of(',', last_pos);
00312 
00313   while (pos != string::npos || last_pos != string::npos)
00314   {
00315     filter.push_back(input.substr(last_pos, pos - last_pos));
00316     last_pos= input.find_first_not_of(',', pos);
00317     pos= input.find_first_of(',', last_pos);
00318   }
00319 }
00320 
00321 bool FilteredReplicator::isSchemaFiltered(const string &schema_name)
00322 {
00323   pthread_mutex_lock(&sch_vector_lock);
00324   std::vector<string>::iterator it= find(schemas_to_filter.begin(),
00325                                          schemas_to_filter.end(),
00326                                          schema_name);
00327   if (it != schemas_to_filter.end())
00328   {
00329     pthread_mutex_unlock(&sch_vector_lock);
00330     return true;
00331   }
00332   pthread_mutex_unlock(&sch_vector_lock);
00333 
00334   /* 
00335    * If regular expression matching is enabled for schemas to filter, then
00336    * we check to see if this schema name matches the regular expression that
00337    * has been specified. 
00338    */
00339   if (not _sch_regex.empty())
00340   {
00341     int32_t result= pcre_exec(sch_re,
00342                               NULL,
00343                               schema_name.c_str(),
00344                               schema_name.length(),
00345                               0,
00346                               0,
00347                               NULL,
00348                               0);
00349     if (result >= 0)
00350     {
00351       return true;
00352     }
00353   }
00354 
00355   return false;
00356 }
00357 
00358 bool FilteredReplicator::isTableFiltered(const string &table_name)
00359 {
00360   pthread_mutex_lock(&tab_vector_lock);
00361   std::vector<string>::iterator it= find(tables_to_filter.begin(),
00362                                          tables_to_filter.end(),
00363                                          table_name);
00364   if (it != tables_to_filter.end())
00365   {
00366     pthread_mutex_unlock(&tab_vector_lock);
00367     return true;
00368   }
00369   pthread_mutex_unlock(&tab_vector_lock);
00370 
00371   /* 
00372    * If regular expression matching is enabled for tables to filter, then
00373    * we check to see if this table name matches the regular expression that
00374    * has been specified. 
00375    */
00376   if (not _tab_regex.empty())
00377   {
00378     int32_t result= pcre_exec(tab_re,
00379                               NULL,
00380                               table_name.c_str(),
00381                               table_name.length(),
00382                               0,
00383                               0,
00384                               NULL,
00385                               0);
00386     if (result >= 0)
00387     {
00388       return true;
00389     }
00390   }
00391 
00392   return false;
00393 }
00394 
00395 void FilteredReplicator::parseQuery(const string &sql,
00396                                     string &schema_name,
00397                                     string &table_name)
00398 {
00399   /*
00400    * Determine what type of SQL we are dealing with e.g. create table,
00401    * drop table, etc.
00402    */
00403   string::size_type pos= sql.find_first_of(' ', 0);
00404   string type= sql.substr(0, pos);
00405 
00406   /*
00407    * Convert the type string to uppercase here so that it doesn't
00408    * matter what case the user entered the statement in.
00409    */
00410   std::transform(type.begin(), type.end(),
00411                  type.begin(), ::toupper);
00412 
00413   if (type.compare("DROP") == 0)
00414   {
00415     /*
00416      * The schema and table name can be either the third word
00417      * or the fifth word in a DROP TABLE statement...so we extract
00418      * the third word from the SQL and see whether it is and IF or
00419      * not.
00420      */
00421     pos= sql.find_first_of(' ', 11);
00422     string cmp_str= sql.substr(11, pos - 11);
00423     string target_name("");
00424     if (cmp_str.compare("IF") == 0)
00425     {
00426       /* the name must be the fifth word */
00427       pos= sql.find_first_of(' ', 21);
00428       target_name.assign(sql.substr(21, pos - 21));
00429     }
00430     else
00431     {
00432       target_name.assign(cmp_str);
00433     }
00434     /*
00435      * Determine whether the name is a concatenation of the schema
00436      * name and table name i.e. schema.table or just the table name
00437      * on its own.
00438      */
00439     pos= target_name.find_first_of('.', 0);
00440     if (pos != string::npos)
00441     {
00442       /*
00443        * There is a schema name here...
00444        */
00445       schema_name.assign(target_name.substr(0, pos));
00446       /*
00447        * The rest of the name string is the table name.
00448        */
00449       table_name.assign(target_name.substr(pos + 1));
00450     }
00451     else
00452     {
00453       table_name.assign(target_name);
00454     }
00455   }
00456   else if (type.compare("CREATE") == 0)
00457   {
00458     /*
00459      * The schema and table name are always the third word
00460      * in a CREATE TABLE statement...always (unless there is
00461      * some crazy syntax I am unaware of).
00462      */
00463     pos= sql.find_first_of(' ', 13);
00464     string target_name= sql.substr(13, pos - 13);
00465     /*
00466      * Determine whether the name is a concatenation of the schema
00467      * name and table name i.e. schema.table or just the table name
00468      * on its own.
00469      */
00470     pos= target_name.find_first_of('.', 0);
00471     if (pos != string::npos)
00472     {
00473       /*
00474        * There is a schema name here...
00475        */
00476       schema_name.assign(target_name.substr(0, pos));
00477       /*
00478        * The rest of the name string is the table name.
00479        */
00480       table_name.assign(target_name.substr(pos + 1));
00481     }
00482     else
00483     {
00484       table_name.assign(target_name);
00485     }
00486   }
00487   else
00488   {
00489     /* we only deal with DROP and CREATE table for the moment */
00490     return;
00491   }
00492 }
00493 
00494 void FilteredReplicator::setSchemaFilter(const string &input)
00495 {
00496   pthread_mutex_lock(&sch_vector_lock);
00497   pthread_mutex_lock(&sysvar_sch_lock);
00498   _sch_filter.assign(input);
00499   schemas_to_filter.clear();
00500   populateFilter(_sch_filter, schemas_to_filter);
00501   pthread_mutex_unlock(&sysvar_sch_lock);
00502   pthread_mutex_unlock(&sch_vector_lock);
00503 }
00504 
00505 void FilteredReplicator::setTableFilter(const string &input)
00506 {
00507   pthread_mutex_lock(&tab_vector_lock);
00508   pthread_mutex_lock(&sysvar_tab_lock);
00509   _tab_filter.assign(input);
00510   tables_to_filter.clear();
00511   populateFilter(_tab_filter, tables_to_filter);
00512   pthread_mutex_unlock(&sysvar_tab_lock);
00513   pthread_mutex_unlock(&tab_vector_lock);
00514 }
00515 
00516 static FilteredReplicator *filtered_replicator= NULL; /* The singleton replicator */
00517 
00518 static int filtered_schemas_validate(Session*, set_var *var)
00519 {
00520   const char *input= var->value->str_value.ptr();
00521   if (input == NULL)
00522     return 1;
00523 
00524   if (input && filtered_replicator)
00525   {
00526     filtered_replicator->setSchemaFilter(input);
00527     return 0;
00528   }
00529   return 1;
00530 }
00531 
00532 
00533 static int filtered_tables_validate(Session*, set_var *var)
00534 {
00535   const char *input= var->value->str_value.ptr();
00536   if (input == NULL)
00537     return 1;
00538 
00539   if (input && filtered_replicator)
00540   {
00541     filtered_replicator->setTableFilter(input);
00542     return 0;
00543   }
00544   return 1;
00545 }
00546 
00547 
00548 static int init(module::Context &context)
00549 {
00550   const module::option_map &vm= context.getOptions();
00551   
00552   filtered_replicator= new FilteredReplicator("filtered_replicator",
00553                                               vm["filteredschemas"].as<string>(),
00554                                               vm["filteredtables"].as<string>(),
00555                                               vm["schemaregex"].as<string>(),
00556                                               vm["tableregex"].as<string>());
00557 
00558   context.add(filtered_replicator);
00559   context.registerVariable(new sys_var_std_string("filteredschemas",
00560                                                   sysvar_filtered_replicator_sch_filters,
00561                                                   filtered_schemas_validate));
00562   context.registerVariable(new sys_var_std_string("filteredtables",
00563                                                   sysvar_filtered_replicator_tab_filters,
00564                                                   filtered_tables_validate));
00565 
00566   context.registerVariable(new sys_var_const_string_val("schemaregex",
00567                                                         vm["schemaregex"].as<string>()));
00568   context.registerVariable(new sys_var_const_string_val("tableregex",
00569                                                         vm["tableregex"].as<string>()));
00570 
00571   return 0;
00572 }
00573 
00574 static void init_options(drizzled::module::option_context &context)
00575 {
00576   context("filteredschemas",
00577           po::value<string>(&sysvar_filtered_replicator_sch_filters)->default_value(""),
00578           N_("Comma-separated list of schemas to exclude"));
00579   context("filteredtables",
00580           po::value<string>(&sysvar_filtered_replicator_tab_filters)->default_value(""),
00581           N_("Comma-separated list of tables to exclude"));
00582   context("schemaregex", 
00583           po::value<string>()->default_value(""),
00584           N_("Regular expression to apply to schemas to exclude"));
00585   context("tableregex", 
00586           po::value<string>()->default_value(""),
00587           N_("Regular expression to apply to tables to exclude"));
00588 }
00589 
00590 } /* namespace drizzle_plugin */
00591 
00592 DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);