00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <config.h>
00023 #include <string>
00024 #include <inttypes.h>
00025
00026 #include <drizzled/session.h>
00027 #include <drizzled/field/blob.h>
00028 #include <drizzled/sql_lex.h>
00029
00030 #include "cslib/CSConfig.h"
00031 #include "cslib/CSGlobal.h"
00032 #include "cslib/CSStrUtil.h"
00033 #include "cslib/CSThread.h"
00034
00035 #include "events_ms.h"
00036 #include "parameters_ms.h"
00037 #include "engine_ms.h"
00038
00039 using namespace drizzled;
00040 using namespace plugin;
00041 using namespace std;
00042
00043
00044
00045
00046 static bool insertRecord(const char *db, const char *table_name, char *possible_blob_url, size_t length,
00047 Session &session, Field_blob *field, unsigned char *blob_rec, size_t packlength)
00048 {
00049 char *blob_url;
00050 char safe_url[PBMS_BLOB_URL_SIZE+1];
00051 PBMSBlobURLRec blob_url_buffer;
00052 size_t org_length = length;
00053 int32_t err;
00054 PBMSResultRec result;
00055
00056
00057
00058
00059
00060 if (MSEngine::couldBeURL(possible_blob_url, length) == false) {
00061 err = MSEngine::createBlob(db, table_name, possible_blob_url, length, &blob_url_buffer, &result);
00062 if (err) {
00063
00064 fprintf(stderr, "PBMSEvents: createBlob(\"%s.%s\") error (%d):'%s'\n",
00065 db, table_name, result.mr_code, result.mr_message);
00066
00067 return true;
00068 }
00069 blob_url = blob_url_buffer.bu_data;
00070 } else {
00071
00072
00073 if (possible_blob_url[length]) {
00074 memcpy(safe_url, possible_blob_url, length);
00075 safe_url[length] = 0;
00076 blob_url = safe_url;
00077 } else
00078 blob_url = possible_blob_url;
00079 }
00080
00081
00082 err = MSEngine::referenceBlob(db, table_name, &blob_url_buffer, blob_url, field->position(), &result);
00083 if (err) {
00084
00085 fprintf(stderr, "PBMSEvents: referenceBlob(\"%s.%s\", \"%s\" ) error (%d):'%s'\n",
00086 db, table_name, blob_url, result.mr_code, result.mr_message);
00087
00088 return true;
00089 }
00090
00091
00092
00093 length = strlen(blob_url_buffer.bu_data) +1;
00094 if ((length != org_length) || memcmp(blob_url_buffer.bu_data, possible_blob_url, length)) {
00095 char *blob = possible_blob_url;
00096
00097 if (length != org_length) {
00098 field->store_length(blob_rec, length);
00099 }
00100
00101 if (length > org_length) {
00102
00103 blob = (char *) session.getMemRoot()->allocate(length);
00104 memcpy(blob_rec+packlength, &blob, sizeof(char*));
00105 }
00106 memcpy(blob, blob_url_buffer.bu_data, length);
00107 }
00108
00109 return false;
00110 }
00111
00112
00113 static bool deleteRecord(const char *db, const char *table_name, char *blob_url, size_t length)
00114 {
00115 int32_t err;
00116 char safe_url[PBMS_BLOB_URL_SIZE+1];
00117 PBMSResultRec result;
00118 bool call_failed = false;
00119
00120
00121 if (MSEngine::couldBeURL(blob_url, length)) {
00122
00123
00124
00125 if (blob_url[length]) {
00126 memcpy(safe_url, blob_url, length);
00127 safe_url[length] = 0;
00128 blob_url = safe_url;
00129 }
00130
00131
00132 err = MSEngine::dereferenceBlob(db, table_name, blob_url, &result);
00133 if (err) {
00134
00135 fprintf(stderr, "PBMSEvents: dereferenceBlob(\"%s.%s\") error (%d):'%s'\n",
00136 db, table_name, result.mr_code, result.mr_message);
00137
00138 call_failed = true;
00139 }
00140 }
00141
00142 return call_failed;
00143 }
00144
00145
00146 static bool observeBeforeInsertRecord(BeforeInsertRecordEventData &data)
00147 {
00148 Field_blob *field;
00149 unsigned char *blob_rec;
00150 char *blob_url;
00151 size_t packlength, i, length;
00152
00153 for (i= 0; i < data.table.sizeBlobFields(); i++) {
00154 field = data.table.getBlobFieldAt(i);
00155
00156 if (field->is_null_in_record(data.row))
00157 continue;
00158
00159
00160 packlength = field->pack_length() - data.table.getBlobPtrSize();
00161
00162 blob_rec = (unsigned char *)data.row + field->offset(data.table.getInsertRecord());
00163 length = field->get_length(blob_rec);
00164 memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
00165
00166 if (insertRecord(data.table.getSchemaName(), data.table.getTableName(),
00167 blob_url, length, data.session, field, blob_rec, packlength))
00168 return true;
00169 }
00170
00171 return false;
00172 }
00173
00174
00175 static bool observeAfterInsertRecord(AfterInsertRecordEventData &data)
00176 {
00177 bool has_blob = false;
00178
00179 for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
00180 Field_blob *field = data.table.getBlobFieldAt(i);
00181
00182 if ( field->is_null_in_record(data.row) == false)
00183 has_blob = true;
00184 }
00185
00186 if (has_blob)
00187 MSEngine::callCompleted(data.err == 0);
00188
00189 return false;
00190 }
00191
00192
00193 static bool observeBeforeUpdateRecord(BeforeUpdateRecordEventData &data)
00194 {
00195 Field_blob *field;
00196 uint32_t field_offset;
00197 const unsigned char *old_blob_rec;
00198 unsigned char *new_blob_rec= NULL;
00199 char *old_blob_url, *new_blob_url;
00200 size_t packlength, i, old_length= 0, new_length= 0;
00201 const unsigned char *old_row = data.old_row;
00202 unsigned char *new_row = data.new_row;
00203 const char *db = data.table.getSchemaName();
00204 const char *table_name = data.table.getTableName();
00205 bool old_null, new_null;
00206
00207 for (i= 0; i < data.table.sizeBlobFields(); i++) {
00208 field = data.table.getBlobFieldAt(i);
00209
00210 new_null = field->is_null_in_record(new_row);
00211 old_null = field->is_null_in_record(old_row);
00212
00213 if (new_null && old_null)
00214 continue;
00215
00216
00217
00218
00219 field_offset = field->offset(data.table.getInsertRecord());
00220 packlength = field->pack_length() - data.table.getBlobPtrSize();
00221
00222 if (new_null) {
00223 new_blob_url = NULL;
00224 } else {
00225 new_blob_rec = new_row + field_offset;
00226 new_length = field->get_length(new_blob_rec);
00227 memcpy(&new_blob_url, new_blob_rec +packlength, sizeof(char*));
00228 }
00229
00230 if (old_null) {
00231 old_blob_url = NULL;
00232 } else {
00233 old_blob_rec = old_row + field_offset;
00234 old_length = field->get_length(old_blob_rec);
00235 memcpy(&old_blob_url, old_blob_rec +packlength, sizeof(char*));
00236 }
00237
00238
00239
00240
00241
00242
00243 if ((old_blob_url != new_blob_url) || (old_null != new_null)) {
00244
00245
00246 if ((old_null == false) && deleteRecord(db, table_name, old_blob_url, old_length))
00247 return true;
00248
00249 if ((new_null == false) && insertRecord(db, table_name, new_blob_url, new_length, data.session, field, new_blob_rec, packlength))
00250 return true;
00251
00252 }
00253
00254 }
00255
00256 return false;
00257 }
00258
00259
00260 static bool observeAfterUpdateRecord(AfterUpdateRecordEventData &data)
00261 {
00262 bool has_blob = false;
00263 const unsigned char *old_row = data.old_row;
00264 const unsigned char *new_row = data.new_row;
00265
00266 for (uint32_t i= 0; (i < data.table.sizeBlobFields()) && (has_blob == false); i++) {
00267 Field_blob *field = data.table.getBlobFieldAt(i);
00268 bool new_null = field->is_null_in_record(new_row);
00269 bool old_null = field->is_null_in_record(old_row);
00270
00271 if ( (new_null == false) || (old_null == false)) {
00272 const unsigned char *blob_rec;
00273 size_t field_offset = field->offset(data.table.getInsertRecord());
00274 size_t packlength = field->pack_length() - data.table.getBlobPtrSize();
00275 char *old_blob_url, *new_blob_url;
00276
00277 blob_rec = new_row + field_offset;
00278 memcpy(&new_blob_url, blob_rec +packlength, sizeof(char*));
00279
00280 blob_rec = old_row + field_offset;
00281 memcpy(&old_blob_url, blob_rec +packlength, sizeof(char*));
00282
00283 has_blob = ((old_blob_url != new_blob_url) || (old_null != new_null));
00284 }
00285 }
00286
00287 if (has_blob)
00288 MSEngine::callCompleted(data.err == 0);
00289
00290 return false;
00291 }
00292
00293
00294 static bool observeAfterDeleteRecord(AfterDeleteRecordEventData &data)
00295 {
00296 Field_blob *field;
00297 const unsigned char *blob_rec;
00298 char *blob_url;
00299 size_t packlength, i, length;
00300 bool call_failed = false;
00301 bool has_blob = false;
00302
00303 if (data.err != 0)
00304 return false;
00305
00306 for (i= 0; (i < data.table.sizeBlobFields()) && (call_failed == false); i++) {
00307 field = data.table.getBlobFieldAt(i);
00308
00309 if (field->is_null_in_record(data.row))
00310 continue;
00311
00312 has_blob = true;
00313
00314 packlength = field->pack_length() - data.table.getBlobPtrSize();
00315
00316 blob_rec = data.row + field->offset(data.table.getInsertRecord());
00317 length = field->get_length(blob_rec);
00318 memcpy(&blob_url, blob_rec +packlength, sizeof(char*));
00319
00320 if (deleteRecord(data.table.getSchemaName(), data.table.getTableName(), blob_url, length))
00321 call_failed = true;
00322 }
00323
00324 if (has_blob)
00325 MSEngine::callCompleted(call_failed == false);
00326
00327 return call_failed;
00328 }
00329
00330
00331
00332 static bool observeAfterDropDatabase(AfterDropDatabaseEventData &data)
00333 {
00334 PBMSResultRec result;
00335 if (data.err != 0)
00336 return false;
00337
00338 if (MSEngine::dropDatabase(data.db.c_str(), &result) != 0) {
00339 fprintf(stderr, "PBMSEvents: dropDatabase(\"%s\") error (%d):'%s'\n",
00340 data.db.c_str(), result.mr_code, result.mr_message);
00341 }
00342
00343
00344 return false;
00345 }
00346
00347
00348
00349 static bool observeAfterDropTable(AfterDropTableEventData &data)
00350 {
00351 PBMSResultRec result;
00352 if (data.err != 0)
00353 return false;
00354
00355 if (MSEngine::dropTable(data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), &result) != 0) {
00356 fprintf(stderr, "PBMSEvents: dropTable(\"%s.%s\") error (%d):'%s'\n",
00357 data.table.getSchemaName().c_str(), data.table.getTableName().c_str(), result.mr_code, result.mr_message);
00358 return true;
00359 }
00360 MSEngine::callCompleted(true);
00361
00362 return false;
00363 }
00364
00365
00366 static bool observeAfterRenameTable(AfterRenameTableEventData &data)
00367 {
00368 PBMSResultRec result;
00369 if (data.err != 0)
00370 return false;
00371
00372 const char *from_db = data.from.getSchemaName().c_str();
00373 const char *from_table = data.from.getTableName().c_str();
00374 const char *to_db = data.to.getSchemaName().c_str();
00375 const char *to_table = data.to.getTableName().c_str();
00376
00377 if (MSEngine::renameTable(from_db, from_table, to_db, to_table, &result) != 0) {
00378 fprintf(stderr, "PBMSEvents: renameTable(\"%s.%s\" To \"%s.%s\") error (%d):'%s'\n",
00379 from_db, from_table, to_db, to_table, result.mr_code, result.mr_message);
00380 return true;
00381 }
00382 MSEngine::callCompleted(true);
00383
00384 return false;
00385 }
00386
00387
00388
00389 void PBMSEvents::registerTableEventsDo(TableShare &table_share, EventObserverList &observers)
00390 {
00391 if ((PBMSParameters::isPBMSEventsEnabled() == false)
00392 || (PBMSParameters::isBLOBTable(table_share.getSchemaName(), table_share.getTableName()) == false))
00393 return;
00394
00395 if (table_share.blob_fields > 0) {
00396 registerEvent(observers, BEFORE_INSERT_RECORD, PBMSParameters::getBeforeInsertEventPosition());
00397 registerEvent(observers, AFTER_INSERT_RECORD);
00398 registerEvent(observers, BEFORE_UPDATE_RECORD, PBMSParameters::getBeforeUptateEventPosition());
00399 registerEvent(observers, AFTER_UPDATE_RECORD);
00400 registerEvent(observers, AFTER_DELETE_RECORD);
00401 }
00402 }
00403
00404
00405
00406 void PBMSEvents::registerSchemaEventsDo(const std::string &db, EventObserverList &observers)
00407 {
00408 if ((PBMSParameters::isPBMSEventsEnabled() == false)
00409 || (PBMSParameters::isBLOBDatabase(db.c_str()) == false))
00410 return;
00411
00412 registerEvent(observers, AFTER_DROP_TABLE);
00413 registerEvent(observers, AFTER_RENAME_TABLE);
00414 }
00415
00416
00417
00418 void PBMSEvents::registerSessionEventsDo(Session &, EventObserverList &observers)
00419 {
00420 if (PBMSParameters::isPBMSEventsEnabled() == false)
00421 return;
00422
00423 registerEvent(observers, AFTER_DROP_DATABASE);
00424 }
00425
00426
00427
00428 bool PBMSEvents::observeEventDo(EventData &data)
00429 {
00430 bool result= false;
00431
00432 switch (data.event) {
00433 case AFTER_DROP_DATABASE:
00434 result = observeAfterDropDatabase((AfterDropDatabaseEventData &)data);
00435 break;
00436
00437 case AFTER_DROP_TABLE:
00438 result = observeAfterDropTable((AfterDropTableEventData &)data);
00439 break;
00440
00441 case AFTER_RENAME_TABLE:
00442 result = observeAfterRenameTable((AfterRenameTableEventData &)data);
00443 break;
00444
00445 case BEFORE_INSERT_RECORD:
00446 result = observeBeforeInsertRecord((BeforeInsertRecordEventData &)data);
00447 break;
00448
00449 case AFTER_INSERT_RECORD:
00450 result = observeAfterInsertRecord((AfterInsertRecordEventData &)data);
00451 break;
00452
00453 case BEFORE_UPDATE_RECORD:
00454 result = observeBeforeUpdateRecord((BeforeUpdateRecordEventData &)data);
00455 break;
00456
00457 case AFTER_UPDATE_RECORD:
00458 result = observeAfterUpdateRecord((AfterUpdateRecordEventData &)data);
00459 break;
00460
00461 case AFTER_DELETE_RECORD:
00462 result = observeAfterDeleteRecord((AfterDeleteRecordEventData &)data);
00463 break;
00464
00465 default:
00466 fprintf(stderr, "PBMSEvents: Unexpected event '%s'\n", EventObserver::eventName(data.event));
00467
00468 }
00469
00470 return result;
00471 }
00472