00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00024 #include <config.h>
00025
00026 #include <float.h>
00027 #include <limits.h>
00028
00029 #include <queue>
00030 #include <algorithm>
00031 #include <iostream>
00032
00033 #include <drizzled/drizzled.h>
00034 #include <drizzled/sql_sort.h>
00035 #include <drizzled/filesort.h>
00036 #include <drizzled/error.h>
00037 #include <drizzled/probes.h>
00038 #include <drizzled/session.h>
00039 #include <drizzled/table.h>
00040 #include <drizzled/table_list.h>
00041 #include <drizzled/optimizer/range.h>
00042 #include <drizzled/records.h>
00043 #include <drizzled/internal/iocache.h>
00044 #include <drizzled/internal/my_sys.h>
00045 #include <plugin/myisam/myisam.h>
00046 #include <drizzled/plugin/transactional_storage_engine.h>
00047 #include <drizzled/atomics.h>
00048 #include <drizzled/global_buffer.h>
00049 #include <drizzled/sort_field.h>
00050 #include <drizzled/item/subselect.h>
00051
00052 using namespace std;
00053
00054 namespace drizzled {
00055
00056
00057 #define MERGEBUFF 7
00058 #define MERGEBUFF2 15
00059
00060 class BufferCompareContext
00061 {
00062 public:
00063 qsort_cmp2 key_compare;
00064 void *key_compare_arg;
00065
00066 BufferCompareContext() :
00067 key_compare(0),
00068 key_compare_arg(0)
00069 { }
00070
00071 };
00072
00073 class SortParam {
00074 public:
00075 uint32_t rec_length;
00076 uint32_t sort_length;
00077 uint32_t ref_length;
00078 uint32_t addon_length;
00079 uint32_t res_length;
00080 uint32_t keys;
00081 ha_rows max_rows,examined_rows;
00082 Table *sort_form;
00083 SortField *local_sortorder;
00084 SortField *end;
00085 sort_addon_field *addon_field;
00086 unsigned char *unique_buff;
00087 bool not_killable;
00088 char *tmp_buffer;
00089
00090 qsort2_cmp compare;
00091 BufferCompareContext cmp_context;
00092
00093 SortParam() :
00094 rec_length(0),
00095 sort_length(0),
00096 ref_length(0),
00097 addon_length(0),
00098 res_length(0),
00099 keys(0),
00100 max_rows(0),
00101 examined_rows(0),
00102 sort_form(0),
00103 local_sortorder(0),
00104 end(0),
00105 addon_field(0),
00106 unique_buff(0),
00107 not_killable(0),
00108 tmp_buffer(0),
00109 compare(0)
00110 {
00111 }
00112
00113 ~SortParam()
00114 {
00115 if (tmp_buffer)
00116 free(tmp_buffer);
00117 }
00118
00119 int write_keys(unsigned char * *sort_keys,
00120 uint32_t count,
00121 internal::IO_CACHE *buffer_file,
00122 internal::IO_CACHE *tempfile);
00123
00124 void make_sortkey(unsigned char *to,
00125 unsigned char *ref_pos);
00126 void register_used_fields();
00127 bool save_index(unsigned char **sort_keys,
00128 uint32_t count,
00129 filesort_info *table_sort);
00130
00131 };
00132
00133
00134
00135 static char **make_char_array(char **old_pos, uint32_t fields,
00136 uint32_t length);
00137
00138 static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffer_file,
00139 uint32_t count,
00140 unsigned char *buf);
00141
00142 static uint32_t suffix_length(uint32_t string_length);
00143 static void unpack_addon_fields(sort_addon_field *addon_field,
00144 unsigned char *buff);
00145
00146 FileSort::FileSort(Session &arg) :
00147 _session(arg)
00148 {
00149 }
00150
00186 ha_rows FileSort::run(Table *table, SortField *sortorder, uint32_t s_length,
00187 optimizer::SqlSelect *select, ha_rows max_rows,
00188 bool sort_positions, ha_rows &examined_rows)
00189 {
00190 int error= 1;
00191 uint32_t memavl= 0, min_sort_memory;
00192 uint32_t maxbuffer;
00193 size_t allocated_sort_memory= 0;
00194 buffpek *buffpek_inst= 0;
00195 ha_rows records= HA_POS_ERROR;
00196 unsigned char **sort_keys= 0;
00197 internal::IO_CACHE tempfile;
00198 internal::IO_CACHE buffpek_pointers;
00199 internal::IO_CACHE *selected_records_file;
00200 internal::IO_CACHE *outfile;
00201 SortParam param;
00202 bool multi_byte_charset;
00203
00204
00205
00206
00207
00208
00209 filesort_info table_sort(table->sort);
00210 table->sort.io_cache= NULL;
00211
00212 TableList *tab= table->pos_in_table_list;
00213 Item_subselect *subselect= tab ? tab->containing_subselect() : 0;
00214
00215 DRIZZLE_FILESORT_START(table->getShare()->getSchemaName(), table->getShare()->getTableName());
00216
00217
00218
00219
00220
00221 plugin::TransactionalStorageEngine::releaseTemporaryLatches(&getSession());
00222
00223
00224 outfile= table_sort.io_cache;
00225 assert(tempfile.buffer == 0);
00226 assert(buffpek_pointers.buffer == 0);
00227
00228 param.sort_length= sortlength(sortorder, s_length, &multi_byte_charset);
00229 param.ref_length= table->cursor->ref_length;
00230
00231 if (!(table->cursor->getEngine()->check_flag(HTON_BIT_FAST_KEY_READ)) && !sort_positions)
00232 {
00233
00234
00235
00236
00237 param.addon_field= get_addon_fields(table->getFields(),
00238 param.sort_length,
00239 ¶m.addon_length);
00240 }
00241
00242 table_sort.addon_buf= 0;
00243 table_sort.addon_length= param.addon_length;
00244 table_sort.addon_field= param.addon_field;
00245 table_sort.unpack= unpack_addon_fields;
00246 if (param.addon_field)
00247 {
00248 param.res_length= param.addon_length;
00249 if (!(table_sort.addon_buf= (unsigned char *) malloc(param.addon_length)))
00250 {
00251 goto err;
00252 }
00253 }
00254 else
00255 {
00256 param.res_length= param.ref_length;
00257
00258
00259
00260
00261 param.sort_length+= param.ref_length;
00262 }
00263 param.rec_length= param.sort_length+param.addon_length;
00264 param.max_rows= max_rows;
00265
00266 if (select && select->quick)
00267 {
00268 getSession().status_var.filesort_range_count++;
00269 }
00270 else
00271 {
00272 getSession().status_var.filesort_scan_count++;
00273 }
00274 #ifdef CAN_TRUST_RANGE
00275 if (select && select->quick && select->quick->records > 0L)
00276 {
00277 records= min((ha_rows) (select->quick->records*2+EXTRA_RECORDS*2),
00278 table->cursor->stats.records)+EXTRA_RECORDS;
00279 selected_records_file=0;
00280 }
00281 else
00282 #endif
00283 {
00284 records= table->cursor->estimate_rows_upper_bound();
00285
00286
00287
00288
00289 if (records == HA_POS_ERROR)
00290 records--;
00291 selected_records_file= 0;
00292 }
00293
00294 if (multi_byte_charset && !(param.tmp_buffer= (char*) malloc(param.sort_length)))
00295 {
00296 goto err;
00297 }
00298
00299 memavl= getSession().variables.sortbuff_size;
00300 min_sort_memory= max((uint32_t)MIN_SORT_MEMORY, param.sort_length*MERGEBUFF2);
00301 while (memavl >= min_sort_memory)
00302 {
00303 uint32_t old_memavl;
00304 uint32_t keys= memavl/(param.rec_length+sizeof(char*));
00305 param.keys= (uint32_t) min(records+1, (ha_rows)keys);
00306
00307 allocated_sort_memory= param.keys * param.rec_length;
00308 if (not global_sort_buffer.add(allocated_sort_memory))
00309 {
00310 my_error(ER_OUT_OF_GLOBAL_SORTMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00311 goto err;
00312 }
00313
00314 if ((table_sort.sort_keys=
00315 (unsigned char **) make_char_array((char **) table_sort.sort_keys,
00316 param.keys, param.rec_length)))
00317 break;
00318
00319 global_sort_buffer.sub(allocated_sort_memory);
00320 old_memavl= memavl;
00321 if ((memavl= memavl/4*3) < min_sort_memory && old_memavl > min_sort_memory)
00322 memavl= min_sort_memory;
00323 }
00324 sort_keys= table_sort.sort_keys;
00325 if (memavl < min_sort_memory)
00326 {
00327 my_error(ER_OUT_OF_SORTMEMORY,MYF(ME_ERROR+ME_WAITTANG));
00328 goto err;
00329 }
00330
00331 if (buffpek_pointers.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
00332 {
00333 goto err;
00334 }
00335
00336 param.keys--;
00337 param.sort_form= table;
00338 param.end=(param.local_sortorder=sortorder)+s_length;
00339 if ((records= find_all_keys(¶m,select,sort_keys, &buffpek_pointers,
00340 &tempfile, selected_records_file)) == HA_POS_ERROR)
00341 {
00342 goto err;
00343 }
00344 maxbuffer= (uint32_t) (my_b_tell(&buffpek_pointers)/sizeof(*buffpek_inst));
00345
00346 if (maxbuffer == 0)
00347 {
00348 if (param.save_index(sort_keys,(uint32_t) records, &table_sort))
00349 {
00350 goto err;
00351 }
00352 }
00353 else
00354 {
00355 if (table_sort.buffpek && table_sort.buffpek_len < maxbuffer)
00356 {
00357 if (table_sort.buffpek)
00358 free(table_sort.buffpek);
00359 table_sort.buffpek = 0;
00360 }
00361 if (!(table_sort.buffpek=
00362 (unsigned char *) read_buffpek_from_file(&buffpek_pointers, maxbuffer, table_sort.buffpek)))
00363 {
00364 goto err;
00365 }
00366 buffpek_inst= (buffpek *) table_sort.buffpek;
00367 table_sort.buffpek_len= maxbuffer;
00368 buffpek_pointers.close_cached_file();
00369
00370 if (! my_b_inited(outfile) && outfile->open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,READ_RECORD_BUFFER, MYF(MY_WME)))
00371 {
00372 goto err;
00373 }
00374
00375 if (outfile->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
00376 {
00377 goto err;
00378 }
00379
00380
00381
00382
00383
00384 param.keys=((param.keys*(param.rec_length+sizeof(char*))) / param.rec_length-1);
00385
00386 maxbuffer--;
00387 if (merge_many_buff(¶m,(unsigned char*) sort_keys,buffpek_inst,&maxbuffer, &tempfile))
00388 {
00389 goto err;
00390 }
00391
00392 if (flush_io_cache(&tempfile) || tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00393 {
00394 goto err;
00395 }
00396
00397 if (merge_index(¶m,(unsigned char*) sort_keys,buffpek_inst,maxbuffer,&tempfile, outfile))
00398 {
00399 goto err;
00400 }
00401 }
00402
00403 if (records > param.max_rows)
00404 {
00405 records= param.max_rows;
00406 }
00407 error =0;
00408
00409 err:
00410 if (not subselect || not subselect->is_uncacheable())
00411 {
00412 free(sort_keys);
00413 table_sort.sort_keys= 0;
00414 free(buffpek_inst);
00415 table_sort.buffpek= 0;
00416 table_sort.buffpek_len= 0;
00417 }
00418
00419 tempfile.close_cached_file();
00420 buffpek_pointers.close_cached_file();
00421
00422 if (my_b_inited(outfile))
00423 {
00424 if (flush_io_cache(outfile))
00425 {
00426 error=1;
00427 }
00428 {
00429 internal::my_off_t save_pos= outfile->pos_in_file;
00430
00431 if (outfile->reinit_io_cache(internal::READ_CACHE,0L,0,0))
00432 {
00433 error=1;
00434 }
00435 outfile->end_of_file=save_pos;
00436 }
00437 }
00438
00439 if (error)
00440 {
00441 my_message(ER_FILSORT_ABORT, ER(ER_FILSORT_ABORT),
00442 MYF(ME_ERROR+ME_WAITTANG));
00443 }
00444 else
00445 {
00446 getSession().status_var.filesort_rows+= (uint32_t) records;
00447 }
00448 examined_rows= param.examined_rows;
00449 global_sort_buffer.sub(allocated_sort_memory);
00450 table->sort= table_sort;
00451 DRIZZLE_FILESORT_DONE(error, records);
00452 return (error ? HA_POS_ERROR : records);
00453 }
00454
00457 static char **make_char_array(char **old_pos, uint32_t fields,
00458 uint32_t length)
00459 {
00460 char **pos;
00461 char *char_pos;
00462
00463 if (old_pos ||
00464 (old_pos= (char**) malloc((uint32_t) fields*(length+sizeof(char*)))))
00465 {
00466 pos=old_pos; char_pos=((char*) (pos+fields)) -length;
00467 while (fields--) *(pos++) = (char_pos+= length);
00468 }
00469
00470 return(old_pos);
00471 }
00472
00473
00476 static unsigned char *read_buffpek_from_file(internal::IO_CACHE *buffpek_pointers, uint32_t count,
00477 unsigned char *buf)
00478 {
00479 uint32_t length= sizeof(buffpek)*count;
00480 unsigned char *tmp= buf;
00481 if (count > UINT_MAX/sizeof(buffpek))
00482 return 0;
00483 if (!tmp)
00484 tmp= (unsigned char *)malloc(length);
00485 if (tmp)
00486 {
00487 if (buffpek_pointers->reinit_io_cache(internal::READ_CACHE,0L,0,0) ||
00488 my_b_read(buffpek_pointers, (unsigned char*) tmp, length))
00489 {
00490 free((char*) tmp);
00491 tmp=0;
00492 }
00493 }
00494 return(tmp);
00495 }
00496
00497
00535 ha_rows FileSort::find_all_keys(SortParam *param,
00536 optimizer::SqlSelect *select,
00537 unsigned char **sort_keys,
00538 internal::IO_CACHE *buffpek_pointers,
00539 internal::IO_CACHE *tempfile, internal::IO_CACHE *indexfile)
00540 {
00541 int error,flag,quick_select;
00542 uint32_t idx,indexpos,ref_length;
00543 unsigned char *ref_pos,*next_pos,ref_buff[MAX_REFLENGTH];
00544 internal::my_off_t record;
00545 Table *sort_form;
00546 volatile Session::killed_state_t *killed= getSession().getKilledPtr();
00547 Cursor *file;
00548 boost::dynamic_bitset<> *save_read_set= NULL;
00549 boost::dynamic_bitset<> *save_write_set= NULL;
00550
00551 idx=indexpos=0;
00552 error=quick_select=0;
00553 sort_form=param->sort_form;
00554 file= sort_form->cursor;
00555 ref_length=param->ref_length;
00556 ref_pos= ref_buff;
00557 quick_select=select && select->quick;
00558 record=0;
00559 flag= ((!indexfile && ! file->isOrdered())
00560 || quick_select);
00561 if (indexfile || flag)
00562 ref_pos= &file->ref[0];
00563 next_pos=ref_pos;
00564 if (! indexfile && ! quick_select)
00565 {
00566 next_pos=(unsigned char*) 0;
00567 if (file->startTableScan(1))
00568 return(HA_POS_ERROR);
00569 file->extra_opt(HA_EXTRA_CACHE, getSession().variables.read_buff_size);
00570 }
00571
00572 ReadRecord read_record_info;
00573 if (quick_select)
00574 {
00575 if (select->quick->reset())
00576 return(HA_POS_ERROR);
00577
00578 if (read_record_info.init_read_record(&getSession(), select->quick->head, select, 1, 1))
00579 return(HA_POS_ERROR);
00580 }
00581
00582
00583 save_read_set= sort_form->read_set;
00584 save_write_set= sort_form->write_set;
00585
00586 sort_form->tmp_set.reset();
00587
00588 sort_form->read_set= &sort_form->tmp_set;
00589 param->register_used_fields();
00590 if (select && select->cond)
00591 select->cond->walk(&Item::register_field_in_read_map, 1,
00592 (unsigned char*) sort_form);
00593 sort_form->column_bitmaps_set(sort_form->tmp_set, sort_form->tmp_set);
00594
00595 for (;;)
00596 {
00597 if (quick_select)
00598 {
00599 if ((error= read_record_info.read_record(&read_record_info)))
00600 {
00601 error= HA_ERR_END_OF_FILE;
00602 break;
00603 }
00604 file->position(sort_form->getInsertRecord());
00605 }
00606 else
00607 {
00608 if (indexfile)
00609 {
00610 if (my_b_read(indexfile,(unsigned char*) ref_pos,ref_length))
00611 {
00612 error= errno ? errno : -1;
00613 break;
00614 }
00615 error=file->rnd_pos(sort_form->getInsertRecord(),next_pos);
00616 }
00617 else
00618 {
00619 error=file->rnd_next(sort_form->getInsertRecord());
00620
00621 if (!flag)
00622 {
00623 internal::my_store_ptr(ref_pos,ref_length,record);
00624 record+= sort_form->getShare()->db_record_offset;
00625 }
00626 else if (!error)
00627 file->position(sort_form->getInsertRecord());
00628 }
00629 if (error && error != HA_ERR_RECORD_DELETED)
00630 break;
00631 }
00632
00633 if (*killed)
00634 {
00635 if (!indexfile && !quick_select)
00636 {
00637 (void) file->extra(HA_EXTRA_NO_CACHE);
00638 file->endTableScan();
00639 }
00640 return(HA_POS_ERROR);
00641 }
00642 if (error == 0)
00643 param->examined_rows++;
00644 if (error == 0 && (!select || select->skip_record() == 0))
00645 {
00646 if (idx == param->keys)
00647 {
00648 if (param->write_keys(sort_keys, idx, buffpek_pointers, tempfile))
00649 return(HA_POS_ERROR);
00650 idx=0;
00651 indexpos++;
00652 }
00653 param->make_sortkey(sort_keys[idx++], ref_pos);
00654 }
00655 else
00656 {
00657 file->unlock_row();
00658 }
00659
00660
00661 if (getSession().is_error())
00662 break;
00663 }
00664 if (quick_select)
00665 {
00666
00667
00668
00669
00670 read_record_info.end_read_record();
00671 }
00672 else
00673 {
00674 (void) file->extra(HA_EXTRA_NO_CACHE);
00675 if (!next_pos)
00676 file->endTableScan();
00677 }
00678
00679 if (getSession().is_error())
00680 return(HA_POS_ERROR);
00681
00682
00683 sort_form->column_bitmaps_set(*save_read_set, *save_write_set);
00684
00685 if (error != HA_ERR_END_OF_FILE)
00686 {
00687 sort_form->print_error(error,MYF(ME_ERROR | ME_WAITTANG));
00688 return(HA_POS_ERROR);
00689 }
00690
00691 if (indexpos && idx && param->write_keys(sort_keys,idx,buffpek_pointers,tempfile))
00692 {
00693 return(HA_POS_ERROR);
00694 }
00695
00696 return(my_b_inited(tempfile) ?
00697 (ha_rows) (my_b_tell(tempfile)/param->rec_length) :
00698 idx);
00699 }
00700
00701
00724 int SortParam::write_keys(unsigned char **sort_keys, uint32_t count,
00725 internal::IO_CACHE *buffpek_pointers, internal::IO_CACHE *tempfile)
00726 {
00727 buffpek buffpek;
00728
00729 internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
00730 if (!my_b_inited(tempfile) &&
00731 tempfile->open_cached_file(drizzle_tmpdir.c_str(), TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
00732 {
00733 return 1;
00734 }
00735
00736 if (my_b_tell(buffpek_pointers) + sizeof(buffpek) > (uint64_t)UINT_MAX)
00737 {
00738 return 1;
00739 }
00740
00741 buffpek.file_pos= my_b_tell(tempfile);
00742 if ((ha_rows) count > max_rows)
00743 count=(uint32_t) max_rows;
00744
00745 buffpek.count=(ha_rows) count;
00746
00747 for (unsigned char **ptr= sort_keys + count ; sort_keys != ptr ; sort_keys++)
00748 {
00749 if (my_b_write(tempfile, (unsigned char*) *sort_keys, (uint32_t) rec_length))
00750 {
00751 return 1;
00752 }
00753 }
00754
00755 if (my_b_write(buffpek_pointers, (unsigned char*) &buffpek, sizeof(buffpek)))
00756 {
00757 return 1;
00758 }
00759
00760 return 0;
00761 }
00762
00763
00768 static inline void store_length(unsigned char *to, uint32_t length, uint32_t pack_length)
00769 {
00770 switch (pack_length) {
00771 case 1:
00772 *to= (unsigned char) length;
00773 break;
00774 case 2:
00775 mi_int2store(to, length);
00776 break;
00777 case 3:
00778 mi_int3store(to, length);
00779 break;
00780 default:
00781 mi_int4store(to, length);
00782 break;
00783 }
00784 }
00785
00786
00789 void SortParam::make_sortkey(unsigned char *to, unsigned char *ref_pos)
00790 {
00791 Field *field;
00792 SortField *sort_field;
00793 size_t length;
00794
00795 for (sort_field= local_sortorder ;
00796 sort_field != end ;
00797 sort_field++)
00798 {
00799 bool maybe_null=0;
00800 if ((field=sort_field->field))
00801 {
00802 if (field->maybe_null())
00803 {
00804 if (field->is_null())
00805 {
00806 if (sort_field->reverse)
00807 memset(to, 255, sort_field->length+1);
00808 else
00809 memset(to, 0, sort_field->length+1);
00810 to+= sort_field->length+1;
00811 continue;
00812 }
00813 else
00814 *to++=1;
00815 }
00816 field->sort_string(to, sort_field->length);
00817 }
00818 else
00819 {
00820 Item *item=sort_field->item;
00821 maybe_null= item->maybe_null;
00822
00823 switch (sort_field->result_type) {
00824 case STRING_RESULT:
00825 {
00826 const CHARSET_INFO * const cs=item->collation.collation;
00827 char fill_char= ((cs->state & MY_CS_BINSORT) ? (char) 0 : ' ');
00828 int diff;
00829 uint32_t sort_field_length;
00830
00831 if (maybe_null)
00832 *to++=1;
00833
00834 String tmp((char*) to,sort_field->length+4,cs);
00835 String *res= item->str_result(&tmp);
00836 if (!res)
00837 {
00838 if (maybe_null)
00839 memset(to-1, 0, sort_field->length+1);
00840 else
00841 {
00842
00843
00844
00845
00846
00847 assert(0);
00848 memset(to, 0, sort_field->length);
00849 }
00850 break;
00851 }
00852 length= res->length();
00853 sort_field_length= sort_field->length - sort_field->suffix_length;
00854 diff=(int) (sort_field_length - length);
00855 if (diff < 0)
00856 {
00857 diff=0;
00858 length= sort_field_length;
00859 }
00860 if (sort_field->suffix_length)
00861 {
00862
00863 store_length(to + sort_field_length, length,
00864 sort_field->suffix_length);
00865 }
00866 if (sort_field->need_strxnfrm)
00867 {
00868 char *from=(char*) res->ptr();
00869 uint32_t tmp_length;
00870 if ((unsigned char*) from == to)
00871 {
00872 set_if_smaller(length,sort_field->length);
00873 memcpy(tmp_buffer,from,length);
00874 from= tmp_buffer;
00875 }
00876 tmp_length= my_strnxfrm(cs,to,sort_field->length,
00877 (unsigned char*) from, length);
00878 assert(tmp_length == sort_field->length);
00879 }
00880 else
00881 {
00882 my_strnxfrm(cs,(unsigned char*)to,length,(const unsigned char*)res->ptr(),length);
00883 cs->cset->fill(cs, (char *)to+length,diff,fill_char);
00884 }
00885 break;
00886 }
00887 case INT_RESULT:
00888 {
00889 int64_t value= item->val_int_result();
00890 if (maybe_null)
00891 {
00892 *to++=1;
00893 if (item->null_value)
00894 {
00895 if (maybe_null)
00896 memset(to-1, 0, sort_field->length+1);
00897 else
00898 {
00899 memset(to, 0, sort_field->length);
00900 }
00901 break;
00902 }
00903 }
00904 to[7]= (unsigned char) value;
00905 to[6]= (unsigned char) (value >> 8);
00906 to[5]= (unsigned char) (value >> 16);
00907 to[4]= (unsigned char) (value >> 24);
00908 to[3]= (unsigned char) (value >> 32);
00909 to[2]= (unsigned char) (value >> 40);
00910 to[1]= (unsigned char) (value >> 48);
00911 if (item->unsigned_flag)
00912 to[0]= (unsigned char) (value >> 56);
00913 else
00914 to[0]= (unsigned char) (value >> 56) ^ 128;
00915 break;
00916 }
00917 case DECIMAL_RESULT:
00918 {
00919 type::Decimal dec_buf, *dec_val= item->val_decimal_result(&dec_buf);
00920 if (maybe_null)
00921 {
00922 if (item->null_value)
00923 {
00924 memset(to, 0, sort_field->length+1);
00925 to++;
00926 break;
00927 }
00928 *to++=1;
00929 }
00930 dec_val->val_binary(E_DEC_FATAL_ERROR, to,
00931 item->max_length - (item->decimals ? 1:0),
00932 item->decimals);
00933 break;
00934 }
00935 case REAL_RESULT:
00936 {
00937 double value= item->val_result();
00938 if (maybe_null)
00939 {
00940 if (item->null_value)
00941 {
00942 memset(to, 0, sort_field->length+1);
00943 to++;
00944 break;
00945 }
00946 *to++=1;
00947 }
00948 change_double_for_sort(value,(unsigned char*) to);
00949 break;
00950 }
00951 case ROW_RESULT:
00952 default:
00953
00954 assert(0);
00955 break;
00956 }
00957 }
00958
00959 if (sort_field->reverse)
00960 {
00961 if (maybe_null)
00962 to[-1]= ~to[-1];
00963 length=sort_field->length;
00964 while (length--)
00965 {
00966 *to = (unsigned char) (~ *to);
00967 to++;
00968 }
00969 }
00970 else
00971 {
00972 to+= sort_field->length;
00973 }
00974 }
00975
00976 if (addon_field)
00977 {
00978
00979
00980
00981
00982
00983
00984 sort_addon_field *addonf= addon_field;
00985 unsigned char *nulls= to;
00986 assert(addonf != 0);
00987 memset(nulls, 0, addonf->offset);
00988 to+= addonf->offset;
00989 for ( ; (field= addonf->field) ; addonf++)
00990 {
00991 if (addonf->null_bit && field->is_null())
00992 {
00993 nulls[addonf->null_offset]|= addonf->null_bit;
00994 #ifdef HAVE_VALGRIND
00995 memset(to, 0, addonf->length);
00996 #endif
00997 }
00998 else
00999 {
01000 #ifdef HAVE_VALGRIND
01001 unsigned char *end= field->pack(to, field->ptr);
01002 uint32_t local_length= (uint32_t) ((to + addonf->length) - end);
01003 assert((int) local_length >= 0);
01004 if (local_length)
01005 memset(end, 0, local_length);
01006 #else
01007 (void) field->pack(to, field->ptr);
01008 #endif
01009 }
01010 to+= addonf->length;
01011 }
01012 }
01013 else
01014 {
01015
01016 memcpy(to, ref_pos, (size_t) ref_length);
01017 }
01018 }
01019
01020
01021
01022
01023
01024
01025 void SortParam::register_used_fields()
01026 {
01027 SortField *sort_field;
01028 Table *table= sort_form;
01029
01030 for (sort_field= local_sortorder ;
01031 sort_field != end ;
01032 sort_field++)
01033 {
01034 Field *field;
01035 if ((field= sort_field->field))
01036 {
01037 if (field->getTable() == table)
01038 table->setReadSet(field->position());
01039 }
01040 else
01041 {
01042 sort_field->item->walk(&Item::register_field_in_read_map, 1,
01043 (unsigned char *) table);
01044 }
01045 }
01046
01047 if (addon_field)
01048 {
01049 sort_addon_field *addonf= addon_field;
01050 Field *field;
01051 for ( ; (field= addonf->field) ; addonf++)
01052 table->setReadSet(field->position());
01053 }
01054 else
01055 {
01056
01057 table->prepare_for_position();
01058 }
01059 }
01060
01061
01062 bool SortParam::save_index(unsigned char **sort_keys, uint32_t count, filesort_info *table_sort)
01063 {
01064 uint32_t offset;
01065 unsigned char *to;
01066
01067 internal::my_string_ptr_sort((unsigned char*) sort_keys, (uint32_t) count, sort_length);
01068 offset= rec_length - res_length;
01069
01070 if ((ha_rows) count > max_rows)
01071 count=(uint32_t) max_rows;
01072
01073 if (!(to= table_sort->record_pointers= (unsigned char*) malloc(res_length*count)))
01074 return true;
01075
01076 for (unsigned char **end_ptr= sort_keys+count ; sort_keys != end_ptr ; sort_keys++)
01077 {
01078 memcpy(to, *sort_keys+offset, res_length);
01079 to+= res_length;
01080 }
01081
01082 return false;
01083 }
01084
01085
01088 int FileSort::merge_many_buff(SortParam *param, unsigned char *sort_buffer,
01089 buffpek *buffpek_inst, uint32_t *maxbuffer, internal::IO_CACHE *t_file)
01090 {
01091 internal::IO_CACHE t_file2,*from_file,*to_file,*temp;
01092 buffpek *lastbuff;
01093
01094 if (*maxbuffer < MERGEBUFF2)
01095 return 0;
01096 if (flush_io_cache(t_file) ||
01097 t_file2.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX,DISK_BUFFER_SIZE, MYF(MY_WME)))
01098 {
01099 return 1;
01100 }
01101
01102 from_file= t_file ; to_file= &t_file2;
01103 while (*maxbuffer >= MERGEBUFF2)
01104 {
01105 uint32_t i;
01106
01107 if (from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0))
01108 {
01109 break;
01110 }
01111
01112 if (to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0))
01113 {
01114 break;
01115 }
01116
01117 lastbuff=buffpek_inst;
01118 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
01119 {
01120 if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
01121 buffpek_inst+i,buffpek_inst+i+MERGEBUFF-1,0))
01122 {
01123 goto cleanup;
01124 }
01125 }
01126
01127 if (merge_buffers(param,from_file,to_file,sort_buffer,lastbuff++,
01128 buffpek_inst+i,buffpek_inst+ *maxbuffer,0))
01129 {
01130 break;
01131 }
01132
01133 if (flush_io_cache(to_file))
01134 {
01135 break;
01136 }
01137
01138 temp=from_file; from_file=to_file; to_file=temp;
01139 from_file->setup_io_cache();
01140 to_file->setup_io_cache();
01141 *maxbuffer= (uint32_t) (lastbuff-buffpek_inst)-1;
01142 }
01143
01144 cleanup:
01145 to_file->close_cached_file();
01146 if (to_file == t_file)
01147 {
01148 *t_file=t_file2;
01149 t_file->setup_io_cache();
01150 }
01151
01152 return(*maxbuffer >= MERGEBUFF2);
01153 }
01154
01155
01163 uint32_t FileSort::read_to_buffer(internal::IO_CACHE *fromfile, buffpek *buffpek_inst, uint32_t rec_length)
01164 {
01165 uint32_t count;
01166 uint32_t length;
01167
01168 if ((count= (uint32_t) min((ha_rows) buffpek_inst->max_keys,buffpek_inst->count)))
01169 {
01170 if (pread(fromfile->file,(unsigned char*) buffpek_inst->base, (length= rec_length*count),buffpek_inst->file_pos) == 0)
01171 return((uint32_t) -1);
01172
01173 buffpek_inst->key= buffpek_inst->base;
01174 buffpek_inst->file_pos+= length;
01175 buffpek_inst->count-= count;
01176 buffpek_inst->mem_count= count;
01177 }
01178 return (count*rec_length);
01179 }
01180
01181
01182 class compare_functor
01183 {
01184 qsort2_cmp key_compare;
01185 void *key_compare_arg;
01186
01187 public:
01188 compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg) :
01189 key_compare(in_key_compare),
01190 key_compare_arg(in_compare_arg)
01191 { }
01192
01193 inline bool operator()(const buffpek *i, const buffpek *j) const
01194 {
01195 int val= key_compare(key_compare_arg, &i->key, &j->key);
01196
01197 return (val >= 0);
01198 }
01199 };
01200
01201
01220 int FileSort::merge_buffers(SortParam *param, internal::IO_CACHE *from_file,
01221 internal::IO_CACHE *to_file, unsigned char *sort_buffer,
01222 buffpek *lastbuff, buffpek *Fb, buffpek *Tb,
01223 int flag)
01224 {
01225 int error;
01226 uint32_t rec_length,res_length,offset;
01227 size_t sort_length;
01228 uint32_t maxcount;
01229 ha_rows max_rows,org_max_rows;
01230 internal::my_off_t to_start_filepos;
01231 unsigned char *strpos;
01232 buffpek *buffpek_inst;
01233 qsort2_cmp cmp;
01234 void *first_cmp_arg;
01235 volatile Session::killed_state_t *killed= getSession().getKilledPtr();
01236 Session::killed_state_t not_killable;
01237
01238 getSession().status_var.filesort_merge_passes++;
01239 if (param->not_killable)
01240 {
01241 killed= ¬_killable;
01242 not_killable= Session::NOT_KILLED;
01243 }
01244
01245 error=0;
01246 rec_length= param->rec_length;
01247 res_length= param->res_length;
01248 sort_length= param->sort_length;
01249 offset= rec_length-res_length;
01250 maxcount= (uint32_t) (param->keys/((uint32_t) (Tb-Fb) +1));
01251 to_start_filepos= my_b_tell(to_file);
01252 strpos= (unsigned char*) sort_buffer;
01253 org_max_rows=max_rows= param->max_rows;
01254
01255
01256 assert(maxcount!=0);
01257
01258 if (param->unique_buff)
01259 {
01260 cmp= param->compare;
01261 first_cmp_arg= (void *) ¶m->cmp_context;
01262 }
01263 else
01264 {
01265 cmp= internal::get_ptr_compare(sort_length);
01266 first_cmp_arg= (void*) &sort_length;
01267 }
01268 priority_queue<buffpek *, vector<buffpek *>, compare_functor >
01269 queue(compare_functor(cmp, first_cmp_arg));
01270 for (buffpek_inst= Fb ; buffpek_inst <= Tb ; buffpek_inst++)
01271 {
01272 buffpek_inst->base= strpos;
01273 buffpek_inst->max_keys= maxcount;
01274 strpos+= (uint32_t) (error= (int) read_to_buffer(from_file, buffpek_inst,
01275 rec_length));
01276 if (error == -1)
01277 return -1;
01278
01279 buffpek_inst->max_keys= buffpek_inst->mem_count;
01280 queue.push(buffpek_inst);
01281 }
01282
01283 if (param->unique_buff)
01284 {
01285
01286
01287
01288
01289
01290
01291
01292
01293 buffpek_inst= queue.top();
01294 memcpy(param->unique_buff, buffpek_inst->key, rec_length);
01295 if (my_b_write(to_file, (unsigned char*) buffpek_inst->key, rec_length))
01296 {
01297 return 1;
01298 }
01299 buffpek_inst->key+= rec_length;
01300 buffpek_inst->mem_count--;
01301 if (!--max_rows)
01302 {
01303 error= 0;
01304 goto end;
01305 }
01306
01307 queue.pop();
01308 queue.push(buffpek_inst);
01309 }
01310 else
01311 {
01312 cmp= 0;
01313 }
01314
01315 while (queue.size() > 1)
01316 {
01317 if (*killed)
01318 {
01319 return 1;
01320 }
01321 for (;;)
01322 {
01323 buffpek_inst= queue.top();
01324 if (cmp)
01325 {
01326 if (!(*cmp)(first_cmp_arg, &(param->unique_buff),
01327 (unsigned char**) &buffpek_inst->key))
01328 goto skip_duplicate;
01329 memcpy(param->unique_buff, buffpek_inst->key, rec_length);
01330 }
01331 if (flag == 0)
01332 {
01333 if (my_b_write(to_file,(unsigned char*) buffpek_inst->key, rec_length))
01334 {
01335 return 1;
01336 }
01337 }
01338 else
01339 {
01340 if (my_b_write(to_file, (unsigned char*) buffpek_inst->key+offset, res_length))
01341 {
01342 return 1;
01343 }
01344 }
01345 if (!--max_rows)
01346 {
01347 error= 0;
01348 goto end;
01349 }
01350
01351 skip_duplicate:
01352 buffpek_inst->key+= rec_length;
01353 if (! --buffpek_inst->mem_count)
01354 {
01355 if (!(error= (int) read_to_buffer(from_file,buffpek_inst,
01356 rec_length)))
01357 {
01358 queue.pop();
01359 break;
01360 }
01361 else if (error == -1)
01362 {
01363 return -1;
01364 }
01365 }
01366
01367 queue.pop();
01368 queue.push(buffpek_inst);
01369 }
01370 }
01371 buffpek_inst= queue.top();
01372 buffpek_inst->base= sort_buffer;
01373 buffpek_inst->max_keys= param->keys;
01374
01375
01376
01377
01378
01379 if (cmp)
01380 {
01381 if (!(*cmp)(first_cmp_arg, &(param->unique_buff), (unsigned char**) &buffpek_inst->key))
01382 {
01383 buffpek_inst->key+= rec_length;
01384 --buffpek_inst->mem_count;
01385 }
01386 }
01387
01388 do
01389 {
01390 if ((ha_rows) buffpek_inst->mem_count > max_rows)
01391 {
01392 buffpek_inst->mem_count= (uint32_t) max_rows;
01393 buffpek_inst->count= 0;
01394 }
01395 max_rows-= buffpek_inst->mem_count;
01396 if (flag == 0)
01397 {
01398 if (my_b_write(to_file,(unsigned char*) buffpek_inst->key,
01399 (rec_length*buffpek_inst->mem_count)))
01400 {
01401 return 1;
01402 }
01403 }
01404 else
01405 {
01406 unsigned char *end;
01407 strpos= buffpek_inst->key+offset;
01408 for (end= strpos+buffpek_inst->mem_count*rec_length ;
01409 strpos != end ;
01410 strpos+= rec_length)
01411 {
01412 if (my_b_write(to_file, (unsigned char *) strpos, res_length))
01413 {
01414 return 1;
01415 }
01416 }
01417 }
01418 }
01419
01420 while ((error=(int) read_to_buffer(from_file,buffpek_inst, rec_length))
01421 != -1 && error != 0);
01422
01423 end:
01424 lastbuff->count= min(org_max_rows-max_rows, param->max_rows);
01425 lastbuff->file_pos= to_start_filepos;
01426
01427 return error;
01428 }
01429
01430
01431
01432
01433 int FileSort::merge_index(SortParam *param, unsigned char *sort_buffer,
01434 buffpek *buffpek_inst, uint32_t maxbuffer,
01435 internal::IO_CACHE *tempfile, internal::IO_CACHE *outfile)
01436 {
01437 if (merge_buffers(param,tempfile,outfile,sort_buffer,buffpek_inst,buffpek_inst,
01438 buffpek_inst+maxbuffer,1))
01439 return 1;
01440
01441 return 0;
01442 }
01443
01444
01445 static uint32_t suffix_length(uint32_t string_length)
01446 {
01447 if (string_length < 256)
01448 return 1;
01449 if (string_length < 256L*256L)
01450 return 2;
01451 if (string_length < 256L*256L*256L)
01452 return 3;
01453 return 4;
01454 }
01455
01456
01457
01475 uint32_t FileSort::sortlength(SortField *sortorder, uint32_t s_length, bool *multi_byte_charset)
01476 {
01477 uint32_t length;
01478 const CHARSET_INFO *cs;
01479 *multi_byte_charset= 0;
01480
01481 length=0;
01482 for (; s_length-- ; sortorder++)
01483 {
01484 sortorder->need_strxnfrm= 0;
01485 sortorder->suffix_length= 0;
01486 if (sortorder->field)
01487 {
01488 cs= sortorder->field->sort_charset();
01489 sortorder->length= sortorder->field->sort_length();
01490
01491 if (use_strnxfrm((cs=sortorder->field->sort_charset())))
01492 {
01493 sortorder->need_strxnfrm= 1;
01494 *multi_byte_charset= 1;
01495 sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
01496 }
01497 if (sortorder->field->maybe_null())
01498 length++;
01499 }
01500 else
01501 {
01502 sortorder->result_type= sortorder->item->result_type();
01503 if (sortorder->item->result_as_int64_t())
01504 sortorder->result_type= INT_RESULT;
01505
01506 switch (sortorder->result_type) {
01507 case STRING_RESULT:
01508 sortorder->length=sortorder->item->max_length;
01509 set_if_smaller(sortorder->length,
01510 getSession().variables.max_sort_length);
01511 if (use_strnxfrm((cs=sortorder->item->collation.collation)))
01512 {
01513 sortorder->length= cs->coll->strnxfrmlen(cs, sortorder->length);
01514 sortorder->need_strxnfrm= 1;
01515 *multi_byte_charset= 1;
01516 }
01517 else if (cs == &my_charset_bin)
01518 {
01519
01520 sortorder->suffix_length= suffix_length(sortorder->length);
01521 sortorder->length+= sortorder->suffix_length;
01522 }
01523 break;
01524 case INT_RESULT:
01525 sortorder->length=8;
01526 break;
01527 case DECIMAL_RESULT:
01528 sortorder->length=
01529 class_decimal_get_binary_size(sortorder->item->max_length -
01530 (sortorder->item->decimals ? 1 : 0),
01531 sortorder->item->decimals);
01532 break;
01533 case REAL_RESULT:
01534 sortorder->length=sizeof(double);
01535 break;
01536 case ROW_RESULT:
01537
01538 assert(0);
01539 break;
01540 }
01541 if (sortorder->item->maybe_null)
01542 length++;
01543 }
01544 set_if_smaller(sortorder->length, (size_t)getSession().variables.max_sort_length);
01545 length+=sortorder->length;
01546 }
01547 sortorder->field= (Field*) 0;
01548 return length;
01549 }
01550
01551
01578 sort_addon_field *FileSort::get_addon_fields(Field **ptabfield, uint32_t sortlength_arg, uint32_t *plength)
01579 {
01580 Field **pfield;
01581 Field *field;
01582 sort_addon_field *addonf;
01583 uint32_t length= 0;
01584 uint32_t fields= 0;
01585 uint32_t null_fields= 0;
01586
01587
01588
01589
01590
01591
01592
01593
01594
01595
01596 *plength= 0;
01597
01598 for (pfield= ptabfield; (field= *pfield) ; pfield++)
01599 {
01600 if (!(field->isReadSet()))
01601 continue;
01602 if (field->flags & BLOB_FLAG)
01603 return 0;
01604 length+= field->max_packed_col_length(field->pack_length());
01605 if (field->maybe_null())
01606 null_fields++;
01607 fields++;
01608 }
01609 if (!fields)
01610 return 0;
01611 length+= (null_fields+7)/8;
01612
01613 if (length+sortlength_arg > getSession().variables.max_length_for_sort_data ||
01614 !(addonf= (sort_addon_field *) malloc(sizeof(sort_addon_field)*
01615 (fields+1))))
01616 return 0;
01617
01618 *plength= length;
01619 length= (null_fields+7)/8;
01620 null_fields= 0;
01621 for (pfield= ptabfield; (field= *pfield) ; pfield++)
01622 {
01623 if (!(field->isReadSet()))
01624 continue;
01625 addonf->field= field;
01626 addonf->offset= length;
01627 if (field->maybe_null())
01628 {
01629 addonf->null_offset= null_fields/8;
01630 addonf->null_bit= 1<<(null_fields & 7);
01631 null_fields++;
01632 }
01633 else
01634 {
01635 addonf->null_offset= 0;
01636 addonf->null_bit= 0;
01637 }
01638 addonf->length= field->max_packed_col_length(field->pack_length());
01639 length+= addonf->length;
01640 addonf++;
01641 }
01642 addonf->field= 0;
01643
01644 return (addonf-fields);
01645 }
01646
01647
01663 static void
01664 unpack_addon_fields(sort_addon_field *addon_field, unsigned char *buff)
01665 {
01666 Field *field;
01667 sort_addon_field *addonf= addon_field;
01668
01669 for ( ; (field= addonf->field) ; addonf++)
01670 {
01671 if (addonf->null_bit && (addonf->null_bit & buff[addonf->null_offset]))
01672 {
01673 field->set_null();
01674 continue;
01675 }
01676 field->set_notnull();
01677 field->unpack(field->ptr, buff + addonf->offset);
01678 }
01679 }
01680
01681
01682
01683
01684
01685
01686 #define DBL_EXP_DIG (sizeof(double)*8-DBL_MANT_DIG)
01687
01688 void change_double_for_sort(double nr,unsigned char *to)
01689 {
01690 unsigned char *tmp=(unsigned char*) to;
01691 if (nr == 0.0)
01692 {
01693 tmp[0]=(unsigned char) 128;
01694 memset(tmp+1, 0, sizeof(nr)-1);
01695 }
01696 else
01697 {
01698 #ifdef WORDS_BIGENDIAN
01699 memcpy(tmp,&nr,sizeof(nr));
01700 #else
01701 {
01702 unsigned char *ptr= (unsigned char*) &nr;
01703 #if defined(__FLOAT_WORD_ORDER) && (__FLOAT_WORD_ORDER == __BIG_ENDIAN)
01704 tmp[0]= ptr[3]; tmp[1]=ptr[2]; tmp[2]= ptr[1]; tmp[3]=ptr[0];
01705 tmp[4]= ptr[7]; tmp[5]=ptr[6]; tmp[6]= ptr[5]; tmp[7]=ptr[4];
01706 #else
01707 tmp[0]= ptr[7]; tmp[1]=ptr[6]; tmp[2]= ptr[5]; tmp[3]=ptr[4];
01708 tmp[4]= ptr[3]; tmp[5]=ptr[2]; tmp[6]= ptr[1]; tmp[7]=ptr[0];
01709 #endif
01710 }
01711 #endif
01712 if (tmp[0] & 128)
01713 {
01714 uint32_t i;
01715 for (i=0 ; i < sizeof(nr); i++)
01716 tmp[i]=tmp[i] ^ (unsigned char) 255;
01717 }
01718 else
01719 {
01720 uint16_t exp_part=(((uint16_t) tmp[0] << 8) | (uint16_t) tmp[1] |
01721 (uint16_t) 32768);
01722 exp_part+= (uint16_t) 1 << (16-1-DBL_EXP_DIG);
01723 tmp[0]= (unsigned char) (exp_part >> 8);
01724 tmp[1]= (unsigned char) exp_part;
01725 }
01726 }
01727 }
01728
01729 }