Drizzled Public API Documentation

filesort.cc

Go to the documentation of this file.
00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 
00024 #include <config.h>
00025 
00026 #include <float.h>
00027 #include <limits.h>
00028 
00029 #include <queue>
00030 #include <algorithm>
00031 #include <iostream>
00032 
00033 #include <drizzled/drizzled.h>
00034 #include <drizzled/sql_sort.h>
00035 #include <drizzled/filesort.h>
00036 #include <drizzled/error.h>
00037 #include <drizzled/probes.h>
00038 #include <drizzled/session.h>
00039 #include <drizzled/table.h>
00040 #include <drizzled/table_list.h>
00041 #include <drizzled/optimizer/range.h>
00042 #include <drizzled/records.h>
00043 #include <drizzled/internal/iocache.h>
00044 #include <drizzled/internal/my_sys.h>
00045 #include <plugin/myisam/myisam.h>
00046 #include <drizzled/plugin/transactional_storage_engine.h>
00047 #include <drizzled/atomics.h>
00048 #include <drizzled/global_buffer.h>
00049 #include <drizzled/sort_field.h>
00050 #include <drizzled/item/subselect.h>
00051 
00052 using namespace std;
00053 
00054 namespace drizzled {
00055 
00056 /* Defines used by filesort and uniques */
00057 #define MERGEBUFF   7
00058 #define MERGEBUFF2    15
00059 
00060 class BufferCompareContext
00061 {
00062 public:
00063   qsort_cmp2 key_compare;
00064   void *key_compare_arg;
00065 
00066   BufferCompareContext() :
00067     key_compare(0),
00068     key_compare_arg(0)
00069   { }
00070 
00071 };
00072 
00073 class SortParam {
00074 public:
00075   uint32_t rec_length;          /* Length of sorted records */
00076   uint32_t sort_length;     /* Length of sorted columns */
00077   uint32_t ref_length;      /* Length of record ref. */
00078   uint32_t addon_length;        /* Length of added packed fields */
00079   uint32_t res_length;          /* Length of records in final sorted file/buffer */
00080   uint32_t keys;        /* Max keys / buffer */
00081   ha_rows max_rows,examined_rows;
00082   Table *sort_form;     /* For quicker make_sortkey */
00083   SortField *local_sortorder;
00084   SortField *end;
00085   sort_addon_field *addon_field; /* Descriptors for companion fields */
00086   unsigned char *unique_buff;
00087   bool not_killable;
00088   char *tmp_buffer;
00089   /* The fields below are used only by Unique class */
00090   qsort2_cmp compare;
00091   BufferCompareContext cmp_context;
00092 
00093   SortParam() :
00094     rec_length(0),
00095     sort_length(0),
00096     ref_length(0),
00097     addon_length(0),
00098     res_length(0),
00099     keys(0),
00100     max_rows(0),
00101     examined_rows(0),
00102     sort_form(0),
00103     local_sortorder(0),
00104     end(0),
00105     addon_field(0),
00106     unique_buff(0),
00107     not_killable(0),
00108     tmp_buffer(0),
00109     compare(0)
00110   {
00111   }
00112 
00113   ~SortParam()
00114   {
00115     if (tmp_buffer)
00116       free(tmp_buffer);
00117   }
00118 
00119   int write_keys(unsigned char * *sort_keys,
00120                  uint32_t count,
00121                  internal::IO_CACHE *buffer_file,
00122                  internal::IO_CACHE *tempfile);
00123 
00124   void make_sortkey(unsigned char *to,
00125                     unsigned char *ref_pos);
00126   void register_used_fields();
00127   bool save_index(unsigned char **sort_keys,
00128                   uint32_t count,
00129                   filesort_info *table_sort);
00130 
00131 };
00132 
00133 /* functions defined in this file */
00134 
00135 static char **make_char_array(char **old_pos, uint32_t fields,
00136                               uint32_t length);
00137 
00138 static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
00139                                              uint32_t count,
00140                                              unsigned char *buf);
00141 
00142 static uint32_t suffix_length(uint32_t string_length);
00143 static void unpack_addon_fields(sort_addon_field *addon_field,
00144                                 unsigned char *buff);
00145 
00146 FileSort::FileSort(Session &arg) :
00147   _session(arg)
00148 { 
00149 }
00150 
00186 ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
00187                       optimizer::SqlSelect *select, ha_rows max_rows,
00188                       bool sort_positions, ha_rows &examined_rows)
00189 {
00190   int error= 1;
00191   uint32_t memavl= 0, min_sort_memory;
00192   uint32_t maxbuffer;
00193   size_t allocated_sort_memory= 0;
00194   buffpek *buffpek_inst= 0;
00195   ha_rows records= HA_POS_ERROR;
00196   unsigned char **sort_keys= 0;
00197   internal::IO_CACHE tempfile;
00198   internal::IO_CACHE buffpek_pointers;
00199   internal::IO_CACHE *selected_records_file;
00200   internal::IO_CACHE *outfile;
00201   SortParam param;
00202   bool multi_byte_charset;
00203 
00204   /*
00205     Don't use table->sort in filesort as it is also used by
00206     QuickIndexMergeSelect. Work with a copy and put it back at the end
00207     when index_merge select has finished with it.
00208   */
00209   filesort_info table_sort(table->sort);
00210   table->sort.io_cache= NULL;
00211 
00212   TableList *tab= table->pos_in_table_list;
00213   Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
00214 
00215   DRIZZLE_FILESORT_START(table->getShare()->getSchemaName(), table->getShare()->getTableName());
00216 
00217   /*
00218    Release InnoDB's adaptive hash index latch (if holding) before
00219    running a sort.
00220   */
00221   plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
00222 
00223 
00224   outfile= table_sort.io_cache;
00225   assert(tempfile.buffer == 0);
00226   assert(buffpek_pointers.buffer == 0);
00227 
00228   param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
00229   param.ref_length= table->cursor->ref_length;
00230 
00231   if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
00232   {
00233     /*
00234       Get the descriptors of all fields whose values are appended
00235       to sorted fields and get its total length in param.spack_length.
00236     */
00237     param.addon_field= get_addon_fields(table->getFields(),
00238                                         param.sort_length,
00239                                         &param.addon_length);
00240   }
00241 
00242   table_sort.addon_buf= 0;
00243   table_sort.addon_length= param.addon_length;
00244   table_sort.addon_field= param.addon_field;
00245   table_sort.unpack= unpack_addon_fields;
00246   if (param.addon_field)
00247   {
00248     param.res_length= param.addon_length;
00249     if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
00250     {
00251       goto err;
00252     }
00253   }
00254   else
00255   {
00256     param.res_length= param.ref_length;
00257     /*
00258       The reference to the record is considered
00259       as an additional sorted field
00260     */
00261     param.sort_length+= param.ref_length;
00262   }
00263   param.rec_length= param.sort_length+param.addon_length;
00264   param.max_rows= max_rows;
00265 
00266   if (select && select->quick)
00267   {
00268     getSession().status_var.filesort_range_count++;
00269   }
00270   else
00271   {
00272     getSession().status_var.filesort_scan_count++;
00273   }
00274 #ifdef CAN_TRUST_RANGE
00275   if (select && select->quick && select->quick->records > 0L)
00276   {
00277     records= min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
00278                  table->cursor->stats.records)+EXTRA_RECORDS;
00279     selected_records_file=0;
00280   }
00281   else
00282 #endif
00283   {
00284     records= table->cursor->estimate_rows_upper_bound();
00285     /*
00286       If number of records is not known, use as much of sort buffer
00287       as possible.
00288     */
00289     if (records == HA_POS_ERROR)
00290       records--;  // we use 'records+1' below.
00291     selected_records_file= 0;
00292   }
00293 
00294   if (multi_byte_charset && !(param.tmp_buffer= (char*) malloc(param.sort_length)))
00295   {
00296     goto err;
00297   }
00298 
00299   memavl= getSession().variables.sortbuff_size;
00300   min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
00301   while (memavl >= min_sort_memory)
00302   {
00303     uint32_t old_memavl;
00304     uint32_t keys= memavl/(param.rec_length+sizeof(char*));
00305     param.keys= (uint32_t) min(records+1, (ha_rows)keys);
00306 
00307     allocated_sort_memory= param.keys * param.rec_length;
00308     if (not global_sort_buffer.add(allocated_sort_memory))
00309     {
00310       my_error(ER_OUT_OF_GLOBAL_SORTMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00311       goto err;
00312     }
00313 
00314     if ((table_sort.sort_keys=
00315    (unsigned char **) make_char_array((char **) table_sort.sort_keys,
00316                                             param.keys, param.rec_length)))
00317       break;
00318 
00319     global_sort_buffer.sub(allocated_sort_memory);
00320     old_memavl= memavl;
00321     if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
00322       memavl= min_sort_memory;
00323   }
00324   sort_keys= table_sort.sort_keys;
00325   if (memavl < min_sort_memory)
00326   {
00327     my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
00328     goto err;
00329   }
00330 
00331   if (buffpek_pointers.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
00332   {
00333     goto err;
00334   }
00335 
00336   param.keys--;       /* TODO: check why we do this */
00337   param.sort_form= table;
00338   param.end=(param.local_sortorder=sortorder)+s_length;
00339   if ((records= find_all_keys(&param,select,sort_keys, &buffpek_pointers,
00340                               &tempfile, selected_records_file)) == HA_POS_ERROR)
00341   {
00342     goto err;
00343   }
00344   maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
00345 
00346   if (maxbuffer == 0)     // The whole set is in memory
00347   {
00348     if (param.save_index(sort_keys,(uint32_t) records, &table_sort))
00349     {
00350       goto err;
00351     }
00352   }
00353   else
00354   {
00355     if (table_sort.buffpek && table_sort.buffpek_len < maxbuffer)
00356     {
00357       if (table_sort.buffpek)
00358         free(table_sort.buffpek);
00359       table_sort.buffpek = 0;
00360     }
00361     if (!(table_sort.buffpek=
00362           (unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer, table_sort.buffpek)))
00363     {
00364       goto err;
00365     }
00366     buffpek_inst= (buffpek *) table_sort.buffpek;
00367     table_sort.buffpek_len= maxbuffer;
00368     buffpek_pointers.close_cached_file();
00369   /* Open cached file if it isn't open */
00370     if (! my_b_inited(outfile) && outfile->open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
00371     {
00372       goto err;
00373     }
00374 
00375     if (outfile->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
00376     {
00377       goto err;
00378     }
00379 
00380     /*
00381       Use also the space previously used by string pointers in sort_buffer
00382       for temporary key storage.
00383     */
00384     param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
00385 
00386     maxbuffer--;        // Offset from 0
00387     if (merge_many_buff(&param,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
00388     {
00389       goto err;
00390     }
00391 
00392     if (flush_io_cache(&tempfile) || tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00393     {
00394       goto err;
00395     }
00396 
00397     if (merge_index(&param,(unsigned char*) sort_keys,buffpek_inst,maxbuffer,&tempfile, outfile))
00398     {
00399       goto err;
00400     }
00401   }
00402 
00403   if (records > param.max_rows)
00404   {
00405     records= param.max_rows;
00406   }
00407   error =0;
00408 
00409  err:
00410   if (not subselect || not subselect->is_uncacheable())
00411   {
00412     free(sort_keys);
00413     table_sort.sort_keys= 0;
00414     free(buffpek_inst);
00415     table_sort.buffpek= 0;
00416     table_sort.buffpek_len= 0;
00417   }
00418 
00419   tempfile.close_cached_file();
00420   buffpek_pointers.close_cached_file();
00421 
00422   if (my_b_inited(outfile))
00423   {
00424     if (flush_io_cache(outfile))
00425     {
00426       error=1;
00427     }
00428     {
00429       internal::my_off_t save_pos= outfile->pos_in_file;
00430       /* For following reads */
00431       if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
00432       {
00433   error=1;
00434       }
00435       outfile->end_of_file=save_pos;
00436     }
00437   }
00438 
00439   if (error)
00440   {
00441     my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
00442                MYF(ME_ERROR+ME_WAITTANG));
00443   }
00444   else
00445   {
00446     getSession().status_var.filesort_rows+= (uint32_t) records;
00447   }
00448   examined_rows= param.examined_rows;
00449   global_sort_buffer.sub(allocated_sort_memory);
00450   table->sort= table_sort;
00451   DRIZZLE_FILESORT_DONE(error, records);
00452   return (error ? HA_POS_ERROR : records);
00453 } /* filesort */
00454 
00457 static char **make_char_array(char **old_pos, uint32_t fields,
00458                               uint32_t length)
00459 {
00460   char **pos;
00461   char *char_pos;
00462 
00463   if (old_pos ||
00464       (old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
00465   {
00466     pos=old_pos; char_pos=((char*) (pos+fields)) -length;
00467     while (fields--) *(pos++) = (char_pos+= length);
00468   }
00469 
00470   return(old_pos);
00471 } /* make_char_array */
00472 
00473 
00476 static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
00477                                      unsigned char *buf)
00478 {
00479   uint32_t length= sizeof(buffpek)*count;
00480   unsigned char *tmp= buf;
00481   if (count > UINT_MAX/sizeof(buffpek))
00482     return 0; /* sizeof(buffpek)*count will overflow */
00483   if (!tmp)
00484     tmp= (unsigned char *)malloc(length);
00485   if (tmp)
00486   {
00487     if (buffpek_pointers->reinit_io_cache(internal::READ_CACHE,0L,0,0) ||
00488   my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
00489     {
00490       free((char*) tmp);
00491       tmp=0;
00492     }
00493   }
00494   return(tmp);
00495 }
00496 
00497 
00535 ha_rows FileSort::find_all_keys(SortParam *param, 
00536                                 optimizer::SqlSelect *select,
00537                                 unsigned char **sort_keys,
00538                                 internal::IO_CACHE *buffpek_pointers,
00539                                 internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
00540 {
00541   int error,flag,quick_select;
00542   uint32_t idx,indexpos,ref_length;
00543   unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
00544   internal::my_off_t record;
00545   Table *sort_form;
00546   volatile Session::killed_state_t *killed= getSession().getKilledPtr();
00547   Cursor *file;
00548   boost::dynamic_bitset<> *save_read_set= NULL;
00549   boost::dynamic_bitset<> *save_write_set= NULL;
00550 
00551   idx=indexpos=0;
00552   error=quick_select=0;
00553   sort_form=param->sort_form;
00554   file= sort_form->cursor;
00555   ref_length=param->ref_length;
00556   ref_pos= ref_buff;
00557   quick_select=select && select->quick;
00558   record=0;
00559   flag= ((!indexfile && ! file->isOrdered())
00560    || quick_select);
00561   if (indexfile || flag)
00562     ref_pos= &file->ref[0];
00563   next_pos=ref_pos;
00564   if (! indexfile && ! quick_select)
00565   {
00566     next_pos=(unsigned char*) 0;      /* Find records in sequence */
00567     if (file->startTableScan(1))
00568       return(HA_POS_ERROR);
00569     file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
00570   }
00571 
00572   ReadRecord read_record_info;
00573   if (quick_select)
00574   {
00575     if (select->quick->reset())
00576       return(HA_POS_ERROR);
00577 
00578     if (read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1))
00579       return(HA_POS_ERROR);
00580   }
00581 
00582   /* Remember original bitmaps */
00583   save_read_set=  sort_form->read_set;
00584   save_write_set= sort_form->write_set;
00585   /* Set up temporary column read map for columns used by sort */
00586   sort_form->tmp_set.reset();
00587   /* Temporary set for register_used_fields and register_field_in_read_map */
00588   sort_form->read_set= &sort_form->tmp_set;
00589   param->register_used_fields();
00590   if (select && select->cond)
00591     select->cond->walk(&Item::register_field_in_read_map, 1,
00592                        (unsigned char*) sort_form);
00593   sort_form->column_bitmaps_set(sort_form->tmp_set, sort_form->tmp_set);
00594 
00595   for (;;)
00596   {
00597     if (quick_select)
00598     {
00599       if ((error= read_record_info.read_record(&read_record_info)))
00600       {
00601         error= HA_ERR_END_OF_FILE;
00602         break;
00603       }
00604       file->position(sort_form->getInsertRecord());
00605     }
00606     else          /* Not quick-select */
00607     {
00608       if (indexfile)
00609       {
00610   if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length))
00611   {
00612     error= errno ? errno : -1;    /* Abort */
00613     break;
00614   }
00615   error=file->rnd_pos(sort_form->getInsertRecord(),next_pos);
00616       }
00617       else
00618       {
00619   error=file->rnd_next(sort_form->getInsertRecord());
00620 
00621   if (!flag)
00622   {
00623     internal::my_store_ptr(ref_pos,ref_length,record); // Position to row
00624     record+= sort_form->getShare()->db_record_offset;
00625   }
00626   else if (!error)
00627     file->position(sort_form->getInsertRecord());
00628       }
00629       if (error && error != HA_ERR_RECORD_DELETED)
00630   break;
00631     }
00632 
00633     if (*killed)
00634     {
00635       if (!indexfile && !quick_select)
00636       {
00637         (void) file->extra(HA_EXTRA_NO_CACHE);
00638         file->endTableScan();
00639       }
00640       return(HA_POS_ERROR);
00641     }
00642     if (error == 0)
00643       param->examined_rows++;
00644     if (error == 0 && (!select || select->skip_record() == 0))
00645     {
00646       if (idx == param->keys)
00647       {
00648   if (param->write_keys(sort_keys, idx, buffpek_pointers, tempfile))
00649     return(HA_POS_ERROR);
00650   idx=0;
00651   indexpos++;
00652       }
00653       param->make_sortkey(sort_keys[idx++], ref_pos);
00654     }
00655     else
00656     {
00657       file->unlock_row();
00658     }
00659 
00660     /* It does not make sense to read more keys in case of a fatal error */
00661     if (getSession().is_error())
00662       break;
00663   }
00664   if (quick_select)
00665   {
00666     /*
00667       index_merge quick select uses table->sort when retrieving rows, so free
00668       resoures it has allocated.
00669     */
00670     read_record_info.end_read_record();
00671   }
00672   else
00673   {
00674     (void) file->extra(HA_EXTRA_NO_CACHE);  /* End cacheing of records */
00675     if (!next_pos)
00676       file->endTableScan();
00677   }
00678 
00679   if (getSession().is_error())
00680     return(HA_POS_ERROR);
00681 
00682   /* Signal we should use orignal column read and write maps */
00683   sort_form->column_bitmaps_set(*save_read_set, *save_write_set);
00684 
00685   if (error != HA_ERR_END_OF_FILE)
00686   {
00687     sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
00688     return(HA_POS_ERROR);
00689   }
00690 
00691   if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
00692   {
00693     return(HA_POS_ERROR);
00694   }
00695 
00696   return(my_b_inited(tempfile) ?
00697         (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
00698         idx);
00699 } /* find_all_keys */
00700 
00701 
00724 int SortParam::write_keys(unsigned char **sort_keys, uint32_t count,
00725                           internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
00726 {
00727   buffpek buffpek;
00728 
00729   internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
00730   if (!my_b_inited(tempfile) &&
00731       tempfile->open_cached_file(drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
00732   {
00733     return 1;
00734   }
00735   /* check we won't have more buffpeks than we can possibly keep in memory */
00736   if (my_b_tell(buffpek_pointers) + sizeof(buffpek) > (uint64_t)UINT_MAX)
00737   {
00738     return 1;
00739   }
00740 
00741   buffpek.file_pos= my_b_tell(tempfile);
00742   if ((ha_rows) count > max_rows)
00743     count=(uint32_t) max_rows;
00744 
00745   buffpek.count=(ha_rows) count;
00746 
00747   for (unsigned char **ptr= sort_keys + count ; sort_keys != ptr ; sort_keys++)
00748   {
00749     if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
00750     {
00751       return 1;
00752     }
00753   }
00754 
00755   if (my_b_write(buffpek_pointers, (unsigned char*) &buffpek, sizeof(buffpek)))
00756   {
00757     return 1;
00758   }
00759 
00760   return 0;
00761 } /* write_keys */
00762 
00763 
00768 static inline void store_length(unsigned char *to, uint32_t length, uint32_t pack_length)
00769 {
00770   switch (pack_length) {
00771   case 1:
00772     *to= (unsigned char) length;
00773     break;
00774   case 2:
00775     mi_int2store(to, length);
00776     break;
00777   case 3:
00778     mi_int3store(to, length);
00779     break;
00780   default:
00781     mi_int4store(to, length);
00782     break;
00783   }
00784 }
00785 
00786 
00789 void SortParam::make_sortkey(unsigned char *to, unsigned char *ref_pos)
00790 {
00791   Field *field;
00792   SortField *sort_field;
00793   size_t length;
00794 
00795   for (sort_field= local_sortorder ;
00796        sort_field != end ;
00797        sort_field++)
00798   {
00799     bool maybe_null=0;
00800     if ((field=sort_field->field))
00801     {           // Field
00802       if (field->maybe_null())
00803       {
00804   if (field->is_null())
00805   {
00806     if (sort_field->reverse)
00807       memset(to, 255, sort_field->length+1);
00808     else
00809       memset(to, 0, sort_field->length+1);
00810     to+= sort_field->length+1;
00811     continue;
00812   }
00813   else
00814     *to++=1;
00815       }
00816       field->sort_string(to, sort_field->length);
00817     }
00818     else
00819     {           // Item
00820       Item *item=sort_field->item;
00821       maybe_null= item->maybe_null;
00822 
00823       switch (sort_field->result_type) {
00824       case STRING_RESULT:
00825         {
00826           const CHARSET_INFO * const cs=item->collation.collation;
00827           char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
00828           int diff;
00829           uint32_t sort_field_length;
00830 
00831           if (maybe_null)
00832             *to++=1;
00833           /* All item->str() to use some extra byte for end null.. */
00834           String tmp((char*) to,sort_field->length+4,cs);
00835           String *res= item->str_result(&tmp);
00836           if (!res)
00837           {
00838             if (maybe_null)
00839               memset(to-1, 0, sort_field->length+1);
00840             else
00841             {
00842               /*
00843                 This should only happen during extreme conditions if we run out
00844                 of memory or have an item marked not null when it can be null.
00845                 This code is here mainly to avoid a hard crash in this case.
00846               */
00847               assert(0);
00848               memset(to, 0, sort_field->length);  // Avoid crash
00849             }
00850             break;
00851           }
00852           length= res->length();
00853           sort_field_length= sort_field->length - sort_field->suffix_length;
00854           diff=(int) (sort_field_length - length);
00855           if (diff < 0)
00856           {
00857             diff=0;
00858             length= sort_field_length;
00859           }
00860           if (sort_field->suffix_length)
00861           {
00862             /* Store length last in result_string */
00863             store_length(to + sort_field_length, length,
00864                          sort_field->suffix_length);
00865           }
00866           if (sort_field->need_strxnfrm)
00867           {
00868             char *from=(char*) res->ptr();
00869             uint32_t tmp_length;
00870             if ((unsigned char*) from == to)
00871             {
00872               set_if_smaller(length,sort_field->length);
00873               memcpy(tmp_buffer,from,length);
00874               from= tmp_buffer;
00875             }
00876             tmp_length= my_strnxfrm(cs,to,sort_field->length,
00877                                     (unsigned char*) from, length);
00878             assert(tmp_length == sort_field->length);
00879           }
00880           else
00881           {
00882             my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
00883             cs->cset->fill(cs, (char *)to+length,diff,fill_char);
00884           }
00885           break;
00886         }
00887       case INT_RESULT:
00888         {
00889           int64_t value= item->val_int_result();
00890           if (maybe_null)
00891           {
00892             *to++=1;
00893             if (item->null_value)
00894             {
00895               if (maybe_null)
00896                 memset(to-1, 0, sort_field->length+1);
00897               else
00898               {
00899                 memset(to, 0, sort_field->length);
00900               }
00901               break;
00902             }
00903           }
00904           to[7]= (unsigned char) value;
00905           to[6]= (unsigned char) (value >> 8);
00906           to[5]= (unsigned char) (value >> 16);
00907           to[4]= (unsigned char) (value >> 24);
00908           to[3]= (unsigned char) (value >> 32);
00909           to[2]= (unsigned char) (value >> 40);
00910           to[1]= (unsigned char) (value >> 48);
00911           if (item->unsigned_flag)                    /* Fix sign */
00912             to[0]= (unsigned char) (value >> 56);
00913           else
00914             to[0]= (unsigned char) (value >> 56) ^ 128; /* Reverse signbit */
00915           break;
00916         }
00917       case DECIMAL_RESULT:
00918         {
00919           type::Decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
00920           if (maybe_null)
00921           {
00922             if (item->null_value)
00923             {
00924               memset(to, 0, sort_field->length+1);
00925               to++;
00926               break;
00927             }
00928             *to++=1;
00929           }
00930           dec_val->val_binary(E_DEC_FATAL_ERROR, to,
00931                               item->max_length - (item->decimals ? 1:0),
00932                               item->decimals);
00933           break;
00934         }
00935       case REAL_RESULT:
00936         {
00937           double value= item->val_result();
00938           if (maybe_null)
00939           {
00940             if (item->null_value)
00941             {
00942               memset(to, 0, sort_field->length+1);
00943               to++;
00944               break;
00945             }
00946             *to++=1;
00947           }
00948           change_double_for_sort(value,(unsigned char*) to);
00949           break;
00950         }
00951       case ROW_RESULT:
00952       default:
00953         // This case should never be choosen
00954         assert(0);
00955         break;
00956       }
00957     }
00958 
00959     if (sort_field->reverse)
00960     {             /* Revers key */
00961       if (maybe_null)
00962         to[-1]= ~to[-1];
00963       length=sort_field->length;
00964       while (length--)
00965       {
00966   *to = (unsigned char) (~ *to);
00967   to++;
00968       }
00969     }
00970     else
00971     {
00972       to+= sort_field->length;
00973     }
00974   }
00975 
00976   if (addon_field)
00977   {
00978     /*
00979       Save field values appended to sorted fields.
00980       First null bit indicators are appended then field values follow.
00981       In this implementation we use fixed layout for field values -
00982       the same for all records.
00983     */
00984     sort_addon_field *addonf= addon_field;
00985     unsigned char *nulls= to;
00986     assert(addonf != 0);
00987     memset(nulls, 0, addonf->offset);
00988     to+= addonf->offset;
00989     for ( ; (field= addonf->field) ; addonf++)
00990     {
00991       if (addonf->null_bit && field->is_null())
00992       {
00993         nulls[addonf->null_offset]|= addonf->null_bit;
00994 #ifdef HAVE_VALGRIND
00995   memset(to, 0, addonf->length);
00996 #endif
00997       }
00998       else
00999       {
01000 #ifdef HAVE_VALGRIND
01001         unsigned char *end= field->pack(to, field->ptr);
01002   uint32_t local_length= (uint32_t) ((to + addonf->length) - end);
01003   assert((int) local_length >= 0);
01004   if (local_length)
01005     memset(end, 0, local_length);
01006 #else
01007         (void) field->pack(to, field->ptr);
01008 #endif
01009       }
01010       to+= addonf->length;
01011     }
01012   }
01013   else
01014   {
01015     /* Save filepos last */
01016     memcpy(to, ref_pos, (size_t) ref_length);
01017   }
01018 }
01019 
01020 
01021 /*
01022   fields used by sorting in the sorted table's read set
01023 */
01024 
01025 void SortParam::register_used_fields()
01026 {
01027   SortField *sort_field;
01028   Table *table= sort_form;
01029 
01030   for (sort_field= local_sortorder ;
01031        sort_field != end ;
01032        sort_field++)
01033   {
01034     Field *field;
01035     if ((field= sort_field->field))
01036     {
01037       if (field->getTable() == table)
01038         table->setReadSet(field->position());
01039     }
01040     else
01041     {           // Item
01042       sort_field->item->walk(&Item::register_field_in_read_map, 1,
01043                              (unsigned char *) table);
01044     }
01045   }
01046 
01047   if (addon_field)
01048   {
01049     sort_addon_field *addonf= addon_field;
01050     Field *field;
01051     for ( ; (field= addonf->field) ; addonf++)
01052       table->setReadSet(field->position());
01053   }
01054   else
01055   {
01056     /* Save filepos last */
01057     table->prepare_for_position();
01058   }
01059 }
01060 
01061 
01062 bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
01063 {
01064   uint32_t offset;
01065   unsigned char *to;
01066 
01067   internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
01068   offset= rec_length - res_length;
01069 
01070   if ((ha_rows) count > max_rows)
01071     count=(uint32_t) max_rows;
01072 
01073   if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
01074     return true;
01075 
01076   for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
01077   {
01078     memcpy(to, *sort_keys+offset, res_length);
01079     to+= res_length;
01080   }
01081 
01082   return false;
01083 }
01084 
01085 
01088 int FileSort::merge_many_buff(SortParam *param, unsigned char *sort_buffer,
01089                               buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
01090 {
01091   internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
01092   buffpek *lastbuff;
01093 
01094   if (*maxbuffer < MERGEBUFF2)
01095     return 0;
01096   if (flush_io_cache(t_file) ||
01097       t_file2.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE, MYF(MY_WME)))
01098   {
01099     return 1;
01100   }
01101 
01102   from_file= t_file ; to_file= &t_file2;
01103   while (*maxbuffer >= MERGEBUFF2)
01104   {
01105     uint32_t i;
01106 
01107     if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
01108     {
01109       break;
01110     }
01111 
01112     if (to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
01113     {
01114       break;
01115     }
01116 
01117     lastbuff=buffpek_inst;
01118     for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
01119     {
01120       if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
01121       buffpek_inst+i,buffpek_inst+i+MERGEBUFF-1,0))
01122       {
01123         goto cleanup;
01124       }
01125     }
01126 
01127     if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
01128           buffpek_inst+i,buffpek_inst+ *maxbuffer,0))
01129     {
01130       break;
01131     }
01132 
01133     if (flush_io_cache(to_file))
01134     {
01135       break;
01136     }
01137 
01138     temp=from_file; from_file=to_file; to_file=temp;
01139     from_file->setup_io_cache();
01140     to_file->setup_io_cache();
01141     *maxbuffer= (uint32_t) (lastbuff-buffpek_inst)-1;
01142   }
01143 
01144 cleanup:
01145   to_file->close_cached_file();     // This holds old result
01146   if (to_file == t_file)
01147   {
01148     *t_file=t_file2;        // Copy result file
01149     t_file->setup_io_cache();
01150   }
01151 
01152   return(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
01153 } /* merge_many_buff */
01154 
01155 
01163 uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
01164 {
01165   uint32_t count;
01166   uint32_t length;
01167 
01168   if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
01169   {
01170     if (pread(fromfile->file,(unsigned char*) buffpek_inst->base, (length= rec_length*count),buffpek_inst->file_pos) == 0)
01171       return((uint32_t) -1);
01172 
01173     buffpek_inst->key= buffpek_inst->base;
01174     buffpek_inst->file_pos+= length;      /* New filepos */
01175     buffpek_inst->count-= count;
01176     buffpek_inst->mem_count= count;
01177   }
01178   return (count*rec_length);
01179 } /* read_to_buffer */
01180 
01181 
01182 class compare_functor
01183 {
01184   qsort2_cmp key_compare;
01185   void *key_compare_arg;
01186 
01187   public:
01188   compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
01189     key_compare(in_key_compare),
01190     key_compare_arg(in_compare_arg)
01191   { }
01192   
01193   inline bool operator()(const buffpek *i, const buffpek *j) const
01194   {
01195     int val= key_compare(key_compare_arg, &i->key, &j->key);
01196 
01197     return (val >= 0);
01198   }
01199 };
01200 
01201 
01220 int FileSort::merge_buffers(SortParam *param, internal::IO_CACHE *from_file,
01221                             internal::IO_CACHE *to_file, unsigned char *sort_buffer,
01222                             buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
01223                             int flag)
01224 {
01225   int error;
01226   uint32_t rec_length,res_length,offset;
01227   size_t sort_length;
01228   uint32_t maxcount;
01229   ha_rows max_rows,org_max_rows;
01230   internal::my_off_t to_start_filepos;
01231   unsigned char *strpos;
01232   buffpek *buffpek_inst;
01233   qsort2_cmp cmp;
01234   void *first_cmp_arg;
01235   volatile Session::killed_state_t *killed= getSession().getKilledPtr();
01236   Session::killed_state_t not_killable;
01237 
01238   getSession().status_var.filesort_merge_passes++;
01239   if (param->not_killable)
01240   {
01241     killed= &not_killable;
01242     not_killable= Session::NOT_KILLED;
01243   }
01244 
01245   error=0;
01246   rec_length= param->rec_length;
01247   res_length= param->res_length;
01248   sort_length= param->sort_length;
01249   offset= rec_length-res_length;
01250   maxcount= (uint32_t) (param->keys/((uint32_t) (Tb-Fb) +1));
01251   to_start_filepos= my_b_tell(to_file);
01252   strpos= (unsigned char*) sort_buffer;
01253   org_max_rows=max_rows= param->max_rows;
01254 
01255   /* The following will fire if there is not enough space in sort_buffer */
01256   assert(maxcount!=0);
01257 
01258   if (param->unique_buff)
01259   {
01260     cmp= param->compare;
01261     first_cmp_arg= (void *) &param->cmp_context;
01262   }
01263   else
01264   {
01265     cmp= internal::get_ptr_compare(sort_length);
01266     first_cmp_arg= (void*) &sort_length;
01267   }
01268   priority_queue<buffpek *, vector<buffpek *>, compare_functor >
01269     queue(compare_functor(cmp, first_cmp_arg));
01270   for (buffpek_inst= Fb ; buffpek_inst <= Tb ; buffpek_inst++)
01271   {
01272     buffpek_inst->base= strpos;
01273     buffpek_inst->max_keys= maxcount;
01274     strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek_inst,
01275                                                                          rec_length));
01276     if (error == -1)
01277       return -1;
01278 
01279     buffpek_inst->max_keys= buffpek_inst->mem_count;  // If less data in buffers than expected
01280     queue.push(buffpek_inst);
01281   }
01282 
01283   if (param->unique_buff)
01284   {
01285     /*
01286        Called by Unique::get()
01287        Copy the first argument to param->unique_buff for unique removal.
01288        Store it also in 'to_file'.
01289 
01290        This is safe as we know that there is always more than one element
01291        in each block to merge (This is guaranteed by the Unique:: algorithm
01292     */
01293     buffpek_inst= queue.top();
01294     memcpy(param->unique_buff, buffpek_inst->key, rec_length);
01295     if (my_b_write(to_file, (unsigned char*) buffpek_inst->key, rec_length))
01296     {
01297       return 1;
01298     }
01299     buffpek_inst->key+= rec_length;
01300     buffpek_inst->mem_count--;
01301     if (!--max_rows)
01302     {
01303       error= 0;
01304       goto end;
01305     }
01306     /* Top element has been used */
01307     queue.pop();
01308     queue.push(buffpek_inst);
01309   }
01310   else
01311   {
01312     cmp= 0;                                        // Not unique
01313   }
01314 
01315   while (queue.size() > 1)
01316   {
01317     if (*killed)
01318     {
01319       return 1;
01320     }
01321     for (;;)
01322     {
01323       buffpek_inst= queue.top();
01324       if (cmp)                                        // Remove duplicates
01325       {
01326         if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
01327                     (unsigned char**) &buffpek_inst->key))
01328               goto skip_duplicate;
01329             memcpy(param->unique_buff, buffpek_inst->key, rec_length);
01330       }
01331       if (flag == 0)
01332       {
01333         if (my_b_write(to_file,(unsigned char*) buffpek_inst->key, rec_length))
01334         {
01335           return 1;
01336         }
01337       }
01338       else
01339       {
01340         if (my_b_write(to_file, (unsigned char*) buffpek_inst->key+offset, res_length))
01341         {
01342           return 1;
01343         }
01344       }
01345       if (!--max_rows)
01346       {
01347         error= 0;
01348         goto end;
01349       }
01350 
01351     skip_duplicate:
01352       buffpek_inst->key+= rec_length;
01353       if (! --buffpek_inst->mem_count)
01354       {
01355         if (!(error= (int) read_to_buffer(from_file,buffpek_inst,
01356                                           rec_length)))
01357         {
01358           queue.pop();
01359           break;                        /* One buffer have been removed */
01360         }
01361         else if (error == -1)
01362         {
01363           return -1;
01364         }
01365       }
01366       /* Top element has been replaced */
01367       queue.pop();
01368       queue.push(buffpek_inst);
01369     }
01370   }
01371   buffpek_inst= queue.top();
01372   buffpek_inst->base= sort_buffer;
01373   buffpek_inst->max_keys= param->keys;
01374 
01375   /*
01376     As we know all entries in the buffer are unique, we only have to
01377     check if the first one is the same as the last one we wrote
01378   */
01379   if (cmp)
01380   {
01381     if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek_inst->key))
01382     {
01383       buffpek_inst->key+= rec_length;         // Remove duplicate
01384       --buffpek_inst->mem_count;
01385     }
01386   }
01387 
01388   do
01389   {
01390     if ((ha_rows) buffpek_inst->mem_count > max_rows)
01391     {                                        /* Don't write too many records */
01392       buffpek_inst->mem_count= (uint32_t) max_rows;
01393       buffpek_inst->count= 0;                        /* Don't read more */
01394     }
01395     max_rows-= buffpek_inst->mem_count;
01396     if (flag == 0)
01397     {
01398       if (my_b_write(to_file,(unsigned char*) buffpek_inst->key,
01399                      (rec_length*buffpek_inst->mem_count)))
01400       {
01401         return 1;
01402       }
01403     }
01404     else
01405     {
01406       unsigned char *end;
01407       strpos= buffpek_inst->key+offset;
01408       for (end= strpos+buffpek_inst->mem_count*rec_length ;
01409            strpos != end ;
01410            strpos+= rec_length)
01411       {
01412         if (my_b_write(to_file, (unsigned char *) strpos, res_length))
01413         {
01414           return 1;
01415         }
01416       }
01417     }
01418   }
01419 
01420   while ((error=(int) read_to_buffer(from_file,buffpek_inst, rec_length))
01421          != -1 && error != 0);
01422 
01423 end:
01424   lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
01425   lastbuff->file_pos= to_start_filepos;
01426 
01427   return error;
01428 } /* merge_buffers */
01429 
01430 
01431   /* Do a merge to output-file (save only positions) */
01432 
01433 int FileSort::merge_index(SortParam *param, unsigned char *sort_buffer,
01434                           buffpek *buffpek_inst, uint32_t maxbuffer,
01435                           internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
01436 {
01437   if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
01438         buffpek_inst+maxbuffer,1))
01439     return 1;
01440 
01441   return 0;
01442 } /* merge_index */
01443 
01444 
01445 static uint32_t suffix_length(uint32_t string_length)
01446 {
01447   if (string_length < 256)
01448     return 1;
01449   if (string_length < 256L*256L)
01450     return 2;
01451   if (string_length < 256L*256L*256L)
01452     return 3;
01453   return 4;                                     // Can't sort longer than 4G
01454 }
01455 
01456 
01457 
01475 uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
01476 {
01477   uint32_t length;
01478   const CHARSET_INFO *cs;
01479   *multi_byte_charset= 0;
01480 
01481   length=0;
01482   for (; s_length-- ; sortorder++)
01483   {
01484     sortorder->need_strxnfrm= 0;
01485     sortorder->suffix_length= 0;
01486     if (sortorder->field)
01487     {
01488       cs= sortorder->field->sort_charset();
01489       sortorder->length= sortorder->field->sort_length();
01490 
01491       if (use_strnxfrm((cs=sortorder->field->sort_charset())))
01492       {
01493         sortorder->need_strxnfrm= 1;
01494         *multi_byte_charset= 1;
01495         sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
01496       }
01497       if (sortorder->field->maybe_null())
01498   length++;       // Place for NULL marker
01499     }
01500     else
01501     {
01502       sortorder->result_type= sortorder->item->result_type();
01503       if (sortorder->item->result_as_int64_t())
01504         sortorder->result_type= INT_RESULT;
01505 
01506       switch (sortorder->result_type) {
01507       case STRING_RESULT:
01508         sortorder->length=sortorder->item->max_length;
01509         set_if_smaller(sortorder->length,
01510                        getSession().variables.max_sort_length);
01511         if (use_strnxfrm((cs=sortorder->item->collation.collation)))
01512         {
01513           sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
01514           sortorder->need_strxnfrm= 1;
01515           *multi_byte_charset= 1;
01516         }
01517         else if (cs == &my_charset_bin)
01518         {
01519           /* Store length last to be able to sort blob/varbinary */
01520           sortorder->suffix_length= suffix_length(sortorder->length);
01521           sortorder->length+= sortorder->suffix_length;
01522         }
01523         break;
01524       case INT_RESULT:
01525         sortorder->length=8;      // Size of intern int64_t
01526         break;
01527       case DECIMAL_RESULT:
01528         sortorder->length=
01529           class_decimal_get_binary_size(sortorder->item->max_length -
01530                                      (sortorder->item->decimals ? 1 : 0),
01531                                      sortorder->item->decimals);
01532         break;
01533       case REAL_RESULT:
01534         sortorder->length=sizeof(double);
01535         break;
01536       case ROW_RESULT:
01537         // This case should never be choosen
01538         assert(0);
01539         break;
01540       }
01541       if (sortorder->item->maybe_null)
01542         length++;       // Place for NULL marker
01543     }
01544     set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
01545     length+=sortorder->length;
01546   }
01547   sortorder->field= (Field*) 0;     // end marker
01548   return length;
01549 }
01550 
01551 
01578 sort_addon_field *FileSort::get_addon_fields(Field **ptabfield, uint32_t sortlength_arg, uint32_t *plength)
01579 {
01580   Field **pfield;
01581   Field *field;
01582   sort_addon_field *addonf;
01583   uint32_t length= 0;
01584   uint32_t fields= 0;
01585   uint32_t null_fields= 0;
01586 
01587   /*
01588     If there is a reference to a field in the query add it
01589     to the the set of appended fields.
01590     Note for future refinement:
01591     This this a too strong condition.
01592     Actually we need only the fields referred in the
01593     result set. And for some of them it makes sense to use
01594     the values directly from sorted fields.
01595   */
01596   *plength= 0;
01597 
01598   for (pfield= ptabfield; (field= *pfield) ; pfield++)
01599   {
01600     if (!(field->isReadSet()))
01601       continue;
01602     if (field->flags & BLOB_FLAG)
01603       return 0;
01604     length+= field->max_packed_col_length(field->pack_length());
01605     if (field->maybe_null())
01606       null_fields++;
01607     fields++;
01608   }
01609   if (!fields)
01610     return 0;
01611   length+= (null_fields+7)/8;
01612 
01613   if (length+sortlength_arg > getSession().variables.max_length_for_sort_data ||
01614       !(addonf= (sort_addon_field *) malloc(sizeof(sort_addon_field)*
01615                                             (fields+1))))
01616     return 0;
01617 
01618   *plength= length;
01619   length= (null_fields+7)/8;
01620   null_fields= 0;
01621   for (pfield= ptabfield; (field= *pfield) ; pfield++)
01622   {
01623     if (!(field->isReadSet()))
01624       continue;
01625     addonf->field= field;
01626     addonf->offset= length;
01627     if (field->maybe_null())
01628     {
01629       addonf->null_offset= null_fields/8;
01630       addonf->null_bit= 1<<(null_fields & 7);
01631       null_fields++;
01632     }
01633     else
01634     {
01635       addonf->null_offset= 0;
01636       addonf->null_bit= 0;
01637     }
01638     addonf->length= field->max_packed_col_length(field->pack_length());
01639     length+= addonf->length;
01640     addonf++;
01641   }
01642   addonf->field= 0;     // Put end marker
01643 
01644   return (addonf-fields);
01645 }
01646 
01647 
01663 static void
01664 unpack_addon_fields(sort_addon_field *addon_field, unsigned char *buff)
01665 {
01666   Field *field;
01667   sort_addon_field *addonf= addon_field;
01668 
01669   for ( ; (field= addonf->field) ; addonf++)
01670   {
01671     if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
01672     {
01673       field->set_null();
01674       continue;
01675     }
01676     field->set_notnull();
01677     field->unpack(field->ptr, buff + addonf->offset);
01678   }
01679 }
01680 
01681 /*
01682 ** functions to change a double or float to a sortable string
01683 ** The following should work for IEEE
01684 */
01685 
01686 #define DBL_EXP_DIG (sizeof(double)*8-DBL_MANT_DIG)
01687 
01688 void change_double_for_sort(double nr,unsigned char *to)
01689 {
01690   unsigned char *tmp=(unsigned char*) to;
01691   if (nr == 0.0)
01692   {           /* Change to zero string */
01693     tmp[0]=(unsigned char) 128;
01694     memset(tmp+1, 0, sizeof(nr)-1);
01695   }
01696   else
01697   {
01698 #ifdef WORDS_BIGENDIAN
01699     memcpy(tmp,&nr,sizeof(nr));
01700 #else
01701     {
01702       unsigned char *ptr= (unsigned char*) &nr;
01703 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
01704       tmp[0]= ptr[3]; tmp[1]=ptr[2]; tmp[2]= ptr[1]; tmp[3]=ptr[0];
01705       tmp[4]= ptr[7]; tmp[5]=ptr[6]; tmp[6]= ptr[5]; tmp[7]=ptr[4];
01706 #else
01707       tmp[0]= ptr[7]; tmp[1]=ptr[6]; tmp[2]= ptr[5]; tmp[3]=ptr[4];
01708       tmp[4]= ptr[3]; tmp[5]=ptr[2]; tmp[6]= ptr[1]; tmp[7]=ptr[0];
01709 #endif
01710     }
01711 #endif
01712     if (tmp[0] & 128)       /* Negative */
01713     {           /* make complement */
01714       uint32_t i;
01715       for (i=0 ; i < sizeof(nr); i++)
01716   tmp[i]=tmp[i] ^ (unsigned char) 255;
01717     }
01718     else
01719     {         /* Set high and move exponent one up */
01720       uint16_t exp_part=(((uint16_t) tmp[0] << 8) | (uint16_t) tmp[1] |
01721            (uint16_t) 32768);
01722       exp_part+= (uint16_t) 1 << (16-1-DBL_EXP_DIG);
01723       tmp[0]= (unsigned char) (exp_part >> 8);
01724       tmp[1]= (unsigned char) exp_part;
01725     }
01726   }
01727 }
01728 
01729 } /* namespace drizzled */