00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #pragma once
00022
00023 #include <client/client_priv.h>
00024 #include <drizzled/error_t.h>
00025 #include <plugin/slave/queue_thread.h>
00026 #include <plugin/slave/sql_executor.h>
00027 #include <string>
00028 #include <vector>
00029
00030 namespace slave
00031 {
00032
00033 class QueueProducer : public QueueThread, public SQLExecutor
00034 {
00035 public:
00036 QueueProducer() :
00037 SQLExecutor("slave", "replication"),
00038 _check_interval(5),
00039 _master_port(3306),
00040 _last_return(DRIZZLE_RETURN_OK),
00041 _is_connected(false),
00042 _saved_max_commit_id(0),
00043 _max_reconnects(10),
00044 _seconds_between_reconnects(30)
00045 {}
00046
00047 virtual ~QueueProducer();
00048
00049 bool init();
00050 bool process();
00051 void shutdown();
00052
00053 void setSleepInterval(uint32_t seconds)
00054 {
00055 _check_interval= seconds;
00056 }
00057
00058 uint32_t getSleepInterval()
00059 {
00060 return _check_interval;
00061 }
00062
00063 void setMasterHost(const std::string &host)
00064 {
00065 _master_host= host;
00066 }
00067
00068 void setMasterPort(uint16_t port)
00069 {
00070 _master_port= port;
00071 }
00072
00073 void setMasterUser(const std::string &user)
00074 {
00075 _master_user= user;
00076 }
00077
00078 void setMasterPassword(const std::string &password)
00079 {
00080 _master_pass= password;
00081 }
00082
00083 void setMaxReconnectAttempts(uint32_t max)
00084 {
00085 _max_reconnects= max;
00086 }
00087
00088 void setSecondsBetweenReconnects(uint32_t seconds)
00089 {
00090 _seconds_between_reconnects= seconds;
00091 }
00092
00093 void setCachedMaxCommitId(uint64_t value)
00094 {
00095 _saved_max_commit_id= value;
00096 }
00097
00098 private:
00100 uint32_t _check_interval;
00101
00102
00103 std::string _master_host;
00104 uint16_t _master_port;
00105 std::string _master_user;
00106 std::string _master_pass;
00107
00108 drizzle_st _drizzle;
00109 drizzle_con_st _connection;
00110 drizzle_return_t _last_return;
00111
00112 bool _is_connected;
00113 uint64_t _saved_max_commit_id;
00114 uint32_t _max_reconnects;
00115 uint32_t _seconds_between_reconnects;
00116
00117 std::string _last_error_message;
00118
00122 bool openConnection();
00123
00127 bool closeConnection();
00128
00138 bool reconnect(bool initial_connection);
00139
00148 bool queryForMaxCommitId(uint64_t *max_commit_id);
00149
00163 enum drizzled::error_t queryForReplicationEvents(uint64_t max_commit_id);
00164
00165 bool queryForTrxIdList(uint64_t max_commit_id, std::vector<uint64_t> &list);
00166 bool queueInsert(const char *trx_id,
00167 const char *seg_id,
00168 const char *commit_id,
00169 const char *msg,
00170 const char *msg_length);
00171
00178 void setIOState(const std::string &err_msg, bool status);
00179
00180 };
00181
00182 }
00183