00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033 #include "cslib/CSConfig.h"
00034
00035 #include <stdlib.h>
00036 #include <inttypes.h>
00037
00038 #include "cslib/CSGlobal.h"
00039 #include "cslib/CSStrUtil.h"
00040 #include "cslib/CSStorage.h"
00041
00042 #include "trans_log_ms.h"
00043 #include "trans_cache_ms.h"
00044
/*
 * Crash-injection hook for recovery testing.
 *
 * With CRASH_TEST defined, CRASH_POINT(p) compares p with the global
 * trans_test_crash_point; on a match it prints the source location together
 * with txn_Start and txn_EOL and then writes through a null pointer to bring
 * the process down. Because the expansion names txn_Start and txn_EOL, it can
 * only be used where those are in scope.
 *
 * Without CRASH_TEST the macro expands to nothing and its argument is not
 * evaluated.
 */
#ifdef CRASH_TEST
uint32_t trans_test_crash_point;
// The spaces around PRIu64 are required: since C++11 a string literal that is
// immediately followed by an identifier is parsed as a user-defined literal.
#define CRASH_POINT(p) { if ((p) == trans_test_crash_point) { char *ptr = NULL; printf("Crash on demand at: %s(%d), start: %" PRIu64 ", eol: %" PRIu64 "\n", __FILE__, __LINE__, txn_Start, txn_EOL); *ptr = 88;}}
#else
#define CRASH_POINT(p)
#endif
00051
// Identification values compared against the log header when an existing
// log file is opened.
#define MS_TRANS_LOG_MAGIC				0xA6E7D7B3
#define MS_TRANS_LOG_VERSION			1

// The two legal values of the header byte th_recovered_1.
#define MS_TRANS_LOG_RECOVERED			0xA1
#define MS_TRANS_LOG_NOT_RECOVERED		0xA2

// The two legal values of the header byte th_overflow_1.
#define MS_TRANS_NO_OVERFLOW			0xB1
#define MS_TRANS_OVERFLOW				0xB2

// Checkpoint threshold stored in a freshly created log.
#define DFLT_TRANS_CHECKPOINT_THRESHOLD	1024

// Defaults for a freshly created log: number of record slots and the size
// handed to the transaction cache.
#define DFLT_TRANS_LOG_LIST_SIZE		(1024 * 10)
#define DFLT_TRANS_CACHE_SIZE			(500)

// True when the current list size differs from the requested one, the used
// region does not wrap (EOL is not before Start) and no overflow is active.
// Expands to MSTrans member names, so use it only inside MSTrans methods.
#define TRANS_CAN_RESIZE \
	((txn_MaxRecords != txn_ReqestedMaxRecords) && \
	 (txn_EOL >= txn_Start) && \
	 !txn_HaveOverflow)
00065
00066 typedef struct MSDiskTrans {
00067 CSDiskValue4 dtr_id_4;
00068 CSDiskValue1 dtr_type_1;
00069 CSDiskValue1 dtr_check_1;
00070 CSDiskValue4 dtr_db_id_4;
00071 CSDiskValue4 dtr_tab_id_4;
00072 CSDiskValue8 dtr_blob_id_8;
00073 CSDiskValue8 dtr_blob_ref_id_8;
00074 } MSDiskTransRec, *MSDiskTransPtr;
00075
// Copy every field of the in-memory record *s into the disk image *d.
// d: MSDiskTransPtr (destination), s: MSTransPtr (source).
#define SET_DISK_TRANSREC(d, s) { \
	CS_SET_DISK_4((d)->dtr_id_4,          (s)->tr_id);\
	CS_SET_DISK_1((d)->dtr_type_1,        (s)->tr_type);\
	CS_SET_DISK_1((d)->dtr_check_1,       (s)->tr_check);\
	CS_SET_DISK_4((d)->dtr_db_id_4,       (s)->tr_db_id);\
	CS_SET_DISK_4((d)->dtr_tab_id_4,      (s)->tr_tab_id);\
	CS_SET_DISK_8((d)->dtr_blob_id_8,     (s)->tr_blob_id);\
	CS_SET_DISK_8((d)->dtr_blob_ref_id_8, (s)->tr_blob_ref_id);\
}
00085
// Fill every field of the in-memory record *s from the disk image *d.
// s: MSTransPtr (destination), d: MSDiskTransPtr (source).
#define GET_DISK_TRANSREC(s, d) { \
	(s)->tr_id          = CS_GET_DISK_4((d)->dtr_id_4);\
	(s)->tr_type        = CS_GET_DISK_1((d)->dtr_type_1);\
	(s)->tr_check       = CS_GET_DISK_1((d)->dtr_check_1);\
	(s)->tr_db_id       = CS_GET_DISK_4((d)->dtr_db_id_4);\
	(s)->tr_tab_id      = CS_GET_DISK_4((d)->dtr_tab_id_4);\
	(s)->tr_blob_id     = CS_GET_DISK_8((d)->dtr_blob_id_8);\
	(s)->tr_blob_ref_id = CS_GET_DISK_8((d)->dtr_blob_ref_id_8);\
}
00095
/*
 * Fold a byte buffer into a one-byte check value.
 *
 * The bytes are consumed from the end of the buffer towards the front with a
 * shift-and-add hash whose top nibble is folded back in, and the 32-bit result
 * is XOR-reduced to a single byte.
 *
 * data: buffer to hash.
 * len:  number of bytes in data; 0 or 1 yields 0.
 *
 * NOTE(review): data[0] never contributes to the result. This looks like an
 * off-by-one, but the value is compared against check bytes read back from the
 * log, so it is preserved here exactly; confirm compatibility before changing.
 */
