Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00025 #include <config.h>
00026 #include "transaction_file_reader.h"
00027 #include <cstdlib>
00028 #include <cstdio>
00029 #include <cerrno>
00030 #include <fcntl.h>
00031 #include <boost/lexical_cast.hpp>
00032 #include <google/protobuf/io/coded_stream.h>
00033 #include <drizzled/definitions.h>
00034 #include <drizzled/algorithm/crc32.h>
00035 #include <drizzled/replication_services.h>
00036 #include <drizzled/gettext.h>
00037 #include <drizzled/util/convert.h>
00038
00039 using namespace std;
00040 using namespace drizzled;
00041 using namespace google;
00042
00043
00044 TransactionFileReader::TransactionFileReader()
00045 {
00046 raw_input= NULL;
00047 buffer= NULL;
00048 temp_buffer= NULL;
00049 previous_length= 0;
00050 file= -1;
00051 }
00052
00053 TransactionFileReader::~TransactionFileReader()
00054 {
00055 delete raw_input;
00056 close(file);
00057
00058 if (buffer != NULL)
00059 free(buffer);
00060 }
00061
00062 bool TransactionFileReader::openFile(const string &filename, int start_pos)
00063 {
00064 file= open(filename.c_str(), O_RDONLY);
00065 if (file == -1)
00066 {
00067 error= _("Cannot open file: ") + filename;
00068 return false;
00069 }
00070
00071 raw_input= new protobuf::io::FileInputStream(file);
00072
00073 if (start_pos > 0)
00074 {
00075 if (not raw_input->Skip(start_pos))
00076 {
00077 error= _("Could not skip to position ")
00078 + boost::lexical_cast<string>(start_pos);
00079 return false;
00080 }
00081 }
00082
00083 return true;
00084 }
00085
00086 bool TransactionFileReader::getNextTransaction(message::Transaction &transaction,
00087 uint32_t *checksum)
00088 {
00089 uint32_t message_type= 0;
00090 uint32_t length= 0;
00091 bool result= true;
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102 protobuf::io::CodedInputStream coded_input(raw_input);
00103
00104
00105 if (not coded_input.ReadLittleEndian32(&message_type) ||
00106 not coded_input.ReadLittleEndian32(&length))
00107 {
00108 error= "EOF";
00109 return false;
00110 }
00111
00112 if (message_type != ReplicationServices::TRANSACTION)
00113 {
00114 error= _("Found a non-transaction message in log. Currently, not supported.\n");
00115 return false;
00116 }
00117
00118 if (length > INT_MAX)
00119 {
00120 error= _("Attempted to read record bigger than INT_MAX\n");
00121 return false;
00122 }
00123
00124 if (buffer == NULL)
00125 {
00126
00127
00128
00129
00130 temp_buffer= (char *) malloc(static_cast<size_t>(length));
00131 }
00132
00133 else if (length > previous_length)
00134 {
00135 temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
00136 }
00137
00138 if (temp_buffer == NULL)
00139 {
00140 error= _("Memory allocation failure trying to allocate ")
00141 + boost::lexical_cast<string>(length)
00142 + _(" bytes\n");
00143 return false;
00144 }
00145 else
00146 buffer= temp_buffer;
00147
00148
00149 result= coded_input.ReadRaw(buffer, (int) length);
00150
00151 if (result == false)
00152 {
00153 char errmsg[STRERROR_MAX];
00154 strerror_r(errno, errmsg, sizeof(errmsg));
00155 error= _("Could not read transaction message.\n");
00156 error += _("GPB ERROR: ") + string(errmsg) + "\n";
00157 string hexdump;
00158 hexdump.reserve(length * 4);
00159 bytesToHexdumpFormat(hexdump,
00160 reinterpret_cast<const unsigned char *>(buffer),
00161 length);
00162 error += _("HEXDUMP:\n\n") + hexdump;
00163 return false;
00164 }
00165
00166 result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
00167
00168 if (result == false)
00169 {
00170 error= _("Unable to parse command. Got error: ")
00171 + transaction.InitializationErrorString();
00172 if (buffer != NULL)
00173 {
00174 string hexdump;
00175 hexdump.reserve(length * 4);
00176 bytesToHexdumpFormat(hexdump,
00177 reinterpret_cast<const unsigned char *>(buffer),
00178 length);
00179 error += _("\nHEXDUMP:\n\n") + hexdump + "\n";
00180 }
00181 return false;
00182 }
00183
00184
00185 coded_input.ReadLittleEndian32(checksum);
00186
00187 previous_length= length;
00188
00189 return true;
00190 }
00191
00192 uint32_t TransactionFileReader::checksumLastReadTransaction()
00193 {
00194 return drizzled::algorithm::crc32(buffer,
00195 static_cast<size_t>(previous_length));
00196 }