Drizzled Public API Documentation

database_ms.cc

00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-25
00023  *
00024  * H&G2JCtL
00025  *
00026  * Network interface.
00027  *
00028  */
00029  
00030 #ifdef DRIZZLED
00031 #include <config.h>
00032 #include <drizzled/common.h>
00033 #include <drizzled/session.h>
00034 #include <drizzled/table.h>
00035 #include <drizzled/message/table.pb.h>
00036 #include <drizzled/charset_info.h>
00037 #include <drizzled/table_proto.h>
00038 #include <drizzled/field.h>
00039 #endif
00040 
00041 #include "cslib/CSConfig.h"
00042 
00043 #include <string.h>
00044 #include <ctype.h>
00045 #include <inttypes.h>
00046 
00047 #include "string.h"
00048 
00049 #include "cslib/CSGlobal.h"
00050 #include "cslib/CSLog.h"
00051 #include "cslib/CSDirectory.h"
00052 #include "cslib/CSStrUtil.h"
00053 
00054 #include "database_ms.h"
00055 #include "open_table_ms.h"
00056 #include "backup_ms.h"
00057 #include "table_ms.h"
00058 #include "temp_log_ms.h"
00059 #include "network_ms.h"
00060 #include "mysql_ms.h"
00061 #include "pbmslib.h"
00062 #include "transaction_ms.h"
00063 //#include "systab_variable_ms.h"
00064 #include "systab_httpheader_ms.h"
00065 #include "parameters_ms.h"
00066 #include "pbmsdaemon_ms.h"
00067 
00068 
00069 
00070 CSSyncSortedList  *MSDatabase::gDatabaseList;
00071 CSSparseArray   *MSDatabase::gDatabaseArray;
00072 /*
00073  * -------------------------------------------------------------------------
00074  * PBMS DATABASES
00075  */
00076 
00077 MSDatabase::MSDatabase():
00078 myIsPBMS(false),
00079 myDatabaseID(0),
00080 myDatabaseName(NULL),
00081 myDatabasePath(NULL),
00082 myTempLogArray(NULL),
00083 myCompactorThread(NULL),
00084 myTempLogThread(NULL),
00085 myRepostoryList(NULL),
00086 myBlobCloud(NULL),
00087 myBlobType(MS_STANDARD_STORAGE),
00088 isBackup(false),
00089 iBackupThread(NULL),
00090 iBackupTime(0),
00091 iRecovering(false),
00092 #ifdef HAVE_ALIAS_SUPPORT
00093 iBlobAliases(NULL),
00094 #endif
00095 iClosing(false),
00096 iTableList(NULL),
00097 iTableArray(NULL),
00098 iMaxTableID(0),
00099 iWriteTempLog(NULL),
00100 iDropping(false),
00101 iNextBlobRefId(0)
00102 {
00103 //startTracking();
00104 }
00105 
00106 MSDatabase::~MSDatabase()
00107 {
00108   iClosing = true;
00109   if (iBackupThread) {
00110     iBackupThread->stop();
00111     iBackupThread->release();
00112     iBackupThread = NULL;
00113   }
00114   
00115   if (myTempLogThread) {
00116     myTempLogThread->stop();
00117     myTempLogThread->release();
00118     myTempLogThread = NULL;
00119   }
00120   if (myCompactorThread) {
00121     myRepostoryList->wakeup(); // The compator thread waits on this.
00122     myCompactorThread->stop();
00123     myCompactorThread->release();
00124     myCompactorThread = NULL;
00125   }
00126   
00127   if (myDatabasePath)
00128     myDatabasePath->release();
00129     
00130   if (myDatabaseName)
00131     myDatabaseName->release();
00132     
00133   iWriteTempLog = NULL;
00134   if (myTempLogArray) {
00135     myTempLogArray->clear();
00136     myTempLogArray->release();
00137   }
00138   if (iTableList) {
00139     iTableList->clear();
00140     iTableList->release();
00141   }
00142   if (iTableArray){
00143     iTableArray->clear();
00144     iTableArray->release();
00145   }
00146   if (myRepostoryList) {
00147     myRepostoryList->clear();
00148     myRepostoryList->release();
00149   }
00150 #ifdef HAVE_ALIAS_SUPPORT
00151   if (iBlobAliases) {
00152     iBlobAliases->ma_close();
00153     iBlobAliases->release();
00154   }
00155 #endif
00156   if (myBlobCloud) {
00157     myBlobCloud->release();
00158   }
00159 }
00160 
00161 uint32_t MSDatabase::fileToTableId(const char *file_name, const char *name_part)
00162 {
00163   uint32_t value = 0;
00164 
00165   if (file_name) {
00166     const char *num = file_name +  strlen(file_name) - 1;
00167     
00168     while (num >= file_name && *num != '-')
00169       num--;
00170     if (name_part) {
00171       /* Check the name part of the file: */
00172       int len = strlen(name_part);
00173       
00174       if (len != num - file_name)
00175         return 0;
00176       if (strncmp(file_name, name_part, len) != 0)
00177         return 0;
00178     }
00179     num++;
00180     if (isdigit(*num))
00181       sscanf(num, "%"PRIu32"", &value);
00182   }
00183   return value;
00184 }
00185 
00186 const char *MSDatabase::fileToTableName(size_t size, char *tab_name, const char *file_name)
00187 {
00188   const char  *cptr;
00189   size_t    len;
00190 
00191   file_name = cs_last_name_of_path(file_name);
00192   cptr = file_name + strlen(file_name) - 1;
00193   while (cptr > file_name && *cptr != '.')
00194     cptr--;
00195   if (cptr > file_name && *cptr == '.') {
00196     if (strncmp(cptr, ".bs", 2) == 0) {
00197       cptr--;
00198       while (cptr > file_name && isdigit(*cptr))
00199         cptr--;
00200     }
00201   }
00202 
00203   len = cptr - file_name;
00204   if (len > size-1)
00205     len = size-1;
00206 
00207   memcpy(tab_name, file_name, len);
00208   tab_name[len] = 0;
00209 
00210   /* Return a pointer to what was removed! */
00211   return file_name + len;
00212 }
00213 
00214 
00215 const char *MSDatabase::getDatabaseNameCString()
00216 {
00217   return myDatabaseName->getCString();
00218 }
00219 
00220 MSTable *MSDatabase::getTable(CSString *tab_name, bool create)
00221 {
00222   MSTable *tab;
00223   
00224   enter_();
00225   push_(tab_name);
00226   lock_(iTableList);
00227   if (!(tab = (MSTable *) iTableList->find(tab_name))) {
00228 
00229     if (create) {
00230       /* Create a new table: */
00231       tab = MSTable::newTable(iMaxTableID+1, RETAIN(tab_name), this, (off64_t) 0, false);
00232       iTableList->add(tab);
00233       iTableArray->set(iMaxTableID+1, RETAIN(tab));
00234       iMaxTableID++;
00235     }
00236   }
00237   if (tab)
00238     tab->retain();
00239   unlock_(iTableList);
00240   release_(tab_name);
00241   return_(tab);
00242 }
00243 
00244 MSTable *MSDatabase::getTable(const char *tab_name, bool create)
00245 {
00246   return getTable(CSString::newString(tab_name), create);
00247 }
00248 
00249 
00250 MSTable *MSDatabase::getTable(uint32_t tab_id, bool missing_ok)
00251 {
00252   MSTable *tab;
00253   
00254   enter_();
00255   lock_(iTableList);
00256   if (!(tab = (MSTable *) iTableArray->get((uint32_t) tab_id))) {
00257     if (missing_ok) {
00258       unlock_(iTableList);
00259       return_(NULL);
00260     }
00261     char buffer[CS_EXC_MESSAGE_SIZE];
00262 
00263     cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown table #");
00264     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) tab_id);
00265     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, " in database ");
00266     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, getDatabaseNameCString());
00267     CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_TABLE, buffer);
00268   }
00269   tab->retain();
00270   unlock_(iTableList);
00271   return_(tab);
00272 }
00273 
00274 MSTable *MSDatabase::getNextTable(uint32_t *pos)
00275 {
00276   uint32_t i = *pos;
00277   MSTable *tab = NULL;
00278   
00279   enter_();
00280   lock_(iTableList);
00281   while (i < iTableList->getSize()) {
00282     tab = (MSTable *) iTableList->itemAt(i++);
00283     if (!tab->isToDelete())
00284       break;
00285     tab = NULL;
00286   }
00287   if (tab)
00288     tab->retain();
00289   unlock_(iTableList);
00290   *pos = i;
00291   return_(tab);
00292 }
00293 
00294 void MSDatabase::addTable(uint32_t tab_id, const char *tab_name, off64_t file_size, bool to_delete)
00295 {
00296   MSTable *tab;
00297 
00298   if (tab_id > iMaxTableID)
00299     iMaxTableID = tab_id;
00300   tab = MSTable::newTable(tab_id, tab_name, this, file_size, to_delete);
00301   iTableList->add(tab);
00302   iTableArray->set(tab_id, RETAIN(tab));
00303 }
00304 
00305 void MSDatabase::addTableFromFile(CSDirectory *dir, const char *file_name, bool to_delete)
00306 {
00307   off64_t file_size;
00308   uint32_t  file_id;
00309   char  tab_name[MS_TABLE_NAME_SIZE];
00310 
00311   dir->info(NULL, &file_size, NULL);
00312   file_id = fileToTableId(file_name);
00313   fileToTableName(MS_TABLE_NAME_SIZE, tab_name, file_name);
00314   addTable(file_id, tab_name, file_size, to_delete);
00315 }
00316 
/*
 * Remove a table from both the name list and the ID array.
 *
 * tab  The table to remove; the reference passed in is released before
 *      returning.
 */