static uint8_t checksum(uint8_t *data, size_t len)
{
	uint32_t sum = 0;

	// Indices len-1 down to 1. Indexing (rather than walking a pointer back
	// from data + len - 1) keeps len == 0 free of out-of-range pointer
	// arithmetic. 'register' is dropped: it was removed from C++17.
	for (size_t i = len; i > 1; i--) {
		sum = (sum << 4) + data[i - 1];

		const uint32_t high = sum & 0xF0000000;
		if (high) {
			sum = sum ^ (high >> 24);
			sum = sum ^ high;
		}
	}
	return (uint8_t) (sum ^ (sum >> 24) ^ (sum >> 16) ^ (sum >> 8));
}
00112
00113 MSTrans::MSTrans() :
00114 CSSharedRefObject(),
00115 txn_MaxCheckPoint(0),
00116 txn_Doingbackup(false),
00117 txn_reader(NULL),
00118 txn_IsTxnValid(false),
00119 txn_CurrentTxn(0),
00120 txn_TxnIndex(0),
00121 txn_StartCheckPoint(0),
00122 txn_TransCache(NULL),
00123 txn_BlockingTransaction(0),
00124 txn_File(NULL),
00125 txn_EOLCheckPoint(0),
00126 txn_MaxRecords(0),
00127 txn_ReqestedMaxRecords(0),
00128 txn_HighWaterMark(0),
00129 txn_OverflowCount(0),
00130 txn_MaxTID(0),
00131 txn_Recovered(false),
00132 txn_HaveOverflow(false),
00133 txn_Overflow(0),
00134 txn_EOL(0),
00135 txn_Start(0),
00136 txn_Checksum(0)
00137 {
00138 }
00139
00140 MSTrans::~MSTrans()
00141 {
00142 txn_Close();
00143 if (txn_TransCache)
00144 txn_TransCache->release();
00145
00146 }
00147
00148 void MSTrans::txn_Close()
00149 {
00150
00151 if (txn_File) {
00152
00153 CS_SET_DISK_4(txn_DiskHeader.th_next_txn_id_4, txn_MaxTID);
00154 txn_File->write(&(txn_DiskHeader.th_next_txn_id_4), offsetof(MSDiskTransHeadRec, th_next_txn_id_4), 4 );
00155
00156 CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00157 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00158 CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
00159 txn_File->write(&(txn_DiskHeader.th_start_8),
00160 offsetof(MSDiskTransHeadRec, th_start_8),
00161 sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
00162 CRASH_POINT(1);
00163 txn_File->flush();
00164 txn_File->sync();
00165
00166 if (txn_Recovered) {
00167
00168 CS_SET_DISK_1(txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
00169 txn_File->write(&(txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1 );
00170 txn_File->flush();
00171 txn_File->sync();
00172 }
00173
00174 txn_File->close();
00175 txn_File->release();
00176 txn_File = NULL;
00177 }
00178 }
00179 void MSTrans::txn_SetFile(CSFile *tr_file)
00180 {
00181 txn_File = tr_file;
00182 }
00183
00184
#ifdef TRACE_ALL
// Trace output stream, opened by txn_NewMSTrans(); stays NULL if the open
// fails, in which case tracing is skipped.
static FILE *txn_debug_log = NULL;
#endif
00188
/*
 * Factory: create an MSTrans bound to the log file at log_path.
 *
 * If the file does not exist it is created with DFLT_TRANS_LOG_LIST_SIZE
 * zeroed record slots and a default header. If it exists, its header is
 * read and validated and, when the header is not marked as recovered,
 * txn_Recover() is run. A file too short to hold a header is removed and
 * recreated.
 *
 * log_path: path of the transaction log file.
 * dump_log: in DEBUG builds, dump an existing log to a text file before
 *           recovery; otherwise unused.
 *
 * Returns the new MSTrans. Bad magic, version, overflow/recovered marker or
 * an inconsistent file size are reported through CSException.
 *
 * NOTE(review): push_/pop_/release_ are project macros not visible here;
 * they presumably manage a per-thread cleanup stack - confirm before
 * reordering any of them.
 */
00189 MSTrans *MSTrans::txn_NewMSTrans(const char *log_path, bool dump_log)
00190 {
00191 MSTrans *trans = NULL;
00192 CSPath *path = NULL;
00193 uint64_t log_size;
00194 enter_();
00195
// dump_log is referenced only inside the DEBUG block below.
00196 (void) dump_log;
00197
00198 new_(trans, MSTrans());
00199 push_(trans);
00200
00201 path = CSPath::newPath(log_path);
00202 push_(path);
00203
// Re-entered after an unusable (truncated) log file has been removed.
00204 try_again:
00205
00206
00207 if (!path->exists()) {
00208
// ---- No log yet: create one ----
00209 MSDiskTransRec *recs;
00210 off64_t offset = sizeof(MSDiskTransHeadRec);
00211 uint64_t num_records = DFLT_TRANS_LOG_LIST_SIZE;
00212 size_t size;
00213 CSFile *tr_file;
00214
// Zero-filled buffer of 1024 records used to pre-size the file.
00215 recs = (MSDiskTransRec *) cs_calloc(1024 * sizeof(MSDiskTransRec));
00216 push_ptr_(recs);
00217
00218 tr_file = path->createFile(CSFile::CREATE);
00219 push_(tr_file);
00220
00221 log_size = DFLT_TRANS_LOG_LIST_SIZE * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
00222
00223
// Write the record area in chunks of at most 1024 records, starting
// right after the header.
00224 while (num_records) {
00225 if (num_records < 1024)
00226 size = num_records;
00227 else
00228 size = 1024;
00229 tr_file->write(recs, offset, size * sizeof(MSDiskTransRec));
00230 offset += size * sizeof(MSDiskTransRec);
00231 num_records -= size;
00232 }
00233
00234 trans->txn_MaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
00235 trans->txn_ReqestedMaxRecords = DFLT_TRANS_LOG_LIST_SIZE;
00236 trans->txn_MaxCheckPoint = DFLT_TRANS_CHECKPOINT_THRESHOLD;
00237 trans->txn_MaxTID = 1;
00238
00239
// Build the default header in memory, then write it in one piece.
00240 CS_SET_DISK_4(trans->txn_DiskHeader.th_magic_4, MS_TRANS_LOG_MAGIC);
00241 CS_SET_DISK_2(trans->txn_DiskHeader.th_version_2, MS_TRANS_LOG_VERSION);
00242
00243 CS_SET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4, trans->txn_MaxTID);
00244
00245 CS_SET_DISK_2(trans->txn_DiskHeader.th_check_point_2, trans->txn_MaxCheckPoint);
00246
00247 CS_SET_DISK_8(trans->txn_DiskHeader.th_list_size_8, trans->txn_MaxRecords);
00248 CS_SET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8, trans->txn_ReqestedMaxRecords);
00249
00250 CS_SET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4, DFLT_TRANS_CACHE_SIZE);
00251
00252 CS_SET_DISK_8(trans->txn_DiskHeader.th_start_8, 0);
00253 CS_SET_DISK_8(trans->txn_DiskHeader.th_eol_8, 0);
00254
00255 CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_RECOVERED);
00256 CS_SET_DISK_1(trans->txn_DiskHeader.th_checksum_1, 1);
00257 CS_SET_DISK_1(trans->txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
00258
00259 tr_file->write(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec));
// From here on the file pointer is held by trans (closed in txn_Close).
00260 pop_(tr_file);
00261 trans->txn_SetFile(tr_file);
00262
00263 trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
00264
00265 trans->txn_TransCache = MSTransCache::newMSTransCache(DFLT_TRANS_CACHE_SIZE);
00266
00267 release_(recs);
00268
00269 } else {
// ---- Existing log: read and validate the header ----
00270 bool overflow = false, recovered = false;
00271
00272 CSFile *tr_file = path->createFile(CSFile::DEFAULT);
00273 push_(tr_file);
00274
00275
// A file too short to contain a header is discarded and recreated.
00276 if (tr_file->read(&(trans->txn_DiskHeader), 0, sizeof(MSDiskTransHeadRec), 0) < sizeof(MSDiskTransHeadRec)) {
00277 release_(tr_file);
00278 path->removeFile();
00279 goto try_again;
00280 }
00281
00282
00283 if (CS_GET_DISK_4(trans->txn_DiskHeader.th_magic_4) != MS_TRANS_LOG_MAGIC)
00284 CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_HEADER_MAGIC);
00285
// Any version other than MS_TRANS_LOG_VERSION is reported as "too new".
00286 if (CS_GET_DISK_2(trans->txn_DiskHeader.th_version_2) != MS_TRANS_LOG_VERSION)
00287 CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_VERSION_TOO_NEW);
00288
00289
// The overflow byte must hold one of its two legal values.
00290 if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_NO_OVERFLOW)
00291 overflow = false;
00292 else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_overflow_1) == MS_TRANS_OVERFLOW)
00293 overflow = true;
00294 else
00295 CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
00296
00297
// Likewise the recovered byte.
00298 if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_NOT_RECOVERED)
00299 recovered = false;
00300 else if (CS_GET_DISK_1(trans->txn_DiskHeader.th_recovered_1) == MS_TRANS_LOG_RECOVERED)
00301 recovered = true;
00302 else
00303 CSException::throwFileError(CS_CONTEXT, path->getCString(), CS_ERR_BAD_FILE_HEADER);
00304
00305
// Size the header claims for the file: header plus th_list_size_8 records.
00306 log_size = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8) * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec);
00307
// The file may never be shorter than that, and may be longer only when
// the header says an overflow is present.
00308 if ((log_size > tr_file->getEOF()) ||
00309 ((log_size < tr_file->getEOF()) && !overflow)){
00310
00311 char buffer[CS_EXC_MESSAGE_SIZE];
00312 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unexpected transaction log size: ");
00313 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, path->getCString());
00314 CSException::throwException(CS_CONTEXT, CS_ERR_BAD_FILE_HEADER, buffer);
00315 }
00316
00317 trans->txn_MaxTID = CS_GET_DISK_4(trans->txn_DiskHeader.th_next_txn_id_4);
00318
00319
00320 trans->txn_TransCache = MSTransCache::newMSTransCache(CS_GET_DISK_4(trans->txn_DiskHeader.th_requested_cache_size_4));
00321
// From here on the file pointer is held by trans (closed in txn_Close).
00322 pop_(tr_file);
00323 trans->txn_SetFile(tr_file);
00324
// Copy the remaining header fields into the object.
00325 trans->txn_MaxCheckPoint = CS_GET_DISK_2(trans->txn_DiskHeader.th_check_point_2);
00326
00327 trans->txn_MaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_list_size_8);
00328 trans->txn_ReqestedMaxRecords = CS_GET_DISK_8(trans->txn_DiskHeader.th_requested_list_size_8);
00329
00330 trans->txn_Checksum = CS_GET_DISK_1(trans->txn_DiskHeader.th_checksum_1);
00331 trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
00332 trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
00333 trans->txn_HaveOverflow = overflow;
// With an overflow present, its end is derived from the file length.
00334 if (overflow)
00335 trans->txn_Overflow = (tr_file->getEOF() - sizeof(MSDiskTransHeadRec)) /sizeof(MSDiskTransRec);
00336 else
00337 trans->txn_Overflow = 0;
00338
00339 #ifdef DEBUG
00340 if (overflow)
00341 printf("Recovering overflow log\n");
00342 if (dump_log) {
00343 char name[100];
00344 snprintf(name, 100, "%dms-trans-log.dump", (int)time(NULL));
00345 trans->txn_DumpLog(name);
00346 }
00347 #endif
00348
// The header was not marked recovered when last written: run recovery.
00349 if (!recovered)
00350 trans->txn_Recover();
00351
00352
00353 }
00354
00355 trans->txn_Recovered = true;
00356
00357
// Take the positions from the header image (txn_Recover() updates them
// there).
00358 trans->txn_EOL = CS_GET_DISK_8(trans->txn_DiskHeader.th_eol_8);
00359 trans->txn_Start = CS_GET_DISK_8(trans->txn_DiskHeader.th_start_8);
00360
00361
00362
// Mark the file as not recovered while it is open; txn_Close() writes the
// recovered marker back.
00363 CS_SET_DISK_1(trans->txn_DiskHeader.th_recovered_1, MS_TRANS_LOG_NOT_RECOVERED);
00364 trans->txn_File->write(&(trans->txn_DiskHeader.th_recovered_1), offsetof(MSDiskTransHeadRec, th_recovered_1), 1);
00365
00366
// Load the transaction cache starting at the log's start position.
00367 trans->txn_TransCache->tc_StartCacheReload(true);
00368 trans->txn_LoadTransactionCache(trans->txn_Start);
00369 trans->txn_TransCache->tc_CompleteCacheReload();
00370
// Apply a resize that was requested but not yet carried out.
00371 if (trans->txn_MaxRecords != trans->txn_ReqestedMaxRecords)
00372 trans->txn_ResizeLog();
00373
00374 release_(path);
00375 pop_(trans);
00376
00377 #ifdef TRACE_ALL
00378
00379 txn_debug_log = fopen("log_dump.txt", "w+");
00380 if (!txn_debug_log) {
00381 perror("log_dump.txt");
00382 }
00383 #endif
00384
00385 return_(trans);
00386 }
00387
00388 bool MSTrans::txn_ValidRecord(MSTransPtr rec)
00389 {
00390 uint8_t check = rec->tr_check;
00391 bool ok;
00392
00393 rec->tr_check = txn_Checksum;
00394 ok = (checksum((uint8_t*)rec, sizeof(MSTransRec)) == check);
00395 rec->tr_check = check;
00396 return ok;
00397 }
00398
00399 void MSTrans::txn_GetRecordAt(uint64_t index, MSTransPtr rec)
00400 {
00401 MSDiskTransRec drec;
00402 off64_t offset;
00403
00404
00405 offset = sizeof(MSDiskTransHeadRec) + index * sizeof(MSDiskTransRec);
00406 txn_File->read(&drec, offset, sizeof(MSDiskTransRec), sizeof(MSDiskTransRec));
00407 GET_DISK_TRANSREC(rec, &drec);
00408 }
00409
00410
00411
00412
/*
 * Bring the in-memory and header positions of a log that was not closed
 * cleanly back into a consistent state.
 *
 * Steps, in order:
 *  1. Advance txn_EOL past every record whose check byte is valid.
 *  2. Re-derive txn_Start and move it forward to a record flagged as a
 *     transaction start.
 *  3. Load the transaction cache and walk its transactions: remember the
 *     last transaction id seen and append an MS_RecoveredTxn record for
 *     every transaction that is not terminated.
 *
 * Only the in-memory header image is updated here; nothing in this function
 * writes th_start_8 / th_eol_8 to the file directly.
 */
