Drizzled Public API Documentation

queue_producer.h

00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2011 David Shrewsbury
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00021 #pragma once
00022 
00023 #include <client/client_priv.h>
00024 #include <drizzled/error_t.h>
00025 #include <plugin/slave/queue_thread.h>
00026 #include <plugin/slave/sql_executor.h>
00027 #include <string>
00028 #include <vector>
00029 
00030 namespace slave
00031 {
00032   
00033 class QueueProducer : public QueueThread, public SQLExecutor
00034 {
00035 public:
00036   QueueProducer() :
00037     SQLExecutor("slave", "replication"),
00038     _check_interval(5),
00039     _master_port(3306),
00040     _last_return(DRIZZLE_RETURN_OK),
00041     _is_connected(false),
00042     _saved_max_commit_id(0),
00043     _max_reconnects(10),
00044     _seconds_between_reconnects(30)
00045   {}
00046 
00047   virtual ~QueueProducer();
00048 
00049   bool init();
00050   bool process();
00051   void shutdown();
00052 
00053   void setSleepInterval(uint32_t seconds)
00054   {
00055     _check_interval= seconds;
00056   }
00057 
00058   uint32_t getSleepInterval()
00059   {
00060     return _check_interval;
00061   }
00062 
00063   void setMasterHost(const std::string &host)
00064   {
00065     _master_host= host;
00066   }
00067 
00068   void setMasterPort(uint16_t port)
00069   {
00070     _master_port= port;
00071   }
00072 
00073   void setMasterUser(const std::string &user)
00074   {
00075     _master_user= user;
00076   }
00077 
00078   void setMasterPassword(const std::string &password)
00079   {
00080     _master_pass= password;
00081   }
00082 
00083   void setMaxReconnectAttempts(uint32_t max)
00084   {
00085     _max_reconnects= max;
00086   }
00087 
00088   void setSecondsBetweenReconnects(uint32_t seconds)
00089   {
00090     _seconds_between_reconnects= seconds;
00091   }
00092 
00093   void setCachedMaxCommitId(uint64_t value)
00094   {
00095     _saved_max_commit_id= value;
00096   }
00097 
00098 private:
00100   uint32_t _check_interval;
00101 
00102   /* Master server connection parameters */
00103   std::string _master_host;
00104   uint16_t    _master_port;
00105   std::string _master_user;
00106   std::string _master_pass;
00107 
00108   drizzle_st _drizzle;
00109   drizzle_con_st _connection;
00110   drizzle_return_t _last_return;
00111 
00112   bool _is_connected;
00113   uint64_t _saved_max_commit_id;
00114   uint32_t _max_reconnects;
00115   uint32_t _seconds_between_reconnects;
00116 
00117   std::string _last_error_message;
00118 
00122   bool openConnection();
00123 
00127   bool closeConnection();
00128 
00138   bool reconnect(bool initial_connection);
00139 
00148   bool queryForMaxCommitId(uint64_t *max_commit_id);
00149 
00163   enum drizzled::error_t queryForReplicationEvents(uint64_t max_commit_id);
00164 
00165   bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
00166   bool queueInsert(const char *trx_id,
00167                    const char *seg_id,
00168                    const char *commit_id,
00169                    const char *msg,
00170                    const char *msg_length);
00171 
00178   void setIOState(const std::string &err_msg, bool status);
00179 
00180 };
00181 
00182 } /* namespace slave */
00183