void MSDatabase::removeTable(MSTable *tab)
{
  enter_();
  push_(tab);
  lock_(iTableList);
  iTableList->remove(tab->myTableName);
  iTableArray->remove(tab->myTableID);
  unlock_(iTableList);
  release_(tab);
  exit_();
}
00328 
00329 void MSDatabase::dropTable(MSTable *tab)
00330 {
00331   enter_();
00332   push_(tab);
00333   lock_(iTableList);
00334   iTableList->remove(tab->myTableName);
00335   iTableArray->remove(tab->myTableID);
00336 
00337   // Cute: you drop the table by adding it with the 'to_delete' flag set to 'true'
00338   addTable(tab->myTableID, tab->myTableName->getCString(), tab->getTableFileSize(), true);
00339 
00340   unlock_(iTableList);
00341   release_(tab);
00342   exit_();
00343 }
00344 
00345 // This function is used when dropping tables from a database before
00346 // dropping the database itself. 
00347 CSString *MSDatabase::getATableName()
00348 {
00349   uint32_t i = 0;
00350   MSTable *tab;
00351   CSString *name = NULL;
00352   
00353   enter_();
00354   lock_(iTableList);
00355 
00356   while ((tab = (MSTable *) iTableList->itemAt(i++)) && tab->isToDelete()) ;
00357   if (tab) {
00358     name = tab->getTableName();
00359     name->retain();
00360   }
00361   unlock_(iTableList);
00362   return_(name);
00363 }
00364 
00365 uint32_t MSDatabase::getTableCount()
00366 {
00367   uint32_t cnt = 0, i = 0;
00368   MSTable *tab;
00369   
00370   enter_();
00371   lock_(iTableList);
00372 
00373   while ((tab = (MSTable *) iTableList->itemAt(i++))) {
00374     if (!tab->isToDelete())
00375       cnt++;
00376   }
00377 
00378   unlock_(iTableList);
00379   return_(cnt);
00380 }
00381 
00382 
00383 void MSDatabase::renameTable(MSTable *tab, const char *to_name)
00384 {
00385   enter_();
00386   lock_(iTableList);
00387   iTableList->remove(tab->myTableName);
00388   iTableArray->remove(tab->myTableID);
00389 
00390   addTable(tab->myTableID, to_name, tab->getTableFileSize(), false);
00391 
00392   unlock_(iTableList);
00393   exit_();
00394 }
00395 
00396 void MSDatabase::openWriteRepo(MSOpenTable *otab)
00397 {
00398   if (otab->myWriteRepo && otab->myWriteRepoFile)
00399     return;
00400 
00401   enter_();
00402   if (!otab->myWriteRepo)
00403     otab->myWriteRepo = lockRepo(0);
00404 
00405   /* Now open the repo file for the open table: */
00406   otab->myWriteRepo->openRepoFileForWriting(otab);
00407   exit_();
00408 }
00409 
00410 MSRepository *MSDatabase::getRepoFullOfTrash(time_t *ret_wait_time)
00411 {
00412   MSRepository  *repo = NULL;
00413   time_t      wait_time = 0;
00414   
00415   if (ret_wait_time)
00416     wait_time = *ret_wait_time;
00417   enter_();
00418   lock_(myRepostoryList);
00419   for (uint32_t i=0; i<myRepostoryList->size(); i++) {
00420     retry:
00421     if ((repo = (MSRepository *) myRepostoryList->get(i))) {
00422       if (!repo->isRemovingFP && !repo->mustBeDeleted && !repo->isRepoLocked()) {
00423         if (!repo->myRepoHeadSize) {
00424           /* The file has not yet been opened, so the
00425            * garbage count will not be known!
00426            */
00427           MSRepoFile *repo_file;
00428 
00429           repo->retain();
00430           unlock_(myRepostoryList);
00431           push_(repo);
00432           repo_file = repo->openRepoFile();
00433           repo_file->release();
00434           release_(repo);
00435           lock_(myRepostoryList);
00436           goto retry;
00437         }
00438         if (repo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
00439           /* Make sure there are not temp BLOBs in this repository that have
00440            * not yet timed out:
00441            */
00442           time_t now = time(NULL);
00443           time_t then = repo->myLastTempTime;
00444 
00445           /* Check if there are any temp BLOBs to be removed: */
00446           if (now > (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00447             repo->lockRepo(REPO_COMPACTING); 
00448             repo->retain();
00449             break;
00450           }
00451           else {
00452             /* There are temp BLOBs to wait for... */
00453             if (!wait_time || wait_time > MSTempLog::adjustWaitTime(then, now))
00454               wait_time = MSTempLog::adjustWaitTime(then, now);
00455           }
00456         }
00457       }
00458       repo = NULL;
00459     }
00460   }
00461   unlock_(myRepostoryList);
00462   if (ret_wait_time)
00463     *ret_wait_time = wait_time;
00464   return_(repo);
00465 }
00466 
00467 MSRepository *MSDatabase::lockRepo(off64_t size)
00468 {
00469   MSRepository  *repo;
00470   uint32_t      free_slot;
00471 
00472   enter_();
00473   lock_(myRepostoryList);
00474   free_slot = myRepostoryList->size();
00475   /* Find an unlocked repository file that is below the write threshold: */
00476   for (uint32_t i=0; i<myRepostoryList->size(); i++) {
00477     if ((repo = (MSRepository *) myRepostoryList->get(i))) {
00478       if ((!repo->isRepoLocked()) && (!repo->isRemovingFP) && (!repo->mustBeDeleted) &&
00479         ((repo->myRepoFileSize + size) < PBMSParameters::getRepoThreshold()) 
00480          && (repo->getGarbageLevel() < PBMSParameters::getGarbageThreshold()))
00481         goto found1;
00482     }
00483     else {
00484       if (i < free_slot)
00485         free_slot = i;
00486     }
00487   }
00488 
00489   /* None found, create a new repo file: */
00490   new_(repo, MSRepository(free_slot + 1, this, 0));
00491   myRepostoryList->set(free_slot, repo);
00492 
00493   found1:
00494   repo->retain();
00495   repo->lockRepo(REPO_WRITE);  // <- The MSRepository::backToPool() will unlock this.
00496   unlock_(myRepostoryList);
00497   return_(repo);
00498 }
00499 
00500 MSRepoFile *MSDatabase::getRepoFileFromPool(uint32_t repo_id, bool missing_ok)
00501 {
00502   MSRepository  *repo;
00503   MSRepoFile    *file;
00504 
00505   enter_();
00506   lock_(myRepostoryList);
00507   if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1))) {
00508     if (!missing_ok) {
00509       char buffer[CS_EXC_MESSAGE_SIZE];
00510 
00511       cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown repository file: ");
00512       cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
00513       CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, buffer);
00514     }
00515     unlock_(myRepostoryList);
00516     return_(NULL);
00517   }
00518   if (repo->isRemovingFP) {
00519     char buffer[CS_EXC_MESSAGE_SIZE];
00520 
00521     cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Repository will be removed: ");
00522     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) repo_id);
00523     CSException::throwException(CS_CONTEXT, MS_ERR_REMOVING_REPO, buffer);
00524   }
00525   repo->retain(); /* Release is here: [++] */
00526   file = repo->getRepoFile();
00527   unlock_(myRepostoryList);
00528 
00529   if (!file) {
00530     file = repo->openRepoFile();
00531     lock_(myRepostoryList);
00532     repo->addRepoFile(RETAIN(file));
00533     unlock_(myRepostoryList);
00534   }
00535   return_(file);
00536 }
00537 
00538 void MSDatabase::returnRepoFileToPool(MSRepoFile *file)
00539 {
00540   MSRepository  *repo;
00541 
00542   enter_();
00543   lock_(myRepostoryList);
00544   push_(file);
00545   if ((repo = file->myRepo)) {
00546     if (repo->isRemovingFP) {
00547       repo->removeRepoFile(file); // No retain expected
00548       myRepostoryList->wakeup();
00549     }
00550     else
00551       repo->returnRepoFile(file); // No retain expected
00552     repo->release(); /* [++] here is the release.  */
00553   }
00554   release_(file);
00555   unlock_(myRepostoryList);
00556   exit_();
00557 }
00558 
00559 void MSDatabase::removeRepo(uint32_t repo_id, bool *mustQuit)
00560 {
00561   MSRepository *repo;
00562 
00563   enter_();
00564   lock_(myRepostoryList);
00565   while ((!mustQuit || !*mustQuit) && !iClosing) {
00566     if (!(repo = (MSRepository *) myRepostoryList->get(repo_id - 1)))
00567       break;
00568     repo->isRemovingFP = true;
00569     if (repo->removeRepoFilesNotInUse()) {
00570       myRepostoryList->set(repo_id - 1, NULL);
00571       break;
00572     }
00573     /*
00574      * Wait for the files that are in use to be
00575      * freed.
00576      */
00577     myRepostoryList->wait();
00578   }
00579   unlock_(myRepostoryList);
00580   exit_();
00581 }
00582 
00583 void MSDatabase::queueTempLogEvent(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
00584    uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time)
00585 {
00586   MSTempLogItemRec  item;
00587   uint32_t        timev;
00588 
00589   // Each otab object holds an handle to an instance of an OPEN
00590   // temp log. This is so that each thread has it's own open temp log 
00591   // and doesn't need to be opened and close it constantly.
00592   
00593   enter_();
00594   lock_(myTempLogArray);
00595   if (!iWriteTempLog) {
00596     iWriteTempLog = (MSTempLog *) myTempLogArray->last();
00597     if (!iWriteTempLog) {
00598       new_(iWriteTempLog, MSTempLog(1, this, 0));
00599       myTempLogArray->set(1, iWriteTempLog);
00600     }
00601   }
00602   if (!otab->myTempLogFile)
00603     otab->myTempLogFile = iWriteTempLog->openTempLog();
00604   else if (otab->myTempLogFile->myTempLogID != iWriteTempLog->myLogID) {
00605     otab->myTempLogFile->release();
00606     otab->myTempLogFile = NULL;
00607     otab->myTempLogFile = iWriteTempLog->openTempLog();
00608   }
00609 
00610   if (iWriteTempLog->myTempLogSize >= PBMSParameters::getTempLogThreshold()) {
00611     uint32_t tmp_log_id = iWriteTempLog->myLogID + 1;
00612 
00613     new_(iWriteTempLog, MSTempLog(tmp_log_id, this, 0));
00614     myTempLogArray->set(tmp_log_id, iWriteTempLog);
00615 
00616     otab->myTempLogFile->release();
00617     otab->myTempLogFile = NULL;
00618     otab->myTempLogFile = iWriteTempLog->openTempLog();
00619 
00620   }
00621 
00622   timev = time(NULL);
00623   *log_id = iWriteTempLog->myLogID;
00624   *log_offset = (uint32_t) iWriteTempLog->myTempLogSize;
00625   if (q_time)
00626     *q_time = timev;
00627   iWriteTempLog->myTempLogSize += iWriteTempLog->myTemplogRecSize;
00628   unlock_(myTempLogArray);
00629 
00630   CS_SET_DISK_1(item.ti_type_1, type);
00631   CS_SET_DISK_4(item.ti_table_id_4, tab_id);
00632   CS_SET_DISK_6(item.ti_blob_id_6, blob_id);
00633   CS_SET_DISK_4(item.ti_auth_code_4, auth_code);
00634   CS_SET_DISK_4(item.ti_time_4, timev);
00635   otab->myTempLogFile->write(&item, *log_offset, sizeof(MSTempLogItemRec));
00636   
00637   
00638   exit_();
00639 }
00640 
00641 #ifdef HAVE_ALIAS_SUPPORT
/*
 * Queue a temp log event (see queueTempLogEvent()) and, if the BLOB has an
 * alias, remove that alias from the alias index.
 *
 * aliasDiskRec  Alias record of the BLOB, or NULL if it has no alias.
 *               A failure to delete the alias is logged, not propagated.
 *
 * The remaining parameters are passed straight to queueTempLogEvent().
 */
