Drizzled Public API Documentation

TransTest.cc

00001 /* Copyright (C) 2009 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Barry Leslie
00020  *
00021  * 2009-06-17
00022  *
00023  * H&G2JCtL
00024  *
00025  * PBMS transaction handling test driver.
00026  *
00027  * This is a test driver for the PBMS transaction log. It uses 2 tables in a database and
00028  * inserts transaction records into 1 while writing them to the transaction log. The transaction
00029  * log reader thread reads the transactions from the log and writes them to the second table.
00030  * After a recovery the 2 tables should be identical.
00031  *
00032  * Built in crash points can be triggered to test that the recovery works correctly.
00033  *
00034  */
00035  
00036 #ifdef UNIT_TEST
00037 
00038 #include <stdlib.h>
00039 #include <stdio.h>
00040 #include <unistd.h>
00041 #include <string.h>
00042 #include <ctype.h>
00043 #include <inttypes.h>
00044 
00045 #include "cslib/CSConfig.h"
00046 #include "cslib/CSGlobal.h"
00047 #include "cslib/CSThread.h"
00048 #include "cslib/CSStrUtil.h"
00049 #include "cslib/CSStorage.h"
00050 
00051 #include "trans_cache_ms.h"
00052 #include "trans_log_ms.h"
00053 
00054 #include "mysql.h"
00055 
00056 #define CREATE_TABLE_BODY "\
00057  (\
00058   blob_ref INT NOT NULL AUTO_INCREMENT,\
00059   tab_id INT NOT NULL,\
00060   blob_id BIGINT NOT NULL, \
00061   committed BOOLEAN NOT NULL DEFAULT 0, \
00062   PRIMARY KEY (blob_ref, tab_id)\
00063 )\
00064 ENGINE = INNODB\
00065 "
      // CREATE_TABLE_BODY (above) is the column list and engine clause that
      // init_database() appends to "create table <name>" for the log table and for
      // every reference table, so all of them share one schema.
00066 #ifdef LOG_TABLE
00067 #undef LOG_TABLE
00068 #endif
00069 
      // LOG_TABLE:   table mirroring the transaction log. Writers insert references
      //              into it; the reader thread marks them committed or deletes them.
      // REF_TABLE:   printf-style pattern for the per-writer reference tables; the %d
      //              is the writer's table id (main() numbers writers from 1).
      // MAX_THREADS: not referenced in the code visible in this file.
00070 #define LOG_TABLE "translog"
00071 #define REF_TABLE "transref_%d"
00072 #define MAX_THREADS 20
00073 
      // Database id passed to txn_LogTransaction() by the writer threads.
00074 #define A_DB_ID 123
00075 
      // Database created (if missing) and selected by new_connection().
00076 #define TEST_DATABASE_NAME "TransTest"
      // MySQL connection parameters used by new_connection().
00077 static const char *user_name = "root";
00078 static const char *user_passwd = "";
00079 static int  port = 3306;
00080 static const char *host = "localhost";
      // Argument to usleep() (microseconds) at the top of every writer iteration.
00081 static int nap_time = 1000;
00082 static int max_transaction = 10; // The maximum number of records generated per transaction
      // dump_log is set by -d; overflow_crash is set when -c is given MAX_CRASH_POINT + 1.
00083 static bool dump_log = false, overflow_crash = false;
00084 static int crash_site = 0;    // The location to crash at.
00085 static int num_threads = 1;   // The number of writer threads.
00086 //static int rate = 1000;     // The maximum transactions per second to allow.
00087 static time_t timeout = 60;   // How long to run for before crashing or shutting down.
      // Set by -r ("recover only"; the identifier is misspelled but used consistently).
00088 static bool revover_only = false;
      // Set by -n: drop and recreate the tables and the log file before running.
00089 static bool recreate = false;
00090 
      // Set by -sc / -sl. main() only applies a value when it is non-zero.
00091 static uint32_t cache_size = 0, log_size = 0;
00092 
      // The transaction log under test; created in init_env().
00093 static MSTrans *trans_log;
00094 
      // Thread list handed to every CSDaemon constructed here; created in init_env().
00095 static CSThreadList *thread_list;
00096 
00097 static MYSQL *new_connection(bool check_for_db);
00098 
      // CSThread object for the main thread; created in init_env().
00099 static CSThread *main_thread;
00100 
00101 //------------------------------------------------
      // Common base class for the reader and writer test daemons. It owns one
      // reference to the transaction log and one MySQL connection, both of which are
      // given up in the destructor, plus the counters and flags that main() polls.
00102 class TransTestThread : public CSDaemon {
00103 public:
      // NOTE: the initializer list order differs from the member declaration order
      // below. C++ initializes members in declaration order; since every initializer
      // here is a constant the outcome is the same.
00104   TransTestThread(): 
00105     CSDaemon(thread_list),
00106     count(0),
00107     myActivity(0),
00108     log(NULL),
00109     stopit(false),
00110     finished(false),
00111     mysql(NULL)
00112     {}
00113 
      // Releases the log reference and closes the connection if they were set.
00114    ~TransTestThread()
00115   {
00116     if (log)
00117       log->release();
00118       
00119     if (mysql)
00120       mysql_close(mysql);
00121   }
00122    
00123   MSTrans *log;          // Transaction log this thread works on (retained by the factory functions).
00124   MYSQL *mysql;          // Private MySQL connection for this thread.
00125   uint32_t count;        // Number of operations processed by this thread.
00126   uint32_t myActivity;   // Incremented by the thread's loop; main() zeroes it periodically to detect hangs.
00127 
00128   bool stopit;           // Set by main() to ask the thread's loop to exit.
00129   bool finished;         // Set by the writer's doWork() once its loop has returned.
00130   
      // Default work function: does nothing and returns true. Subclasses override it.
00131   virtual bool doWork() {return true;}  
00132 };
00133 
00134 //------------------------------------------------
      // Writer daemon: generates random insert/delete transactions against its own
      // reference table (REF_TABLE with tab_id) and records them in the transaction
      // log. Every operation is also written to a per-writer text file "write_<id>.log".
