Drizzled Public API Documentation

transaction_log.cc

Go to the documentation of this file.
00001 /* -*- mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
00005  *  Copyright (C) 2010 Jay Pipes <jaypipes@gmail.com>
00006  *
00007  *  Authors:
00008  *
00009  *    Jay Pipes <jaypipes@gmail.com>
00010  *
00011  *  This program is free software; you can redistribute it and/or modify
00012  *  it under the terms of the GNU General Public License as published by
00013  *  the Free Software Foundation; either version 2 of the License, or
00014  *  (at your option) any later version.
00015  *
00016  *  This program is distributed in the hope that it will be useful,
00017  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00019  *  GNU General Public License for more details.
00020  *
00021  *  You should have received a copy of the GNU General Public License
00022  *  along with this program; if not, write to the Free Software
00023  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00024  */
00025 
00072 #include <config.h>
00073 #include "transaction_log.h"
00074 
00075 #include <sys/stat.h>
00076 #include <fcntl.h>
00077 #include <unistd.h>
00078 #include <errno.h>
00079 
00080 #include <vector>
00081 #include <string>
00082 
00083 #include <drizzled/internal/my_sys.h> /* for internal::my_sync */
00084 #include <drizzled/errmsg_print.h>
00085 #include <drizzled/gettext.h>
00086 #include <drizzled/message/transaction.pb.h>
00087 #include <drizzled/transaction_services.h>
00088 #include <drizzled/algorithm/crc32.h>
00089 
00090 #include <google/protobuf/io/coded_stream.h>
00091 
00092 using namespace std;
00093 using namespace drizzled;
00094 using namespace google;
00095 
00096 TransactionLog *transaction_log= NULL; /* The singleton transaction log */
00097 
00098 TransactionLog::TransactionLog(const string in_log_file_path,
00099                                uint32_t in_flush_frequency,
00100                                bool in_do_checksum) : 
00101     state(OFFLINE),
00102     log_file_path(in_log_file_path),
00103     has_error(false),
00104     error_message(),
00105     flush_frequency(in_flush_frequency),
00106     do_checksum(in_do_checksum)
00107 {
00108   /* Setup our log file and determine the next write offset... */
00109   log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
00110   if (log_file == -1)
00111   {
00112     char errmsg[STRERROR_MAX];
00113     strerror_r(errno, errmsg, sizeof(errmsg));
00114     error_message.assign(_("Failed to open transaction log file "));
00115     error_message.append(log_file_path);
00116     error_message.append("  Got error: ");
00117     error_message.append(errmsg);
00118     error_message.push_back('\n');
00119     has_error= true;
00120     return;
00121   }
00122 
00123   /* For convenience, grab the log file name from the path */
00124   if (log_file_path.find_first_of('/') != string::npos)
00125   {
00126     /* Strip to last / */
00127     string tmp;
00128     tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
00129     log_file_name.assign(tmp);
00130   }
00131   else
00132     log_file_name.assign(log_file_path);
00133 
00134   /* 
00135    * The offset of the next write is the current position of the log
00136    * file, since it's opened in append mode...
00137    */
00138   log_offset= lseek(log_file, 0, SEEK_END);
00139 
00140   state= ONLINE;
00141 }
00142 
00143 uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
00144                                                      uint8_t *buffer,
00145                                                      uint32_t *checksum_out)
00146 {
00147   uint8_t *orig_buffer= buffer;
00148   size_t message_byte_length= trx.ByteSize();
00149 
00150   /*
00151    * Write the header information, which is the message type and
00152    * the length of the transaction message into the buffer
00153    */
00154   buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
00155       static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
00156   buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
00157       static_cast<uint32_t>(message_byte_length), buffer);
00158   
00159   /*
00160    * Now write the serialized transaction message, followed
00161    * by the optional checksum into the buffer.
00162    */
00163   buffer= trx.SerializeWithCachedSizesToArray(buffer);
00164 
00165   if (do_checksum)
00166   {
00167     *checksum_out= drizzled::algorithm::crc32(
00168         reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
00169   }
00170   else
00171     *checksum_out= 0;
00172 
00173   /* We always write in network byte order */
00174   buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
00175   /* Reset the pointer back to its original location... */
00176   buffer= orig_buffer;
00177   return orig_buffer;
00178 }
00179 
00180 off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
00181 {
00182   ssize_t written= 0;
00183 
00184   /*
00185    * Do an atomic increment on the offset of the log file position
00186    */
00187   off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
00188 
00189   /* 
00190    * Quick safety...if an error occurs above in another writer, the log 
00191    * file will be in a crashed state.
00192    */
00193   if (unlikely(state == CRASHED))
00194   {
00195     /* 
00196      * Reset the log's offset in case we want to produce a decent error message including
00197      * the original offset where an error occurred.
00198      */
00199     log_offset= cur_offset;
00200     return log_offset;
00201   }
00202 
00203   /* Write the full buffer in one swoop */
00204   do
00205   {
00206     written= pwrite(log_file, data, data_length, cur_offset);
00207   }
00208   while (written == -1 && errno == EINTR); /* Just retry the write when interrupted by a signal... */
00209 
00210   if (unlikely(written != static_cast<ssize_t>(data_length)))
00211   {
00212     char errmsg[STRERROR_MAX];
00213     strerror_r(errno, errmsg, sizeof(errmsg));
00214     errmsg_printf(error::ERROR, 
00215                   _("Failed to write full size of log entry.  Tried to write %" PRId64
00216                     " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes.  Error: %s\n"), 
00217                   static_cast<int64_t>(data_length),
00218                   static_cast<int64_t>(cur_offset),
00219                   static_cast<int32_t>(written), 
00220                   errmsg);
00221     state= CRASHED;
00222     /* 
00223      * Reset the log's offset in case we want to produce a decent error message including
00224      * the original offset where an error occurred.
00225      */
00226     log_offset= cur_offset;
00227   }
00228 
00229   int error_code= syncLogFile();
00230 
00231   if (unlikely(error_code != 0))
00232   {
00233     sql_perror(_("Failed to sync log file."));
00234   }
00235 
00236   return cur_offset;
00237 }
00238 
00239 int TransactionLog::syncLogFile()
00240 {
00241   switch (flush_frequency)
00242   {
00243   case FLUSH_FREQUENCY_EVERY_WRITE:
00244     return internal::my_sync(log_file, 0);
00245   case FLUSH_FREQUENCY_EVERY_SECOND:
00246     {
00247       time_t now_time= time(NULL);
00248       if (last_sync_time <= (now_time - 1))
00249       {
00250         last_sync_time= now_time;
00251         return internal::my_sync(log_file, 0);
00252       }
00253       return 0;
00254     }
00255   case FLUSH_FREQUENCY_OS:
00256   default:
00257     return 0;
00258   }
00259 }
00260 
00261 const string &TransactionLog::getLogFilename()
00262 {
00263   return log_file_name;
00264 }
00265 
00266 const string &TransactionLog::getLogFilepath()
00267 {
00268   return log_file_path;
00269 }
00270 
00271 void TransactionLog::truncate()
00272 {
00273   /* 
00274    * @note
00275    *
00276    * This is NOT THREAD SAFE! DEBUG/TEST code only!
00277    */
00278   log_offset= (off_t) 0;
00279   int result;
00280   do
00281   {
00282     result= ftruncate(log_file, log_offset);
00283   }
00284   while (result == -1 && errno == EINTR);
00285 }
00286 
00287 bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
00288                                                             string &out_filename) const
00289 {
00290   /* 
00291    * Currently, we simply return the single logfile name
00292    * Eventually, we'll have an index/hash with upper and
00293    * lower bounds to look up a log file with a transaction id
00294    */
00295   out_filename.assign(log_file_path);
00296   return true;
00297 }
00298 
00299 bool TransactionLog::hasError() const
00300 {
00301   return has_error;
00302 }
00303 
00304 void TransactionLog::clearError()
00305 {
00306   has_error= false;
00307   error_message.clear();
00308 }
00309 
00310 const string &TransactionLog::getErrorMessage() const
00311 {
00312   return error_message;
00313 }
00314 
00315 size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
00316 {
00317   return trx.ByteSize() + HEADER_TRAILER_BYTES;
00318 }