void MSDatabase::queueForDeletion(MSOpenTable *otab, int type, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code,
   uint32_t *log_id, uint32_t *log_offset, uint32_t *q_time, MSDiskAliasPtr aliasDiskRec)
{
  enter_();
  
  queueTempLogEvent(otab, type, tab_id, blob_id, auth_code, log_id, log_offset, q_time);
    
  // If it has an alias remove it from the alias index.
  if (aliasDiskRec) {
    try_(a) {
      deleteBlobAlias(aliasDiskRec);
    }
    catch_(a);
    self->logException();
    cont_(a);
  }
  
  exit_();
}
00661 #endif
00662 
00663 MSTempLogFile *MSDatabase::openTempLogFile(uint32_t log_id, size_t *log_rec_size, size_t *log_head_size)
00664 {
00665   MSTempLog   *log;
00666   MSTempLogFile *log_file = NULL;
00667 
00668   enter_();
00669   lock_(myTempLogArray);
00670   if (log_id)
00671     log = (MSTempLog *) myTempLogArray->get(log_id);
00672   else
00673     log = (MSTempLog *) myTempLogArray->first();
00674   if (log) {
00675     log_file = log->openTempLog();
00676     if (log_rec_size)
00677       *log_rec_size = log->myTemplogRecSize;
00678     if (log_head_size)
00679       *log_head_size = log->myTempLogHeadSize;
00680   }
00681   unlock_(myTempLogArray);
00682   return_(log_file);
00683 }
00684 
00685 uint32_t MSDatabase::getTempLogCount()
00686 {
00687   uint32_t count;
00688 
00689   enter_();
00690   lock_(myTempLogArray);
00691   count = myTempLogArray->size();
00692   unlock_(myTempLogArray);
00693   return_(count);
00694 }
00695 
/*
 * Remove the temp log with the given ID from the temp log array, under the
 * array's lock.
 */
void MSDatabase::removeTempLog(uint32_t log_id)
{
  enter_();
  lock_(myTempLogArray);
  myTempLogArray->remove(log_id);
  unlock_(myTempLogArray);
  exit_();
}
00704 
/*
 * Return the key of this object: the database name. No reference is added.
 * See compareKey() for the matching comparison.
 */