00413 void MSTrans::txn_Recover()
00414 {
00415 MSTransRec rec = {0,0,0,0,0,0,0};
00416 uint64_t original_eol = txn_EOL;
00417 enter_();
00418
00419 #ifdef DEBUG
00420 printf("Recovering transaction log!\n");
00421 #endif
00422
// Rebuilt below from the transactions found in the log.
00423 txn_MaxTID = 0;
00424
00425
// Step 1: scan forward from the recorded EOL while records validate.
00426 for (; txn_EOL < txn_MaxRecords; txn_EOL++) {
00427 txn_GetRecordAt(txn_EOL, &rec);
00428 if (! txn_ValidRecord(&rec))
00429 break;
00430 }
00431
// Every slot up to the end of the list was valid: EOL wraps to slot 0.
00432 if (txn_EOL == txn_MaxRecords) {
00433
00434
00435
00436
00437 txn_EOL = 0;
00438 }
00439
00440 txn_MaxTID++;
00441
00442 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00443
00444
00445
00446
// Step 2: when the recorded EOL lay before the start or beyond the new
// EOL, and the new EOL is not before the start, restart the search for
// the start just past the new EOL.
// NOTE(review): the reasoning behind this condition is not visible in the
// code; verify against the design notes before touching it.
00447 if (((original_eol < txn_Start) || (original_eol > txn_EOL)) && (txn_EOL >= txn_Start))
00448 txn_Start = txn_EOL +1;
00449
00450
// Look for the first record flagged as a transaction start, up to EOL when
// the start is before it, otherwise up to the end of the list.
00451 uint64_t end_search = (txn_Start < txn_EOL)? txn_EOL : txn_MaxRecords;
00452 for (; txn_Start < end_search; txn_Start++) {
00453 txn_GetRecordAt(txn_Start, &rec);
00454 if (TRANS_IS_START(rec.tr_type))
00455 break;
00456 }
00457
// No start record found: the log is treated as empty (start == eol).
00458 if (txn_Start == end_search)
00459 txn_Start = txn_EOL;
00460
00461 CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00462
// Step 3: load the cache in recovery mode and walk the transactions.
00463 txn_TransCache->tc_SetRecovering(true);
00464
00465 txn_TransCache->tc_StartCacheReload(true);
00466 txn_LoadTransactionCache(txn_Start);
00467 txn_TransCache->tc_CompleteCacheReload();
00468
00469
00470
00471 TRef ref;
00472 bool terminated;
00473 while (txn_TransCache->tc_GetTransaction(&ref, &terminated)) {
00474
00475 txn_MaxTID = txn_TransCache->tc_GetTransactionID(ref);
// Unterminated transaction: append a "recovered" record under its id,
// attached to its existing cache entry.
00476 if (!terminated) {
00477 self->myTID = txn_MaxTID;
00478 self->myTransRef = ref;
00479 self->myStartTxn = false;
00480 txn_AddTransaction(MS_RecoveredTxn);
00481 }
00482 CRASH_POINT(2);
00483 txn_TransCache->tc_FreeTransaction(ref);
00484
00485
00486
00487
00488
// Pull in more of the log when the cache asks for it.
00489 if (txn_TransCache->tc_ShoulReloadCache()) {
00490 txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload(true));
00491 txn_TransCache->tc_CompleteCacheReload();
00492 }
00493 }
00494
00495
00496 txn_TransCache->tc_SetRecovering(false);
00497 self->myTransRef = 0;
00498
00499
// txn_AddTransaction() may have moved EOL; refresh the header image.
00500 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00501
00502 exit_();
00503 }
00504
00505 bool ReadTXNLog::rl_CanContinue()
00506 {
00507 return rl_log->txn_TransCache->tc_ContinueCacheReload();
00508 }
00509
00510 void ReadTXNLog::rl_Load(uint64_t log_position, MSTransPtr rec)
00511 {
00512 rl_log->txn_TransCache->tc_AddRec(log_position, rec);
00513 }
00514
00515 void ReadTXNLog::rl_Store(uint64_t log_position, MSTransPtr rec)
00516 {
00517 MSDiskTransRec drec;
00518 SET_DISK_TRANSREC(&drec, rec);
00519
00520 rl_log->txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + log_position * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec));
00521 }
00522
00523 void ReadTXNLog::rl_Flush()
00524 {
00525 rl_log->txn_File->flush();
00526 rl_log->txn_File->sync();
00527 }
00528
/*
 * Scan log records starting at slot read_start, passing each one to
 * rl_Load() for as long as rl_CanContinue() allows.
 *
 * read_start: first slot to read. A value at or beyond txn_MaxRecords means
 *             the overflow area is being read.
 * log_locked: true when the caller already holds the lock on rl_log; in that
 *             case the final "did the log grow meanwhile" re-check is
 *             skipped.
 *
 * After the main list has been read, the overflow area (if any) is read by
 * a recursive call. Otherwise, when the log was not locked, the lock is
 * taken and, if the record count changed during the scan, the remainder is
 * read while holding it.
 */
