Drizzled Public API Documentation

memcached_qc.cc

00001 /* 
00002  * Copyright (C) 2010 Djellel Eddine Difallah
00003  * All rights reserved.
00004  *
00005  * Redistribution and use in source and binary forms, with or without
00006  * modification, are permitted provided that the following conditions are met:
00007  *
00008  *   * Redistributions of source code must retain the above copyright notice,
00009  *     this list of conditions and the following disclaimer.
00010  *   * Redistributions in binary form must reproduce the above copyright notice,
00011  *     this list of conditions and the following disclaimer in the documentation
00012  *     and/or other materials provided with the distribution.
00013  *   * Neither the name of Djellel Eddine Difallah nor the names of its contributors
00014  *     may be used to endorse or promote products derived from this software
00015  *     without specific prior written permission.
00016  *
00017  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00018  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00019  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00020  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
00021  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
00022  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
00023  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00024  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
00025  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
00026  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
00027  * THE POSSIBILITY OF SUCH DAMAGE.
00028  */
00029 
00030 
00031 
00032 #include <config.h>
00033 
00034 #include <drizzled/plugin.h>
00035 #include <drizzled/session.h>
00036 #include <drizzled/select_send.h>
00037 #include <drizzled/item/null.h>
00038 
00039 #include <gcrypt.h>
00040 #include <string>
00041 #include <iostream>
00042 #include <vector>
00043 
00044 #include "memcached_qc.h"
00045 #include "query_cache_udf_tools.h"
00046 #include "data_dictionary_schema.h"
00047 #include "invalidator.h"
00048 #include <boost/program_options.hpp>
00049 #include <drizzled/module/option_map.h>
00050 
00051 using namespace drizzled;
00052 using namespace std;
00053 namespace po= boost::program_options;
00054 
/* Backing storage for the "expiry" option: bound in init_options(), exposed
 * as a system variable in init(), and used by doSetResultset() as the expiry
 * of every entry written to memcached. */
static uint64_constraint expiry_time;

/* Definitions of MemcachedQueryCache's static members: the memcached client
 * used by doSetResultset()/doSendCachedResultset() and (presumably) the
 * configured server list reported by getServers() -- confirm in the header. */
