Drizzled Public API Documentation

transaction_ms.cc

00001 /* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2009-07-09
00022  *
00023  * H&G2JCtL
00024  *
00025  * PBMS transaction daemon.
00026  *
00027  *
00028  */
00029 
00030 #include "cslib/CSConfig.h"
00031 
00032 #include <inttypes.h>
00033 
00034 #include "defs_ms.h"
00035 
00036 #include "cslib/CSGlobal.h"
00037 #include "cslib/CSStrUtil.h"
00038 #include "cslib/CSLog.h"
00039 
00040 #include "mysql_ms.h"
00041 #include "open_table_ms.h"
00042 #include "trans_log_ms.h"
00043 #include "transaction_ms.h"
00044 #include "pbmsdaemon_ms.h"
00045 
00046 /*
00047  * The pbms_ functions are utility functions supplied by ha_pbms.cc
00048  */
00049 void  pbms_take_part_in_transaction(void *thread);
00050 
00051 MSTrans *MSTransactionManager::tm_Log;
00052 MSTransactionThread *MSTransactionManager::tm_Reader;
00053 
00054 
00055 typedef  struct {
00056   CSDiskValue4  lr_time_4;    // The database ID for the operation.
00057   CSDiskValue1  lr_state_1;   // The transaction state. 
00058   CSDiskValue1  lr_type_1;    // The transaction type. If the first bit is set then the transaction is an autocommit.
00059   CSDiskValue4  lr_db_id_4;   // The database ID for the operation.
00060   CSDiskValue4  lr_tab_id_4;  // The table ID for the operation.
00061   CSDiskValue8  lr_blob_id_8; // The blob ID for the operation.
00062   CSDiskValue8  lr_blob_ref_id_8;// The blob reference id.
00063 } MSDiskLostRec, *MSDiskLostPtr;
00064 
00065 /*
00066  * ---------------------------------------------------------------
00067  * The transaction reader thread 
00068  */
00069 
00070 class MSTransactionThread : public CSDaemon {
00071 public:
00072   MSTransactionThread(MSTrans *txn_log);
00073   
00074   virtual ~MSTransactionThread(){} // Do nothing here because 'self' will no longer be valid, use completeWork().
00075 
00076 
00077   void close();
00078 
00079   virtual bool doWork();
00080 
00081   virtual void *completeWork();
00082   
00083   void flush();
00084   
00085   bool trt_is_ready;
00086 private:
00087   void reportLostReference(MSTransPtr rec, MS_TxnState state);
00088   void dereference(MSTransPtr rec, MS_TxnState state);
00089   void commitReference(MSTransPtr rec, MS_TxnState state);
00090   
00091   MSTrans *trt_log;
00092   CSFile  *trt_lostLog;
00093 
00094 };
00095 
00096 MSTransactionThread::MSTransactionThread(MSTrans *txn_log):
00097 CSDaemon(0, NULL),
00098 trt_is_ready(false),
00099 trt_log(txn_log),
00100 trt_lostLog(NULL)
00101 {
00102   trt_log->txn_SetReader(this);
00103 }
00104 
00105 void MSTransactionThread::close()
00106 {
00107   if (trt_lostLog)
00108     trt_lostLog->close();
00109 }
00110 
00111 void MSTransactionThread::reportLostReference(MSTransPtr rec, MS_TxnState state)
00112 {
00113   MSDiskLostRec lrec;
00114   const char *t_txt, *s_txt;
00115   char b1[16], b2[16], msg[100];
00116   MSDatabase *db;
00117   MSTable *tab;
00118   
00119   //if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == true)
00120     //return;
00121 
00122   enter_();
00123   // Do not report errors caused by missing databases or tables.
00124   // This can happen if the transaction log is reread after a crash
00125   // and transactions are found that belonged to dropped databases
00126   // or tables.
00127   db = MSDatabase::getDatabase(rec->tr_db_id, true);
00128   if (!db)
00129     goto dont_worry_about_it;
00130     
00131   push_(db);
00132   tab = db->getTable(rec->tr_tab_id, true);
00133   release_(db);
00134   if (!tab)
00135     goto dont_worry_about_it;
00136   tab->release();
00137   
00138   switch (state) {
00139     case MS_Committed:
00140       s_txt = "Commit";
00141       break;
00142     case MS_RolledBack:
00143       s_txt = "RolledBack";
00144       break;
00145     case MS_Recovered:
00146       s_txt = "Recovered";
00147       break;
00148     case MS_Running:
00149       s_txt = "Running";
00150       break;
00151     default:
00152       snprintf(b1, 16, "(%d)?", state);
00153       s_txt = b1;
00154   }
00155 
00156   switch (TRANS_TYPE(rec->tr_type)) {
00157     case MS_DereferenceTxn:
00158       t_txt = "Dereference";
00159       break;
00160     case MS_ReferenceTxn:
00161       t_txt = "Reference";
00162       break;
00163     default:
00164       snprintf(b2, 16, "(%x)?", rec->tr_type);
00165       t_txt = b2;
00166   }
00167 
00168   snprintf(msg, 100, "Lost PBMS record: %s %s db_id: %"PRIu32" tab_id: %"PRIu32" blob_id: %"PRIu64"", s_txt, t_txt, rec->tr_db_id, rec->tr_tab_id, rec->tr_blob_id);
00169   CSL.logLine(self, CSLog::Warning, msg);
00170 
00171   CS_SET_DISK_4(lrec.lr_time_4, time(NULL));
00172   CS_SET_DISK_1(lrec.lr_state_1, state);
00173   CS_SET_DISK_1(lrec.lr_type_1, rec->tr_type);
00174   CS_SET_DISK_4(lrec.lr_db_id_4, rec->tr_db_id);
00175   CS_SET_DISK_4(lrec.lr_tab_id_4, rec->tr_tab_id);
00176   CS_SET_DISK_8(lrec.lr_blob_id_8, rec->tr_blob_id);
00177   CS_SET_DISK_8(lrec.lr_blob_ref_id_8, rec->tr_blob_ref_id);
00178   
00179   if (!trt_lostLog) {
00180     CSPath *path;
00181     char *str = cs_strdup(trt_log->txn_GetTXNLogPath());
00182     cs_remove_last_name_of_path(str);
00183     
00184     path = CSPath::newPath(str, "pbms_lost_txn.dat");
00185     cs_free(str);
00186     
00187     trt_lostLog = CSFile::newFile(path);
00188     trt_lostLog->open(CSFile::CREATE);
00189   }
00190   trt_lostLog->write(&lrec, trt_lostLog->getEOF(), sizeof(MSDiskLostRec));
00191   trt_lostLog->sync();
00192   
00193 dont_worry_about_it:
00194   exit_();
00195   
00196 }
00197 
00198 void MSTransactionThread::dereference(MSTransPtr rec, MS_TxnState state)
00199 {
00200   enter_();
00201   
00202   try_(a) {
00203     MSOpenTable   *otab;
00204     otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
00205     frompool_(otab);
00206     otab->freeReference(rec->tr_blob_id, rec->tr_blob_ref_id);
00207     backtopool_(otab);
00208   }
00209   
00210   catch_(a) {
00211     reportLostReference(rec, state);
00212   }
00213   
00214   cont_(a); 
00215   exit_();
00216 }
00217 
00218 void MSTransactionThread::commitReference(MSTransPtr rec, MS_TxnState state)
00219 {
00220   enter_();
00221   
00222   try_(a) {
00223     MSOpenTable   *otab;
00224     otab = MSTableList::getOpenTableByID(rec->tr_db_id, rec->tr_tab_id);
00225     frompool_(otab);
00226     otab->commitReference(rec->tr_blob_id, rec->tr_blob_ref_id);
00227     backtopool_(otab);
00228   }
00229   
00230   catch_(a) {
00231     reportLostReference(rec, state);
00232   }
00233   
00234   cont_(a); 
00235   exit_();
00236 }
00237 
00238 void MSTransactionThread::flush()
00239 {
00240   enter_();
00241   
00242   // For now I just wait until the transaction queue is empty or
00243   // the transaction at the head of the queue has not yet been
00244   // committed.
00245   //
00246   // What needs to be done is for the transaction log to scan 
00247   // past the non commited transaction to see if there are any
00248   // other committed transaction in the log and apply them if found.
00249   
00250   wakeup(); // Incase the reader is sleeping.
00251   while (trt_log->txn_haveNextTransaction() && !isSuspend() && self->myMustQuit)
00252     self->sleep(10);    
00253   exit_();
00254 }
00255 
00256 bool MSTransactionThread::doWork()
00257 {
00258   enter_();
00259   
00260   try_(a) {
00261     MSTransRec rec = {0,0,0,0,0,0,0};
00262     MS_TxnState state;
00263     while (!myMustQuit) {
00264       // This will sleep while waiting for the next 
00265       // completed transaction.
00266       trt_log->txn_GetNextTransaction(&rec, &state); 
00267       if (myMustQuit)
00268         break;
00269         
00270       if (rec.tr_db_id == 0) // The database was dropped.
00271         continue;
00272         
00273       if (state == MS_Committed){
00274         if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) 
00275           dereference(&rec, state);
00276         else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
00277           commitReference(&rec, state);
00278 
00279       } else if (state == MS_RolledBack) { 
00280         // There is nothing to do on rollback of a dereference.
00281         
00282         if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
00283           dereference(&rec, state);
00284           
00285       } else if (state == MS_Recovered) { 
00286         if ((TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) || (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn))
00287           reportLostReference(&rec, state); // Report these even though they may not be lost.
00288         
00289         // Because of the 2 phase commit issue with other engines I cannot
00290         // just roll back the transaction because it may have been committed 
00291         // on the master engine. So to be safe I will always err on the side
00292         // of having unreference BLOBs in the repository rather than risking
00293         // deleting a BLOB that was referenced. To this end I will commit references
00294         // while ignoring (rolling back) dereferences.
00295         if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn)  
00296           commitReference(&rec, state);
00297         
00298       }
00299     }
00300   }
00301   
00302   catch_(a) {
00303     self->logException();
00304     CSL.logLine(NULL, CSLog::Error, "!!!!!!!! THE PBMS TRANSACTION LOG READER DIED! !!!!!!!!!!!");
00305   }
00306   cont_(a);
00307   return_(true);
00308 }
00309 
00310 void *MSTransactionThread::completeWork()
00311 {
00312   close();
00313   
00314   if (trt_log)
00315     trt_log->release();
00316     
00317   if (trt_lostLog)
00318     trt_lostLog->release();
00319   return NULL;
00320 }
00321 
00322 /*
00323  * ---------------------------------------------------------------
00324  * The transaction log manager 
00325  */
00326 void MSTransactionManager::startUpReader()
00327 {
00328   char pbms_path[PATH_MAX];
00329   enter_();
00330   
00331   cs_strcpy(PATH_MAX, pbms_path, PBMSDaemon::getPBMSDir()); 
00332   cs_add_name_to_path(PATH_MAX, pbms_path, "ms-trans-log.dat");
00333   
00334   tm_Log = MSTrans::txn_NewMSTrans(pbms_path);
00335   new_(tm_Reader, MSTransactionThread(RETAIN(tm_Log)));
00336 
00337   tm_Reader->start();
00338   
00339   // Wait for the transaction reader to recover any old transaction:
00340   tm_Reader->flush();
00341     
00342   exit_();
00343 }
00344 
00345 void MSTransactionManager::startUp()
00346 {
00347   CSPath *path = NULL;
00348   enter_();
00349   
00350   // Do not start the reader if the pbms dir doesn't exist.
00351   path = CSPath::newPath(PBMSDaemon::getPBMSDir());
00352   push_(path);
00353   if (path->exists()) {
00354     startUpReader();
00355   }
00356   release_(path);
00357   
00358   exit_();
00359 }
00360 
00361 void MSTransactionManager::shutDown()
00362 {
00363   if (tm_Reader) {
00364     tm_Reader->stop();
00365     tm_Reader->release();
00366     tm_Reader = NULL;
00367   }
00368   if (tm_Log) {
00369     tm_Log->release();
00370     tm_Log = NULL;
00371   }
00372 }
00373 
00374 void MSTransactionManager::flush()
00375 {
00376   if (tm_Reader) 
00377     tm_Reader->flush();
00378 }
00379 
00380 void MSTransactionManager::suspend(bool do_flush)
00381 {
00382   enter_();
00383   
00384   if (do_flush) 
00385     flush();
00386     
00387   if (tm_Reader) {
00388     tm_Reader->suspend();
00389   } 
00390   exit_();
00391 }
00392 
00393 void MSTransactionManager::resume()
00394 {
00395   enter_();
00396   if (tm_Reader) {
00397     tm_Reader->resume();
00398   } 
00399   exit_();
00400 }
00401 
00402 void MSTransactionManager::commit()
00403 {
00404   enter_();
00405   
00406   if (!tm_Log)
00407     startUpReader();
00408     
00409   self->myStmtCount = 0;
00410   self->myStartStmt = 0;
00411 
00412   tm_Log->txn_LogTransaction(MS_CommitTxn);
00413   
00414 
00415   exit_();
00416 }
00417 
00418 void MSTransactionManager::rollback()
00419 {
00420   enter_();
00421   
00422   if (!tm_Log)
00423     startUpReader();
00424     
00425   self->myStmtCount = 0;
00426   self->myStartStmt = 0;
00427 
00428   tm_Log->txn_LogTransaction(MS_RollBackTxn);
00429 
00430   exit_();
00431 }
00432 
00433 class MSTransactionCheckPoint: public CSString
00434 {
00435   public:
00436   MSTransactionCheckPoint(const char *name, uint32_t stmtCount ):CSString(name)
00437   {
00438     position = stmtCount;
00439   }
00440   
00441   uint32_t position;
00442 };
00443 
00444 #ifdef DRIZZLED
00445 void MSTransactionManager::setSavepoint(const char *savePoint)
00446 {
00447   MSTransactionCheckPoint *checkPoint;
00448   enter_();
00449   
00450   new_(checkPoint, MSTransactionCheckPoint(savePoint, self->myStmtCount));
00451   
00452   push_(checkPoint);
00453   self->mySavePoints.add(checkPoint);
00454   pop_(checkPoint);
00455   
00456   exit_();
00457 }
00458 
00459 void MSTransactionManager::releaseSavepoint(const char *savePoint)
00460 {
00461   MSTransactionCheckPoint *checkPoint;
00462   CSString *name;
00463   enter_();
00464   
00465   name = CSString::newString(savePoint);
00466   push_(name);
00467 
00468   checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
00469   release_(name);
00470   
00471   if (checkPoint)     
00472     self->mySavePoints.remove(checkPoint);
00473     
00474   exit_();
00475 }
00476 
00477 void MSTransactionManager::rollbackTo(const char *savePoint)
00478 {
00479   MSTransactionCheckPoint *checkPoint;
00480   CSString *name;
00481   enter_();
00482   
00483   name = CSString::newString(savePoint);
00484   push_(name);
00485 
00486   checkPoint = (MSTransactionCheckPoint*) self->mySavePoints.find(name);
00487   release_(name);
00488   
00489   if (checkPoint) {
00490     uint32_t position = checkPoint->position;
00491     
00492     self->mySavePoints.remove(checkPoint);
00493     rollbackToPosition(position);
00494   }
00495     
00496   exit_();
00497 }
00498 #endif
00499 
00500 void MSTransactionManager::rollbackToPosition(uint32_t position)
00501 {
00502   enter_();
00503 
00504   ASSERT(self->myStmtCount > position);
00505   
00506   if (!tm_Log)
00507     startUpReader();
00508   tm_Log->txn_LogPartialRollBack(position);
00509   
00510   exit_();
00511 }
00512 
00513 void MSTransactionManager::dropDatabase(uint32_t db_id)
00514 {
00515   enter_();
00516 
00517   if (!tm_Log)
00518     startUpReader();
00519   
00520   tm_Log->txn_dropDatabase(db_id);
00521 
00522   exit_();
00523 }
00524 
00525 void MSTransactionManager::logTransaction(bool ref, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id)
00526 {
00527   enter_();
00528   
00529   if (!tm_Log)
00530     startUpReader();
00531 
00532   if (!self->myTID) {
00533     bool autocommit = false;
00534     autocommit = ms_is_autocommit();
00535 #ifndef DRIZZLED
00536     if (!autocommit)
00537       pbms_take_part_in_transaction(ms_my_get_thread());
00538 #endif
00539       
00540     self->myIsAutoCommit = autocommit;
00541   }
00542   
00543   // PBMS always explicitly commits
00544   tm_Log->txn_LogTransaction((ref)?MS_ReferenceTxn:MS_DereferenceTxn, false /*autocommit*/, db_id, tab_id, blob_id, blob_ref_id);
00545 
00546   self->myStmtCount++;
00547   
00548   exit_();
00549 }
00550 
00551