00135 class TransTestWriterThread : public TransTestThread {
00136 public:
      // Members are initialized so that a failed or partial construction never leaves
      // myLog holding an indeterminate pointer.
00137   TransTestWriterThread():TransTestThread(), tab_id(0), myLog(NULL) {}
00138   
00139   uint32_t tab_id;   // Table id of this writer; selects its reference table.
00140   FILE  *myLog;      // Text log of the operations performed by this writer.
00141   
00142   void generate_records();
      // Runs the generator loop, then flags completion so main() can see that the
      // writer has returned.
00143   bool doWork() 
00144   {
00145     generate_records();
00146     finished = true;
00147     return true;
00148   }
00149   
      // Factory: creates a writer for table `id`, opens its operation log (truncated
      // when the tables are being recreated, appended to otherwise), opens a MySQL
      // connection and takes a reference on the global transaction log.
      // Exits the process if the operation log cannot be opened, matching the
      // exit-on-error handling used by the rest of this test driver.
00150   static TransTestWriterThread *newTransTestWriterThread(uint32_t id)
00151   {
00152     TransTestWriterThread *tt;
00153     enter_();
00154     
00155     
00156     new_(tt, TransTestWriterThread());
00157     
00158     char name[32];
00159     sprintf(name, "write_%d.log", id);
00160     tt->myLog = fopen(name, recreate ? "w+" : "a+");
00161     if (!tt->myLog) {   // Without this check every later fprintf(myLog, ...) would dereference NULL.
00162       perror(name);
00163       exit(1);
00164     }
00165     if (!recreate)      // Separate this run's entries from those of earlier runs.
00166       fprintf(tt->myLog, "====================================================\n");
00167     tt->tab_id = id ;
00168     tt->mysql = new_connection(false);
00169     tt->log = trans_log;
00170     trans_log->retain();
00171     
00172     return_(tt); 
00173   }
00174   
      // Closes the operation log; the base class destructor releases the rest.
00175   ~TransTestWriterThread() { if (myLog) fclose(myLog); }
00176 };
00177 
00178 //------------------------------------------------
      // Reader daemon: consumes transactions from the transaction log and applies
      // them to LOG_TABLE (see processTransactionLog()).
00179 class TransTestReaderThread : public TransTestThread {
00180 public:
00181   TransTestReaderThread():TransTestThread(){}
00182   
      // True while main() is waiting for the log to drain at startup; transactions
      // processed in that phase are not added to `count`.
00183   bool recovering;
00184   void processTransactionLog();
      // NOTE(review): unlike the writer's doWork(), this does not set `finished`, so
      // code polling TransReader->finished never sees it become true.
00185   bool doWork() 
00186   {
00187     processTransactionLog();
00188     return true;
00189   }
00190   
      // Factory: creates the reader, opens its MySQL connection, takes a reference on
      // `log` and registers itself as the log's reader via txn_SetReader().
00191   static TransTestReaderThread *newTransTestReaderThread(MSTrans *log)
00192   {
00193     TransTestReaderThread *tt;
00194     enter_();
00195     
00196     new_(tt, TransTestReaderThread());
00197     tt->mysql = new_connection(false);
00198     tt->log = log;
00199     tt->log->retain();
00200     
00201     tt->log->txn_SetReader(tt); // The reader daemon is passed in unreferenced.
00202     tt->recovering = false;
00203     return_(tt); 
00204   }
00205   
      // Returns true when LOG_TABLE contains exactly one row with the given blob
      // reference id and table id. Prints the MySQL error and exits the process if
      // the query or the result fetch fails. Not called from the code visible in
      // this file.
00206   bool rec_found(uint64_t id, uint32_t tab_id) 
00207   {
00208     char stmt[100];
00209     MYSQL_RES *results = NULL;            
00210     bool found;
00211     
00212     sprintf(stmt, "SELECT blob_ref FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %"PRIu32"", id, tab_id); 
00213     if (mysql_query(mysql, stmt)) {
00214       printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00215       printf("%s\n", stmt);
00216       exit(1);
00217     }
00218     
00219     
00220     results = mysql_store_result(mysql);
00221     if (!results){
00222       printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00223       exit(1);
00224     }
00225 
00226     found = (mysql_num_rows(results) == 1);   
00227     mysql_free_result(results);
00228     
00229     return found;
00230     
00231   }
00232   
00233   
00234 };
00235 
      // The single reader thread; created and released by main().
00236 TransTestReaderThread *TransReader;
00237 //------------------------------------------------
      // Prints the connection's current MySQL error number and message together with
      // the caller-supplied source line, then `msg` (if not NULL) on its own line,
      // and terminates the process with exit status 1. Never returns.
00238 static void report_mysql_error(MYSQL *mysql, int line, const char *msg)
00239 {
00240   printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), line);
00241   if (msg)
00242     printf("%s\n", msg);
00243   exit(1);
00244 }
00245 
00246 
00247 //------------------------------------------------
      // Opens a new MySQL connection using the file-level host/user/password/port
      // settings and selects TEST_DATABASE_NAME.
      //
      // check_for_db: when true, the database is created first if "show databases"
      //               does not report exactly one match for it.
      //
      // Returns the connected handle; the caller closes it with mysql_close().
      // Any failure prints a message and exits the process.
