Drizzled Public API Documentation

trans_cache_ms.cc

00001 /* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2009-06-10
00022  *
00023  * H&G2JCtL
00024  *
00025  * PBMS transaction cache.
00026  *
00027  */
00028 
#include "cslib/CSConfig.h"
#include <inttypes.h>
#include <string.h>

#include "cslib/CSGlobal.h"

#include "trans_cache_ms.h"
00035 
// Growth step for a transaction's record list. Not referenced elsewhere in the
// visible part of this file (tc_AddRec() currently grows lists by 10 records).
#define LIST_INC_SIZE 256 // If the list starts to grow it is probably because a backup is in progress so it could get quite large.
#define MIN_LIST_SIZE 32  // A list size that should be able to handle a normal transaction load.
#define MIN_CACHE_RECORDS 2 // Initial capacity of each cache slot's record list.

// One cached reference/dereference record belonging to a transaction
// (filled in by tc_AddRec(), read back by tc_GetRecAt()).
typedef struct myTrans {
	uint8_t		tc_type;		// The transaction type. If the first bit is set then the transaction is an autocommit.
	uint32_t	tc_db_id;		// The database ID for the operation (reset to 0 by tc_dropDatabase()).
	uint32_t	tc_tab_id;		// The table ID for the operation.
	bool		tc_rolled_back;	// 'true' if this action has been rolled back.
	uint64_t	tc_blob_id;		// The blob ID for the operation.
	uint64_t	tc_blob_ref_id;	// The blob reference id.
	uint64_t	tc_position;	// The log position of the record.
} myTransRec, *myTransPtr;

// Log position meaning "no record position known yet"; a new transaction's
// log_position is set to this until its first record is added.
#define BAD_LOG_POSITION ((uint64_t) -1)

