Drizzled Public API Documentation

compactor_ms.cc

00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-07-10
00023  *
00024  * H&G2JCtL
00025  *
00026  * Repository compactor thread.
00027  *
00028  */
00029 
00030 #include "cslib/CSConfig.h"
00031 
00032 #include "defs_ms.h"
00033 
00034 #include "cslib/CSGlobal.h"
00035 #include "cslib/CSStrUtil.h"
00036 #include "cslib/CSStorage.h"
00037 
00038 #include "compactor_ms.h"
00039 #include "open_table_ms.h"
00040 #include "repository_ms.h"
00041 #include "parameters_ms.h"
00042 
00043 /*
00044  * ---------------------------------------------------------------
00045  * COMPACTOR THREAD
00046  */
00047 
00048 MSCompactorThread::MSCompactorThread(time_t wait_time, MSDatabase *db):
00049 CSDaemon(wait_time, NULL),
00050 iCompactorDatabase(db)
00051 {
00052 }
00053 
00054 void MSCompactorThread::close()
00055 {
00056 }
00057 
00058 bool MSCompactorThread::doWork()
00059 {
00060   bool        complete;
00061   MSRepository    *src_repo, *dst_repo;
00062   MSRepoFile      *src_file, *dst_file;
00063   uint32_t      src_repo_id;
00064   MSBlobHeadRec   blob;
00065   off64_t       src_offset;
00066   uint16_t      head_size;
00067   uint64_t      blob_size, blob_data_size;
00068   CSStringBuffer    *head;
00069   MSRepoPointersRec ptr;
00070   uint32_t        table_ref_count;
00071   uint32_t        blob_ref_count;
00072   int         ref_count;
00073   size_t        ref_size;
00074   CSMutex       *mylock;
00075   uint32_t      tab_id;
00076   uint64_t      blob_id;
00077   MSOpenTable     *otab;
00078   uint32_t      repo_id;
00079   uint64_t      repo_offset;
00080   uint64_t      repo_blob_size;
00081   uint16_t      repo_head_size;
00082   uint16_t      tab_index;
00083   uint8_t       status;
00084 
00085   enter_();
00086   retry:
00087   
00088 #ifdef MS_COMPACTOR_POLLS
00089   if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(NULL)))
00090     return_(true);
00091 #else
00092   myWaitTime = MS_DEFAULT_COMPACTOR_WAIT * 1000;  // Time in milli-seconds
00093   if (!(src_repo = iCompactorDatabase->getRepoFullOfTrash(&myWaitTime)))
00094     return_(true);
00095 #endif
00096   frompool_(src_repo);
00097   src_file = src_repo->openRepoFile();
00098   push_(src_file);
00099 
00100   dst_repo = iCompactorDatabase->lockRepo(src_repo->myRepoFileSize - src_repo->myGarbageCount);
00101   frompool_(dst_repo);
00102   dst_file = dst_repo->openRepoFile();
00103   push_(dst_file);
00104 
00105   new_(head, CSStringBuffer(100));
00106   push_(head);
00107 
00108   complete = false;
00109   src_repo_id = src_repo->myRepoID;
00110   src_offset = src_repo->myRepoHeadSize;
00111   //printf("\nCompacting repo %"PRId32"\n\n", src_repo_id);
00112   // For testing:
00113   {
00114     int blockit = 0;
00115     if (blockit) {
00116       release_(head);
00117       release_(dst_file);
00118       backtopool_(dst_repo);
00119       release_(src_file);
00120       backtopool_(src_repo);
00121 
00122       myWaitTime = 5 * 1000;  // Time in milli-seconds
00123       return_(true);
00124     }
00125   }
00126   while (src_offset < src_repo->myRepoFileSize) {     
00127     retry_loop:
00128     suspended();
00129 
00130     if (myMustQuit)
00131       goto quit;
00132     retry_read:
00133     
00134     // A lock is required here because references and dereferences to the
00135     // BLOBs can result in the repository record being updated while 
00136     // it is being copied.
00137     mylock = &src_repo->myRepoLock[src_offset % CS_REPO_REC_LOCK_COUNT];
00138     lock_(mylock);
00139     if (src_file->read(&blob, src_offset, src_repo->myRepoBlobHeadSize, 0) < src_repo->myRepoBlobHeadSize) {
00140       unlock_(mylock);
00141       break;
00142     }
00143     ref_size = CS_GET_DISK_1(blob.rb_ref_size_1);
00144     ref_count = CS_GET_DISK_2(blob.rb_ref_count_2);
00145     head_size = CS_GET_DISK_2(blob.rb_head_size_2);
00146     blob_size = CS_GET_DISK_6(blob.rb_blob_repo_size_6);
00147     blob_data_size = CS_GET_DISK_6(blob.rb_blob_data_size_6);
00148     status = CS_GET_DISK_1(blob.rb_status_1);
00149     if ((blob_data_size == 0) || ref_count <= 0 || ref_size == 0 ||
00150       head_size < src_repo->myRepoBlobHeadSize + ref_count * ref_size ||
00151       !VALID_BLOB_STATUS(status)) {
00152       /* Can't be true. Assume this is garbage! */
00153       unlock_(mylock);
00154       src_offset++;
00155       goto retry_read;
00156     }
00157     if (IN_USE_BLOB_STATUS(status)) {
00158       head->setLength(head_size);
00159       if (src_file->read(head->getBuffer(0), src_offset, head_size, 0) != head_size) {
00160         unlock_(mylock);
00161         break;
00162       }
00163 
00164       table_ref_count = 0;
00165       blob_ref_count = 0;
00166       
00167       ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00168       for (int count = 0; count < ref_count; count++) {
00169         switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00170           case MS_BLOB_FREE_REF:
00171             break;
00172           case MS_BLOB_TABLE_REF:
00173             /* Check the reference: */
00174             tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00175             blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00176 
00177             otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id);
00178             if (otab) {
00179               frompool_(otab);
00180               /* Ignore the return value (it will fail because auth_code is wrong!)!! */
00181               uint32_t auth_code = 0;
00182               otab->getDBTable()->readBlobHandle(otab, blob_id, &auth_code, &repo_id, &repo_offset, &repo_blob_size, &repo_head_size, false);
00183               backtopool_(otab);
00184               if (repo_id == src_repo_id &&
00185                 repo_offset == src_offset &&
00186                 repo_blob_size == blob_data_size &&
00187                 repo_head_size == head_size)
00188                 table_ref_count++;
00189               else
00190                 /* Remove the reference: */
00191                 CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00192             }
00193             else
00194               CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00195             break;
00196           case MS_BLOB_DELETE_REF:
00197             /* These are temporary references from the TempLog file. */
00198             /* We try to prevent this from happening, but it can! */
00199             uint32_t      temp_log_id;
00200             uint32_t      temp_log_offset;
00201             MSTempLogFile *temp_log;
00202 
00203             temp_log_id = CS_GET_DISK_4(ptr.rp_temp_ref->tp_log_id_4);
00204             temp_log_offset = CS_GET_DISK_4(ptr.rp_temp_ref->tp_offset_4);
00205             if ((temp_log = iCompactorDatabase->openTempLogFile(temp_log_id, NULL, NULL))) {
00206               MSTempLogItemRec  log_item;
00207               uint32_t        then;
00208               time_t        now;
00209 
00210               push_(temp_log);
00211               if (temp_log->read(&log_item, temp_log_offset, sizeof(MSTempLogItemRec), 0) == sizeof(MSTempLogItemRec)) {
00212                 then = CS_GET_DISK_4(log_item.ti_time_4);
00213                 now = time(NULL);
00214                 if (now < (time_t)(then + PBMSParameters::getTempBlobTimeout())) {
00215                   /* Wait for the BLOB to expire before we continue: */                 
00216                   release_(temp_log);
00217                   unlock_(mylock);
00218 
00219                   /* Go to sleep until the problem has gone away! */
00220                   lock_(this);
00221                   suspendedWait(MSTempLog::adjustWaitTime(then, now));
00222                   unlock_(this);
00223                   goto retry_loop;
00224                 }
00225               }
00226               release_(temp_log);
00227             }
00228 
00229             /* Remove the temp reference: */
00230             CS_SET_DISK_2(ptr.rp_ref->rr_type_2, MS_BLOB_FREE_REF);
00231             break;
00232           default:
00233             tab_index = CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2);
00234             if (tab_index > ref_count || !tab_index) {
00235               /* Can't be true. Assume this is garbage! */
00236               unlock_(mylock);
00237               src_offset++;
00238               goto retry_read;
00239             }
00240             blob_ref_count++;
00241             break;
00242         }
00243         ptr.rp_chars += ref_size;
00244       }
00245 
00246       if (table_ref_count && blob_ref_count) {
00247         /* Check the blob references again to make sure that they
00248          * refer to valid table references.
00249          */
00250         MSRepoTableRefPtr tab_ref;
00251 
00252         blob_ref_count = 0;
00253         ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00254         for (int count = 0; count < ref_count; count++) {
00255           switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00256             case MS_BLOB_FREE_REF:
00257             case MS_BLOB_TABLE_REF:
00258             case MS_BLOB_DELETE_REF:
00259               break;
00260             default: // If it isn't one of the above we assume it is an blob ref. (er_table_2 can never have a value equal to one of the above REF type flags.)
00261                 // It was already verified above that the index was with in range.
00262               tab_ref = (MSRepoTableRefPtr) (head->getBuffer(0) + src_repo->myRepoBlobHeadSize + (CS_GET_DISK_2(ptr.rp_blob_ref->er_table_2)-1) * ref_size);
00263               if (CS_GET_DISK_2(tab_ref->rr_type_2) == MS_BLOB_TABLE_REF)
00264                 blob_ref_count++;
00265               break;
00266           }
00267           ptr.rp_chars += ref_size;
00268         }
00269       }
00270 
00271       if (blob_ref_count) {
00272         off64_t dst_offset;
00273 
00274         dst_offset = dst_repo->myRepoFileSize;
00275 
00276         /* Write the header. */
00277         dst_file->write(head->getBuffer(0), dst_offset, head_size);
00278 
00279         /* We have an engine reference, copy the BLOB over: */
00280         CSFile::transfer(RETAIN(dst_file), dst_offset + head_size, RETAIN(src_file), src_offset + head_size, blob_size, iCompactBuffer, MS_COMPACTOR_BUFFER_SIZE);
00281 
00282 #ifdef HAVE_ALIAS_SUPPORT
00283         /* If the BLOB has an alias update the alias index. */
00284         if (CS_GET_DISK_2(blob.rb_alias_offset_2)) {
00285           iCompactorDatabase->moveBlobAlias( src_repo_id, src_offset, CS_GET_DISK_4(blob.rb_alias_hash_4), dst_repo->myRepoID, dst_offset);
00286         }
00287 #endif        
00288         /* Update the references: */
00289         ptr.rp_chars = head->getBuffer(0) + src_repo->myRepoBlobHeadSize;
00290         for (int count = 0; count < ref_count; count++) {
00291           switch (CS_GET_DISK_2(ptr.rp_ref->rr_type_2)) {
00292             case MS_BLOB_FREE_REF:
00293               break;
00294             case MS_BLOB_TABLE_REF:
00295               tab_id = CS_GET_DISK_4(ptr.rp_tab_ref->tr_table_id_4);
00296               blob_id = CS_GET_DISK_6(ptr.rp_tab_ref->tr_blob_id_6);
00297 
00298               if ((otab = MSTableList::getOpenTableByID(iCompactorDatabase->myDatabaseID, tab_id))) {
00299                 frompool_(otab);
00300                 otab->getDBTable()->updateBlobHandle(otab, blob_id, dst_repo->myRepoID, dst_offset, 0);
00301                 backtopool_(otab);
00302               }
00303               break;
00304             case MS_BLOB_DELETE_REF:
00305               break;
00306             default:
00307               break;
00308           }
00309           ptr.rp_chars += ref_size;
00310         }
00311 
00312         dst_repo->myRepoFileSize += head_size + blob_size;
00313       }
00314     }
00315     
00316     unlock_(mylock);
00317     src_offset += head_size + blob_size;
00318   }
00319 
00320   src_repo->mustBeDeleted = true;
00321   complete = true;
00322 
00323   quit:
00324   release_(head);
00325   release_(dst_file);
00326   backtopool_(dst_repo);
00327   release_(src_file);
00328   backtopool_(src_repo);
00329 
00330   if (complete)
00331     iCompactorDatabase->removeRepo(src_repo_id, &myMustQuit);
00332 
00333   if (!myMustQuit)
00334     goto retry;
00335   return_(true);
00336 }
00337 
00338 void *MSCompactorThread::completeWork()
00339 {
00340   close();
00341   return NULL;
00342 }
00343