00529 void ReadTXNLog::rl_ReadLog(uint64_t read_start, bool log_locked)
00530 {
00531 uint64_t size, orig_size;
00532 bool reading_overflow = (read_start >= rl_log->txn_MaxRecords);
00533 enter_();
00534
00535
00536
// Work out how many records lie between read_start and the end of the
// region being read.
00537 if (reading_overflow) {
00538 orig_size = rl_log->txn_Overflow;
00539 size = rl_log->txn_Overflow - read_start;
00540 } else {
00541 orig_size = rl_log->txn_GetNumRecords();
00542
// read_start at or after Start: the total less what lies between Start
// and read_start. Otherwise read_start is in the wrapped part, so only
// the stretch up to EOL remains.
00543 if (rl_log->txn_Start <= read_start)
00544 size = orig_size - (read_start - rl_log->txn_Start);
00545 else
00546 size = rl_log->txn_EOL - read_start;
00547 }
00548
00549
00550 while (size && rl_CanContinue()) {
// Records are read in batches of at most 1000.
00551 MSDiskTransRec diskRecords[1000];
00552 uint32_t read_size;
00553 off64_t offset;
00554
00555 if (size > 1000)
00556 read_size = 1000 ;
00557 else
00558 read_size = size ;
00559
00560
// When the used region wraps (EOL is before read_start), a batch must
// stop at the last slot of the list; the next batch restarts at slot 0.
00561 if ((!reading_overflow) && (rl_log->txn_EOL < read_start) && ((rl_log->txn_MaxRecords - read_start) < read_size))
00562 read_size = rl_log->txn_MaxRecords - read_start ;
00563
00564
00565 offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
00566 rl_log->txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));
00567
00568
// Decode each record and hand it on; stops early if rl_CanContinue()
// turns false mid-batch.
00569 for (uint32_t i = 0; i < read_size && rl_CanContinue(); i++) {
00570 MSTransRec rec;
00571 MSDiskTransPtr drec = diskRecords + i;
00572 GET_DISK_TRANSREC(&rec, drec);
00573
00574 rl_Load(read_start + i, &rec);
00575 }
00576
00577 size -= read_size;
00578 read_start += read_size;
// Wrap to the first slot after the last slot of the list.
00579 if (read_start == rl_log->txn_MaxRecords)
00580 read_start = 0;
00581 }
00582
// Main list done and an overflow area exists: continue there.
00583 if (rl_log->txn_HaveOverflow && !reading_overflow) {
00584 if (rl_CanContinue())
00585 rl_ReadLog(rl_log->txn_MaxRecords, false);
00586
00587 } else if (!log_locked) {
00588
00589
00590
00591
00592
// Records may have been added while reading without the lock. Take the
// lock, compare the record count with the one sampled at entry and, if
// it changed, read the rest while holding the lock.
00593 uint64_t new_size;
00594 lock_(rl_log);
00595 if (reading_overflow)
00596 new_size = rl_log->txn_Overflow;
00597 else
00598 new_size = rl_log->txn_GetNumRecords();
00599 if (rl_CanContinue() && (orig_size != new_size)) {
00600 rl_ReadLog(read_start, true);
00601 }
00602 unlock_(rl_log);
00603 }
00604
00605
00606 exit_();
00607 }
00608
00609 void MSTrans::txn_LoadTransactionCache(uint64_t read_start)
00610 {
00611 ReadTXNLog log(this);
00612 enter_();
00613 log.rl_ReadLog(read_start, false);
00614 txn_TransCache->tc_UpdateCacheVersion();
00615 exit_();
00616 }
00617
00618 void MSTrans::txn_ResizeLog()
00619 {
00620 enter_();
00621
00622 lock_(this);
00623 if (TRANS_CAN_RESIZE) {
00624
00625
00626
00627
00628
00629 #ifdef DEBUG
00630 uint64_t old_size = txn_MaxRecords;
00631 #endif
00632 if (txn_MaxRecords > txn_ReqestedMaxRecords) {
00633 uint64_t max_resize = txn_MaxRecords - txn_EOL;
00634
00635 if ( txn_Start == txn_EOL)
00636 max_resize = txn_MaxRecords;
00637 else {
00638 max_resize = txn_MaxRecords - txn_EOL;
00639 if (!txn_Start)
00640 max_resize--;
00641 }
00642
00643
00644 if (max_resize > (txn_MaxRecords - txn_ReqestedMaxRecords))
00645 max_resize = txn_MaxRecords - txn_ReqestedMaxRecords;
00646
00647 txn_MaxRecords -= max_resize;
00648 } else
00649 txn_MaxRecords = txn_ReqestedMaxRecords;
00650
00651 #ifdef DEBUG
00652 char buffer[CS_EXC_MESSAGE_SIZE];
00653 snprintf(buffer, CS_EXC_MESSAGE_SIZE, "Resizing the Transaction log from %"PRIu64" to %"PRIu64" \n", old_size, txn_MaxRecords);
00654 CSException::logException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, buffer);
00655 #endif
00656
00657 CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
00658
00659 txn_File->setEOF(txn_MaxRecords * sizeof(MSDiskTransRec) + sizeof(MSDiskTransHeadRec));
00660 txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
00661
00662 if (txn_Start == txn_EOL) {
00663 txn_Start = 0;
00664 txn_EOL = 0;
00665 } else if (txn_MaxRecords == txn_EOL) {
00666 txn_EOL = 0;
00667 }
00668
00669 txn_ResetEOL();
00670
00671 }
00672 unlock_(this);
00673
00674 exit_();
00675 }
00676
00677 void MSTrans::txn_ResetEOL()
00678 {
00679 enter_();
00680
00681 txn_EOLCheckPoint = txn_MaxCheckPoint;
00682 txn_StartCheckPoint = txn_MaxCheckPoint;
00683
00684 if (!txn_EOL)
00685 txn_Checksum++;
00686 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00687 CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00688 CS_SET_DISK_1(txn_DiskHeader.th_checksum_1, txn_Checksum);
00689 txn_File->write(&(txn_DiskHeader.th_start_8),
00690 offsetof(MSDiskTransHeadRec, th_start_8),
00691 sizeof(MSDiskTransHeadRec) - offsetof(MSDiskTransHeadRec, th_start_8) );
00692 CRASH_POINT(5);
00693 txn_File->flush();
00694 txn_File->sync();
00695 CRASH_POINT(10);
00696
00697 exit_();
00698 }
00699
// Transaction tracing is off by default: PRINT_TRANS expands to nothing and
// its arguments are not evaluated. Remove this define to compile in
// printTrans() below and trace every txn_LogTransaction() call to stderr.
#define PRINT_TRANS(tid, a, t)