00248 static MYSQL *new_connection(bool check_for_db)
00249 {
00250   MYSQL *mysql;
00251 
00252   mysql = mysql_init(NULL);
00253   if (!mysql) {
00254     printf( "mysql_init() failed.\n");
00255     exit(1);
00256   }
00257 
      // Connect without a default database; it is selected with "use" further down.
00258   if (mysql_real_connect(mysql, host, user_name, user_passwd, NULL, port, NULL, 0) == NULL)
00259     report_mysql_error(mysql, __LINE__, "mysql_real_connect()");
00260 
00261   if (check_for_db) {
00262     MYSQL_RES *results = NULL;            
00263     
00264     if (mysql_query(mysql, "show databases like \"" TEST_DATABASE_NAME "\""))
00265       report_mysql_error(mysql, __LINE__, "show databases like \"" TEST_DATABASE_NAME "\"");
00266     
00267     results = mysql_store_result(mysql);
00268     if (!results)
00269       report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00270 
00271 
00272     if (mysql_num_rows(results) != 1) {
00273       if (mysql_query(mysql, "create database " TEST_DATABASE_NAME ))
00274         report_mysql_error(mysql, __LINE__, "create database " TEST_DATABASE_NAME );
00275     }
00276     mysql_free_result(results);
00277   }
00278   
00279   if (mysql_query(mysql, "use " TEST_DATABASE_NAME ))
00280     report_mysql_error(mysql, __LINE__, "use " TEST_DATABASE_NAME );
00281 
00282   return mysql;
00283 }
00284 
00285 //------------------------------------------------
      // Resets the test to a clean state: removes the transaction log file, then
      // drops and recreates LOG_TABLE and the reference tables transref_<cnt> down
      // to transref_1.
      //
      // mysql: open connection with the test database selected.
      // cnt:   number of reference tables to create (the number of writer threads).
      //
      // The results of unlink() and of the "drop table if exists" statements are
      // deliberately ignored; a failed "create table" prints the error and exits.
00286 static void init_database(MYSQL *mysql, int cnt)
00287 {
00288   char stmt[1024];
00289   
00290   unlink("ms-trans-log.dat");
00291   mysql_query(mysql, "drop table if exists " LOG_TABLE  ";");
00292   
00293   if (mysql_query(mysql, "create table " LOG_TABLE  CREATE_TABLE_BODY ";")){
00294     printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00295     exit(1);
00296   }
00297 
      // Counts down from cnt to 1; the loop only terminates for cnt >= 0.
00298   while (cnt) {
00299     sprintf(stmt, "drop table if exists " REF_TABLE  ";", cnt);
00300     mysql_query(mysql, stmt);
00301     sprintf(stmt, "create table " REF_TABLE  CREATE_TABLE_BODY ";", cnt);
00302     if (mysql_query(mysql, stmt)){
00303       printf( "MySQL ERROR: %d \"%s\" line %d\n", mysql_errno(mysql), mysql_error(mysql), __LINE__);
00304       exit(1);
00305     }
00306     cnt--;
00307   }
00308 }
00309 
00310 
00311 //------------------------------------------------
      // Prints the command line usage to stdout and exits with status 1.
      // Never returns.
      //
      // app: program name shown in the usage line (argv[0]).
      //
      // The -t option is shown with a space before its value because process_args()
      // rejects a value attached directly to the option letter.
00312 static void display_help(const char *app)
00313 {
00314   printf("\nUsage:\n");
00315   printf("%s -help | -r  [-t <num_threads>] | -d | [-n] [-sc <cache_size>] [-sl <log_size>] [-c <crash_site>]  [-t <num_threads>] [<timeout>]\n\n", app);
00316   
00317   printf("-r: Test recovery after a crash or shutdown.\n");
00318   printf("-d: Dump the transaction log.\n");
00319   printf("-n: Recreate the tables and recovery log.\n");
00320   printf("-c <crash_site>: Crash at this location rather than shutting down. Max = %d\n", MAX_CRASH_POINT+1);
00321   printf("-t <num_threads>: The number of writer threads to use, default is %d.\n", num_threads);
00322   //printf("-r<rate>: The number af records to be inserted per second, default is %d.\n", rate);
      // timeout is a time_t; cast it so the argument matches the conversion specifier.
00323   printf("<timeout>: The number seconds the test should run before shuttingdown or crashing, default is %ld.\n\n", (long) timeout);
00324   exit(1);
00325 }
00326 
00327 //--------------------------------- 
      // Parses the command line into the file-level option variables. Any malformed
      // or unknown argument ends in display_help(), which prints the usage and exits.
      //
      // argc/argv: as passed to main().
      //
      // Accepted arguments:
      //   <timeout>        bare number, must be the last argument and non-zero
      //   -h...            show help
      //   -r               recover only (at most 4 arguments in total)
      //   -d               dump the log (must be the only argument)
      //   -n               recreate tables and log
      //   -c <crash_site>  1..MAX_CRASH_POINT, or MAX_CRASH_POINT+1 for "crash on overflow"
      //   -sc <size>       cache size;  -sl <size>  log size (non-zero)
      //   -t <num_threads> number of writer threads (> 0)
      //
      // Options taking a value verify that the value is present before reading it;
      // argv[argc] is NULL and must never reach atoi()/atol().
