00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifdef DRIZZLED
00031 #include <config.h>
00032 #include <drizzled/common.h>
00033 #include <drizzled/session.h>
00034 #endif
00035
00036 #include "cslib/CSConfig.h"
00037 #include <inttypes.h>
00038
00039 #include "defs_ms.h"
00040
00041 #include "cslib/CSGlobal.h"
00042 #include "cslib/CSLog.h"
00043 #include "cslib/CSStrUtil.h"
00044 #include "cslib/CSHTTPStream.h"
00045 #include "cslib/CSStream.h"
00046
00047 #include "repository_ms.h"
00048 #include "open_table_ms.h"
00049 #include "connection_handler_ms.h"
00050 #include "metadata_ms.h"
00051 #include "parameters_ms.h"
00052 #include "pbmsdaemon_ms.h"
00053
00054
00055
00056
00057
00058
00059 MSRepoFile::MSRepoFile():
00060 CSFile(),
00061 myRepo(NULL),
00062 isFileInUse(false),
00063 nextFile(NULL),
00064 iNextLink(NULL),
00065 iPrevLink(NULL)
00066 {
00067 }
00068
00069 MSRepoFile::~MSRepoFile()
00070 {
00071 close();
00072 }
00073
00074 void MSRepoFile::updateGarbage(uint64_t size)
00075 {
00076 MSRepoHeadRec repo_head;
00077 enter_();
00078
00079 lock_(myRepo);
00080 myRepo->myGarbageCount += size;
00081 CS_SET_DISK_8(repo_head.rh_garbage_count_8, myRepo->myGarbageCount);
00082 ASSERT(myRepo->myGarbageCount <= myRepo->myRepoFileSize);
00083 write(&repo_head.rh_garbage_count_8, offsetof(MSRepoHeadRec, rh_garbage_count_8), 8);
00084 unlock_(myRepo);
00085 if (!myRepo->myRepoXLock)
00086 myRepo->signalCompactor();
00087
00088 exit_();
00089 }
00090
00091 void MSRepoFile::updateAccess(MSBlobHeadPtr blob, uint64_t rep_offset)
00092 {
00093 time_t now = time(NULL);
00094 uint32_t count = CS_GET_DISK_4(blob->rb_access_count_4) +1;
00095
00096 CS_SET_DISK_4(blob->rb_last_access_4, now);
00097 CS_SET_DISK_4(blob->rb_access_count_4, count);
00098 write(&blob->rb_last_access_4, rep_offset + offsetof(MSBlobHeadRec, rb_last_access_4), 8);
00099 myRepo->myLastAccessTime = now;
00100 }
00101
00102 uint64_t MSRepoFile::readBlobChunk(PBMSBlobIDPtr blob_id, uint64_t rep_offset, uint64_t blob_offset, uint64_t buffer_size, char *buffer)
00103 {
00104 MSBlobHeadRec blob_head;
00105 size_t tfer;
00106 uint16_t head_size;
00107 uint64_t blob_size;
00108 uint32_t ac;
00109 uint64_t offset, blob_read =0;
00110
00111 enter_();
00112
00113 read(&blob_head, rep_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00114 if (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00115 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00116
00117 blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6);
00118 head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00119 ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00120 if (blob_id->bi_auth_code != ac)
00121 CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00122
00123 offset = rep_offset + blob_offset + head_size;
00124
00125 if (blob_offset > blob_size)
00126 goto done;
00127
00128 if ((blob_offset + buffer_size) > blob_size)
00129 buffer_size = blob_size - blob_offset;
00130
00131 while (buffer_size > 0) {
00132 if (buffer_size <= (uint64_t) (SSIZE_MAX))
00133 tfer = (size_t) buffer_size;
00134 else
00135 tfer = SSIZE_MAX;
00136
00137 read(buffer, offset, tfer, tfer);
00138 offset += (uint64_t) tfer;
00139 buffer += (uint64_t) tfer;
00140 buffer_size -= (uint64_t) tfer;
00141 blob_read += (uint64_t) tfer;
00142 }
00143
00144
00145 if (!blob_offset)
00146 updateAccess(&blob_head, rep_offset);
00147
00148 done:
00149 return_(blob_read);
00150 }
00151
00152 void MSRepoFile::writeBlobChunk(PBMSBlobIDPtr blob_id, uint64_t rep_offset, uint64_t blob_offset, uint64_t data_size, char *data)
00153 {
00154 size_t tfer;
00155 off64_t offset;
00156 MSBlobHeadRec blob_head;
00157 uint16_t head_size;
00158 uint64_t blob_size;
00159 uint32_t ac;
00160
00161 enter_();
00162
00163 read(&blob_head, rep_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00164 if (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00165 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00166
00167 blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6);
00168 head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00169 ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00170 if (blob_id->bi_auth_code != ac)
00171 CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00172
00173 if ((blob_offset + data_size) > blob_size)
00174 CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB write size or offset");
00175
00176 offset = (uint64_t) head_size + rep_offset + blob_offset;
00177
00178 while (data_size > 0) {
00179 if (data_size <= (uint64_t) (SSIZE_MAX))
00180 tfer = (size_t) data_size;
00181 else
00182 tfer = SSIZE_MAX;
00183
00184 write(data, offset, tfer);
00185 data += (uint64_t) tfer;
00186 offset += (uint64_t) tfer;
00187 data_size -= (uint64_t) tfer;
00188 }
00189
00190 exit_();
00191 }
00192
00193 void MSRepoFile::sendBlob(MSOpenTable *otab, uint64_t offset, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool with_auth_code, bool info_only, CSHTTPOutputStream *stream)
00194 {
00195 MSConnectionHandler *me;
00196 size_t tfer;
00197 off64_t start_offset = offset;
00198 MSBlobHeadRec blob_head;
00199 uint8_t storage_type;
00200 uint16_t head_size, meta_size;
00201 uint64_t blob_data_size, local_blob_size, meta_offset;
00202 uint32_t ac;
00203 char num_str[CS_WIDTH_INT_64];
00204 bool redirect = false;
00205
00206
00207 enter_();
00208 me = (MSConnectionHandler *) self;
00209
00210 read(&blob_head, start_offset, sizeof(MSBlobHeadRec), sizeof(MSBlobHeadRec));
00211 local_blob_size = CS_GET_DISK_6(blob_head.rb_blob_repo_size_6);
00212 blob_data_size = CS_GET_DISK_6(blob_head.rb_blob_data_size_6);
00213 head_size = CS_GET_DISK_2(blob_head.rb_head_size_2);
00214 meta_size = CS_GET_DISK_2(blob_head.rb_mdata_size_2);
00215 meta_offset = start_offset + CS_GET_DISK_2(blob_head.rb_mdata_offset_2);
00216 ac = CS_GET_DISK_4(blob_head.rb_auth_code_4);
00217 if ((with_auth_code && auth_code != ac) || (CS_GET_DISK_4(blob_head.rd_magic_4) != MS_BLOB_HEADER_MAGIC))
00218 CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00219
00220 storage_type = CS_GET_DISK_1(blob_head.rb_storage_type_1);
00221
00222 if ((!info_only) && BLOB_IN_CLOUD(storage_type)) {
00223 CSString *redirect_url = NULL;
00224 CloudKeyRec key;
00225 getBlobKey(&blob_head, &key);
00226 redirect_url = otab->getDB()->myBlobCloud->cl_getDataURL(&key);
00227 push_(redirect_url);
00228 stream->setStatus(301);
00229 stream->addHeader("Location", redirect_url->getCString());
00230 release_(redirect_url);
00231 redirect = true;
00232 } else
00233 stream->setStatus(200);
00234
00235 if (storage_type == MS_STANDARD_STORAGE) {
00236 char hex_checksum[33];
00237 cs_bin_to_hex(33, hex_checksum, 16, blob_head.rb_blob_checksum_md5d.val);
00238 stream->addHeader(MS_CHECKSUM_TAG, hex_checksum);
00239 }
00240
00241 snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu64"", blob_data_size);
00242 stream->addHeader(MS_BLOB_SIZE, num_str);
00243
00244 snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_last_access_4));
00245 stream->addHeader(MS_LAST_ACCESS, num_str);
00246
00247 snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_access_count_4));
00248 stream->addHeader(MS_ACCESS_COUNT, num_str);
00249
00250 snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", CS_GET_DISK_4(blob_head.rb_create_time_4));
00251 stream->addHeader(MS_CREATION_TIME, num_str);
00252
00253 snprintf(num_str, CS_WIDTH_INT_64, "%"PRIu32"", storage_type);
00254 stream->addHeader(MS_BLOB_TYPE, num_str);
00255
00256
00257
00258 if (meta_size) {
00259 MetaData metadata;
00260 char *name, *value;
00261
00262 read(otab->myOTBuffer, meta_offset, meta_size, meta_size);
00263 metadata.use_data(otab->myOTBuffer, meta_size);
00264 while ((name = metadata.findNext(&value))) {
00265 stream->addHeader(name, value);
00266 }
00267
00268 }
00269
00270 offset += (uint64_t) head_size + req_offset;
00271 local_blob_size -= req_offset;
00272 if (local_blob_size > req_size)
00273 local_blob_size = req_size;
00274
00275 stream->setContentLength((redirect || info_only)?0:local_blob_size);
00276 stream->writeHead();
00277 me->replyPending = false;
00278
00279 if ((!redirect) && !info_only) {
00280
00281 while (local_blob_size > 0) {
00282 if (local_blob_size <= MS_OT_BUFFER_SIZE)
00283 tfer = (size_t) local_blob_size;
00284 else
00285 tfer = MS_OT_BUFFER_SIZE;
00286 read(otab->myOTBuffer, offset, tfer, tfer);
00287 stream->write(otab->myOTBuffer, tfer);
00288 offset += (uint64_t) tfer;
00289 local_blob_size -= (uint64_t) tfer;
00290 }
00291 }
00292 stream->flush();
00293
00294 if (!info_only) {
00295
00296
00297 updateAccess(&blob_head, start_offset);
00298 }
00299
00300 exit_();
00301 }
00302
00303 void MSRepoFile::update_blob_header(MSOpenTable *otab, uint64_t offset, uint64_t blob_size, uint16_t head_size, uint16_t new_head_size)
00304 {
00305 uint16_t w_offset = offsetof(MSBlobHeadRec, rb_ref_count_2);
00306 MSRepoPointersRec ptr;
00307 enter_();
00308
00309 ptr.rp_chars = otab->myOTBuffer;
00310 CS_SET_DISK_4(ptr.rp_head->rb_mod_time_4, time(NULL));
00311
00312 if (head_size == new_head_size) {
00313 w_offset = offsetof(MSBlobHeadRec, rb_ref_count_2);
00314 write(otab->myOTBuffer + w_offset, offset + w_offset, head_size - w_offset);
00315 } else {
00316
00317 off64_t dst_offset;
00318 CSStringBuffer *buffer;
00319 uint16_t ref_count, ref_size;
00320 uint32_t tab_id;
00321 uint64_t blob_id;
00322
00323 myRepo->myRepoDatabase->openWriteRepo(otab);
00324 dst_offset = otab->myWriteRepo->myRepoFileSize;
00325
00326
00327 otab->myWriteRepoFile->write(otab->myOTBuffer, dst_offset, new_head_size);
00328
00329
00330 new_(buffer, CSStringBuffer());
00331 push_(buffer);
00332 buffer->setLength(MS_COMPACTOR_BUFFER_SIZE);
00333 CSFile::transfer(RETAIN(otab->myWriteRepoFile), dst_offset + new_head_size, RETAIN(this), offset + head_size, blob_size, buffer->getBuffer(0), MS_COMPACTOR_BUFFER_SIZE);
00334 release_(buffer);
00335
00336 #ifdef HAVE_ALIAS_SUPPORT
00337
00338
00339 if (CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00340 uint32_t alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00341 myRepo->myRepoDatabase->moveBlobAlias(myRepo->myRepoID, offset, alias_hash, myRepo->myRepoID, dst_offset);
00342 }
00343 #endif
00344
00345
00346 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00347 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00348 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00349
00350 while (ref_count) {
00351 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00352 case MS_BLOB_FREE_REF:
00353 break;
00354 case MS_BLOB_TABLE_REF:
00355 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00356 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00357
00358 if ((otab->haveTable()) && (otab->getDBTable()->myTableID == tab_id))
00359 otab->getDBTable()->updateBlobHandle(otab, blob_id, otab->myWriteRepo->myRepoID, dst_offset, new_head_size);
00360 else {
00361 MSOpenTable *ref_otab;
00362
00363 ref_otab = MSTableList::getOpenTableByID(myRepo->myRepoDatabase->myDatabaseID, tab_id);
00364 frompool_(ref_otab);
00365 ref_otab->getDBTable()->updateBlobHandle(ref_otab, blob_id, otab->myWriteRepo->myRepoID, dst_offset, new_head_size);
00366 backtopool_(ref_otab);
00367 }
00368 break;
00369 case MS_BLOB_DELETE_REF:
00370 break;
00371 default:
00372 break;
00373 }
00374 ptr.rp_chars += ref_size;
00375 ref_count--;
00376 }
00377
00378 otab->myWriteRepo->myRepoFileSize += new_head_size + blob_size;
00379
00380
00381 ptr.rp_chars = otab->myOTBuffer;
00382 if (myRepo->lockedForBackup()) {
00383
00384
00385
00386 CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_MOVED);
00387 CS_SET_DISK_4(ptr.rp_head->rb_backup_id_4, myRepo->myRepoDatabase->backupID());
00388 } else
00389 CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_DELETED);
00390
00391 write(ptr.rp_chars + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
00392
00393 #ifdef DO_NOT_WIPE_BLOB
00394
00395
00396 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00397 memset(ptr.rp_chars, 0, head_size - myRepo->myRepoBlobHeadSize);
00398
00399 w_offset = offsetof(MSBlobHeadRec, rb_alias_hash_4);
00400 write(otab->myOTBuffer + w_offset, offset + w_offset, head_size - w_offset);
00401 #endif
00402
00403
00404 updateGarbage(head_size + blob_size);
00405
00406 }
00407 exit_();
00408 }
00409
00410 void MSRepoFile::referenceBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code, uint16_t col_index)
00411 {
00412 CSMutex *myLock;
00413 MSRepoPointersRec ptr;
00414 uint32_t size, ref_count;
00415 size_t ref_size, read_size;
00416 MSRepoBlobRefPtr free_ref = NULL;
00417 MSRepoBlobRefPtr free2_ref = NULL;
00418 MSRepoTableRefPtr tab_ref = NULL;
00419 uint16_t new_head_size;
00420 #ifdef HAVE_ALIAS_SUPPORT
00421 bool reset_alias_index = false;
00422 char blob_alias[BLOB_ALIAS_LENGTH];
00423 #endif
00424 uint64_t blob_size;
00425
00426 enter_();
00427
00428 myLock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00429 lock_(myLock);
00430
00431 if (head_size > MS_OT_BUFFER_SIZE) {
00432 CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00433 }
00434
00435 read_size = read(otab->myOTBuffer, offset, head_size, 0);
00436 ptr.rp_chars = otab->myOTBuffer;
00437 if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00438 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00439 if (read_size < myRepo->myRepoBlobHeadSize)
00440 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00441 if ( ! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00442 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00443 if (CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
00444 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB data does not match reference");
00445
00446 if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00447 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00448 if (head_size > MS_OT_BUFFER_SIZE) {
00449 CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00450 }
00451 read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00452 }
00453 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00454 blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
00455 if (read_size < head_size) {
00456
00457
00458
00459
00460
00461
00462 head_size = read_size;
00463 blob_size = 0;
00464
00465 }
00466 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00467 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00468
00469 #ifdef HAVE_ALIAS_SUPPORT
00470 if (CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00471 reset_alias_index = true;
00472 strcpy(blob_alias, otab->myOTBuffer + CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2));
00473 }
00474 #endif
00475
00476 size = head_size - myRepo->myRepoBlobHeadSize;
00477 if (size > ref_size * ref_count)
00478 size = ref_size * ref_count;
00479 CS_SET_DISK_4(ptr.rp_head->rb_last_ref_4, (uint32_t) time(NULL));
00480 CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_REFERENCED);
00481 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00482 while (size >= ref_size) {
00483 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00484 case MS_BLOB_FREE_REF:
00485 if (!free_ref)
00486 free_ref = ptr.rp_blob_ref;
00487 else if (!free2_ref)
00488 free2_ref = ptr.rp_blob_ref;
00489 break;
00490 case MS_BLOB_TABLE_REF:
00491 #ifdef HAVE_ALIAS_SUPPORT
00492 reset_alias_index = false;
00493 #endif
00494 if (CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4) == tab_id &&
00495 CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6) == blob_id)
00496 tab_ref = ptr.rp_tab_ref;
00497 break;
00498 case MS_BLOB_DELETE_REF: {
00499 uint32_t tab_index;
00500
00501 tab_index = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
00502 if (tab_index && tab_index < ref_count) {
00503 MSRepoTableRefPtr tr;
00504
00505 tab_index--;
00506 tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
00507 if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id &&
00508 CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
00509 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00510 if (free_ref)
00511 free2_ref = free_ref;
00512 free_ref = ptr.rp_blob_ref;
00513 }
00514 }
00515 else if (tab_index == INVALID_INDEX) {
00516
00517 if (free_ref)
00518 free2_ref = free_ref;
00519 free_ref = ptr.rp_blob_ref;
00520 }
00521 break;
00522 }
00523 default: {
00524 uint32_t tab_index;
00525 tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
00526
00527 if (tab_index && tab_index < ref_count) {
00528 MSRepoTableRefPtr tr;
00529
00530 tab_index--;
00531 tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->getRepoBlobHeadSize() + tab_index * ref_size);
00532 if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id &&
00533 CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
00534 if (COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8)) == blob_ref_id) {
00535 char message[100];
00536 snprintf(message, 100, "Duplicate BLOB reference: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00537
00538 self->myException.log(self, message);
00539 goto done;
00540 }
00541 }
00542 }
00543 break;
00544 }
00545 }
00546 ptr.rp_chars += ref_size;
00547 size -= ref_size;
00548 }
00549
00550
00551
00552
00553 if (!free_ref || (!tab_ref && !free2_ref)) {
00554 size_t new_refs = (tab_ref)?1:2;
00555 ptr.rp_chars = otab->myOTBuffer;
00556 size_t sp = MS_VAR_SPACE(ptr.rp_head);
00557
00558 if (sp > (new_refs * CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1))) {
00559 sp = MS_MIN_BLOB_HEAD_SIZE;
00560 }
00561
00562 if (MS_CAN_ADD_REFS(ptr.rp_head, new_refs)) {
00563 new_head_size = head_size;
00564
00565 } else {
00566 size_t new_size, max_refs;
00567
00568 if (ref_count < 2)
00569 max_refs = 4;
00570 else if (ref_count > 32)
00571 max_refs = ref_count + 32;
00572 else
00573 max_refs = 2 * ref_count;
00574
00575 if (max_refs > (MS_OT_BUFFER_SIZE/ref_size))
00576 max_refs = (MS_OT_BUFFER_SIZE/ref_size);
00577
00578 if (max_refs < (ref_count + new_refs))
00579 CSException::throwAssertion(CS_CONTEXT, "BLOB reference header overflow");
00580
00581 new_size = head_size + ref_size * max_refs;
00582
00583
00584 if (CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2)) {
00585 uint16_t mdata_size, mdata_offset, alias_offset, shift;
00586
00587 shift = new_size - head_size;
00588 mdata_size = CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2);
00589 mdata_offset = CS_GET_DISK_2(ptr.rp_head->rb_mdata_offset_2);
00590 alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00591
00592 memmove(ptr.rp_chars + mdata_offset + shift, ptr.rp_chars + mdata_offset, shift);
00593 memset(ptr.rp_chars + mdata_offset, 0, shift);
00594 mdata_offset += shift;
00595 alias_offset += shift;
00596
00597 CS_SET_DISK_2(ptr.rp_head->rb_mdata_offset_2, mdata_offset);
00598 CS_SET_DISK_2(ptr.rp_head->rb_alias_offset_2, alias_offset);
00599
00600 } else
00601 memset(ptr.rp_chars + head_size, 0, new_size - head_size);
00602
00603 new_head_size = new_size;
00604 }
00605 CS_SET_DISK_2(ptr.rp_head->rb_head_size_2, new_head_size);
00606 CS_SET_DISK_2(ptr.rp_head->rb_ref_count_2, ref_count + new_refs);
00607 ptr.rp_chars += myRepo->myRepoBlobHeadSize + ref_count * ref_size;
00608
00609 if (!free_ref) {
00610 free_ref = ptr.rp_blob_ref;
00611 memset(free_ref, 0, ref_size);
00612 ptr.rp_chars += ref_size;
00613 }
00614
00615 if (!tab_ref) {
00616 free2_ref = ptr.rp_blob_ref;
00617 memset(free2_ref, 0, ref_size);
00618 }
00619
00620 ref_count += new_refs;
00621 }
00622 else
00623 new_head_size = head_size;
00624
00625 if (!tab_ref) {
00626 tab_ref = (MSRepoTableRefPtr) free2_ref;
00627
00628 CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_TABLE_REF);
00629 CS_SET_DISK_4(tab_ref->tr_table_id_4, tab_id);
00630 CS_SET_DISK_6(tab_ref->tr_blob_id_6, blob_id);
00631 }
00632
00633 size_t tab_idx;
00634
00635 tab_idx = (((char *) tab_ref - otab->myOTBuffer) - myRepo->myRepoBlobHeadSize) / ref_size;
00636
00637 CS_SET_DISK_2(free_ref->er_table_2, tab_idx+1);
00638 CS_SET_DISK_2(free_ref->er_col_index_2, col_index);
00639 CS_SET_DISK_8(free_ref->er_blob_ref_id_8, UNCOMMITTED(blob_ref_id));
00640
00641 update_blob_header(otab, offset, blob_size, head_size, new_head_size);
00642 #ifdef HAVE_ALIAS_SUPPORT
00643 if (reset_alias_index)
00644 myRepo->myRepoDatabase->registerBlobAlias(myRepo->myRepoID, offset, blob_alias);
00645 #endif
00646
00647 done:
00648
00649 unlock_(myLock);
00650 exit_();
00651 }
00652
00653 void MSRepoFile::setBlobMetaData(MSOpenTable *otab, uint64_t offset, const char *meta_data, uint16_t meta_data_len, bool reset_alias, const char *alias)
00654 {
00655 CSMutex *mylock;
00656 MSRepoPointersRec ptr;
00657 size_t read_size;
00658 uint16_t new_head_size;
00659 uint64_t blob_size;
00660 uint16_t head_size, mdata_size, mdata_offset, alias_offset = 0;
00661 MSBlobHeadRec blob;
00662
00663 enter_();
00664
00665 mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00666 lock_(mylock);
00667
00668
00669 if (read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec)) {
00670 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00671 }
00672
00673 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00674
00675 if (head_size > MS_OT_BUFFER_SIZE) {
00676 CSException::throwAssertion(CS_CONTEXT, "BLOB header overflow");
00677 }
00678
00679 read_size = read(otab->myOTBuffer, offset, head_size, 0);
00680 ptr.rp_chars = otab->myOTBuffer;
00681 if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00682 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00683 if (read_size < myRepo->myRepoBlobHeadSize)
00684 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00685 if (! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00686 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00687
00688
00689 blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
00690 if (read_size < head_size) {
00691
00692
00693
00694
00695
00696
00697 head_size = read_size;
00698 blob_size = 0;
00699 }
00700 mdata_size = CS_GET_DISK_2(ptr.rp_head->rb_mdata_size_2);
00701
00702 if ((meta_data_len < mdata_size) || MS_CAN_ADD_MDATA(ptr.rp_head, meta_data_len - mdata_size))
00703 new_head_size = head_size;
00704 else {
00705
00706 new_head_size = head_size + meta_data_len - mdata_size;
00707 if (new_head_size > MS_OT_BUFFER_SIZE)
00708 CSException::throwAssertion(CS_CONTEXT, "BLOB reference header overflow");
00709
00710 memset(ptr.rp_chars + head_size, 0, new_head_size - head_size);
00711 CS_SET_DISK_2(ptr.rp_head->rb_head_size_2, new_head_size);
00712
00713 }
00714
00715
00716 if (meta_data_len)
00717 mdata_offset = new_head_size - meta_data_len;
00718 else
00719 mdata_offset = 0;
00720 mdata_size = meta_data_len;
00721
00722
00723 CS_SET_DISK_2(ptr.rp_head->rb_mdata_size_2, mdata_size);
00724 CS_SET_DISK_2(ptr.rp_head->rb_mdata_offset_2, mdata_offset);
00725 #ifdef HAVE_ALIAS_SUPPORT
00726 uint32_t alias_hash = INVALID_ALIAS_HASH;
00727 if (alias) {
00728 alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00729 alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00730 if (reset_alias) {
00731 if (alias_offset)
00732 alias_hash = myRepo->myRepoDatabase->updateBlobAlias(myRepo->myRepoID, offset, alias_hash, alias);
00733 else {
00734 alias_hash = myRepo->myRepoDatabase->registerBlobAlias(myRepo->myRepoID, offset, alias);
00735 }
00736 }
00737 alias_offset = mdata_offset + (alias - meta_data);
00738
00739 } else if (reset_alias && CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2)) {
00740 alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00741 myRepo->myRepoDatabase->deleteBlobAlias(myRepo->myRepoID, offset, CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4));
00742 alias_offset = 0;
00743 }
00744 #else
00745 uint32_t alias_hash = ((uint32_t)-1);
00746 if (alias || reset_alias) {
00747 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_IMPLEMENTED, "No BLOB alias support.");
00748 }
00749 #endif
00750
00751 CS_SET_DISK_2(ptr.rp_head->rb_alias_offset_2, alias_offset);
00752 CS_SET_DISK_4(ptr.rp_head->rb_alias_hash_4, alias_hash);
00753
00754 memcpy(ptr.rp_chars + mdata_offset, meta_data, meta_data_len);
00755
00756 update_blob_header(otab, offset, blob_size, head_size, new_head_size);
00757
00758 unlock_(mylock);
00759 exit_();
00760
00761 }
00762
00763
00764 void MSRepoFile::releaseBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code)
00765 {
00766 CSMutex *mylock;
00767 MSRepoPointersRec ptr;
00768 uint32_t table_ref_count = 0;
00769 uint32_t size;
00770 size_t ref_size, ref_count, read_size;
00771 MSRepoTempRefPtr temp_ref = NULL;
00772 uint16_t tab_index = 0;
00773 MSRepoTableRefPtr tab_ref;
00774 uint16_t alias_offset;
00775 uint32_t alias_hash;
00776
00777 enter_();
00778
00779 mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00780 lock_(mylock);
00781
00782 ASSERT(head_size <= MS_OT_BUFFER_SIZE);
00783 read_size = read(otab->myOTBuffer, offset, head_size, 0);
00784 ptr.rp_chars = otab->myOTBuffer;
00785 if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00786 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00787 if (read_size < myRepo->myRepoBlobHeadSize) {
00788 removeBlob(otab, tab_id, blob_id, offset, auth_code);
00789 goto exit;
00790 }
00791 if ((! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1))) ||
00792 CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code) {
00793 removeBlob(otab, tab_id, blob_id, offset, auth_code);
00794 goto exit;
00795 }
00796
00797
00798 if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00799 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00800 read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00801 }
00802 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00803 if (read_size < head_size) {
00804
00805
00806
00807
00808
00809
00810 head_size = read_size;
00811 }
00812 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00813 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00814
00815 alias_offset = CS_GET_DISK_2(ptr.rp_head->rb_alias_offset_2);
00816 alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
00817
00818 size = head_size - myRepo->myRepoBlobHeadSize;
00819 if (size > ref_size * ref_count)
00820 size = ref_size * ref_count;
00821 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00822 while (size >= ref_size) {
00823 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00824 case MS_BLOB_FREE_REF:
00825 case MS_BLOB_TABLE_REF:
00826 break;
00827 case MS_BLOB_DELETE_REF: {
00828 uint32_t tabi;
00829
00830 tabi = CS_GET_DISK_2(ptr.rp_temp_ref->tp_del_ref_2);
00831 if (tabi && tabi < ref_count) {
00832 tabi--;
00833 tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + tabi * ref_size);
00834 if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00835 CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00836
00837
00838
00839
00840
00841
00842 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00843 }
00844 }
00845 break;
00846 }
00847 default:
00848 tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00849 if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00850 CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00851 if (COMMIT_MASK(CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8)) == blob_ref_id) {
00852
00853 tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1;
00854 temp_ref = ptr.rp_temp_ref;
00855
00856 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00857 }
00858 else
00859 table_ref_count++;
00860 }
00861 break;
00862 }
00863 ptr.rp_chars += ref_size;
00864 size -= ref_size;
00865 }
00866
00867
00868
00869 if ((!table_ref_count) && temp_ref) {
00870 uint32_t log_id;
00871 uint32_t log_offset;
00872 uint32_t temp_time;
00873 #ifdef HAVE_ALIAS_SUPPORT
00874 MSDiskAliasRec aliasDiskRec;
00875 MSDiskAliasPtr aliasDiskPtr = NULL;
00876
00877 if (alias_offset) {
00878 CS_SET_DISK_4(aliasDiskRec.ar_repo_id_4, myRepo->myRepoID);
00879 CS_SET_DISK_8(aliasDiskRec.ar_offset_8, offset);
00880 CS_SET_DISK_4(aliasDiskRec.ar_hash_4, alias_hash);
00881 aliasDiskPtr = &aliasDiskRec;
00882 }
00883
00884 myRepo->myRepoDatabase->queueForDeletion(otab, MS_TL_BLOB_REF, tab_id, blob_id, auth_code, &log_id, &log_offset, &temp_time, aliasDiskPtr);
00885 #else
00886 myRepo->myRepoDatabase->queueForDeletion(otab, MS_TL_BLOB_REF, tab_id, blob_id, auth_code, &log_id, &log_offset, &temp_time);
00887 #endif
00888 myRepo->myLastTempTime = temp_time;
00889 CS_SET_DISK_2(temp_ref->rr_type_2, MS_BLOB_DELETE_REF);
00890 CS_SET_DISK_2(temp_ref->tp_del_ref_2, tab_index+1);
00891 CS_SET_DISK_4(temp_ref->tp_log_id_4, log_id);
00892 CS_SET_DISK_4(temp_ref->tp_offset_4, log_offset);
00893
00894 CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_ALLOCATED);
00895 }
00896 if (temp_ref) {
00897
00898
00899
00900
00901 write(otab->myOTBuffer + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
00902 } else if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == false) {
00903 char message[100];
00904 snprintf(message, 100, "BLOB reference not found: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00905
00906 self->myException.log(self, message);
00907 }
00908
00909 exit:
00910 unlock_(mylock);
00911 exit_();
00912 }
00913
00914 void MSRepoFile::commitBlob(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint64_t blob_ref_id, uint32_t auth_code)
00915 {
00916 CSMutex *mylock;
00917 MSRepoPointersRec ptr;
00918 uint32_t size;
00919 size_t ref_size, ref_count, read_size;
00920 MSRepoTableRefPtr tab_ref;
00921
00922 enter_();
00923
00924 mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
00925 lock_(mylock);
00926
00927 ASSERT(head_size <= MS_OT_BUFFER_SIZE);
00928 read_size = read(otab->myOTBuffer, offset, head_size, 0);
00929 ptr.rp_chars = otab->myOTBuffer;
00930 if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
00931 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
00932
00933
00934 if (read_size < myRepo->myRepoBlobHeadSize)
00935 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB header incomplete");
00936 if ( ! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1)))
00937 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00938 if (auth_code && CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
00939 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB data does not match reference");
00940
00941
00942
00943 if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
00944 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00945 read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
00946 }
00947
00948 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
00949 if (read_size < head_size) {
00950
00951
00952
00953
00954
00955
00956 head_size = read_size;
00957 }
00958 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
00959 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
00960
00961 size = head_size - myRepo->myRepoBlobHeadSize;
00962 if (size > ref_size * ref_count)
00963 size = ref_size * ref_count;
00964 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
00965 while (size >= ref_size) {
00966 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00967 case MS_BLOB_FREE_REF:
00968 case MS_BLOB_TABLE_REF:
00969 break;
00970 case MS_BLOB_DELETE_REF: {
00971 break;
00972 }
00973 default:
00974 tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00975 if (CS_GET_DISK_4(tab_ref->tr_table_id_4) == tab_id &&
00976 CS_GET_DISK_6(tab_ref->tr_blob_id_6) == blob_id) {
00977 uint64_t ref_id = CS_GET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8);
00978 if (COMMIT_MASK(ref_id) == blob_ref_id) {
00979
00980 CS_SET_DISK_8(ptr.rp_blob_ref->er_blob_ref_id_8, blob_ref_id);
00981 offset += (ptr.rp_chars - otab->myOTBuffer) + offsetof(MSRepoBlobRefRec, er_blob_ref_id_8);
00982 write(&(ptr.rp_blob_ref->er_blob_ref_id_8), offset, 8);
00983 goto exit;
00984 }
00985 }
00986 break;
00987 }
00988 ptr.rp_chars += ref_size;
00989 size -= ref_size;
00990 }
00991
00992 if (PBMSDaemon::isDaemonState(PBMSDaemon::DaemonStartUp) == false) {
00993 char message[100];
00994 snprintf(message, 100, "BLOB reference not found: db_id: %"PRIu32", tab_id:%"PRIu32", blob_ref_id: %"PRIu64"\n", myRepo->myRepoDatabase->myDatabaseID, tab_id, blob_ref_id);
00995 self->myException.log(self, message);
00996 }
00997
00998 exit:
00999 unlock_(mylock);
01000 exit_();
01001 }
01002
01003 void MSRepoFile::realFreeBlob(MSOpenTable *otab, char *buffer, uint32_t auth_code, uint64_t offset, uint16_t head_size, uint64_t blob_size, size_t ref_size)
01004 {
01005 uint32_t tab_id;
01006 uint64_t blob_id;
01007 size_t size;
01008 MSRepoPointersRec ptr;
01009 enter_();
01010
01011 ptr.rp_chars = buffer;
01012
01013 if (BLOB_IN_CLOUD(CS_GET_DISK_1(ptr.rp_head->rb_storage_type_1))) {
01014 CloudKeyRec key;
01015 getBlobKey(ptr.rp_head, &key);
01016 if (!myRepo->myRepoDatabase->myBlobCloud)
01017 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Deleting cloud BLOB without cloud.");
01018
01019 myRepo->myRepoDatabase->myBlobCloud->cl_deleteData(&key);
01020 }
01021
01022 #ifdef HAVE_ALIAS_SUPPORT
01023 uint32_t alias_hash;
01024 alias_hash = CS_GET_DISK_4(ptr.rp_head->rb_alias_hash_4);
01025 if (alias_hash != INVALID_ALIAS_HASH)
01026 myRepo->myRepoDatabase->deleteBlobAlias(myRepo->myRepoID, offset, alias_hash);
01027 #endif
01028
01029
01030 CS_SET_DISK_1(ptr.rp_head->rb_status_1, MS_BLOB_DELETED);
01031 write(ptr.rp_chars + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
01032
01033
01034 updateGarbage(head_size + blob_size);
01035
01036
01037 size = head_size - myRepo->myRepoBlobHeadSize;
01038 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01039 while (size >= ref_size) {
01040 if (CS_GET_DISK_2(ptr.rp_ref->rr_type_2) == MS_BLOB_TABLE_REF) {
01041 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
01042 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
01043 removeBlob(otab, tab_id, blob_id, offset, auth_code);
01044 }
01045 ptr.rp_chars += ref_size;
01046 size -= ref_size;
01047 }
01048 exit_();
01049 }
01050
01051
01052 void MSRepoFile::freeTableReference(MSOpenTable *otab, uint64_t offset, uint16_t head_size, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code)
01053 {
01054 CSMutex *mylock;
01055 MSRepoPointersRec ptr;
01056 uint32_t blob_ref_count = 0;
01057 uint32_t table_ref_count = 0;
01058 bool modified = false;
01059 uint32_t size;
01060 size_t ref_size, ref_count, read_size;
01061 MSRepoTableRefPtr tab_ref = NULL;
01062 uint64_t blob_size;
01063
01064 enter_();
01065
01066 mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01067 lock_(mylock);
01068
01069 ASSERT(head_size <= MS_OT_BUFFER_SIZE);
01070 read_size = read(otab->myOTBuffer, offset, head_size, 0);
01071 ptr.rp_chars = otab->myOTBuffer;
01072 if (CS_GET_DISK_4(ptr.rp_head->rd_magic_4) != MS_BLOB_HEADER_MAGIC)
01073 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "Invalid BLOB identifier");
01074 if (read_size < myRepo->myRepoBlobHeadSize) {
01075 removeBlob(otab, tab_id, blob_id, offset, auth_code);
01076 goto exit;
01077 }
01078 if ((! IN_USE_BLOB_STATUS(CS_GET_DISK_1(ptr.rp_head->rb_status_1))) ||
01079 CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code) {
01080 removeBlob(otab, tab_id, blob_id, offset, auth_code);
01081 goto exit;
01082 }
01083
01084
01085 if (head_size != CS_GET_DISK_2(ptr.rp_head->rb_head_size_2)) {
01086 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
01087 read_size = read(otab->myOTBuffer, offset, head_size, myRepo->myRepoBlobHeadSize);
01088 }
01089 head_size = CS_GET_DISK_2(ptr.rp_head->rb_head_size_2);
01090 blob_size = CS_GET_DISK_6(ptr.rp_head->rb_blob_repo_size_6);
01091 if (read_size < head_size) {
01092
01093
01094
01095
01096
01097
01098 head_size = read_size;
01099 blob_size = 0;
01100
01101 }
01102 ref_size = CS_GET_DISK_1(ptr.rp_head->rb_ref_size_1);
01103 ref_count = CS_GET_DISK_2(ptr.rp_head->rb_ref_count_2);
01104 size = head_size - myRepo->myRepoBlobHeadSize;
01105 if (size > ref_size * ref_count)
01106 size = ref_size * ref_count;
01107 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01108 while (size >= ref_size) {
01109 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
01110 case MS_BLOB_FREE_REF:
01111 break;
01112 case MS_BLOB_TABLE_REF:
01113 if (CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4) == tab_id &&
01114 CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6) == blob_id)
01115 tab_ref = ptr.rp_tab_ref;
01116 break;
01117 case MS_BLOB_DELETE_REF:
01118 break;
01119 default:
01120 MSRepoTableRefPtr tr;
01121
01122 tr = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
01123 if (CS_GET_DISK_2(tr->rr_type_2) == MS_BLOB_TABLE_REF) {
01124
01125
01126
01127
01128 if (CS_GET_DISK_4(tr->tr_table_id_4) == tab_id && CS_GET_DISK_6(tr->tr_blob_id_6) == blob_id) {
01129
01130 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
01131 modified = true;
01132 }
01133 else
01134 blob_ref_count++;
01135
01136 }
01137 break;
01138 }
01139 ptr.rp_chars += ref_size;
01140 size -= ref_size;
01141 }
01142
01143 if (!table_ref_count && tab_ref) {
01144 CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_FREE_REF);
01145 modified = true;
01146 }
01147
01148
01149 if (!blob_ref_count) {
01150 realFreeBlob(otab, otab->myOTBuffer, auth_code, offset, head_size, blob_size, ref_size);
01151 } else if (modified)
01152
01153
01154
01155
01156 write(otab->myOTBuffer + MS_BLOB_STAT_OFFS, offset + MS_BLOB_STAT_OFFS, head_size - MS_BLOB_STAT_OFFS);
01157
01158 unlock_(mylock);
01159
01160 if (!table_ref_count || !tab_ref)
01161
01162
01163
01164
01165
01166 removeBlob(otab, tab_id, blob_id, offset, auth_code);
01167
01168 exit_();
01169
01170 exit:
01171 unlock_(mylock);
01172 exit_();
01173 }
01174
01175 void MSRepoFile::checkBlob(CSStringBuffer *buffer, uint64_t offset, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
01176 {
01177 CSMutex *mylock;
01178 MSBlobHeadRec blob;
01179 MSRepoPointersRec ptr;
01180 uint32_t blob_ref_count = 0;
01181 bool modified = false;
01182 uint32_t size;
01183 size_t ref_size, ref_count, read_size;
01184 uint8_t status;
01185 uint16_t head_size;
01186 uint64_t blob_size;
01187 MSRepoTempRefPtr my_ref = NULL;
01188 uint16_t ref_type = MS_BLOB_FREE_REF;
01189 enter_();
01190
01191
01192 mylock = &myRepo->myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01193 lock_(mylock);
01194
01195
01196 if (read(&blob, offset, sizeof(MSBlobHeadRec), 0) < sizeof(MSBlobHeadRec))
01197 goto exit;
01198
01199
01200
01201
01202
01203
01204
01205
01206
01207 if (CS_GET_DISK_4(blob.rd_magic_4) != MS_BLOB_HEADER_MAGIC)
01208 goto exit;
01209
01210 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01211 blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01212 ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
01213 ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
01214 status = CS_GET_DISK_1(blob.rb_status_1);
01215 if (! IN_USE_BLOB_STATUS(status))
01216 goto exit;
01217
01218
01219 buffer->setLength(head_size);
01220 ptr.rp_chars = buffer->getBuffer(0);
01221 read_size = read(ptr.rp_chars, offset, head_size, 0);
01222 if (read_size < myRepo->myRepoBlobHeadSize)
01223 goto exit;
01224 if (CS_GET_DISK_4(ptr.rp_bytes + myRepo->myRepoBlobHeadSize - 4) != auth_code)
01225 goto exit;
01226 if (read_size < head_size) {
01227
01228
01229
01230
01231
01232
01233 head_size = read_size;
01234 blob_size = 0;
01235 }
01236 size = head_size - myRepo->myRepoBlobHeadSize;
01237 if (size > ref_size * ref_count)
01238 size = ref_size * ref_count;
01239
01240
01241
01242 ptr.rp_chars += myRepo->myRepoBlobHeadSize;
01243 while (size >= ref_size) {
01244 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
01245 case MS_BLOB_FREE_REF:
01246 break;
01247 case MS_BLOB_TABLE_REF:
01248 break;
01249 case MS_BLOB_DELETE_REF:
01250 if (CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4) == temp_log_id &&
01251 CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4) == temp_log_offset) {
01252 ref_type = CS_GET_DISK_2(ptr.rp_ref->rr_type_2);
01253 my_ref = ptr.rp_temp_ref;
01254 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
01255 modified = true;
01256 }
01257 break;
01258 default:
01259 MSRepoTableRefPtr tr;
01260 uint32_t tabi;
01261
01262 tabi = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
01263 if (tabi < ref_count) {
01264 tr = (MSRepoTableRefPtr) (buffer->getBuffer(0) + myRepo->myRepoBlobHeadSize + (tabi-1) * ref_size);
01265 if (CS_GET_DISK_2(tr->rr_type_2) == MS_BLOB_TABLE_REF)
01266 blob_ref_count++;
01267 }
01268 break;
01269 }
01270 ptr.rp_chars += ref_size;
01271 size -= ref_size;
01272 }
01273
01274 if ((ref_type == (uint16_t)MS_BLOB_DELETE_REF) && !blob_ref_count) {
01275 realFreeBlob(NULL, buffer->getBuffer(0), auth_code, offset, head_size, blob_size, ref_size);
01276 }
01277
01278 exit:
01279 unlock_(mylock);
01280 exit_();
01281 }
01282
01283 void MSRepoFile::returnToPool()
01284 {
01285 myRepo->myRepoDatabase->returnRepoFileToPool(this);
01286 }
01287
01288 void MSRepoFile::removeBlob(MSOpenTable *otab, uint32_t tab_id, uint64_t blob_id, uint64_t offset, uint32_t auth_code)
01289 {
01290 enter_();
01291 if (otab && otab->getDBTable()->myTableID == tab_id)
01292 otab->getDBTable()->freeBlobHandle(otab, blob_id, myRepo->myRepoID, offset, auth_code);
01293 else {
01294 MSOpenTable *tmp_otab;
01295
01296 if ((tmp_otab = MSTableList::getOpenTableByID(myRepo->myRepoDatabase->myDatabaseID, tab_id))) {
01297 frompool_(tmp_otab);
01298 tmp_otab->getDBTable()->freeBlobHandle(tmp_otab, blob_id, myRepo->myRepoID, offset, auth_code);
01299 backtopool_(tmp_otab);
01300 }
01301 }
01302 exit_();
01303 }
01304
01305 MSRepoFile *MSRepoFile::newRepoFile(MSRepository *repo, CSPath *path)
01306 {
01307 MSRepoFile *f;
01308
01309 if (!(f = new MSRepoFile())) {
01310 path->release();
01311 CSException::throwOSError(CS_CONTEXT, ENOMEM);
01312 }
01313 f->myRepo = repo;
01314 f->myFilePath = path;
01315 return f;
01316 }
01317
01318
01319
01320
01321
01322
01323 MSRepository::MSRepository(uint32_t id, MSDatabase *db, off64_t file_size):
01324 CSSharedRefObject(),
01325 myRepoID(id),
01326 myRepoFileSize(file_size),
01327 myRepoLockState(REPO_UNLOCKED),
01328 isRemovingFP(false),
01329 myRepoDatabase(db),
01330 myGarbageCount(0),
01331 myRepoHeadSize(0),
01332 myRepoDefRefSize(0),
01333 myRepoBlobHeadSize(0),
01334 myRecoveryOffset(0),
01335 myLastTempTime(0),
01336 myLastAccessTime(0),
01337 myLastCreateTime(0),
01338 myLastRefTime(0),
01339 mustBeDeleted(false),
01340 myRepoXLock(false),
01341 iFilePool(NULL)
01342 {
01343 }
01344
01345 MSRepository::~MSRepository()
01346 {
01347 CSPath *path = NULL;
01348
01349 enter_();
01350 if (mustBeDeleted) {
01351 path = getRepoFilePath();
01352 push_(path);
01353 }
01354
01355 isRemovingFP = true;
01356 removeRepoFilesNotInUse();
01357
01358 iPoolFiles.clear();
01359
01360 if (path) {
01361 path->removeFile();
01362 release_(path);
01363 }
01364 exit_();
01365 }
01366
01367 void MSRepository::openRepoFileForWriting(MSOpenTable *otab)
01368 {
01369 if (!otab->myWriteRepoFile)
01370 otab->myWriteRepoFile = openRepoFile();
01371 }
01372
01373 uint64_t MSRepository::receiveBlob(MSOpenTable *otab, uint16_t head_size, uint64_t blob_size, Md5Digest *checksum, CSInputStream *stream)
01374 {
01375 off64_t offset;
01376 size_t tfer;
01377
01378 enter_();
01379
01380 offset = myRepoFileSize;
01381
01382 offset += head_size;
01383
01384 ASSERT(myRepoDatabase->myBlobType == MS_STANDARD_STORAGE);
01385 if (stream) {
01386 CSMd5 md5;
01387 push_(stream);
01388 md5.md5_init();
01389 while (blob_size > 0) {
01390 if (blob_size <= MS_OT_BUFFER_SIZE)
01391 tfer = (size_t) blob_size;
01392 else
01393 tfer = MS_OT_BUFFER_SIZE;
01394 tfer = stream->read(otab->myOTBuffer, tfer);
01395 if (!tfer)
01396 CSException::throwOSError(CS_CONTEXT, EPIPE);
01397 if (checksum) md5.md5_append((const u_char *)(otab->myOTBuffer), tfer);
01398 otab->myWriteRepoFile->write(otab->myOTBuffer, offset, tfer);
01399 offset += (uint64_t) tfer;
01400 blob_size -= (uint64_t) tfer;
01401 }
01402 if (checksum) md5.md5_get_digest(checksum);
01403 release_(stream);
01404 } else {
01405
01406 otab->myWriteRepoFile->write("x" , offset + blob_size -1, 1);
01407 }
01408
01409 return_( myRepoFileSize);
01410 }
01411
01412
01413 uint64_t MSRepository::copyBlob(MSOpenTable *otab, uint64_t size, CSInputStream *stream)
01414 {
01415 off64_t offset = myRepoFileSize;
01416 size_t tfer;
01417
01418 while (size > 0) {
01419 if (size <= MS_OT_BUFFER_SIZE)
01420 tfer = (size_t) size;
01421 else
01422 tfer = MS_OT_BUFFER_SIZE;
01423 tfer = stream->read(otab->myOTBuffer, tfer);
01424 if (!tfer)
01425 CSException::throwOSError(CS_CONTEXT, EPIPE);
01426 otab->myWriteRepoFile->write(otab->myOTBuffer, offset, tfer);
01427 offset += (uint64_t) tfer;
01428 size -= (uint64_t) tfer;
01429 }
01430
01431 return myRepoFileSize;
01432 }
01433
01434 void MSRepository::writeBlobHead(MSOpenTable *otab, uint64_t offset, uint8_t ref_size, uint16_t head_size, uint64_t blob_size, Md5Digest *checksum, char *metadata, uint16_t metadata_size, uint64_t blob_id, uint32_t auth_code, uint32_t log_id, uint32_t log_offset, uint8_t blob_type, CloudKeyPtr cloud_key)
01435 {
01436 MSBlobHeadPtr blob ;
01437 MSRepoTableRefPtr tab_ref;
01438 MSRepoTempRefPtr temp_ref;
01439 time_t now;
01440 uint16_t tab_idx, max_ref_count = (head_size - myRepoBlobHeadSize - metadata_size) / ref_size;
01441 size_t size;
01442
01443 if (max_ref_count > MS_REPO_MIN_REF_COUNT)
01444 max_ref_count = MS_REPO_MIN_REF_COUNT;
01445
01446 ASSERT(max_ref_count > 1);
01447
01448 if (blob_type == MS_CLOUD_STORAGE)
01449 now = cloud_key->creation_time;
01450 else
01451 now = time(NULL);
01452
01453 blob = (MSBlobHeadPtr) otab->myOTBuffer;
01454 CS_SET_DISK_4(blob->rb_last_access_4, now);
01455 CS_SET_DISK_4(blob->rb_mod_time_4, now);
01456 CS_SET_DISK_4(blob->rb_access_count_4, 0);
01457 CS_SET_DISK_4(blob->rb_backup_id_4, 0);
01458 CS_SET_DISK_4(blob->rb_create_time_4, now);
01459 CS_SET_DISK_4(blob->rd_magic_4, MS_BLOB_HEADER_MAGIC);
01460 CS_SET_DISK_2(blob->rb_head_size_2, head_size);
01461 CS_SET_DISK_6(blob->rb_blob_data_size_6, blob_size);
01462 CS_SET_DISK_1(blob->rb_status_1, MS_BLOB_ALLOCATED);
01463 CS_SET_DISK_1(blob->rb_ref_size_1, ref_size);
01464 CS_SET_DISK_2(blob->rb_ref_count_2, max_ref_count);
01465 CS_SET_DISK_4(blob->rb_last_ref_4, 0);
01466 CS_SET_DISK_4(otab->myOTBuffer + myRepoBlobHeadSize - 4, auth_code);
01467 if (checksum)
01468 memcpy(&(blob->rb_blob_checksum_md5d), checksum, sizeof(Md5Digest));
01469
01470 CS_SET_DISK_2(blob->rb_mdata_size_2, metadata_size);
01471 if (metadata_size) {
01472 uint16_t metadata_offset = head_size - metadata_size;
01473
01474 CS_SET_DISK_2(blob->rb_mdata_offset_2, metadata_offset);
01475 memcpy(otab->myOTBuffer + metadata_offset, metadata, metadata_size);
01476
01477 #ifdef HAVE_ALIAS_SUPPORT
01478 MetaData md;
01479 md.use_data(metadata, metadata_size);
01480 const char *alias;
01481 alias = md.findAlias();
01482 if (alias) {
01483 uint32_t alias_hash;
01484 uint16_t alias_offset = metadata_offset + (uint16_t) (alias - metadata);
01485 CS_SET_DISK_2(blob->rb_alias_offset_2, alias_offset);
01486 alias_hash = myRepoDatabase->registerBlobAlias(myRepoID, offset, alias);
01487 CS_SET_DISK_4(blob->rb_alias_hash_4, alias_hash);
01488 } else {
01489 CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01490 }
01491 #else
01492 CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01493 #endif
01494
01495 } else {
01496 CS_SET_DISK_2(blob->rb_mdata_offset_2, 0);
01497 CS_SET_DISK_2(blob->rb_alias_offset_2, 0);
01498 }
01499
01500
01501 if (blob_id) {
01502 tab_ref = (MSRepoTableRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize);
01503 CS_SET_DISK_2(tab_ref->rr_type_2, MS_BLOB_TABLE_REF);
01504 CS_SET_DISK_4(tab_ref->tr_table_id_4, otab->getDBTable()->myTableID);
01505 CS_SET_DISK_6(tab_ref->tr_blob_id_6, blob_id);
01506 temp_ref = (MSRepoTempRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize + ref_size);
01507 tab_idx = 1;
01508 size = myRepoBlobHeadSize + ref_size + ref_size;
01509 }
01510 else {
01511 temp_ref = (MSRepoTempRefPtr) (otab->myOTBuffer + myRepoBlobHeadSize);
01512 tab_idx = INVALID_INDEX;
01513 size = myRepoBlobHeadSize + ref_size;
01514 }
01515
01516 CS_SET_DISK_2(temp_ref->rr_type_2, MS_BLOB_DELETE_REF);
01517 CS_SET_DISK_2(temp_ref->tp_del_ref_2, tab_idx);
01518 CS_SET_DISK_4(temp_ref->tp_log_id_4, log_id);
01519 CS_SET_DISK_4(temp_ref->tp_offset_4, log_offset);
01520
01521 if (blob_type == MS_CLOUD_STORAGE) {
01522 CS_SET_DISK_4(blob->rb_s3_key_id_4, cloud_key->ref_index);
01523 CS_SET_DISK_4(blob->rb_s3_cloud_ref_4, cloud_key->cloud_ref);
01524 blob_size = 0;
01525 }
01526
01527 memset(otab->myOTBuffer + size, 0, head_size - size - metadata_size);
01528
01529 CS_SET_DISK_1(blob->rb_storage_type_1, blob_type);
01530 CS_SET_DISK_6(blob->rb_blob_repo_size_6, blob_size);
01531 otab->myWriteRepoFile->write(blob, offset, head_size);
01532
01533 setRepoFileSize(otab, offset + head_size + blob_size);
01534 }
01535
01536 void MSRepository::setRepoFileSize(MSOpenTable *otab, off64_t offset)
01537 {
01538 myRepoFileSize = offset;
01539 if (myRepoFileSize >= PBMSParameters::getRepoThreshold()
01540 || getGarbageLevel() >= PBMSParameters::getGarbageThreshold())
01541 otab->closeForWriting();
01542 }
01543
01544 void MSRepository::syncHead(MSRepoFile *fh)
01545 {
01546 MSRepoHeadRec head;
01547
01548 fh->sync();
01549 myRecoveryOffset = myRepoFileSize;
01550 CS_SET_DISK_8(head.rh_recovery_offset_8, myRecoveryOffset);
01551 CS_SET_DISK_4(head.rh_last_temp_time_4, myLastTempTime);
01552 CS_SET_DISK_4(head.rh_last_access_4, myLastAccessTime);
01553 CS_SET_DISK_4(head.rh_create_time_4, myLastCreateTime);
01554 CS_SET_DISK_4(head.rh_last_ref_4, myLastRefTime);
01555
01556 fh->write(&head.rh_recovery_offset_8, offsetof(MSRepoHeadRec, rh_recovery_offset_8), 24);
01557 fh->sync();
01558 }
01559
01560 MSRepoFile *MSRepository::openRepoFile()
01561 {
01562 MSRepoFile *fh;
01563
01564 enter_();
01565 fh = MSRepoFile::newRepoFile(this, getRepoFilePath());
01566 push_(fh);
01567 if (myRepoFileSize)
01568 fh->open(CSFile::DEFAULT);
01569 else
01570 fh->open(CSFile::CREATE);
01571 if (!myRepoHeadSize) {
01572 MSRepoHeadRec head;
01573 MSBlobHeadRec blob;
01574 size_t size;
01575 int status;
01576 int ref_size;
01577 uint16_t head_size;
01578 uint64_t blob_size;
01579
01580 lock_(this);
01581
01582 if (!myRepoHeadSize) {
01583 if (fh->read(&head, 0, offsetof(MSRepoHeadRec, rh_reserved_4), 0) < offsetof(MSRepoHeadRec, rh_reserved_4)) {
01584 CS_SET_DISK_4(head.rh_magic_4, MS_REPO_FILE_MAGIC);
01585 CS_SET_DISK_2(head.rh_version_2, MS_REPO_FILE_VERSION);
01586 CS_SET_DISK_2(head.rh_repo_head_size_2, MS_REPO_FILE_HEAD_SIZE);
01587 CS_SET_DISK_2(head.rh_blob_head_size_2, sizeof(MSBlobHeadRec));
01588 CS_SET_DISK_2(head.rh_def_ref_size_2, sizeof(MSRepoGenericRefRec));
01589 CS_SET_DISK_8(head.rh_recovery_offset_8, MS_REPO_FILE_HEAD_SIZE);
01590 CS_SET_DISK_8(head.rh_garbage_count_8, 0);
01591 CS_SET_DISK_4(head.rh_last_temp_time_4, 0);
01592 CS_SET_DISK_4(head.rh_last_access_4, 0);
01593 CS_SET_DISK_4(head.rh_create_time_4, 0);
01594 CS_SET_DISK_4(head.rh_last_ref_4, 0);
01595 CS_SET_DISK_4(head.rh_reserved_4, 0);
01596 fh->write(&head, 0, sizeof(MSRepoHeadRec));
01597 }
01598
01599
01600 if (CS_GET_DISK_4(head.rh_magic_4) != MS_REPO_FILE_MAGIC)
01601 CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_BAD_HEADER_MAGIC);
01602 if (CS_GET_DISK_2(head.rh_version_2) > MS_REPO_FILE_VERSION)
01603 CSException::throwFileError(CS_CONTEXT, fh->getPathString(), CS_ERR_VERSION_TOO_NEW);
01604
01605
01606 myRepoHeadSize = CS_GET_DISK_2(head.rh_repo_head_size_2);
01607 myRepoDefRefSize = CS_GET_DISK_2(head.rh_def_ref_size_2);
01608 myRepoBlobHeadSize = CS_GET_DISK_2(head.rh_blob_head_size_2);
01609 myRecoveryOffset = CS_GET_DISK_8(head.rh_recovery_offset_8);
01610 myGarbageCount = CS_GET_DISK_8(head.rh_garbage_count_8);
01611 myLastTempTime = CS_GET_DISK_4(head.rh_last_temp_time_4);
01612 myLastAccessTime = CS_GET_DISK_4(head.rh_last_access_4);
01613 myLastCreateTime = CS_GET_DISK_4(head.rh_create_time_4);
01614 myLastRefTime = CS_GET_DISK_4(head.rh_last_ref_4);
01615
01616
01617 if (myRepoFileSize < myRepoHeadSize)
01618 myRepoFileSize = myRepoHeadSize;
01619
01620 ASSERT(myGarbageCount <= myRepoFileSize);
01621
01622
01623 while (myRecoveryOffset < myRepoFileSize) {
01624 if ((size = fh->read(&blob, myRecoveryOffset, MS_MIN_BLOB_HEAD_SIZE, 0)) < MS_MIN_BLOB_HEAD_SIZE) {
01625 if (size != 0) {
01626 myRepoFileSize = myRecoveryOffset;
01627 fh->setEOF(myRepoFileSize);
01628 }
01629 break;
01630 }
01631 uint16_t ref_count, mdata_size, mdata_offset;
01632
01633 status = CS_GET_DISK_1(blob.rb_status_1);
01634 ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
01635 ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
01636 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
01637 mdata_size = CS_GET_DISK_2(blob.rb_mdata_size_2);
01638 mdata_offset = CS_GET_DISK_2(blob.rb_mdata_offset_2);
01639 blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
01640 if ((CS_GET_DISK_4(blob.rd_magic_4) != MS_BLOB_HEADER_MAGIC) ||
01641 (! IN_USE_BLOB_STATUS(status)) ||
01642 head_size < (myRepoBlobHeadSize + ref_size * MS_REPO_MIN_REF_COUNT) ||
01643 head_size < (mdata_offset + mdata_size) ||
01644 ((blob_size == 0) && (BLOB_IN_REPOSITORY(CS_GET_DISK_1(blob.rb_storage_type_1)))) ||
01645 myRecoveryOffset + head_size + blob_size > myRepoFileSize) {
01646 myRepoFileSize = myRecoveryOffset;
01647 fh->setEOF(myRepoFileSize);
01648 break;
01649 }
01650 myRecoveryOffset += head_size + blob_size;
01651 }
01652
01653 fh->sync();
01654 myRecoveryOffset = myRepoFileSize;
01655 CS_SET_DISK_8(head.rh_recovery_offset_8, myRecoveryOffset);
01656 fh->write(&head, offsetof(MSRepoHeadRec, rh_recovery_offset_8), 8);
01657 fh->sync();
01658 }
01659 unlock_(this);
01660 }
01661 pop_(fh);
01662 return_(fh);
01663 }
01664
01665 void MSRepository::lockRepo(RepoLockState state)
01666 {
01667 CSMutex *myLock;
01668 enter_();
01669
01670 myLock = &myRepoWriteLock;
01671 lock_(myLock);
01672
01673 ASSERT(!myRepoXLock);
01674
01675 myRepoLockState = state;
01676 myRepoXLock = true;
01677
01678 unlock_(myLock);
01679 exit_();
01680 }
01681
01682 void MSRepository::signalCompactor()
01683 {
01684 #ifndef MS_COMPACTOR_POLLS
01685 if (!mustBeDeleted) {
01686 if (getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
01687 if (myRepoDatabase->myCompactorThread)
01688 myRepoDatabase->myCompactorThread->wakeup();
01689 }
01690 }
01691 #endif
01692 }
01693
01694 void MSRepository::unlockRepo(RepoLockState state)
01695 {
01696 CSMutex *myLock;
01697 enter_();
01698 myLock = &myRepoWriteLock;
01699 lock_(myLock);
01700
01701 ASSERT(myRepoLockState & state);
01702
01703 myRepoLockState &= ~state;
01704 if (myRepoLockState == REPO_UNLOCKED) {
01705 myRepoXLock = false;
01706 signalCompactor();
01707 }
01708 unlock_(myLock);
01709
01710 exit_();
01711 }
01712
01713
01714
01715
01716 void MSRepository::returnToPool()
01717 {
01718 CSMutex *myLock;
01719 enter_();
01720 myLock = &myRepoWriteLock;
01721 lock_(myLock);
01722 this->myRepoLockState &= ~(REPO_COMPACTING | REPO_WRITE);
01723 if ( this->myRepoLockState == REPO_UNLOCKED) {
01724 myRepoXLock = false;
01725 signalCompactor();
01726 }
01727 unlock_(myLock);
01728
01729 this->release();
01730 exit_();
01731 }
01732
01733 void MSRepository::backupCompleted()
01734 {
01735 CSMutex *myLock;
01736 enter_();
01737 myLock = &myRepoWriteLock;
01738 lock_(myLock);
01739
01740
01741 this->myRepoLockState &= ~REPO_BACKUP;
01742 if ( this->myRepoLockState == REPO_UNLOCKED) {
01743 myRepoXLock = false;
01744 signalCompactor();
01745 }
01746
01747 unlock_(myLock);
01748 exit_();
01749 }
01750
01751 bool MSRepository::lockedForBackup() { return ((myRepoLockState & REPO_BACKUP) == REPO_BACKUP);}
01752
01753 uint32_t MSRepository::initBackup()
01754 {
01755 CSMutex *myLock;
01756 uint32_t state;
01757 enter_();
01758
01759 myLock = &myRepoWriteLock;
01760 lock_(myLock);
01761 state = this->myRepoLockState;
01762 this->myRepoLockState |= REPO_BACKUP;
01763 if (this->myRepoLockState == REPO_BACKUP)
01764 this->myRepoXLock = true;
01765
01766 unlock_(myLock);
01767 return_(state);
01768 }
01769
01770 MSRepoFile *MSRepository::getRepoFile()
01771 {
01772 MSRepoFile *file;
01773
01774 if ((file = iFilePool)) {
01775 iFilePool = file->nextFile;
01776 file->nextFile = NULL;
01777 file->isFileInUse = true;
01778 file->retain();
01779 }
01780 return file;
01781 }
01782
01783 void MSRepository::addRepoFile(MSRepoFile *file)
01784 {
01785 iPoolFiles.addFront(file);
01786 }
01787
01788 void MSRepository::removeRepoFile(MSRepoFile *file)
01789 {
01790 iPoolFiles.remove(file);
01791 }
01792
01793 void MSRepository::returnRepoFile(MSRepoFile *file)
01794 {
01795 file->isFileInUse = false;
01796 file->nextFile = iFilePool;
01797 iFilePool = file;
01798 }
01799
01800 bool MSRepository::removeRepoFilesNotInUse()
01801 {
01802 MSRepoFile *file, *curr_file;
01803
01804 iFilePool = NULL;
01805
01806 if ((file = (MSRepoFile *) iPoolFiles.getBack())) {
01807 do {
01808 curr_file = file;
01809 file = (MSRepoFile *) file->getNextLink();
01810 if (!curr_file->isFileInUse)
01811 iPoolFiles.remove(curr_file);
01812 } while (file);
01813 }
01814 return iPoolFiles.getSize() == 0;
01815 }
01816
01817 off64_t MSRepository::getRepoFileSize()
01818 {
01819 return myRepoFileSize;
01820 }
01821
01822 size_t MSRepository::getRepoHeadSize()
01823 {
01824 return myRepoHeadSize;
01825 }
01826
01827 size_t MSRepository::getRepoBlobHeadSize()
01828 {
01829 return myRepoBlobHeadSize;
01830 }
01831
01832 CSMutex *MSRepository::getRepoLock(off64_t offset)
01833 {
01834 return &myRepoLock[offset % CS_REPO_REC_LOCK_COUNT];
01835 }
01836
01837 uint32_t MSRepository::getRepoID()
01838 {
01839 return myRepoID;
01840 }
01841
01842 uint32_t MSRepository::getGarbageLevel()
01843 {
01844 if (myRepoFileSize <= myRepoHeadSize)
01845 return 0;
01846 return myGarbageCount * 100 / (myRepoFileSize - myRepoHeadSize);
01847 }
01848
01849 CSPath *MSRepository::getRepoFilePath()
01850 {
01851 char file_name[120];
01852
01853 cs_strcpy(120, file_name, "bs-repository");
01854 cs_add_dir_char(120, file_name);
01855 cs_strcat(120, file_name, "repo-");
01856 cs_strcat(120, file_name, myRepoID);
01857 cs_strcat(120, file_name, ".bs");
01858
01859 if (myRepoDatabase && myRepoDatabase->myDatabasePath) {
01860 return CSPath::newPath(RETAIN(myRepoDatabase->myDatabasePath), file_name);
01861 }
01862 return NULL;
01863 }
01864