Drizzled Public API Documentation

replication_services.cc

00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
00005  *  Copyright (C) 2009-2010 Jay Pipes <jaypipes@gmail.com>
00006  *
00007  *  Authors:
00008  *
00009  *    Jay Pipes <jaypipes@gmail.com>
00010  *
00011  *  This program is free software; you can redistribute it and/or modify
00012  *  it under the terms of the GNU General Public License as published by
00013  *  the Free Software Foundation; version 2 of the License.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00037 #include <config.h>
00038 #include <drizzled/replication_services.h>
00039 #include <drizzled/plugin/transaction_replicator.h>
00040 #include <drizzled/plugin/transaction_applier.h>
00041 #include <drizzled/message/transaction.pb.h>
00042 #include <drizzled/gettext.h>
00043 #include <drizzled/session.h>
00044 #include <drizzled/error.h>
00045 
00046 #include <string>
00047 #include <vector>
00048 #include <algorithm>
00049 
00050 using namespace std;
00051 
00052 namespace drizzled
00053 {
00054 
00055 ReplicationServices::ReplicationServices() :
00056   is_active(false)
00057 {
00058 }
00059 
00060 void ReplicationServices::normalizeReplicatorName(string &name)
00061 {
00062   transform(name.begin(),
00063             name.end(),
00064             name.begin(),
00065             ::tolower);
00066   if (name.find("replicator") == string::npos)
00067     name.append("replicator", 10);
00068   {
00069     size_t found_underscore= name.find('_');
00070     while (found_underscore != string::npos)
00071     {
00072       name.erase(found_underscore, 1);
00073       found_underscore= name.find('_');
00074     }
00075   }
00076 }
00077 
00078 bool ReplicationServices::evaluateRegisteredPlugins()
00079 {
00080   /* 
00081    * We loop through appliers that have registered with us
00082    * and attempts to pair the applier with its requested
00083    * replicator.  If an applier has requested a replicator
00084    * that has either not been built or has not registered
00085    * with the replication services, we print an error and
00086    * return false
00087    */
00088   if (appliers.empty())
00089     return true;
00090 
00091   if (replicators.empty() && not appliers.empty())
00092   {
00093     errmsg_printf(error::ERROR,
00094                   N_("You registered a TransactionApplier plugin but no "
00095                      "TransactionReplicator plugins were registered.\n"));
00096     return false;
00097   }
00098 
00099   for (Appliers::iterator appl_iter= appliers.begin();
00100        appl_iter != appliers.end();
00101        ++appl_iter)
00102   {
00103     plugin::TransactionApplier *applier= (*appl_iter).second;
00104     string requested_replicator_name= (*appl_iter).first;
00105     normalizeReplicatorName(requested_replicator_name);
00106 
00107     bool found= false;
00108     Replicators::iterator repl_iter;
00109     for (repl_iter= replicators.begin();
00110          repl_iter != replicators.end();
00111          ++repl_iter)
00112     {
00113       string replicator_name= (*repl_iter)->getName();
00114       normalizeReplicatorName(replicator_name);
00115 
00116       if (requested_replicator_name.compare(replicator_name) == 0)
00117       {
00118         found= true;
00119         break;
00120       }
00121     }
00122     if (not found)
00123     {
00124       errmsg_printf(error::ERROR,
00125                     N_("You registered a TransactionApplier plugin but no "
00126                        "TransactionReplicator plugins were registered that match the "
00127                        "requested replicator name of '%s'.\n"
00128                        "We have deactivated the TransactionApplier '%s'.\n"),
00129                        requested_replicator_name.c_str(),
00130                        applier->getName().c_str());
00131       applier->deactivate();
00132       return false;
00133     }
00134     else
00135     {
00136       replication_streams.push_back(make_pair(*repl_iter, applier));
00137     }
00138   }
00139   is_active= true;
00140   return true;
00141 }
00142 
00143 void ReplicationServices::attachReplicator(plugin::TransactionReplicator *in_replicator)
00144 {
00145   replicators.push_back(in_replicator);
00146 }
00147 
00148 void ReplicationServices::detachReplicator(plugin::TransactionReplicator *in_replicator)
00149 {
00150   replicators.erase(std::find(replicators.begin(), replicators.end(), in_replicator));
00151 }
00152 
00153 void ReplicationServices::attachApplier(plugin::TransactionApplier *in_applier, const string &requested_replicator_name)
00154 {
00155   appliers.push_back(make_pair(requested_replicator_name, in_applier));
00156 }
00157 
00158 void ReplicationServices::detachApplier(plugin::TransactionApplier *)
00159 {
00160 }
00161 
00162 bool ReplicationServices::isActive() const
00163 {
00164   return is_active;
00165 }
00166 
00167 plugin::ReplicationReturnCode ReplicationServices::pushTransactionMessage(Session &in_session,
00168                                                                           message::Transaction &to_push)
00169 {
00170   plugin::ReplicationReturnCode result= plugin::SUCCESS;
00171 
00172   for (ReplicationStreams::iterator iter= replication_streams.begin();
00173        iter != replication_streams.end();
00174        ++iter)
00175   {
00176     plugin::TransactionReplicator *cur_repl= iter->first;
00177     plugin::TransactionApplier *cur_appl= iter->second;
00178 
00179     result= cur_repl->replicate(cur_appl, in_session, to_push);
00180 
00181     if (result == plugin::SUCCESS)
00182     {
00183       /* 
00184        * We update the timestamp for the last applied Transaction so that
00185        * publisher plugins can ask the replication services when the
00186        * last known applied Transaction was using the getLastAppliedTimestamp()
00187        * method.
00188        */
00189       last_applied_timestamp.fetch_and_store(to_push.transaction_context().end_timestamp());
00190     }
00191     else
00192       return result;
00193   }
00194   return result;
00195 }
00196 
00197 ReplicationServices::ReplicationStreams &ReplicationServices::getReplicationStreams()
00198 {
00199   return replication_streams;
00200 }
00201 
00202 } /* namespace drizzled */