#ifndef PRINT_TRANS
#define PRINT_TRANS(tid, a, t) printTrans(tid, a, t)
// Print one line describing a logged transaction record.
//   tid:        transaction id
//   autocommit: whether the record was logged with autocommit on
//   type:       record type; unknown values print as "???"
static void printTrans(uint32_t tid, bool autocommit, MS_Txn type)
{
	const char *type_name = "???";

	switch (type) {
		case MS_RollBackTxn:
			type_name = "Rollback";
			break;
		case MS_PartialRollBackTxn:
			type_name = "PartialRollBack";
			break;
		case MS_CommitTxn:
			type_name = "Commit";
			break;
		case MS_ReferenceTxn:
			type_name = "Reference";
			break;
		case MS_DereferenceTxn:
			type_name = "Dereference";
			break;
		case MS_RecoveredTxn:
			type_name = "Recovered";
			break;
		default:
			break;
	}

	// tid is unsigned 32-bit: print it with PRIu32 rather than %d.
	fprintf(stderr, "MSTrans::txn_LogTransaction(%" PRIu32 ", autocommit = %s, %s)\n", tid, (autocommit)?"On":"Off", type_name);
}
#endif
00733
00734 void MSTrans::txn_LogTransaction(MS_Txn type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00735 {
00736 enter_();
00737
00738 lock_(this);
00739 if (!self->myTID) {
00740 switch (type) {
00741 case MS_RollBackTxn:
00742 case MS_PartialRollBackTxn:
00743 case MS_CommitTxn: {
00744 unlock_(this);
00745 exit_();;
00746 }
00747 case MS_ReferenceTxn:
00748 case MS_DereferenceTxn:
00749 case MS_RecoveredTxn:
00750 break;
00751 }
00752 txn_MaxTID++;
00753 self->myTID = txn_MaxTID;
00754 self->myTransRef = TRANS_CACHE_NEW_REF;
00755 self->myStartTxn = true;
00756 }
00757
00758 PRINT_TRANS(self->myTID, autocommit, type);
00759
00760 txn_AddTransaction(type, autocommit, db_id, tab_id, blob_id, blob_ref_id);
00761 if (autocommit || TRANS_TYPE_IS_TERMINATED(type))
00762 txn_NewTransaction();
00763
00764 unlock_(this);
00765
00766 exit_();
00767 }
00768
00769 void MSTrans::txn_AddTransaction(uint8_t tran_type, bool autocommit, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00770 {
00771 MSTransRec rec = {0,0,0,0,0,0,0};
00772 MSDiskTransRec drec;
00773 uint64_t new_offset = txn_EOL;
00774 bool do_flush = true;
00775
00776 enter_();
00777
00778
00779
00780 if (txn_IsFull()) {
00781 if (!txn_HaveOverflow) {
00782 CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_OVERFLOW);
00783 txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
00784
00785 CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00786 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00787 txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
00788
00789 txn_File->flush();
00790 txn_File->sync();
00791 txn_HaveOverflow = true;
00792 txn_OverflowCount++;
00793 txn_Overflow = txn_MaxRecords;
00794 }
00795
00796 new_offset = txn_Overflow;
00797 }
00798
00799 rec.tr_id = self->myTID ;
00800 rec.tr_type = tran_type;
00801 rec.tr_db_id = db_id;
00802 rec.tr_tab_id = tab_id;
00803 rec.tr_blob_id = blob_id;
00804 rec.tr_blob_ref_id = blob_ref_id;
00805
00806 if (self->myStartTxn) {
00807 TRANS_SET_START(rec.tr_type);
00808 self->myStartTxn = false;
00809 }
00810
00811 if (autocommit) {
00812 TRANS_SET_AUTOCOMMIT(rec.tr_type);
00813 }
00814
00815 #ifdef TRACE_ALL
00816 if (txn_debug_log){
00817 char *ttype, *cmt;
00818 switch (TRANS_TYPE(rec.tr_type)) {
00819 case MS_ReferenceTxn:
00820 ttype = "+";
00821 break;
00822 case MS_DereferenceTxn:
00823 ttype = "-";
00824 break;
00825 case MS_RollBackTxn:
00826 ttype = "rb";
00827 rec.tr_blob_ref_id = 0;
00828 break;
00829 case MS_RecoveredTxn:
00830 ttype = "rcov";
00831 rec.tr_blob_ref_id = 0;
00832 break;
00833 default:
00834 ttype = "???";
00835 }
00836
00837 if (TRANS_IS_TERMINATED(rec.tr_type))
00838 cmt = "c";
00839 else
00840 cmt = "";
00841
00842 fprintf(txn_debug_log, "%"PRIu32" \t\t%s%s %"PRIu64" %"PRIu32" %"PRIu64" %"PRIu64" %"PRIu64" %d\n", self->myTID, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id, txn_Start, txn_EOL, new_offset, txn_HaveOverflow);
00843 }
00844 #endif
00845
00846 rec.tr_check = txn_Checksum;
00847
00848
00849 rec.tr_check = checksum((uint8_t*)&rec, sizeof(rec));
00850
00851
00852 SET_DISK_TRANSREC(&drec, &rec);
00853 #ifdef CRASH_TEST
00854
00855 if (trans_test_crash_point == 9) {
00856 txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec)/2 );
00857 CRASH_POINT(9);
00858 } else
00859 txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
00860 #else
00861 txn_File->write(&drec, sizeof(MSDiskTransHeadRec) + new_offset * sizeof(MSDiskTransRec) , sizeof(MSDiskTransRec) );
00862 #endif
00863 CRASH_POINT(3);
00864
00865 if (TRANS_IS_TERMINATED(tran_type)) {
00866 CRASH_POINT(4);
00867 txn_File->flush();
00868 txn_File->sync();
00869 do_flush = false;
00870 }
00871
00872 if (!txn_HaveOverflow) {
00873 uint64_t rec_offset = txn_EOL;
00874
00875 txn_EOL = new_offset;
00876 txn_EOL++;
00877
00878 if (txn_EOL == txn_MaxRecords) {
00879
00880 txn_EOL = 0;
00881 }
00882
00883 txn_EOLCheckPoint--;
00884 if ((!txn_EOLCheckPoint) || !txn_EOL) {
00885
00886
00887
00888
00889
00890 if (do_flush) {
00891 txn_File->flush();
00892 txn_File->sync();
00893 }
00894
00895 txn_ResetEOL();
00896 }
00897
00898 txn_TransCache->tc_AddRec(rec_offset, &rec, self->myTransRef);
00899
00900 if (txn_GetNumRecords() > txn_HighWaterMark)
00901 txn_HighWaterMark = txn_GetNumRecords();
00902
00903 } else {
00904 txn_TransCache->tc_AddRec(txn_Overflow, &rec, self->myTransRef);
00905 txn_Overflow++;
00906 if (txn_Overflow > txn_HighWaterMark)
00907 txn_HighWaterMark = txn_Overflow;
00908 }
00909
00910 ASSERT(txn_EOL < txn_MaxRecords);
00911 ASSERT(txn_Start < txn_MaxRecords);
00912 exit_();
00913 }
00914
00915 uint64_t MSTrans::txn_GetSize()
00916 {
00917 return sizeof(MSDiskTransHeadRec) + txn_MaxRecords * sizeof(MSDiskTransRec);
00918 }
00919
00920
00921 void MSTrans::txn_NewTransaction()
00922 {
00923 enter_();
00924
00925 self->myTID = 0;
00926
00927 exit_();
00928 }
00929
00930
00931 void MSTrans::txn_PerformIdleTasks()
00932 {
00933 enter_();
00934
00935 if (txn_TransCache->tc_ShoulReloadCache()) {
00936 txn_LoadTransactionCache(txn_TransCache->tc_StartCacheReload());
00937 txn_TransCache->tc_CompleteCacheReload();
00938 exit_();
00939 }
00940
00941
00942
00943 txn_reader->suspendedWait(1000);
00944 exit_();
00945 }
00946
00947
00948 void MSTrans::txn_ResetReadPosition(uint64_t pos)
00949 {
00950 bool rollover = (pos < txn_Start);
00951 enter_();
00952
00953 if (pos >= txn_MaxRecords) {
00954 lock_(this);
00955
00956
00957
00958
00959 txn_Start = txn_MaxRecords;
00960 txn_MaxRecords = txn_Overflow;
00961 txn_EOL = 0;
00962 txn_HaveOverflow = false;
00963 txn_Overflow = 0;
00964
00965 CS_SET_DISK_1(txn_DiskHeader.th_overflow_1, MS_TRANS_NO_OVERFLOW);
00966 CS_SET_DISK_8(txn_DiskHeader.th_list_size_8, txn_MaxRecords);
00967 txn_File->write(&(txn_DiskHeader.th_overflow_1), offsetof(MSDiskTransHeadRec, th_overflow_1), 1);
00968 txn_File->write(&(txn_DiskHeader.th_list_size_8), offsetof(MSDiskTransHeadRec, th_list_size_8), 8);
00969
00970 txn_ResetEOL();
00971
00972 unlock_(this);
00973 } else
00974 txn_Start = pos;
00975
00976 ASSERT(txn_Start <= txn_MaxRecords);
00977
00978 if (!rollover)
00979 txn_StartCheckPoint -= (pos - txn_Start);
00980
00981
00982 if ( rollover || (txn_StartCheckPoint <=0)) {
00983 lock_(this);
00984 CS_SET_DISK_8(txn_DiskHeader.th_start_8, txn_Start);
00985 CS_SET_DISK_8(txn_DiskHeader.th_eol_8, txn_EOL);
00986 txn_File->write(&(txn_DiskHeader.th_start_8), offsetof(MSDiskTransHeadRec, th_start_8), 16);
00987 CRASH_POINT(5);
00988 txn_File->flush();
00989 txn_File->sync();
00990 txn_StartCheckPoint = txn_MaxCheckPoint;
00991 unlock_(this);
00992 }
00993
00994 CRASH_POINT(6);
00995
00996 if (TRANS_CAN_RESIZE)
00997 txn_ResizeLog();
00998
00999 exit_();
01000 }
01001
01002 bool MSTrans::txn_haveNextTransaction()
01003 {
01004 bool terminated = false;
01005 TRef ref;
01006
01007 txn_TransCache->tc_GetTransaction(&ref, &terminated);
01008
01009 return terminated;
01010 }
01011
01012
/*
 * Deliver the next record of the next terminated transaction to the reader.
 *
 * tran:  receives the record.
 * state: receives the state reported by the cache for that record.
 *
 * Must be called on the reader thread (asserted). Blocks, running idle tasks,
 * until a terminated transaction is available or the thread is told to quit;
 * in the latter case it returns without filling *tran.
 *
 * Once tc_GetRecAt() reports no record at the current index, the current
 * transaction is freed, the log's read position is advanced and the next
 * transaction is looked for.
 */
