Drizzled Public API Documentation

rabbitmq_log.cc

00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2010 Marcus Eriksson
00005  *
00006  *  Authors:
00007  *
00008  *  Marcus Eriksson <krummas@gmail.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00025 #include <config.h>
00026 #include "rabbitmq_log.h"
00027 #include <drizzled/message/transaction.pb.h>
00028 #include <google/protobuf/io/coded_stream.h>
00029 #include <stdio.h>
00030 #include <drizzled/module/registry.h>
00031 #include <drizzled/plugin.h>
00032 #include <stdint.h>
00033 #include "rabbitmq_handler.h"
00034 #include <boost/program_options.hpp>
00035 #include <drizzled/module/option_map.h>
00036 
00037 namespace po= boost::program_options;
00038 
00039 using namespace std;
00040 using namespace drizzled;
00041 using namespace google;
00042 
00043 namespace drizzle_plugin
00044 {
00045 
00049 static port_constraint sysvar_rabbitmq_port;
00050 
00051 
00052 RabbitMQLog::RabbitMQLog(const string &name, 
00053                          const std::string &exchange,
00054                          const std::string &routingkey,
00055                          RabbitMQHandler* mqHandler) :
00056   plugin::TransactionApplier(name),
00057   _rabbitMQHandler(mqHandler),
00058   _exchange(exchange),
00059   _routingkey(routingkey)
00060 { }
00061 
00062 RabbitMQLog::~RabbitMQLog() 
00063 { }
00064 
00065 plugin::ReplicationReturnCode
00066 RabbitMQLog::apply(Session &, const message::Transaction &to_apply)
00067 {
00068   size_t message_byte_length= to_apply.ByteSize();
00069   uint8_t* buffer= new uint8_t[message_byte_length];
00070   if(buffer == NULL)
00071   {
00072     errmsg_printf(error::ERROR, _("Failed to allocate enough memory to transaction message\n"));
00073     deactivate();
00074     return plugin::UNKNOWN_ERROR;
00075   }
00076 
00077   to_apply.SerializeWithCachedSizesToArray(buffer);
00078   try
00079   {
00080     _rabbitMQHandler->publish(buffer, 
00081                              int(message_byte_length), 
00082                              _exchange,
00083                              _routingkey);
00084   }
00085   catch(exception& e)
00086   {
00087     errmsg_printf(error::ERROR, _(e.what()));
00088     deactivate();
00089     return plugin::UNKNOWN_ERROR;
00090   }
00091   delete[] buffer;
00092   return plugin::SUCCESS;
00093 }
00094 
00095 static RabbitMQLog *rabbitmqLogger; 
00096 static RabbitMQHandler* rabbitmqHandler; 
00097 
00103 static int init(drizzled::module::Context &context)
00104 {
00105   const module::option_map &vm= context.getOptions();
00106   
00107   try 
00108   {
00109     rabbitmqHandler= new RabbitMQHandler(vm["host"].as<string>(),
00110                                          sysvar_rabbitmq_port, 
00111                                          vm["username"].as<string>(), 
00112                                          vm["password"].as<string>(), 
00113                                          vm["virtualhost"].as<string>());
00114   } 
00115   catch (exception& e) 
00116   {
00117     errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQHandler.  Got error: %s\n"),
00118                   e.what());
00119     return 1;
00120   }
00121   try 
00122   {
00123     rabbitmqLogger= new RabbitMQLog("rabbit_log_applier",
00124                                     vm["exchange"].as<string>(),
00125                                     vm["routingkey"].as<string>(),
00126                                     rabbitmqHandler);
00127   } 
00128   catch (exception& e) 
00129   {
00130     errmsg_printf(error::ERROR, _("Failed to allocate the RabbitMQLog instance.  Got error: %s\n"), 
00131                   e.what());
00132     return 1;
00133   }
00134 
00135   context.add(rabbitmqLogger);
00136   ReplicationServices &replication_services= ReplicationServices::singleton();
00137   replication_services.attachApplier(rabbitmqLogger, vm["use-replicator"].as<string>());
00138 
00139   context.registerVariable(new sys_var_const_string_val("host", vm["host"].as<string>()));
00140   context.registerVariable(new sys_var_constrained_value_readonly<in_port_t>("port", sysvar_rabbitmq_port));
00141   context.registerVariable(new sys_var_const_string_val("username", vm["username"].as<string>()));
00142   context.registerVariable(new sys_var_const_string_val("password", vm["password"].as<string>()));
00143   context.registerVariable(new sys_var_const_string_val("virtualhost", vm["virtualhost"].as<string>()));
00144   context.registerVariable(new sys_var_const_string_val("exchange", vm["exchange"].as<string>()));
00145   context.registerVariable(new sys_var_const_string_val("routingkey", vm["routingkey"].as<string>()));
00146 
00147   return 0;
00148 }
00149 
00150 
00151 static void init_options(drizzled::module::option_context &context)
00152 {
00153   context("host", 
00154           po::value<string>()->default_value("localhost"),
00155           _("Host name to connect to"));
00156   context("port",
00157           po::value<port_constraint>(&sysvar_rabbitmq_port)->default_value(5672),
00158           _("Port to connect to"));
00159   context("virtualhost",
00160           po::value<string>()->default_value("/"),
00161           _("RabbitMQ virtualhost"));
00162   context("username",
00163           po::value<string>()->default_value("guest"),
00164           _("RabbitMQ username"));
00165   context("password",
00166           po::value<string>()->default_value("guest"),
00167           _("RabbitMQ password"));
00168   context("use-replicator",
00169           po::value<string>()->default_value("default_replicator"),
00170           _("Name of the replicator plugin to use (default='default_replicator')"));
00171   context("exchange",
00172           po::value<string>()->default_value("ReplicationExchange"),
00173           _("Name of RabbitMQ exchange to publish to"));
00174   context("routingkey",
00175           po::value<string>()->default_value("ReplicationRoutingKey"),
00176           _("Name of RabbitMQ routing key to use"));
00177 }
00178 
00179 } /* namespace drizzle_plugin */
00180 
00181 DRIZZLE_PLUGIN(drizzle_plugin::init, NULL, drizzle_plugin::init_options);
00182