Drizzled Public API Documentation

records.cc

Go to the documentation of this file.
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00022 #include <config.h>
00023 
00024 #include <drizzled/drizzled.h>
00025 #include <drizzled/error.h>
00026 #include <drizzled/internal/iocache.h>
00027 #include <drizzled/internal/my_sys.h>
00028 #include <drizzled/optimizer/range.h>
00029 #include <drizzled/plugin/storage_engine.h>
00030 #include <drizzled/records.h>
00031 #include <drizzled/session.h>
00032 #include <drizzled/table.h>
00033 
00034 namespace drizzled
00035 {
00036 
00037 static int rr_sequential(ReadRecord *info);
00038 static int rr_quick(ReadRecord *info);
00039 static int rr_from_tempfile(ReadRecord *info);
00040 static int rr_unpack_from_tempfile(ReadRecord *info);
00041 static int rr_unpack_from_buffer(ReadRecord *info);
00042 static int rr_from_pointers(ReadRecord *info);
00043 static int rr_from_cache(ReadRecord *info);
00044 static int rr_cmp(unsigned char *a,unsigned char *b);
00045 static int rr_index_first(ReadRecord *info);
00046 static int rr_index(ReadRecord *info);
00047 
00048 void ReadRecord::init_reard_record_sequential()
00049 {
00050   read_record= rr_sequential;
00051 }
00052 
00053 int ReadRecord::init_read_record_idx(Session *,
00054                                      Table *table_arg,
00055                                      bool print_error_arg,
00056                                      uint32_t idx)
00057 {
00058   table_arg->emptyRecord();
00059   table= table_arg;
00060   cursor=  table->cursor;
00061   record= table->getInsertRecord();
00062   print_error= print_error_arg;
00063 
00064   table->status=0;      /* And it's always found */
00065   if (not table->cursor->inited)
00066   {
00067     int error= table->cursor->startIndexScan(idx, 1);
00068     if (error != 0)
00069       return error;
00070   }
00071   /* read_record will be changed to rr_index in rr_index_first */
00072   read_record= rr_index_first;
00073 
00074   return 0;
00075 }
00076 
00077 
00078 int ReadRecord::init_read_record(Session *session_arg,
00079                                  Table *table_arg,
00080                                  optimizer::SqlSelect *select_arg,
00081                                  int use_record_cache,
00082                                  bool print_error_arg)
00083 {
00084   internal::IO_CACHE *tempfile;
00085   int error= 0;
00086 
00087   session= session_arg;
00088   table= table_arg;
00089   cursor= table->cursor;
00090   forms= &table;    /* Only one table */
00091 
00092   if (table->sort.addon_field)
00093   {
00094     rec_buf= table->sort.addon_buf;
00095     ref_length= table->sort.addon_length;
00096   }
00097   else
00098   {
00099     table->emptyRecord();
00100     record= table->getInsertRecord();
00101     ref_length= table->cursor->ref_length;
00102   }
00103   select= select_arg;
00104   print_error= print_error_arg;
00105   ignore_not_found_rows= 0;
00106   table->status=0;      /* And it's always found */
00107 
00108   if (select && my_b_inited(select->file))
00109   {
00110     tempfile= select->file;
00111   }
00112   else
00113   {
00114     tempfile= table->sort.io_cache;
00115   }
00116 
00117   if (tempfile && my_b_inited(tempfile)) // Test if ref-records was used
00118   {
00119     read_record= (table->sort.addon_field ?
00120                   rr_unpack_from_tempfile : rr_from_tempfile);
00121 
00122     io_cache=tempfile;
00123     io_cache->reinit_io_cache(internal::READ_CACHE,0L,0,0);
00124     ref_pos=table->cursor->ref;
00125     if (!table->cursor->inited)
00126     {
00127       error= table->cursor->startTableScan(0);
00128       if (error != 0)
00129         return error;
00130     }
00131 
00132     /*
00133       table->sort.addon_field is checked because if we use addon fields,
00134       it doesn't make sense to use cache - we don't read from the table
00135       and table->sort.io_cache is read sequentially
00136     */
00137     if (!table->sort.addon_field &&
00138         session->variables.read_rnd_buff_size &&
00139         !(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) &&
00140         (table->db_stat & HA_READ_ONLY ||
00141         table->reginfo.lock_type <= TL_READ_NO_INSERT) &&
00142         (uint64_t) table->getShare()->getRecordLength() * (table->cursor->stats.records+
00143                                                 table->cursor->stats.deleted) >
00144         (uint64_t) MIN_FILE_LENGTH_TO_USE_ROW_CACHE &&
00145         io_cache->end_of_file/ref_length * table->getShare()->getRecordLength() >
00146         (internal::my_off_t) MIN_ROWS_TO_USE_TABLE_CACHE &&
00147         !table->getShare()->blob_fields &&
00148         ref_length <= MAX_REFLENGTH)
00149     {
00150       if (init_rr_cache())
00151       {
00152         read_record= rr_from_cache;
00153       }
00154     }
00155   }
00156   else if (select && select->quick)
00157   {
00158     read_record= rr_quick;
00159   }
00160   else if (table->sort.record_pointers)
00161   {
00162     error= table->cursor->startTableScan(0);
00163     if (error != 0)
00164       return error;
00165 
00166     cache_pos=table->sort.record_pointers;
00167     cache_end= cache_pos+ table->sort.found_records * ref_length;
00168     read_record= (table->sort.addon_field ?  rr_unpack_from_buffer : rr_from_pointers);
00169   }
00170   else
00171   {
00172     read_record= rr_sequential;
00173     error= table->cursor->startTableScan(1);
00174     if (error != 0)
00175       return error;
00176 
00177     /* We can use record cache if we don't update dynamic length tables */
00178     if (!table->no_cache &&
00179         (use_record_cache > 0 ||
00180         (int) table->reginfo.lock_type <= (int) TL_READ_WITH_SHARED_LOCKS ||
00181         !(table->getShare()->db_options_in_use & HA_OPTION_PACK_RECORD)))
00182     {
00183       table->cursor->extra_opt(HA_EXTRA_CACHE, session->variables.read_buff_size);
00184     }
00185   }
00186 
00187   return 0;
00188 } /* init_read_record */
00189 
00190 
00191 void ReadRecord::end_read_record()
00192 {                   /* free cache if used */
00193   if (cache)
00194   {
00195     global_read_rnd_buffer.sub(session->variables.read_rnd_buff_size);
00196     free((char*) cache);
00197     cache= NULL;
00198   }
00199   if (table)
00200   {
00201     table->filesort_free_buffers();
00202     (void) cursor->extra(HA_EXTRA_NO_CACHE);
00203     if (read_record != rr_quick) // otherwise quick_range does it
00204       (void) cursor->ha_index_or_rnd_end();
00205 
00206     table= NULL;
00207   }
00208 }
00209 
00210 static int rr_handle_error(ReadRecord *info, int error)
00211 {
00212   if (error == HA_ERR_END_OF_FILE)
00213     error= -1;
00214   else
00215   {
00216     if (info->print_error)
00217       info->table->print_error(error, MYF(0));
00218     if (error < 0)                            // Fix negative BDB errno
00219       error= 1;
00220   }
00221   return error;
00222 }
00223 
00225 static int rr_quick(ReadRecord *info)
00226 {
00227   int tmp;
00228   while ((tmp= info->select->quick->get_next()))
00229   {
00230     if (info->session->getKilled())
00231     {
00232       my_error(ER_SERVER_SHUTDOWN, MYF(0));
00233       return 1;
00234     }
00235     if (tmp != HA_ERR_RECORD_DELETED)
00236     {
00237       tmp= rr_handle_error(info, tmp);
00238       break;
00239     }
00240   }
00241 
00242   return tmp;
00243 }
00244 
00257 static int rr_index_first(ReadRecord *info)
00258 {
00259   int tmp= info->cursor->index_first(info->record);
00260   info->read_record= rr_index;
00261   if (tmp)
00262     tmp= rr_handle_error(info, tmp);
00263   return tmp;
00264 }
00265 
00281 static int rr_index(ReadRecord *info)
00282 {
00283   int tmp= info->cursor->index_next(info->record);
00284   if (tmp)
00285     tmp= rr_handle_error(info, tmp);
00286   return tmp;
00287 }
00288 
00289 int rr_sequential(ReadRecord *info)
00290 {
00291   int tmp;
00292   while ((tmp= info->cursor->rnd_next(info->record)))
00293   {
00294     if (info->session->getKilled())
00295     {
00296       info->session->send_kill_message();
00297       return 1;
00298     }
00299     /*
00300       TODO> Fix this so that engine knows how to behave on its own.
00301       rnd_next can return RECORD_DELETED for MyISAM when one thread is
00302       reading and another deleting without locks.
00303     */
00304     if (tmp != HA_ERR_RECORD_DELETED)
00305     {
00306       tmp= rr_handle_error(info, tmp);
00307       break;
00308     }
00309   }
00310 
00311   return tmp;
00312 }
00313 
00314 static int rr_from_tempfile(ReadRecord *info)
00315 {
00316   int tmp;
00317   for (;;)
00318   {
00319     if (my_b_read(info->io_cache,info->ref_pos,info->ref_length))
00320       return -1;          /* End of cursor */
00321     if (!(tmp=info->cursor->rnd_pos(info->record,info->ref_pos)))
00322       break;
00323     /* The following is extremely unlikely to happen */
00324     if (tmp == HA_ERR_RECORD_DELETED ||
00325         (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
00326       continue;
00327     tmp= rr_handle_error(info, tmp);
00328     break;
00329   }
00330   return tmp;
00331 } /* rr_from_tempfile */
00332 
00348 static int rr_unpack_from_tempfile(ReadRecord *info)
00349 {
00350   if (my_b_read(info->io_cache, info->rec_buf, info->ref_length))
00351     return -1;
00352   Table *table= info->table;
00353   (*table->sort.unpack)(table->sort.addon_field, info->rec_buf);
00354 
00355   return 0;
00356 }
00357 
00358 static int rr_from_pointers(ReadRecord *info)
00359 {
00360   int tmp;
00361   unsigned char *cache_pos;
00362 
00363 
00364   for (;;)
00365   {
00366     if (info->cache_pos == info->cache_end)
00367       return -1;          /* End of cursor */
00368     cache_pos= info->cache_pos;
00369     info->cache_pos+= info->ref_length;
00370 
00371     if (!(tmp=info->cursor->rnd_pos(info->record,cache_pos)))
00372       break;
00373 
00374     /* The following is extremely unlikely to happen */
00375     if (tmp == HA_ERR_RECORD_DELETED ||
00376         (tmp == HA_ERR_KEY_NOT_FOUND && info->ignore_not_found_rows))
00377       continue;
00378     tmp= rr_handle_error(info, tmp);
00379     break;
00380   }
00381   return tmp;
00382 }
00383 
00399 static int rr_unpack_from_buffer(ReadRecord *info)
00400 {
00401   if (info->cache_pos == info->cache_end)
00402     return -1;                      /* End of buffer */
00403   Table *table= info->table;
00404   (*table->sort.unpack)(table->sort.addon_field, info->cache_pos);
00405   info->cache_pos+= info->ref_length;
00406 
00407   return 0;
00408 }
00409 
00410 /* Caching of records from a database */
00411 bool ReadRecord::init_rr_cache()
00412 {
00413   uint32_t local_rec_cache_size;
00414 
00415   struct_length= 3 + MAX_REFLENGTH;
00416   reclength= ALIGN_SIZE(table->getShare()->getRecordLength() + 1);
00417   if (reclength < struct_length)
00418     reclength= ALIGN_SIZE(struct_length);
00419 
00420   error_offset= table->getShare()->getRecordLength();
00421   cache_records= (session->variables.read_rnd_buff_size /
00422                         (reclength + struct_length));
00423   local_rec_cache_size= cache_records * reclength;
00424   rec_cache_size= cache_records * ref_length;
00425 
00426   if (not global_read_rnd_buffer.add(session->variables.read_rnd_buff_size))
00427   {
00428     my_error(ER_OUT_OF_GLOBAL_READRNDMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00429     return false;
00430   }
00431 
00432   // We have to allocate one more byte to use uint3korr (see comments for it)
00433   if (cache_records <= 2 ||
00434       !(cache=(unsigned char*) malloc(local_rec_cache_size + cache_records * struct_length + 1)))
00435   {
00436     return false;
00437   }
00438 #ifdef HAVE_VALGRIND
00439   // Avoid warnings in qsort
00440   memset(cache, 0, local_rec_cache_size + cache_records * struct_length + 1);
00441 #endif
00442   read_positions= cache + local_rec_cache_size;
00443   cache_pos= cache_end= cache;
00444 
00445   return true;
00446 } /* init_rr_cache */
00447 
00448 static int rr_from_cache(ReadRecord *info)
00449 {
00450   uint32_t length;
00451   internal::my_off_t rest_of_file;
00452   int16_t error;
00453   unsigned char *position,*ref_position,*record_pos;
00454   uint32_t record;
00455 
00456   for (;;)
00457   {
00458     if (info->cache_pos != info->cache_end)
00459     {
00460       if (info->cache_pos[info->error_offset])
00461       {
00462         shortget(error,info->cache_pos);
00463         if (info->print_error)
00464           info->table->print_error(error,MYF(0));
00465       }
00466       else
00467       {
00468         error=0;
00469         memcpy(info->record,info->cache_pos, (size_t) info->table->getShare()->getRecordLength());
00470       }
00471       info->cache_pos+= info->reclength;
00472       return ((int) error);
00473     }
00474     length=info->rec_cache_size;
00475     rest_of_file= info->io_cache->end_of_file - my_b_tell(info->io_cache);
00476     if ((internal::my_off_t) length > rest_of_file)
00477     {
00478       length= (uint32_t) rest_of_file;
00479     }
00480 
00481     if (!length || my_b_read(info->io_cache, info->getCache(), length))
00482     {
00483       return -1;      /* End of cursor */
00484     }
00485 
00486     length/=info->ref_length;
00487     position=info->getCache();
00488     ref_position=info->read_positions;
00489     for (uint32_t i= 0 ; i < length ; i++,position+=info->ref_length)
00490     {
00491       memcpy(ref_position,position,(size_t) info->ref_length);
00492       ref_position+=MAX_REFLENGTH;
00493       int3store(ref_position,(long) i);
00494       ref_position+=3;
00495     }
00496     internal::my_qsort(info->read_positions, length, info->struct_length,
00497                        (qsort_cmp) rr_cmp);
00498 
00499     position=info->read_positions;
00500     for (uint32_t i= 0 ; i < length ; i++)
00501     {
00502       memcpy(info->ref_pos, position, (size_t)info->ref_length);
00503       position+=MAX_REFLENGTH;
00504       record=uint3korr(position);
00505       position+=3;
00506       record_pos= info->getCache() + record * info->reclength;
00507       if ((error=(int16_t) info->cursor->rnd_pos(record_pos,info->ref_pos)))
00508       {
00509         record_pos[info->error_offset]=1;
00510         shortstore(record_pos,error);
00511       }
00512       else
00513         record_pos[info->error_offset]=0;
00514     }
00515     info->cache_end= (info->cache_pos= info->getCache())+length*info->reclength;
00516   }
00517 } /* rr_from_cache */
00518 
/**
  Compare two row references byte by byte, as unsigned values.

  The first 4 bytes are compared when MAX_REFLENGTH is 4, otherwise the
  first 8 bytes. Used as the my_qsort() callback in rr_from_cache().

  The byte at index 5 used to be compared as a[1] - b[5], which could
  report the wrong order for references that first differ in that byte;
  every position now compares the same index on both sides.

  @param a  first reference
  @param b  second reference

  @return negative, zero or positive when a sorts before, equal to or
          after b.
*/
static int rr_cmp(unsigned char *a,unsigned char *b)
{
#if MAX_REFLENGTH == 4
  const int key_bytes= 4;
#else
  const int key_bytes= 8;
#endif

  /* The first differing byte decides */
  for (int i= 0; i < key_bytes - 1; i++)
  {
    if (a[i] != b[i])
      return (int) a[i] - (int) b[i];
  }
  return (int) a[key_bytes - 1] - (int) b[key_bytes - 1];
}
00541 
00542 } /* namespace drizzled */