memcache::Memcache* MemcachedQueryCache::client;
std::string MemcachedQueryCache::memcached_servers;
/* Master switch of the cache, bound to the "enable" option (default: off). */
bool sysvar_memcached_qc_enable;
00060 
00061 bool MemcachedQueryCache::isSelect(string query)
00062 {
00063   uint i= 0;
00064   /*
00065    Skip '(' characters in queries like following:
00066    (select a from t1) union (select a from t1);
00067   */
00068   const char* sql= query.c_str();
00069   while (sql[i] == '(')
00070     i++;
00071   /*
00072    Test if the query is a SELECT
00073    (pre-space is removed in dispatch_command).
00074     First '/' looks like comment before command it is not
00075     frequently appeared in real life, consequently we can
00076     check all such queries, too.
00077   */
00078   if ((my_toupper(system_charset_info, sql[i])     != 'S' ||
00079        my_toupper(system_charset_info, sql[i + 1]) != 'E' ||
00080        my_toupper(system_charset_info, sql[i + 2]) != 'L') &&
00081       sql[i] != '/')
00082   {
00083     return false;
00084   }
00085   return true;
00086 }
00087 
00088 bool MemcachedQueryCache::doIsCached(Session *session)
00089 {
00090   if (sysvar_memcached_qc_enable && isSelect(session->query))
00091   {
00092     /* ToDo: Check against the cache content */
00093     string query= session->query + *session->schema();
00094     char* key= md5_key(query.c_str());
00095     if(queryCacheService.isCached(key))
00096     {
00097      session->query_cache_key.assign(key);
00098      free(key);
00099      return true;
00100     }
00101     free(key);
00102   }
00103   return false;
00104 }
00105 
00106 bool MemcachedQueryCache::doSendCachedResultset(Session *session)
00107 {
00113   LEX *lex= session->lex;
00114   register Select_Lex *select_lex= &lex->select_lex;
00115   select_result *result=lex->result;
00116   if (not result && not (result= new select_send()))
00117     return true;
00118   result->prepare(select_lex->item_list, select_lex->master_unit());
00119 
00120   /* fetching the resultset from memcached */  
00121   vector<char> raw_resultset; 
00122   getClient()->get(session->query_cache_key, raw_resultset);
00123   if(raw_resultset.empty())
00124     return false;
00125   message::Resultset resultset_message;
00126   if (not resultset_message.ParseFromString(string(raw_resultset.begin(),raw_resultset.end())))
00127     return false;
00128   List<Item> item_list;
00129 
00130   /* Send the fields */
00131   message::SelectHeader header= resultset_message.select_header();
00132   size_t num_fields= header.field_meta_size();
00133   for (size_t y= 0; y < num_fields; y++)
00134   {
00135     message::FieldMeta field= header.field_meta(y);
00136     string value=field.field_alias();
00137     item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
00138   }
00139   result->send_fields(item_list);
00140   item_list.empty();
00141 
00142   /* Send the Data */
00143   message::SelectData data= resultset_message.select_data();
00144   session->limit_found_rows= 0; 
00145   for (int j= 0; j < data.record_size(); j++)
00146   {
00147     message::SelectRecord record= data.record(j);
00148     for (size_t y= 0; y < num_fields; y++)
00149     {
00150       if(record.is_null(y))
00151       {
00152         item_list.push_back(new Item_null());
00153       }
00154       else
00155       {
00156         string value=record.record_value(y);
00157         item_list.push_back(new Item_string(value.c_str(), value.length(), system_charset_info));
00158       }
00159     }
00160     result->send_data(item_list);
00161     item_list.empty();
00162   }
00163   /* Send End of file */
00164   result->send_eof();
00165   /* reset the cache key at the session level */
00166   session->query_cache_key= "";
00167   return false;
00168 }
00169 
00170 /* Check if the tables in the query do not contain
00171  * Data_dictionary
00172  */
00173 void MemcachedQueryCache::checkTables(Session *session, TableList* in_table)
00174 {
00175   for (TableList* tmp_table= in_table; tmp_table; tmp_table= tmp_table->next_global)
00176   {
00177     if (strcasecmp(tmp_table->db, "DATA_DICTIONARY") == 0)
00178     {
00179       session->lex().setCacheable(false);
00180       break;
00181     }
00182   } 
00183 }
00184 
00185 /* init the current resultset in the session
00186  * set the header message (hashkey= sql + schema)
00187  */
00188 bool MemcachedQueryCache::doPrepareResultset(Session *session)
00189 {   
00190   checkTables(session, session->lex().query_tables);
00191   if (sysvar_memcached_qc_enable && session->lex().isCacheable())
00192   {
00193     /* Prepare and set the key for the session */
00194     string query= session->query + *session->schema();
00195     char* key= md5_key(query.c_str());
00196 
00197     /* make sure only one thread will cache the query 
00198      * if executed concurently
00199      */
00200     pthread_mutex_lock(&mutex);
00201 
00202     if(not queryCacheService.isCached(key))
00203     {
00204       session->query_cache_key.assign(key);
00205       free(key);
00206     
00207       /* create the Resultset */
00208       message::Resultset *resultset= queryCacheService.setCurrentResultsetMessage(session);
00209   
00210       /* setting the resultset infos */
00211       resultset->set_key(session->query_cache_key);
00212       resultset->set_schema(*session->schema());
00213       resultset->set_sql(session->query);
00214       pthread_mutex_unlock(&mutex);
00215       
00216       return true;
00217     }
00218     pthread_mutex_unlock(&mutex);
00219     free(key);
00220   }
00221   return false;
00222 }
00223 
00224 /* Send the current resultset to memcached
00225  * Reset the current resultset of the session
00226  */
00227 bool MemcachedQueryCache::doSetResultset(Session *session)
00228 {   
00229   message::Resultset *resultset= session->getResultsetMessage();
00230   if (sysvar_memcached_qc_enable && (not session->is_error()) && resultset != NULL && session->lex().isCacheable())
00231   {
00232     /* Generate the final Header */
00233     queryCacheService.setResultsetHeader(*resultset, session, session->lex().query_tables);
00234     /* serialize the Resultset Message */
00235     std::string output;
00236     resultset->SerializeToString(&output);
00237 
00238     /* setting to memecahced */
00239     time_t expiry= expiry_time;  // ToDo: add a user defined expiry
00240     uint32_t flags= 0;
00241     std::vector<char> raw(output.size());
00242     memcpy(&raw[0], output.c_str(), output.size());
00243     if(not client->set(session->query_cache_key, raw, expiry, flags))
00244     {
00245       delete resultset;
00246       session->resetResultsetMessage();
00247       return false;
00248     }
00249     
00250     /* Clear the Selectdata from the Resultset to be localy cached
00251      * Comment if Keeping the data in the header is needed
00252      */
00253     resultset->clear_select_data();
00254 
00255     /* add the Resultset (including the header) to the hash 
00256      * This is done after the memcached set
00257      */
00258     queryCacheService.cache[session->query_cache_key]= *resultset;
00259 
00260     /* endup the current statement */
00261     delete resultset;
00262     session->resetResultsetMessage();
00263     return true;
00264   }
00265   return false;
00266 }
00267 
00268 /* Adds a record (List<Item>) to the current Resultset.SelectData
00269  */
00270 bool MemcachedQueryCache::doInsertRecord(Session *session, List<Item> &list)
00271 {   
00272   if(sysvar_memcached_qc_enable)
00273   {
00274     queryCacheService.addRecord(session, list);
00275     return true;
00276   }
00277   return false;
00278 }
00279 
00280 char* MemcachedQueryCache::md5_key(const char *str)
00281 {
00282   int msg_len= strlen(str);
00283   /* Length of resulting sha1 hash - gcry_md_get_algo_dlen
00284   * returns digest lenght for an algo */
00285   int hash_len= gcry_md_get_algo_dlen( GCRY_MD_MD5 );
00286   /* output sha1 hash - this will be binary data */
00287   unsigned char* hash= (unsigned char*) malloc(hash_len);
00288   /* output sha1 hash - converted to hex representation
00289   * 2 hex digits for every byte + 1 for trailing \0 */
00290   char *out= (char *) malloc( sizeof(char) * ((hash_len*2)+1) );
00291   char *p= out;
00292   /* calculate the SHA1 digest. This is a bit of a shortcut function
00293   * most gcrypt operations require the creation of a handle, etc. */
00294   gcry_md_hash_buffer( GCRY_MD_MD5, hash, str , msg_len );
00295   /* Convert each byte to its 2 digit ascii
00296   * hex representation and place in out */
00297   int i;
00298   for ( i = 0; i < hash_len; i++, p += 2 )
00299   {
00300     snprintf ( p, 3, "%02x", hash[i] );
00301   }
00302   free(hash);
00303   return out;
00304 }
00305 
/* Factory for the print_query_cache_meta() UDF. It is declared here, defined
 * outside this listing, and assigned in init(). */
