Drizzled Public API Documentation

transaction_file_reader.cc

Go to the documentation of this file.
00001 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2010 David Shrewsbury <shrewsbury.dave@gmail.com>
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; version 2 of the License.
00009  *
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00018  */
00019 
00025 #include <config.h>
00026 #include "transaction_file_reader.h"
00027 #include <cstdlib>
00028 #include <cstdio>
00029 #include <cerrno>
00030 #include <fcntl.h>
00031 #include <boost/lexical_cast.hpp>
00032 #include <google/protobuf/io/coded_stream.h>
00033 #include <drizzled/definitions.h>
00034 #include <drizzled/algorithm/crc32.h>
00035 #include <drizzled/replication_services.h>
00036 #include <drizzled/gettext.h>
00037 #include <drizzled/util/convert.h>
00038 
00039 using namespace std;
00040 using namespace drizzled;
00041 using namespace google;
00042 
00043 
00044 TransactionFileReader::TransactionFileReader()
00045 {
00046   raw_input= NULL;
00047   buffer= NULL;
00048   temp_buffer= NULL;
00049   previous_length= 0;
00050   file= -1;
00051 }
00052 
00053 TransactionFileReader::~TransactionFileReader()
00054 {
00055   delete raw_input;
00056   close(file);
00057 
00058   if (buffer != NULL)
00059     free(buffer);
00060 }
00061 
00062 bool TransactionFileReader::openFile(const string &filename, int start_pos)
00063 {
00064   file= open(filename.c_str(), O_RDONLY);
00065   if (file == -1)
00066   {
00067     error= _("Cannot open file: ") + filename;
00068     return false;
00069   }
00070 
00071   raw_input= new protobuf::io::FileInputStream(file);
00072 
00073   if (start_pos > 0)
00074   {
00075     if (not raw_input->Skip(start_pos))
00076     {
00077       error= _("Could not skip to position ")
00078                + boost::lexical_cast<string>(start_pos);
00079       return false;
00080     }
00081   }
00082 
00083   return true;
00084 }
00085 
00086 bool TransactionFileReader::getNextTransaction(message::Transaction &transaction,
00087                                               uint32_t *checksum)
00088 {
00089   uint32_t message_type= 0;
00090   uint32_t length= 0;
00091   bool result= true;
00092 
00093   /*
00094    * Odd thing to note about using CodedInputStream: This class wasn't
00095    * intended to read large amounts of GPB messages. It has an upper
00096    * limit on the number of bytes it will read (see Protobuf docs for
00097    * this class for more info). A warning will be produced as you
00098    * get close to this limit. Since this is a pretty lightweight class,
00099    * we should be able to simply create a new one for each message we
00100    * want to read.
00101    */
00102   protobuf::io::CodedInputStream coded_input(raw_input);
00103 
00104   /* Read in the type and length of the command */
00105   if (not coded_input.ReadLittleEndian32(&message_type) ||
00106       not coded_input.ReadLittleEndian32(&length))
00107   {
00108     error= "EOF";
00109     return false;
00110   }
00111 
00112   if (message_type != ReplicationServices::TRANSACTION)
00113   {
00114     error= _("Found a non-transaction message in log. Currently, not supported.\n");
00115     return false;
00116   }
00117 
00118   if (length > INT_MAX)
00119   {
00120     error= _("Attempted to read record bigger than INT_MAX\n");
00121     return false;
00122   }
00123 
00124   if (buffer == NULL)
00125   {
00126     /*
00127      * First time around...just malloc the length.  This block gets rid
00128      * of a GCC warning about uninitialized temp_buffer.
00129      */
00130     temp_buffer= (char *) malloc(static_cast<size_t>(length));
00131   }
00132   /* No need to allocate if we have a buffer big enough... */
00133   else if (length > previous_length)
00134   {
00135     temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
00136   }
00137 
00138   if (temp_buffer == NULL)
00139   {
00140     error= _("Memory allocation failure trying to allocate ")
00141             + boost::lexical_cast<string>(length)
00142             + _(" bytes\n");
00143     return false;
00144   }
00145   else
00146     buffer= temp_buffer;
00147 
00148   /* Read the Command */
00149   result= coded_input.ReadRaw(buffer, (int) length);
00150 
00151   if (result == false)
00152   {
00153     char errmsg[STRERROR_MAX];
00154     strerror_r(errno, errmsg, sizeof(errmsg));
00155     error= _("Could not read transaction message.\n");
00156     error += _("GPB ERROR: ") + string(errmsg) + "\n";
00157     string hexdump;
00158     hexdump.reserve(length * 4);
00159     bytesToHexdumpFormat(hexdump,
00160                          reinterpret_cast<const unsigned char *>(buffer),
00161                          length);
00162     error += _("HEXDUMP:\n\n") + hexdump;
00163     return false;
00164   }
00165 
00166   result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
00167 
00168   if (result == false)
00169   {
00170     error= _("Unable to parse command. Got error: ")
00171              + transaction.InitializationErrorString();
00172     if (buffer != NULL)
00173     {
00174       string hexdump;
00175       hexdump.reserve(length * 4);
00176       bytesToHexdumpFormat(hexdump,
00177                            reinterpret_cast<const unsigned char *>(buffer),
00178                            length);
00179       error += _("\nHEXDUMP:\n\n") + hexdump + "\n";
00180     }
00181     return false;
00182   }
00183 
00184   /* Read 4 byte checksum */
00185   coded_input.ReadLittleEndian32(checksum);
00186 
00187   previous_length= length;
00188 
00189   return true;
00190 }
00191 
00192 uint32_t TransactionFileReader::checksumLastReadTransaction()
00193 {
00194   return drizzled::algorithm::crc32(buffer,
00195                                     static_cast<size_t>(previous_length));
00196 }