00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #include "cslib/CSConfig.h"
00031
00032 #include "defs_ms.h"
00033
00034 #include "cslib/CSGlobal.h"
00035 #include "cslib/CSStrUtil.h"
00036 #include "cslib/CSStorage.h"
00037
00038 #include "compactor_ms.h"
00039 #include "open_table_ms.h"
00040 #include "repository_ms.h"
00041 #include "parameters_ms.h"
00042
00043
00044
00045
00046
00047
/*
 * Creates the compactor daemon for one database.
 *
 * wait_time  forwarded unchanged to the CSDaemon base constructor, together
 *            with a NULL second argument.
 * db         stored in iCompactorDatabase; doWork() issues all of its
 *            repository, temp-log and alias calls through this pointer.
 */
00048 MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db)
00049 : CSDaemon(wait_time, NULL)
00050 , iCompactorDatabase(db)
00051 {
00052 }
00053
/*
 * Shutdown hook invoked from completeWork(). The body is empty: this method
 * itself releases nothing.
 */
00054 void MSCompactorThread::close()
00055 {
00056 }
00057
/*
 * Work routine of the compactor daemon.
 *
 * Each pass asks the database for a source repository via
 * getRepoFullOfTrash() and a destination repository via lockRepo(). It then
 * walks the source file from myRepoHeadSize towards myRepoFileSize. For every
 * record whose status is "in use" and which still has references after the
 * checks below, the header and the BLOB data are appended to the destination
 * file and the owning tables are told about the new location through
 * updateBlobHandle(). When the walk ends the source repository is flagged
 * mustBeDeleted and removeRepo() is called. Passes repeat until no source
 * repository is returned or myMustQuit is set.
 *
 * Every exit in this function goes through return_(true).
 */