// One slot of the transaction cache: a transaction and its cached records.
// The slot array has tc_Size ordinary slots plus one trailing overflow slot.
typedef struct TransList {
#ifdef DEBUG
	uint32_t		old_tid;	// The tid that last occupied this slot (debug aid only).
#endif
	uint32_t		tid;		// Transaction ID; 0 marks an unused slot.
	uint64_t		log_position;	// The transaction log position of the start of the transaction.
	MS_TxnState		terminated;		// MS_Running until a terminating record is added, then how the transaction ended.
	size_t		size;			// The allocated size of the list.
	size_t		len;			// The number of records in the list that are being used.
	myTransPtr	list;			// The cached records (NULL for the overflow slot).
} TransListRec, *TransListPtr;
00062 
00063 MSTransCache::MSTransCache(): CSSharedRefObject(),
00064   tc_List(NULL),
00065   tc_OverFlow(NULL),
00066   tc_Size(0),
00067   tc_EOL(0),
00068   tc_First(0),
00069   tc_Used(0),
00070   tc_TotalTransCount(0),
00071   tc_TotalCacheCount(0),
00072   tc_ReLoadingThread(NULL),
00073   tc_OverFlowTID(0),
00074   tc_Full(false),
00075   tc_CacheVersion(0),
00076   tc_Recovering(false)
00077   {}
00078 
00079 MSTransCache::~MSTransCache() 
00080 {
00081   if (tc_List) {
00082     for (uint32_t i = 0; i < tc_Size; i++) {
00083       if (tc_List[i].list)
00084         cs_free(tc_List[i].list);
00085     }
00086     cs_free(tc_List);
00087   }
00088 }
00089 
00090 MSTransCache *MSTransCache::newMSTransCache(uint32_t min_size)
00091 {
00092   MSTransCache *tl = NULL;
00093   enter_();
00094   
00095   new_(tl, MSTransCache());
00096   push_(tl);
00097   
00098   if (MIN_LIST_SIZE > min_size)
00099     min_size = MIN_LIST_SIZE;
00100     
00101   tl->tc_Initialize(min_size);
00102     
00103   pop_(tl);
00104   
00105   return_(tl);
00106 }
00107   
00108 
00109 void MSTransCache::tc_Initialize(uint32_t size)
00110 {
00111   enter_();
00112     
00113   tc_Size = size;
00114   size++; // Add an extra for the overflow
00115   tc_List = (TransListPtr) cs_malloc(size * sizeof(TransListRec));
00116   
00117   // Give each new transaction list record a short list of transaction records
00118   for (uint32_t i = 0; i < tc_Size; i++) {
00119     tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
00120     tc_List[i].size = MIN_CACHE_RECORDS;
00121     tc_List[i].len = 0;
00122     tc_List[i].tid = 0;
00123     tc_List[i].log_position = 0;
00124     tc_List[i].terminated = MS_Running;
00125   }
00126 
00127   tc_OverFlow = tc_List + tc_Size;
00128   
00129   tc_OverFlow->list = NULL;
00130   tc_OverFlow->size = 0;
00131   tc_OverFlow->len = 0;
00132   tc_OverFlow->tid = 0;
00133   tc_OverFlow->log_position = 0;
00134   tc_OverFlow->terminated = MS_Running;
00135   exit_();
00136 }
00137 
00138 //--------------------
00139 void MSTransCache::tc_SetSize(uint32_t cache_size)
00140 {
00141   enter_();
00142 
00143   lock_(this);
00144   
00145   if (cache_size < MIN_LIST_SIZE)
00146     cache_size = MIN_LIST_SIZE;
00147   
00148   // If the cache is being reduced then free the record 
00149   // lists if the transactions about to be removed.
00150   for (uint32_t i = cache_size +1; i < tc_Size; i++) {
00151     if (tc_List[i].list)
00152       cs_free(tc_List[i].list);
00153   }
00154 
00155   // Add one to cache_size for overflow.  
00156   cs_realloc((void **) &tc_List, (cache_size +1) * sizeof(TransListRec));
00157   
00158   if (cache_size > tc_Size) {
00159     // Move the overflow record.
00160     memcpy(tc_List + cache_size, tc_List + tc_Size, sizeof(TransListRec));
00161     
00162     for (uint32_t i = tc_Size; i < cache_size; i++) {
00163       tc_List[i].list = (myTransPtr) cs_malloc(MIN_CACHE_RECORDS * sizeof(myTransRec));
00164       tc_List[i].size = MIN_CACHE_RECORDS;
00165       tc_List[i].len = 0;
00166       tc_List[i].tid = 0;
00167       tc_List[i].log_position = 0;
00168       tc_List[i].terminated = MS_Running;
00169     }
00170     
00171   }
00172   
00173   
00174   tc_Size = cache_size;
00175   tc_OverFlow = tc_List + tc_Size;
00176   
00177   unlock_(this);
00178 
00179   exit_();
00180 }
00181 
00182 bool MSTransCache::tc_ShoulReloadCache()
00183 {
00184   return (((tc_Used +1) < tc_Size) && tc_Full);
00185 }
00186 
00187 uint64_t MSTransCache::tc_StartCacheReload(bool startup)
00188 {
00189   enter_();
00190   
00191   (void) startup;
00192   
00193   ASSERT((startup) || tc_Full);
00194   tc_ReLoadingThread = self;
00195   tc_OverFlowTID = tc_OverFlow->tid;
00196   
00197   self->myTID = 0;
00198   self->myTransRef = 0;
00199 #ifdef DEBUG
00200     tc_ReloadCnt =0;
00201 #endif    
00202 
00203   return_(tc_OverFlow->log_position);
00204 }
00205 
00206 bool MSTransCache::tc_ContinueCacheReload()
00207 {
00208   // Reload should continue until the list is full again and the termination records 
00209   // for the first and overflow transactions have been found.
00210   //
00211   // It is assumed the reload will also stop if there are no more records to
00212   // be read in from the log.
00213   
00214   return ((tc_List[tc_First].terminated == MS_Running) || // Keep searching for the terminator for the first txn.
00215       (tc_OverFlow->tid == tc_OverFlowTID) || // The old overflow txn has not yet been loaded.
00216       (tc_OverFlow->terminated == MS_Running) // If the overflow tnx is terminated then the cache is also full. 
00217       );
00218 }
00219 
00220 
00221 void MSTransCache::tc_CompleteCacheReload()
00222 {
00223   enter_();
00224   
00225   tc_ReLoadingThread = NULL;
00226   if (tc_OverFlowTID) { // Clear the overflow condition;
00227     tc_OverFlow->tid = 0;
00228     tc_OverFlowTID = 0;
00229     tc_Full = false;
00230   }
00231   
00232   exit_();
00233 }
00234 
// Transaction references (TRef): 0..tc_Size-1 index ordinary cache slots.
// OVERFLOW_TREF refers to the overflow slot stored at tc_List[tc_Size].
// tc_FindTXNRef() treats any reference >= MAX_TREF as "not in the cache", and
// tc_NewTransaction() returns MAX_TREF for transactions it could not cache.
#define OVERFLOW_TREF (tc_Size)
#define MAX_TREF (OVERFLOW_TREF +1)
00237 
00238 // Create a new transaction record for the specified 
00239 // transaction.
00240 TRef MSTransCache::tc_NewTransaction(uint32_t tid)
00241 {
00242   TRef ref;
00243   enter_();
00244   
00245   ASSERT(tid);
00246   
00247   if (self != tc_ReLoadingThread) {
00248     tc_TotalTransCount++;     
00249   }
00250     
00251   // Once we have entered an overflow state we remain in it until
00252   // the cache has been reloaded even if there is now space in the cache.
00253   // This is to ensure that the transactions are loaded into the cache
00254   // in the correct order.
00255   // While reloading, make sure that any attempt to add a transaction by any thread
00256   // other than tc_ReLoadingThread recieves an overflow condition. 
00257   
00258   if (tc_Full) {
00259     if (tc_ReLoadingThread != self) {
00260       ref = MAX_TREF;
00261       goto done;
00262     }
00263 
00264 #ifdef DEBUG
00265     if (!tc_ReloadCnt) {
00266       ASSERT(tc_OverFlow->tid == tid); // The first txn reloaded should be the overflow txn
00267     }
00268     tc_ReloadCnt++;
00269 #endif  
00270     if (tid == tc_OverFlowTID) {
00271 #ifdef DEBUG
00272       tc_OverFlow->old_tid = tid;
00273 #endif  
00274       tc_OverFlow->tid = 0;
00275       tc_OverFlow->terminated = MS_Running;
00276       ASSERT((tc_Used +1) < tc_Size); // There should be room in the list for the old everflow txn.
00277     } else if (tc_OverFlowTID == 0) {
00278       // We are seaching for the end of the overflow txn
00279       // and found the start of another txn.
00280       ref = MAX_TREF;
00281       goto done;      
00282     }
00283   }
00284 
00285   if ((tc_Used +1) == tc_Size){ 
00286     // The cache is full.
00287     tc_OverFlowTID = 0;
00288     tc_OverFlow->tid = tid; // save the tid of the first transaction to overflow.
00289     tc_OverFlow->log_position = BAD_LOG_POSITION;
00290     tc_OverFlow->len = 0; 
00291     tc_OverFlow->terminated = MS_Running; 
00292     ref = OVERFLOW_TREF;
00293     tc_Full = true;
00294 #ifdef DEBUG
00295     tc_ReloadCnt++;
00296 #endif  
00297         
00298     goto done;
00299   }
00300   
00301   if (self != tc_ReLoadingThread) {
00302     tc_TotalCacheCount++;     
00303   }
00304 
00305   ref = tc_EOL;
00306   
00307 #ifdef CHECK_TIDS
00308 {
00309 static uint32_t last_tid = 0;
00310 static bool last_state = false;
00311 if (tc_Recovering != last_state)
00312   last_tid = 0;
00313   
00314 last_state = tc_Recovering;
00315 if (!( ((last_tid + 1) == tid) || !last_tid))
00316   printf("Expected tid %"PRIu32"\n", last_tid + 1);
00317 ASSERT( ((last_tid + 1) == tid) || !last_tid);
00318 last_tid = tid;
00319 }
00320 #endif
00321     
00322   tc_List[ref].tid = tid;
00323   tc_List[ref].len = 0;
00324   tc_List[ref].log_position = BAD_LOG_POSITION;
00325   tc_List[ref].terminated = MS_Running;
00326 
00327   // Update these after initializing the structure because
00328   // the reader thread may read it as soon as tc_EOL is updated.
00329   tc_Used++;
00330   tc_EOL++;
00331 
00332   if (tc_EOL == tc_Size)
00333     tc_EOL = 0;
00334 
00335 done: 
00336   self->myTID = tid;
00337   self->myTransRef = ref;
00338   self->myCacheVersion = tc_CacheVersion;
00339   return_(ref);
00340 }
00341 
00342 void MSTransCache::tc_FindTXNRef(uint32_t tid, TRef *tref)
00343 {
00344   uint32_t i = tc_First;
00345   enter_();
00346   
00347   // Search for the record
00348   if (tc_First > tc_EOL) {
00349     for (; i < OVERFLOW_TREF && *tref >= MAX_TREF; i++) {
00350       if (tc_List[i].tid == tid)
00351         *tref = i;
00352     }
00353     i = 0;
00354   }
00355   
00356   for (; i < tc_EOL && *tref >= MAX_TREF; i++) {
00357     if (tc_List[i].tid == tid)
00358       *tref = i;
00359   }
00360 
00361   // Do not return the overflow reference if the tid = tc_OverFlowTID.
00362   // This may seem a bit strange but it is needed so that the overflow txn
00363   // will get a new non-overflow cache slot when it is reloaded.  
00364   if ((*tref >= MAX_TREF) && (tid == tc_OverFlow->tid) && (tid != tc_OverFlowTID))
00365     *tref = OVERFLOW_TREF;
00366     
00367   self->myTID = tid;
00368   self->myTransRef = *tref;
00369   self->myCacheVersion = tc_CacheVersion;
00370   exit_();
00371 }
00372 
// Add a transaction log record to the cache. Depending on the record this may
// create the transaction, add a reference/dereference record to it, mark
// earlier records rolled back, or terminate (commit/rollback/recover) it.
//
// log_position: position of `rec` in the transaction log.
// rec:          the log record; rec->tr_id identifies the transaction.
// tref:         the transaction's cache reference, or one of two special values:
//               - TRANS_CACHE_UNKNOWN_REF: the record is being re-read by the
//                 reload thread, and the reference is looked up here;
//               - TRANS_CACHE_NEW_REF: the record starts a new transaction.
//
// Records for a transaction held in the overflow slot only update that slot's
// count and terminated state. Records whose reference ends up >= MAX_TREF
// (not cached) are dropped. The object lock is held for the whole update.
void MSTransCache::tc_AddRec(uint64_t log_position, MSTransPtr rec, TRef tref)
{
	TransListPtr lrec;
	enter_();
	
	lock_(this);

	//---------
	if (tref == TRANS_CACHE_UNKNOWN_REF) { // It is coming from a reload
		ASSERT(tc_ReLoadingThread == self); // Sanity check here

		// Reuse the reference from this thread's previous record if it was
		// for the same transaction; otherwise search the cache.
		if ((self->myTID == rec->tr_id) && (self->myTransRef != TRANS_CACHE_UNKNOWN_REF))
			tref = self->myTransRef;
		else {
			tc_FindTXNRef(rec->tr_id, &tref);
			if (tref == TRANS_CACHE_UNKNOWN_REF) {
				if (!TRANS_IS_START(rec->tr_type)) 
					goto done; // Ignore partial transaction reloads.
					
				tref = tc_NewTransaction(rec->tr_id);
			}
		}
	}
	
	ASSERT((tref <= MAX_TREF) || (tref == TRANS_CACHE_NEW_REF));
	ASSERT(self->myTID == rec->tr_id);
	
	//---------
	if (tref >= OVERFLOW_TREF) {
		if (tref == TRANS_CACHE_NEW_REF) {
			ASSERT(TRANS_IS_START(rec->tr_type));
			tref = tc_NewTransaction(rec->tr_id);
		} else if (self->myCacheVersion != tc_CacheVersion) {
			// Check to see if the transaction is now in the cache
			tc_FindTXNRef(rec->tr_id, &tref);
		}
		
		if (tref >= OVERFLOW_TREF){ // Overflow.
			// Only the overflow slot keeps any state: its start position,
			// record count and how it terminated. MAX_TREF records are dropped.
			if (tref == OVERFLOW_TREF) {
				if (!tc_OverFlow->len)
					tc_OverFlow->log_position = log_position;
					
				tc_OverFlow->len++;
				if (TRANS_IS_TERMINATED(rec->tr_type)) {
					if (rec->tr_type == MS_RollBackTxn)
						tc_OverFlow->terminated = MS_RolledBack;
					else if (rec->tr_type == MS_RecoveredTxn)
						tc_OverFlow->terminated = MS_Recovered;
					else
						tc_OverFlow->terminated = MS_Committed;
				}
			}
			
			goto done;
		}
	}

	lrec = tc_List + tref;
	
	ASSERT(lrec->tid);
	ASSERT(lrec->tid == rec->tr_id);
	
	if (!lrec->len) { // The first record in the transaction
		lrec->log_position = log_position;
	} else if (( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn)) && !tc_Recovering) { 
		// Make sure the record isn't already in the list.
		// This can happen during cache reload.
		for (uint32_t i = 0; i < lrec->len; i++) {
			if (lrec->list[i].tc_position == log_position)
				goto done;
		}
	}
	
	// During recovery there is no need to cache the records.
	if (!tc_Recovering) {
		switch (TRANS_TYPE(rec->tr_type)) {
			case MS_RollBackTxn:
			// NOTE(review): MS_Committed is used elsewhere in this file as an
			// MS_TxnState value (e.g. lrec->terminated), not a record type. The
			// commit record type was probably intended here. The case is a
			// no-op, so behavior is unaffected; confirm against the enum.
			case MS_Committed:
			case MS_RecoveredTxn:
				// This is handled below;
				break;
				
			case MS_PartialRollBackTxn:
			{
				// The rollback position is stored in the place for the database id.
				// Every record from that index onwards is marked rolled back.
				for (uint32_t i = rec->tr_db_id;i < lrec->len; i++)
					lrec->list[i].tc_rolled_back = true;
					
				break;
			}

			case MS_ReferenceTxn:
			case MS_DereferenceTxn:
			{
				myTransPtr my_rec;
				
				if (lrec->len == lrec->size) { //Grow the list if required
					cs_realloc((void **) &(lrec->list), (lrec->size + 10)* sizeof(myTransRec));
					lrec->size += 10;		
				}
			
				// Append a copy of the log record to the transaction's list.
				my_rec = lrec->list + lrec->len;
				my_rec->tc_type = rec->tr_type;
				my_rec->tc_db_id = rec->tr_db_id;
				my_rec->tc_tab_id = rec->tr_tab_id;
				my_rec->tc_blob_id = rec->tr_blob_id;
				my_rec->tc_blob_ref_id = rec->tr_blob_ref_id;
				my_rec->tc_position = log_position;
				my_rec->tc_rolled_back = false;
				
				lrec->len++;				
				break;
			}
			
		}
	} else if ( (TRANS_TYPE(rec->tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec->tr_type) == MS_DereferenceTxn))
		lrec->len++; // Recovering: count the record without storing it in the list.
	
	
	// Check to see if this is a commit or rollback 
	// Do this last because as soon as it is marked as terminated
	// the reader thread may start processing it.
	if (TRANS_IS_TERMINATED(rec->tr_type)) {
		if (rec->tr_type == MS_RollBackTxn)
			lrec->terminated = MS_RolledBack;
		else if (rec->tr_type == MS_RecoveredTxn)
			lrec->terminated = MS_Recovered;
		else
			lrec->terminated = MS_Committed;
	}
	