CSObject *MSDatabase::getKey()
{
  return (CSObject *) myDatabaseName;
}
00709 
00710 int MSDatabase::compareKey(CSObject *key)
00711 {
00712   return myDatabaseName->compare((CSString *) key);
00713 }
00714 
/* Return the compactor thread of this database (may be NULL); no reference is added. */
MSCompactorThread *MSDatabase::getCompactorThread()
{
  return myCompactorThread;
}
00719 
/* Return the repository list of this database; no reference is added. */
CSSyncVector *MSDatabase::getRepositoryList()
{ 
  return myRepostoryList;
}
00724 
00725 #ifdef HAVE_ALIAS_SUPPORT
00726 uint32_t MSDatabase::registerBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
00727 {
00728   uint32_t hash;
00729   bool can_retry = true;
00730   enter_();
00731   
00732 retry:
00733   lock_(&iBlobAliaseLock);
00734   
00735   try_(a) {
00736     hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
00737   }
00738   
00739   catch_(a) {
00740     unlock_(&iBlobAliaseLock);
00741     if (can_retry) {
00742       // It can be that a duplicater alias exists that was deleted
00743       // but the transaction has not been written to the repository yet.
00744       // Flush all committed transactions to the repository file.
00745       MSTransactionManager::flush();
00746       can_retry = false;
00747       goto retry;
00748     }
00749     throw_();
00750   }
00751   
00752   cont_(a);
00753   unlock_(&iBlobAliaseLock);
00754   return_(hash);
00755 }
00756 
00757 uint32_t MSDatabase::updateBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t old_alias_hash, const char *alias)
00758 {
00759   uint32_t new_hash;
00760   enter_();
00761   lock_(&iBlobAliaseLock);
00762   
00763   new_hash = iBlobAliases->addAlias(repo_id, repo_offset, alias);
00764   iBlobAliases->deleteAlias(repo_id, repo_offset, old_alias_hash);
00765 
00766   unlock_(&iBlobAliaseLock);
00767   return_(new_hash);
00768 }
00769 
/*
 * Delete the alias described by the given disk record from the alias index,
 * under the alias lock.
 */
void MSDatabase::deleteBlobAlias(MSDiskAliasPtr diskRec)
{
  enter_();
  lock_(&iBlobAliaseLock);
  iBlobAliases->deleteAlias(diskRec);
  unlock_(&iBlobAliaseLock);
  exit_();
}
00778 
00779 void MSDatabase::deleteBlobAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
00780 {
00781   MSDiskAliasRec diskRec;
00782   
00783   CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id); 
00784   CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);  
00785   CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash);
00786   deleteBlobAlias(&diskRec);
00787 }
00788 
/*
 * Re-point an existing alias entry (identified by the old repository ID,
 * old offset and hash) at a new repository ID and offset, under the alias
 * lock.
 */
void MSDatabase::moveBlobAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
{
  enter_();
  lock_(&iBlobAliaseLock);
  iBlobAliases->resetAlias(old_repo_id, old_repo_offset, alias_hash, new_repo_id, new_repo_offset);
  unlock_(&iBlobAliaseLock);
  exit_();
}
00797 #endif
00798 
00799 bool MSDatabase::isValidHeaderField(const char *name)
00800 {
00801   bool is_valid = false;
00802   CSString    *header;
00803   enter_();
00804 
00805   if (name && *name) {
00806     if (strcasecmp(name, MS_ALIAS_TAG)) {
00807       lock_(&iHTTPMetaDataHeaders);
00808       header = CSString::newString(name);
00809       push_(header);
00810         
00811       is_valid = (iHTTPMetaDataHeaders.find(header) != NULL);
00812       release_(header);
00813       
00814       unlock_(&iHTTPMetaDataHeaders);
00815     } else 
00816       is_valid = true;
00817   }
00818   
00819   return_(is_valid);
00820 }
00821 
/*
 * Class-level start up: create the global database list and array, install
 * the default HTTP metadata headers, start the system tables and reset the
 * backup database ID counter to 1.
 *
 * default_http_headers  Passed to MSHTTPHeaderTable::setDefaultMetaDataHeaders().
 */
void MSDatabase::startUp(const char *default_http_headers)
{
  enter_();
  
  new_(gDatabaseList, CSSyncSortedList);
  new_(gDatabaseArray, CSSparseArray(5));
  MSHTTPHeaderTable::setDefaultMetaDataHeaders(default_http_headers);
  PBMSSystemTables::systemTablesStartUp();
  PBMSParameters::setBackupDatabaseID(1);
  exit_();
}
00833 
00834 void MSDatabase::stopThreads()
00835 {
00836   MSDatabase *db;
00837 
00838   enter_();
00839   if (gDatabaseList) {
00840     lock_(gDatabaseList);
00841     for (int i=0;;i++) {
00842       if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
00843         break;
00844       db->iClosing = true;
00845       
00846       if (db->myTempLogThread) {
00847         db->myTempLogThread->stop();
00848         db->myTempLogThread->release();
00849         db->myTempLogThread = NULL;
00850       }
00851       if (db->myCompactorThread) {
00852         db->myRepostoryList->wakeup(); // The compator thread waits on this.
00853         db->myCompactorThread->stop();
00854         db->myCompactorThread->release();
00855         db->myCompactorThread = NULL;
00856       }
00857       
00858       if (db->iBackupThread) {
00859         db->iBackupThread->stop();
00860         db->iBackupThread->release();
00861         db->iBackupThread = NULL;
00862       }
00863   
00864     }
00865     
00866     unlock_(gDatabaseList);
00867   }
00868   exit_();
00869 }
00870 
/*
 * Class-level shut down, the counterpart of startUp(): empty and release
 * the global database array and list (if they exist), then release the
 * default HTTP metadata headers and shut the system tables down.
 */
void MSDatabase::shutDown()
{
    
  if (gDatabaseArray) {
    gDatabaseArray->clear();
    gDatabaseArray->release();
    gDatabaseArray = NULL;
  }
  
  if (gDatabaseList) {
    gDatabaseList->clear();
    gDatabaseList->release();
    gDatabaseList = NULL;
  }
  
  MSHTTPHeaderTable::releaseDefaultMetaDataHeaders();
  PBMSSystemTables::systemTableShutDown();
}
00889 
00890 void MSDatabase::setBackupDatabase()
00891 {
00892   enter_();
00893   // I need to give the backup database a unique fake database ID.
00894   // This is so that it is not confused with the database being backed
00895   // backed up when opening tables.
00896   
00897   // Normally database IDs are generated by time(NULL) so small database IDs
00898   // are safe to use as fake IDs.
00899    
00900   lock_(gDatabaseList);
00901   myDatabaseID = PBMSParameters::getBackupDatabaseID() +1;
00902   PBMSParameters::setBackupDatabaseID(myDatabaseID);
00903   gDatabaseArray->set(myDatabaseID, RETAIN(this));
00904   isBackup = true;
00905   
00906   // Notify the cloud storage, if any, that it is a backup.
00907   // This is important because if the backup database is dropped
00908   // we need to be sure that only the BLOBs belonging to the
00909   // backup are removed from the cloud.
00910   myBlobCloud->cl_setCloudIsBackup(); 
00911   
00912   unlock_(gDatabaseList);
00913   
00914   // Rename the database path so that it is obviouse that this is an incomplete backup database.
00915   // When the backup is completed it will be renamed back.
00916   CSPath *new_path = CSPath::newPath(myDatabasePath->concat("#"));
00917   push_(new_path);
00918   
00919   if (new_path->exists()) 
00920     new_path->remove();
00921   
00922   CSPath *db_path = CSPath::newPath(RETAIN(myDatabasePath));
00923   push_(db_path);
00924   
00925   db_path->rename(new_path->getNameCString());
00926   myDatabasePath->release();
00927   myDatabasePath = new_path->getString();
00928   myDatabasePath->retain();
00929   
00930   release_(db_path);
00931   release_(new_path);
00932   
00933   
00934   exit_();
00935 }
00936 
00937 void MSDatabase::releaseBackupDatabase()
00938 {
00939   enter_();
00940 
00941   
00942   // The backup has completed succefully, rename the path to the correct name.
00943   CSPath *db_path = CSPath::newPath(myDatabasePath->getCString());
00944   push_(db_path);
00945   
00946   myDatabasePath->setLength(myDatabasePath->length()-1);
00947   db_path->rename(cs_last_name_of_path(myDatabasePath->getCString()));
00948   release_(db_path);
00949   
00950   // Remove the backup database object.
00951   lock_(gDatabaseList);
00952   gDatabaseArray->remove(myDatabaseID);
00953   MSTableList::removeDatabaseTables(this); // Will also release the database object.
00954   unlock_(gDatabaseList);
00955   
00956   
00957   exit_();
00958 }
00959 
/*
 * Start a backup of this database.
 *
 * backup_info  Description of the backup; it is handed on to the new backup
 *              object created by MSBackup::newMSBackup().
 *
 * Throws MS_ERR_DUPLICATE_DB ("A backup is still running.") if a previous
 * backup thread is still running. A finished previous backup thread is
 * released first. If starting the new backup fails, the backup thread is
 * released again and the exception is re-thrown.
 */
