Drizzled Public API Documentation

repository_ms.cc

00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-05-25
00023  *
00024  * H&G2JCtL
00025  *
00026  * Network interface.
00027  *
00028  */
00029 
00030 #ifdef DRIZZLED
00031 #include <config.h>
00032 #include <drizzled/common.h>
00033 #include <drizzled/session.h>
00034 #endif
00035 
00036 #include "cslib/CSConfig.h"
00037 #include <inttypes.h>
00038 
00039 #include "defs_ms.h"
00040 
00041 #include "cslib/CSGlobal.h"
00042 #include "cslib/CSLog.h"
00043 #include "cslib/CSStrUtil.h"
00044 #include "cslib/CSHTTPStream.h"
00045 #include "cslib/CSStream.h"
00046 
00047 #include "repository_ms.h"
00048 #include "open_table_ms.h"
00049 #include "connection_handler_ms.h"
00050 #include "metadata_ms.h"
00051 #include "parameters_ms.h"
00052 #include "pbmsdaemon_ms.h"
00053 
00054 /*
00055  * ---------------------------------------------------------------
00056  * REPOSITORY FILE
00057  */
00058 
00059 MSRepoFile::MSRepoFile():
00060 CSFile(),
00061 myRepo(NULL),
00062 isFileInUse(false),
00063 nextFile(NULL),
00064 iNextLink(NULL),
00065 iPrevLink(NULL)
00066 {
00067 }
00068 
00069 MSRepoFile::~MSRepoFile()
00070 {
00071   close();
00072 }
00073 
00074 void MSRepoFile::updateGarbage(uint64_t size) 
00075 {
00076   MSRepoHeadRec repo_head;
00077   enter_();
00078 
00079   lock_(myRepo);
00080   myRepo->myGarbageCount += size;
00081   CS_SET_DISK_8(repo_head.rh_garbage_count_8, myRepo->myGarbageCount);
00082   ASSERT(myRepo->myGarbageCount <= myRepo->myRepoFileSize);
00083   write(&repo_head.rh_garbage_count_8, offsetof(MSRepoHeadRec, rh_garbage_count_8), 8);
00084   unlock_(myRepo);
00085   if (!myRepo->myRepoXLock) 
00086     myRepo->signalCompactor();
00087 
00088   exit_();
00089 }
00090 
00091 void MSRepoFile::updateAccess(MSBlobHeadPtr blob, uint64_t rep_offset) 
00092 {
00093   time_t  now = time(NULL);
00094   uint32_t count = CS_GET_DISK_4(blob->rb_access_count_4) +1;
00095 
00096   CS_SET_DISK_4(blob->rb_last_access_4, now);
00097   CS_SET_DISK_4(blob->rb_access_count_4, count);
00098   write(&blob->rb_last_access_4, rep_offset + offsetof(MSBlobHeadRec, rb_last_access_4), 8);
00099   myRepo->myLastAccessTime = now;
00100 }
00101 
00102 uint64_t MSRepoFile::readBlobChunk(PBMSBlobIDPtr blob_id, uint64_t rep_offset, uint64_t blob_offset, uint64_t buffer_size, char *buffer)
00103 {
00104   MSBlobHeadRec   blob_head;
00105   size_t        tfer;
00106   uint16_t        head_size;
00107   uint64_t        blob_size;
00108   uint32_t        ac;
00109   uint64_t        offset, blob_read =0;
00110 
00111   enter_();
00112 
00113   read(&blob_head, rep_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00114   if (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00115     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00116     
00117   blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6);
00118   head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00119   ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00120   if (blob_id->bi_auth_code != ac)
00121     CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00122 
00123   offset = rep_offset + blob_offset + head_size;
00124   
00125   if (blob_offset > blob_size)
00126     goto done;
00127   
00128   if ((blob_offset + buffer_size) > blob_size)
00129     buffer_size = blob_size - blob_offset;
00130     
00131   while (buffer_size > 0) {
00132     if (buffer_size <= (uint64_t) (SSIZE_MAX))
00133       tfer = (size_t) buffer_size;
00134     else
00135       tfer = SSIZE_MAX;
00136       
00137     read(buffer, offset, tfer, tfer);
00138     offset += (uint64_t) tfer;
00139     buffer += (uint64_t) tfer;
00140     buffer_size -= (uint64_t) tfer;
00141     blob_read += (uint64_t) tfer;
00142   }
00143 
00144   /* Only update the access timestamp when reading the first block: */
00145   if (!blob_offset) 
00146     updateAccess(&blob_head, rep_offset);
00147   
00148 done:
00149   return_(blob_read);
00150 }
00151 
00152 void MSRepoFile::writeBlobChunk(PBMSBlobIDPtr blob_id, uint64_t rep_offset, uint64_t blob_offset, uint64_t data_size, char *data)
00153 {
00154   size_t        tfer;
00155   off64_t       offset;
00156   MSBlobHeadRec   blob_head;
00157   uint16_t        head_size;
00158   uint64_t        blob_size;
00159   uint32_t        ac;
00160 
00161   enter_();
00162 
00163   read(&blob_head, rep_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00164   if (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00165     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00166 
00167   blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6);
00168   head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00169   ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00170   if (blob_id->bi_auth_code != ac)
00171     CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00172 
00173   if ((blob_offset + data_size) > blob_size) 
00174     CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB write size or offset");
00175 
00176   offset = (uint64_t) head_size + rep_offset + blob_offset;
00177     
00178   while (data_size > 0) {
00179     if (data_size <= (uint64_t) (SSIZE_MAX))
00180       tfer = (size_t) data_size;
00181     else
00182       tfer = SSIZE_MAX;
00183       
00184     write(data, offset, tfer);
00185     data += (uint64_t) tfer;
00186     offset += (uint64_t) tfer;
00187     data_size -= (uint64_t) tfer;
00188   }
00189 
00190   exit_();
00191 }
00192 
00193 void MSRepoFile::sendBlob(MSOpenTable *otab, uint64_t offset, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool with_auth_code, bool info_only, CSHTTPOutputStream *stream)
00194 {
00195   MSConnectionHandler *me;
00196   size_t        tfer;
00197   off64_t       start_offset = offset;
00198   MSBlobHeadRec   blob_head;
00199   uint8_t       storage_type;
00200   uint16_t        head_size, meta_size;
00201   uint64_t        blob_data_size, local_blob_size, meta_offset;
00202   uint32_t        ac;
00203   char        num_str[CS_WIDTH_INT_64];
00204   bool        redirect = false;
00205   
00206 
00207   enter_();
00208   me = (MSConnectionHandler *) self;
00209 
00210   read(&blob_head, start_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00211   local_blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6); // This is the size of the BLOB data in the repository. Can be 0 if the BLOB is stored some where else.
00212   blob_data_size = CS_GET_DISK_6(blob_head.rb_blob_data_size_6);// This is the actual size of the BLOB.
00213   head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00214   meta_size = CS_GET_DISK_2(blob_head.rb_mdata_size_2);
00215   meta_offset = start_offset + CS_GET_DISK_2(blob_head.rb_mdata_offset_2);
00216   ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00217   if ((with_auth_code && auth_code != ac) || (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC))
00218     CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00219 
00220   storage_type = CS_GET_DISK_1(blob_head.rb_storage_type_1);
00221   
00222   if ((!info_only) && BLOB_IN_CLOUD(storage_type)) {
00223     CSString *redirect_url = NULL;
00224     CloudKeyRec key;
00225     getBlobKey(&blob_head, &key);
00226     redirect_url =  otab->getDB()->myBlobCloud->cl_getDataURL(&key);  
00227     push_(redirect_url);
00228     stream->setStatus(301);
00229     stream->addHeader("Location", redirect_url->getCString());
00230     release_(redirect_url);
00231     redirect = true;
00232   } else
00233     stream->setStatus(200);
00234 
00235   if (storage_type == MS_STANDARD_STORAGE) {
00236     char hex_checksum[33];
00237     cs_bin_to_hex(33, hex_checksum, 16, blob_head.rb_blob_checksum_md5d.val);
00238     stream->addHeader(MS_CHECKSUM_TAG, hex_checksum);
00239   }
00240   
00241   snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu64"", blob_data_size);
00242   stream->addHeader(MS_BLOB_SIZE, num_str);
00243     
00244   snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_last_access_4));
00245   stream->addHeader(MS_LAST_ACCESS, num_str);
00246 
00247   snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_access_count_4));
00248   stream->addHeader(MS_ACCESS_COUNT, num_str);
00249   
00250   snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_create_time_4));
00251   stream->addHeader(MS_CREATION_TIME, num_str);
00252   
00253   snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", storage_type);
00254   stream->addHeader(MS_BLOB_TYPE, num_str);
00255   
00256   
00257   // Add the meta data headers.
00258   if (meta_size) {
00259     MetaData metadata;
00260     char *name, *value;
00261     
00262     read(otab->myOTBuffer, meta_offset, meta_size, meta_size);
00263     metadata.use_data(otab->myOTBuffer, meta_size);
00264     while ((name = metadata.findNext(&value))) {
00265       stream->addHeader(name, value);
00266     }
00267     
00268   }
00269     
00270   offset += (uint64_t) head_size + req_offset;
00271   local_blob_size -= req_offset;
00272   if (local_blob_size > req_size)
00273     local_blob_size = req_size;
00274 
00275   stream->setContentLength((redirect || info_only)?0:local_blob_size);
00276   stream->writeHead();
00277   me->replyPending = false;
00278 
00279   if ((!redirect) && !info_only) {
00280     
00281     while (local_blob_size > 0) {
00282       if (local_blob_size <=  MS_OT_BUFFER_SIZE)
00283         tfer = (size_t) local_blob_size;
00284       else
00285         tfer = MS_OT_BUFFER_SIZE;
00286       read(otab->myOTBuffer, offset, tfer, tfer);
00287       stream->write(otab->myOTBuffer, tfer);
00288       offset += (uint64_t) tfer;
00289       local_blob_size -= (uint64_t) tfer;
00290     }
00291   }
00292   stream->flush();
00293 
00294   if (!info_only) {
00295     // Should the time stamp be updated if only the BLOB info was requested?
00296     /* Update the access timestamp: */
00297     updateAccess(&blob_head, start_offset);
00298   }
00299   
00300   exit_();
00301 }
00302 
00303 void MSRepoFile::update_blob_header(MSOpenTable *otab, uint64_t offset, uint64_t blob_size, uint16_t head_size, uint16_t new_head_size)
00304 {
00305   uint16_t  w_offset =  offsetof(MSBlobHeadRec, rb_ref_count_2);
00306   MSRepoPointersRec ptr;
00307   enter_();
00308 
00309   ptr.rp_chars = otab->myOTBuffer;
00310   CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, time(NULL));
00311   
00312   if (head_size == new_head_size) {
00313     w_offset =  offsetof(MSBlobHeadRec, rb_ref_count_2);
00314     write(otab->myOTBuffer + w_offset, offset + w_offset, head_size - w_offset);
00315   } else {
00316     /* Copy to a new space, free the old: */
00317     off64_t     dst_offset;
00318     CSStringBuffer  *buffer;
00319     uint16_t ref_count, ref_size;
00320     uint32_t tab_id;
00321     uint64_t blob_id;
00322     
00323     myRepo->myRepoDatabase->openWriteRepo(otab);
00324     dst_offset = otab->myWriteRepo->myRepoFileSize;
00325 
00326     /* Write the header. */
00327     otab->myWriteRepoFile->write(otab->myOTBuffer, dst_offset, new_head_size);
00328 
00329     /* We have an engine reference, copy the BLOB over: */
00330     new_(buffer, CSStringBuffer());
00331     push_(buffer);
00332     buffer->setLength(MS_COMPACTOR_BUFFER_SIZE);
00333     CSFile::transfer(RETAIN(otab->myWriteRepoFile), dst_offset + new_head_size, RETAIN(this), offset + head_size, blob_size, buffer->getBuffer(0), MS_COMPACTOR_BUFFER_SIZE);
00334     release_(buffer);
00335 
00336 #ifdef HAVE_ALIAS_SUPPORT
00337     /* Update the BLOB alias if required. */
00338     
00339     if (CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00340       uint32_t alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00341       myRepo->myRepoDatabase->moveBlobAlias(myRepo->myRepoID, offset, alias_hash, myRepo->myRepoID, dst_offset);
00342     }
00343 #endif
00344 
00345     /* Update the references: */
00346     ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00347     ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00348     ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00349     
00350     while (ref_count) {
00351       switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00352         case MS_BLOB_FREE_REF:
00353           break;
00354         case MS_BLOB_TABLE_REF:
00355           tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00356           blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00357 
00358           if ((otab->haveTable()) && (otab->getDBTable()->myTableID == tab_id))
00359             otab->getDBTable()->updateBlobHandle(otab, blob_id, otab->myWriteRepo->myRepoID, dst_offset, new_head_size);
00360           else {
00361             MSOpenTable *ref_otab;
00362 
00363             ref_otab = MSTableList::getOpenTableByID(myRepo->myRepoDatabase->myDatabaseID, tab_id);
00364             frompool_(ref_otab);
00365             ref_otab->getDBTable()->updateBlobHandle(ref_otab, blob_id, otab->myWriteRepo->myRepoID, dst_offset, new_head_size);
00366             backtopool_(ref_otab);
00367           }
00368           break;
00369         case MS_BLOB_DELETE_REF:
00370           break;
00371         default:
00372           break;
00373       }
00374       ptr.rp_chars += ref_size;
00375       ref_count--; 
00376     }
00377 
00378     otab->myWriteRepo->myRepoFileSize += new_head_size + blob_size;
00379 
00380     /* Free the old head: */
00381     ptr.rp_chars = otab->myOTBuffer;
00382     if (myRepo->lockedForBackup()) {
00383       // This is done to tell the backup process that this BLOB was moved
00384       // after the backup had started and needs to be backed up also. 
00385       // (The moved BLOB doesn't though because the change took place after the backup had begone.)
00386       CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_MOVED);
00387       CS_SET_DISK_4(ptr.rp_head->rb_backup_id_4, myRepo->myRepoDatabase->backupID());
00388     } else
00389       CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_DELETED);
00390     
00391     write(ptr.rp_chars + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
00392       
00393 #ifdef DO_NOT_WIPE_BLOB   
00394     // Why is the BLOB header data being wiped here?
00395     // The data may be needed for backup.
00396     ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00397     memset(ptr.rp_chars, 0, head_size - myRepo->myRepoBlobHeadSize);
00398     
00399     w_offset =  offsetof(MSBlobHeadRec, rb_alias_hash_4);
00400     write(otab->myOTBuffer + w_offset, offset + w_offset, head_size - w_offset);
00401 #endif
00402 
00403     /* Increment the garbage count: */
00404     updateGarbage(head_size + blob_size);
00405     
00406   }
00407   exit_();
00408 }
00409 
00410 void MSRepoFile::referenceBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code, uint16_t col_index)
00411 {
00412   CSMutex       *myLock;
00413   MSRepoPointersRec ptr;
00414   uint32_t        size, ref_count;
00415   size_t        ref_size, read_size;
00416   MSRepoBlobRefPtr  free_ref = NULL;
00417   MSRepoBlobRefPtr  free2_ref = NULL;
00418   MSRepoTableRefPtr tab_ref = NULL;
00419   uint16_t        new_head_size;
00420 #ifdef HAVE_ALIAS_SUPPORT
00421   bool        reset_alias_index = false;
00422   char        blob_alias[BLOB_ALIAS_LENGTH];
00423 #endif
00424   uint64_t        blob_size;
00425   
00426   enter_();
00427   /* Lock the BLOB: */
00428   myLock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00429   lock_(myLock);
00430   /* Read the header: */
00431   if (head_size > MS_OT_BUFFER_SIZE) {
00432     CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00433   }
00434   
00435   read_size = read(otab->myOTBuffer, offset, head_size, 0);
00436   ptr.rp_chars = otab->myOTBuffer;
00437   if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00438     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00439   if (read_size < myRepo->myRepoBlobHeadSize)
00440     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00441   if ( ! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00442     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00443   if (CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
00444     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB data does not match reference");
00445   /* Assume that what is here is correct: */
00446   if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00447     head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00448     if (head_size > MS_OT_BUFFER_SIZE) { // Could happen if the header was creatd with a different version of PBMS.
00449       CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00450     }
00451     read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00452   }
00453   head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00454   blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
00455   if (read_size < head_size) {
00456     /* This should not happen, because the file has been recovered,
00457      * which should have already adjusted the head and blob
00458      * size.
00459      * If this happens then the file must have been truncated an the BLOB has been
00460      * lost so we set the blob size to zero.
00461      */
00462     head_size = read_size;
00463     blob_size = 0;
00464     
00465   }
00466   ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00467   ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00468   
00469 #ifdef HAVE_ALIAS_SUPPORT
00470   if (CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00471     reset_alias_index = true;
00472     strcpy(blob_alias, otab->myOTBuffer + CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2));
00473   }
00474 #endif
00475   
00476   size = head_size - myRepo->myRepoBlobHeadSize;
00477   if (size > ref_size * ref_count)
00478     size = ref_size * ref_count;
00479   CS_SET_DISK_4(ptr.rp_head->rb_last_ref_4, (uint32_t) time(NULL)); // Set the reference time
00480   CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_REFERENCED); 
00481   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00482   while (size >= ref_size) {
00483     switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00484       case MS_BLOB_FREE_REF:
00485         if (!free_ref)
00486           free_ref = ptr.rp_blob_ref;
00487         else if (!free2_ref)
00488           free2_ref = ptr.rp_blob_ref;
00489         break;
00490       case MS_BLOB_TABLE_REF:
00491 #ifdef HAVE_ALIAS_SUPPORT
00492         reset_alias_index = false; // No need to reset the index if the BLOB is already referenced. (We don't care what table references it.)
00493 #endif
00494         if (CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4) == tab_id &&
00495           CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6) == blob_id)
00496           tab_ref = ptr.rp_tab_ref;
00497         break;
00498       case MS_BLOB_DELETE_REF: {
00499         uint32_t tab_index;
00500 
00501         tab_index = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
00502         if (tab_index && tab_index < ref_count) {
00503           MSRepoTableRefPtr tr;
00504 
00505           tab_index--;
00506           tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
00507           if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id &&
00508             CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
00509             CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00510             if (free_ref)
00511               free2_ref = free_ref;
00512             free_ref = ptr.rp_blob_ref;
00513           }
00514         }
00515         else if (tab_index == INVALID_INDEX) {
00516           /* The is a reference from the temporary log only!! */
00517           if (free_ref)
00518             free2_ref = free_ref;
00519           free_ref = ptr.rp_blob_ref;
00520         }
00521         break;
00522       }
00523       default: { // Must be a blob REF, check that the BLOB reference doesn't already exist.
00524         uint32_t tab_index;
00525         tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
00526         
00527         if (tab_index && tab_index < ref_count) {
00528           MSRepoTableRefPtr tr;
00529 
00530           tab_index--;
00531           tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
00532           if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id &&
00533             CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
00534             if (COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8)) ==  blob_ref_id) {
00535               char message[100];
00536               snprintf(message, 100, "Duplicate BLOB reference: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00537               /* The reference already exists so there is nothing to do... */
00538               self->myException.log(self, message);
00539               goto done;
00540             }
00541           }
00542         }
00543         break;
00544       }
00545     }
00546     ptr.rp_chars += ref_size;
00547     size -= ref_size;
00548   }
00549 
00550   // A BLOB reference needs to be added and if there is not
00551   // already a table reference then a table reference must be added
00552   // also.
00553   if (!free_ref || (!tab_ref && !free2_ref)) {
00554     size_t new_refs = (tab_ref)?1:2;
00555     ptr.rp_chars = otab->myOTBuffer;
00556     size_t sp = MS_VAR_SPACE(ptr.rp_head);
00557     
00558     if (sp > (new_refs * CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1))) {
00559       sp = MS_MIN_BLOB_HEAD_SIZE;
00560     }
00561     
00562     if (MS_CAN_ADD_REFS(ptr.rp_head, new_refs)) {
00563       new_head_size = head_size;
00564 
00565     } else { // The header must be grown
00566       size_t new_size, max_refs;
00567 
00568       if (ref_count < 2)
00569         max_refs = 4;
00570       else if (ref_count > 32)
00571         max_refs = ref_count + 32;
00572       else
00573         max_refs = 2 * ref_count;
00574         
00575       if (max_refs > (MS_OT_BUFFER_SIZE/ref_size))
00576         max_refs = (MS_OT_BUFFER_SIZE/ref_size);
00577         
00578       if (max_refs < (ref_count + new_refs))
00579         CSException::throwAssertion(CS_CONTEXT, "BLOB reference header overflow");
00580 
00581       new_size = head_size + ref_size * max_refs;
00582 
00583       //Shift the metadata in the header
00584       if (CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2)) {
00585         uint16_t  mdata_size, mdata_offset, alias_offset, shift;
00586         
00587         shift = new_size - head_size;
00588         mdata_size = CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2);
00589         mdata_offset = CS_GET_DISK_2(ptr.rp_head->rb_mdata_offset_2);
00590         alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00591         
00592         memmove(ptr.rp_chars + mdata_offset + shift, ptr.rp_chars + mdata_offset, shift);
00593         memset(ptr.rp_chars + mdata_offset, 0, shift);
00594         mdata_offset += shift;
00595         alias_offset += shift;
00596         
00597         CS_SET_DISK_2(ptr.rp_head->rb_mdata_offset_2, mdata_offset);
00598         CS_SET_DISK_2(ptr.rp_head->rb_alias_offset_2, alias_offset);
00599         
00600       } else
00601         memset(ptr.rp_chars + head_size, 0, new_size - head_size);
00602       
00603       new_head_size = new_size;
00604     }
00605     CS_SET_DISK_2(ptr.rp_head->rb_head_size_2, new_head_size);
00606     CS_SET_DISK_2(ptr.rp_head->rb_ref_count_2, ref_count + new_refs);
00607     ptr.rp_chars += myRepo->myRepoBlobHeadSize + ref_count * ref_size;
00608     
00609     if (!free_ref) {
00610       free_ref = ptr.rp_blob_ref;
00611       memset(free_ref, 0, ref_size);
00612       ptr.rp_chars += ref_size;
00613     }
00614 
00615     if (!tab_ref) {
00616       free2_ref = ptr.rp_blob_ref;
00617       memset(free2_ref, 0, ref_size); 
00618     }
00619     
00620     ref_count += new_refs;
00621   }
00622   else
00623     new_head_size = head_size;
00624 
00625   if (!tab_ref) {
00626     tab_ref = (MSRepoTableRefPtr) free2_ref;
00627 
00628     CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_TABLE_REF);
00629     CS_SET_DISK_4(tab_ref->tr_table_id_4, tab_id);
00630     CS_SET_DISK_6(tab_ref->tr_blob_id_6, blob_id);
00631   }
00632 
00633   size_t tab_idx;
00634 
00635   tab_idx = (((char *) tab_ref - otab->myOTBuffer) - myRepo->myRepoBlobHeadSize) / ref_size;
00636 
00637   CS_SET_DISK_2(free_ref->er_table_2, tab_idx+1);
00638   CS_SET_DISK_2(free_ref->er_col_index_2, col_index);
00639   CS_SET_DISK_8(free_ref->er_blob_ref_id_8, UNCOMMITTED(blob_ref_id));
00640 
00641   update_blob_header(otab, offset, blob_size, head_size, new_head_size);
00642 #ifdef HAVE_ALIAS_SUPPORT
00643   if (reset_alias_index) 
00644     myRepo->myRepoDatabase->registerBlobAlias(myRepo->myRepoID, offset, blob_alias);
00645 #endif
00646 
00647 done:
00648   
00649   unlock_(myLock);
00650   exit_();
00651 }
00652 
00653 void MSRepoFile::setBlobMetaData(MSOpenTable *otab, uint64_t offset, const char *meta_data, uint16_t meta_data_len, bool reset_alias, const char  *alias)
00654 {
00655   CSMutex       *mylock;
00656   MSRepoPointersRec ptr;
00657   size_t        read_size;
00658   uint16_t        new_head_size;
00659   uint64_t        blob_size;
00660   uint16_t        head_size, mdata_size, mdata_offset, alias_offset = 0;
00661   MSBlobHeadRec   blob;
00662   
00663   enter_();
00664   /* Lock the BLOB: */
00665   mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00666   lock_(mylock);
00667 
00668   /* Read the header: */
00669   if (read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
00670     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00671   }
00672 
00673   head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00674 
00675   if (head_size > MS_OT_BUFFER_SIZE) {
00676     CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00677   }
00678   
00679   read_size = read(otab->myOTBuffer, offset, head_size, 0);
00680   ptr.rp_chars = otab->myOTBuffer;
00681   if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00682     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00683   if (read_size < myRepo->myRepoBlobHeadSize)
00684     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00685   if (! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00686     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00687 
00688 
00689   blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
00690   if (read_size < head_size) {
00691     /* This should not happen, because the file has been recovered,
00692      * which should have already adjusted the head and blob
00693      * size.
00694      * If this happens then the file must have been truncated an the BLOB has been
00695      * lost so we set the blob size to zero.
00696      */
00697     head_size = read_size;
00698     blob_size = 0;    
00699   }
00700   mdata_size = CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2);
00701 
00702   if ((meta_data_len < mdata_size) || MS_CAN_ADD_MDATA(ptr.rp_head, meta_data_len - mdata_size)) 
00703     new_head_size = head_size;
00704   else { // The header must be grown
00705 
00706     new_head_size = head_size + meta_data_len - mdata_size;
00707     if (new_head_size > MS_OT_BUFFER_SIZE)
00708       CSException::throwAssertion(CS_CONTEXT, "BLOB reference header overflow");
00709 
00710     memset(ptr.rp_chars + head_size, 0, new_head_size - head_size);
00711     CS_SET_DISK_2(ptr.rp_head->rb_head_size_2, new_head_size);
00712     
00713   } 
00714         
00715   // Meta data is placed at the end of the header.
00716   if (meta_data_len)
00717     mdata_offset = new_head_size - meta_data_len;
00718   else
00719     mdata_offset = 0;
00720   mdata_size  = meta_data_len;
00721     
00722   
00723   CS_SET_DISK_2(ptr.rp_head->rb_mdata_size_2, mdata_size);
00724   CS_SET_DISK_2(ptr.rp_head->rb_mdata_offset_2, mdata_offset);  
00725 #ifdef HAVE_ALIAS_SUPPORT
00726   uint32_t alias_hash = INVALID_ALIAS_HASH;
00727   if (alias) {
00728     alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00729     alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00730     if (reset_alias) {
00731       if (alias_offset)
00732         alias_hash = myRepo->myRepoDatabase->updateBlobAlias(myRepo->myRepoID, offset, alias_hash, alias);
00733       else {
00734         alias_hash = myRepo->myRepoDatabase->registerBlobAlias(myRepo->myRepoID, offset, alias);
00735       }
00736     }
00737     alias_offset = mdata_offset + (alias - meta_data);
00738     
00739   } else if (reset_alias && CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00740     alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00741     myRepo->myRepoDatabase->deleteBlobAlias(myRepo->myRepoID, offset, CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4));
00742     alias_offset = 0;
00743   }
00744 #else
00745   uint32_t alias_hash = ((uint32_t)-1);
00746   if (alias || reset_alias) {
00747     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "No BLOB alias support.");
00748   }
00749 #endif
00750 
00751   CS_SET_DISK_2(ptr.rp_head->rb_alias_offset_2, alias_offset);
00752   CS_SET_DISK_4(ptr.rp_head->rb_alias_hash_4, alias_hash);
00753   
00754   memcpy(ptr.rp_chars + mdata_offset, meta_data, meta_data_len);
00755     
00756   update_blob_header(otab, offset, blob_size, head_size, new_head_size);
00757     
00758   unlock_(mylock);
00759   exit_();
00760 
00761 }
00762 
00763 
00764 void MSRepoFile::releaseBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code)
00765 {
00766   CSMutex       *mylock;
00767   MSRepoPointersRec ptr;
00768   uint32_t      table_ref_count = 0;
00769   uint32_t      size;
00770   size_t        ref_size, ref_count, read_size;
00771   MSRepoTempRefPtr  temp_ref = NULL;
00772   uint16_t      tab_index = 0;
00773   MSRepoTableRefPtr tab_ref;
00774   uint16_t      alias_offset;
00775   uint32_t      alias_hash;
00776 
00777   enter_();
00778   /* Lock the BLOB: */
00779   mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00780   lock_(mylock);
00781   /* Read the header: */
00782   ASSERT(head_size <= MS_OT_BUFFER_SIZE);
00783   read_size = read(otab->myOTBuffer, offset, head_size, 0);
00784   ptr.rp_chars = otab->myOTBuffer;
00785   if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00786     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00787   if (read_size < myRepo->myRepoBlobHeadSize) {
00788     removeBlob(otab, tab_id, blob_id, offset, auth_code);
00789     goto exit;
00790   }
00791   if ((! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1))) ||
00792     CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code) {
00793     removeBlob(otab, tab_id, blob_id, offset, auth_code);
00794     goto exit;
00795   }
00796   
00797   /* Assume that what is here is correct: */
00798   if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00799     head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00800     read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00801   }
00802   head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00803   if (read_size < head_size) {
00804     /* This should not happen, because the file has been recovered,
00805      * which should have already adjusted the head and blob
00806      * size.
00807      * If this happens then the file must have been truncated an the BLOB has been
00808      * lost so we set the blob size to zero.
00809      */
00810     head_size = read_size;
00811   }
00812   ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00813   ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00814 
00815   alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00816   alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00817 
00818   size = head_size - myRepo->myRepoBlobHeadSize;
00819   if (size > ref_size * ref_count)
00820     size = ref_size * ref_count;
00821   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00822   while (size >= ref_size) {
00823     switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00824       case MS_BLOB_FREE_REF:
00825       case MS_BLOB_TABLE_REF:
00826         break;
00827       case MS_BLOB_DELETE_REF: {
00828         uint32_t tabi;
00829 
00830         tabi = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
00831         if (tabi && tabi < ref_count) {
00832           tabi--;
00833           tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + tabi * ref_size);
00834           if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00835             CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00836             /* This is an old free, take it out. */
00837             // Barry: What happens to the record in the temp log associated with this ref
00838             // that is waiting to free the BLOB?
00839             // Answer: It will find that there is MS_BLOB_DELETE_REF record with the BLOB
00840             // or if there is one it will be for a different free in a different temp log
00841             // or with a different temp log offset.
00842             CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00843           }
00844         }
00845         break;
00846       }
00847       default:  // Must be a blob REF
00848         tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00849         if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00850           CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00851           if (COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8)) ==  blob_ref_id) {
00852             /* Found the reference, remove it... */
00853             tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1;
00854             temp_ref = ptr.rp_temp_ref;
00855             //temp_ref = (MSRepoTempRefPtr) tab_ref; // Set temp ref to the table ref so that it will be removed if there are no more references to it.
00856             CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);           
00857           }
00858           else
00859             table_ref_count++;
00860         }
00861         break;
00862     }
00863     ptr.rp_chars += ref_size;
00864     size -= ref_size;
00865   }
00866 
00867   // If the refernce was found and there are no 
00868   // table references then the BLOB can be scheduled for deletion.
00869   if ((!table_ref_count) && temp_ref) {
00870     uint32_t  log_id;
00871     uint32_t log_offset;
00872     uint32_t temp_time;
00873 #ifdef HAVE_ALIAS_SUPPORT
00874     MSDiskAliasRec aliasDiskRec;
00875     MSDiskAliasPtr aliasDiskPtr = NULL;
00876     
00877     if (alias_offset) {
00878       CS_SET_DISK_4(aliasDiskRec.ar_repo_id_4, myRepo->myRepoID); 
00879       CS_SET_DISK_8(aliasDiskRec.ar_offset_8, offset);  
00880       CS_SET_DISK_4(aliasDiskRec.ar_hash_4, alias_hash);
00881       aliasDiskPtr = &aliasDiskRec;
00882     }
00883     
00884     myRepo->myRepoDatabase->queueForDeletion(otab, MS_TL_BLOB_REF, tab_id, blob_id, auth_code, &log_id, &log_offset, &temp_time, aliasDiskPtr);
00885 #else
00886     myRepo->myRepoDatabase->queueForDeletion(otab, MS_TL_BLOB_REF, tab_id, blob_id, auth_code, &log_id, &log_offset, &temp_time);
00887 #endif
00888     myRepo->myLastTempTime = temp_time;
00889     CS_SET_DISK_2(temp_ref->rr_type_2, MS_BLOB_DELETE_REF);
00890     CS_SET_DISK_2(temp_ref->tp_del_ref_2, tab_index+1);
00891     CS_SET_DISK_4(temp_ref->tp_log_id_4, log_id);
00892     CS_SET_DISK_4(temp_ref->tp_offset_4, log_offset);
00893     
00894     CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_ALLOCATED); // The BLOB is allocated but no longer referenced
00895   }
00896   if (temp_ref) {
00897     /* The reason I do not write the header of the header, is because
00898      * I want to handle the rb_last_access_4 being set at the
00899      * same time!
00900      */
00901     write(otab->myOTBuffer + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
00902   } else if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == false) {
00903     char message[100];
00904     snprintf(message, 100, "BLOB reference not found: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00905     /* The reference already exists so there is nothing to do... */
00906     self->myException.log(self, message);
00907   }
00908 
00909   exit:
00910   unlock_(mylock);
00911   exit_();
00912 }
00913 
00914 void MSRepoFile::commitBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code)
00915 {
00916   CSMutex       *mylock;
00917   MSRepoPointersRec ptr;
00918   uint32_t        size;
00919   size_t        ref_size, ref_count, read_size;
00920   MSRepoTableRefPtr tab_ref;
00921 
00922   enter_();
00923   /* Lock the BLOB: */
00924   mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00925   lock_(mylock);
00926   /* Read the header: */
00927   ASSERT(head_size <= MS_OT_BUFFER_SIZE);
00928   read_size = read(otab->myOTBuffer, offset, head_size, 0);
00929   ptr.rp_chars = otab->myOTBuffer;
00930   if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00931     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00932 
00933 
00934   if (read_size < myRepo->myRepoBlobHeadSize)
00935     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00936   if ( ! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00937     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00938   if (auth_code && CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
00939     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB data does not match reference");
00940 
00941   
00942   /* Assume that what is here is correct: */
00943   if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00944     head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00945     read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00946   }
00947   
00948   head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00949   if (read_size < head_size) {
00950     /* This should not happen, because the file has been recovered,
00951      * which should have already adjusted the head and blob
00952      * size.
00953      * If this happens then the file must have been truncated an the BLOB has been
00954      * lost so we set the blob size to zero.
00955      */
00956     head_size = read_size;
00957   }
00958   ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00959   ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00960 
00961   size = head_size - myRepo->myRepoBlobHeadSize;
00962   if (size > ref_size * ref_count)
00963     size = ref_size * ref_count;
00964   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00965   while (size >= ref_size) {
00966     switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00967       case MS_BLOB_FREE_REF:
00968       case MS_BLOB_TABLE_REF:
00969         break;
00970       case MS_BLOB_DELETE_REF: {
00971         break;
00972       }
00973       default:  // Must be a blob REF
00974         tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00975         if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00976           CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00977           uint64_t ref_id = CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8);
00978           if (COMMIT_MASK(ref_id) ==  blob_ref_id) {
00979             /* Found the reference, mark it as committed... */
00980             CS_SET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8, blob_ref_id);
00981             offset +=   (ptr.rp_chars - otab->myOTBuffer) + offsetof(MSRepoBlobRefRec, er_blob_ref_id_8);
00982             write(&(ptr.rp_blob_ref->er_blob_ref_id_8), offset, 8);         
00983             goto exit;
00984           }
00985         }
00986         break;
00987     }
00988     ptr.rp_chars += ref_size;
00989     size -= ref_size;
00990   }
00991 
00992   if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == false) {
00993     char message[100];
00994     snprintf(message, 100, "BLOB reference not found: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00995     self->myException.log(self, message);
00996   }
00997   
00998   exit:
00999   unlock_(mylock);
01000   exit_();
01001 }
01002 
01003 void MSRepoFile::realFreeBlob(MSOpenTable *otab, char *buffer, uint32_t auth_code, uint64_t offset, uint16_t head_size, uint64_t blob_size, size_t ref_size)
01004 {
01005   uint32_t        tab_id;
01006   uint64_t        blob_id;
01007   size_t        size;
01008   MSRepoPointersRec ptr;
01009   enter_();
01010   
01011   ptr.rp_chars = buffer;
01012   
01013   if (BLOB_IN_CLOUD(CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1))) {
01014     CloudKeyRec key;
01015     getBlobKey(ptr.rp_head, &key);
01016     if (!myRepo->myRepoDatabase->myBlobCloud)
01017       CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Deleting cloud BLOB without cloud.");
01018       
01019     myRepo->myRepoDatabase->myBlobCloud->cl_deleteData(&key); 
01020   }
01021     
01022 #ifdef HAVE_ALIAS_SUPPORT
01023   uint32_t        alias_hash;
01024   alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
01025   if (alias_hash != INVALID_ALIAS_HASH)
01026     myRepo->myRepoDatabase->deleteBlobAlias(myRepo->myRepoID, offset, alias_hash);
01027 #endif
01028 
01029   // Assuming the BLOB is still locked:
01030   CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_DELETED);
01031   write(ptr.rp_chars + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
01032 
01033   /* Update garbage count: */
01034   updateGarbage(head_size + blob_size);
01035 
01036   /* Remove all table references (should not be any)! */
01037   size = head_size - myRepo->myRepoBlobHeadSize;
01038   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01039   while (size >= ref_size) {
01040     if (CS_GET_DISK_2(ptr.rp_ref->rr_type_2) == MS_BLOB_TABLE_REF) {
01041       tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
01042       blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);        
01043       removeBlob(otab, tab_id, blob_id, offset, auth_code);
01044     }
01045     ptr.rp_chars += ref_size;
01046     size -= ref_size;
01047   }
01048   exit_();
01049 }
01050 
01051 /* This function will free the BLOB reference, if the record is invalid. */
01052 void MSRepoFile::freeTableReference(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code)
01053 {
01054   CSMutex       *mylock;
01055   MSRepoPointersRec ptr;
01056   uint32_t        blob_ref_count = 0;
01057   uint32_t        table_ref_count = 0;
01058   bool        modified = false;
01059   uint32_t        size;
01060   size_t        ref_size, ref_count, read_size;
01061   MSRepoTableRefPtr tab_ref = NULL;
01062   uint64_t        blob_size;
01063 
01064   enter_();
01065   /* Lock the BLOB: */
01066   mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01067   lock_(mylock);
01068   /* Read the header: */
01069   ASSERT(head_size <= MS_OT_BUFFER_SIZE);
01070   read_size = read(otab->myOTBuffer, offset, head_size, 0);
01071   ptr.rp_chars = otab->myOTBuffer;
01072   if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
01073     CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
01074   if (read_size < myRepo->myRepoBlobHeadSize) {
01075     removeBlob(otab, tab_id, blob_id, offset, auth_code);
01076     goto exit;
01077   }
01078   if ((! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1))) ||
01079     CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code) {
01080     removeBlob(otab, tab_id, blob_id, offset, auth_code);
01081     goto exit;
01082   }
01083   
01084   /* Assume that what is here is correct: */
01085   if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
01086     head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
01087     read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
01088   }
01089   head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
01090   blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
01091   if (read_size < head_size) {
01092     /* This should not happen, because the file has been recovered,
01093      * which should have already adjusted the head and blob
01094      * size.
01095      * If this happens then the file must have been truncated an the BLOB has been
01096      * lost so we set the blob size to zero.
01097      */
01098     head_size = read_size;
01099     blob_size = 0; 
01100     
01101   }
01102   ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
01103   ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
01104   size = head_size - myRepo->myRepoBlobHeadSize;
01105   if (size > ref_size * ref_count)
01106     size = ref_size * ref_count;
01107   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01108   while (size >= ref_size) {
01109     switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
01110       case MS_BLOB_FREE_REF:
01111         break;
01112       case MS_BLOB_TABLE_REF:
01113         if (CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4) == tab_id &&
01114           CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6) == blob_id)
01115           tab_ref = ptr.rp_tab_ref;
01116         break;
01117       case MS_BLOB_DELETE_REF:
01118       break;
01119       default:
01120         MSRepoTableRefPtr tr;
01121 
01122         tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
01123         if (CS_GET_DISK_2(tr->rr_type_2) == MS_BLOB_TABLE_REF) {
01124           /* I am deleting all references of a table. So I am here to
01125            * also delete the blob references that refer to the
01126            * table reference!!!
01127            */
01128           if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id && CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
01129             /* Free the blob reference: */
01130             CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
01131             modified = true;
01132           }
01133           else
01134             blob_ref_count++;
01135         
01136         }
01137         break;
01138     }
01139     ptr.rp_chars += ref_size;
01140     size -= ref_size;
01141   }
01142 
01143   if (!table_ref_count && tab_ref) {
01144     CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_FREE_REF);
01145     modified = true;
01146   }
01147   
01148   
01149   if (!blob_ref_count) {
01150     realFreeBlob(otab, otab->myOTBuffer, auth_code, offset, head_size, blob_size, ref_size);
01151   } else if (modified)
01152     /* The reason I do not write the header of the header, is because
01153      * I want to handle the rb_last_access_4 being set at the
01154      * same time!
01155      */
01156     write(otab->myOTBuffer + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
01157 
01158   unlock_(mylock);
01159 
01160   if (!table_ref_count || !tab_ref)
01161     /* Free the table reference, if there are no more
01162      * blob references, reference the table reference,
01163      * or if the table reference was not found in the
01164      * BLOB at all!
01165      */
01166     removeBlob(otab, tab_id, blob_id, offset, auth_code);
01167 
01168   exit_();
01169 
01170   exit:
01171   unlock_(mylock);
01172   exit_();
01173 }
01174 
01175 void MSRepoFile::checkBlob(CSStringBuffer *buffer, uint64_t offset, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
01176 {
01177   CSMutex       *mylock;
01178   MSBlobHeadRec   blob;
01179   MSRepoPointersRec ptr;
01180   uint32_t        blob_ref_count = 0;
01181   bool        modified = false;
01182   uint32_t        size;
01183   size_t        ref_size, ref_count, read_size;
01184   uint8_t       status;
01185   uint16_t        head_size;
01186   uint64_t        blob_size;
01187   MSRepoTempRefPtr  my_ref = NULL;
01188   uint16_t        ref_type = MS_BLOB_FREE_REF;
01189   enter_();
01190   
01191   /* Lock the BLOB: */
01192   mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01193   lock_(mylock);
01194 
01195   /* Read the head of the header: */
01196   if (read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) 
01197     goto exit;
01198 
01199   // Because the temp log will be replayed from the start when the server
01200   // is restarted it is likely that it will have references to BLOBs that
01201   // no longer exist. So it is not an error if the BLOB ref doesn't point to
01202   // a valid BLOB. 
01203   //
01204   // At some point this should probably be rethought because you cannot
01205   // tell the diference between a bad ref because of old data and a bad 
01206   // ref because of a BUG.
01207   if (CS_GET_DISK_4(blob.rd_magic_4) != MS_BLOB_HEADER_MAGIC) 
01208     goto exit;
01209   
01210   head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01211   blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01212   ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
01213   ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
01214   status = CS_GET_DISK_1(blob.rb_status_1);
01215   if (! IN_USE_BLOB_STATUS(status))
01216     goto exit;
01217 
01218   /* Read the entire header: */
01219   buffer->setLength(head_size);
01220   ptr.rp_chars = buffer->getBuffer(0);
01221   read_size = read(ptr.rp_chars, offset, head_size, 0);
01222   if (read_size < myRepo->myRepoBlobHeadSize)
01223     goto exit;
01224   if (CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
01225     goto exit;
01226   if (read_size < head_size) {
01227     /* This should not happen, because the file has been recovered,
01228      * which should have already adjusted the head and blob
01229      * size.
01230      * If this happens then the file must have been truncated an the BLOB has been
01231      * lost so we set the blob size to zero.
01232      */
01233     head_size = read_size;
01234     blob_size = 0; 
01235   }
01236   size = head_size - myRepo->myRepoBlobHeadSize;
01237   if (size > ref_size * ref_count)
01238     size = ref_size * ref_count;
01239 
01240   
01241   /* Search through all references: */
01242   ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01243   while (size >= ref_size) {
01244     switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
01245       case MS_BLOB_FREE_REF:
01246         break;
01247       case MS_BLOB_TABLE_REF:
01248         break;
01249       case MS_BLOB_DELETE_REF:
01250         if (CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4) == temp_log_id &&
01251           CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4) == temp_log_offset) {
01252           ref_type = CS_GET_DISK_2(ptr.rp_ref->rr_type_2);
01253           my_ref = ptr.rp_temp_ref;
01254           CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
01255           modified = true;
01256         }
01257         break;
01258       default:
01259         MSRepoTableRefPtr tr;
01260         uint32_t        tabi;
01261 
01262         tabi = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
01263         if (tabi < ref_count) {
01264           tr = (MSRepoTableRefPtr) (buffer->getBuffer(0) + myRepo->myRepoBlobHeadSize + (tabi-1) * ref_size);
01265           if (CS_GET_DISK_2(tr->rr_type_2) == MS_BLOB_TABLE_REF)
01266             blob_ref_count++;
01267         }
01268         break;
01269     }
01270     ptr.rp_chars += ref_size;
01271     size -= ref_size;
01272   }
01273 
01274   if ((ref_type == (uint16_t)MS_BLOB_DELETE_REF) && !blob_ref_count) {
01275     realFreeBlob(NULL, buffer->getBuffer(0), auth_code, offset, head_size, blob_size, ref_size);
01276   }
01277   
01278   exit:
01279   unlock_(mylock);
01280   exit_();
01281 }
01282 
01283 void MSRepoFile::returnToPool()
01284 {
01285   myRepo->myRepoDatabase->returnRepoFileToPool(this);
01286 }
01287 
01288 void MSRepoFile::removeBlob(MSOpenTable *otab, uint32_t tab_id, uint64_t blob_id, uint64_t offset, uint32_t auth_code)
01289 {
01290   enter_();
01291   if (otab && otab->getDBTable()->myTableID == tab_id)
01292     otab->getDBTable()->freeBlobHandle(otab, blob_id, myRepo->myRepoID, offset, auth_code);
01293   else {
01294     MSOpenTable *tmp_otab;
01295 
01296     if ((tmp_otab = MSTableList::getOpenTableByID(myRepo->myRepoDatabase->myDatabaseID, tab_id))) {
01297       frompool_(tmp_otab);
01298       tmp_otab->getDBTable()->freeBlobHandle(tmp_otab, blob_id, myRepo->myRepoID, offset, auth_code);
01299       backtopool_(tmp_otab);
01300     }
01301   }
01302   exit_();
01303 }
01304 
01305 MSRepoFile *MSRepoFile::newRepoFile(MSRepository *repo, CSPath *path)
01306 {
01307   MSRepoFile *f;
01308   
01309   if (!(f = new MSRepoFile())) {
01310     path->release();
01311     CSException::throwOSError(CS_CONTEXT, ENOMEM);
01312   }
01313   f->myRepo = repo;
01314   f->myFilePath = path;
01315   return f;
01316 }
01317 
01318 /*
01319  * ---------------------------------------------------------------
01320  * REPOSITORY
01321  */
01322 
01323 MSRepository::MSRepository(uint32_t id, MSDatabase *db, off64_t file_size):
01324 CSSharedRefObject(),
01325 myRepoID(id),
01326 myRepoFileSize(file_size),
01327 myRepoLockState(REPO_UNLOCKED),
01328 isRemovingFP(false),
01329 myRepoDatabase(db),
01330 myGarbageCount(0),
01331 myRepoHeadSize(0),
01332 myRepoDefRefSize(0),
01333 myRepoBlobHeadSize(0),
01334 myRecoveryOffset(0),
01335 myLastTempTime(0),
01336 myLastAccessTime(0),
01337 myLastCreateTime(0),
01338 myLastRefTime(0),
01339 mustBeDeleted(false),
01340 myRepoXLock(false),
01341 iFilePool(NULL)
01342 {
01343 }
01344 
01345 MSRepository::~MSRepository()
01346 {
01347   CSPath *path = NULL;
01348 
01349   enter_();
01350   if (mustBeDeleted) {
01351     path = getRepoFilePath();
01352     push_(path);
01353   }
01354 
01355   isRemovingFP = true;
01356   removeRepoFilesNotInUse();
01357   /* With this, I also delete those that are in use!: */
01358   iPoolFiles.clear();
01359 
01360   if (path) {
01361     path->removeFile();
01362     release_(path);
01363   }
01364   exit_();
01365 }
01366 
01367 void MSRepository::openRepoFileForWriting(MSOpenTable *otab)
01368 {
01369   if (!otab->myWriteRepoFile)
01370     otab->myWriteRepoFile = openRepoFile();
01371 }
01372 
01373 uint64_t MSRepository::receiveBlob(MSOpenTable *otab, uint16_t head_size, uint64_t blob_size, Md5Digest *checksum, CSInputStream *stream)
01374 {
01375   off64_t offset;
01376   size_t  tfer;
01377 
01378   enter_();
01379     
01380   offset = myRepoFileSize;
01381 
01382   offset += head_size;
01383   
01384   ASSERT(myRepoDatabase->myBlobType == MS_STANDARD_STORAGE);
01385   if (stream) {
01386     CSMd5 md5;
01387     push_(stream);
01388     md5.md5_init();
01389     while (blob_size > 0) {
01390       if (blob_size <=  MS_OT_BUFFER_SIZE)
01391         tfer = (size_t) blob_size;
01392       else
01393         tfer = MS_OT_BUFFER_SIZE;
01394       tfer = stream->read(otab->myOTBuffer, tfer);
01395       if (!tfer)
01396         CSException::throwOSError(CS_CONTEXT, EPIPE);
01397       if (checksum) md5.md5_append((const u_char *)(otab->myOTBuffer), tfer);
01398       otab->myWriteRepoFile->write(otab->myOTBuffer, offset, tfer);
01399       offset += (uint64_t) tfer;
01400       blob_size -= (uint64_t) tfer;
01401     }
01402     if (checksum) md5.md5_get_digest(checksum);
01403     release_(stream);
01404   } else {
01405     // Write 1 byte to the end to reserver the space.
01406     otab->myWriteRepoFile->write("x" , offset + blob_size -1, 1);
01407   }
01408 
01409   return_( myRepoFileSize);
01410 }
01411 
01412 // copyBlob() copies the BLOB and its header.
01413 uint64_t MSRepository::copyBlob(MSOpenTable *otab, uint64_t size, CSInputStream *stream)
01414 {
01415   off64_t offset = myRepoFileSize;
01416   size_t  tfer;
01417 
01418   while (size > 0) {
01419     if (size <= MS_OT_BUFFER_SIZE)
01420       tfer = (size_t) size;
01421     else
01422       tfer = MS_OT_BUFFER_SIZE;
01423     tfer = stream->read(otab->myOTBuffer, tfer);
01424     if (!tfer)
01425       CSException::throwOSError(CS_CONTEXT, EPIPE);
01426     otab->myWriteRepoFile->write(otab->myOTBuffer, offset, tfer);
01427     offset += (uint64_t) tfer;
01428     size -= (uint64_t) tfer;
01429   }
01430   
01431   return myRepoFileSize;
01432 }
01433 
01434 void MSRepository::writeBlobHead(MSOpenTable *otab, uint64_t offset, uint8_t ref_size, uint16_t head_size, uint64_t blob_size, Md5Digest *checksum, char *metadata, uint16_t metadata_size, uint64_t blob_id, uint32_t auth_code, uint32_t log_id, uint32_t log_offset, uint8_t blob_type, CloudKeyPtr cloud_key)
01435 {
01436   MSBlobHeadPtr   blob ;
01437   MSRepoTableRefPtr tab_ref;
01438   MSRepoTempRefPtr  temp_ref;
01439   time_t        now;
01440   uint16_t        tab_idx, max_ref_count = (head_size - myRepoBlobHeadSize - metadata_size) / ref_size;
01441   size_t        size;
01442   
01443   if (max_ref_count > MS_REPO_MIN_REF_COUNT)
01444     max_ref_count = MS_REPO_MIN_REF_COUNT;
01445     
01446   ASSERT(max_ref_count > 1);
01447   
01448   if (blob_type == MS_CLOUD_STORAGE) 
01449     now = cloud_key->creation_time;
01450   else
01451     now = time(NULL);
01452   
01453   blob = (MSBlobHeadPtr) otab->myOTBuffer;
01454   CS_SET_DISK_4(blob->rb_last_access_4, now);
01455   CS_SET_DISK_4(blob->rb_mod_time_4, now);
01456   CS_SET_DISK_4(blob->rb_access_count_4, 0);
01457   CS_SET_DISK_4(blob->rb_backup_id_4, 0);
01458   CS_SET_DISK_4(blob->rb_create_time_4, now);
01459   CS_SET_DISK_4(blob->rd_magic_4, MS_BLOB_HEADER_MAGIC);
01460   CS_SET_DISK_2(blob->rb_head_size_2, head_size);
01461   CS_SET_DISK_6(blob->rb_blob_data_size_6, blob_size);
01462   CS_SET_DISK_1(blob->rb_status_1, MS_BLOB_ALLOCATED);
01463   CS_SET_DISK_1(blob->rb_ref_size_1, ref_size);
01464   CS_SET_DISK_2(blob->rb_ref_count_2, max_ref_count);
01465   CS_SET_DISK_4(blob->rb_last_ref_4, 0);
01466   CS_SET_DISK_4(otab->myOTBuffer + myRepoBlobHeadSize - 4, auth_code);
01467   if (checksum)
01468     memcpy(&(blob->rb_blob_checksum_md5d), checksum, sizeof(Md5Digest));
01469 
01470   CS_SET_DISK_2(blob->rb_mdata_size_2, metadata_size);
01471   if (metadata_size) {
01472     uint16_t metadata_offset = head_size - metadata_size;
01473     
01474     CS_SET_DISK_2(blob->rb_mdata_offset_2, metadata_offset);
01475     memcpy(otab->myOTBuffer + metadata_offset, metadata, metadata_size);
01476     
01477 #ifdef HAVE_ALIAS_SUPPORT
01478     MetaData md;  
01479     md.use_data(metadata, metadata_size);
01480     const char *alias;  
01481     alias = md.findAlias();
01482     if (alias) {
01483       uint32_t alias_hash;
01484       uint16_t alias_offset = metadata_offset + (uint16_t) (alias - metadata);
01485       CS_SET_DISK_2(blob->rb_alias_offset_2, alias_offset);
01486       alias_hash = myRepoDatabase->registerBlobAlias(myRepoID, offset, alias);
01487       CS_SET_DISK_4(blob->rb_alias_hash_4, alias_hash);
01488     } else {
01489       CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01490     }
01491 #else
01492     CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01493 #endif
01494     
01495   } else {
01496     CS_SET_DISK_2(blob->rb_mdata_offset_2, 0);
01497     CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01498   }
01499   
01500 
01501   if (blob_id) {
01502     tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize);
01503     CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_TABLE_REF);
01504     CS_SET_DISK_4(tab_ref->tr_table_id_4, otab->getDBTable()->myTableID);
01505     CS_SET_DISK_6(tab_ref->tr_blob_id_6, blob_id);
01506     temp_ref = (MSRepoTempRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize + ref_size);
01507     tab_idx = 1;  // This is the index of the blob table ref in the repository record.
01508     size = myRepoBlobHeadSize + ref_size + ref_size;
01509   }
01510   else {
01511     temp_ref = (MSRepoTempRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize);
01512     tab_idx = INVALID_INDEX;  // Means not used
01513     size = myRepoBlobHeadSize + ref_size;
01514   }
01515 
01516   CS_SET_DISK_2(temp_ref->rr_type_2, MS_BLOB_DELETE_REF);
01517   CS_SET_DISK_2(temp_ref->tp_del_ref_2, tab_idx);
01518   CS_SET_DISK_4(temp_ref->tp_log_id_4, log_id);
01519   CS_SET_DISK_4(temp_ref->tp_offset_4, log_offset);
01520   
01521   if (blob_type == MS_CLOUD_STORAGE) { // The data is stored in the cloud and not in the repository.
01522     CS_SET_DISK_4(blob->rb_s3_key_id_4, cloud_key->ref_index);
01523     CS_SET_DISK_4(blob->rb_s3_cloud_ref_4, cloud_key->cloud_ref);
01524     blob_size = 0; // The blob is not stored in the repository so the blob storage size in the repository is zero
01525   }
01526 
01527   memset(otab->myOTBuffer + size, 0, head_size - size - metadata_size);
01528   
01529   CS_SET_DISK_1(blob->rb_storage_type_1, blob_type);
01530   CS_SET_DISK_6(blob->rb_blob_repo_size_6, blob_size);
01531   otab->myWriteRepoFile->write(blob, offset, head_size);
01532   
01533   setRepoFileSize(otab, offset + head_size + blob_size);
01534 }
01535 
01536 void MSRepository::setRepoFileSize(MSOpenTable *otab, off64_t offset)
01537 {
01538   myRepoFileSize = offset;
01539   if (myRepoFileSize >= PBMSParameters::getRepoThreshold()
01540      || getGarbageLevel() >= PBMSParameters::getGarbageThreshold())
01541     otab->closeForWriting();
01542 }
01543 
01544 void MSRepository::syncHead(MSRepoFile *fh)
01545 {
01546   MSRepoHeadRec head;
01547 
01548   fh->sync();
01549   myRecoveryOffset = myRepoFileSize;
01550   CS_SET_DISK_8(head.rh_recovery_offset_8, myRecoveryOffset);
01551   CS_SET_DISK_4(head.rh_last_temp_time_4, myLastTempTime);
01552   CS_SET_DISK_4(head.rh_last_access_4, myLastAccessTime);
01553   CS_SET_DISK_4(head.rh_create_time_4, myLastCreateTime);
01554   CS_SET_DISK_4(head.rh_last_ref_4, myLastRefTime);
01555 
01556   fh->write(&head.rh_recovery_offset_8, offsetof(MSRepoHeadRec, rh_recovery_offset_8), 24);
01557   fh->sync();
01558 }
01559 
01560 MSRepoFile *MSRepository::openRepoFile()
01561 {
01562   MSRepoFile  *fh;
01563 
01564   enter_();
01565   fh = MSRepoFile::newRepoFile(this, getRepoFilePath());
01566   push_(fh);
01567   if (myRepoFileSize)
01568     fh->open(CSFile::DEFAULT);
01569   else
01570     fh->open(CSFile::CREATE);
01571   if (!myRepoHeadSize) {
01572     MSRepoHeadRec head;
01573     MSBlobHeadRec blob;
01574     size_t      size;
01575     int       status;
01576     int       ref_size;
01577     uint16_t    head_size;
01578     uint64_t    blob_size;
01579 
01580     lock_(this);
01581     /* Check again after locking: */
01582     if (!myRepoHeadSize) {
01583       if (fh->read(&head, 0, offsetof(MSRepoHeadRec, rh_reserved_4), 0) < offsetof(MSRepoHeadRec, rh_reserved_4)) {
01584         CS_SET_DISK_4(head.rh_magic_4, MS_REPO_FILE_MAGIC);
01585         CS_SET_DISK_2(head.rh_version_2, MS_REPO_FILE_VERSION);
01586         CS_SET_DISK_2(head.rh_repo_head_size_2, MS_REPO_FILE_HEAD_SIZE);
01587         CS_SET_DISK_2(head.rh_blob_head_size_2, sizeof(MSBlobHeadRec));
01588         CS_SET_DISK_2(head.rh_def_ref_size_2, sizeof(MSRepoGenericRefRec));
01589         CS_SET_DISK_8(head.rh_recovery_offset_8, MS_REPO_FILE_HEAD_SIZE);
01590         CS_SET_DISK_8(head.rh_garbage_count_8, 0);
01591         CS_SET_DISK_4(head.rh_last_temp_time_4, 0);
01592         CS_SET_DISK_4(head.rh_last_access_4, 0);
01593         CS_SET_DISK_4(head.rh_create_time_4, 0);
01594         CS_SET_DISK_4(head.rh_last_ref_4, 0);
01595         CS_SET_DISK_4(head.rh_reserved_4, 0);
01596         fh->write(&head, 0, sizeof(MSRepoHeadRec));
01597       }
01598       
01599       /* Check the file header: */
01600       if (CS_GET_DISK_4(head.rh_magic_4) != MS_REPO_FILE_MAGIC)
01601         CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
01602       if (CS_GET_DISK_2(head.rh_version_2) > MS_REPO_FILE_VERSION)
01603         CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
01604 
01605       /* Load the header details: */
01606       myRepoHeadSize = CS_GET_DISK_2(head.rh_repo_head_size_2);
01607       myRepoDefRefSize = CS_GET_DISK_2(head.rh_def_ref_size_2);
01608       myRepoBlobHeadSize = CS_GET_DISK_2(head.rh_blob_head_size_2);
01609       myRecoveryOffset = CS_GET_DISK_8(head.rh_recovery_offset_8);
01610       myGarbageCount = CS_GET_DISK_8(head.rh_garbage_count_8);
01611       myLastTempTime = CS_GET_DISK_4(head.rh_last_temp_time_4);
01612       myLastAccessTime = CS_GET_DISK_4(head.rh_last_access_4);
01613       myLastCreateTime = CS_GET_DISK_4(head.rh_create_time_4);
01614       myLastRefTime = CS_GET_DISK_4(head.rh_last_ref_4);
01615 
01616       /* File size, cannot be less than header size: */
01617       if (myRepoFileSize < myRepoHeadSize)
01618         myRepoFileSize = myRepoHeadSize;
01619 
01620       ASSERT(myGarbageCount <= myRepoFileSize);
01621 
01622       /* Recover the file: */
01623       while (myRecoveryOffset < myRepoFileSize) {
01624         if ((size = fh->read(&blob, myRecoveryOffset, MS_MIN_BLOB_HEAD_SIZE, 0)) < MS_MIN_BLOB_HEAD_SIZE) {
01625           if (size != 0) {
01626             myRepoFileSize = myRecoveryOffset;
01627             fh->setEOF(myRepoFileSize);
01628           }
01629           break;
01630         }
01631         uint16_t ref_count, mdata_size, mdata_offset;
01632         
01633         status = CS_GET_DISK_1(blob.rb_status_1);
01634         ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
01635         ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
01636         head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01637         mdata_size = CS_GET_DISK_2(blob.rb_mdata_size_2);
01638         mdata_offset = CS_GET_DISK_2(blob.rb_mdata_offset_2);
01639         blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01640         if ((CS_GET_DISK_4(blob.rd_magic_4) != MS_BLOB_HEADER_MAGIC) ||
01641           (! IN_USE_BLOB_STATUS(status)) ||
01642           head_size < (myRepoBlobHeadSize + ref_size * MS_REPO_MIN_REF_COUNT) ||
01643           head_size < (mdata_offset + mdata_size) ||
01644           ((blob_size == 0) && (BLOB_IN_REPOSITORY(CS_GET_DISK_1(blob.rb_storage_type_1)))) ||
01645           myRecoveryOffset + head_size + blob_size > myRepoFileSize) {
01646           myRepoFileSize = myRecoveryOffset;
01647           fh->setEOF(myRepoFileSize);
01648           break;
01649         }
01650         myRecoveryOffset += head_size + blob_size;
01651       }
01652 
01653       fh->sync();
01654       myRecoveryOffset = myRepoFileSize;
01655       CS_SET_DISK_8(head.rh_recovery_offset_8, myRecoveryOffset);
01656       fh->write(&head, offsetof(MSRepoHeadRec, rh_recovery_offset_8), 8);
01657       fh->sync();
01658     }
01659     unlock_(this);
01660   }
01661   pop_(fh);
01662   return_(fh);
01663 }
01664 
01665 void MSRepository::lockRepo(RepoLockState state)
01666 {
01667   CSMutex *myLock;
01668   enter_();
01669   
01670   myLock = &myRepoWriteLock;
01671   lock_(myLock);
01672   
01673   ASSERT(!myRepoXLock);
01674   
01675   myRepoLockState = state;
01676   myRepoXLock = true;
01677     
01678   unlock_(myLock);
01679   exit_();
01680 }
01681 
01682 void MSRepository::signalCompactor()
01683 {
01684 #ifndef MS_COMPACTOR_POLLS
01685   if (!mustBeDeleted) {
01686     if (getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
01687       if (myRepoDatabase->myCompactorThread)
01688         myRepoDatabase->myCompactorThread->wakeup();
01689     }
01690   }
01691 #endif
01692 }
01693 
01694 void MSRepository::unlockRepo(RepoLockState state)
01695 {
01696   CSMutex *myLock;
01697   enter_();
01698   myLock = &myRepoWriteLock;
01699   lock_(myLock);
01700   
01701   ASSERT(myRepoLockState & state);
01702   
01703   myRepoLockState &= ~state;
01704   if (myRepoLockState == REPO_UNLOCKED) {
01705     myRepoXLock = false;
01706     signalCompactor();
01707   }
01708   unlock_(myLock);
01709   
01710   exit_();
01711 }
01712 
01713 // Repositories are not removed from the pool when 
01714 // scheduled for backup so the REPO_BACKUP flag is
01715 // not handled here.
01716 void MSRepository::returnToPool()
01717 {
01718   CSMutex *myLock;
01719   enter_();
01720   myLock = &myRepoWriteLock;
01721   lock_(myLock);
01722   this->myRepoLockState &= ~(REPO_COMPACTING | REPO_WRITE);
01723   if ( this->myRepoLockState == REPO_UNLOCKED) {
01724     myRepoXLock = false;
01725     signalCompactor();
01726   }
01727   unlock_(myLock);
01728   
01729   this->release();
01730   exit_();
01731 }
01732 
01733 void MSRepository::backupCompleted()
01734 {
01735   CSMutex *myLock;
01736   enter_();
01737   myLock = &myRepoWriteLock;
01738   lock_(myLock);
01739   
01740   
01741   this->myRepoLockState &= ~REPO_BACKUP;
01742   if ( this->myRepoLockState == REPO_UNLOCKED) {
01743     myRepoXLock = false;
01744     signalCompactor();
01745   }
01746     
01747   unlock_(myLock);
01748   exit_();
01749 }
01750 
01751 bool MSRepository::lockedForBackup() { return ((myRepoLockState & REPO_BACKUP) == REPO_BACKUP);}
01752 
01753 uint32_t MSRepository::initBackup()
01754 {
01755   CSMutex *myLock;
01756   uint32_t state;
01757   enter_();
01758   
01759   myLock = &myRepoWriteLock;
01760   lock_(myLock);
01761   state = this->myRepoLockState;
01762   this->myRepoLockState |= REPO_BACKUP;
01763   if (this->myRepoLockState == REPO_BACKUP) 
01764     this->myRepoXLock = true;
01765     
01766   unlock_(myLock);
01767   return_(state);
01768 }
01769 
01770 MSRepoFile *MSRepository::getRepoFile()
01771 {
01772   MSRepoFile *file;
01773 
01774   if ((file = iFilePool)) {
01775     iFilePool = file->nextFile;
01776     file->nextFile = NULL;
01777     file->isFileInUse = true;
01778     file->retain();
01779   }
01780   return file;
01781 }
01782 
01783 void MSRepository::addRepoFile(MSRepoFile *file)
01784 {
01785   iPoolFiles.addFront(file);
01786 }
01787 
01788 void MSRepository::removeRepoFile(MSRepoFile *file)
01789 {
01790   iPoolFiles.remove(file);
01791 }
01792 
01793 void MSRepository::returnRepoFile(MSRepoFile *file)
01794 {
01795   file->isFileInUse = false;
01796   file->nextFile = iFilePool;
01797   iFilePool = file;
01798 }
01799 
01800 bool MSRepository::removeRepoFilesNotInUse()
01801 {
01802   MSRepoFile *file, *curr_file;
01803 
01804   iFilePool = NULL;
01805   /* Remove all files that are not in use: */
01806   if ((file = (MSRepoFile *) iPoolFiles.getBack())) {
01807     do {
01808       curr_file = file;
01809       file = (MSRepoFile *) file->getNextLink();
01810       if (!curr_file->isFileInUse)
01811         iPoolFiles.remove(curr_file);
01812     } while (file);
01813   }
01814   return iPoolFiles.getSize() == 0;
01815 }
01816 
01817 off64_t MSRepository::getRepoFileSize()
01818 {
01819   return myRepoFileSize;
01820 }
01821 
01822 size_t MSRepository::getRepoHeadSize()
01823 {
01824   return myRepoHeadSize;
01825 }
01826 
01827 size_t MSRepository::getRepoBlobHeadSize()
01828 {
01829   return myRepoBlobHeadSize;
01830 }
01831 
01832 CSMutex *MSRepository::getRepoLock(off64_t offset)
01833 {
01834   return &myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01835 }
01836 
01837 uint32_t MSRepository::getRepoID()
01838 {
01839   return myRepoID;
01840 }
01841 
01842 uint32_t MSRepository::getGarbageLevel()
01843 {
01844   if (myRepoFileSize <= myRepoHeadSize)
01845     return 0;
01846   return myGarbageCount * 100 / (myRepoFileSize - myRepoHeadSize);
01847 }
01848 
01849 CSPath *MSRepository::getRepoFilePath()
01850 {
01851   char file_name[120];
01852 
01853   cs_strcpy(120, file_name, "bs-repository");
01854   cs_add_dir_char(120, file_name);
01855   cs_strcat(120, file_name, "repo-");
01856   cs_strcat(120, file_name, myRepoID);
01857   cs_strcat(120, file_name, ".bs");
01858 
01859   if (myRepoDatabase && myRepoDatabase->myDatabasePath) {
01860     return CSPath::newPath(RETAIN(myRepoDatabase->myDatabasePath), file_name);
01861   }
01862   return NULL;
01863 }
01864