Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00072 #include <config.h>
00073 #include "transaction_log.h"
00074
00075 #include <sys/stat.h>
00076 #include <fcntl.h>
00077 #include <unistd.h>
00078 #include <errno.h>
00079
00080 #include <vector>
00081 #include <string>
00082
00083 #include <drizzled/internal/my_sys.h>
00084 #include <drizzled/errmsg_print.h>
00085 #include <drizzled/gettext.h>
00086 #include <drizzled/message/transaction.pb.h>
00087 #include <drizzled/transaction_services.h>
00088 #include <drizzled/algorithm/crc32.h>
00089
00090 #include <google/protobuf/io/coded_stream.h>
00091
00092 using namespace std;
00093 using namespace drizzled;
00094 using namespace google;
00095
00096 TransactionLog *transaction_log= NULL;
00097
00098 TransactionLog::TransactionLog(const string in_log_file_path,
00099 uint32_t in_flush_frequency,
00100 bool in_do_checksum) :
00101 state(OFFLINE),
00102 log_file_path(in_log_file_path),
00103 has_error(false),
00104 error_message(),
00105 flush_frequency(in_flush_frequency),
00106 do_checksum(in_do_checksum)
00107 {
00108
00109 log_file= open(log_file_path.c_str(), O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU);
00110 if (log_file == -1)
00111 {
00112 char errmsg[STRERROR_MAX];
00113 strerror_r(errno, errmsg, sizeof(errmsg));
00114 error_message.assign(_("Failed to open transaction log file "));
00115 error_message.append(log_file_path);
00116 error_message.append(" Got error: ");
00117 error_message.append(errmsg);
00118 error_message.push_back('\n');
00119 has_error= true;
00120 return;
00121 }
00122
00123
00124 if (log_file_path.find_first_of('/') != string::npos)
00125 {
00126
00127 string tmp;
00128 tmp= log_file_path.substr(log_file_path.find_last_of('/') + 1);
00129 log_file_name.assign(tmp);
00130 }
00131 else
00132 log_file_name.assign(log_file_path);
00133
00134
00135
00136
00137
00138 log_offset= lseek(log_file, 0, SEEK_END);
00139
00140 state= ONLINE;
00141 }
00142
00143 uint8_t *TransactionLog::packTransactionIntoLogEntry(const message::Transaction &trx,
00144 uint8_t *buffer,
00145 uint32_t *checksum_out)
00146 {
00147 uint8_t *orig_buffer= buffer;
00148 size_t message_byte_length= trx.ByteSize();
00149
00150
00151
00152
00153
00154 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
00155 static_cast<uint32_t>(ReplicationServices::TRANSACTION), buffer);
00156 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(
00157 static_cast<uint32_t>(message_byte_length), buffer);
00158
00159
00160
00161
00162
00163 buffer= trx.SerializeWithCachedSizesToArray(buffer);
00164
00165 if (do_checksum)
00166 {
00167 *checksum_out= drizzled::algorithm::crc32(
00168 reinterpret_cast<char *>(buffer) - message_byte_length, message_byte_length);
00169 }
00170 else
00171 *checksum_out= 0;
00172
00173
00174 buffer= protobuf::io::CodedOutputStream::WriteLittleEndian32ToArray(*checksum_out, buffer);
00175
00176 buffer= orig_buffer;
00177 return orig_buffer;
00178 }
00179
00180 off_t TransactionLog::writeEntry(const uint8_t *data, size_t data_length)
00181 {
00182 ssize_t written= 0;
00183
00184
00185
00186
00187 off_t cur_offset= log_offset.fetch_and_add(static_cast<off_t>(data_length));
00188
00189
00190
00191
00192
00193 if (unlikely(state == CRASHED))
00194 {
00195
00196
00197
00198
00199 log_offset= cur_offset;
00200 return log_offset;
00201 }
00202
00203
00204 do
00205 {
00206 written= pwrite(log_file, data, data_length, cur_offset);
00207 }
00208 while (written == -1 && errno == EINTR);
00209
00210 if (unlikely(written != static_cast<ssize_t>(data_length)))
00211 {
00212 char errmsg[STRERROR_MAX];
00213 strerror_r(errno, errmsg, sizeof(errmsg));
00214 errmsg_printf(error::ERROR,
00215 _("Failed to write full size of log entry. Tried to write %" PRId64
00216 " bytes at offset %" PRId64 ", but only wrote %" PRId32 " bytes. Error: %s\n"),
00217 static_cast<int64_t>(data_length),
00218 static_cast<int64_t>(cur_offset),
00219 static_cast<int32_t>(written),
00220 errmsg);
00221 state= CRASHED;
00222
00223
00224
00225
00226 log_offset= cur_offset;
00227 }
00228
00229 int error_code= syncLogFile();
00230
00231 if (unlikely(error_code != 0))
00232 {
00233 sql_perror(_("Failed to sync log file."));
00234 }
00235
00236 return cur_offset;
00237 }
00238
00239 int TransactionLog::syncLogFile()
00240 {
00241 switch (flush_frequency)
00242 {
00243 case FLUSH_FREQUENCY_EVERY_WRITE:
00244 return internal::my_sync(log_file, 0);
00245 case FLUSH_FREQUENCY_EVERY_SECOND:
00246 {
00247 time_t now_time= time(NULL);
00248 if (last_sync_time <= (now_time - 1))
00249 {
00250 last_sync_time= now_time;
00251 return internal::my_sync(log_file, 0);
00252 }
00253 return 0;
00254 }
00255 case FLUSH_FREQUENCY_OS:
00256 default:
00257 return 0;
00258 }
00259 }
00260
00261 const string &TransactionLog::getLogFilename()
00262 {
00263 return log_file_name;
00264 }
00265
00266 const string &TransactionLog::getLogFilepath()
00267 {
00268 return log_file_path;
00269 }
00270
00271 void TransactionLog::truncate()
00272 {
00273
00274
00275
00276
00277
00278 log_offset= (off_t) 0;
00279 int result;
00280 do
00281 {
00282 result= ftruncate(log_file, log_offset);
00283 }
00284 while (result == -1 && errno == EINTR);
00285 }
00286
00287 bool TransactionLog::findLogFilenameContainingTransactionId(const ReplicationServices::GlobalTransactionId&,
00288 string &out_filename) const
00289 {
00290
00291
00292
00293
00294
00295 out_filename.assign(log_file_path);
00296 return true;
00297 }
00298
00299 bool TransactionLog::hasError() const
00300 {
00301 return has_error;
00302 }
00303
00304 void TransactionLog::clearError()
00305 {
00306 has_error= false;
00307 error_message.clear();
00308 }
00309
00310 const string &TransactionLog::getErrorMessage() const
00311 {
00312 return error_message;
00313 }
00314
00315 size_t TransactionLog::getLogEntrySize(const message::Transaction &trx)
00316 {
00317 return trx.ByteSize() + HEADER_TRAILER_BYTES;
00318 }