void MSDatabase::startBackup(MSBackupInfo *backup_info)
{
  enter_();

  push_(backup_info);
  if (iBackupThread) {
    if (iBackupThread->isRunning()) {
      CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, "A backup is still running.");
    }
    iBackupThread->release();
    iBackupThread = NULL;
  }
  
  pop_(backup_info);
  iBackupThread = MSBackup::newMSBackup(backup_info);
  
  try_(a) {
    iBackupThread->startBackup(RETAIN(this));
  }
  
  catch_(a) {
    iBackupThread->release();
    iBackupThread = NULL;
    throw_();
  }
  cont_(a);
  
  exit_();
}
00989 
00990 bool MSDatabase::backupStatus(uint64_t *total, uint64_t *completed, bool *completed_ok)
00991 {
00992   bool done;
00993   
00994   enter_();
00995   
00996   if (iBackupThread) {
00997     *total = iBackupThread->getBackupSize();
00998     *completed = iBackupThread->getBackupCompletedSize();
00999     done = !iBackupThread->isRunning();
01000     *completed = (iBackupThread->getStatus() == 0);
01001   } else {
01002     *completed_ok = done = true;
01003     *total = *completed = 0;      
01004   }
01005     
01006   return_(done);
01007 }
01008 
01009 uint32_t MSDatabase::backupID()
01010 { 
01011   return (iBackupThread)?iBackupThread->backupID(): 0;
01012 }
01013 
01014 void MSDatabase::terminateBackup()
01015 {
01016   if (iBackupThread) {
01017     iBackupThread->stop();
01018     iBackupThread->release();
01019     iBackupThread = NULL;
01020   }
01021 }
01022 
01023 MSDatabase *MSDatabase::getDatabase(CSString *db_name, bool create)
01024 {
01025   MSDatabase *db;
01026   enter_();
01027   push_(db_name);
01028   
01029   
01030   lock_(gDatabaseList);
01031   if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
01032     db = MSDatabase::loadDatabase(RETAIN(db_name), create);
01033     if (!db)
01034       goto exit;
01035   } else
01036     db->retain();
01037   
01038   exit:
01039   unlock_(gDatabaseList);
01040   release_(db_name);
01041   return_(db);
01042 }
01043 
01044 MSDatabase *MSDatabase::getDatabase(const char *db_name, bool create)
01045 {
01046   return getDatabase(CSString::newString(db_name), create);
01047 }
01048 
01049 MSDatabase *MSDatabase::getDatabase(uint32_t db_id, bool missing_ok)
01050 {
01051   MSDatabase *db;
01052   
01053   enter_();
01054   lock_(gDatabaseList);
01055   if ((db = (MSDatabase *) gDatabaseArray->get((uint32_t) db_id))) 
01056     db->retain();
01057   else {
01058     // Look for the database folder with the correct ID:
01059     CSPath *path = CSPath::newPath(PBMSDaemon::getPBMSDir());
01060     push_(path);
01061     if (path->exists()) {
01062       CSDirectory *dir;
01063       dir = CSDirectory::newDirectory(RETAIN(path));
01064       push_(dir);
01065       dir->open();
01066       
01067       while (dir->next() && !db) {
01068         if (!dir->isFile()) {
01069           const char *ptr, *dir_name  = dir->name();
01070           ptr = dir_name + strlen(dir_name) -1;
01071           
01072           while (ptr > dir_name && *ptr != '-') ptr--;
01073           
01074           if (*ptr ==  '-') {
01075             int len = ptr - dir_name;
01076             ptr++;
01077             if ((strtoul(ptr, NULL, 10) == db_id) && len) {
01078               db = getDatabase(CSString::newString(dir_name, len), true);
01079               ASSERT(db->myDatabaseID == db_id);
01080             }
01081           }
01082         }
01083       }
01084       release_(dir);
01085     }
01086     release_(path);   
01087   }
01088   unlock_(gDatabaseList);
01089   
01090   if ((!db) && !missing_ok) {
01091     char buffer[CS_EXC_MESSAGE_SIZE];
01092 
01093     cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Unknown database #");
01094     cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, (uint32_t) db_id);
01095     CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, buffer);
01096   }
01097   return_(db);
01098 }
01099 
01100 void MSDatabase::wakeTempLogThreads()
01101 {
01102   MSDatabase *db;
01103 
01104   if (!gDatabaseList)
01105     return;
01106   enter_();
01107   lock_(gDatabaseList);
01108   for (int i=0;;i++) {
01109     if (!(db = (MSDatabase *) gDatabaseList->itemAt(i)))
01110       break;
01111     if (db->myTempLogThread)
01112       db->myTempLogThread->wakeup();
01113   }
01114   unlock_(gDatabaseList);
01115   exit_();
01116 }
01117 
01118 uint32_t MSDatabase::getDBID(CSPath *path, CSString *db_name)
01119 {
01120   CSDirectory   *dir;
01121   uint32_t      db_id = 0;
01122   int       len = db_name->length();
01123   const char    *ptr;
01124   
01125   enter_();
01126   push_(db_name);
01127   push_(path);
01128   
01129   // Search for the ID of the database
01130   dir = CSDirectory::newDirectory(RETAIN(path));
01131   push_(dir);
01132   dir->open();
01133   while (dir->next() && !db_id) 
01134     {
01135     if (!dir->isFile()){
01136       ptr = dir->name() + strlen(dir->name()) -1;
01137       while (ptr > dir->name() && isdigit(*ptr)) ptr--;
01138       if ((*ptr == '-') && (len == (ptr - dir->name())) && !db_name->compare(dir->name(), len) ) {
01139         db_id = atol(ptr+1);        
01140       }
01141     }
01142   }
01143   release_(dir);
01144   
01145   if (!db_id) {
01146     db_id = time(NULL);
01147 
01148     while (1) { // search for a unique db_id
01149       dir = CSDirectory::newDirectory(RETAIN(path));
01150       push_(dir);
01151       dir->open();
01152       while (db_id && dir->next()) {
01153         if (!dir->isFile()) {
01154           ptr = dir->name() + strlen(dir->name()) -1;
01155           while (ptr > dir->name() && isdigit(*ptr)) ptr--;
01156           if ((*ptr == '-') && (db_id == strtoul(ptr+1, NULL, 10))) {
01157             db_id = 0;        
01158           }
01159         }
01160       }
01161       release_(dir);
01162       if (db_id)
01163         break;
01164       sleep(1); // Allow 1 second to pass.
01165       db_id = time(NULL);
01166     } 
01167   }
01168   
01169   release_(path);
01170   release_(db_name);
01171   return_(db_id);
01172 }
01173 
01174 CSPath *MSDatabase::createDatabasePath(const char *location, CSString *db_name, uint32_t *db_id_ptr, bool *create, bool is_pbms)
01175 {
01176   bool create_path = *create; 
01177   CSPath *path = NULL;
01178   char name_buffer[MS_DATABASE_NAME_SIZE + 40];
01179   uint32_t db_id;
01180   enter_();
01181   
01182   push_(db_name);
01183   *create = false;
01184   path = CSPath::newPath(location, "pbms");
01185   push_(path);
01186   if (!path->exists()) {
01187     if (!create_path){
01188       release_(path);
01189       path = NULL;
01190       goto done;
01191     }
01192       
01193     *create = true;
01194     path->makeDir();
01195   }
01196 
01197   // If this is the pbms database then nothing more is to be done.
01198   if (is_pbms)
01199     goto done;
01200   
01201   if ((!db_id_ptr) || !*db_id_ptr) {
01202     db_id = getDBID(RETAIN(path), RETAIN(db_name));
01203     if (db_id_ptr)
01204       *db_id_ptr = db_id;
01205   } else
01206     db_id = *db_id_ptr;
01207 
01208   // Create the PBMS database name with ID
01209   cs_strcpy(MS_DATABASE_NAME_SIZE + 40, name_buffer, db_name->getCString());
01210   cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, "-");
01211   cs_strcat(MS_DATABASE_NAME_SIZE + 40, name_buffer, (uint32_t) db_id);
01212       
01213   pop_(path);
01214   path = CSPath::newPath(path, name_buffer);
01215   push_(path);
01216   if (!path->exists()) {
01217     if (create_path) {
01218       *create = true;
01219       path->makeDir();
01220     } else {
01221       release_(path);
01222       path = NULL;
01223     }
01224       
01225   }
01226   
01227 done:
01228   if (path)
01229     pop_(path);
01230   release_(db_name);
01231   return_(path);
01232 }
01233 
01234 
01235 
/*
 * Create an MSDatabase object for 'db_name' under 'db_location' and load
 * its on-disk state.
 *
 * Steps performed:
 *  - When 'create' is set, MS_ERR_UNKNOWN_DB is thrown unless a directory
 *    named 'db_name' exists under ms_my_get_mysql_home_path().
 *  - The PBMS folder for the database is resolved (and possibly created) by
 *    createDatabasePath(); NULL is returned when that yields no path.
 *  - Table files ("*.bst") in the database folder are registered, then the
 *    "bs-repository" and "bs-logs" sub-folders are scanned (or created when
 *    missing).
 *
 * 'is_pbms' is always false here because the name check is commented out.
 * Ownership of 'db_name' moves to the returned database object.
 */