00328 static void process_args(int argc, const char * argv[])
00329 {
00330   if (argc < 2)
00331     return;
00332     
00333   for (int i = 1; i < argc; ) {
00334     if ( argv[i][0] != '-') { // Must be timeout
00335       timeout = atoi(argv[i]);
00336       i++;
00337       if ((i != argc) || !timeout)
00338         display_help(argv[0]);
00339     } else {
00340       switch (argv[i][1]) {
00341         case 'h':
00342           display_help(argv[0]);
00343           break;
00344           
00345         case 'r':
00346           if (argc > 4 || argv[i][2])
00347             display_help(argv[0]);
00348           revover_only = true;
00349           i++;
00350           break;
00351           
00352         case 'd':
00353           if (argc != 2 || argv[i][2])
00354             display_help(argv[0]);
00355           dump_log = true;
00356           i++;
00357           break;
00358           
00359         case 'n':
00360           if (argv[i][2])
00361             display_help(argv[0]);
00362           recreate = true;
00363           i++;
00364           break;
00365           
00366         case 'c':
00367           if (argv[i][2])
00368             display_help(argv[0]);
00369           if (++i >= argc) display_help(argv[0]); // <crash_site> value is missing
00370           crash_site = atoi(argv[i]);
00371           if (crash_site == (MAX_CRASH_POINT + 1))
00372             overflow_crash = true;
00373           else if ((!crash_site) || (crash_site >  MAX_CRASH_POINT))
00374             display_help(argv[0]);
00375           i++;
00376           break;
00377           
00378         case 's': {
00379             uint32_t size;
00380             if (i + 1 >= argc) display_help(argv[0]); // size value is missing
00381             size = atol(argv[i+1]);
00382             if (!size)
00383               display_help(argv[0]);
00384               
00385             if (argv[i][2] == 'c') 
00386               cache_size = size;
00387             else if (argv[i][2] == 'l')
00388               log_size = size;
00389             else 
00390               display_help(argv[0]);
00391             
00392             i+=2;
00393           }
00394           break;
00395           
00396         case 't':
00397           if (argv[i][2])
00398             display_help(argv[0]);
00399           if (++i >= argc) display_help(argv[0]); // <num_threads> value is missing
00400           num_threads = atoi(argv[i]);
      // A negative count would size allocations and drive count-down loops elsewhere.
00401           if (num_threads <= 0)
00402             display_help(argv[0]);
00403           i++;
00404           break;
00405 /*          
00406         case 'r':
00407           i++;
00408           rate = atoi(argv[i]);
00409           if (!rate)
00410             display_help(argv[0]);
00411           i++;
00412           break;
00413 */
00414         default:
00415           display_help(argv[0]);
00416       }
00417       
00418     }
00419   }
00420 }
00421 
00422 //--------------------------------- 
      // Brings up the cslib runtime for the main thread and creates the two global
      // objects used by the test: the transaction log (trans_log, backed by
      // "./ms-trans-log.dat") and the thread list (thread_list).
      // Exits the process if the main thread object cannot be created or if creating
      // the log or thread list raises an exception.
      // enter_/try_/catch_/cont_/new_ are project macros defined outside this file.
00423 static void init_env()
00424 {
00425   cs_init_memory();
00426   CSThread::startUp();
00427   if (!(main_thread = CSThread::newCSThread())) {
00428     CSException::logOSError(CS_CONTEXT, ENOMEM);
00429     exit(1);
00430   }
00431   
00432   CSThread::setSelf(main_thread);
00433   
00434   enter_();
00435   try_(a) {
      // The second argument is hard-coded to true; the commented-out name suggests it
      // once followed the -d option. NOTE(review): confirm the parameter's meaning
      // against MSTrans::txn_NewMSTrans().
00436     trans_log = MSTrans::txn_NewMSTrans("./ms-trans-log.dat", /*dump_log*/ true);
00437     new_(thread_list, CSThreadList()); 
00438   }
00439   catch_(a) {
00440     self->logException();
00441     CSThread::shutDown();
00442     exit(1);
00443   }
00444   cont_(a);
00445   
00446 }
00447 //--------------------------------- 
      // Tears down what init_env() set up: releases the thread list, the transaction
      // log and the main thread object (each only if set, clearing the global
      // afterwards), then shuts down the thread and memory subsystems.
00448 static void deinit_env()
00449 {
00450   if (thread_list) {
00451     thread_list->release();
00452     thread_list = NULL;
00453   }
00454   
00455   if (trans_log) {
00456     trans_log->release();
00457     trans_log = NULL;
00458   }
00459   
00460   if (main_thread) {
00461     main_thread->release();
00462     main_thread = NULL;
00463   }
00464   
00465   CSThread::shutDown();
00466   cs_exit_memory();
00467 }
00468 //--------------------------------- 
      // Checks that LOG_TABLE is consistent with the per-writer reference tables.
      //
      // mysql: open connection with the test database selected.
      //
      // Steps:
      //  1. Report (without failing) how many LOG_TABLE rows are still uncommitted.
      //  2. Load every reference table and LOG_TABLE, each ordered by blob_ref.
      //  3. If the reference tables hold more rows than the log table, fail.
      //  4. If the totals match, walk the log rows and compare each one with the next
      //     row of the reference table named by its tab_id.
      //  5. If the log table holds more rows, only warn, then fail only when a
      //     reference table contains a blob_ref that is missing from the log table.
      //
      // Returns true when the check passes. MySQL errors print a message and exit.
      // When built with DEBUG, a failed check dumps the transaction log to "trace.log".