done:
	unlock_(this);		
	exit_();
}
00511 
00512 // Get the transaction ref of the first transaction in the list.
00513 // Sets committed to true or false depending on if the transaction is terminated.
00514 // If there is no trsansaction then false is returned.
00515 bool MSTransCache::tc_GetTransaction(TRef *ref, bool *terminated)
00516 {
00517   if (!tc_Used)
00518     return false;
00519   
00520   ASSERT(tc_List[tc_First].tid);
00521   
00522   *ref =  tc_First;
00523   *terminated = (tc_List[tc_First].terminated != MS_Running);
00524   
00525   return true;
00526 } 
00527 
00528 //----------
00529 bool MSTransCache::tc_GetTransactionStartPosition(uint64_t *log_position)
00530 {
00531   if ((!tc_Used) || (tc_List[tc_First].len == 0))
00532     return false;
00533     
00534   *log_position = tc_List[tc_First].log_position;
00535   return true;
00536 }
00537 
00538 //----------
00539 MS_TxnState MSTransCache::tc_TransactionState(TRef ref)
00540 {
00541   ASSERT((ref < tc_Size) && tc_List[ref].tid);
00542   
00543   return tc_List[ref].terminated;
00544 } 
00545 
00546 uint32_t MSTransCache::tc_GetTransactionID(TRef ref)
00547 {
00548   ASSERT((ref < tc_Size) && tc_List[ref].tid);
00549   
00550   return (tc_List[ref].tid);
00551 } 
00552   
00553 // Remove the transaction and all record associated with it.
00554 void MSTransCache::tc_FreeTransaction(TRef tref)
00555 {
00556   TransListPtr lrec;
00557   enter_();
00558   ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
00559   
00560 #ifdef CHECK_TIDS
00561 {
00562 static uint32_t last_tid = 0;
00563 static bool last_state = false;
00564 if (tc_Recovering != last_state)
00565   last_tid = 0;
00566   
00567 last_state = tc_Recovering;
00568 ASSERT( ((last_tid + 1) == tc_List[tref].tid) || !last_tid);
00569 last_tid = tc_List[tref].tid;
00570 }
00571 #endif
00572 
00573   lrec = tc_List + tref;
00574 #ifdef DEBUG
00575   lrec->old_tid = lrec->tid;
00576 #endif
00577   lrec->tid = 0;
00578   lrec->len = 0;
00579   
00580   if (lrec->size > 10) { // Free up some excess records.
00581     cs_realloc((void **) &(lrec->list), 10* sizeof(myTransRec));
00582     lrec->size = 10;    
00583   }
00584 
00585   lock_(this);
00586   tc_Used--;
00587   
00588   if (tref == tc_First) { // Reset the start of the list.
00589     TRef eol = tc_EOL; // cache this incase it changes 
00590     
00591     // Skip any unused records indicated by a zero tid.
00592     if (tc_First > eol) {
00593       for (; tc_First < tc_Size && !tc_List[tc_First].tid; tc_First++) ;
00594       
00595       if (tc_First == tc_Size)
00596         tc_First = 0;
00597     }
00598     
00599     for (; tc_First < eol && !tc_List[tc_First].tid; tc_First++) ;
00600   }
00601   
00602   ASSERT( (tc_Used == 0 && tc_First == tc_EOL) || (tc_Used != 0 && tc_First != tc_EOL)); 
00603 
00604   unlock_(this);
00605 
00606   exit_();
00607 }
00608 
00609 //--------------------
00610 bool MSTransCache::tc_GetRecAt(TRef tref, size_t index, MSTransPtr rec, MS_TxnState *state)
00611 {
00612   TransListPtr lrec;
00613   bool found = false;
00614 
00615   ASSERT(tc_Used && (tref < tc_Size) && tc_List[tref].tid);
00616 #ifdef CHECK_TIDS
00617 {
00618   static uint32_t last_tid = 0;
00619   ASSERT( ((last_tid + 1) == tc_List[tref].tid) || (last_tid  == tc_List[tref].tid) || !last_tid);
00620   last_tid = tc_List[tref].tid;
00621 }
00622 #endif
00623   
00624   lrec = tc_List + tref;
00625   if (index < lrec->len) {
00626     myTransPtr my_rec = lrec->list + index;
00627     
00628     rec->tr_type = my_rec->tc_type;
00629     rec->tr_db_id = my_rec->tc_db_id;
00630     rec->tr_tab_id = my_rec->tc_tab_id;
00631     rec->tr_blob_id = my_rec->tc_blob_id;
00632     rec->tr_blob_ref_id = my_rec->tc_blob_ref_id;
00633     rec->tr_id = lrec->tid;
00634     rec->tr_check = 0;
00635     if (my_rec->tc_rolled_back)
00636       *state = MS_RolledBack;
00637     else
00638       *state = lrec->terminated;
00639       
00640     found = true;
00641   }
00642   
00643   return found;
00644 }
00645 
00646 //--------------------
00647 void MSTransCache::tc_dropDatabase(uint32_t db_id)
00648 {
00649   enter_();
00650   lock_(this);
00651   if (tc_List) {
00652     for (uint32_t i = 0; i < tc_Size; i++) {
00653       myTransPtr rec = tc_List[i].list;
00654       if (rec) {
00655         uint32_t list_len = tc_List[i].len;     
00656         while (list_len) {
00657           if (rec->tc_db_id == db_id)
00658             rec->tc_db_id = 0;
00659           list_len--; 
00660           rec++;
00661         }
00662       }
00663     }   
00664   }
00665   
00666   unlock_(this);
00667   exit_();
00668 }