Drizzled Public API Documentation

alias_ms.cc

00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2008-12-30
00022  *
00023  * H&G2JCtL
00024  *
00025  * BLOB alias index.
00026  *
00027  */
00028 
00029 #ifdef HAVE_ALIAS_SUPPORT
00030 #include "cslib/CSConfig.h"
00031 
00032 #include "string.h"
00033 
00034 #ifdef DRIZZLED
00035 #include <drizzled/common.h>
00036 #endif
00037 
00038 #include "cslib/CSGlobal.h"
00039 #include "cslib/CSLog.h"
00040 #include "cslib/CSStrUtil.h"
00041 #include "cslib/CSFile.h"
00042 #include "system_table_ms.h"
00043 #include "database_ms.h"
00044 
00045 #include "alias_ms.h"
00046 
00047 
00048 
00049 //------------------------
00050 MSAlias::~MSAlias()
00051 {
00052   enter_();
00053   
00054   ASSERT(iClosing);
00055   ASSERT(iPoolSysTables.getSize() == 0);
00056 
00057   
00058   if (iFilePath) {
00059   
00060     if (iDelete)
00061       iFilePath->removeFile();
00062       
00063     iFilePath->release();
00064   }
00065   
00066   if (iFileShare)
00067     iFileShare->release();
00068     
00069   exit_();
00070 }
00071 
00072 //------------------------
00073 MSAlias::MSAlias(MSDatabase *db_noref)
00074 {
00075   iClosing = false;
00076   iDelete = false;
00077   iDatabase_br = db_noref;
00078   iFilePath = NULL;
00079   iFileShare = NULL;
00080 }
00081 
00082 //------------------------
00083 void MSAlias::ma_close()
00084 {
00085   enter_();
00086   
00087   iClosing = true;
00088   if (iFileShare)
00089     iFileShare->close();
00090   iPoolSysTables.clear();
00091   exit_();
00092 }
00093 
00094 //------------------------
00095 // Compress the index bucket chain and free unused buckets.
00096 void MSAlias::MSAliasCompress(CSFile *fa, CSSortedList  *freeList, MSABucketLinkedList *bucketChain)
00097 {
00098   // For now I will just remove empty buckets. 
00099   // Later this function should also compress the records also 
00100   // thus making the searches faster and freeing up more space.
00101   MSABucketInfo *b_info, *next;
00102   
00103   b_info = bucketChain->getFront();
00104   while (b_info) {
00105     next = b_info->getNextLink();
00106     if (b_info->getSize() == 0) {
00107       bucketChain->remove(RETAIN(b_info));
00108       freeList->add(b_info);
00109     }   
00110     b_info = next;
00111   }
00112     
00113 }
00114 
00115 //------------------------
00116 void MSAlias::MSAliasLoad()
00117 { 
00118   CSFile      *fa = NULL;
00119   CSSortedList  freeList;
00120   off64_t     fileSize;
00121 
00122   enter_();
00123   
00124   fa = CSFile::newFile(RETAIN(iFilePath));
00125   push_(fa);
00126 
00127   MSAliasHeadRec header;
00128   uint64_t free_list_offset;
00129   fa->open(CSFile::DEFAULT);
00130   fa->read(&header, 0, sizeof(header), sizeof(header));
00131   
00132   /* Check the file header: */
00133   if (CS_GET_DISK_4(header.ah_magic_4) != MS_ALIAS_FILE_MAGIC)
00134     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_HEADER_MAGIC);
00135   if (CS_GET_DISK_2(header.ah_version_2) != MS_ALIAS_FILE_VERSION)
00136     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_VERSION_TOO_NEW);
00137     
00138   free_list_offset = CS_GET_DISK_8(header.ah_free_list_8);
00139   
00140   fileSize = CS_GET_DISK_8(header.ah_file_size_8);
00141 
00142   // Do some sanity checks. 
00143   if (CS_GET_DISK_2(header.ah_head_size_2) != sizeof(header))
00144     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
00145   
00146   if (CS_GET_DISK_2(header.ah_num_buckets_2) != BUCKET_LIST_SIZE)
00147     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
00148   
00149   if (CS_GET_DISK_4(header.ah_bucket_size_4) != NUM_RECORDS_PER_BUCKET)
00150     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
00151   
00152   if (fileSize != fa->getEOF()) 
00153     CSException::throwFileError(CS_CONTEXT, iFilePath->getCString(), CS_ERR_BAD_FILE_HEADER);
00154   
00155   // Load the bucket headers into RAM
00156   MSADiskBucketHeadRec bucketHead = {0};
00157   uint64_t offset, start_offset;
00158 
00159   // Fist load the free list:
00160   if (free_list_offset) {
00161     start_offset = offset = free_list_offset;
00162     do {
00163       fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
00164       freeList.add(MSABucketInfo::newMSABucketInfo(offset));
00165       offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);          
00166     } while (offset != start_offset);
00167     
00168   }
00169   for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
00170     uint64_t used, total_space;
00171     MSABucketLinkedList *bucketChain = &(iFileShare->msa_buckets[i]);
00172     
00173     start_offset = offset = sizeof(header) + i * sizeof(MSADiskBucketRec);
00174     used = total_space = 0;
00175     do {
00176       uint32_t num, end_of_records;
00177       
00178       fa->read(&bucketHead, offset, sizeof(MSADiskBucketHeadRec), sizeof(MSADiskBucketHeadRec));
00179       num = CS_GET_DISK_4(bucketHead.ab_num_recs_4);
00180       end_of_records = CS_GET_DISK_4(bucketHead.ab_eor_rec_4);
00181       total_space += NUM_RECORDS_PER_BUCKET;
00182       used += num;
00183       bucketChain->addFront(MSABucketInfo::newMSABucketInfo(offset, num, end_of_records));
00184       offset = CS_GET_DISK_8(bucketHead.ab_next_bucket_8);
00185       
00186     } while (offset != start_offset);
00187     
00188     // Pack the index if required
00189     if (((total_space - used) /  NUM_RECORDS_PER_BUCKET) > 1) 
00190       MSAliasCompress(fa, &freeList, bucketChain); 
00191       
00192   }
00193   
00194   // If there are free buckets try to free up some disk
00195   // space or add them to a free list to be reused later.
00196   if (freeList.getSize()) {
00197     uint64_t last_bucket = fileSize - sizeof(MSADiskBucketRec);
00198     MSABucketInfo *rec;
00199     bool reduce = false;
00200     
00201     // Search for freed buckets at the end of the file
00202     // so that they can be released and the file
00203     // shrunk.
00204     //
00205     // The free list has been sorted so that buckets
00206     // with the highest file offset are first.
00207     do {
00208       rec = (MSABucketInfo*) freeList.itemAt(0);
00209       if (rec->bi_bucket_offset != last_bucket);
00210         break;
00211         
00212       last_bucket -= sizeof(MSADiskBucketRec);
00213       freeList.remove(rec);
00214       reduce = true;
00215     } while (freeList.getSize());
00216     
00217     if (reduce) {
00218       // The file can be reduced in size.
00219       fileSize = last_bucket + sizeof(MSADiskBucketRec);  
00220       fa->setEOF(fileSize);
00221       CS_SET_DISK_8(header.ah_file_size_8, fileSize);
00222       fa->write(&header.ah_file_size_8, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
00223     }
00224     
00225     // Add the empty buckets to the index file's empty bucket list.
00226     memset(&bucketHead, 0, sizeof(bucketHead));
00227     offset = 0;
00228     while (freeList.getSize()) { // Add the empty buckets to the empty_bucket list.
00229       rec = (MSABucketInfo*) freeList.takeItemAt(0);
00230       
00231       // buckets are added to the front of the list.
00232       fa->write(&offset, rec->bi_bucket_offset + offsetof(MSADiskBucketHeadRec,ab_next_bucket_8) , 8);
00233       offset =  rec->bi_bucket_offset;
00234       fa->write(&offset, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
00235       
00236       iFileShare->msa_empty_buckets.addFront(rec);
00237     }
00238   }
00239   
00240   iFileShare->msa_fileSize = fa->getEOF();
00241   
00242   release_(fa);
00243   exit_();
00244 }
00245 
00246 //------------------------
00247 void MSAlias::buildAliasIndex()
00248 {
00249   MSBlobHeadRec blob;
00250   MSRepository  *repo;
00251   uint64_t      blob_size, fileSize, offset;
00252   uint16_t      head_size;
00253   MSAliasFile   *afile;
00254   MSAliasRec    aliasRec;
00255   
00256   enter_();
00257   
00258   afile = getAliasFile();
00259   frompool_(afile);
00260 
00261   afile->startLoad();
00262 
00263   CSSyncVector  *repo_list = iDatabase_br->getRepositoryList();
00264   
00265   // No locking is required since the index is loaded before the database is opened
00266   // and the compactor thread is started.
00267 
00268   for (uint32_t repo_index =0; repo_index<repo_list->size(); repo_index++) {
00269     if ((repo = (MSRepository *) repo_list->get(repo_index))) {
00270       MSRepoFile  *repoFile = repo->openRepoFile();
00271       push_(repoFile);
00272       fileSize = repo->getRepoFileSize();
00273       offset = repo->getRepoHeadSize();
00274       
00275       aliasRec.repo_id = repoFile->myRepo->getRepoID();
00276       
00277       while (offset < fileSize) {
00278         if (repoFile->read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) 
00279           break;
00280           
00281         if ((CS_GET_DISK_1(blob.rb_status_1) == MS_BLOB_REFERENCED) && CS_GET_DISK_2(blob.rb_alias_offset_2)) {
00282           aliasRec.repo_offset = offset;
00283           aliasRec.alias_hash = CS_GET_DISK_4(blob.rb_alias_hash_4);
00284           addAlias(afile, &aliasRec);
00285         }
00286         
00287         head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00288         blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
00289         offset += head_size + blob_size;
00290       }
00291       
00292       release_(repoFile);
00293     }
00294   }
00295   
00296   afile->finishLoad();
00297   backtopool_(afile);
00298 
00299   exit_();
00300 }
00301 
00302 //------------------------
00303 void MSAlias::MSAliasBuild()
00304 {
00305   CSFile *fa;
00306   MSAliasHeadRec header = {0};
00307   uint64_t offset, size = sizeof(header) + BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec);
00308   enter_();
00309   
00310   fa = CSFile::newFile(RETAIN(iFilePath));
00311   push_(fa);
00312 
00313   fa->open(CSFile::CREATE | CSFile::TRUNCATE);
00314 
00315   // Create an empty index with 1 empty bucket in each bucket chain.  
00316 
00317   CS_SET_DISK_4(header.ah_magic_4,  MS_ALIAS_FILE_MAGIC);
00318   CS_SET_DISK_2(header.ah_version_2, MS_ALIAS_FILE_VERSION);
00319     
00320   CS_SET_DISK_2(header.ah_head_size_2, sizeof(header));
00321   CS_SET_DISK_8(header.ah_file_size_8, size);
00322   
00323   CS_SET_DISK_2(header.ah_num_buckets_2, BUCKET_LIST_SIZE);
00324   CS_SET_DISK_2(header.ah_bucket_size_4, NUM_RECORDS_PER_BUCKET);
00325   
00326   fa->setEOF(size); // Grow the file.
00327   fa->write(&header, 0, sizeof(header));
00328 
00329   offset = sizeof(header);
00330   
00331   // Initialize the file bucket chains.
00332   MSADiskBucketHeadRec bucketHead = {0};
00333   for (uint32_t i = 0; i < BUCKET_LIST_SIZE; i++) {
00334     CS_SET_DISK_8(bucketHead.ab_prev_bucket_8, offset);
00335     CS_SET_DISK_8(bucketHead.ab_next_bucket_8, offset);
00336     fa->write(&bucketHead, offset, sizeof(MSADiskBucketHeadRec));
00337     // Add the bucket to the RAM based list.
00338     iFileShare->msa_buckets[i].addFront(MSABucketInfo::newMSABucketInfo(offset));
00339     offset += sizeof(MSADiskBucketRec); // NOTE: MSADiskBucketRec not MSADiskBucketHeadRec
00340   }
00341   
00342   fa->sync();
00343   
00344   
00345   
00346   fa->close();
00347   
00348   release_(fa);
00349   
00350   // Scan through all the BLOBs in the repository and add an entry
00351   // for each blob alias.
00352   buildAliasIndex();
00353 
00354   exit_();
00355 }
00356 
00357 //------------------------
00358 void MSAlias::ma_open(const char *file_name)
00359 {
00360   bool isdir = false;
00361   
00362   enter_();
00363 
00364   iFilePath = CSPath::newPath(RETAIN(iDatabase_br->myDatabasePath), file_name);
00365   
00366 retry:
00367   new_(iFileShare, MSAliasFileShare(RETAIN(iFilePath)));
00368   
00369   if (iFilePath->exists(&isdir)) {
00370     try_(a) {
00371       MSAliasLoad(); 
00372     }
00373     catch_(a) {
00374       // If an error occurs delete the index and rebuild it.
00375       self->myException.log(NULL);
00376       iFileShare->release();
00377       iFilePath->removeFile();
00378       goto retry;
00379     }
00380     cont_(a);
00381   } else
00382     MSAliasBuild();
00383   
00384   
00385   exit_();
00386 }
00387 
00388 //------------------------
00389 uint32_t MSAlias::hashAlias(const char *ptr)
00390 {
00391   register uint32_t h = 0, g;
00392   
00393   while (*ptr) {
00394     h = (h << 4) + (uint32_t) toupper(*ptr++);
00395     if ((g = (h & 0xF0000000)))
00396       h = (h ^ (g >> 24)) ^ g;
00397   }
00398 
00399   return (h);
00400 }
00401 
00402 //------------------------
00403 void MSAlias::addAlias(MSAliasFile *af, MSAliasRec *rec)
00404 {
00405   MSDiskAliasRec diskRec;
00406   CS_SET_DISK_4(diskRec.ar_repo_id_4, rec->repo_id);  
00407   CS_SET_DISK_8(diskRec.ar_offset_8, rec->repo_offset); 
00408   CS_SET_DISK_4(diskRec.ar_hash_4, rec->alias_hash);
00409   af->addRec(&diskRec);
00410 
00411 }
00412 
00413 //------------------------
00414 uint32_t MSAlias::addAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
00415 {
00416   MSDiskAliasRec diskRec;
00417   uint32_t hash;
00418   uint32_t f_repo_id;
00419   uint64_t f_repo_offset;
00420   bool referenced = false;
00421   enter_();
00422   
00423   hash = hashAlias(alias);
00424   
00425   // Use a lock to make sure that the same alias cannot be added at the same time.
00426   lock_(this);
00427   
00428   MSAliasFile *af = getAliasFile();
00429   frompool_(af);
00430 
00431   if (findBlobByAlias(RETAIN(af), alias, &referenced, &f_repo_id, &f_repo_offset)) {
00432     if ((f_repo_id == repo_id) && (f_repo_offset == repo_offset))
00433       goto done; // Do not treat this as an error.
00434     if (!referenced) {
00435       // If the alias is in use by a non referenced BLOB then delete it.
00436       // This can happen because I allow newly created BLOBs to be accessed
00437       // by their alias even before a reference to the BLOB has been added to
00438       // the database.
00439       af->deleteCurrentRec();
00440     } else  {
00441 #ifdef xxDEBUG
00442       CSL.log(self, CSLog::Protocol, "Alias: ");
00443       CSL.log(self, CSLog::Protocol, alias);
00444       CSL.log(self, CSLog::Protocol, "\n");
00445 #endif
00446       CSException::throwException(CS_CONTEXT, MS_ERR_DUPLICATE, "Alias Exists");
00447     }
00448   }
00449     
00450   CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id); 
00451   CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);  
00452   CS_SET_DISK_4(diskRec.ar_hash_4, hash); 
00453 
00454   af->addRec(&diskRec);
00455 done:
00456   backtopool_(af);
00457 
00458   unlock_(this);
00459   return_(hash);
00460 }
00461 
00462 //------------------------
00463 void MSAlias::deleteAlias(MSDiskAliasPtr diskRec)
00464 {
00465   enter_();
00466   
00467   MSAliasFile *af = getAliasFile();
00468   frompool_(af);
00469   if (af->findRec(diskRec))
00470     af->deleteCurrentRec();
00471   backtopool_(af);
00472 
00473   exit_();
00474 }
00475 
00476 //------------------------
00477 void MSAlias::deleteAlias(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
00478 {
00479   MSDiskAliasRec diskRec;
00480   
00481   CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id); 
00482   CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);  
00483   CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash); 
00484   deleteAlias(&diskRec);
00485   
00486 }
00487 //------------------------
00488 void MSAlias::resetAlias(uint32_t old_repo_id, uint64_t old_repo_offset, uint32_t alias_hash, uint32_t new_repo_id, uint64_t new_repo_offset)
00489 {
00490   MSDiskAliasRec diskRec;
00491   bool found;
00492   enter_();
00493   
00494   CS_SET_DISK_4(diskRec.ar_repo_id_4, old_repo_id); 
00495   CS_SET_DISK_8(diskRec.ar_offset_8, old_repo_offset);  
00496   CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash); 
00497 
00498   lock_(this);
00499   
00500   MSAliasFile *af = getAliasFile();
00501   frompool_(af);
00502   found = af->findRec(&diskRec);
00503   CS_SET_DISK_4(diskRec.ar_repo_id_4, new_repo_id); 
00504   CS_SET_DISK_8(diskRec.ar_offset_8, new_repo_offset);  
00505 
00506   if (found) 
00507     af->updateCurrentRec(&diskRec);
00508   else {
00509     CSException::logException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Alias doesn't exists");
00510     af->addRec(&diskRec);
00511   }
00512       
00513   backtopool_(af);
00514 
00515   unlock_(this);
00516   exit_();
00517 }
00518 
00519 //------------------------
00520 // Check to see if the blob with the given repo_id
00521 // and repo_offset has the specified alias.
00522 bool MSAlias::hasBlobAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias, bool *referenced)
00523 {
00524   bool found = false;
00525   MSRepoFile *repoFile;
00526   MSBlobHeadRec blob;
00527   uint8_t status;
00528   uint64_t offset;
00529   uint32_t alias_size = strlen(alias) +1;
00530   char blob_alias[BLOB_ALIAS_LENGTH +1];
00531   
00532   if (alias_size > BLOB_ALIAS_LENGTH)
00533     return false;
00534 
00535   enter_();
00536   
00537   repoFile = iDatabase_br->getRepoFileFromPool(repo_id, false);
00538   frompool_(repoFile);
00539 
00540   repoFile->read(&blob, repo_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00541   status = CS_GET_DISK_1(blob.rb_status_1);
00542   if (IN_USE_BLOB_STATUS(status)) {
00543     offset = repo_offset + CS_GET_DISK_2(blob.rb_alias_offset_2);
00544     
00545     blob_alias[BLOB_ALIAS_LENGTH] = 0;
00546     if (repoFile->read(blob_alias, offset, alias_size, 0) == alias_size) {
00547       found = !my_strcasecmp(&my_charset_utf8_general_ci, blob_alias, alias);
00548       if (found)
00549         *referenced = (status == MS_BLOB_REFERENCED);
00550     }
00551   } else {
00552     CSException::logException(CS_CONTEXT, MS_ERR_ENGINE, "Deleted BLOB alias found. (Rebuild BLOB alias index.)");
00553   }
00554   
00555     
00556   backtopool_(repoFile);  
00557 
00558   return_(found);
00559 }
00560 
00561 //------------------------
00562 bool MSAlias::findBlobByAlias( MSAliasFile *af, const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
00563 {
00564   bool found = false;
00565   uint32_t hash, l_repo_id, l_repo_offset;
00566   MSDiskAliasPtr diskRec;
00567   enter_();
00568 
00569   push_(af);
00570   
00571   hash = hashAlias(alias);
00572   diskRec = af->findRec(hash);
00573   
00574   while (diskRec && !found) {
00575     l_repo_id = CS_GET_DISK_4(diskRec->ar_repo_id_4);
00576     l_repo_offset = CS_GET_DISK_8(diskRec->ar_offset_8);
00577     if (hasBlobAlias(l_repo_id, l_repo_offset, alias, referenced))
00578       found = true;
00579     else
00580       diskRec = af->nextRec();
00581   }
00582     
00583   if (found) {
00584     if (repo_id)
00585       *repo_id = l_repo_id;
00586       
00587     if (repo_offset)
00588       *repo_offset = l_repo_offset;
00589   }
00590   
00591   release_(af);
00592   return_(found);
00593 }
00594 //------------------------
00595 bool MSAlias::findBlobByAlias( const char *alias, bool *referenced, uint32_t *repo_id, uint64_t *repo_offset)
00596 {
00597   bool found;
00598   enter_();
00599   
00600   MSAliasFile *af = getAliasFile();
00601   frompool_(af);
00602   
00603   found = findBlobByAlias(RETAIN(af), alias, referenced, repo_id, repo_offset);
00604 
00605   backtopool_(af);
00606   return_(found);
00607 }
00608 
00609 //------------------------
00610 bool MSAlias::blobAliasExists(uint32_t repo_id, uint64_t repo_offset, uint32_t alias_hash)
00611 {
00612   bool found;
00613   MSDiskAliasRec diskRec;
00614   
00615   CS_SET_DISK_4(diskRec.ar_repo_id_4, repo_id); 
00616   CS_SET_DISK_8(diskRec.ar_offset_8, repo_offset);  
00617   CS_SET_DISK_4(diskRec.ar_hash_4, alias_hash); 
00618 
00619   enter_();
00620   
00621   MSAliasFile *af = getAliasFile();
00622   frompool_(af);
00623   
00624   found = af->findRec(&diskRec);
00625 
00626   backtopool_(af);
00627   return_(found);
00628 }
00629 
00631 MSSysMeta::MSSysMeta(MSAlias *msa)
00632 {
00633   md_myMSAlias = msa;
00634   md_isFileInUse = false;
00635   md_NextLink = md_PrevLink = NULL;
00636   
00637   mtab = MSMetaDataTable::newMSMetaDataTable(RETAIN(msa->iDatabase_br));
00638 }
00639 
00640 //------------------------
00641 MSSysMeta::~MSSysMeta()
00642 {
00643   if (mtab)
00644     mtab->release();
00645 
00646   if (md_myMSAlias)
00647     md_myMSAlias->release();
00648 }
00649 
00650 //------------------------
00651 void MSSysMeta::returnToPool()
00652 {
00653   enter_();
00654   push_(this);
00655   
00656     
00657   md_isFileInUse = false;
00658     
00659   if (!md_myMSAlias->iClosing) {
00660     lock_(&md_myMSAlias->iSysTablePoolLock); // It may be better if the pool had it's own lock.
00661     md_nextFile = md_myMSAlias->iSysTablePool;
00662     md_myMSAlias->iSysTablePool - this;
00663     unlock_(&md_myMSAlias->iSysTablePoolLock);
00664   }
00665     
00666   release_(this);
00667   exit_();
00668 }
00669 //------------------------
00670 bool MSSysMeta::matchAlias(uint32_t repo_id, uint64_t repo_offset, const char *alias)
00671 {
00672   mtab->seqScanInit(); 
00673   return mtab->matchAlias(repo_id, repo_offset, alias); 
00674 }
00675 
00678 MSAliasFile::MSAliasFile(MSAliasFileShare *share)
00679 {
00680   ba_share = share;
00681   ba_isFileInUse = false;
00682   ba_NextLink = ba_PrevLink = NULL;
00683   
00684   iCurrentRec = 0;
00685   iBucketCache = NULL;
00686   iStartBucket = iCurrentBucket = NULL;
00687   iBucketChain = NULL;
00688   iLoading = false;
00689   ba_nextFile = NULL;
00690   
00691   iFile = CSFile::newFile(RETAIN(ba_share->msa_filePath));  
00692   iFile->open(CSFile::DEFAULT);
00693   
00694   
00695 }
00696 
00697 //------------------------
00698 MSAliasFile::~MSAliasFile()
00699 {
00700   if (iFile)
00701     iFile->release();
00702     
00703   if (iBucketCache)
00704     cs_free(iBucketCache);
00705 }
00706 
00707 //------------------------
00708 void MSAliasFile::startLoad()
00709 {
00710   enter_();
00711   
00712   ASSERT(!iLoading);
00713   
00714 //  iBucketCache = (MSADiskBucketRec*) cs_malloc(BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
00715 //  memset(iBucketCache, 0, BUCKET_LIST_SIZE * sizeof(MSADiskBucketRec));
00716   iLoading = true;
00717   
00718   exit_();
00719 }
00720 
00721 //------------------------
00722 void MSAliasFile::finishLoad()
00723 {
00724   enter_();
00725   ASSERT(iLoading);
00726   // Write the bucket cache to disk.
00727 //  for (iCurrentBucket && iCurrentBucket->getSize()) {
00728     // To Be implemented.
00729 //  }
00730 //  cs_free(iBucketCache);
00731   iBucketCache = NULL;
00732   iLoading = false;
00733   exit_();
00734 }
00735 
00736 //------------------------
00737 void MSAliasFile::returnToPool()
00738 {
00739   enter_();
00740   push_(this);
00741   
00742   if (iLoading) {
00743     // If iLoading is still set then probably an exception has been thrown.
00744     try_(a) {
00745       finishLoad();
00746     }
00747     catch_(a) 
00748       iLoading = false;
00749     cont_(a);
00750   }
00751 
00752   ba_isFileInUse = false;
00753     
00754   if (!ba_share->msa_closing) {
00755     lock_(&ba_share->msa_poolLock);
00756     ba_nextFile = ba_share->msa_pool;
00757     ba_share->msa_pool = this;
00758     unlock_(&ba_share->msa_poolLock);
00759   }
00760     
00761   release_(this);
00762   exit_();
00763 }
00764 
00765 //------------------------
00766 // The bucket chain is treated as a circular list.
00767 bool MSAliasFile::nextBucket(bool with_space)
00768 {
00769   bool have_bucket = false;
00770   enter_();
00771   
00772   while (!have_bucket){
00773     if (iCurrentBucket) {
00774       iCurrentBucket = iCurrentBucket->getNextLink();
00775       if (!iCurrentBucket)
00776         iCurrentBucket = iBucketChain->getFront();
00777       if (iCurrentBucket == iStartBucket)
00778         break;
00779     } else {
00780       iCurrentBucket = iBucketChain->getFront();
00781       iStartBucket = iCurrentBucket;
00782     }
00783     
00784     if ((iCurrentBucket->getSize() && !with_space) || (with_space && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET))){
00785       // Only read the portion of the bucket containing records.
00786       iCurrentRec = iCurrentBucket->getEndOfRecords(); // The current record is set just beyond the last valid record.
00787       size_t size = iCurrentRec * sizeof(MSDiskAliasRec);   
00788       iFile->read(iBucket, iCurrentBucket->bi_records_offset, size, size);      
00789       have_bucket = true;
00790     }
00791   }
00792   
00793   return_(have_bucket);
00794 }
00795 
00796 //------------------------
00797 MSDiskAliasPtr MSAliasFile::nextRec()
00798 {
00799   MSDiskAliasPtr rec = NULL;
00800   bool have_rec;
00801   enter_();
00802   
00803   while ((!(have_rec = scanBucket())) && nextBucket(false));
00804   
00805   if (have_rec) 
00806     rec = &(iBucket[iCurrentRec]);
00807     
00808   return_(rec);
00809 }
00810 
00811 //------------------------
00812 // When starting a search:
00813 // If a bucket is already loaded and it is in the correct bucket chain
00814 // then search it first. In this case then the search starts at the current
00815 // bucket in the chain.
00816 //
00817 // Searches are from back to front with the idea that the more recently
00818 // added objects will be seached for more often and they are more likely
00819 // to be at the end of the chain.
00820 MSDiskAliasPtr MSAliasFile::findRec(uint32_t hash)
00821 {
00822   MSDiskAliasPtr rec = NULL;
00823   MSABucketLinkedList *list = ba_share->getBucketChain(hash);
00824   enter_();
00825   
00826   CS_SET_DISK_4(iDiskHash_4, hash);
00827   if (list == iBucketChain) {
00828     // The search is performed back to front.
00829     iCurrentRec = iCurrentBucket->getEndOfRecords();  // Position the start just beyond the last valid record.
00830     iStartBucket = iCurrentBucket;
00831     if (scanBucket()) {
00832       rec = &(iBucket[iCurrentRec]);
00833       goto done;
00834     }
00835   } else {
00836     iBucketChain = list;
00837     iCurrentBucket = NULL;
00838     iStartBucket = NULL;
00839   }
00840 
00841   if (nextBucket(false))
00842     rec = nextRec();
00843     
00844 done:
00845   return_(rec);
00846 }
00847 
00848 //------------------------
00849 bool MSAliasFile::findRec(MSDiskAliasPtr theRec)
00850 {
00851   MSDiskAliasPtr aRec = NULL;
00852   bool found = false;
00853   enter_();
00854   
00855   aRec = findRec(CS_GET_DISK_4(theRec->ar_hash_4));
00856   while ( aRec && !found) {
00857     if (CS_EQ_DISK_4(aRec->ar_repo_id_4, theRec->ar_repo_id_4) && CS_EQ_DISK_8(aRec->ar_offset_8, theRec->ar_offset_8))
00858       found = true;
00859     else
00860       aRec = nextRec();
00861   } 
00862   return_(found);
00863 }
00864 
00865 //------------------------
00866 void MSAliasFile::addRec(MSDiskAliasPtr new_rec)
00867 {
00868   MSABucketLinkedList *list = ba_share->getBucketChain(CS_GET_DISK_4(new_rec->ar_hash_4));
00869   enter_();
00870   lock_(&ba_share->msa_writeLock);
00871 
00872   if (iBucketChain != list) {
00873     iBucketChain = list;
00874     iCurrentBucket = NULL;
00875     iStartBucket = NULL;
00876   } else 
00877     iStartBucket = iCurrentBucket;
00878 
00879   if ((iCurrentBucket && (iCurrentBucket->getSize() < NUM_RECORDS_PER_BUCKET)) || nextBucket(true)) { // Find a bucket with space in it for a record.
00880     uint32_t size = iCurrentBucket->getSize();
00881     uint32_t end_of_records = iCurrentBucket->getEndOfRecords();
00882 
00883     if (size == end_of_records) { // No holes in the recored list
00884       iCurrentRec = end_of_records;     
00885     } else { // Search for the empty record
00886       iCurrentRec = end_of_records -2;      
00887       while (iCurrentRec && !CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4))
00888         iCurrentRec--;
00889         
00890       ASSERT(CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4));
00891     }
00892     
00893     memcpy(&iBucket[iCurrentRec], new_rec, sizeof(MSDiskAliasRec)); // Add the record to the cached bucket.
00894     
00895     iCurrentBucket->recAdded(iFile, iCurrentRec); // update the current bucket header.
00896   } else { // A new bucket must be added to the chain.
00897     MSADiskBucketHeadRec new_bucket = {0};
00898     CSDiskValue8 disk_8_value;
00899     uint64_t new_bucket_offset;
00900     MSABucketInfo *next, *prev;
00901     
00902     next = iBucketChain->getFront();
00903     prev = iBucketChain->getBack();
00904     
00905     // Set the next and prev bucket offsets in the new bucket record.
00906     CS_SET_DISK_8(new_bucket.ab_prev_bucket_8, prev->bi_bucket_offset);
00907     CS_SET_DISK_8(new_bucket.ab_next_bucket_8, next->bi_bucket_offset);
00908     
00909     if (ba_share->msa_empty_buckets.getSize()) { // Get a bucket from the empty bucket list.
00910       MSABucketInfo *empty_bucket = ba_share->msa_empty_buckets.removeFront();
00911       
00912       new_bucket_offset = empty_bucket->bi_bucket_offset;     
00913       empty_bucket->release();
00914       
00915       // Update the index file's empty bucket list 
00916       if (ba_share->msa_empty_buckets.getSize() == 0) 
00917         CS_SET_NULL_DISK_8(disk_8_value);
00918       else
00919         CS_SET_DISK_8(disk_8_value, iBucketChain->getFront()->bi_bucket_offset);
00920       
00921       iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_free_list_8) , 8); 
00922     } else // There are no empty buckets so grow the file.
00923       new_bucket_offset = ba_share->msa_fileSize;
00924       
00925     // Write the new bucket's record header to the file
00926     iFile->write(&new_bucket, new_bucket_offset, sizeof(MSADiskBucketHeadRec)); 
00927     
00928     // Insert the new bucket into the bucket chain on the disk.
00929     CS_SET_DISK_8(disk_8_value, new_bucket_offset);
00930     iFile->write(&disk_8_value, prev->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_next_bucket_8), 8); 
00931     iFile->write(&disk_8_value, next->bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_prev_bucket_8), 8); 
00932     
00933     // Update the file size in the file header if required
00934     if (ba_share->msa_fileSize == new_bucket_offset) {
00935       ba_share->msa_fileSize += sizeof(MSADiskBucketRec); // Note this is MSADiskBucketRec not MSADiskBucketHeadRec
00936 
00937       CS_SET_DISK_8(disk_8_value, ba_share->msa_fileSize);
00938       iFile->write(&disk_8_value, offsetof(MSAliasHeadRec,ah_file_size_8) , 8); 
00939     }
00940     
00941     // Add the info rec into the bucket chain in RAM.
00942     iCurrentBucket = MSABucketInfo::newMSABucketInfo(new_bucket_offset, 1, 0);
00943     iBucketChain->addFront(iCurrentBucket);
00944     iCurrentRec = 0;
00945   }
00946   
00947   uint64_t offset;
00948   offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
00949 
00950   // Write the new index entry to the index file.
00951   iFile->write(new_rec, offset, sizeof(MSDiskAliasRec)); 
00952   
00953   unlock_(&ba_share->msa_writeLock);
00954   
00955   exit_();  
00956 }
00957 //------------------------
// Mark the record at index iCurrentRec of the current bucket as deleted.
//
// The record's repo id is set to the NULL disk value in the in-memory bucket
// copy (iBucket), that 4-byte field is written to the index file, and the
// bucket info is then told a record was removed so it can update its
// counters (see MSABucketInfo::recRemoved()).
//
// The file write and the counter update are done while holding
// ba_share->msa_writeLock.
// NOTE(review): the in-memory record is nulled *before* the write lock is
// taken; presumably callers guarantee exclusive use of this MSAliasFile --
// confirm.
00958 void MSAliasFile::deleteCurrentRec()
00959 {
00960   MSDiskAliasPtr rec = &(iBucket[iCurrentRec]);
00961   uint64_t  offset;
00962   enter_();
00963   
        // A NULL repo id is what marks a record as deleted (see the ASSERT in
        // updateCurrentRec() and the scan in MSABucketInfo::recRemoved()).
00964   CS_SET_NULL_DISK_4(rec->ar_repo_id_4);
        // File position of this record inside the current bucket's record area.
00965   offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
00966   
00967   lock_(&ba_share->msa_writeLock);
00968 
00969   // Update the index file. It is assumed that repo_id is the first 4 bytes of 'rec'.
00970   iFile->write(rec, offset, 4); 
00971   
        // Must come after the record has been nulled in iBucket: recRemoved()
        // scans the bucket for NULL repo ids to find the new end of records.
00972   iCurrentBucket->recRemoved(iFile, iCurrentRec, iBucket);
00973   
00974   unlock_(&ba_share->msa_writeLock);
00975   
00976   exit_();  
00977 }
00978 
00979 //------------------------
// Overwrite the current record (index iCurrentRec of the current bucket)
// with 'update_rec'.
//
// update_rec: the complete replacement record. Its hash must map to the
//             bucket chain this file object is currently positioned on
//             (asserted below).
//
// The whole record is written to the index file, but only the repo id and
// the offset fields are copied into the in-memory bucket copy.
// NOTE(review): presumably the remaining fields of the record never change
// on an update -- confirm.
00980 void MSAliasFile::updateCurrentRec(MSDiskAliasPtr update_rec)
00981 {
00982   uint64_t  offset;
00983   enter_();
00984   
00985   // ASSERT that the updated rec still belongs to this bucket chain.
00986   ASSERT(ba_share->getBucketChain(CS_GET_DISK_4(update_rec->ar_hash_4)) == iBucketChain);
00987   ASSERT(!CS_IS_NULL_DISK_4(iBucket[iCurrentRec].ar_repo_id_4)); // We should not be updating a deleted record.
00988   
00989   lock_(&ba_share->msa_writeLock);
        // File position of this record inside the current bucket's record area.
00990   offset = iCurrentBucket->bi_records_offset + iCurrentRec * sizeof(MSDiskAliasRec);
00991   
00992   // Update the record on disk.
00993   iFile->write(update_rec, offset, sizeof(MSDiskAliasRec));
00994   
00995   // Update the record in memory. 
00996   CS_COPY_DISK_4(iBucket[iCurrentRec].ar_repo_id_4, update_rec->ar_repo_id_4);  
00997   CS_COPY_DISK_8(iBucket[iCurrentRec].ar_offset_8, update_rec->ar_offset_8);  
00998 
00999   unlock_(&ba_share->msa_writeLock);
01000   exit_();  
01001 }
01002 
01003 
01004 //------------------------
// Factory: allocate a new MSABucketInfo via the new_() macro and return it.
//
// offset, num, last: forwarded unchanged to the MSABucketInfo constructor.
// NOTE(review): presumably 'offset' is the bucket's file offset, 'num' the
// number of records in it and 'last' the end-of-records index (a new bucket
// is created elsewhere in this file with (new_bucket_offset, 1, 0)) --
// confirm against the constructor, which is not visible here.
01005 MSABucketInfo *MSABucketInfo::newMSABucketInfo(uint64_t offset, uint32_t num, uint32_t last)
01006 {
01007   MSABucketInfo *bucket;
01008   new_(bucket, MSABucketInfo(offset, num, last));
01009   return bucket;
01010 }
01011 //------------------------
// Account for the removal of the record at index 'idx' from this bucket and
// write the updated counters to the bucket header in the index file.
//
// iFile:  the index file to write the bucket header fields to.
// idx:    index of the removed record; must be below bi_end_of_records.
// bucket: in-memory copy of the bucket's records. The removed record must
//         already have a NULL repo id in this array (deleteCurrentRec()
//         nulls it before calling), because the array is scanned for NULL
//         repo ids to find the new end of records.
//
// NOTE(review): bi_num_recs is decremented without checking that it is
// non-zero -- presumably guaranteed by the caller; confirm.
01012 void MSABucketInfo::recRemoved(CSFile *iFile, uint32_t idx, MSDiskAliasRec bucket[])
01013 {
01014   MSADiskBucketHeadRec head;
01015   enter_();
01016   
01017   ASSERT(idx < bi_end_of_records);
01018   
01019   bi_num_recs--;
01020   if (!bi_num_recs) {
01021     // It would be nice to remove this bucket from the 
01022     // bucket list and place it on the empty list.
01023     // Before this can be done a locking method would
01024     // be needed to block anyone from reading this
01025     // bucket while it was being moved.
01026     //
01027     // I haven't done this because I have been trying
01028     // to avoid read locks.
01029     bi_end_of_records = 0;
01030   } else if ((bi_end_of_records -1) == idx) {
          // The last used slot was removed: walk backwards over deleted (NULL
          // repo id) slots to find the new last used slot. The loop stops at
          // index 0 without testing it, so slot 0 is checked separately below.
01031     while (idx && CS_IS_NULL_DISK_4(bucket[idx].ar_repo_id_4))
01032       idx--;
01033       
01034     if ((idx ==0) && CS_IS_NULL_DISK_4(bucket[0].ar_repo_id_4))
01035       bi_end_of_records = 0;
01036     else
01037       bi_end_of_records = idx +1;
01038     
01039     ASSERT(bi_end_of_records >= bi_num_recs);
01040   }
01041   
01042   // Update the index file.
01043   CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
01044   CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
        // One 8 byte write starting at ab_num_recs_4 stores both 4 byte fields.
        // NOTE(review): this assumes ab_eor_rec_4 immediately follows
        // ab_num_recs_4 in MSADiskBucketHeadRec -- confirm against the struct.
01045   iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
01046   exit_();
01047 }
01048 
01049 //------------------------
// Account for a record having been added at index 'idx' of this bucket and
// write the updated counters to the bucket header in the index file.
//
// iFile: the index file to write the bucket header fields to.
// idx:   index of the slot that was filled; must be below
//        NUM_RECORDS_PER_BUCKET. The bucket must not already be full.
01050 void MSABucketInfo::recAdded(CSFile *iFile, uint32_t idx)
01051 {
01052   MSADiskBucketHeadRec head;
01053   enter_();
01054   
01055   ASSERT(bi_num_recs < NUM_RECORDS_PER_BUCKET);
01056   ASSERT(idx < NUM_RECORDS_PER_BUCKET);
01057 
01058   bi_num_recs++;
        // Only a record placed exactly at the current end extends the used
        // range; for any other idx bi_end_of_records is left unchanged.
01059   if (idx == bi_end_of_records)
01060     bi_end_of_records++;
01061     
01062   // Update the index file.
01063   CS_SET_DISK_4(head.ab_num_recs_4, bi_num_recs);
01064   CS_SET_DISK_4(head.ab_eor_rec_4, bi_end_of_records);
        // One 8 byte write starting at ab_num_recs_4 stores both 4 byte fields.
        // NOTE(review): this assumes ab_eor_rec_4 immediately follows
        // ab_num_recs_4 in MSADiskBucketHeadRec -- confirm against the struct.
01065   iFile->write(&head.ab_num_recs_4, bi_bucket_offset +  offsetof(MSADiskBucketHeadRec,ab_num_recs_4), 8); 
01066   exit_();  
01067 }
01068 
// Get an MSAliasFile for this share, reusing a pooled one when available.
//
// Under msa_poolLock: if the free list (msa_pool, linked through
// ba_nextFile) is not empty its head is unlinked and used; otherwise a new
// MSAliasFile is created for this share and added to msa_poolFiles.
//
// Returns the file marked as in use (ba_isFileInUse == true) and with
// retain() called on it once.
// NOTE(review): presumably the caller hands the file back to the pool when
// done -- the matching return path is not visible here.
01070 MSAliasFile *MSAliasFileShare::getPoolFile()
01071 {
01072   MSAliasFile *af;
01073   enter_();
01074   
01075   lock_(&msa_poolLock); 
01076   if ((af = msa_pool)) {
          // Pop the head of the free list.
01077     msa_pool = af->ba_nextFile;
01078   } else {
          // Nothing free: create a new file object and record it in the list
          // of all files belonging to this pool.
01079     new_(af, MSAliasFile(this));
01080     msa_poolFiles.addFront(af);
01081   }
01082   unlock_(&msa_poolLock);
01083   
        // The file is no longer on the free list, so clear its link and mark
        // it as in use.
01084   af->ba_nextFile = NULL;
01085   ASSERT(!af->ba_isFileInUse);
01086   af->ba_isFileInUse = true;
01087   af->retain();
01088   
01089   return_(af);
01090 }
01091 #endif // HAVE_ALIAS_SUPPORT
01092