01013 void MSTrans::txn_GetNextTransaction(MSTransPtr tran, MS_TxnState *state)
01014 {
01015 bool terminated;
01016 uint64_t log_position;
01017 enter_();
01018
01019 ASSERT(txn_reader == self);
01020 lock_(txn_reader);
01021
01022 do {
01023
01024
01025
// Wait until there is a current transaction to read from.
01026 while ((!txn_IsTxnValid) && !self->myMustQuit) {
01027
01028
// While a backup is in progress only idle tasks are run.
01029 while (txn_Doingbackup && !self->myMustQuit)
01030 txn_PerformIdleTasks();
01031
// Only a terminated transaction is picked up; otherwise idle and retry.
01032 if (txn_TransCache->tc_GetTransaction(&txn_CurrentTxn, &terminated) && terminated) {
01033 txn_IsTxnValid = true;
01034 txn_TxnIndex = 0;
01035 } else
01036 txn_PerformIdleTasks();
01037 }
01038
01039 if (self->myMustQuit)
01040 break;
01041
// Got a record from the current transaction: done.
01042 if (txn_TransCache->tc_GetRecAt(txn_CurrentTxn, txn_TxnIndex++, tran, state))
01043 break;
01044
// No record at this index: free the transaction and advance the read
// position.
01045 CRASH_POINT(7);
01046 txn_TransCache->tc_FreeTransaction(txn_CurrentTxn);
01047 CRASH_POINT(8);
01048 if (txn_TransCache->tc_GetTransactionStartPosition(&log_position)) {
01049 txn_ResetReadPosition(log_position);
01050 }else{
// The cache has no start position to offer.
01051 if (txn_TransCache->tc_ShoulReloadCache()) {
01052 uint64_t pos = txn_TransCache->tc_StartCacheReload();
01053 txn_ResetReadPosition(pos);
01054 txn_LoadTransactionCache(pos);
01055 txn_TransCache->tc_CompleteCacheReload();
01056 } else {
01057
01058
01059
// Re-check under the log lock; if the cache still has no start
// position, move the start up to EOL.
01060 lock_(this);
01061 if (txn_TransCache->tc_GetTransactionStartPosition(&log_position))
01062 txn_ResetReadPosition(log_position);
01063 else
01064 txn_ResetReadPosition(txn_EOL);
01065 unlock_(this);
01066 }
01067 }
01068
01069 txn_IsTxnValid = false;
01070
01071 } while (1);
01072
01073 unlock_(txn_reader);
01074 exit_();
01075 }
01076
01077
01078 void MSTrans::txn_GetStats(MSTransStatsPtr stats)
01079 {
01080
01081 if (txn_HaveOverflow) {
01082 stats->ts_IsOverflowing = true;
01083 stats->ts_LogSize = txn_Overflow;
01084 } else {
01085 stats->ts_IsOverflowing = false;
01086 stats->ts_LogSize = txn_GetNumRecords();
01087 }
01088 stats->ts_PercentFull = (stats->ts_LogSize * 100) / CS_GET_DISK_8(txn_DiskHeader.th_requested_list_size_8);
01089
01090 stats->ts_MaxSize = txn_HighWaterMark;
01091 stats->ts_OverflowCount = txn_OverflowCount;
01092
01093 stats->ts_TransCacheSize = txn_TransCache->tc_GetCacheUsed();
01094 stats->ts_PercentTransCacheUsed = txn_TransCache->tc_GetPercentCacheUsed();
01095 stats->ts_PercentCacheHit = txn_TransCache->tc_GetPercentCacheHit();
01096 }
01097
01098 void MSTrans::txn_SetCacheSize(uint32_t new_size)
01099 {
01100 enter_();
01101
01102
01103 lock_(txn_reader);
01104 lock_(this);
01105
01106 CS_SET_DISK_4(txn_DiskHeader.th_requested_cache_size_4, new_size);
01107
01108 txn_File->write(&(txn_DiskHeader.th_requested_cache_size_4), offsetof(MSDiskTransHeadRec, th_requested_cache_size_4), 4);
01109 txn_File->flush();
01110 txn_File->sync();
01111
01112 txn_TransCache->tc_SetSize(new_size);
01113
01114 unlock_(this);
01115 unlock_(txn_reader);
01116 exit_();
01117 }
01118
01119 void MSTrans::txn_SetLogSize(uint64_t new_size)
01120 {
01121 enter_();
01122
01123
01124
01125 lock_(txn_reader);
01126 lock_(this);
01127
01128 txn_ReqestedMaxRecords = (new_size - sizeof(MSDiskTransHeadRec)) / sizeof(MSDiskTransRec);
01129
01130 if (txn_ReqestedMaxRecords < 10)
01131 txn_ReqestedMaxRecords = 10;
01132
01133 CS_SET_DISK_8(txn_DiskHeader.th_requested_list_size_8, txn_ReqestedMaxRecords);
01134
01135 txn_File->write(&(txn_DiskHeader.th_requested_list_size_8), offsetof(MSDiskTransHeadRec, th_requested_list_size_8), 8);
01136 txn_File->flush();
01137 txn_File->sync();
01138
01139 unlock_(this);
01140 unlock_(txn_reader);
01141
01142 exit_();
01143 }
01144
01145
01146 class DBSearchTXNLog : ReadTXNLog {
01147 public:
01148 DBSearchTXNLog(MSTrans *log): ReadTXNLog(log), sdb_db_id(0), sdb_isDirty(false) {}
01149
01150 uint32_t sdb_db_id;
01151 bool sdb_isDirty;
01152
01153 virtual bool rl_CanContinue() { return true;}
01154 virtual void rl_Load(uint64_t log_position, MSTransPtr rec)
01155 {
01156 if (rec->tr_db_id == sdb_db_id) {
01157 sdb_isDirty = true;
01158 rec->tr_db_id = 0;
01159 rl_Store(log_position, rec);
01160 }
01161 }
01162
01163 void SetDataBaseIDToZero(uint32_t db_id)
01164 {
01165 sdb_db_id = db_id;
01166 rl_ReadLog(rl_log->txn_GetStartPosition(), false);
01167 if (sdb_isDirty)
01168 rl_Flush();
01169 }
01170 };
01171
01172
01173
01174
01175 void MSTrans::txn_dropDatabase(uint32_t db_id)
01176 {
01177 enter_();
01178
01179
01180
01181 lock_(txn_reader);
01182 lock_(this);
01183
01184
01185 txn_TransCache->tc_dropDatabase(db_id);
01186
01187
01188
01189 DBSearchTXNLog searchLog(this);
01190
01191 searchLog.SetDataBaseIDToZero(db_id);
01192
01193 unlock_(this);
01194 unlock_(txn_reader);
01195 exit_();
01196 }
01197
#ifdef DEBUG
/*
 * Debug aid: write a text dump of every record slot in the log to 'file'.
 *
 * One line per slot: transaction id, type marker ("+", "-", "rb", "rcov" or
 * "???", followed by "c" for a terminated record), blob reference id, table
 * id, and the tags START / EOL / OverFlow on the slots at txn_Start, txn_EOL
 * and txn_MaxRecords. Rollback and recovered records are shown with a zero
 * reference id.
 *
 * If the file cannot be opened the error is reported with perror() and
 * nothing is dumped.
 */
