Drizzled Public API Documentation

transaction_log_reader.cc

Go to the documentation of this file.
00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
00005  *
00006  *  Authors:
00007  *
00008  *  Jay Pipes <joinfu@sun.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00044 #include <config.h>
00045 
00046 #include <fcntl.h>
00047 
00048 #include <climits>
00049 #include <cerrno>
00050 #include <cstdio>
00051 
00052 #include "transaction_log_reader.h"
00053 #include "transaction_log.h"
00054 
00055 #include <drizzled/gettext.h>
00056 #include <drizzled/message/transaction.pb.h>
00057 
00058 #include <google/protobuf/io/zero_copy_stream_impl.h>
00059 #include <google/protobuf/io/coded_stream.h>
00060 #include <drizzled/algorithm/crc32.h>
00061 #include <drizzled/errmsg_print.h>
00062 #include <drizzled/definitions.h>
00063 
00064 using namespace std;
00065 using namespace drizzled;
00066 using namespace google;
00067 
00068 bool TransactionLogReader::read(const ReplicationServices::GlobalTransactionId &to_read_trx_id, 
00069                             message::Transaction *to_fill)
00070 {
00071   /* 
00072    * We ask the log to give us the log file containing the
00073    * transaction message with the needed transaction id, then
00074    * we read into the log file to obtain the message, and 
00075    * fill the supplied pointer to Transaction message from the
00076    * raw data in the log file.
00077    */
00078   string log_filename_to_read;
00079   bool log_file_found= log.findLogFilenameContainingTransactionId(to_read_trx_id, log_filename_to_read);
00080   bool result= true;
00081   bool do_checksum= false;
00082 
00083   if (unlikely(! log_file_found))
00084   {
00085     return false;
00086   }
00087   else
00088   {
00089     /* Open the log file and read through the log until the transaction ID is found */
00090     int log_file= open(log_filename_to_read.c_str(), O_RDONLY | O_NONBLOCK);
00091 
00092     if (log_file == -1)
00093     {
00094       sql_perror(_("Failed to open transaction log file"), log_filename_to_read);
00095       return false;
00096     }
00097 
00098     protobuf::io::ZeroCopyInputStream *raw_input= new protobuf::io::FileInputStream(log_file);
00099     protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(raw_input);
00100 
00101     char *buffer= NULL;
00102     char *temp_buffer= NULL;
00103     uint32_t length= 0;
00104     uint32_t previous_length= 0;
00105     uint32_t checksum= 0;
00106 
00107     message::Transaction transaction;
00108 
00109     /* Read in the length of the command */
00110     while (result == true && coded_input->ReadLittleEndian32(&length) == true)
00111     {
00112       if (length > INT_MAX)
00113       {
00114         fprintf(stderr, _("Attempted to read record bigger than INT_MAX\n"));
00115         exit(1);
00116       }
00117 
00118       if (buffer == NULL)
00119       {
00120         /* 
00121         * First time around...just malloc the length.  This block gets rid
00122         * of a GCC warning about uninitialized temp_buffer.
00123         */
00124         temp_buffer= (char *) malloc(static_cast<size_t>(length));
00125       }
00126       /* No need to allocate if we have a buffer big enough... */
00127       else if (length > previous_length)
00128       {
00129         temp_buffer= (char *) realloc(buffer, static_cast<size_t>(length));
00130       }
00131 
00132       if (temp_buffer == NULL)
00133       {
00134         fprintf(stderr, _("Memory allocation failure trying to allocate %" PRIu64 " bytes.\n"),
00135                 static_cast<uint64_t>(length));
00136         break;
00137       }
00138       else
00139         buffer= temp_buffer;
00140 
00141       /* Read the Command */
00142       result= coded_input->ReadRaw(buffer, length);
00143       if (result == false)
00144       {
00145         char errmsg[STRERROR_MAX];
00146         strerror_r(errno, errmsg, sizeof(errmsg));
00147         fprintf(stderr, _("Could not read transaction message.\n"));
00148         fprintf(stderr, _("GPB ERROR: %s.\n"), errmsg);
00149         fprintf(stderr, _("Raw buffer read: %s.\n"), buffer);
00150         break;
00151       }
00152 
00153       result= transaction.ParseFromArray(buffer, static_cast<int32_t>(length));
00154       if (result == false)
00155       {
00156         fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction.InitializationErrorString().c_str());
00157         if (buffer != NULL)
00158           fprintf(stderr, _("BUFFER: %s\n"), buffer);
00159         break;
00160       }
00161 
00162       /* Skip 4 byte checksum */
00163       coded_input->ReadLittleEndian32(&checksum);
00164 
00165       if (do_checksum)
00166       {
00167         if (checksum != drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)))
00168         {
00169           fprintf(stderr, _("Checksum failed. Wanted %" PRIu32 " got %" PRIu32 "\n"), checksum, drizzled::algorithm::crc32(buffer, static_cast<size_t>(length)));
00170         }
00171       }
00172 
00173       /* Cool, message was read.  Check the trx id */
00174       if (transaction.transaction_context().transaction_id() == to_read_trx_id)
00175       {
00176         /* Found what we were looking for...copy to the pointer we should fill */
00177         to_fill->CopyFrom(transaction);
00178         break;
00179       }
00180 
00181       previous_length= length;
00182     }
00183     if (buffer)
00184       free(buffer);
00185     
00186     delete coded_input;
00187     delete raw_input;
00188 
00189     return result;
00190   }
00191 }