Drizzled Public API Documentation

join_cache.cc

Go to the documentation of this file.
00001 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2008-2009 Sun Microsystems, Inc.
00005  *
00006  *  This program is free software; you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation; either version 2 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  This program is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with this program; if not, write to the Free Software
00018  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00019  */
00020 
00030 #include <config.h>
00031 
00032 #include <drizzled/sql_select.h> /* include join.h */
00033 #include <drizzled/field/blob.h>
00034 #include <drizzled/drizzled.h>
00035 #include <drizzled/internal/my_sys.h>
00036 #include <drizzled/table.h>
00037 #include <drizzled/session.h>
00038 
00039 #include <algorithm>
00040 
00041 using namespace std;
00042 
00043 namespace drizzled
00044 {
00045 
00046 static uint32_t used_blob_length(CacheField **ptr);
00047 
00048 static uint32_t used_blob_length(CacheField **ptr)
00049 {
00050   uint32_t length,blob_length;
00051   for (length=0 ; *ptr ; ptr++)
00052   {
00053     (*ptr)->blob_length=blob_length=(*ptr)->blob_field->get_length();
00054     length+=blob_length;
00055     (*ptr)->blob_field->get_ptr(&(*ptr)->str);
00056   }
00057   return length;
00058 }
00059 
00060 /*****************************************************************************
00061   Fill join cache with packed records
00062   Records are stored in tab->cache.buffer and last record in
00063   last record is stored with pointers to blobs to support very big
00064   records
00065 ******************************************************************************/
00066 int join_init_cache(Session *session, JoinTable *tables, uint32_t table_count)
00067 {
00068   unsigned int length, blobs;
00069   size_t size;
00070   CacheField *copy,**blob_ptr;
00071   JoinCache  *cache;
00072   JoinTable *join_tab;
00073 
00074   cache= &tables[table_count].cache;
00075   cache->fields=blobs=0;
00076 
00077   join_tab= tables;
00078   for (unsigned int i= 0; i < table_count ; i++, join_tab++)
00079   {
00080     if (!join_tab->used_fieldlength)    /* Not calced yet */
00081       calc_used_field_length(session, join_tab);
00082     cache->fields+=join_tab->used_fields;
00083     blobs+=join_tab->used_blobs;
00084 
00085     /* SemiJoinDuplicateElimination: reserve space for rowid */
00086     if (join_tab->rowid_keep_flags & JoinTable::KEEP_ROWID)
00087     {
00088       cache->fields++;
00089       join_tab->used_fieldlength += join_tab->table->cursor->ref_length;
00090     }
00091   }
00092   if (!(cache->field=(CacheField*)
00093         memory::sql_alloc(sizeof(CacheField)*(cache->fields+table_count*2)+(blobs+1)* sizeof(CacheField*))))
00094   {
00095     size= cache->end - cache->buff;
00096     global_join_buffer.sub(size);
00097     free((unsigned char*) cache->buff);
00098     cache->buff=0;
00099     return(1);
00100   }
00101   copy=cache->field;
00102   blob_ptr=cache->blob_ptr=(CacheField**)
00103     (cache->field+cache->fields+table_count*2);
00104 
00105   length=0;
00106   for (unsigned int i= 0 ; i < table_count ; i++)
00107   {
00108     uint32_t null_fields=0, used_fields;
00109     Field **f_ptr,*field;
00110     for (f_ptr= tables[i].table->getFields(), used_fields= tables[i].used_fields; used_fields; f_ptr++)
00111     {
00112       field= *f_ptr;
00113       if (field->isReadSet())
00114       {
00115         used_fields--;
00116         length+=field->fill_cache_field(copy);
00117         if (copy->blob_field)
00118           (*blob_ptr++)=copy;
00119         if (field->maybe_null())
00120           null_fields++;
00121         copy->get_rowid= NULL;
00122         copy++;
00123       }
00124     }
00125     /* Copy null bits from table */
00126     if (null_fields && tables[i].table->getNullFields())
00127     {           /* must copy null bits */
00128       copy->str= tables[i].table->null_flags;
00129       copy->length= tables[i].table->getShare()->null_bytes;
00130       copy->strip=0;
00131       copy->blob_field=0;
00132       copy->get_rowid= NULL;
00133       length+=copy->length;
00134       copy++;
00135       cache->fields++;
00136     }
00137     /* If outer join table, copy null_row flag */
00138     if (tables[i].table->maybe_null)
00139     {
00140       copy->str= (unsigned char*) &tables[i].table->null_row;
00141       copy->length=sizeof(tables[i].table->null_row);
00142       copy->strip=0;
00143       copy->blob_field=0;
00144       copy->get_rowid= NULL;
00145       length+=copy->length;
00146       copy++;
00147       cache->fields++;
00148     }
00149     /* SemiJoinDuplicateElimination: Allocate space for rowid if needed */
00150     if (tables[i].rowid_keep_flags & JoinTable::KEEP_ROWID)
00151     {
00152       copy->str= tables[i].table->cursor->ref;
00153       copy->length= tables[i].table->cursor->ref_length;
00154       copy->strip=0;
00155       copy->blob_field=0;
00156       copy->get_rowid= NULL;
00157       if (tables[i].rowid_keep_flags & JoinTable::CALL_POSITION)
00158       {
00159         /* We will need to call h->position(): */
00160         copy->get_rowid= tables[i].table;
00161         /* And those after us won't have to: */
00162         tables[i].rowid_keep_flags&=  ~((int)JoinTable::CALL_POSITION);
00163       }
00164       copy++;
00165     }
00166   }
00167 
00168   cache->length= length+blobs*sizeof(char*);
00169   cache->blobs= blobs;
00170   *blob_ptr= NULL;          /* End sequentel */
00171   size= max((size_t) session->variables.join_buff_size, (size_t)cache->length);
00172   if (not global_join_buffer.add(size))
00173   {
00174     my_error(ER_OUT_OF_GLOBAL_JOINMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00175     return 1;
00176   }
00177   if (!(cache->buff= (unsigned char*) malloc(size)))
00178     return 1;
00179   cache->end= cache->buff+size;
00180   cache->reset_cache_write();
00181 
00182   return 0;
00183 }
00184 
00185 bool JoinCache::store_record_in_cache()
00186 {
00187   JoinCache *cache= this;
00188   unsigned char *local_pos;
00189   CacheField *copy,*end_field;
00190   bool last_record;
00191 
00192   local_pos= cache->pos;
00193   end_field= cache->field+cache->fields;
00194 
00195   {
00196     uint32_t local_length;
00197 
00198     local_length= cache->length;
00199     if (cache->blobs)
00200     {
00201       local_length+= used_blob_length(cache->blob_ptr);
00202     }
00203 
00204     if ((last_record= (local_length + cache->length > (size_t) (cache->end - local_pos))))
00205     {
00206       cache->ptr_record= cache->records;
00207     }
00208   }
00209 
00210   /*
00211     There is room in cache. Put record there
00212   */
00213   cache->records++;
00214   for (copy= cache->field; copy < end_field; copy++)
00215   {
00216     if (copy->blob_field)
00217     {
00218       if (last_record)
00219       {
00220         copy->blob_field->get_image(local_pos, copy->length+sizeof(char*), copy->blob_field->charset());
00221         local_pos+= copy->length+sizeof(char*);
00222       }
00223       else
00224       {
00225         copy->blob_field->get_image(local_pos, copy->length, // blob length
00226             copy->blob_field->charset());
00227         memcpy(local_pos + copy->length,copy->str,copy->blob_length);  // Blob data
00228         local_pos+= copy->length+copy->blob_length;
00229       }
00230     }
00231     else
00232     {
00233       // SemiJoinDuplicateElimination: Get the rowid into table->ref:
00234       if (copy->get_rowid)
00235         copy->get_rowid->cursor->position(copy->get_rowid->getInsertRecord());
00236 
00237       if (copy->strip)
00238       {
00239         unsigned char *str, *local_end;
00240         for (str= copy->str,local_end= str+copy->length; local_end > str && local_end[-1] == ' '; local_end--) {}
00241 
00242         uint32_t local_length= (uint32_t) (local_end - str);
00243         memcpy(local_pos+2, str, local_length);
00244         int2store(local_pos, local_length);
00245         local_pos+= local_length+2;
00246       }
00247       else
00248       {
00249         memcpy(local_pos, copy->str, copy->length);
00250         local_pos+= copy->length;
00251       }
00252     }
00253   }
00254   cache->pos= local_pos;
00255   return last_record || (size_t) (cache->end - local_pos) < cache->length;
00256 }
00257 
00258 void JoinCache::reset_cache_read()
00259 {
00260   record_nr= 0;
00261   pos= buff;
00262 }
00263 
00264 void JoinCache::reset_cache_write()
00265 {
00266   reset_cache_read();
00267   records= 0;
00268   ptr_record= UINT32_MAX;
00269 }
00270 
00275 } /* namespace drizzled */