void MSTrans::txn_DumpLog(const char *file)
{
	size_t size, read_start = 0;
	FILE *fptr;
	enter_();

	fptr = fopen(file, "w+");
	if (!fptr) {
		perror(file);
		// Leave through exit_() like every other return that follows an
		// enter_() in this file, rather than with a bare return.
		exit_();
	}

	// Dump the overflow area as well when there is one.
	if (txn_Overflow)
		size = txn_Overflow;
	else
		size = txn_MaxRecords;

	while (size) {
		// Records are read in batches of at most 1000.
		MSDiskTransRec diskRecords[1000];
		uint32_t read_size;
		off64_t offset;

		if (size > 1000)
			read_size = 1000 ;
		else
			read_size = size ;

		offset = sizeof(MSDiskTransHeadRec) + read_start * sizeof(MSDiskTransRec);
		txn_File->read(diskRecords, offset, read_size* sizeof(MSDiskTransRec), read_size* sizeof(MSDiskTransRec));

		for (uint32_t i = 0; i < read_size; i++) {
			const char *ttype, *cmt;
			MSTransRec rec;
			MSDiskTransPtr drec = diskRecords + i;
			GET_DISK_TRANSREC(&rec, drec);

			switch (TRANS_TYPE(rec.tr_type)) {
				case MS_ReferenceTxn:
					ttype = "+";
					break;
				case MS_DereferenceTxn:
					ttype = "-";
					break;
				case MS_RollBackTxn:
					ttype = "rb";
					rec.tr_blob_ref_id = 0;	// local copy only
					break;
				case MS_RecoveredTxn:
					ttype = "rcov";
					rec.tr_blob_ref_id = 0;	// local copy only
					break;
				default:
					ttype = "???";
			}

			if (TRANS_IS_TERMINATED(rec.tr_type))
				cmt = "c";
			else
				cmt = "";

			// Spaces around the PRI* macros are required since C++11.
			fprintf(fptr, "%" PRIu32 " \t\t%s%s %" PRIu64 " %" PRIu32 " \t %s %s %s\n", rec.tr_id, ttype, cmt, rec.tr_blob_ref_id, rec.tr_tab_id,
				((read_start + i) == txn_Start) ? "START":"",
				((read_start + i) == txn_EOL) ? "EOL":"",
				((read_start + i) == txn_MaxRecords) ? "OverFlow":""
				);
		}

		size -= read_size;
		read_start += read_size;
	}
	fclose(fptr);
	exit_();
}

#endif
01277