01236 MSDatabase *MSDatabase::newDatabase(const char *db_location, CSString *db_name, uint32_t  db_id, bool create)
01237 {
01238   MSDatabase    *db = NULL;
01239   CSDirectory   *dir;
01240   MSRepository  *repo;
01241   CSPath      *path;
01242   const char    *file_name;
01243   uint32_t    file_id;
01244   off64_t     file_size;
01245   MSTempLog   *log;
01246   uint32_t    to_delete = 0;
01247   CSString    *db_path;
01248   bool      is_pbms = false;
01249 
01250   enter_();
01251 
01252   push_(db_name);
01253 
01254   //is_pbms = (strcmp(db_name->getCString(), "pbms") == 0); To be done later.
01255   
01256   /*
01257    * Block the creation of the pbms database if there is no MySQL database. 
01258    * The database name is case sensitive here if the file system names are
01259    * case sensitive. This is desirable.
01260    */
01261   path = CSPath::newPath(ms_my_get_mysql_home_path(), RETAIN(db_name));
01262   push_(path);
01263   if (create && !path->exists()) {
01264     CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
01265   }
01266   release_(path);
01267   
01268    // Create the database path, if 'create' == false then it can return NULL
01269   path = createDatabasePath(db_location, RETAIN(db_name), &db_id, &create, is_pbms);
01270   if (!path) {
01271     release_(db_name);
01272     return_(NULL);
01273   }
01274   push_(path);
01275   
01276   // Create the database object and initialize it.
01277   if (!(db = new MSDatabase())) {
01278     CSException::throwOSError(CS_CONTEXT, ENOMEM);
01279   }
01280   
01281   db->myIsPBMS = is_pbms;
        // Keep our own reference to the path string; 'path' itself is released.
01282   db_path = path->getString();
01283   db_path->retain();
01284   release_(path);
01285   path = NULL;
01286     
        // Seed the blob reference ID: current time in the upper 32 bits,
        // passed through COMMIT_MASK, then incremented by one.
01287   db->iNextBlobRefId = (uint32_t) time(NULL);
01288   db->iNextBlobRefId <<= 32;
01289   db->iNextBlobRefId = COMMIT_MASK(db->iNextBlobRefId);
01290   db->iNextBlobRefId++;
01291   
01292   db->myDatabaseID = db_id;
01293   db->myDatabasePath = db_path;
01294   db->myDatabaseName = db_name;
01295   new_(db->myBlobCloud, CloudDB(db_id));
01296 
        // db_name now belongs to 'db'; from here on 'db' is what gets cleaned up.
01297   pop_(db_name);  
01298   
01299   push_(db);
01300   
01301   
01302   new_(db->myTempLogArray, CSSyncSparseArray(20));
01303   new_(db->iTableList, CSSyncSortedList());
01304   new_(db->iTableArray, CSSparseArray(20));
01305   new_(db->myRepostoryList, CSSyncVector(20));
01306   
01307   if (!is_pbms) { //
01308 #ifdef HAVE_ALIAS_SUPPORT
01309     //db->retain(); no retain here, MSAlias() takes a back ref.
01310     new_(db->iBlobAliases, MSAlias(db));
01311 #endif    
01312     /* "Load" the database: */
01313 
01314     /* Get the max table ID: */
01315     dir = CSDirectory::newDirectory(RETAIN(db_path));
01316     push_(dir);
01317     dir->open();
01318     while (dir->next()) {
01319       file_name = dir->name();
01320       if (dir->isFile() && cs_is_extension(file_name, "bst"))
01321         db->addTableFromFile(dir, file_name, false);
01322     }
01323     release_(dir);
01324 
          // Register every repository file found in "bs-repository"; create
          // the folder when it does not exist yet.
01325     path = CSPath::newPath(RETAIN(db_path), "bs-repository");
01326     if (path->exists()) {
            // NOTE(review): 'path' is passed without RETAIN and is not released
            // in this branch; presumably newDirectory() takes it over. Confirm.
01327       dir = CSDirectory::newDirectory(path);
01328       push_(dir);
01329       dir->open();
01330       while (dir->next()) {
01331         file_name = dir->name();
01332         if (dir->isFile() && cs_is_extension(file_name, "bs")) {
01333           if ((file_id = fileToTableId(file_name, "repo"))) {
01334             dir->info(NULL, &file_size, NULL);
01335             new_(repo, MSRepository(file_id, db, file_size));
                  // Repositories are stored at index (file ID - 1).
01336             db->myRepostoryList->set(file_id - 1, repo);
01337           }
01338         }
01339       }
01340       release_(dir);
01341     }
01342     else {
01343       path->makeDir();
01344       path->release();
01345     }
01346 
          // Same again for "bs-logs": temp log files ("*.bs") are registered
          // by file ID, and table files ("*.bst") found here are added with the
          // third argument set to true and counted in 'to_delete'.
01347     path = CSPath::newPath(RETAIN(db_path), "bs-logs");
01348     if (path->exists()) {
01349       dir = CSDirectory::newDirectory(path);
01350       push_(dir);
01351       dir->open();
01352       while (dir->next()) {
01353         file_name = dir->name();
01354         if (dir->isFile()) {
01355           if (cs_is_extension(file_name, "bs")) {
01356             if ((file_id = fileToTableId(file_name, "temp"))) {
01357               dir->info(NULL, &file_size, NULL);
01358               new_(log, MSTempLog(file_id, db, file_size));
01359               db->myTempLogArray->set(file_id, log);
01360             }
01361           }
01362           else if (cs_is_extension(file_name, "bst")) {
01363             db->addTableFromFile(dir, file_name, true);
01364             to_delete++;
01365           }
01366         }
01367       }
01368       release_(dir);
01369     }
01370     else {
01371       path->makeDir();
01372       path->release();
01373     }
01374 
01375     if (to_delete) {
01376       /* Go through and prepare all the tables that are to
01377        * be deleted:
01378        */
01379       uint32_t  i = 0;
01380       MSTable *tab;
01381       
01382       while ((tab = (MSTable *) db->iTableList->itemAt(i))) {
01383         if (tab->isToDelete())
01384           tab->prepareToDelete();
01385         i++;
01386       }
01387     }
01388 
01389   }
        // Hand the fully initialised database to the caller.
01390   pop_(db);
01391 
01392   return_(db);
01393 }
01394 
01395 void MSDatabase::startThreads()
01396 {
01397   enter_();
01398 
01399   if (myIsPBMS)
01400     exit_();
01401     
01402 #ifdef HAVE_ALIAS_SUPPORT
01403   // iBlobAliases->ma_open() must be called before starting any threads.
01404   iBlobAliases->ma_open(); 
01405 #endif
01406 
01407   new_(myTempLogThread, MSTempLogThread(1 * 1000, this));
01408   myTempLogThread->start();
01409 
01410 #ifdef MS_COMPACTOR_POLLS
01411   new_(myCompactorThread, MSCompactorThread(MS_COMPACTOR_POLL_FREQ, this));
01412 #else
01413   new_(myCompactorThread, MSCompactorThread(MS_DEFAULT_COMPACTOR_WAIT * 1000, this));
01414 #endif
01415 
01416   
01417   myCompactorThread->start();
01418   exit_();
01419 }
01420 
01421 
01422 void MSDatabase::dropDatabase() 
01423 {
01424   enter_();
01425 
01426   iDropping = true;
01427   iClosing = true;
01428   
01429   if (iBackupThread) {
01430     iBackupThread->stop();
01431     iBackupThread->release();
01432     iBackupThread = NULL;
01433   }
01434   
01435   if (myTempLogThread) {
01436     myTempLogThread->stop();
01437     myTempLogThread->release();
01438     myTempLogThread = NULL;
01439   }
01440   
01441   if (myCompactorThread) {
01442     myRepostoryList->wakeup(); // The compator thread waits on this.
01443     myCompactorThread->stop();
01444     myCompactorThread->release();
01445     myCompactorThread = NULL;
01446   }
01447   
01448   // Call cloud drop database even if the database is not currrently
01449   // using cloud storage just in case to was in the past. If the connection
01450   // to the cloud is not setup then nothing will be done.
01451   try_(a) {
01452     myBlobCloud->cl_dropDB();
01453   }
01454   catch_(a) {
01455     self->logException();
01456   }
01457   cont_(a);
01458   exit_();
01459 }
01460 
01461 void MSDatabase::removeDatabasePath(CSString *doomedDatabasePath ) 
01462 {
01463   CSPath *path = NULL;
01464   CSDirectory *dir = NULL;
01465   const char *file_name;
01466   enter_();
01467   
01468   push_(doomedDatabasePath);
01469   
01470   // Delete repository files
01471   path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-repository");
01472   push_(path);
01473   if (path->exists()) {
01474     dir = CSDirectory::newDirectory(RETAIN(path));
01475     push_(dir);
01476     dir->open();
01477     while (dir->next()) {
01478       file_name = dir->name();
01479       if (dir->isFile() && cs_is_extension(file_name, "bs")) {
01480         dir->deleteEntry();
01481       }
01482     }
01483     release_(dir);
01484     if (path->isEmpty())
01485       path->removeDir();
01486   }
01487   release_(path);
01488 
01489   // Delete temp log files.
01490   path = CSPath::newPath(RETAIN(doomedDatabasePath), "bs-logs");
01491   push_(path);
01492   if (path->exists()) {
01493     dir = CSDirectory::newDirectory(RETAIN(path));
01494     push_(dir);
01495     dir->open();
01496     while (dir->next()) {
01497       file_name = dir->name();
01498       if (dir->isFile() && (cs_is_extension(file_name, "bs") || cs_is_extension(file_name, "bst"))) {
01499         dir->deleteEntry();
01500       }
01501     }
01502     release_(dir);
01503     if (path->isEmpty())
01504       path->removeDir();
01505   }
01506   release_(path);
01507 
01508   // Delete table reference files.
01509   dir = CSDirectory::newDirectory(RETAIN(doomedDatabasePath));
01510   push_(dir);
01511   dir->open();
01512   while (dir->next()) {
01513     file_name = dir->name();
01514     if (dir->isFile() && cs_is_extension(file_name, "bst"))
01515       dir->deleteEntry();
01516   }
01517   release_(dir);
01518 
01519 #ifdef HAVE_ALIAS_SUPPORT
01520   path = CSPath::newPath(RETAIN(doomedDatabasePath), ACTIVE_ALIAS_INDEX);
01521   push_(path);
01522   path->removeFile();
01523   release_(path);
01524 #endif
01525 
01526   PBMSSystemTables::removeSystemTables(RETAIN(doomedDatabasePath));
01527   
01528   path = CSPath::newPath(RETAIN(doomedDatabasePath));
01529   push_(path);
01530   if (path->isEmpty() && !path->isLink()) {
01531     path->removeDir();
01532   } else { 
01533     CSStringBuffer *new_name;
01534     // If the database folder is not empty we rename it to get it out of the way.
01535     // If it is not renamed it will be reused if a database with the same name is
01536     // created again wich will result in the database ID being reused which may
01537     // have some bad side effects.
01538     new_(new_name, CSStringBuffer());
01539     push_(new_name);
01540     new_name->append(cs_last_name_of_path(doomedDatabasePath->getCString()));
01541     new_name->append("_DROPPED");
01542     path->rename(new_name->getCString());
01543     release_(new_name);
01544   }
01545   release_(path);
01546 
01547   release_(doomedDatabasePath);
01548   
01549   path = CSPath::newPath(PBMSDaemon::getPBMSDir());
01550   push_(path);
01551   if (path->isEmpty() && !path->isLink()) {
01552     path->removeDir();
01553   }
01554   release_(path);
01555 
01556   exit_();
01557 }
01558 
01559 /* Drop the PBMS database if it exists.
01560  * The root folder 'pbms' will be deleted also
01561  * if it is empty and not a symbolic link.
01562  * The database folder in 'pbms' is deleted if it is empty and
01563  * it is not a symbolic link.
01564  */
01565 void MSDatabase::dropDatabase(MSDatabase *doomedDatabase, const char *db_name ) 
01566 {
01567   CSString *doomedDatabasePath = NULL;
01568 
01569   enter_();
01570   
01571   if (doomedDatabase) {
01572     push_(doomedDatabase);
01573     
01574     // Remove any pending transactions for the dropped database.
01575     // This is important because if the database is restored it will have the
01576     // same database ID and the old transactions would be applied to it.
01577     MSTransactionManager::dropDatabase(doomedDatabase->myDatabaseID);
01578     
01579     doomedDatabasePath = doomedDatabase->myDatabasePath;
01580     doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
01581     
01582     MSTableList::removeDatabaseTables(RETAIN(doomedDatabase));
01583     MSSystemTableShare::removeDatabaseSystemTables(RETAIN(doomedDatabase));
01584 
01585     doomedDatabase->dropDatabase(); // Shutdown database threads.
01586     
01587     // To avoid a deadlock a lock is not taken on the database list
01588     // if shutdown is in progress. The only database that would be
01589     // dropped during a shutdown is an incomplete backup database.
01590     ASSERT(doomedDatabase->isBackup || !self->myMustQuit);
01591     if (!self->myMustQuit) 
01592       lock_(gDatabaseList); // Be sure to shutdown the database before locking this or it can lead to deadlocks
01593       
01594     gDatabaseArray->remove(doomedDatabase->myDatabaseID);
01595     if (!doomedDatabase->isBackup)
01596       gDatabaseList->remove(doomedDatabase->getKey());
01597     if (!self->myMustQuit) 
01598       unlock_(gDatabaseList); 
01599     ASSERT(doomedDatabase->getRefCount() == 1);
01600     release_(doomedDatabase);
01601     
01602   } else {
01603     CSPath *path;
01604     bool create = false;
01605     uint32_t db_id;
01606     
01607     path = createDatabasePath(ms_my_get_mysql_home_path(), CSString::newString(db_name), &db_id, &create);
01608     
01609     if (path) {
01610       MSTransactionManager::dropDatabase(db_id);
01611 
01612       push_(path);
01613       doomedDatabasePath = path->getString();
01614       doomedDatabasePath->retain(); // Hold on to this path after the database has been released.
01615       release_(path);
01616     }
01617   }
01618   
01619   if (doomedDatabasePath)
01620     removeDatabasePath(doomedDatabasePath);
01621   
01622   exit_();
01623 }
01624 
01625 void MSDatabase::dropDatabase(const char *db_name ) 
01626 {
01627   enter_();
01628   dropDatabase(getDatabase(db_name, false), db_name);
01629   exit_();
01630 }
01631 
01632 // The table_path can be several things here:
01633 // 1: <absolute path>/<database>/<table>
01634 // 2: <absolute path>/<database>
01635 // 3: <database>/<table>
//
// Resolve 'table_path' to a database ID and a table ID.
// *db_id and *tab_id are set to 0 first and filled in when the database
// and the table are found (or created, when 'create' is set).
// Returns true only when both IDs end up greater than 0.
01636 bool MSDatabase::convertTablePathToIDs(const char *table_path, uint32_t *db_id, uint32_t *tab_id, bool create) 
01637 {
01638   const char  *base = ms_my_get_mysql_home_path();
01639   CSString  *table_url;
01640   CSString  *db_path = NULL;
01641   CSString  *db_name = NULL;
01642   CSString  *tab_name = NULL;
01643   MSDatabase  *db;
01644   enter_();
01645   
01646   *db_id = 0;
01647   *tab_id = 0;
01648   
        // Strip the MySQL home path prefix when the path starts with it.
01649   table_url = CSString::newString(table_path);
01650   if (table_url->startsWith(base)) {
01651     table_url = table_url->right(base);
01652   }
01653   push_(table_url);
01654 
01655 
        // Split at a "/": presumably the last one, given the -1 argument -
        // TODO confirm against CSString::left()/right().
01656   db_path = table_url->left("/", -1);
01657   push_(db_path);
01658   tab_name = table_url->right("/", -1);
01659   
01660   pop_(db_path);
01661   release_(table_url);
01662 
01663   if (db_path->length() == 0) { // Only a database name was supplied.
01664     db_path->release();
01665     db_name = tab_name;
01666     tab_name = NULL;
01667   } else {
01668     if (tab_name->length() == 0) {
01669       tab_name->release();
01670       tab_name = NULL;
01671     } else
01672       push_(tab_name);
          // The database name is the part of db_path after its "/"; when that
          // part is empty, db_path itself is used as the name.
01673     push_(db_path);
01674     db_name = db_path->right("/", -1);
01675     pop_(db_path);
01676     if (db_name->length() == 0) {
01677       db_name->release();
01678       db_name = db_path;
01679     } else {
01680       db_path->release();
01681       db_path = NULL;
01682     }
01683   }
01684   
        // NOTE(review): when getDatabase() returns NULL and tab_name was pushed
        // above, nothing below pops or releases it - confirm this is intended.
01685   db = MSDatabase::getDatabase(db_name, create); // This will release db_name
01686   if (db) {
01687     *db_id = db->myDatabaseID;
01688     if (tab_name) {
01689       MSTable   *tab;
01690       pop_(tab_name);
01691       push_(db);
01692       tab = db->getTable(tab_name, create);// This will release tab_name
01693       pop_(db);
01694       if (tab) {
01695         *tab_id = tab->myTableID;
01696         tab->release();
01697       }
01698     }
01699       
01700     db->release();
01701   }
01702   
01703   return_((*tab_id > 0) && (*db_id > 0));
01704 }
01705 
01706 bool MSDatabase::convertTableAndDatabaseToIDs(const char *db_name, const char *tab_name, uint32_t *db_id, uint32_t *tab_id, bool create) 
01707 {
01708   MSDatabase  *db;
01709   enter_();
01710   
01711   *db_id = 0;
01712   *tab_id = 0;
01713   
01714   db = MSDatabase::getDatabase(db_name, create); 
01715   if (db) {
01716     push_(db);
01717     *db_id = db->myDatabaseID;
01718     if (tab_name) {
01719       MSTable   *tab;
01720       tab = db->getTable(tab_name, create);
01721       if (tab) {
01722         *tab_id = tab->myTableID;
01723         tab->release();
01724       }
01725     }
01726       
01727     release_(db);
01728   }
01729   
01730   return_((*tab_id > 0) && (*db_id > 0));
01731 }
01732 
01733 MSDatabase *MSDatabase::loadDatabase(CSString *db_name, bool create)
01734 {
01735   MSDatabase *db;
01736   enter_();
01737   
01738   db = newDatabase(ms_my_get_mysql_home_path(), db_name, 0, create);
01739   
01740   if (db) {
01741     push_(db);
01742     
01743     gDatabaseList->add(RETAIN(db));
01744     
01745     gDatabaseArray->set(db->myDatabaseID, RETAIN(db));
01746     db->startThreads();
01747     PBMSSystemTables::loadSystemTables(RETAIN(db));
01748       
01749     pop_(db);
01750   }
01751   return_(db);
01752 }
01753 
01754 uint32_t MSDatabase::getDatabaseID(CSString *db_name, bool create)
01755 {
01756   MSDatabase *db;
01757   uint32_t id = 0;
01758   enter_();
01759   push_(db_name);
01760   
01761   
01762   lock_(gDatabaseList);
01763   if (!(db = (MSDatabase *) gDatabaseList->find(db_name))) {
01764     db = MSDatabase::loadDatabase(RETAIN(db_name), create);
01765     if (!db)
01766       goto exit;
01767     id = db->myDatabaseID;
01768     db->release();
01769   } else
01770     id = db->myDatabaseID;
01771   
01772   exit:
01773   unlock_(gDatabaseList);
01774   release_(db_name);
01775   return_(id);
01776 }
01777 
01778 
01779 uint32_t MSDatabase::getDatabaseID(const char *db_name, bool create)
01780 {
01781   return getDatabaseID(CSString::newString(db_name), create);
01782 }
01783 
01784 MSDatabase *MSDatabase::getBackupDatabase(CSString *db_location, CSString *db_name, uint32_t db_id, bool create)
01785 {
01786   bool was_created = create; 
01787   CSPath *path;
01788   MSDatabase *db;
01789   enter_();
01790   
01791   push_(db_location);
01792   push_(db_name);
01793   // If the db already exists and 'create' == true then the existing db
01794   // must be deleted.
01795   // Create the database path, if 'create' == false then it can return NULL
01796   path = createDatabasePath(db_location->getCString(), RETAIN(db_name), &db_id, &was_created);
01797   if (!path) {
01798     CSException::throwException(CS_CONTEXT, MS_ERR_UNKNOWN_DB, db_name->getCString());
01799   }
01800   push_(path);
01801   
01802   // If we wanted to create it but it already exists then throw an error.
01803   if ( create && !was_created) {
01804     char str[120];
01805     snprintf(str, 120, "Duplicate database: %s", db_name->getCString());
01806     CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE_DB, str);
01807   }
01808     
01809   release_(path);
01810   pop_(db_name);
01811   // everything looks OK
01812   db = newDatabase(db_location->getCString(), db_name, db_id, create);
01813   db->setBackupDatabase();
01814   release_(db_location);
01815   return_(db);
01816 }
01817