00469 static bool verify_database(MYSQL *mysql)
00470 {
00471   MYSQL_RES **r_results, *l_results = NULL;            
00472   MYSQL_ROW r_record, l_record;
00473   bool ok = false;
00474   int i, log_row_cnt, ref_row_cnt = 0, tab_id;
00475   char stmt[1024];
00476   
00477   r_results = (MYSQL_RES **) malloc(num_threads * sizeof(MYSQL_RES *));
00478   if (!r_results) { printf("verify_database() Failed: out of memory\n"); exit(1); }
00479   if (mysql_query(mysql, "select * from "LOG_TABLE" where committed = 0 order by blob_ref")) 
00480     report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" where committed = 0 order by blob_ref");
00481             
00482   l_results = mysql_store_result(mysql);
00483   if (!l_results)
00484     report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00485 
00486   log_row_cnt = mysql_num_rows(l_results);
00487   mysql_free_result(l_results);
00488   if (log_row_cnt)
00489     printf("Uncommitted references: %d\n", log_row_cnt);
00490 
00491   //---------
00492   for (i =0; i < num_threads; i++) {
00493     sprintf(stmt, "select * from "REF_TABLE" order by blob_ref", i+1);
00494     if (mysql_query(mysql, stmt)) 
00495       report_mysql_error(mysql, __LINE__, stmt);
00496               
00497     r_results[i] = mysql_store_result(mysql);
      // Test the result just stored, not the array pointer (which is never NULL here).
00498     if (!r_results[i])
00499       report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00500       
00501     ref_row_cnt += mysql_num_rows(r_results[i]);
00502   } 
00503   //---------
00504   if (mysql_query(mysql, "select * from "LOG_TABLE" order by blob_ref")) 
00505     report_mysql_error(mysql, __LINE__, "select * from "LOG_TABLE" order by blob_ref");
00506             
00507   l_results = mysql_store_result(mysql);
00508   if (!l_results)
00509     report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00510 
00511   log_row_cnt = mysql_num_rows(l_results);
00512   
00513   if (log_row_cnt != ref_row_cnt) {
00514     if (ref_row_cnt > log_row_cnt) {
00515       printf("verify_database() Failed: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);
00516       goto done;
00517     }
00518     
00519     printf("verify_database() Warnning: row count doesn't match: log_row_cnt(%d) != ref_row_cnt(%d)\n", log_row_cnt,  ref_row_cnt);   
00520     printf("Possible unreferenced BLOBs\n");
00521   }
00522   
00523   if (log_row_cnt == ref_row_cnt) {
00524     for ( i = 0; i < log_row_cnt; i++) {
00525       l_record = mysql_fetch_row(l_results);
00526       tab_id = atol(l_record[1]);
      // Equal totals do not guarantee an equal per-table split, so the reference
      // table may be out of range or already exhausted; treat both as "no row".
00527       r_record = (tab_id >= 1 && tab_id <= num_threads) ? mysql_fetch_row(r_results[tab_id-1]) : NULL;
00528       if (!r_record || (atol(l_record[0]) != atol(r_record[0])) ||
00529         (atol(l_record[1]) != atol(r_record[1])) ||
00530         (atol(l_record[2]) != atol(r_record[2]))) {
00531         if (!r_record) { printf("verify_database() Failed: in row %d, tab_id %d has no matching reference row\n", i+1, tab_id); goto done; }
00532         printf("verify_database() Failed: in row %d, tab_id %d\n", i+1, tab_id);
00533         printf("field 1:  %ld =? %ld\n", atol(l_record[0]), atol(r_record[0]));
00534         printf("field 2:  %ld =? %ld\n", atol(l_record[1]), atol(r_record[1]));
00535         printf("field 3:  %ld =? %ld\n", atol(l_record[2]), atol(r_record[2]));
00536         goto done;
00537       }
00538         
00539     }
00540   } else { // The important thing is that there are no BLOBs in the ref tables that are not in the log table.
00541 
00542     for (i =0; i < num_threads; i++) {
      // Replace the full table result with the "missing from log" query result; the
      // slot always holds a valid result, so the cleanup loop at done: stays correct.
00543       mysql_free_result(r_results[i]);
00544       
00545       sprintf(stmt, "select * from "REF_TABLE" where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = %d)", i+1, i+1);
00546       if (mysql_query(mysql, stmt)) 
00547         report_mysql_error(mysql, __LINE__, stmt);
00548                 
00549       r_results[i] = mysql_store_result(mysql);
00550       if (!r_results[i])
00551         report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00552         
00553       if (mysql_num_rows(r_results[i])) {
00554         printf("verify_database() Failed, Missing BLOBs: %s\n", stmt);
00555         goto done;
00556       }
00557     } 
00558   }
00559   
00560   printf("verify_database() OK.\n");
00561   ok = true;
00562   
00563   done:
00564   
00565   for (i =0; i < num_threads; i++) {
00566     mysql_free_result(r_results[i]);
00567   }
00568   free(r_results);
00569   
00570   mysql_free_result(l_results);
00571   
00572 #ifdef DEBUG  
00573   if (!ok) {
00574     trans_log->txn_DumpLog("trace.log");
00575   }
00576 #endif  
00577   return ok;
00578 }
00579 
00580 //------------------------------------------------
      // Reader loop: fetches transaction records from the log until the thread is
      // asked to quit (myMustQuit) or stop (stopit) and applies each one to LOG_TABLE:
      //   committed  + dereference -> delete the row
      //   committed  + reference   -> set committed = 1 on the row
      //   rolled back + reference  -> delete the row
      //   recovered                -> print the record and ignore it
      // An unexpected record type or state prints a message and exits the process.
      // An exception is logged; the process exits unless a shutdown was requested.
