00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "cslib/CSConfig.h"
00030
00031 #include "defs_ms.h"
00032
00033 #include "cslib/CSGlobal.h"
00034 #include "cslib/CSLog.h"
00035 #include "cslib/CSStrUtil.h"
00036 #include "cslib/CSPath.h"
00037
00038 #include "open_table_ms.h"
00039 #include "table_ms.h"
00040 #include "connection_handler_ms.h"
00041 #include "engine_ms.h"
00042 #include "transaction_ms.h"
00043 #include "parameters_ms.h"
00044
00045
00046
00047
00048
00049
00050 MSOpenTable::MSOpenTable():
00051 CSRefObject(),
00052 CSPooled(),
00053 inUse(true),
00054 isNotATable(false),
00055 nextTable(NULL),
00056 myPool(NULL),
00057 myTableFile(NULL),
00058 myWriteRepo(NULL),
00059 myWriteRepoFile(NULL),
00060 myTempLogFile(NULL),
00061 iNextLink(NULL),
00062 iPrevLink(NULL)
00063
00064
00065
00066 {
00067 memset(myOTBuffer, 0, MS_OT_BUFFER_SIZE);
00068 }
00069
00070 MSOpenTable::~MSOpenTable()
00071 {
00072 close();
00073 }
00074
00075 void MSOpenTable::close()
00076 {
00077 enter_();
00078 if (myTableFile) {
00079 myTableFile->release();
00080 myTableFile = NULL;
00081 }
00082 closeForWriting();
00083 if (myTempLogFile) {
00084 myTempLogFile->release();
00085 myTempLogFile = NULL;
00086 }
00087
00088
00089
00090
00091
00092
00093
00094
00095 exit_();
00096 }
00097
00098 void MSOpenTable::returnToPool()
00099 {
00100 MSTableList::releaseTable(this);
00101 }
00102
00103
00104
00105 class CreateBlobCleanUp : public CSRefObject {
00106 bool do_cleanup;
00107 uint64_t old_size;
00108 MSOpenTable *ot;
00109 MSRepository *repo;
00110
00111 public:
00112
00113 CreateBlobCleanUp(): CSRefObject(),
00114 do_cleanup(false){}
00115
00116 ~CreateBlobCleanUp()
00117 {
00118 if (do_cleanup) {
00119 repo->setRepoFileSize(ot, old_size);
00120
00121 }
00122 }
00123
00124 void setCleanUp(MSOpenTable *ot_arg, MSRepository *repo_arg, uint64_t size)
00125 {
00126 old_size = size;
00127 repo = repo_arg;
00128 ot = ot_arg;
00129 do_cleanup = true;
00130 }
00131
00132 void cancelCleanUp()
00133 {
00134 do_cleanup = false;
00135 }
00136
00137 };
00138
00139 void MSOpenTable::createBlob(PBMSBlobURLPtr bh, uint64_t blob_size, char *metadata, uint16_t metadata_size, CSInputStream *stream, CloudKeyPtr cloud_key, Md5Digest *checksum)
00140 {
00141 uint64_t repo_offset;
00142 uint64_t blob_id = 0;
00143 uint32_t auth_code;
00144 uint16_t head_size;
00145 uint32_t log_id;
00146 uint32_t log_offset;
00147 uint32_t temp_time;
00148 uint64_t repo_size;
00149 uint64_t repo_id;
00150 Md5Digest my_checksum;
00151 CloudKeyRec cloud_key_rec;
00152 CreateBlobCleanUp *cleanup;
00153 enter_();
00154
00155 new_(cleanup, CreateBlobCleanUp());
00156 push_(cleanup);
00157
00158 if (!checksum)
00159 checksum = &my_checksum;
00160
00161 if (stream) push_(stream);
00162 openForWriting();
00163 ASSERT(myWriteRepo);
00164 auth_code = random();
00165 repo_size = myWriteRepo->getRepoFileSize();
00166 temp_time = myWriteRepo->myLastTempTime;
00167
00168
00169 cleanup->setCleanUp(this, myWriteRepo, repo_size);
00170
00171 head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
00172 if (getDB()->myBlobType == MS_STANDARD_STORAGE) {
00173 pop_(stream);
00174 repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size, checksum, stream);
00175 } else {
00176 ASSERT(getDB()->myBlobType == MS_CLOUD_STORAGE);
00177 CloudDB *cloud = getDB()->myBlobCloud;
00178
00179 if (!cloud)
00180 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Creating cloud BLOB without cloud.");
00181
00182 repo_offset = repo_size + head_size;
00183 memset(checksum, 0, sizeof(Md5Digest));
00184
00185
00186 if (stream) {
00187 cloud_key = &cloud_key_rec;
00188 cloud->cl_getNewKey(cloud_key);
00189 pop_(stream);
00190 cloud->cl_putData(cloud_key, stream, blob_size);
00191 }
00192
00193 }
00194
00195 repo_id = myWriteRepo->myRepoID;
00196 if (isNotATable) {
00197 getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
00198 formatRepoURL(bh, repo_id, repo_offset, auth_code, blob_size);
00199 }
00200 else {
00201 blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00202 getDB()->queueForDeletion(this, MS_TL_BLOB_REF, getDBTable()->myTableID, blob_id, auth_code, &log_id, &log_offset, &temp_time);
00203 formatBlobURL(bh, blob_id, auth_code, blob_size, 0);
00204 }
00205
00206 myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, checksum, metadata, metadata_size, blob_id, auth_code, log_id, log_offset, getDB()->myBlobType, cloud_key);
00207
00208 cleanup->cancelCleanUp();
00209 release_(cleanup);
00210
00211 exit_();
00212 }
00213
00214
00215 void MSOpenTable::createBlob(PBMSBlobIDPtr blob_id, uint64_t blob_size, char *metadata, uint16_t metadata_size)
00216 {
00217 uint64_t repo_size;
00218 uint64_t repo_offset;
00219 uint64_t repo_id;
00220 uint32_t auth_code;
00221 uint16_t head_size;
00222 uint32_t log_id;
00223 uint32_t log_offset;
00224 uint32_t temp_time;
00225 CreateBlobCleanUp *cleanup;
00226 enter_();
00227
00228 new_(cleanup, CreateBlobCleanUp());
00229 push_(cleanup);
00230
00231 openForWriting();
00232 ASSERT(myWriteRepo);
00233 auth_code = random();
00234
00235 repo_size = myWriteRepo->getRepoFileSize();
00236
00237
00238 cleanup->setCleanUp(this, myWriteRepo, repo_size);
00239
00240 head_size = myWriteRepo->getDefaultHeaderSize(metadata_size);
00241
00242 repo_offset = myWriteRepo->receiveBlob(this, head_size, blob_size);
00243 repo_id = myWriteRepo->myRepoID;
00244 temp_time = myWriteRepo->myLastTempTime;
00245 getDB()->queueForDeletion(this, MS_TL_REPO_REF, repo_id, repo_offset, auth_code, &log_id, &log_offset, &temp_time);
00246 myWriteRepo->myLastTempTime = temp_time;
00247 myWriteRepo->writeBlobHead(this, repo_offset, myWriteRepo->myRepoDefRefSize, head_size, blob_size, NULL, metadata, metadata_size, 0, auth_code, log_id, log_offset, MS_STANDARD_STORAGE, NULL);
00248
00249
00250 blob_id->bi_db_id = getDB()->myDatabaseID;
00251 blob_id->bi_blob_id = repo_offset;
00252 blob_id->bi_tab_id = repo_id;
00253 blob_id->bi_auth_code = auth_code;
00254 blob_id->bi_blob_size = blob_size;
00255 blob_id->bi_blob_type = MS_URL_TYPE_REPO;
00256 blob_id->bi_blob_ref_id = 0;
00257
00258 cleanup->cancelCleanUp();
00259 release_(cleanup);
00260
00261 exit_();
00262 }
00263
00264 void MSOpenTable::sendRepoBlob(uint64_t blob_id, uint64_t req_offset, uint64_t req_size, uint32_t auth_code, bool info_only, CSHTTPOutputStream *stream)
00265 {
00266 uint32_t repo_id;
00267 uint64_t offset;
00268 uint64_t size;
00269 uint16_t head_size;
00270 MSRepoFile *repo_file;
00271
00272 enter_();
00273 openForReading();
00274 getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, true);
00275 repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00276 frompool_(repo_file);
00277
00278 repo_file->sendBlob(this, offset, req_offset, req_size, 0, false, info_only, stream);
00279 backtopool_(repo_file);
00280 exit_();
00281 }
00282
00283 void MSOpenTable::freeReference(uint64_t blob_id, uint64_t blob_ref_id)
00284 {
00285 uint32_t repo_id;
00286 uint64_t offset;
00287 uint64_t blob_size;
00288 uint16_t head_size;
00289 MSRepoFile *repo_file;
00290 uint32_t auth_code = 0;
00291
00292 enter_();
00293 openForReading();
00294
00295 getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
00296 repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00297
00298 frompool_(repo_file);
00299 repo_file->releaseBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
00300 backtopool_(repo_file);
00301
00302 exit_();
00303 }
00304
00305 void MSOpenTable::commitReference(uint64_t blob_id, uint64_t blob_ref_id)
00306 {
00307 uint32_t repo_id;
00308 uint64_t offset;
00309 uint64_t blob_size;
00310 uint16_t head_size;
00311 MSRepoFile *repo_file;
00312 uint32_t auth_code = 0;
00313
00314 enter_();
00315 openForReading();
00316
00317 getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &blob_size, &head_size, true);
00318 repo_file = getDB()->getRepoFileFromPool(repo_id, false);
00319
00320 frompool_(repo_file);
00321 repo_file->commitBlob(this, offset, head_size, getDBTable()->myTableID, blob_id, blob_ref_id, auth_code);
00322 backtopool_(repo_file);
00323
00324 exit_();
00325 }
00326
00327 void MSOpenTable::useBlob(int type, uint32_t db_id, uint32_t tab_id, uint64_t blob_id, uint32_t auth_code, uint16_t col_index, uint64_t blob_size, uint64_t blob_ref_id, PBMSBlobURLPtr ret_blob_url)
00328 {
00329 MSRepoFile *repo_file= NULL;
00330 MSBlobHeadRec blob;
00331 CSInputStream *stream;
00332 MSDatabase *blob_db;
00333 int state;
00334 uint16_t head_size;
00335 uint64_t repo_offset;
00336 uint32_t repo_id;
00337
00338 enter_();
00339
00340 blob_db = getDB();
00341
00342 if (!blob_db->isRecovering()) {
00343
00344
00345
00346
00347
00348 openForReading();
00349 if (type == MS_URL_TYPE_REPO) {
00350 uint32_t ac;
00351 uint8_t status;
00352 bool same_db = true;
00353
00354 if (blob_db->myDatabaseID == db_id)
00355 repo_file = blob_db->getRepoFileFromPool(tab_id, false);
00356 else {
00357 same_db = false;
00358 blob_db = MSDatabase::getDatabase(db_id);
00359 push_(blob_db);
00360 repo_file = blob_db->getRepoFileFromPool(tab_id, false);
00361 release_(blob_db);
00362 blob_db = repo_file->myRepo->myRepoDatabase;
00363 }
00364
00365 frompool_(repo_file);
00366 repo_file->read(&blob, blob_id, MS_MIN_BLOB_HEAD_SIZE, MS_MIN_BLOB_HEAD_SIZE);
00367
00368 repo_offset = blob_id;
00369 blob_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
00370 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00371
00372 ac = CS_GET_DISK_4(blob.rb_auth_code_4);
00373 if (auth_code != ac)
00374 CSException::throwException(CS_CONTEXT, MS_ERR_AUTH_FAILED, "Invalid BLOB identifier");
00375 status = CS_GET_DISK_1(blob.rb_status_1);
00376 if ( ! IN_USE_BLOB_STATUS(status))
00377 CSException::throwException(CS_CONTEXT, MS_ERR_NOT_FOUND, "BLOB has already been deleted");
00378
00379 if (same_db) {
00380
00381 repo_id = tab_id;
00382 blob_id = getDBTable()->createBlobHandle(this, tab_id, blob_id, blob_size, head_size, auth_code);
00383 state = MS_UB_NEW_HANDLE;
00384 }
00385 else {
00386
00387 getDB()->openWriteRepo(this);
00388
00389
00390
00391 if (getDB()->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
00392 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
00393
00394 stream = repo_file->getInputStream(repo_offset);
00395 push_(stream);
00396 repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
00397 release_(stream);
00398
00399
00400 repo_id = myWriteRepo->myRepoID;
00401 blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00402 state = MS_UB_NEW_BLOB;
00403 }
00404 backtopool_(repo_file);
00405 }
00406 else {
00407
00408 if (blob_db->myDatabaseID == db_id && getDBTable()->myTableID == tab_id) {
00409 getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
00410
00411 state = MS_UB_SAME_TAB;
00412 }
00413 else {
00414 MSOpenTable *blob_otab;
00415
00416 blob_otab = MSTableList::getOpenTableByID(db_id, tab_id);
00417 frompool_(blob_otab);
00418 blob_otab->getDBTable()->readBlobHandle(blob_otab, blob_id, &auth_code, &repo_id, &repo_offset, &blob_size, &head_size, true);
00419 if (blob_db->myDatabaseID == db_id) {
00420 blob_id = getDBTable()->findBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
00421 if (blob_id == 0)
00422 blob_id = getDBTable()->createBlobHandle(this, repo_id, repo_offset, blob_size, head_size, auth_code);
00423 state = MS_UB_NEW_HANDLE;
00424 }
00425 else {
00426
00427
00428
00429 if (blob_db->myBlobCloud || myWriteRepo->myRepoDatabase->myBlobCloud)
00430 CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "Copying cloud BLOB between databases is not supported.");
00431
00432
00433
00434
00435
00436 blob_db->openWriteRepo(this);
00437
00438 stream = repo_file->getInputStream(repo_offset);
00439 push_(stream);
00440
00441 repo_offset = myWriteRepo->copyBlob(this, head_size + blob_size, stream);
00442
00443 release_(stream);
00444
00445 repo_id = myWriteRepo->myRepoID;
00446 blob_id = getDBTable()->createBlobHandle(this, myWriteRepo->myRepoID, repo_offset, blob_size, head_size, auth_code);
00447 state = MS_UB_NEW_BLOB;
00448 }
00449 backtopool_(blob_otab);
00450 }
00451
00452 }
00453
00454 blob_ref_id = blob_db->newBlobRefId();
00455
00456
00457
00458 tab_id = getDBTable()->myTableID;
00459
00460
00461 repo_file = blob_db->getRepoFileFromPool(repo_id, false);
00462 frompool_(repo_file);
00463 repo_file->referenceBlob(this, repo_offset, head_size, tab_id, blob_id, blob_ref_id, auth_code, col_index);
00464 backtopool_(repo_file);
00465
00466 MSTransactionManager::referenceBLOB(getDB()->myDatabaseID, tab_id, blob_id, blob_ref_id);
00467
00468 }
00469
00470 formatBlobURL(ret_blob_url, blob_id, auth_code, blob_size, tab_id, blob_ref_id);
00471
00472 exit_();
00473 }
00474
00475 void MSOpenTable::releaseReference(uint64_t blob_id, uint64_t blob_ref_id)
00476 {
00477 enter_();
00478
00479 MSTransactionManager::dereferenceBLOB(getDB()->myDatabaseID, getDBTable()->myTableID, blob_id, blob_ref_id);
00480
00481 exit_();
00482 }
00483
00484 void MSOpenTable::checkBlob(CSStringBuffer *buffer, uint64_t blob_id, uint32_t auth_code, uint32_t temp_log_id, uint32_t temp_log_offset)
00485 {
00486 uint32_t repo_id;
00487 uint64_t offset;
00488 uint64_t size;
00489 uint16_t head_size;
00490 MSRepoFile *repo_file;
00491
00492 enter_();
00493 openForReading();
00494 if (getDBTable()->readBlobHandle(this, blob_id, &auth_code, &repo_id, &offset, &size, &head_size, false)) {
00495 if ((repo_file = getDB()->getRepoFileFromPool(repo_id, true))) {
00496 frompool_(repo_file);
00497 repo_file->checkBlob(buffer, offset, auth_code, temp_log_id, temp_log_offset);
00498 backtopool_(repo_file);
00499 }
00500 else
00501 getDBTable()->freeBlobHandle(this, blob_id, repo_id, offset, auth_code);
00502 }
00503 exit_();
00504 }
00505
00506 bool MSOpenTable::deleteReferences(uint32_t temp_log_id, uint32_t temp_log_offset, bool *must_quit)
00507 {
00508 MSTableHeadRec tab_head;
00509 off64_t blob_id;
00510 MSTableBlobRec tab_blob;
00511 uint32_t repo_id;
00512 uint64_t repo_offset;
00513 uint16_t head_size;
00514 uint32_t auth_code;
00515 MSRepoFile *repo_file = NULL;
00516 bool result = true;
00517
00518 enter_();
00519 openForReading();
00520 if (myTableFile->read(&tab_head, 0, offsetof(MSTableHeadRec, th_reserved_4), 0) < offsetof(MSTableHeadRec, th_reserved_4))
00521
00522 goto exit;
00523 if (CS_GET_DISK_4(tab_head.th_temp_log_id_4) != temp_log_id ||
00524 CS_GET_DISK_4(tab_head.th_temp_log_offset_4) != temp_log_offset) {
00525
00526 result = false;
00527 goto exit;
00528 }
00529
00530 blob_id = CS_GET_DISK_2(tab_head.th_head_size_2);
00531 while (blob_id + sizeof(MSTableBlobRec) <= getDBTable()->getTableFileSize()) {
00532 if (*must_quit) {
00533
00534 result = false;
00535 break;
00536 }
00537 if (myTableFile->read(&tab_blob, blob_id, sizeof(MSTableBlobRec), 0) < sizeof(MSTableBlobRec))
00538 break;
00539 repo_id = CS_GET_DISK_3(tab_blob.tb_repo_id_3);
00540 repo_offset = CS_GET_DISK_6(tab_blob.tb_offset_6);
00541 head_size = CS_GET_DISK_2(tab_blob.tb_header_size_2);
00542 auth_code = CS_GET_DISK_4(tab_blob.tb_auth_code_4);
00543 if (repo_file && repo_file->myRepo->myRepoID != repo_id) {
00544 backtopool_(repo_file);
00545 repo_file = NULL;
00546 }
00547 if (!repo_file) {
00548 repo_file = getDB()->getRepoFileFromPool(repo_id, true);
00549 if (repo_file)
00550 frompool_(repo_file);
00551 }
00552 if (repo_file)
00553 repo_file->freeTableReference(this, repo_offset, head_size, getDBTable()->myTableID, blob_id, auth_code);
00554
00555 blob_id += sizeof(MSTableBlobRec);
00556 }
00557
00558 if (repo_file)
00559 backtopool_(repo_file);
00560
00561 exit:
00562 return_(result);
00563 }
00564
00565 void MSOpenTable::openForReading()
00566 {
00567 if (!myTableFile && !isNotATable)
00568 myTableFile = getDBTable()->openTableFile();
00569 }
00570
00571 void MSOpenTable::openForWriting()
00572 {
00573 if (myTableFile && myWriteRepo && myWriteRepoFile)
00574 return;
00575 enter_();
00576 openForReading();
00577 if (!myWriteRepo || !myWriteRepoFile)
00578 getDB()->openWriteRepo(this);
00579 exit_();
00580 }
00581
00582 void MSOpenTable::closeForWriting()
00583 {
00584 if (myWriteRepoFile) {
00585 myWriteRepoFile->myRepo->syncHead(myWriteRepoFile);
00586 myWriteRepoFile->release();
00587 myWriteRepoFile = NULL;
00588 }
00589 if (myWriteRepo) {
00590 myWriteRepo->unlockRepo(REPO_WRITE);
00591 #ifndef MS_COMPACTOR_POLLS
00592 if (myWriteRepo->getGarbageLevel() >= PBMSParameters::getGarbageThreshold()) {
00593 if (myWriteRepo->myRepoDatabase->myCompactorThread)
00594 myWriteRepo->myRepoDatabase->myCompactorThread->wakeup();
00595 }
00596 #endif
00597 myWriteRepo->release();
00598 myWriteRepo = NULL;
00599 }
00600 }
00601
00602 uint32_t MSOpenTable::getTableID()
00603 {
00604 return myPool->myPoolTable->myTableID;
00605 }
00606
00607 MSTable *MSOpenTable::getDBTable()
00608 {
00609 return myPool->myPoolTable;
00610 }
00611
00612 MSDatabase *MSOpenTable::getDB()
00613 {
00614 return myPool->myPoolDB;
00615 }
00616
00617 void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint32_t tab_id, uint64_t blob_ref_id)
00618 {
00619 MSBlobURLRec blob;
00620
00621 blob.bu_type = MS_URL_TYPE_BLOB;
00622 blob.bu_db_id = getDB()->myDatabaseID;
00623 blob.bu_tab_id = tab_id;
00624 blob.bu_blob_id = blob_id;
00625 blob.bu_auth_code = auth_code;
00626 blob.bu_server_id = PBMSParameters::getServerID();
00627 blob.bu_blob_size = blob_size;
00628 blob.bu_blob_ref_id = blob_ref_id;
00629
00630 PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
00631
00632 }
00633 void MSOpenTable::formatBlobURL(PBMSBlobURLPtr blob_url, uint64_t blob_id, uint32_t auth_code, uint64_t blob_size, uint64_t blob_ref_id)
00634 {
00635 formatBlobURL(blob_url, blob_id, auth_code, blob_size, getDBTable()->myTableID, blob_ref_id);
00636 }
00637 void MSOpenTable::formatRepoURL(PBMSBlobURLPtr blob_url, uint32_t log_id, uint64_t log_offset, uint32_t auth_code, uint64_t blob_size)
00638 {
00639 MSBlobURLRec blob;
00640
00641 blob.bu_type = MS_URL_TYPE_REPO;
00642 blob.bu_db_id = getDB()->myDatabaseID;
00643 blob.bu_tab_id = log_id;
00644 blob.bu_blob_id = log_offset;
00645 blob.bu_auth_code = auth_code;
00646 blob.bu_server_id = PBMSParameters::getServerID();
00647 blob.bu_blob_size = blob_size;
00648 blob.bu_blob_ref_id = 0;
00649
00650 PBMSBlobURLTools::buildBlobURL(&blob, blob_url);
00651 }
00652
00653 MSOpenTable *MSOpenTable::newOpenTable(MSOpenTablePool *pool)
00654 {
00655 MSOpenTable *otab;
00656
00657 if (!(otab = new MSOpenTable()))
00658 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00659 if ((otab->myPool = pool))
00660 otab->isNotATable = pool->myPoolTable == NULL;
00661 else
00662 otab->isNotATable = false;
00663
00664 return otab;
00665 }
00666
00667
00668
00669
00670
00671
00672 MSOpenTablePool::MSOpenTablePool():
00673 myPoolTableID(0),
00674 isRemovingTP(false),
00675 myPoolTable(NULL),
00676 myPoolDB(NULL),
00677 iTablePool(NULL)
00678 {
00679 }
00680
00681 MSOpenTablePool::~MSOpenTablePool()
00682 {
00683 isRemovingTP = true;
00684 removeOpenTablesNotInUse();
00685
00686 iPoolTables.clear();
00687 if (myPoolTable)
00688 myPoolTable->release();
00689 if (myPoolDB)
00690 myPoolDB->release();
00691 }
00692
00693 #ifdef DEBUG
00694 void MSOpenTablePool::check()
00695 {
00696 MSOpenTable *otab, *ptab;
00697 bool found;
00698
00699 if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
00700 do {
00701 found = false;
00702 ptab = iTablePool;
00703 while (ptab) {
00704 if (ptab == otab) {
00705 ASSERT(!found);
00706 found = true;
00707 }
00708 ptab = ptab->nextTable;
00709 }
00710 if (otab->inUse) {
00711 ASSERT(!found);
00712 }
00713 else {
00714 ASSERT(found);
00715 }
00716 otab = (MSOpenTable *) otab->getNextLink();
00717 } while (otab);
00718 }
00719 else
00720 ASSERT(!iTablePool);
00721 }
00722 #endif
00723
00724
00725
00726
00727
00728 MSOpenTable *MSOpenTablePool::getPoolTable()
00729 {
00730 MSOpenTable *otab;
00731
00732 if ((otab = iTablePool)) {
00733 iTablePool = otab->nextTable;
00734 otab->nextTable = NULL;
00735 ASSERT(!otab->inUse);
00736 otab->inUse = true;
00737 otab->retain();
00738 }
00739 return otab;
00740 }
00741
00742 void MSOpenTablePool::returnOpenTable(MSOpenTable *otab)
00743 {
00744 otab->inUse = false;
00745 otab->nextTable = iTablePool;
00746 iTablePool = otab;
00747 }
00748
00749
00750
00751
00752 void MSOpenTablePool::addOpenTable(MSOpenTable *otab)
00753 {
00754 iPoolTables.addFront(otab);
00755 }
00756
00757 void MSOpenTablePool::removeOpenTable(MSOpenTable *otab)
00758 {
00759 otab->close();
00760 iPoolTables.remove(otab);
00761 }
00762
00763 void MSOpenTablePool::removeOpenTablesNotInUse()
00764 {
00765 MSOpenTable *otab, *curr_otab;
00766
00767 iTablePool = NULL;
00768
00769 if ((otab = (MSOpenTable *) iPoolTables.getBack())) {
00770 do {
00771 curr_otab = otab;
00772 otab = (MSOpenTable *) otab->getNextLink();
00773 if (!curr_otab->inUse)
00774 iPoolTables.remove(curr_otab);
00775 } while (otab);
00776 }
00777 }
00778
00779 void MSOpenTablePool::returnToPool()
00780 {
00781 MSTableList::removeTablePool(this);
00782 }
00783
00784 MSOpenTablePool *MSOpenTablePool::newPool(uint32_t db_id, uint32_t tab_id)
00785 {
00786 MSOpenTablePool *pool;
00787 enter_();
00788
00789 if (!(pool = new MSOpenTablePool())) {
00790 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00791 }
00792 push_(pool);
00793 pool->myPoolDB = MSDatabase::getDatabase(db_id);
00794 pool->myPoolTableID = tab_id;
00795 if (tab_id)
00796 pool->myPoolTable = pool->myPoolDB->getTable(tab_id, false);
00797 pop_(pool);
00798 return_(pool);
00799 }
00800
00801
00802
00803
00804
00805
00806 CSSyncOrderedList *MSTableList::gPoolListByID;
00807
00808 MSTableList::MSTableList()
00809 {
00810 }
00811
00812 MSTableList::~MSTableList()
00813 {
00814 }
00815
00816 void MSTableList::startUp()
00817 {
00818 new_(gPoolListByID, CSSyncOrderedList);
00819 }
00820
00821 void MSTableList::shutDown()
00822 {
00823 if (gPoolListByID) {
00824 gPoolListByID->clear();
00825 gPoolListByID->release();
00826 gPoolListByID = NULL;
00827 }
00828 }
00829
00830 class MSTableKey : public CSOrderKey {
00831 public:
00832 uint32_t myKeyDatabaseID;
00833 uint32_t myKeyTableID;
00834
00835 MSTableKey(): myKeyDatabaseID(0), myKeyTableID(0){ }
00836
00837 virtual ~MSTableKey() {
00838 }
00839
00840 int compareKey(CSObject *key) {return CSObject::compareKey(key);}
00841 virtual int compareKey(CSOrderKey *x) {
00842 MSTableKey *key = (MSTableKey *) x;
00843 int r = 0;
00844
00845 if (myKeyDatabaseID < key->myKeyDatabaseID)
00846 r = -1;
00847 else if (myKeyDatabaseID > key->myKeyDatabaseID)
00848 r = 1;
00849
00850 if (r == 0) {
00851 if (myKeyTableID < key->myKeyTableID)
00852 r = -1;
00853 else if (myKeyTableID > key->myKeyTableID)
00854 r = 1;
00855 }
00856 return r;
00857 }
00858
00859 public:
00860 static MSTableKey *newTableKey(uint32_t db_id, uint32_t tab_id)
00861 {
00862 MSTableKey *key;
00863
00864 if (!(key = new MSTableKey())) {
00865 CSException::throwOSError(CS_CONTEXT, ENOMEM);
00866 }
00867 key->myKeyDatabaseID = db_id;
00868 key->myKeyTableID = tab_id;
00869 return key;
00870 }
00871 };
00872
00873 MSOpenTable *MSTableList::getOpenTableByID(uint32_t db_id, uint32_t tab_id)
00874 {
00875 MSOpenTablePool *pool;
00876 MSOpenTable *otab = NULL;
00877 MSTableKey key;
00878
00879 enter_();
00880 lock_(gPoolListByID);
00881 key.myKeyDatabaseID = db_id;
00882 key.myKeyTableID = tab_id;
00883 pool = (MSOpenTablePool *) gPoolListByID->find(&key);
00884 if (!pool) {
00885 MSTableKey *key_ptr;
00886 pool = MSOpenTablePool::newPool(db_id, tab_id);
00887 key_ptr = MSTableKey::newTableKey(db_id, tab_id);
00888 gPoolListByID->add(key_ptr, pool);
00889 }
00890 if (!(otab = pool->getPoolTable())) {
00891 otab = MSOpenTable::newOpenTable(pool);
00892 pool->addOpenTable(otab);
00893 otab->retain();
00894 }
00895 unlock_(gPoolListByID);
00896 return_(otab);
00897 }
00898
00899 MSOpenTable *MSTableList::getOpenTableForDB(uint32_t db_id)
00900 {
00901 return(MSTableList::getOpenTableByID(db_id, 0));
00902 }
00903
00904
00905 void MSTableList::releaseTable(MSOpenTable *otab)
00906 {
00907 MSOpenTablePool *pool;
00908
00909 enter_();
00910 lock_(gPoolListByID);
00911 push_(otab);
00912 if ((pool = otab->myPool)) {
00913 if (pool->isRemovingTP) {
00914 pool->removeOpenTable(otab);
00915 gPoolListByID->wakeup();
00916 }
00917 else
00918 pool->returnOpenTable(otab);
00919 }
00920 release_(otab);
00921 unlock_(gPoolListByID);
00922 exit_();
00923 }
00924
00925 bool MSTableList::removeTablePoolIfEmpty(MSOpenTablePool *pool)
00926 {
00927 enter_();
00928 if (pool->getSize() == 0) {
00929 MSTableKey key;
00930
00931 key.myKeyDatabaseID = pool->myPoolDB->myDatabaseID;
00932 key.myKeyTableID = pool->myPoolTableID;
00933 gPoolListByID->remove(&key);
00934
00935
00936
00937 return_(true);
00938 }
00939 return_(false);
00940 }
00941
00942 void MSTableList::removeTablePool(MSOpenTablePool *pool)
00943 {
00944 enter_();
00945 lock_(gPoolListByID);
00946 for (;;) {
00947 pool->isRemovingTP = true;
00948 pool->removeOpenTablesNotInUse();
00949 if (removeTablePoolIfEmpty(pool))
00950 break;
00951
00952
00953
00954
00955
00956 gPoolListByID->wait();
00957 }
00958 unlock_(gPoolListByID);
00959 exit_();
00960 }
00961
00962
00963
00964
00965 void MSTableList::removeTablePool(MSOpenTable *otab)
00966 {
00967 MSOpenTablePool *pool;
00968 MSTableKey key;
00969
00970 key.myKeyDatabaseID = otab->getDB()->myDatabaseID;
00971 key.myKeyTableID = otab->getTableID();
00972
00973 enter_();
00974 frompool_(otab);
00975 lock_(gPoolListByID);
00976 for (;;) {
00977 if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key)))
00978 break;
00979 pool->isRemovingTP = true;
00980 pool->removeOpenTablesNotInUse();
00981 if (removeTablePoolIfEmpty(pool))
00982 break;
00983
00984
00985
00986
00987 gPoolListByID->wait();
00988 }
00989 unlock_(gPoolListByID);
00990 backtopool_(otab);
00991 exit_();
00992 }
00993
00994 void MSTableList::removeDatabaseTables(MSDatabase *database)
00995 {
00996 MSOpenTablePool *pool;
00997 uint32_t idx;
00998
00999
01000 enter_();
01001 push_(database);
01002
01003 retry:
01004 lock_(gPoolListByID);
01005 idx = 0;
01006 while ((pool = (MSOpenTablePool *) gPoolListByID->itemAt(idx))) {
01007 if (pool->myPoolDB == database) {
01008 break;
01009 }
01010 idx++;
01011 }
01012 unlock_(gPoolListByID);
01013
01014 if (pool) {
01015 removeTablePool(pool);
01016 goto retry;
01017 }
01018
01019 release_(database);
01020 exit_();
01021 }
01022
01023
01024
01025 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(uint32_t db_id, uint32_t tab_id, CSString *db_name, CSString *tab_name)
01026 {
01027 MSOpenTablePool *pool;
01028 MSTableKey key;
01029
01030 enter_();
01031
01032 push_(db_name);
01033 if (tab_name)
01034 push_(tab_name);
01035
01036 key.myKeyDatabaseID = db_id;
01037 key.myKeyTableID = tab_id;
01038
01039 lock_(gPoolListByID);
01040
01041 for (;;) {
01042 if (!(pool = (MSOpenTablePool *) gPoolListByID->find(&key))) {
01043 char buffer[CS_EXC_MESSAGE_SIZE];
01044
01045 cs_strcpy(CS_EXC_MESSAGE_SIZE, buffer, "Table is temporarily not available: ");
01046 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, db_name->getCString());
01047 if(tab_name) {
01048 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, ".");
01049 cs_strcat(CS_EXC_MESSAGE_SIZE, buffer, tab_name->getCString());
01050 }
01051 CSException::throwException(CS_CONTEXT, MS_ERR_TABLE_LOCKED, buffer);
01052 }
01053 pool->isRemovingTP = true;
01054 pool->removeOpenTablesNotInUse();
01055 if (pool->getSize() == 0) {
01056
01057 break;
01058 }
01059
01060
01061
01062
01063 gPoolListByID->wait();
01064 }
01065 unlock_(gPoolListByID);
01066
01067 if (tab_name)
01068 release_(tab_name);
01069 release_(db_name);
01070 return_(pool);
01071
01072 }
01073
01074 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSTable *tab)
01075 {
01076 CSString *tab_name = NULL, *db_name;
01077 uint32_t db_id, tab_id;
01078
01079 enter_();
01080
01081 db_name = tab->myDatabase->myDatabaseName;
01082 db_name->retain();
01083
01084 tab_name = tab->myTableName;
01085 tab_name->retain();
01086
01087 db_id = tab->myDatabase->myDatabaseID;
01088 tab_id = tab->myTableID;
01089
01090 tab->release();
01091
01092 return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
01093 }
01094
01095 MSOpenTablePool *MSTableList::lockTablePoolForDeletion(MSOpenTable *otab)
01096 {
01097 CSString *tab_name = NULL, *db_name;
01098 uint32_t db_id, tab_id;
01099 MSTable *tab;
01100
01101 enter_();
01102
01103 tab = otab->getDBTable();
01104 if (tab) {
01105 tab_name = tab->myTableName;
01106 tab_name->retain();
01107 }
01108
01109 db_name = otab->getDB()->myDatabaseName;
01110 db_name->retain();
01111
01112 db_id = otab->getDB()->myDatabaseID;
01113 tab_id = otab->getTableID();
01114
01115 otab->returnToPool();
01116
01117 return_( lockTablePoolForDeletion(db_id, tab_id, db_name, tab_name));
01118
01119 }
01120
01121