Drizzled Public API Documentation

pbms.h

00001 /* Copyright (C) 2010 PrimeBase Technologies GmbH
00002  * All rights reserved.
00003  * 
00004  * Redistribution and use in source and binary forms, with or without 
00005  * modification, are permitted provided that the following conditions are met:
00006  * 
00007  *     * Redistributions of source code must retain the above copyright notice, 
00008  *    this list of conditions and the following disclaimer.
00009  *     * Redistributions in binary form must reproduce the above copyright notice, 
00010  *    this list of conditions and the following disclaimer in the documentation 
00011  *    and/or other materials provided with the distribution.
00012  *     * Neither the name of the "PrimeBase Technologies GmbH" nor the names of its 
00013  *    contributors may be used to endorse or promote products derived from this 
00014  *    software without specific prior written permission.
00015  * 
00016  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 
00017  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 
00018  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
00019  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, 
00020  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 
00021  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
00022  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 
00023  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
00024  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
00025  * POSSIBILITY OF SUCH DAMAGE. 
00026  *  
00027  * PrimeBase Media Stream for MySQL and Drizzle
00028  *
00029  * Original author: Paul McCullagh
00030  * Continued development: Barry Leslie
00031  * H&G2JCtL
00032  *
00033  * 2007-06-01
00034  *
00035  * This file contains the BLOB streaming interface engines that
00036  * are streaming enabled.
00037  *
00038  */
00039 #pragma once
00040 #ifndef __PBMS_H__
00041 #define __PBMS_H__
00042 
00043 #include <stdio.h>
00044 #include <sys/types.h>
00045 #include <unistd.h>
00046 #include <stdlib.h>
00047 #include <fcntl.h>
00048 #include <string.h>
00049 #include <dirent.h>
00050 #include <signal.h>
00051 #include <ctype.h>
00052 #include <errno.h>
00053 #include <inttypes.h>
00054 #include <stdint.h>
00055 
00056 #ifdef USE_PRAGMA_INTERFACE
00057 #pragma interface     /* gcc class implementation */
00058 #endif
00059 
00060 
00061 #define MS_SHARED_MEMORY_MAGIC      0x7E9A120C
00062 #define MS_ENGINE_VERSION       3
00063 #define MS_CALLBACK_VERSION       6
00064 #define MS_SHARED_MEMORY_VERSION    2
00065 #define MS_ENGINE_LIST_SIZE       10
00066 #define MS_TEMP_FILE_PREFIX       "pbms_temp_"
00067 
00068 #define MS_BLOB_HANDLE_SIZE       300
00069 
00070 #define SH_MASK             ((S_IRUSR | S_IWUSR) | (S_IRGRP | S_IWGRP) | (S_IROTH))
00071 
00072 #define MS_OK             0
00073 #define MS_ERR_ENGINE         1             /* Internal engine error. */
00074 #define MS_ERR_UNKNOWN_TABLE      2             /* Returned if the engine cannot open the given table. */
00075 #define MS_ERR_NOT_FOUND        3             /* The BLOB cannot be found. */
00076 #define MS_ERR_TABLE_LOCKED       4             /* Table is currently locked. */
00077 #define MS_ERR_INCORRECT_URL      5
00078 #define MS_ERR_AUTH_FAILED        6
00079 #define MS_ERR_NOT_IMPLEMENTED      7
00080 #define MS_ERR_UNKNOWN_DB       8
00081 #define MS_ERR_REMOVING_REPO      9
00082 #define MS_ERR_DATABASE_DELETED     10
00083 #define MS_ERR_DUPLICATE        11            /* Attempt to insert a duplicate key into a system table. */
00084 #define MS_ERR_INVALID_RECORD     12
00085 #define MS_ERR_RECOVERY_IN_PROGRESS   13
00086 #define MS_ERR_DUPLICATE_DB       14
00087 #define MS_ERR_DUPLICATE_DB_ID      15
00088 #define MS_ERR_INVALID_OPERATION    16
00089 #define MS_ERR_MISSING_CLOUD_REFFERENCE 17
00090 #define MS_ERR_SYSTAB_VERSION     18
00091 
00092 #define MS_LOCK_NONE          0
00093 #define MS_LOCK_READONLY        1
00094 #define MS_LOCK_READ_WRITE        2
00095 
00096 #define PBMS_BLOB_URL_SIZE        120
00097 
00098 #define PBMS_FIELD_COL_SIZE       128
00099 #define PBMS_FIELD_COND_SIZE      300
00100 
00101 #define MS_RESULT_MESSAGE_SIZE      300
00102 #define MS_RESULT_STACK_SIZE      200
00103 
00104 typedef struct PBMSResultRec {
00105   uint8_t       mr_had_blobs;             /* A flag to indicate if the statement had any PBMS blobs. */
00106   int           mr_code;                /* Engine specific error code. */ 
00107   char          mr_message[MS_RESULT_MESSAGE_SIZE];   /* Error message, required if non-zero return code. */
00108   char          mr_stack[MS_RESULT_STACK_SIZE];     /* Trace information about where the error occurred. */
00109 } PBMSResultRec, *PBMSResultPtr;
00110 
00111 
00112 
00113 typedef struct PBMSBlobID {
00114   uint32_t        bi_db_id; 
00115   uint64_t        bi_blob_size; 
00116   uint64_t        bi_blob_id;       // or repo file offset if type = REPO
00117   uint64_t        bi_blob_ref_id;     
00118   uint32_t        bi_tab_id;        // or repo ID if type = REPO
00119   uint32_t        bi_auth_code;
00120   uint32_t        bi_blob_type;
00121 } PBMSBlobIDRec, *PBMSBlobIDPtr;
00122 
00123 
00124 typedef struct MSBlobURL {
00125   uint8_t         bu_type;
00126   uint32_t        bu_db_id;
00127   uint32_t        bu_tab_id;        // or repo ID if type = REPO
00128   uint64_t        bu_blob_id;       // or repo file offset if type = REPO
00129   uint32_t        bu_auth_code;
00130   uint32_t        bu_server_id;
00131   uint64_t        bu_blob_size;     
00132   uint64_t        bu_blob_ref_id;     // Unique identifier of the blob reference
00133 } MSBlobURLRec, *MSBlobURLPtr;
00134 
00135 
00136 typedef struct PBMSBlobURL {
00137   char          bu_data[PBMS_BLOB_URL_SIZE];
00138 } PBMSBlobURLRec, *PBMSBlobURLPtr;
00139 
00140 typedef struct PBMSEngineRec {
00141   int           ms_version;             /* MS_ENGINE_VERSION */
00142   int           ms_index;             /* The index into the engine list. */
00143   int           ms_removing;            /* TRUE (1) if the engine is being removed. */
00144   int           ms_internal;            /* TRUE (1) if the engine is supported directly in the mysq/drizzle handler code . */
00145   char          ms_engine_name[32];
00146   int           ms_has_transactions;        /* TRUE (1) if the engine supports transactions. */
00147 } PBMSEngineRec, *PBMSEnginePtr;
00148 
00149 /*      2 10    1     10      20      10        10      20        20
00150  * Format: "~*"<db_id><'~' || '_'><tab_id>"-"<blob_id>"-"<auth_code>"-"<server_id>"-"<blob_ref_id>"-"<blob_size>
00151  */
00152 
00153 #ifndef  PRIu64
00154 #define URL_FMT "~*%u%c%u-%llu-%x-%u-%llu-%llu"
00155 #else
00156 #define URL_FMT "~*%"PRIu32"%c%"PRIu32"-%"PRIu64"-%x-%"PRIu32"-%"PRIu64"-%"PRIu64""
00157 #endif
00158 #define MS_URL_TYPE_BLOB  '~'
00159 #define MS_URL_TYPE_REPO  '_'
00160 class PBMSBlobURLTools
00161 {
00162   public:
00163   static bool couldBeURL(const char *blob_url, size_t size, MSBlobURLPtr blob)
00164   {
00165     if (blob_url && (size < PBMS_BLOB_URL_SIZE) && (size > 16)) {
00166       MSBlobURLRec ignored_blob;
00167       char  buffer[PBMS_BLOB_URL_SIZE+1];
00168       char  junk[5];
00169       int   scanned;
00170       
00171       if (!blob)
00172         blob = &ignored_blob;
00173       
00174       junk[0] = 0;
00175       
00176       // There is no guarantee that the URL will be null terminated
00177       // so always copy it into our own buffer to be safe.
00178       memcpy(buffer, blob_url, size);
00179       buffer[size] = 0;
00180       blob_url = buffer;
00181       
00182       scanned = sscanf(blob_url, URL_FMT"%4s", 
00183         &blob->bu_db_id, 
00184         &blob->bu_type, 
00185         &blob->bu_tab_id, 
00186         &blob->bu_blob_id, 
00187         &blob->bu_auth_code, 
00188         &blob->bu_server_id, 
00189         &blob->bu_blob_ref_id, 
00190         &blob->bu_blob_size, 
00191         junk);
00192         
00193       if ((scanned != 8) || (blob->bu_type != MS_URL_TYPE_BLOB && blob->bu_type != MS_URL_TYPE_REPO)) {// If junk is found at the end this will also result in an invalid URL. 
00194         //printf("Bad URL \"%s\": scanned = %d, junk: %d, %d, %d, %d\n", blob_url, scanned, junk[0], junk[1], junk[2], junk[3]); 
00195         return false;
00196       }
00197     
00198       return true;
00199     }
00200     
00201     return false;
00202   }
00203   
00204   static bool couldBeURL(const char *blob_url, MSBlobURLPtr blob)
00205   {
00206     return couldBeURL(blob_url, strlen(blob_url), blob);
00207   }
00208   
00209   static void buildBlobURL(MSBlobURLPtr blob, PBMSBlobURLPtr url)
00210   {
00211     snprintf(url->bu_data, PBMS_BLOB_URL_SIZE, URL_FMT, blob->bu_db_id, 
00212                 blob->bu_type, 
00213                 blob->bu_tab_id, 
00214                 blob->bu_blob_id, 
00215                 blob->bu_auth_code, 
00216                 blob->bu_server_id, 
00217                 blob->bu_blob_ref_id, 
00218                 blob->bu_blob_size);
00219   }
00220 };
00221 
00222 #ifndef DRIZZLED
00223 /*
00224  * This function should never be called directly, it is called
00225  * by deregisterEngine() below.
00226  */
00227 typedef void (*ECRegisterdFunc)(PBMSEnginePtr engine);
00228 
00229 typedef void (*ECDeregisterdFunc)(PBMSEnginePtr engine);
00230 
00231 /*
00232  * Call this function to store a BLOB in the repository the BLOB's
00233  * URL will be returned. The returned URL buffer is expected to be atleast 
00234  * PBMS_BLOB_URL_SIZE long.
00235  *
00236  * The BLOB URL must still be retained or it will automaticly be deleted after a timeout expires.
00237  */
00238 typedef int (*ECCreateBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob, size_t blob_len, PBMSBlobURLPtr blob_url, PBMSResultPtr result);
00239 
00240 /*
00241  * Call this function for each BLOB to be retained. When a BLOB is used, the 
00242  * URL may be changed. The returned URL buffer is expected to be atleast 
00243  * PBMS_BLOB_URL_SIZE long.
00244  *
00245  * The returned URL must be inserted into the row in place of the given
00246  * URL.
00247  */
00248 typedef int (*ECRetainBlobsFunc)(bool built_in, const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, unsigned short col_index, PBMSResultPtr result);
00249 
00250 /*
00251  * If a row containing a BLOB is deleted, then the BLOBs in the
00252  * row must be released.
00253  *
00254  * Note: if a table is dropped, all the BLOBs referenced by the
00255  * table are automatically released.
00256  */
00257 typedef int (*ECReleaseBlobFunc)(bool built_in, const char *db_name, const char *tab_name, char *blob_url, PBMSResultPtr result);
00258 
00259 typedef int (*ECDropTable)(bool built_in, const char *db_name, const char *tab_name, PBMSResultPtr result);
00260 
00261 typedef int (*ECRenameTable)(bool built_in, const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result);
00262 
00263 typedef void (*ECCallCompleted)(bool built_in, bool ok);
00264 
00265 typedef struct PBMSCallbacksRec {
00266   int           cb_version;             /* MS_CALLBACK_VERSION */
00267   ECRegisterdFunc     cb_register;
00268   ECDeregisterdFunc   cb_deregister;
00269   ECCreateBlobsFunc   cb_create_blob;
00270   ECRetainBlobsFunc   cb_retain_blob;
00271   ECReleaseBlobFunc   cb_release_blob;
00272   ECDropTable       cb_drop_table;
00273   ECRenameTable     cb_rename_table;
00274   ECCallCompleted     cb_completed;
00275 } PBMSCallbacksRec, *PBMSCallbacksPtr;
00276 
00277 typedef struct PBMSSharedMemoryRec {
00278   int           sm_magic;             /* MS_SHARED_MEMORY_MAGIC */
00279   int           sm_version;             /* MS_SHARED_MEMORY_VERSION */
00280   volatile int      sm_shutdown_lock;         /* "Cheap" lock for shutdown! */
00281   PBMSCallbacksPtr    sm_callbacks;
00282   int           sm_reserved1[20];
00283   void          *sm_reserved2[20];
00284   int           sm_list_size;
00285   int           sm_list_len;
00286   PBMSEnginePtr     sm_engine_list[MS_ENGINE_LIST_SIZE];
00287 } PBMSSharedMemoryRec, *PBMSSharedMemoryPtr;
00288 
00289 #ifdef PBMS_API
00290 
00291 class PBMS_API
00292 {
00293 private:
00294   const char *temp_prefix[3];
00295   bool built_in;
00296 
00297 public:
00298   PBMS_API(): sharedMemory(NULL) { 
00299     int i = 0;
00300     temp_prefix[i++] = MS_TEMP_FILE_PREFIX;
00301     temp_prefix[i++] = NULL;
00302     
00303   }
00304 
00305   ~PBMS_API() { }
00306 
00307   /*
00308    * This method is called by the PBMS engine during startup.
00309    */
00310   int PBMSStartup(PBMSCallbacksPtr callbacks, PBMSResultPtr result) {
00311     int err;
00312     
00313     deleteTempFiles();
00314     err = getSharedMemory(true, result);
00315     if (!err)
00316       sharedMemory->sm_callbacks = callbacks;
00317       
00318     return err;
00319   }
00320 
00321   /*
00322    * This method is called by the PBMS engine during startup.
00323    */
00324   void PBMSShutdown() {
00325     
00326     if (!sharedMemory)
00327       return;
00328       
00329     lock();
00330     sharedMemory->sm_callbacks = NULL;
00331 
00332     bool empty = true;
00333     for (int i=0; i<sharedMemory->sm_list_len && empty; i++) {
00334       if (sharedMemory->sm_engine_list[i]) 
00335         empty = false;
00336     }
00337 
00338     unlock();
00339     
00340     if (empty) 
00341       removeSharedMemory();
00342   }
00343 
00344   /*
00345    * Register the engine with the Stream Daemon.
00346    */
00347   int registerEngine(PBMSEnginePtr the_engine, PBMSResultPtr result) {
00348     int err;
00349 
00350     deleteTempFiles();
00351 
00352     // The first engine to register creates the shared memory.
00353     if ((err = getSharedMemory(true, result)))
00354       return err;
00355 
00356     lock();
00357     for (int i=0; i<sharedMemory->sm_list_size; i++) {
00358       if (!sharedMemory->sm_engine_list[i]) {
00359         PBMSEnginePtr engine;
00360         engine = (PBMSEnginePtr) malloc(sizeof(PBMSEngineRec));
00361         if (!engine) {
00362           strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Out of memory.");
00363           err = MS_ERR_ENGINE;
00364           goto done;
00365         }
00366         memcpy(engine, the_engine, sizeof(PBMSEngineRec));
00367         
00368         sharedMemory->sm_engine_list[i] = engine;
00369         engine->ms_index = i;
00370         if (i >= sharedMemory->sm_list_len)
00371           sharedMemory->sm_list_len = i+1;
00372         if (sharedMemory->sm_callbacks)
00373           sharedMemory->sm_callbacks->cb_register(engine);
00374           
00375         built_in = (engine->ms_internal == 1);
00376         err =  MS_OK;
00377         goto done;
00378       }
00379     }
00380     
00381     result->mr_code = 15010;
00382     strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Too many BLOB streaming engines already registered");
00383     *result->mr_stack = 0;
00384     
00385     err = MS_ERR_ENGINE;
00386     
00387   done:
00388     unlock();
00389     return err;
00390   }
00391 
00392   void lock() {
00393     while (sharedMemory->sm_shutdown_lock)
00394       usleep(10000);
00395     sharedMemory->sm_shutdown_lock++;
00396     while (sharedMemory->sm_shutdown_lock != 1) {
00397       usleep(random() % 10000);
00398       sharedMemory->sm_shutdown_lock--;
00399       usleep(10000);
00400       sharedMemory->sm_shutdown_lock++;
00401     }
00402   }
00403 
00404   void unlock() {
00405     sharedMemory->sm_shutdown_lock--;
00406   }
00407 
00408   void deregisterEngine(const char *engine_name) {
00409     PBMSResultRec result;
00410     int err;
00411 
00412     if ((err = getSharedMemory(false, &result)))
00413       return;
00414 
00415     lock();
00416 
00417     bool empty = true;
00418     for (int i=0; i<sharedMemory->sm_list_len; i++) {
00419       PBMSEnginePtr engine = sharedMemory->sm_engine_list[i];
00420       if (engine) {        
00421         if (strcmp(engine->ms_engine_name, engine_name) == 0) {
00422           if (sharedMemory->sm_callbacks)
00423             sharedMemory->sm_callbacks->cb_deregister(engine);
00424           free(engine);
00425           sharedMemory->sm_engine_list[i] = NULL;
00426         }
00427         else
00428           empty = false;
00429       }
00430     }
00431 
00432     unlock();
00433 
00434     if (empty) 
00435       removeSharedMemory();
00436   }
00437 
00438   void removeSharedMemory() 
00439   {
00440     const char **prefix = temp_prefix;
00441     char  temp_file[100];
00442 
00443     // Do not remove the sharfed memory until after
00444     // the PBMS engine has shutdown.
00445     if (sharedMemory->sm_callbacks)
00446       return;
00447       
00448     sharedMemory->sm_magic = 0;
00449     free(sharedMemory);
00450     sharedMemory = NULL;
00451     
00452     while (*prefix) {
00453       getTempFileName(temp_file, 100, *prefix, getpid());
00454       unlink(temp_file);
00455       prefix++;
00456     }
00457   }
00458   
00459   bool isPBMSLoaded()
00460   {
00461     PBMSResultRec result;
00462     if (getSharedMemory(false, &result))
00463       return false;
00464       
00465     return (sharedMemory->sm_callbacks != NULL);
00466   }
00467   
00468   int  retainBlob(const char *db_name, const char *tab_name, PBMSBlobURLPtr ret_blob_url, char *blob_url, size_t blob_size, unsigned short col_index, PBMSResultPtr result)
00469   {
00470     int err;
00471     char safe_url[PBMS_BLOB_URL_SIZE+1];
00472 
00473 
00474     if ((err = getSharedMemory(false, result)))
00475       return err;
00476 
00477     if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL)) {
00478     
00479       if (!sharedMemory->sm_callbacks)  {
00480         ret_blob_url->bu_data[0] = 0;
00481         return MS_OK;
00482       }
00483       err = sharedMemory->sm_callbacks->cb_create_blob(built_in, db_name, tab_name, blob_url, blob_size, ret_blob_url, result);
00484       if (err)
00485         return err;
00486         
00487       blob_url = ret_blob_url->bu_data;
00488     } else {
00489       // Make sure the url is a C string:
00490       if (blob_url[blob_size]) {
00491         memcpy(safe_url, blob_url, blob_size);
00492         safe_url[blob_size] = 0;
00493         blob_url = safe_url;
00494       }
00495     }
00496     
00497 
00498     if (!sharedMemory->sm_callbacks) {
00499       result->mr_code = MS_ERR_INCORRECT_URL;
00500       strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "BLOB streaming daemon (PBMS) not installed");
00501       *result->mr_stack = 0;
00502       return MS_ERR_INCORRECT_URL;
00503     }
00504 
00505     return sharedMemory->sm_callbacks->cb_retain_blob(built_in, db_name, tab_name, ret_blob_url, blob_url, col_index, result);
00506   }
00507 
00508   int releaseBlob(const char *db_name, const char *tab_name, char *blob_url, size_t blob_size, PBMSResultPtr result)
00509   {
00510     int err;
00511     char safe_url[PBMS_BLOB_URL_SIZE+1];
00512 
00513     if ((err = getSharedMemory(false, result)))
00514       return err;
00515 
00516     if (!sharedMemory->sm_callbacks)
00517       return MS_OK;
00518 
00519     if (!PBMSBlobURLTools::couldBeURL(blob_url, blob_size, NULL))
00520       return MS_OK;
00521 
00522     if (blob_url[blob_size]) {
00523       memcpy(safe_url, blob_url, blob_size);
00524       safe_url[blob_size] = 0;
00525       blob_url = safe_url;
00526     }
00527     
00528     return sharedMemory->sm_callbacks->cb_release_blob(built_in, db_name, tab_name, blob_url, result);
00529   }
00530 
00531   int dropTable(const char *db_name, const char *tab_name, PBMSResultPtr result)
00532   {
00533     int err;
00534 
00535     if ((err = getSharedMemory(false, result)))
00536       return err;
00537 
00538     if (!sharedMemory->sm_callbacks)
00539       return MS_OK;
00540       
00541     return sharedMemory->sm_callbacks->cb_drop_table(built_in, db_name, tab_name, result);
00542   }
00543 
00544   int renameTable(const char *db_name, const char *from_table, const char *to_db, const char *to_table, PBMSResultPtr result)
00545   {
00546     int err;
00547 
00548     if ((err = getSharedMemory(false, result)))
00549       return err;
00550 
00551     if (!sharedMemory->sm_callbacks)
00552       return MS_OK;
00553       
00554     return sharedMemory->sm_callbacks->cb_rename_table(built_in, db_name, from_table, to_db, to_table, result);
00555   }
00556 
00557   void completed(int ok)
00558   {
00559     PBMSResultRec result;
00560 
00561     if (getSharedMemory(false, &result))
00562       return;
00563 
00564     if (!sharedMemory->sm_callbacks)
00565       return;
00566       
00567     sharedMemory->sm_callbacks->cb_completed(built_in, ok);
00568   }
00569   
00570   volatile PBMSSharedMemoryPtr sharedMemory;
00571 
00572 private:
00573   int getSharedMemory(bool create, PBMSResultPtr result)
00574   {
00575     int   tmp_f;
00576     int   r;
00577     char  temp_file[100];
00578     const char  **prefix = temp_prefix;
00579 
00580     if (sharedMemory)
00581       return MS_OK;
00582 
00583     while (*prefix) {
00584       getTempFileName(temp_file, 100, *prefix, getpid());
00585       tmp_f = open(temp_file, O_RDWR | (create ? O_CREAT : 0), SH_MASK);
00586       if (tmp_f == -1)
00587         return setOSResult(errno, "open", temp_file, result);
00588 
00589       r = lseek(tmp_f, 0, SEEK_SET);
00590       if (r == -1) {
00591         close(tmp_f);
00592         return setOSResult(errno, "lseek", temp_file, result);
00593       }
00594       ssize_t tfer;
00595       char buffer[100];
00596       
00597       tfer = read(tmp_f, buffer, 100);
00598       if (tfer == -1) {
00599         close(tmp_f);
00600         return setOSResult(errno, "read", temp_file, result);
00601       }
00602 
00603       buffer[tfer] = 0;
00604       sscanf(buffer, "%p", (void**) &sharedMemory);
00605       if (!sharedMemory || sharedMemory->sm_magic != MS_SHARED_MEMORY_MAGIC) {
00606         if (!create)
00607           return MS_OK;
00608 
00609         sharedMemory = (PBMSSharedMemoryPtr) calloc(1, sizeof(PBMSSharedMemoryRec));
00610         sharedMemory->sm_magic = MS_SHARED_MEMORY_MAGIC;
00611         sharedMemory->sm_version = MS_SHARED_MEMORY_VERSION;
00612         sharedMemory->sm_list_size = MS_ENGINE_LIST_SIZE;
00613 
00614         r = lseek(tmp_f, 0, SEEK_SET);
00615         if (r == -1) {
00616           close(tmp_f);
00617           return setOSResult(errno, "fseek", temp_file, result);
00618         }
00619 
00620         snprintf(buffer, 100, "%p", (void*) sharedMemory);
00621         tfer = write(tmp_f, buffer, strlen(buffer));
00622         if (tfer != (ssize_t) strlen(buffer)) {
00623           close(tmp_f);
00624           return setOSResult(errno, "write", temp_file, result);
00625         }
00626         r = fsync(tmp_f);
00627         if (r == -1) {
00628           close(tmp_f);
00629           return setOSResult(errno, "fsync", temp_file, result);
00630         }
00631       }
00632       else if (sharedMemory->sm_version != MS_SHARED_MEMORY_VERSION) {
00633         close(tmp_f);
00634         result->mr_code = -1000;
00635         *result->mr_stack = 0;
00636         strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Shared memory version: ");    
00637         strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, sharedMemory->sm_version);   
00638         strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ", does not match engine shared memory version: ");    
00639         strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, MS_SHARED_MEMORY_VERSION);   
00640         strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ".");    
00641         return MS_ERR_ENGINE;
00642       }
00643       close(tmp_f);
00644       
00645       // For backward compatability we need to create the old versions but we only need to read the current version.
00646       if (create)
00647         prefix++;
00648       else
00649         break;
00650     }
00651     return MS_OK;
00652   }
00653 
00654   void strcpy(size_t size, char *to, const char *from)
00655   {
00656     if (size > 0) {
00657       size--;
00658       while (*from && size--)
00659         *to++ = *from++;
00660       *to = 0;
00661     }
00662   }
00663 
00664   void strcat(size_t size, char *to, const char *from)
00665   {
00666     while (*to && size--) to++;
00667     strcpy(size, to, from);
00668   }
00669 
00670   void strcat(size_t size, char *to, int val)
00671   {
00672     char buffer[20];
00673 
00674     snprintf(buffer, 20, "%d", val);
00675     strcat(size, to, buffer);
00676   }
00677 
00678   int setOSResult(int err, const char *func, char *file, PBMSResultPtr result) {
00679     char *msg;
00680 
00681     result->mr_code = err;
00682     *result->mr_stack = 0;
00683     strcpy(MS_RESULT_MESSAGE_SIZE, result->mr_message, "System call ");   
00684     strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, func);   
00685     strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "() failed on ");    
00686     strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, file);   
00687     strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ": ");   
00688 
00689 #ifdef XT_WIN
00690     if (FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, err, 0, iMessage + strlen(iMessage), MS_RESULT_MESSAGE_SIZE - strlen(iMessage), NULL)) {
00691       char *ptr;
00692 
00693       ptr = &iMessage[strlen(iMessage)];
00694       while (ptr-1 > err_msg) {
00695         if (*(ptr-1) != '\n' && *(ptr-1) != '\r' && *(ptr-1) != '.')
00696           break;
00697         ptr--;
00698       }
00699       *ptr = 0;
00700 
00701       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
00702       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
00703       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
00704       return MS_ERR_ENGINE;
00705     }
00706 #endif
00707 
00708     msg = strerror(err);
00709     if (msg) {
00710       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, msg);
00711       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, " (");
00712       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
00713       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, ")");
00714     }
00715     else {
00716       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, "Unknown OS error code ");
00717       strcat(MS_RESULT_MESSAGE_SIZE, result->mr_message, err);
00718     }
00719 
00720     return MS_ERR_ENGINE;
00721   }
00722 
00723   void getTempFileName(char *temp_file, int buffer_size, const char * prefix, int pid)
00724   {
00725     snprintf(temp_file, buffer_size, "/tmp/%s%d", prefix,  pid);
00726   }
00727 
00728   bool startsWith(const char *cstr, const char *w_cstr)
00729   {
00730     while (*cstr && *w_cstr) {
00731       if (*cstr != *w_cstr)
00732         return false;
00733       cstr++;
00734       w_cstr++;
00735     }
00736     return *cstr || !*w_cstr;
00737   }
00738 
00739   void deleteTempFiles()
00740   {
00741     struct dirent entry;
00742     struct dirent *result;
00743     DIR       *odir;
00744     int       err;
00745     char      temp_file[100];
00746 
00747     if (!(odir = opendir("/tmp/")))
00748       return;
00749     err = readdir_r(odir, &entry, &result);
00750     while (!err && result) {
00751       const char **prefix = temp_prefix;
00752       
00753       while (*prefix) {
00754         if (startsWith(entry.d_name, *prefix)) {
00755           int pid = atoi(entry.d_name + strlen(*prefix));
00756           
00757           /* If the process does not exist: */
00758           if (kill(pid, 0) == -1 && errno == ESRCH) {
00759             getTempFileName(temp_file, 100, *prefix, pid);
00760             unlink(temp_file);
00761           }
00762         }
00763         prefix++;
00764       }
00765       
00766       err = readdir_r(odir, &entry, &result);
00767     }
00768     closedir(odir);
00769   }
00770 };
00771 #endif // PBMS_API
00772 
00773 /*
00774  * The following is a low level API for accessing blobs directly.
00775  */
00776  
00777 
00778 /*
00779  * Any threads using the direct blob access API must first register them selves with the
00780  * blob streaming engine before using the blob access functions. This is done by calling
00781  * PBMSInitBlobStreamingThread(). Call PBMSDeinitBlobStreamingThread() after the thread is
00782  * done using the direct blob access API
00783  */
00784  
00785 /* 
00786 * PBMSInitBlobStreamingThread(): Returns a pointer to a blob streaming thread.
00787 */
00788 extern void *PBMSInitBlobStreamingThread(char *thread_name, PBMSResultPtr result);
00789 extern void PBMSDeinitBlobStreamingThread(void *v_bs_thread);
00790 
00791 /* 
00792 * PBMSGetError():Gets the last error reported by a blob streaming thread.
00793 */
00794 extern void PBMSGetError(void *v_bs_thread, PBMSResultPtr result);
00795 
00796 /* 
00797 * PBMSCreateBlob():Creates a new blob in the database of the given size.
00798 */
00799 extern bool PBMSCreateBlob(PBMSBlobIDPtr blob_id, char *database_name, u_int64_t size);
00800 
00801 /* 
00802 * PBMSWriteBlob():Write the data to the blob in one or more chunks. The total size of all the chuncks of 
00803 * data written to the blob must match the size specified when the blob was created.
00804 */
00805 extern bool PBMSWriteBlob(PBMSBlobIDPtr blob_id, char *data, size_t size, size_t offset);
00806 
00807 /* 
00808 * PBMSReadBlob():Read the blob data out of the blob in one or more chunks.
00809 */
00810 extern bool PBMSReadBlob(PBMSBlobIDPtr blob_id, char *buffer, size_t *size, size_t offset);
00811 
00812 /*
00813 * PBMSIDToURL():Convert a blob id to a blob URL. The 'url' buffer must be atleast  PBMS_BLOB_URL_SIZE bytes in size.
00814 */
00815 extern bool PBMSIDToURL(PBMSBlobIDPtr blob_id, char *url);
00816 
00817 /*
00818 * PBMSIDToURL():Convert a blob URL to a blob ID.
00819 */
00820 extern bool PBMSURLToID(char *url, PBMSBlobIDPtr blob_id);
00821 #endif //DRIZZLED 
00822 
00823 
00824 #endif //__PBMS_H__