Drizzled Public API Documentation

print_transaction_message.cc

Go to the documentation of this file.
00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2009 Sun Microsystems, Inc.
00005  *
00006  *  Authors:
00007  *
00008  *  Jay Pipes <joinfu@sun.com>
00009  *
00010  *  This program is free software; you can redistribute it and/or modify
00011  *  it under the terms of the GNU General Public License as published by
00012  *  the Free Software Foundation; either version 2 of the License, or
00013  *  (at your option) any later version.
00014  *
00015  *  This program is distributed in the hope that it will be useful,
00016  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00017  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018  *  GNU General Public License for more details.
00019  *
00020  *  You should have received a copy of the GNU General Public License
00021  *  along with this program; if not, write to the Free Software
00022  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00023  */
00024 
00031 #include <config.h>
00032 #include <drizzled/plugin/function.h>
00033 #include <drizzled/item/func.h>
00034 #include <drizzled/function/str/strfunc.h>
00035 #include <drizzled/error.h>
00036 #include <drizzled/internal/my_sys.h>
00037 #include <drizzled/charset.h>
00038 #include <drizzled/gettext.h>
00039 #include <drizzled/errmsg_print.h>
00040 
00041 #include <fcntl.h>
00042 #include <errno.h>
00043 
00044 #include "transaction_log.h"
00045 #include "print_transaction_message.h"
00046 
00047 #include <drizzled/message/transaction.pb.h>
00048 #include <drizzled/replication_services.h>
00049 #include <google/protobuf/io/zero_copy_stream.h>
00050 #include <google/protobuf/io/zero_copy_stream_impl.h>
00051 #include <google/protobuf/io/coded_stream.h>
00052 #include <google/protobuf/text_format.h>
00053 
00054 using namespace std;
00055 using namespace drizzled;
00056 using namespace google;
00057 
00059 extern TransactionLog *transaction_log;
00060 
00061 plugin::Create_function<PrintTransactionMessageFunction> *print_transaction_message_func_factory= NULL;
00062 
00063 void PrintTransactionMessageFunction::fix_length_and_dec()
00064 {
00065   max_length= 2 * 1024 * 1024; /* 2MB size limit seems ok... */
00066   args[0]->collation.set(
00067     get_charset_by_csname(args[0]->collation.collation->csname,
00068                           MY_CS_BINSORT), DERIVATION_COERCIBLE);
00069 }
00070 
00071 String *PrintTransactionMessageFunction::val_str(String *str)
00072 {
00073   assert(fixed == true);
00074 
00075   String *filename_arg= args[0]->val_str(str);
00076   off_t offset_arg= static_cast<int64_t>(args[1]->val_int());
00077 
00078   if (filename_arg == NULL || args[1]->null_value == true)
00079   {
00080     my_error(ER_INVALID_NULL_ARGUMENT, MYF(0), func_name());
00081     null_value= true;
00082     return NULL;
00083   }
00084 
00085   if (transaction_log == NULL)
00086   {
00087     my_error(ER_INVALID_NULL_ARGUMENT, MYF(0), func_name());
00088     null_value= true;
00089     return NULL;
00090   }
00091 
00092   null_value= false;
00093 
00094   message::Transaction transaction_message;
00095 
00102   const string &filename= transaction_log->getLogFilename();
00103   int log_file= open(filename.c_str(), O_RDONLY);
00104   if (log_file == -1)
00105   {
00106     sql_perror(_("Failed to open transaction log file"), filename);
00107     null_value= true;
00108 
00109     return NULL;
00110   }
00111 
00112   (void) lseek(log_file, offset_arg, SEEK_SET);
00113 
00114   protobuf::io::FileInputStream *file_input= new protobuf::io::FileInputStream(log_file);
00115   file_input->SetCloseOnDelete(true);
00116 
00117   protobuf::io::CodedInputStream *coded_input= new protobuf::io::CodedInputStream(file_input);
00118 
00119   /* Grab our message type and length */
00120   uint32_t message_type;
00121   if (! coded_input->ReadLittleEndian32(&message_type))
00122   {
00123     delete coded_input;
00124     delete file_input;
00125 
00127     null_value= true;
00128     return NULL;
00129   }
00130 
00131   /* Validate message type */
00132   if (message_type != ReplicationServices::TRANSACTION)
00133   {
00134     fprintf(stderr, _("GPB message is not a valid type.\n"));
00135     delete coded_input;
00136     delete file_input;
00137     null_value= true;
00138     return NULL;
00139   }
00140 
00141   uint32_t message_size;
00142   if (! coded_input->ReadLittleEndian32(&message_size))
00143   {
00144     delete coded_input;
00145     delete file_input;
00146 
00148     null_value= true;
00149     return NULL;
00150   }
00151 
00152   if (message_size > INT_MAX)
00153   {
00154     fprintf(stderr, _("GPB message is not a valid size.\n"));
00155     delete coded_input;
00156     delete file_input;
00157     null_value= true;
00158     return NULL;
00159   }
00160 
00161   uint8_t *buffer= (uint8_t *) malloc(message_size);
00162 
00163   bool result= coded_input->ReadRaw(buffer, message_size);
00164   if (result == false)
00165   {
00166     // 120 was arbitrary
00167     sql_perror(_("Could not read transaction message. Raw buffer read "), std::string((const char *)buffer, std::min(message_size, 120U)));
00168   }
00169 
00170   result= transaction_message.ParseFromArray(buffer, static_cast<int32_t>(message_size));
00171   if (result == false)
00172   {
00173     fprintf(stderr, _("Unable to parse transaction. Got error: %s.\n"), transaction_message.InitializationErrorString().c_str());
00174     if (buffer != NULL)
00175       fprintf(stderr, _("BUFFER: %s\n"), buffer);
00176   }
00177 
00178   free(buffer);
00179 
00180   string transaction_text;
00181   protobuf::TextFormat::PrintToString(transaction_message, &transaction_text);
00182 
00183   if (str->alloc(transaction_text.length()))
00184   {
00185     delete coded_input;
00186     delete file_input;
00187     null_value= true;
00188     return NULL;
00189   }
00190 
00191   str->length(transaction_text.length());
00192 
00193   strncpy(str->ptr(), transaction_text.c_str(), transaction_text.length());
00194 
00195   delete coded_input;
00196   delete file_input;
00197 
00198   return str;
00199 }