Drizzled Public API Documentation

udf_ms.cc

00001 #ifndef DRIZZLED
00002 //#define SUPPORT_PBMS_TRIGGERS
00003 #ifdef SUPPORT_PBMS_TRIGGERS
00004 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00005  *
00006  * PrimeBase Media Stream for MySQL
00007  *
00008  * This program is free software; you can redistribute it and/or modify
00009  * it under the terms of the GNU General Public License as published by
00010  * the Free Software Foundation; either version 2 of the License, or
00011  * (at your option) any later version.
00012  *
00013  * This program is distributed in the hope that it will be useful,
00014  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00015  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016  * GNU General Public License for more details.
00017  *
00018  * You should have received a copy of the GNU General Public License
00019  * along with this program; if not, write to the Free Software
00020  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00021  *
00022  * Barry Leslie
00023  *
00024  * 2008-09-11
00025  *
00026  * H&G2JCtL
00027  *
00028  * User Defined Functions for use in triggers for non PBMS enabled engines.
00029  *
00030  */
00031 
00032 #include "cslib/CSConfig.h"
00033 
00034 #include <stdlib.h>
00035 #include <stdio.h>
00036 #include <string.h>
00037 
00038 //#include "mysql_priv.h"
00039 
00040 #include <mysql.h>
00041 #include <ctype.h>
00042 
00043 #include "cslib/CSGlobal.h"
00044 #include "cslib/CSStrUtil.h"
00045 #include "cslib/CSThread.h"
00046 
00047 #include "engine_ms.h"
00048 
00049 #ifdef MOVE_THIS_TO_ITS_OWN_FILE
00050 bool pbms_enabled(const char *name)
00051 {
00052   bool found = false;
00053   PBMSSharedMemoryPtr sh_mem = StreamingEngines->sharedMemory;
00054   PBMSEnginePtr   engine;
00055   
00056   if (sh_mem) {
00057     for (int i = 0; i<sh_mem->sm_list_len && !found; i++) {
00058       if (engine = sh_mem->sm_engine_list[i]) {
00059         found = (strcasecmp(name, engine->ms_engine_name) == 0);
00060       }
00061     }
00062   }
00063   
00064   return found;
00065 }
00066 
00067 // =============================
00068 #define PBMS_ENGINE_NAME_LEN 64 // This should be big enough.
00069 bool pbms_engine_is_enabled(char *name, size_t len)
00070 {
00071   bool found = false;
00072 
00073   if (len < PBMS_ENGINE_NAME_LEN) {
00074     char engine_name[PBMS_ENGINE_NAME_LEN];
00075     memcpy(engine_name, name, len);
00076     engine_name[len] = 0;
00077     
00078     found = pbms_enabled(engine_name);  
00079   }
00080   
00081   return found;
00082 }
00083 
00084 // =============================
00085 static bool trig_open_table(void **open_table, char *table_path, PBMSResultPtr result)
00086 { 
00087   return do_open_table(open_table, table_path, result);
00088 }
00089 
00090 char *pbms_trig_use_blob(const char *database, size_t db_len, const char *table, size_t tab_len, unsigned short col_position, char *url, size_t url_len, char *out_url, PBMSResultPtr result)
00091 {
00092   void *open_table = NULL;
00093   int rtc;
00094   char blob_url[PBMS_BLOB_URL_SIZE], *ret_blob_url;
00095   char table_path[PATH_MAX];
00096   
00097   if (url_len >= PBMS_BLOB_URL_SIZE) {
00098     pbms_error_result(CS_CONTEXT, MS_ERR_INCORRECT_URL, "Incorrect URL", result);
00099     return NULL; 
00100   }
00101   
00102   if (pbms_table_path(database, db_len, table, tab_len,  table_path, result) || trig_open_table(&open_table, table_path, result))
00103     return NULL;
00104   
00105   memcpy(blob_url, url, url_len);
00106   blob_url[url_len] = 0;
00107   
00108   // Internally col_position are '0' based for consistency with the internal PBXT calls to PBMS.
00109   if (!(rtc = ms_use_blob(open_table, &ret_blob_url, blob_url, col_position -1, result))) {   
00110     if (!(rtc = ms_retain_blobs(open_table,  result)))
00111       cs_strcpy(PBMS_BLOB_URL_SIZE, out_url, ret_blob_url);
00112   }
00113   
00114   ms_close_table(open_table);
00115   
00116   if (rtc)
00117     return NULL;
00118   
00119   return out_url; 
00120 }
00121 
00122 // =============================
00123 char *pbms_use_blob(const char *database, const char *table, unsigned short col_position, char *url, size_t url_len, char *out_url, PBMSResultPtr result)
00124 {
00125   return pbms_trig_use_blob(database, strlen(database), table, strlen(table), col_position, url, url_len, out_url, result);
00126 }
00127 
00128 // =============================
00129 int pbms_trig_release_blob(const char *database, size_t db_len, const char *table, size_t tab_len, unsigned short col_position, const char *url, size_t url_len, PBMSResultPtr result)
00130 {
00131   void *open_table = NULL;
00132   int rtc;
00133   char blob_url[PBMS_BLOB_URL_SIZE];
00134   char table_path[PATH_MAX];
00135   
00136   if (url_len >= PBMS_BLOB_URL_SIZE)
00137     return 0; // url is too long to be a valid blob url.
00138 
00139   if (pbms_table_path(database, db_len, table, tab_len,  table_path, result) || trig_open_table(&open_table, table_path, result))
00140     return 1;
00141   
00142   memcpy(blob_url, url, url_len);
00143   blob_url[url_len] = 0;
00144   
00145   rtc = ms_release_blob(open_table, blob_url, result);
00146   
00147   ms_close_table(open_table);
00148   
00149   return rtc; 
00150 }
00151 
00152 // =============================
00153 int pbms_release_blob(const char *database, const char *table, unsigned short col_position, const char *url, size_t url_len, PBMSResultPtr result)
00154 {
00155   return pbms_trig_release_blob(database, strlen(database), table, strlen(table), col_position, url, url_len, result);
00156 }
00157 
00158 // =============================
00159 int pbms_new_blob(const char *database, const char *table, unsigned short col_position, unsigned char *blob, size_t blob_len, char *out_url, PBMSResultPtr result)
00160 {
00161   void *open_table = NULL;
00162   char table_path[PATH_MAX];
00163   int rtc;
00164   
00165   if (pbms_table_path(database, strlen(database), table,  strlen(table),  table_path, result) || trig_open_table(&open_table, table_path, result))
00166     return 1;
00167     
00168   rtc =  ms_create_blob(open_table, blob, blob_len, out_url, col_position -1,  result);
00169     
00170   ms_close_table(open_table);
00171   
00172   return rtc; 
00173 }
00174 
00175 // =============================
00176 int pbms_trig_drop_table(const char *database, size_t db_len, const char *table, size_t tab_len, PBMSResultPtr result)
00177 {
00178   char table_path[PATH_MAX];
00179   
00180   if (pbms_table_path(database, db_len, table, tab_len,  table_path, result))
00181     return 1;
00182   
00183   return ms_drop_table(table_path, result);
00184 }
00185 
00186 // =============================
00187 int pbms_drop_table(const char *database, const char *table, PBMSResultPtr result)
00188 {
00189   return pbms_trig_drop_table(database, strlen(database), table, strlen(table), result);
00190 }
00191 
00192 // =============================
00193 int pbms_trig_rename_table(const char *database, size_t db_len, const char *o_table, size_t o_tab_len, const char *n_table, size_t n_tab_len, PBMSResultPtr result)
00194 {
00195   char o_table_path[PATH_MAX], n_table_path[PATH_MAX];
00196   
00197   if (pbms_table_path(database, db_len, o_table, o_tab_len,  o_table_path, result) || pbms_table_path(database, db_len, n_table, n_tab_len,  n_table_path, result) )
00198     return 1;
00199   
00200   return ms_rename_table(o_table_path, n_table_path, result);
00201 }
00202 
00203 // =============================
00204 int pbms_rename_table(const char *database, const char *o_table,  const char *n_table, PBMSResultPtr result)
00205 {
00206   return pbms_trig_rename_table(database, strlen(database), o_table, strlen(o_table), n_table, strlen(n_table), result);
00207 }
00208 
00209 #endif MOVE_THIS_TO_ITS_OWN_FILE
00210 
00211 extern "C" {
00212 my_bool pbms_insert_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00213 char *pbms_insert_blob_trig(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *res_length, char *is_null, char *error);
00214 void pbms_insert_blob_trig_deinit(UDF_INIT *initid);
00215 
00216 //-----------
00217 my_bool pbms_update_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00218 char *pbms_update_blob_trig(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *res_length, char *is_null, char *error);
00219 void pbms_update_blob_trig_deinit(UDF_INIT *initid);
00220 
00221 //-----------
00222 my_bool pbms_delete_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00223 longlong pbms_delete_blob_trig(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);
00224 
00225 //-----------
00226 my_bool pbms_delete_all_blobs_in_table_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00227 longlong pbms_delete_all_blobs_in_table(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);
00228 
00229 //-----------
00230 my_bool pbms_rename_table_with_blobs_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00231 longlong pbms_rename_table_with_blobs(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);
00232 
00233 //-----------
00234 my_bool pbms_enabled_engine_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
00235 longlong pbms_enabled_engine(UDF_INIT *initid, UDF_ARGS *args, char *is_null, char *error);
00236 }
00237 
00238 static void report_udf_error(UDF_INIT *initid __attribute__((unused)), char *error, const char *func, const char *message)
00239 {
00240   *error = 1;
00241   
00242   // I wish there were a way to pass the error text up to the caller but I do not know if/how it can be done.
00243   // So for now just send it to stderr.
00244   fprintf(stderr, "PBMS UDF Error (%s): %s\n", func,  message);
00245 }
00246 
00247 static char *local_reference_blob(UDF_INIT *initid, char *database, size_t db_len, char *table, size_t tab_len, unsigned short col_position, char *url, size_t url_len, char *result, char *error, const char *func)
00248 {
00249   char *out_url = NULL, blob_url[PBMS_BLOB_URL_SIZE];
00250   PBMSResultRec pbmsResult = {0};
00251   
00252   out_url = pbms_trig_use_blob(database, db_len, table, tab_len, col_position, url, url_len, blob_url, &pbmsResult); 
00253 
00254   if (out_url) {
00255     size_t url_len = strlen(out_url) +1;
00256     if (url_len < 255) {
00257       cs_strcpy(255, result,  out_url);
00258       out_url = result;
00259     } else {
00260       initid->ptr = (char*) malloc(url_len);
00261       if (initid->ptr) {
00262         cs_strcpy(url_len, initid->ptr,  out_url);
00263         out_url = initid->ptr;
00264       } else {
00265         report_udf_error(initid, error, func, "Couldn't allocate memory");
00266         out_url = NULL;
00267       }
00268     }
00269   } else if  (pbmsResult.mr_code == MS_ERR_INCORRECT_URL) 
00270     out_url = url; // Not a URL so just return it so that it is inserted as is.
00271   else 
00272     report_udf_error(initid, error, func, pbmsResult.mr_message);
00273   
00274   
00275   return out_url;
00276 }
00277 
00278 //======================================================================
00279 // CREATE FUNCTION pbms_insert_blob_trig RETURNS STRING SONAME "libpbms.so";
00280 // pbms_insert_blob_trig(database, table, col_position, blob_url);
00281 my_bool pbms_insert_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00282 {
00283   if (args->arg_count != 4 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || args->arg_type[2] != INT_RESULT || args->arg_type[3] != STRING_RESULT)
00284   {
00285     strcpy(message,"Wrong arguments to pbms_insert_blob_trig()");
00286     return 1;
00287   }
00288   args->maybe_null[0] = 0;
00289   args->maybe_null[1] = 0;
00290   args->maybe_null[2] = 0;
00291   args->maybe_null[3] = 1;
00292 
00293   initid->max_length=PBMS_BLOB_URL_SIZE;
00294   initid->maybe_null=1;
00295   initid->ptr=NULL;
00296   return 0;
00297 }
00298 
00299 void pbms_insert_blob_trig_deinit(UDF_INIT *initid)
00300 {
00301   if (initid->ptr)
00302     free(initid->ptr);
00303 }
00304 
00305 #define INT_ARG(a)  (*((longlong*) a))
00306 
00307 char *pbms_insert_blob_trig(UDF_INIT *initid, UDF_ARGS *args,
00308                     char *result, unsigned long *res_length, char *is_null,
00309                     char *error)
00310 {
00311   char *out_url;
00312   
00313   *is_null=1;
00314   
00315   // The first parameter is the table name which should never be NULL or an empty string.
00316   if (!args->args[0] || !args->lengths[0] || !args->args[1] || !args->lengths[1]) {
00317     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00318     return NULL;
00319   }
00320 
00321   if (!args->args[3] || !args->lengths[3]) {
00322     return NULL;
00323   }
00324   
00325   out_url = local_reference_blob(initid, args->args[0], args->lengths[0], args->args[1], args->lengths[1], INT_ARG(args->args[2]), args->args[3], args->lengths[3], result, error, __FUNC__);   
00326   if (!out_url) 
00327     return NULL;
00328 
00329   *is_null=0;
00330   *res_length = strlen(out_url);
00331   return out_url; 
00332 }
00333 
00334 //======================================================================
00335 // CREATE FUNCTION pbms_update_blob_trig RETURNS STRING SONAME "libpbms.so";
00336 // pbms_update_blob_trig(database, table, col_position, old_blob_url, new_blob_url);
00337 my_bool pbms_update_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00338 {
00339   if (args->arg_count != 5 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || args->arg_type[2] != INT_RESULT || args->arg_type[3] != STRING_RESULT || args->arg_type[4] != STRING_RESULT)
00340   {
00341     strcpy(message,"Wrong arguments to pbms_update_blob_trig()");
00342     return 1;
00343   }
00344   args->maybe_null[0] = 0;
00345   args->maybe_null[1] = 0;
00346   args->maybe_null[2] = 0;
00347   args->maybe_null[3] = 1;
00348   args->maybe_null[4] = 1;
00349   
00350   initid->maybe_null=1;
00351   initid->ptr=NULL;
00352   return 0;
00353 }
00354 
00355 void pbms_update_blob_trig_deinit(UDF_INIT *initid)
00356 {
00357   if (initid->ptr)
00358     free(initid->ptr);
00359 }
00360 
00361 char *pbms_update_blob_trig(UDF_INIT *initid, UDF_ARGS *args,
00362                     char *result, unsigned long *res_length, char *is_null,
00363                     char *error)
00364 {
00365   char *out_url = NULL;
00366   
00367   
00368   // The first parameter is the table name which should never be NULL or an empty string.
00369   if (!args->args[0] || !args->lengths[0] || !args->args[1] || !args->lengths[1]) {
00370     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00371     return NULL;
00372   }
00373   
00374   // Check to see if the blob url changed
00375   if (args->lengths[2] == args->lengths[3] && !memcmp(args->args[2], args->args[3], args->lengths[3])) {
00376     if (args->lengths[2]) {
00377       *is_null=0;
00378       *res_length = args->lengths[2];
00379       return args->args[2];
00380     }
00381     
00382     *is_null=1;
00383     return NULL;
00384   }
00385   
00386   
00387   if (args->lengths[4] && args->args[4]) { // Reference the new blob.
00388     out_url = local_reference_blob(initid, args->args[0], args->lengths[0], args->args[1], args->lengths[1], INT_ARG(args->args[2]), args->args[4], args->lengths[4], result, error, __FUNC__); 
00389     if (!out_url) 
00390       return 0;
00391   }
00392   
00393   if (args->lengths[3] && args->args[3]) { // Dereference the old blob
00394     PBMSResultRec pbmsResult = {0};
00395     if (pbms_trig_release_blob(args->args[0], args->lengths[0], args->args[1], args->lengths[1], INT_ARG(args->args[2]), args->args[3], args->lengths[3], &pbmsResult)) {
00396       report_udf_error(initid, error,  __FUNC__, pbmsResult.mr_message);
00397       if (out_url)
00398         pbms_trig_release_blob(args->args[0], args->lengths[0], args->args[1], args->lengths[1], INT_ARG(args->args[2]), out_url, strlen(out_url), &pbmsResult);
00399       return NULL;
00400     } 
00401   }
00402   
00403   if (out_url) {
00404     *is_null=0;
00405     *res_length = strlen(out_url);
00406   }
00407   
00408   return out_url; 
00409 }
00410 
00411 //======================================================================
00412 // CREATE FUNCTION pbms_delete_blob_trig RETURNS INT SONAME "libpbms.so";
00413 // pbms_delete_blob_trig(database, table, col_position, blob_url);
00414 my_bool pbms_delete_blob_trig_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00415 {
00416   if (args->arg_count != 4 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || args->arg_type[2] != INT_RESULT || args->arg_type[3] != STRING_RESULT)
00417   {
00418     strcpy(message,"Wrong arguments to pbms_delete_blob_trig()");
00419     return 1;
00420   }
00421   args->maybe_null[0] = 0;
00422   args->maybe_null[1] = 0;
00423   args->maybe_null[2] = 0;
00424   args->maybe_null[3] = 1;  
00425   initid->maybe_null=0;
00426   
00427   return 0;
00428 }
00429 
00430 longlong pbms_delete_blob_trig(UDF_INIT *initid, UDF_ARGS *args, char *is_null __attribute__((unused)), char *error)
00431 {
00432   PBMSResultRec pbmsResult = {0};
00433   
00434   // The first parameter is the table name which should never be NULL or an empty string.
00435   if (!args->args[0] || !args->lengths[0] || !args->args[1] || !args->lengths[1]) {
00436     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00437     return 1;
00438   }
00439   
00440   if (!args->args[3] || !args->lengths[3]) {
00441     return 0; //Dropping a NULL blob.
00442   }
00443   
00444   if (! pbms_trig_release_blob(args->args[0], args->lengths[0], args->args[1], args->lengths[1], INT_ARG(args->args[2]), args->args[3], args->lengths[3], &pbmsResult))
00445     return  0;
00446   
00447   report_udf_error(initid, error,  __FUNC__, pbmsResult.mr_message);
00448   return 1;
00449 }
00450 
00451 
00452 //======================================================================
00453 // CREATE FUNCTION pbms_delete_all_blobs_in_table RETURNS INT SONAME "libpbms.so";
00454 // pbms_delete_all_blobs_in_table(database, table);
00455 my_bool pbms_delete_all_blobs_in_table_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00456 {
00457   if (args->arg_count != 2 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT)
00458   {
00459     strcpy(message,"Wrong arguments to pbms_delete_all_blobs_in_table()");
00460     return 1;
00461   }
00462   args->maybe_null[0] = 0;
00463   args->maybe_null[1] = 0;
00464   initid->maybe_null=0;
00465   
00466   return 0;
00467 }
00468 
00469 longlong pbms_delete_all_blobs_in_table(UDF_INIT *initid, UDF_ARGS *args, char *is_null __attribute__((unused)), char *error)
00470 {
00471   PBMSResultRec pbmsResult = {0};
00472   
00473   // The first parameter is the table name which should never be NULL or an empty string.
00474   if (args->arg_count != 2 || !args->args[0] || !args->lengths[0] || !args->args[1] || !args->lengths[1]) {
00475     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00476     return 1;
00477   }
00478   
00479   if (!pbms_trig_drop_table(args->args[0], args->lengths[0], args->args[1], args->lengths[1], &pbmsResult)) 
00480     return 0;
00481   
00482   report_udf_error(initid, error,  __FUNC__, pbmsResult.mr_message);
00483   return 1;
00484 }
00485 
00486 //======================================================================
00487 // CREATE FUNCTION pbms_rename_table_with_blobs RETURNS INT SONAME "libpbms.so";
00488 // pbms_rename_table_with_blobs(database, old_table, new_table);
00489 my_bool pbms_rename_table_with_blobs_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00490 {
00491   if (args->arg_count != 3 || args->arg_type[0] != STRING_RESULT || args->arg_type[1] != STRING_RESULT || args->arg_type[2] != STRING_RESULT)
00492   {
00493     strcpy(message,"Wrong arguments to pbms_rename_table_with_blobs()");
00494     return 1;
00495   }
00496   args->maybe_null[0] = 0;
00497   args->maybe_null[1] = 0;
00498   args->maybe_null[2] = 0;
00499   initid->maybe_null=0;
00500   
00501   return 0;
00502 }
00503 
00504 longlong pbms_rename_table_with_blobs(UDF_INIT *initid, UDF_ARGS *args, char *is_null __attribute__((unused)), char *error)
00505 {
00506   PBMSResultRec pbmsResult = {0};
00507   
00508   // The first parameter is the table name which should never be NULL or an empty string.
00509   if (args->arg_count != 3 || !args->args[0] || !args->lengths[0] || !args->args[1] || !args->lengths[1] || !args->args[2] || !args->lengths[2]) {
00510     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00511     return 1;
00512   }
00513   
00514   if (!pbms_trig_rename_table(args->args[0], args->lengths[0], args->args[1], args->lengths[1], args->args[2], args->lengths[2], &pbmsResult)) 
00515     return 0;
00516   
00517   report_udf_error(initid, error,  __FUNC__, pbmsResult.mr_message);
00518   return 1;
00519 }
00520 
00521 //======================================================================
00522 // CREATE FUNCTION pbms_enabled_engine RETURNS INT SONAME "libpbms.so";
00523 // pbms_enabled_engine(database, table);
00524 my_bool pbms_enabled_engine_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
00525 {
00526   if (args->arg_count != 1 || args->arg_type[0] != STRING_RESULT)
00527   {
00528     strcpy(message,"Wrong arguments to pbms_enabled_engine()");
00529     return 1;
00530   }
00531   args->maybe_null[0] = 0;
00532   initid->maybe_null=0;
00533   
00534   return 0;
00535 }
00536 
00537 longlong pbms_enabled_engine(UDF_INIT *initid, UDF_ARGS *args, char *is_null __attribute__((unused)), char *error)
00538 {
00539   // The first parameter is the engine name which should never be NULL or an empty string.
00540   if (args->arg_count != 1 || !args->args[0] || !args->lengths[0]) {
00541     report_udf_error(initid, error, __FUNC__, "Bad arguments");
00542     return -1;
00543   }
00544   
00545   return pbms_engine_is_enabled(args->args[0], args->lengths[0]);
00546 }
00547 
00548 #endif // SUPPORT_PBMS_TRIGGERS
00549 #endif // DRIZZLED