00581 void TransTestReaderThread::processTransactionLog()
00582 {
00583   MSTransRec rec = {0};
00584   MS_TxnState state;
00585   char stmt[1024];
      // Only used by the CHECK_TIDS consistency check below.
00586   uint32_t last_tid = 0;
00587   enter_();
00588   
00589   // Read in transactions from the log and update
00590   // the database table based on them.
00591   
00592   try_(a) {
00593     while (!myMustQuit && !stopit) {
00594       // This will sleep while waiting for the next 
00595       // completed transaction.
00596       log->txn_GetNextTransaction(&rec, &state); 
00597       if (myMustQuit)
00598         break;
00599 
      // Heartbeat for the hang detection in main().
00600       myActivity++;
00601 #ifdef CHECK_TIDS
      // With a single writer, transaction ids must be non-decreasing and advance by
      // at most one between consecutive records.
00602       if (num_threads == 1) {
00603         ASSERT( ((last_tid + 1) == rec.tr_id) || (last_tid  == rec.tr_id) || !last_tid);
00604         last_tid = rec.tr_id;
00605       }
00606 #endif      
00607       if (!recovering) 
00608         count++;
00609       
      // Sanity check: reject record types this test never writes.
00610       switch (TRANS_TYPE(rec.tr_type)) {
00611         case MS_ReferenceTxn:
00612         case MS_DereferenceTxn:
00613         case MS_RollBackTxn:
00614         case MS_CommitTxn:
00615         case MS_RecoveredTxn:
00616         break;
00617         default:
00618           printf("Unexpected transaction type: %d\n", rec.tr_type);
00619           exit(1);              
00620       }
00621       
00622       if (state == MS_Committed){
00623         // Dereferences are applied when the transaction is committed.
00624         // References are applied immediately and removed if the transaction is rolled back.
00625         if (TRANS_TYPE(rec.tr_type) == MS_DereferenceTxn) {
00626           sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
00627           if (mysql_query(mysql, stmt))  
00628             report_mysql_error(mysql, __LINE__, stmt);
00629         } else if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
00630           sprintf(stmt, "UPDATE "LOG_TABLE" SET committed = 1 WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
00631           if (mysql_query(mysql, stmt))  
00632             report_mysql_error(mysql, __LINE__, stmt);
00633         }
00634       } else if (state == MS_RolledBack) { 
00635         //printf("ROLLBACK!\n");
00636         if (TRANS_TYPE(rec.tr_type) == MS_ReferenceTxn) {
00637           sprintf(stmt, "DELETE FROM "LOG_TABLE" WHERE blob_ref = %"PRIu64" AND tab_id = %d AND blob_id = %"PRIu64"", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id); 
00638           if (mysql_query(mysql, stmt))  
00639             report_mysql_error(mysql, __LINE__, stmt);
00640         }
00641       } else if (state == MS_Recovered) { 
00642         printf("Recovered transaction being ignored:\n");
00643         printf("blob_ref = %"PRIu64", tab_id = %d, blob_id = %"PRIu64"\n\n", rec.tr_blob_ref_id, rec.tr_tab_id, rec.tr_blob_id);
00644       } else {
00645         printf("Unexpected transaction state: %d\n", state);
00646         exit(1);              
00647       }
00648       
00649       
00650     }
00651   }
00652   catch_(a) {
00653     self->logException();
00654     printf("\n\n!!!!!!!! THE TRANSACTION LOG READER DIED! !!!!!!!!!!!\n\n");
      // An exception during a requested shutdown is tolerated; otherwise it is fatal.
00655     if (!myMustQuit && !stopit)
00656       exit(1);
00657   }
00658   cont_(a);
00659   printf("The transaction log reader shutting down.\n");
00660   exit_();
00661 }
00662 
00663 //------------------------------------------------
      // Writer loop: until asked to quit (myMustQuit) or stop (stopit), builds random
      // database transactions against this writer's reference table and records
      // every operation in the transaction log.
      //
      // Each transaction holds max(1, tsize) operations, with tsize drawn from
      // [0, max_transaction). An operation is chosen at random:
      //   delete - pick a random row of the reference table and delete it; the
      //            matching LOG_TABLE row is not touched by this function.
      //   insert - add a row to the reference table and a matching uncommitted row
      //            to LOG_TABLE. Also used when a delete finds nothing to delete.
      // The last operation commits the database transaction (or, once in 1000 when
      // tsize > 0, rolls it back) BEFORE the corresponding log record is written.
      // An exception is logged; the process exits unless a shutdown was requested.
