Drizzled Public API Documentation

events_ms.cc

00001 /*
00002  *  Copyright (C) 2010 PrimeBase Technologies GmbH, Germany
00003  *
00004  *  This program is free software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License as published by
00006  *  the Free Software Foundation; version 2 of the License.
00007  *
00008  *  This program is distributed in the hope that it will be useful,
00009  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00010  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011  *  GNU General Public License for more details.
00012  *
00013  *  You should have received a copy of the GNU General Public License
00014  *  along with this program; if not, write to the Free Software
00015  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00016  *
00017  * Barry Leslie
00018  *
00019  * 2010-06-01
00020  */
00021 
00022 #include <config.h>
00023 #include <string>
00024 #include <inttypes.h>
00025 
00026 #include <drizzled/session.h>
00027 #include <drizzled/field/blob.h>
00028 #include <drizzled/sql_lex.h>
00029 
00030 #include "cslib/CSConfig.h"
00031 #include "cslib/CSGlobal.h"
00032 #include "cslib/CSStrUtil.h"
00033 #include "cslib/CSThread.h"
00034 
00035 #include "events_ms.h"
00036 #include "parameters_ms.h"
00037 #include "engine_ms.h"
00038 
00039 using namespace drizzled;
00040 using namespace plugin;
00041 using namespace std;
00042 
00043 
00044 //==================================
00045 // My table event observers: 
00046 static bool insertRecord(const char *db, const char *table_name, char *possible_blob_url,  size_t length, 
00047   Session &session, Field_blob *field, unsigned char *blob_rec, size_t packlength)
00048 {
00049   char *blob_url;
00050   char safe_url[PBMS_BLOB_URL_SIZE+1];
00051   PBMSBlobURLRec blob_url_buffer;
00052   size_t org_length = length;
00053   int32_t err;
00054   PBMSResultRec result;
00055   
00056   // Tell PBMS to record a new reference to the BLOB.
00057   // If 'blob' is not a BLOB URL then it will be stored in the repositor as a new BLOB
00058   // and a reference to it will be created.
00059   
00060   if (MSEngine::couldBeURL(possible_blob_url, length) == false) {
00061     err = MSEngine::createBlob(db, table_name, possible_blob_url, length, &blob_url_buffer, &result);
00062     if (err) {
00063       // If it fails log the error and continue to try and release any other BLOBs in the row.
00064       fprintf(stderr, "PBMSEvents: createBlob(\"%s.%s\") error (%d):'%s'\n", 
00065         db, table_name, result.mr_code,  result.mr_message);
00066         
00067       return true;
00068     }       
00069     blob_url = blob_url_buffer.bu_data;
00070   } else {
00071     // The BLOB URL may not be null terminate, if so
00072     // then copy it to a safe buffer and terminate it.
00073     if (possible_blob_url[length]) {
00074       memcpy(safe_url, possible_blob_url, length);
00075       safe_url[length] = 0;
00076       blob_url = safe_url;
00077     } else
00078       blob_url = possible_blob_url;
00079   }
00080   
00081   // Tell PBMS to add a reference to the BLOB.
00082   err = MSEngine::referenceBlob(db, table_name, &blob_url_buffer, blob_url, field->position(), &result);
00083   if (err) {
00084     // If it fails log the error and continue to try and release any other BLOBs in the row.
00085     fprintf(stderr, "PBMSEvents: referenceBlob(\"%s.%s\", \"%s\" ) error (%d):'%s'\n", 
00086       db, table_name, blob_url, result.mr_code,  result.mr_message);
00087       
00088     return true;
00089   }
00090   
00091   // The URL is modified on insert so if the BLOB length changed reset it. 
00092   // This will happen if the BLOB data was replaced with a BLOB reference. 
00093   length = strlen(blob_url_buffer.bu_data)  +1;
00094   if ((length != org_length) || memcmp(blob_url_buffer.bu_data, possible_blob_url, length)) {
00095     char *blob = possible_blob_url; // This is the BLOB as the server currently sees it.
00096     
00097     if (length != org_length) {
00098       field->store_length(blob_rec, length);
00099     }
00100     
00101     if (length > org_length) {
00102       // This can only happen if the BLOB URL is actually larger than the BLOB itself.
00103       blob = (char *) session.getMemRoot()->allocate(length);
00104       memcpy(blob_rec+packlength, &blob, sizeof(char*));
00105     }     
00106     memcpy(blob, blob_url_buffer.bu_data, length);
00107   } 
00108 
00109   return false;
00110 }
00111 
00112 //---
00113 static bool deleteRecord(const char *db, const char *table_name, char *blob_url,  size_t length)
00114 {
00115   int32_t err;
00116   char safe_url[PBMS_BLOB_URL_SIZE+1];
00117   PBMSResultRec result;
00118   bool call_failed = false;
00119   
00120   // Check to see if this is a valid URL.
00121   if (MSEngine::couldBeURL(blob_url, length)) {
00122   
00123     // The BLOB URL may not be null terminate, if so
00124     // then copy it to a safe buffer and terminate it.
00125     if (blob_url[length]) {
00126       memcpy(safe_url, blob_url, length);
00127       safe_url[length] = 0;
00128       blob_url = safe_url;
00129     }
00130     
00131     // Signal PBMS to delete the reference to the BLOB.
00132     err = MSEngine::dereferenceBlob(db, table_name, blob_url, &result);
00133     if (err) {
00134       // If it fails log the error and continue to try and release any other BLOBs in the row.
00135       fprintf(stderr, "PBMSEvents: dereferenceBlob(\"%s.%s\") error (%d):'%s'\n", 
00136         db, table_name, result.mr_code,  result.mr_message);
00137         
00138       call_failed = true;
00139     }
00140   }
00141 
00142   return call_failed;
00143 }
00144 
00145 //---
00146 static bool observeBeforeInsertRecord(BeforeInsertRecordEventData &data)
00147 {
00148   Field_blob *field;
00149   unsigned char *blob_rec;
00150   char *blob_url;
00151   size_t packlength, i, length;
00152 
00153   for (i= 0; i < data.table.sizeBlobFields(); i++) {
00154     field = data.table.getBlobFieldAt(i);
00155     
00156     if (field->is_null_in_record(data.row))
00157       continue;
00158       
00159     // Get the blob record:
00160     packlength = field->pack_length() - data.table.getBlobPtrSize();
00161 
00162     blob_rec = (unsigned char *)data.row + field->offset(data.table.getInsertRecord());
00163     length = field->get_length(blob_rec);
00164     memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
00165 
00166     if (insertRecord(data.table.getSchemaName(), data.table.getTableName(), 
00167       blob_url, length, data.session, field, blob_rec, packlength))
00168       return true;
00169   }
00170 
00171   return false;
00172 }
00173 
00174 //---
00175 static bool observeAfterInsertRecord(AfterInsertRecordEventData &data)
00176 {
00177   bool has_blob = false;
00178   
00179   for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
00180     Field_blob *field = data.table.getBlobFieldAt(i);
00181     
00182     if ( field->is_null_in_record(data.row) == false)
00183       has_blob = true;
00184   }
00185   
00186   if  (has_blob)
00187     MSEngine::callCompleted(data.err == 0);
00188   
00189   return false;
00190 }
00191 
00192 //---
00193 static bool observeBeforeUpdateRecord(BeforeUpdateRecordEventData &data)
00194 {
00195   Field_blob *field;
00196   uint32_t field_offset;
00197   const unsigned char *old_blob_rec;
00198   unsigned char *new_blob_rec= NULL;
00199   char *old_blob_url, *new_blob_url;
00200   size_t packlength, i, old_length= 0, new_length= 0;
00201   const unsigned char *old_row = data.old_row;
00202   unsigned char *new_row = data.new_row;
00203   const char *db = data.table.getSchemaName();
00204   const char *table_name = data.table.getTableName();
00205   bool old_null, new_null;
00206 
00207   for (i= 0; i < data.table.sizeBlobFields(); i++) {
00208     field = data.table.getBlobFieldAt(i);
00209     
00210     new_null = field->is_null_in_record(new_row);   
00211     old_null = field->is_null_in_record(old_row);
00212     
00213     if (new_null && old_null)
00214       continue;
00215     
00216     // Check to see if the BLOB data was updated.
00217 
00218     // Get the blob records:
00219     field_offset = field->offset(data.table.getInsertRecord());
00220     packlength = field->pack_length() - data.table.getBlobPtrSize();
00221 
00222     if (new_null) {
00223       new_blob_url = NULL;
00224     } else {
00225       new_blob_rec = new_row + field_offset;
00226       new_length = field->get_length(new_blob_rec);
00227       memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
00228     }
00229     
00230     if (old_null) {
00231       old_blob_url = NULL;
00232     } else {
00233       old_blob_rec = old_row + field_offset;
00234       old_length = field->get_length(old_blob_rec);
00235       memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
00236     }
00237     
00238     // Check to see if the BLOBs are the same.
00239     // I am assuming that if the BLOB pointer is different then teh BLOB has changed.
00240     // Zero length BLOBs are a special case because they may have a NULL data pointer,
00241     // to catch this and distiguish it from a NULL BLOB I do a check to see if one field was NULL:
00242     // (old_null != new_null)
00243     if ((old_blob_url != new_blob_url) || (old_null != new_null)) {
00244       
00245       // The BLOB was updated so delete the old one and insert the new one.
00246       if ((old_null == false) && deleteRecord(db, table_name, old_blob_url, old_length))
00247         return true;
00248         
00249       if ((new_null == false) && insertRecord(db, table_name, new_blob_url, new_length, data.session, field, new_blob_rec, packlength))
00250         return true;
00251 
00252     }
00253     
00254   }
00255 
00256   return false;
00257 }
00258 
00259 //---
00260 static bool observeAfterUpdateRecord(AfterUpdateRecordEventData &data)
00261 {
00262   bool has_blob = false;
00263   const unsigned char *old_row = data.old_row;
00264   const unsigned char *new_row = data.new_row;
00265   
00266   for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
00267     Field_blob *field = data.table.getBlobFieldAt(i);   
00268     bool new_null = field->is_null_in_record(new_row);    
00269     bool old_null = field->is_null_in_record(old_row);
00270     
00271     if ( (new_null == false) || (old_null == false)) {
00272       const unsigned char *blob_rec;      
00273       size_t field_offset = field->offset(data.table.getInsertRecord());
00274       size_t packlength = field->pack_length() - data.table.getBlobPtrSize();
00275       char *old_blob_url, *new_blob_url;
00276       
00277       blob_rec = new_row + field_offset;
00278       memcpy(&new_blob_url, blob_rec +packlength, sizeof(char*));
00279 
00280       blob_rec = old_row + field_offset;
00281       memcpy(&old_blob_url, blob_rec +packlength, sizeof(char*));
00282 
00283       has_blob = ((old_blob_url != new_blob_url) || (old_null != new_null));
00284     }
00285   }
00286   
00287   if  (has_blob)
00288     MSEngine::callCompleted(data.err == 0);
00289 
00290   return false;
00291 }
00292 
00293 //---
00294 static bool observeAfterDeleteRecord(AfterDeleteRecordEventData &data)
00295 {
00296   Field_blob *field;
00297   const unsigned char *blob_rec;
00298   char *blob_url;
00299   size_t packlength, i, length;
00300   bool call_failed = false;
00301   bool has_blob = false;
00302   
00303   if (data.err != 0)
00304     return false;
00305 
00306   for (i= 0; (i < data.table.sizeBlobFields()) && (call_failed == false); i++) {
00307     field = data.table.getBlobFieldAt(i);
00308     
00309     if (field->is_null_in_record(data.row))
00310       continue;
00311       
00312     has_blob = true;  
00313     // Get the blob record:
00314     packlength = field->pack_length() - data.table.getBlobPtrSize();
00315 
00316     blob_rec = data.row + field->offset(data.table.getInsertRecord());
00317     length = field->get_length(blob_rec);
00318     memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
00319 
00320     if (deleteRecord(data.table.getSchemaName(), data.table.getTableName(), blob_url, length))
00321       call_failed = true;
00322   }
00323   
00324   if (has_blob)
00325     MSEngine::callCompleted(call_failed == false);
00326     
00327   return call_failed;
00328 }
00329 
00330 //==================================
00331 // My session event observers: 
00332 static bool observeAfterDropDatabase(AfterDropDatabaseEventData &data)
00333 {
00334   PBMSResultRec result;
00335   if (data.err != 0)
00336     return false;
00337 
00338   if (MSEngine::dropDatabase(data.db.c_str(), &result) != 0) {
00339     fprintf(stderr, "PBMSEvents: dropDatabase(\"%s\") error (%d):'%s'\n", 
00340       data.db.c_str(), result.mr_code,  result.mr_message);
00341   }
00342   
00343   // Always return no error for after drop database. What could the server do about it?
00344   return false;
00345 }
00346 
00347 //==================================
00348 // My schema event observers: 
00349 static bool observeAfterDropTable(AfterDropTableEventData &data)
00350 {
00351   PBMSResultRec result;
00352   if (data.err != 0)
00353     return false;
00354 
00355   if (MSEngine::dropTable(data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), &result) != 0) {
00356     fprintf(stderr, "PBMSEvents: dropTable(\"%s.%s\") error (%d):'%s'\n", 
00357       data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), result.mr_code,  result.mr_message);
00358     return true;
00359   }
00360   MSEngine::callCompleted(true);
00361   
00362   return false;
00363 }
00364 
00365 //---
00366 static bool observeAfterRenameTable(AfterRenameTableEventData &data)
00367 {
00368   PBMSResultRec result;
00369   if (data.err != 0)
00370     return false;
00371 
00372   const char *from_db = data.from.getSchemaName().c_str();
00373   const char *from_table = data.from.getTableName().c_str();
00374   const char *to_db = data.to.getSchemaName().c_str();
00375   const char *to_table = data.to.getTableName().c_str();
00376   
00377   if (MSEngine::renameTable(from_db, from_table, to_db, to_table, &result) != 0) {
00378     fprintf(stderr, "PBMSEvents: renameTable(\"%s.%s\" To \"%s.%s\") error (%d):'%s'\n", 
00379       from_db, from_table, to_db, to_table, result.mr_code,  result.mr_message);
00380     return true;
00381   }
00382   MSEngine::callCompleted(true);
00383   
00384   return false;
00385 }
00386 
00387 //==================================
00388 /* This is where I register which table events my pluggin is interested in.*/
00389 void PBMSEvents::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
00390 {
00391   if ((PBMSParameters::isPBMSEventsEnabled() == false) 
00392     || (PBMSParameters::isBLOBTable(table_share.getSchemaName(), table_share.getTableName()) == false))
00393     return;
00394     
00395   if (table_share.blob_fields > 0) {
00396     registerEvent(observers, BEFORE_INSERT_RECORD, PBMSParameters::getBeforeInsertEventPosition()); // I want to be called first if passible
00397     registerEvent(observers, AFTER_INSERT_RECORD); 
00398     registerEvent(observers, BEFORE_UPDATE_RECORD, PBMSParameters::getBeforeUptateEventPosition());
00399     registerEvent(observers, AFTER_UPDATE_RECORD); 
00400     registerEvent(observers, AFTER_DELETE_RECORD);
00401  }
00402 }
00403 
00404 //==================================
00405 /* This is where I register which schema events my pluggin is interested in.*/
00406 void PBMSEvents::registerSchemaEventsDo(const std::string &db, EventObserverList &observers)
00407 {
00408   if ((PBMSParameters::isPBMSEventsEnabled() == false) 
00409     || (PBMSParameters::isBLOBDatabase(db.c_str()) == false))
00410     return;
00411     
00412   registerEvent(observers, AFTER_DROP_TABLE);
00413   registerEvent(observers, AFTER_RENAME_TABLE);
00414 }
00415 
00416 //==================================
00417 /* This is where I register which schema events my pluggin is interested in.*/
00418 void PBMSEvents::registerSessionEventsDo(Session &, EventObserverList &observers)
00419 {
00420   if (PBMSParameters::isPBMSEventsEnabled() == false) 
00421     return;
00422     
00423   registerEvent(observers, AFTER_DROP_DATABASE);
00424 }
00425 
00426 //==================================
00427 /* The event observer.*/
00428 bool PBMSEvents::observeEventDo(EventData &data)
00429 {
00430   bool result= false;
00431   
00432   switch (data.event) {
00433   case AFTER_DROP_DATABASE:
00434     result = observeAfterDropDatabase((AfterDropDatabaseEventData &)data);
00435     break;
00436     
00437   case AFTER_DROP_TABLE:
00438     result = observeAfterDropTable((AfterDropTableEventData &)data);
00439     break;
00440     
00441   case AFTER_RENAME_TABLE:
00442     result = observeAfterRenameTable((AfterRenameTableEventData &)data);
00443     break;
00444     
00445   case BEFORE_INSERT_RECORD:
00446      result = observeBeforeInsertRecord((BeforeInsertRecordEventData &)data);
00447     break;
00448     
00449   case AFTER_INSERT_RECORD:
00450     result = observeAfterInsertRecord((AfterInsertRecordEventData &)data);
00451     break;
00452     
00453  case BEFORE_UPDATE_RECORD:
00454     result = observeBeforeUpdateRecord((BeforeUpdateRecordEventData &)data);
00455     break;
00456              
00457   case AFTER_UPDATE_RECORD:
00458     result = observeAfterUpdateRecord((AfterUpdateRecordEventData &)data);
00459     break;
00460     
00461   case AFTER_DELETE_RECORD:
00462     result = observeAfterDeleteRecord((AfterDeleteRecordEventData &)data);
00463     break;
00464 
00465   default:
00466     fprintf(stderr, "PBMSEvents: Unexpected event '%s'\n", EventObserver::eventName(data.event));
00467  
00468   }
00469   
00470   return result;
00471 }
00472