00058 bool MSCompactorThread::doWork()
00059 {
00060 bool complete; // set once the scan loop has ended without a quit request
00061 MSRepository *src_repo, *dst_repo; // repository being read / repository being appended to
00062 MSRepoFile *src_file, *dst_file;
00063 uint32_t src_repo_id; // copy of src_repo->myRepoID, used for removeRepo() after src_repo is handed back
00064 MSBlobHeadRec blob; // fixed-size part of the record header at src_offset
00065 off64_t src_offset; // current scan position in the source file
00066 uint16_t head_size;
00067 uint64_t blob_size, blob_data_size;
00068 CSStringBuffer *head; // buffer for the complete header (fixed part plus reference entries)
00069 MSRepoPointersRec ptr; // cursor over the reference entries held in head
00070 uint32_t table_ref_count; // table references confirmed through readBlobHandle()
00071 uint32_t blob_ref_count; // references counted by the default branches of the switches below
00072 int ref_count;
00073 size_t ref_size;
00074 CSMutex *mylock; // record lock selected from src_repo->myRepoLock by offset
00075 uint32_t tab_id;
00076 uint64_t blob_id;
00077 MSOpenTable *otab;
00078 uint32_t repo_id; // repo_* values are outputs of readBlobHandle()
00079 uint64_t repo_offset;
00080 uint64_t repo_blob_size;
00081 uint16_t repo_head_size;
00082 uint16_t tab_index;
00083 uint8_t status;
00084
00085 enter_(); // cslib frame macro (definition not in view); every exit below uses return_()
00086 retry:
00087 // One iteration from this label handles a single source repository.
00088 #ifdef MS_COMPACTOR_POLLS
00089 if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
00090 return_(true);
00091 #else
00092 myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000; // NOTE(review): "* 1000" looks like seconds -> milliseconds; confirm
00093 if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
00094 return_(true);
00095 #endif
00096 frompool_(src_repo); // matched by backtopool_(src_repo) at quit:
00097 src_file = src_repo->openRepoFile();
00098 push_(src_file); // matched by release_(src_file) at quit:
00099 // lockRepo() is given the source file size minus its garbage count.
00100 dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
00101 frompool_(dst_repo);
00102 dst_file = dst_repo->openRepoFile();
00103 push_(dst_file);
00104
00105 new_(head, CSStringBuffer(100));
00106 push_(head);
00107
00108 complete = false;
00109 src_repo_id = src_repo->myRepoID;
00110 src_offset = src_repo->myRepoHeadSize; // scanning starts right after the repository header
00111 // Debug hook: blockit is hard-wired to 0, so as written this branch never runs.
00112 // If enabled it would undo every acquisition above, set myWaitTime and return.
00113 {
00114 int blockit = 0;
00115 if (blockit) {
00116 release_(head);
00117 release_(dst_file);
00118 backtopool_(dst_repo);
00119 release_(src_file);
00120 backtopool_(src_repo);
00121
00122 myWaitTime = 5 * 1000;
00123 return_(true);
00124 }
00125 }
00126 while (src_offset < src_repo->myRepoFileSize) {
00127 retry_loop:
00128 suspended(); // NOTE(review): presumably pauses here while the daemon is suspended; confirm in CSDaemon
00129
00130 if (myMustQuit)
00131 goto quit; // complete is still false here, so removeRepo() is skipped
00132 retry_read:
00133 // The record lock is picked by src_offset modulo CS_REPO_REC_LOCK_COUNT and is
00134 // held while this record is examined and copied. Every path that leaves the
00135 // record (break, goto, normal advance) unlocks it first.
00136 // NOTE(review): presumably protects against concurrent reference updates; confirm.
00137 mylock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
00138 lock_(mylock);
00139 if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
00140 unlock_(mylock);
00141 break; // short read of the fixed header ends the scan
00142 }
00143 ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
00144 ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
00145 head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00146 blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
00147 blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
00148 status = CS_GET_DISK_1(blob.rb_status_1);
00149 if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
00150 head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
00151 !VALID_BLOB_STATUS(status)) {
00152 // Header fields are not plausible: move on by one byte and read again.
00153 unlock_(mylock);
00154 src_offset++;
00155 goto retry_read; // re-enters below the while test and the myMustQuit check
00156 }
00157 if (IN_USE_BLOB_STATUS(status)) {
00158 head->setLength(head_size);
00159 if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
00160 unlock_(mylock);
00161 break; // the complete header could not be read: end the scan
00162 }
00163 // Pass 1: classify each of the ref_count reference entries in the buffered header.
00164 table_ref_count = 0;
00165 blob_ref_count = 0;
00166
00167 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize; // entries start right after the fixed header
00168 for (int count = 0; count < ref_count; count++) {
00169 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00170 case MS_BLOB_FREE_REF:
00171 break;
00172 case MS_BLOB_TABLE_REF:
00173 // Keep a table reference only if the table's handle for blob_id still describes this record.
00174 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00175 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00176
00177 otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
00178 if (otab) {
00179 frompool_(otab);
00180
00181 uint32_t auth_code = 0;
00182 otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
00183 backtopool_(otab);
00184 if (repo_id == src_repo_id &&
00185 repo_offset == src_offset &&
00186 repo_blob_size == blob_data_size &&
00187 repo_head_size == head_size)
00188 table_ref_count++;
00189 else
00190 // Handle describes a different location: mark the entry free (in the buffered header only).
00191 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00192 }
00193 else
00194 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF); // getOpenTableByID() returned no table for tab_id
00195 break;
00196 case MS_BLOB_DELETE_REF:
00197 // Delete reference: it names a temp-log file and an offset in it. The log item's
00198 // timestamp decides whether the entry may be freed yet.
00199 uint32_t temp_log_id;
00200 uint32_t temp_log_offset;
00201 MSTempLogFile *temp_log;
00202
00203 temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
00204 temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
00205 if ((temp_log = iCompactorDatabase->openTempLogFile(temp_log_id, NULL, NULL))) {
00206 MSTempLogItemRec log_item;
00207 uint32_t then;
00208 time_t now;
00209
00210 push_(temp_log);
00211 if (temp_log->read(&log_item, temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
00212 then = CS_GET_DISK_4(log_item.ti_time_4);
00213 now = time(NULL);
00214 if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00215 // Timeout not reached: give up the log file and the record lock before waiting.
00216 release_(temp_log);
00217 unlock_(mylock);
00218 // suspendedWait() is called with this object locked. Afterwards the same
00219 // record is processed again from retry_loop (src_offset is unchanged).
00220 lock_(this);
00221 suspendedWait(MSTempLog::adjustWaitTime(then, now));
00222 unlock_(this);
00223 goto retry_loop;
00224 }
00225 }
00226 release_(temp_log);
00227 }
00228 // Reached when the log file was not opened, the item was not fully read,
00229 // or the timeout has passed: mark the entry free.
00230 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00231 break;
00232 default:
00233 tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
00234 if (tab_index > ref_count || !tab_index) {
00235 // Table index is 0 or above ref_count: abandon this record and rescan one byte further on.
00236 unlock_(mylock);
00237 src_offset++;
00238 goto retry_read;
00239 }
00240 blob_ref_count++;
00241 break;
00242 }
00243 ptr.rp_chars += ref_size;
00244 }
00245
00246 if (table_ref_count && blob_ref_count) {
00247 // Pass 2: recount the blob references, this time only those whose 1-based er_table
00248 // index selects an entry that is still typed MS_BLOB_TABLE_REF after pass 1.
00249 // NOTE(review): with table_ref_count == 0 this recount is skipped and the pass-1 count is used as is; confirm intended.
00250 MSRepoTableRefPtr tab_ref;
00251
00252 blob_ref_count = 0;
00253 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00254 for (int count = 0; count < ref_count; count++) {
00255 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00256 case MS_BLOB_FREE_REF:
00257 case MS_BLOB_TABLE_REF:
00258 case MS_BLOB_DELETE_REF:
00259 break;
00260 default:
00261 // Locate the entry this blob reference points at: slot (er_table - 1) of the reference area.
00262 tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00263 if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
00264 blob_ref_count++;
00265 break;
00266 }
00267 ptr.rp_chars += ref_size;
00268 }
00269 }
00270
00271 if (blob_ref_count) {
00272 off64_t dst_offset;
00273 // Still referenced: append the record at the current end of the destination repository.
00274 dst_offset = dst_repo->myRepoFileSize;
00275
00276 // The buffered header is written first, including any entries marked free above.
00277 dst_file->write(head->getBuffer(0), dst_offset, head_size);
00278
00279 // Then blob_size bytes following the header are copied file to file through iCompactBuffer.
00280 CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, iCompactBuffer, MS_COMPACTOR_BUFFER_SIZE);
00281
00282 #ifdef HAVE_ALIAS_SUPPORT
00283 // When rb_alias_offset_2 is non-zero, moveBlobAlias() is given the old and the new (repo, offset).
00284 if (CS_GET_DISK_2(blob.rb_alias_offset_2)) {
00285 iCompactorDatabase->moveBlobAlias( src_repo_id, src_offset, CS_GET_DISK_4(blob.rb_alias_hash_4), dst_repo->myRepoID, dst_offset);
00286 }
00287 #endif
00288 // Pass 3: for every remaining table reference, pass the new repo ID and offset to updateBlobHandle().
00289 ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00290 for (int count = 0; count < ref_count; count++) {
00291 switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00292 case MS_BLOB_FREE_REF:
00293 break;
00294 case MS_BLOB_TABLE_REF:
00295 tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00296 blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00297
00298 if ((otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id))) {
00299 frompool_(otab);
00300 otab->getDBTable()->updateBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, 0);
00301 backtopool_(otab);
00302 }
00303 break;
00304 case MS_BLOB_DELETE_REF:
00305 break;
00306 default:
00307 break;
00308 }
00309 ptr.rp_chars += ref_size;
00310 }
00311 // Grow the recorded destination size by the bytes just appended.
00312 dst_repo->myRepoFileSize += head_size + blob_size;
00313 }
00314 }
00315 // Step over this record whether or not it was copied.
00316 unlock_(mylock);
00317 src_offset += head_size + blob_size;
00318 }
00319 // The scan ended (loop condition failed or a short read broke out): flag the source repository.
00320 src_repo->mustBeDeleted = true;
00321 complete = true;
00322
00323 quit:
00324 release_(head); // unwind in reverse order of acquisition
00325 release_(dst_file);
00326 backtopool_(dst_repo);
00327 release_(src_file);
00328 backtopool_(src_repo);
00329
00330 if (complete)
00331 iCompactorDatabase->removeRepo(src_repo_id, &myMustQuit);
00332
00333 if (!myMustQuit)
00334 goto retry; // look for the next source repository
00335 return_(true);
00336 }
00337
/*
 * Completion hook of the daemon: calls this object's close() and then
 * returns NULL. No other work is done here.
 */
00338 void *MSCompactorThread::completeWork()
00339 {
00340 this->close();
00341 return NULL;
00342 }
00343