00664 void TransTestWriterThread::generate_records()
00665 {
00666 
00667   MS_Txn  txn_type;   
00668   uint64_t  blob_id;  
00669   uint64_t  blob_ref_id;  
00670   int tsize, i;
00671   bool do_delete;
00672     
00673   char stmt[1024];
00674   enter_();
00675 
00676   try_(a) {
00677     while (!myMustQuit && !stopit) {
00678     
      // Heartbeat for the hang detection in main().
00679       myActivity++;
00680       usleep(nap_time); // Give up a bit of time
00681       if (myMustQuit || stopit)
00682         break;
00683         
      // May be 0; the do/while below still performs one operation in that case.
00684       tsize = rand() % max_transaction;
00685       
00686       if (mysql_autocommit(mysql, 0))
00687         report_mysql_error(mysql, __LINE__, "mysql_autocommit()");
00688         
00689       i = 0;
00690       do {
00691         do_delete = ((rand() %2) == 0);
00692         
00693         // decide if this is an insert or delete
00694         if (do_delete) {
00695           MYSQL_RES *results = NULL;            
00696           MYSQL_ROW record;
00697           int cnt;
00698           
00699           // If we are deleting then randomly select a record to delete
00700           // and delete it. 
00701           
00702           txn_type = MS_DereferenceTxn;
00703 
00704           sprintf(stmt, "select * from "REF_TABLE, tab_id); 
00705           if (mysql_query(mysql, stmt)) 
00706             report_mysql_error(mysql, __LINE__, stmt);
00707             
00708           results = mysql_store_result(mysql);
00709           if (!results)
00710             report_mysql_error(mysql, __LINE__, "mysql_store_result()");
00711             
00712           cnt = mysql_num_rows(results);
00713           if (!cnt)
00714             do_delete = false; // There is nothing to delete
00715           else {
00716             mysql_data_seek(results, rand()%cnt);
00717             record = mysql_fetch_row(results);
00718               
      // Column 0 is blob_ref and column 2 is blob_id (see CREATE_TABLE_BODY).
00719             blob_ref_id = atol(record[0]);
00720             blob_id = atol(record[2]);
00721             
00722             sprintf(stmt, "DELETE FROM "REF_TABLE" WHERE blob_ref = %"PRIu64" AND blob_id = %"PRIu64"", tab_id, blob_ref_id, blob_id); 
00723             if (mysql_query(mysql, stmt))  
00724               report_mysql_error(mysql, __LINE__, stmt);
00725               
00726             if (mysql_affected_rows(mysql) == 0)
00727               do_delete = false; // Another thread must have deleted the row first.
00728             else
00729               fprintf(myLog, "DELETE %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
00730           }
00731           
00732           mysql_free_result(results);
00733         } 
00734         
      // Insert path; also the fallback when the delete above removed nothing.
00735         if (!do_delete) {
00736           blob_id = self->myTID; // Assign the tid as the blob id to help with debugging.
00737           txn_type = MS_ReferenceTxn;
00738           
00739           sprintf(stmt, "INSERT INTO "REF_TABLE" VALUES( NULL, %d, %"PRIu64", 0)", tab_id, tab_id, blob_id); 
00740           if (mysql_query(mysql, stmt)) 
00741             report_mysql_error(mysql, __LINE__, stmt);
00742             
      // The auto-increment value becomes the blob reference id used in the log.
00743           blob_ref_id = mysql_insert_id(mysql);
00744           if (!blob_ref_id)
00745             report_mysql_error(mysql, __LINE__, "mysql_insert_id() returned 0");
00746           
00747           fprintf(myLog, "INSERT %"PRIu64" %"PRIu64"\n", blob_ref_id, blob_id); 
00748           // Apply the blob reference now. This will be undone if the transaction is rolled back.
00749           sprintf(stmt, "INSERT INTO "LOG_TABLE" VALUES(%"PRIu64", %d, %"PRIu64", 0)", blob_ref_id, tab_id, blob_id); 
00750           if (mysql_query(mysql, stmt)) 
00751             report_mysql_error(mysql, __LINE__, stmt);
00752         }
00753 
00754         i++;
00755         count++;
00756         if (i >= tsize) { //Commit the database transaction before the log transaction.
00757           bool rollback;
00758           
00759           rollback = ((tsize > 0) && ((rand() % 1000) == 0));
00760           if (rollback) {
00761             printf("Rollback\n");
00762             if (mysql_rollback(mysql)) // roll the statements back in the database,
00763               report_mysql_error(mysql, __LINE__, "mysql_rollback()");  
00764             fprintf(myLog, "Rollback %"PRIu32"\n", self->myTID);  
00765             log->txn_LogTransaction(MS_RollBackTxn);
00766           } else {
00767             if (mysql_commit(mysql)) // commit the statement to the database,
00768               report_mysql_error(mysql, __LINE__, "mysql_commit()");  
00769             fprintf(myLog, "Commit %"PRIu32"\n", self->myTID);  
      // Second argument is true only for the final record of a transaction;
      // presumably it marks the commit - TODO confirm against MSTrans.
00770             log->txn_LogTransaction(txn_type, true, A_DB_ID, tab_id, blob_id, blob_ref_id);
00771           }
00772         } else
00773           log->txn_LogTransaction(txn_type, false, A_DB_ID, tab_id, blob_id, blob_ref_id);
00774                 
00775       } while ( i < tsize);
00776             
00777     }
00778   }
00779   
00780   catch_(a) {
00781     self->logException();
00782     printf("\n\nA writer thread for table %d died! \n\n", tab_id);
      // i == tsize means the database commit/rollback had already been issued when
      // the exception occurred, so the log may be missing that transaction.
00783     if (i == tsize) {
00784       printf(" It is possible that the last %d operations on table %d were committed to the database but not to the log.\n", tsize, tab_id);
00785     }
00786     if (!myMustQuit && !stopit)
00787       exit(1);
00788   }
00789   cont_(a);
00790   printf("Writer thread for table %d is shutting down.\n", tab_id);
00791   exit_();
00792 }
00793 
00794 // SELECT * FROM TransTest.translog where  blob_ref not in (select blob_ref from TransTest.transref)
00795 // SELECT * FROM TransTest.transref_1 where  blob_ref not in (select blob_ref from TransTest.translog where tab_id = 1)
00796 // SELECT * FROM TransTest.translog where  tab_id = 1 AND blob_ref not in (select blob_ref from TransTest.transref_1)
00797 // select count(*) from TransTest.translog where committed = 1
00798 //--------------------------------- 
      // Test driver entry point.
      //
      //  1. Parse the arguments, connect to MySQL (creating the database if needed)
      //     and, with -n, recreate the tables and the log file.
      //  2. Initialize the runtime and the transaction log; with -d print "LOG dumped"
      //     and exit.
      //  3. Start the reader thread and wait until the log reports no pending records
      //     (recovery of whatever an earlier run left behind).
      //  4. With -r, verify the database and finish here.
      //  5. Otherwise start num_threads writers and print log statistics once a second
      //     until the timeout expires. Every 20th second the activity counters are
      //     checked to flag threads that made no progress.
      //  6. Crash at the requested crash site (CRASH_TEST builds), or stop the writers
      //     and the reader and shut down.
      //
      // Exit status is 0 on success, non-zero otherwise.
00799 int main (int argc, const char * argv[]) 
00800 {
00801   MYSQL *mysql;
00802   TransTestWriterThread **writer = NULL;
00803   int rtc = 1;
00804   
00805   process_args(argc, argv);
00806   
00807   mysql = new_connection(true);
00808   
00809   if (recreate)
00810     init_database(mysql, num_threads);
00811     
00812   init_env();
00813   enter_();
00814   
00815   if (dump_log) {
00816     printf("LOG dumped\n");
00817     exit(1);
00818   }
00819   
00820   TransReader = TransTestReaderThread::newTransTestReaderThread(trans_log);
00821   push_(TransReader);
00822   TransReader->recovering = true;
00823   TransReader->start();
00824   
00825   // wait until the recovery is complete.
00826   while (trans_log->txn_GetNumRecords())
00827     usleep(100);
00828     
00829   TransReader->recovering = false;
00830   
00831   if (log_size)
00832     trans_log->txn_SetLogSize(log_size);
00833     
00834   if (cache_size)
00835     trans_log->txn_SetCacheSize(cache_size);
00836     
00837   if (revover_only) {
00838     TransReader->stopit = true;
00839     if (verify_database(mysql))
00840       rtc = 0;
00841     goto done;
00842   }
00843   
00844   try_(a) {
      // NOTE(review): the array is not cleared after allocation. If creating a writer
      // throws part-way through, the cleanup loop after cont_() would touch entries
      // that were never assigned - confirm and harden if cs_malloc does not zero memory.
00845     writer = (TransTestWriterThread **) cs_malloc(num_threads * sizeof(TransTestWriterThread *));
00846     for (int i = 0; i < num_threads; i++) {
00847       TransTestWriterThread *wt = TransTestWriterThread::newTransTestWriterThread(i+1);
00848       wt->start();
00849       writer[i] = wt;
00850     }
00851   
      // timeout is a time_t; cast it so the argument matches the conversion specifier.
00852     printf("Timeout: %ld seconds\n", (long) timeout); 
      // From here on `timeout` is an absolute deadline rather than a duration.
00853     timeout += time(NULL);
00854     int header = 0;
00855     while (timeout > time(NULL)) {
00856       MSTransStatsRec stats;
00857       self->sleep(1000);
00858       trans_log->txn_GetStats(&stats);
00859       
00860       
      // Every 20th pass: check and reset the activity counters, then reprint the
      // column header.
00861       if (!(header%20)) {
00862         for (int i = 0; i < num_threads; i++) {       
00863           if (writer[i]->myActivity == 0) {
00864             printf("Writer thread %d HUNG!!!\n", i);
00865           }
00866           writer[i]->myActivity = 0;
00867         }
00868         
00869         if (TransReader->myActivity == 0) {
00870           printf("Reader thread HUNG!!!\n");
00871         }
00872         TransReader->myActivity = 0;
00873           
00874         printf("%s | %s | %s | %s | %s | %s | %s | %s\n", "LogSize", "Full", "MaxSize", "Overflows", "Overflowing", "CacheSize", "Cache Used", "Cache Hit");
00875       }
00876       header++;
00877       //printf("Writes: %d \t\t Reads: %d \t%d \t start: %"PRIu64"\t\t eol:%"PRIu64"\n", count, TransReader->count, count - TransReader->count, trans_log->txn_Start, trans_log->txn_EOL);
00878       printf("%7llu | %3d%% | %7llu | %9d | %11s | %9d | %9d%% | %9d%%\n",// | \t\t\t%"PRIu64" \t%"PRIu64"\n", 
00879         stats.ts_LogSize,
00880         stats.ts_PercentFull,
00881         stats.ts_MaxSize,
00882         stats.ts_OverflowCount,
00883         (stats.ts_IsOverflowing)?"Over Flow": "   ---   ",
00884         stats.ts_TransCacheSize,
00885         stats.ts_PercentTransCacheUsed,
00886         stats.ts_PercentCacheHit//, trans_log->txn_Start, trans_log->txn_EOL
00887         );
00888         
      // -c <MAX_CRASH_POINT + 1>: simulate a crash as soon as the log overflows.
00889         if (stats.ts_IsOverflowing && overflow_crash) {
00890           printf("Simulating crash while in overflow\n");
00891           exit(1);
00892         }
00893     }
00894 
00895 #ifdef CRASH_TEST   
00896     if (crash_site) {
00897       printf("Crashing at crash site %d\n", crash_site);
00898       trans_test_crash_point = crash_site;
00899       // set the crash site and wait to die.
00900       while(1)
00901         self->sleep(1000);
00902     }
00903 #endif
00904     
00905     printf("Shutting down the writer threads:\n");
00906     for (int i = 0; i < num_threads; i++) {
00907       writer[i]->stopit = true;
00908     }
00909     
00910     TransReader->stopit = true;
00911     // Give the writers a chance to shutdown by themselves.
      // Polls up to 100 times, sleeping between polls. The reader's doWork() never
      // sets `finished`, so this loop normally runs all 100 iterations.
00912     int cnt = 100;
00913     while (cnt) {
00914       int i;
00915       for (i = 0; i < num_threads && writer[i]->finished; i++);
00916       if (i == num_threads && TransReader->finished)
00917         break;
00918       self->sleep(10);  
00919       cnt--;      
00920     }
00921     
00922     for (int i = 0; i < num_threads; i++) {
00923       writer[i]->stop();
00924     }
00925     
00926   }
00927   rtc = 0;
00928   catch_(a) {
00929     printf("Main thread abort.\n");
00930     self->logException();
00931   }
00932   cont_(a);
00933   if (writer) {
00934     for (int i = 0; i < num_threads; i++) {
00935       writer[i]->stop();
00936       writer[i]->release();
00937     }
00938     cs_free(writer);
00939   }
00940     
00941 done:
00942   TransReader->stop();
00943   release_(TransReader);
00944   
00945   outer_();
00946   
00947   thread_list->stopAllThreads();
00948   deinit_env();
00949   mysql_close(mysql);
00950   exit(rtc);
00951 }
00952 
00953 #endif // UNIT_TEST