extern plugin::Create_function<PrintQueryCacheMetaFunction> *print_query_cache_meta_func_factory;
/* Factory for the query_cache_flush() UDF; created in init(). */
plugin::Create_function<QueryCacheFlushFunction> *query_cache_flush_func= NULL;

/* Table functions registered by init() (data dictionary / status info). */
static QueryCacheTool *query_cache_tool;
static QueryCacheStatusTool *query_cache_status;
static CachedTables *query_cached_tables;
00314 
00315 static int init(module::Context &context)
00316 {
00317   const module::option_map &vm= context.getOptions();
00318 
00319   MemcachedQueryCache* memc= new MemcachedQueryCache("Memcached_Query_Cache", vm["servers"].as<string>());
00320   context.add(memc);
00321 
00322   Invalidator* invalidator= new Invalidator("Memcached_Query_Cache_Invalidator");
00323   context.add(invalidator);
00324   ReplicationServices &replication_services= ReplicationServices::singleton();
00325   string replicator_name("default_replicator");
00326   replication_services.attachApplier(invalidator, replicator_name);
00327   
00328   /* Setup the module's UDFs */
00329   print_query_cache_meta_func_factory=
00330     new plugin::Create_function<PrintQueryCacheMetaFunction>("print_query_cache_meta");
00331   context.add(print_query_cache_meta_func_factory);
00332   
00333   query_cache_flush_func= new plugin::Create_function<QueryCacheFlushFunction>("query_cache_flush");
00334   context.add(query_cache_flush_func);
00335 
00336   /* Setup the module Data dict and status infos */
00337   query_cache_tool= new (nothrow) QueryCacheTool();
00338   context.add(query_cache_tool);
00339   query_cache_status= new (nothrow) QueryCacheStatusTool();
00340   context.add(query_cache_status);
00341   query_cached_tables= new (nothrow) CachedTables();
00342   context.add(query_cached_tables);
00343   
00344   context.registerVariable(new sys_var_constrained_value<uint64_t>("expiry", expiry_time));
00345   context.registerVariable(new sys_var_const_string_val("servers", vm["servers"].as<string>()));
00346   context.registerVariable(new sys_var_bool_ptr("enable", &sysvar_memcached_qc_enable));
00347   return 0;
00348 }
00349 
00350 QueryCacheStatusTool::Generator::Generator(drizzled::Field **fields) :
00351   plugin::TableFunction::Generator(fields)
00352 { 
00353   status_var_ptr= vars;
00354 }
00355 
00356 bool QueryCacheStatusTool::Generator::populate()
00357 {
00358   if (*status_var_ptr)
00359   {
00360     string return_value;
00361 
00362     /* VARIABLE_NAME */
00363     push((*status_var_ptr)->name);
00364     if (strcmp((**status_var_ptr).name, "enable") == 0)
00365       return_value= sysvar_memcached_qc_enable ? "ON" : "OFF";
00366     if (strcmp((**status_var_ptr).name, "servers") == 0) 
00367       return_value= MemcachedQueryCache::getServers();
00368     if (strcmp((**status_var_ptr).name, "expiry") == 0)
00369       return_value= boost::lexical_cast<std::string>(expiry_time);
00370 
00371     /* VARIABLE_VALUE */
00372     if (return_value.length())
00373       push(return_value);
00374     else 
00375       push(" ");
00376 
00377     status_var_ptr++;
00378 
00379     return true;
00380   }
00381   return false;
00382 }
00383 
00384 static void init_options(drizzled::module::option_context &context)
00385 {
00386   context("servers",
00387           po::value<string>()->default_value("127.0.0.1:11211"),
00388           _("List of memcached servers."));
00389   context("expiry",
00390           po::value<uint64_constraint>(&expiry_time)->default_value(1000),
00391           _("Expiry time of memcached entries"));
00392   context("enable",
00393           po::value<bool>(&sysvar_memcached_qc_enable)->default_value(false)->zero_tokens(),
00394           _("Enable Memcached Query Cache"));
00395 }
00396 
/* Plugin manifest. The positional fields follow the DRIZZLE_DECLARE_PLUGIN
 * layout (presumably: version id, name, version, author, description,
 * license, init function, dependencies, option declarations -- confirm
 * against drizzled/plugin.h). init() registers the plugin's components and
 * init_options() declares its options. */
DRIZZLE_DECLARE_PLUGIN
{
  DRIZZLE_VERSION_ID,
  "Query_Cache",
  "0.3",
  "Djellel Eddine Difallah",
  "Caches Select resultsets in Memcached",
  PLUGIN_LICENSE_BSD,
  init,   /* Plugin Init      */
  NULL, /* depends */
  init_options    /* config options   */
}
DRIZZLE_DECLARE_PLUGIN_END;