00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00026 #include <config.h>
00027 #include <fcntl.h>
00028 #include <drizzled/error.h>
00029 #include <drizzled/field/epoch.h>
00030 #include <drizzled/gettext.h>
00031 #include <drizzled/internal/my_sys.h>
00032 #include <drizzled/item/empty_string.h>
00033 #include <drizzled/item/int.h>
00034 #include <drizzled/lock.h>
00035 #include <drizzled/message/table.h>
00036 #include <drizzled/optimizer/cost_vector.h>
00037 #include <drizzled/plugin/client.h>
00038 #include <drizzled/plugin/event_observer.h>
00039 #include <drizzled/plugin/storage_engine.h>
00040 #include <drizzled/probes.h>
00041 #include <drizzled/session.h>
00042 #include <drizzled/sql_base.h>
00043 #include <drizzled/sql_parse.h>
00044 #include <drizzled/transaction_services.h>
00045 #include <drizzled/key.h>
00046 #include <drizzled/sql_lex.h>
00047
00048 using namespace std;
00049
00050 namespace drizzled {
00051
00052
00053
00054
00055 Cursor::Cursor(plugin::StorageEngine &engine_arg,
00056 Table &arg)
00057 : table(arg),
00058 engine(engine_arg),
00059 estimation_rows_to_insert(0),
00060 ref(0),
00061 key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
00062 ref_length(sizeof(internal::my_off_t)),
00063 inited(NONE),
00064 locked(false),
00065 next_insert_id(0), insert_id_for_cur_row(0)
00066 { }
00067
00068 Cursor::~Cursor(void)
00069 {
00070 assert(locked == false);
00071
00072 }
00073
00074
00075
00076
00077
00078
00079
00080 Cursor *Cursor::clone(memory::Root *mem_root)
00081 {
00082 Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
00083
00084
00085
00086
00087
00088
00089 if (!(new_handler->ref= (unsigned char*) mem_root->alloc_root(ALIGN_SIZE(ref_length)*2)))
00090 return NULL;
00091
00092 identifier::Table identifier(getTable()->getShare()->getSchemaName(),
00093 getTable()->getShare()->getTableName(),
00094 getTable()->getShare()->getType());
00095
00096 if (new_handler && !new_handler->ha_open(identifier,
00097 getTable()->getDBStat(),
00098 HA_OPEN_IGNORE_IF_LOCKED))
00099 return new_handler;
00100 return NULL;
00101 }
00102
00103
00104
00105
00106
00107
00108 uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
00109 {
00110
00111 assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
00112
00113 const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
00114 const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
00115 uint32_t length= 0;
00116
00117 while (key_part_found < end_key_part_found && keypart_map_arg)
00118 {
00119 length+= key_part_found->store_length;
00120 keypart_map_arg >>= 1;
00121 key_part_found++;
00122 }
00123 return length;
00124 }
00125
00126 int Cursor::startIndexScan(uint32_t idx, bool sorted)
00127 {
00128 int result;
00129 assert(inited == NONE);
00130 if (!(result= doStartIndexScan(idx, sorted)))
00131 inited=INDEX;
00132 end_range= NULL;
00133 return result;
00134 }
00135
00136 int Cursor::endIndexScan()
00137 {
00138 assert(inited==INDEX);
00139 inited=NONE;
00140 end_range= NULL;
00141 return(doEndIndexScan());
00142 }
00143
00144 int Cursor::startTableScan(bool scan)
00145 {
00146 int result;
00147 assert(inited==NONE || (inited==RND && scan));
00148 inited= (result= doStartTableScan(scan)) ? NONE: RND;
00149
00150 return result;
00151 }
00152
00153 int Cursor::endTableScan()
00154 {
00155 assert(inited==RND);
00156 inited=NONE;
00157 return(doEndTableScan());
00158 }
00159
00160 int Cursor::ha_index_or_rnd_end()
00161 {
00162 return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
00163 }
00164
00165 void Cursor::ha_start_bulk_insert(ha_rows rows)
00166 {
00167 estimation_rows_to_insert= rows;
00168 start_bulk_insert(rows);
00169 }
00170
00171 int Cursor::ha_end_bulk_insert()
00172 {
00173 estimation_rows_to_insert= 0;
00174 return end_bulk_insert();
00175 }
00176
00177 const key_map *Cursor::keys_to_use_for_scanning()
00178 {
00179 return &key_map_empty;
00180 }
00181
00182 bool Cursor::has_transactions()
00183 {
00184 return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
00185 }
00186
00187 void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
00188 {
00189 (getTable()->in_use->status_var.*offset)++;
00190 }
00191
00192 void **Cursor::ha_data(Session *session) const
00193 {
00194 return session->getEngineData(getEngine());
00195 }
00196
00197 bool Cursor::is_fatal_error(int error, uint32_t flags)
00198 {
00199 if (!error ||
00200 ((flags & HA_CHECK_DUP_KEY) &&
00201 (error == HA_ERR_FOUND_DUPP_KEY ||
00202 error == HA_ERR_FOUND_DUPP_UNIQUE)))
00203 return false;
00204 return true;
00205 }
00206
00207
00208 ha_rows Cursor::records() { return stats.records; }
00209 uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
00210 uint64_t Cursor::rowSize() { return getTable()->getRecordLength() + getTable()->sizeFields(); }
00211
00212 int Cursor::doOpen(const identifier::Table &identifier, int mode, uint32_t test_if_locked)
00213 {
00214 return open(identifier.getPath().c_str(), mode, test_if_locked);
00215 }
00216
00223 int Cursor::ha_open(const identifier::Table &identifier,
00224 int mode,
00225 int test_if_locked)
00226 {
00227 int error;
00228
00229 if ((error= doOpen(identifier, mode, test_if_locked)))
00230 {
00231 if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
00232 (getTable()->db_stat & HA_TRY_READ_ONLY))
00233 {
00234 getTable()->db_stat|=HA_READ_ONLY;
00235 error= doOpen(identifier, O_RDONLY,test_if_locked);
00236 }
00237 }
00238 if (error)
00239 {
00240 errno= error;
00241 }
00242 else
00243 {
00244 if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
00245 getTable()->db_stat|=HA_READ_ONLY;
00246 (void) extra(HA_EXTRA_NO_READCHECK);
00247
00248
00249 if (!ref && !(ref= (unsigned char*) getTable()->alloc_root(ALIGN_SIZE(ref_length)*2)))
00250 {
00251 close();
00252 error=HA_ERR_OUT_OF_MEM;
00253 }
00254 else
00255 dup_ref=ref+ALIGN_SIZE(ref_length);
00256 }
00257 return error;
00258 }
00259
00266 int Cursor::read_first_row(unsigned char * buf, uint32_t primary_key)
00267 {
00268 int error;
00269
00270 ha_statistic_increment(&system_status_var::ha_read_first_count);
00271
00272
00273
00274
00275
00276
00277 if (stats.deleted < 10 || primary_key >= MAX_KEY ||
00278 !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
00279 {
00280 error= startTableScan(1);
00281 if (error == 0)
00282 {
00283 while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
00284 (void) endTableScan();
00285 }
00286 }
00287 else
00288 {
00289
00290 error= startIndexScan(primary_key, 0);
00291 if (error == 0)
00292 {
00293 error=index_first(buf);
00294 (void) endIndexScan();
00295 }
00296 }
00297 return error;
00298 }
00299
00311 inline uint64_t
00312 compute_next_insert_id(uint64_t nr, drizzle_system_variables *variables)
00313 {
00314 if (variables->auto_increment_increment == 1)
00315 return (nr+1);
00316 nr= (((nr+ variables->auto_increment_increment -
00317 variables->auto_increment_offset)) /
00318 (uint64_t) variables->auto_increment_increment);
00319 return (nr* (uint64_t) variables->auto_increment_increment +
00320 variables->auto_increment_offset);
00321 }
00322
00323
00324 void Cursor::adjust_next_insert_id_after_explicit_value(uint64_t nr)
00325 {
00326
00327
00328
00329
00330
00331 if ((next_insert_id > 0) && (nr >= next_insert_id))
00332 set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
00333 }
00334
00335
00351 inline uint64_t
00352 prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
00353 {
00354 if (unlikely(nr < variables->auto_increment_offset))
00355 {
00356
00357
00358
00359
00360
00361 return nr;
00362 }
00363 if (variables->auto_increment_increment == 1)
00364 return nr;
00365 nr= (((nr - variables->auto_increment_offset)) /
00366 (uint64_t) variables->auto_increment_increment);
00367 return (nr * (uint64_t) variables->auto_increment_increment +
00368 variables->auto_increment_offset);
00369 }
00370
00371
00441 #define AUTO_INC_DEFAULT_NB_ROWS 1 // Some prefer 1024 here
00442 #define AUTO_INC_DEFAULT_NB_MAX_BITS 16
00443 #define AUTO_INC_DEFAULT_NB_MAX ((1 << AUTO_INC_DEFAULT_NB_MAX_BITS) - 1)
00444
00445 int Cursor::update_auto_increment()
00446 {
00447 uint64_t nr, nb_reserved_values;
00448 bool append= false;
00449 Session *session= getTable()->in_use;
00450 drizzle_system_variables *variables= &session->variables;
00451
00452
00453
00454
00455
00456 assert(next_insert_id >= auto_inc_interval_for_cur_row.minimum());
00457
00458
00459
00460
00461
00462 if ((nr= getTable()->next_number_field->val_int()) != 0
00463 || getTable()->auto_increment_field_not_null)
00464 {
00465
00466
00467
00468
00469
00470
00471 adjust_next_insert_id_after_explicit_value(nr);
00472 insert_id_for_cur_row= 0;
00473
00474 return 0;
00475 }
00476
00477 if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum())
00478 {
00479
00480 const Discrete_interval *forced=
00481 session->auto_inc_intervals_forced.get_next();
00482 if (forced != NULL)
00483 {
00484 nr= forced->minimum();
00485 nb_reserved_values= forced->values();
00486 }
00487 else
00488 {
00489
00490
00491
00492
00493 uint32_t nb_already_reserved_intervals=
00494 session->auto_inc_intervals_in_cur_stmt_for_binlog.nb_elements();
00495 uint64_t nb_desired_values;
00496
00497
00498
00499
00500
00501
00502
00503
00504
00505
00506
00507 if (nb_already_reserved_intervals == 0 &&
00508 (estimation_rows_to_insert > 0))
00509 nb_desired_values= estimation_rows_to_insert;
00510 else
00511 {
00512
00513 if (nb_already_reserved_intervals <= AUTO_INC_DEFAULT_NB_MAX_BITS)
00514 {
00515 nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
00516 (1 << nb_already_reserved_intervals);
00517 set_if_smaller(nb_desired_values, (uint64_t)AUTO_INC_DEFAULT_NB_MAX);
00518 }
00519 else
00520 nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
00521 }
00522
00523 get_auto_increment(variables->auto_increment_offset,
00524 variables->auto_increment_increment,
00525 nb_desired_values, &nr,
00526 &nb_reserved_values);
00527 if (nr == ~(uint64_t) 0)
00528 return HA_ERR_AUTOINC_READ_FAILED;
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538 nr= compute_next_insert_id(nr-1, variables);
00539 }
00540
00541 if (getTable()->getShare()->next_number_keypart == 0)
00542 {
00543
00544 append= true;
00545 }
00546 }
00547
00548 if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
00549 {
00550
00551
00552
00553 if (session->getKilled() == Session::KILL_BAD_DATA)
00554 return HA_ERR_AUTOINC_ERANGE;
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564 nr= prev_insert_id(getTable()->next_number_field->val_int(), variables);
00565 if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
00566 nr= getTable()->next_number_field->val_int();
00567 }
00568 if (append)
00569 {
00570 auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values,
00571 variables->auto_increment_increment);
00572 }
00573
00574
00575
00576
00577
00578
00579
00580
00581 insert_id_for_cur_row= nr;
00582
00583
00584
00585
00586 set_next_insert_id(compute_next_insert_id(nr, variables));
00587
00588 return 0;
00589 }
00590
00591
00608 void Cursor::ha_release_auto_increment()
00609 {
00610 release_auto_increment();
00611 insert_id_for_cur_row= 0;
00612 auto_inc_interval_for_cur_row.replace(0, 0, 0);
00613 if (next_insert_id > 0)
00614 {
00615 next_insert_id= 0;
00616
00617
00618
00619
00620 getTable()->in_use->auto_inc_intervals_forced.empty();
00621 }
00622 }
00623
00624 void Cursor::drop_table(const char *)
00625 {
00626 close();
00627 }
00628
00629
00645 int Cursor::ha_check(Session *, HA_CHECK_OPT *)
00646 {
00647 return HA_ADMIN_OK;
00648 }
00649
00655 inline
00656 void
00657 Cursor::setTransactionReadWrite()
00658 {
00659 ResourceContext *resource_context;
00660
00661
00662
00663
00664
00665
00666 if (not getTable()->in_use)
00667 return;
00668
00669 resource_context= getTable()->in_use->getResourceContext(getEngine());
00670
00671
00672
00673
00674
00675
00676
00677
00678 if (resource_context->isStarted())
00679 {
00680 resource_context->markModifiedData();
00681 }
00682 }
00683
00684
00695 int
00696 Cursor::ha_delete_all_rows()
00697 {
00698 setTransactionReadWrite();
00699
00700 int result= delete_all_rows();
00701
00702 if (result == 0)
00703 {
00710 Session *const session= getTable()->in_use;
00711 TransactionServices &transaction_services= TransactionServices::singleton();
00712 transaction_services.truncateTable(*session, *getTable());
00713 }
00714
00715 return result;
00716 }
00717
00718
00725 int
00726 Cursor::ha_reset_auto_increment(uint64_t value)
00727 {
00728 setTransactionReadWrite();
00729
00730 return reset_auto_increment(value);
00731 }
00732
00733
00740 int
00741 Cursor::ha_analyze(Session* session, HA_CHECK_OPT*)
00742 {
00743 setTransactionReadWrite();
00744
00745 return analyze(session);
00746 }
00747
00754 int
00755 Cursor::ha_disable_indexes(uint32_t mode)
00756 {
00757 setTransactionReadWrite();
00758
00759 return disable_indexes(mode);
00760 }
00761
00762
00769 int
00770 Cursor::ha_enable_indexes(uint32_t mode)
00771 {
00772 setTransactionReadWrite();
00773
00774 return enable_indexes(mode);
00775 }
00776
00777
00784 int
00785 Cursor::ha_discard_or_import_tablespace(bool discard)
00786 {
00787 setTransactionReadWrite();
00788
00789 return discard_or_import_tablespace(discard);
00790 }
00791
00798 void
00799 Cursor::closeMarkForDelete(const char *name)
00800 {
00801 setTransactionReadWrite();
00802
00803 return drop_table(name);
00804 }
00805
00806 int Cursor::index_next_same(unsigned char *buf, const unsigned char *key, uint32_t keylen)
00807 {
00808 int error;
00809 if (!(error=index_next(buf)))
00810 {
00811 ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
00812 unsigned char *save_record_0= NULL;
00813 KeyInfo *key_info= NULL;
00814 KeyPartInfo *key_part;
00815 KeyPartInfo *key_part_end= NULL;
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825 if (ptrdiff)
00826 {
00827 save_record_0= getTable()->getInsertRecord();
00828 getTable()->record[0]= buf;
00829 key_info= getTable()->key_info + active_index;
00830 key_part= key_info->key_part;
00831 key_part_end= key_part + key_info->key_parts;
00832 for (; key_part < key_part_end; key_part++)
00833 {
00834 assert(key_part->field);
00835 key_part->field->move_field_offset(ptrdiff);
00836 }
00837 }
00838
00839 if (key_cmp_if_same(getTable(), key, active_index, keylen))
00840 {
00841 getTable()->status=STATUS_NOT_FOUND;
00842 error=HA_ERR_END_OF_FILE;
00843 }
00844
00845
00846 if (ptrdiff)
00847 {
00848 getTable()->record[0]= save_record_0;
00849 for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
00850 key_part->field->move_field_offset(-ptrdiff);
00851 }
00852 }
00853 return error;
00854 }
00855
00856
00857
00858
00859
00860
00882 double Cursor::index_only_read_time(uint32_t keynr, double key_records)
00883 {
00884 uint32_t keys_per_block= (stats.block_size/2/
00885 (getTable()->key_info[keynr].key_length + ref_length) + 1);
00886 return ((double) (key_records + keys_per_block-1) /
00887 (double) keys_per_block);
00888 }
00889
00890
00891
00892
00893
00894
00926 ha_rows
00927 Cursor::multi_range_read_info_const(uint32_t keyno, RANGE_SEQ_IF *seq,
00928 void *seq_init_param,
00929 uint32_t ,
00930 uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
00931 {
00932 KEY_MULTI_RANGE range;
00933 range_seq_t seq_it;
00934 ha_rows rows, total_rows= 0;
00935 uint32_t n_ranges=0;
00936
00937
00938 *bufsz= 0;
00939
00940 seq_it= seq->init(seq_init_param, n_ranges, *flags);
00941 while (!seq->next(seq_it, &range))
00942 {
00943 n_ranges++;
00944 key_range *min_endp, *max_endp;
00945 {
00946 min_endp= range.start_key.length? &range.start_key : NULL;
00947 max_endp= range.end_key.length? &range.end_key : NULL;
00948 }
00949 if ((range.range_flag & UNIQUE_RANGE) && !(range.range_flag & NULL_RANGE))
00950 rows= 1;
00951 else
00952 {
00953 if (HA_POS_ERROR == (rows= this->records_in_range(keyno, min_endp,
00954 max_endp)))
00955 {
00956
00957 total_rows= HA_POS_ERROR;
00958 break;
00959 }
00960 }
00961 total_rows += rows;
00962 }
00963
00964 if (total_rows != HA_POS_ERROR)
00965 {
00966
00967 *flags |= HA_MRR_USE_DEFAULT_IMPL;
00968 cost->zero();
00969 cost->setAvgIOCost(1);
00970 if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
00971 cost->setIOCount(index_only_read_time(keyno, (uint32_t)total_rows));
00972 else
00973 cost->setIOCount(read_time(keyno, n_ranges, total_rows));
00974 cost->setCpuCost((double) total_rows / TIME_FOR_COMPARE + 0.01);
00975 }
00976 return total_rows;
00977 }
00978
00979
01014 int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
01015 uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
01016 {
01017 *bufsz= 0;
01018
01019 *flags |= HA_MRR_USE_DEFAULT_IMPL;
01020
01021 cost->zero();
01022 cost->setAvgIOCost(1);
01023
01024
01025 if (*flags & HA_MRR_INDEX_ONLY)
01026 cost->setIOCount(index_only_read_time(keyno, n_rows));
01027 else
01028 cost->setIOCount(read_time(keyno, n_ranges, n_rows));
01029 return 0;
01030 }
01031
01032
01074 int
01075 Cursor::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
01076 uint32_t n_ranges, uint32_t mode)
01077 {
01078 mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
01079 mrr_funcs= *seq_funcs;
01080 mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
01081 mrr_have_range= false;
01082
01083 return 0;
01084 }
01085
01086
01100 int Cursor::multi_range_read_next(char **range_info)
01101 {
01102 int result= 0;
01103 int range_res= 0;
01104
01105 if (not mrr_have_range)
01106 {
01107 mrr_have_range= true;
01108 goto start;
01109 }
01110
01111 do
01112 {
01113
01114 if (mrr_cur_range.range_flag != (UNIQUE_RANGE | EQ_RANGE))
01115 {
01116 result= read_range_next();
01117
01118 if (result != HA_ERR_END_OF_FILE)
01119 break;
01120 }
01121 else
01122 {
01123 if (was_semi_consistent_read())
01124 goto scan_it_again;
01125
01126
01127
01128
01129 result= HA_ERR_END_OF_FILE;
01130 }
01131
01132 start:
01133
01134 while (!(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
01135 {
01136 scan_it_again:
01137 result= read_range_first(mrr_cur_range.start_key.keypart_map ?
01138 &mrr_cur_range.start_key : 0,
01139 mrr_cur_range.end_key.keypart_map ?
01140 &mrr_cur_range.end_key : 0,
01141 test(mrr_cur_range.range_flag & EQ_RANGE),
01142 mrr_is_output_sorted);
01143 if (result != HA_ERR_END_OF_FILE)
01144 break;
01145 }
01146 }
01147 while ((result == HA_ERR_END_OF_FILE) && !range_res);
01148
01149 *range_info= mrr_cur_range.ptr;
01150 return result;
01151 }
01152
01153
01154
01155
01156
01157
01176 int Cursor::read_range_first(const key_range *start_key,
01177 const key_range *end_key,
01178 bool eq_range_arg,
01179 bool )
01180 {
01181 int result;
01182
01183 eq_range= eq_range_arg;
01184 end_range= 0;
01185 if (end_key)
01186 {
01187 end_range= &save_end_range;
01188 save_end_range= *end_key;
01189 key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
01190 (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
01191 }
01192 range_key_part= getTable()->key_info[active_index].key_part;
01193
01194 if (!start_key)
01195 result= index_first(getTable()->getInsertRecord());
01196 else
01197 result= index_read_map(getTable()->getInsertRecord(),
01198 start_key->key,
01199 start_key->keypart_map,
01200 start_key->flag);
01201 if (result)
01202 return((result == HA_ERR_KEY_NOT_FOUND)
01203 ? HA_ERR_END_OF_FILE
01204 : result);
01205
01206 return (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
01207 }
01208
01209
01223 int Cursor::read_range_next()
01224 {
01225 int result;
01226
01227 if (eq_range)
01228 {
01229
01230 return(index_next_same(getTable()->getInsertRecord(),
01231 end_range->key,
01232 end_range->length));
01233 }
01234 result= index_next(getTable()->getInsertRecord());
01235 if (result)
01236 return result;
01237 return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
01238 }
01239
01240
01256 int Cursor::compare_key(key_range *range)
01257 {
01258 int cmp;
01259 if (not range)
01260 return 0;
01261 cmp= key_cmp(range_key_part, range->key, range->length);
01262 if (!cmp)
01263 cmp= key_compare_result_on_equal;
01264 return cmp;
01265 }
01266
01267 int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
01268 const unsigned char * key,
01269 key_part_map keypart_map,
01270 enum ha_rkey_function find_flag)
01271 {
01272 int error, error1;
01273 error= doStartIndexScan(index, 0);
01274 if (!error)
01275 {
01276 error= index_read_map(buf, key, keypart_map, find_flag);
01277 error1= doEndIndexScan();
01278 }
01279 return error ? error : error1;
01280 }
01281
01289 static bool log_row_for_replication(Table* table,
01290 const unsigned char *before_record,
01291 const unsigned char *after_record)
01292 {
01293 TransactionServices &transaction_services= TransactionServices::singleton();
01294 Session *const session= table->in_use;
01295
01296 if (table->getShare()->getType() || not transaction_services.shouldConstructMessages())
01297 return false;
01298
01299 bool result= false;
01300
01301 switch (session->lex().sql_command)
01302 {
01303 case SQLCOM_CREATE_TABLE:
01304
01305
01306
01307
01308
01309
01310
01311
01312
01313 result= transaction_services.insertRecord(*session, *table);
01314 break;
01315 case SQLCOM_REPLACE:
01316 case SQLCOM_REPLACE_SELECT:
01317
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334
01335 if (after_record == NULL)
01336 {
01337
01338
01339
01340
01341
01342 transaction_services.deleteRecord(*session, *table, true);
01343
01344
01345
01346
01347
01348 transaction_services.finalizeStatementMessage(*session->getStatementMessage(), *session);
01349 }
01350 else
01351 {
01352 if (before_record == NULL)
01353 result= transaction_services.insertRecord(*session, *table);
01354 else
01355 transaction_services.updateRecord(*session, *table, before_record, after_record);
01356 }
01357 break;
01358 case SQLCOM_INSERT:
01359 case SQLCOM_INSERT_SELECT:
01360 case SQLCOM_LOAD:
01361
01362
01363
01364
01365
01366
01367 if (before_record == NULL)
01368 result= transaction_services.insertRecord(*session, *table);
01369 else
01370 transaction_services.updateRecord(*session, *table, before_record, after_record);
01371 break;
01372
01373 case SQLCOM_UPDATE:
01374 transaction_services.updateRecord(*session, *table, before_record, after_record);
01375 break;
01376
01377 case SQLCOM_DELETE:
01378 transaction_services.deleteRecord(*session, *table);
01379 break;
01380 default:
01381 break;
01382 }
01383
01384 return result;
01385 }
01386
01387 int Cursor::ha_external_lock(Session *session, int lock_type)
01388 {
01389
01390
01391
01392
01393
01394 assert(next_insert_id == 0);
01395
01396 if (DRIZZLE_CURSOR_RDLOCK_START_ENABLED() ||
01397 DRIZZLE_CURSOR_WRLOCK_START_ENABLED() ||
01398 DRIZZLE_CURSOR_UNLOCK_START_ENABLED())
01399 {
01400 if (lock_type == F_RDLCK)
01401 {
01402 DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
01403 getTable()->getShare()->getTableName());
01404 }
01405 else if (lock_type == F_WRLCK)
01406 {
01407 DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
01408 getTable()->getShare()->getTableName());
01409 }
01410 else if (lock_type == F_UNLCK)
01411 {
01412 DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
01413 getTable()->getShare()->getTableName());
01414 }
01415 }
01416
01417
01418
01419
01420
01421
01422 int error= external_lock(session, lock_type);
01423
01424 if (DRIZZLE_CURSOR_RDLOCK_DONE_ENABLED() ||
01425 DRIZZLE_CURSOR_WRLOCK_DONE_ENABLED() ||
01426 DRIZZLE_CURSOR_UNLOCK_DONE_ENABLED())
01427 {
01428 if (lock_type == F_RDLCK)
01429 {
01430 DRIZZLE_CURSOR_RDLOCK_DONE(error);
01431 }
01432 else if (lock_type == F_WRLCK)
01433 {
01434 DRIZZLE_CURSOR_WRLOCK_DONE(error);
01435 }
01436 else if (lock_type == F_UNLCK)
01437 {
01438 DRIZZLE_CURSOR_UNLOCK_DONE(error);
01439 }
01440 }
01441
01442 return error;
01443 }
01444
01445
01449 int Cursor::ha_reset()
01450 {
01451
01452 assert(! getTable()->getShare()->all_set.none());
01453 assert(getTable()->key_read == 0);
01454
01455 assert(inited == NONE);
01456
01457 getTable()->free_io_cache();
01458
01459 getTable()->default_column_bitmaps();
01460 return(reset());
01461 }
01462
01463
01464 int Cursor::insertRecord(unsigned char *buf)
01465 {
01466 int error;
01467
01468
01469
01470
01471
01472
01473
01474 if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
01475 {
01476 getTable()->timestamp_field->set_time();
01477 }
01478
01479 DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01480 setTransactionReadWrite();
01481
01482 if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
01483 {
01484 error= ER_EVENT_OBSERVER_PLUGIN;
01485 }
01486 else
01487 {
01488 error= doInsertRecord(buf);
01489 if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error)))
01490 {
01491 error= ER_EVENT_OBSERVER_PLUGIN;
01492 }
01493 }
01494
01495 ha_statistic_increment(&system_status_var::ha_write_count);
01496
01497 DRIZZLE_INSERT_ROW_DONE(error);
01498
01499 if (unlikely(error))
01500 {
01501 return error;
01502 }
01503
01504 if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
01505 return HA_ERR_RBR_LOGGING_FAILED;
01506
01507 return 0;
01508 }
01509
01510
01511 int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
01512 {
01513 int error;
01514
01515
01516
01517
01518
01519 assert(new_data == getTable()->getInsertRecord());
01520
01521 DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01522 setTransactionReadWrite();
01523 if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
01524 {
01525 error= ER_EVENT_OBSERVER_PLUGIN;
01526 }
01527 else
01528 {
01529 if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
01530 {
01531 getTable()->timestamp_field->set_time();
01532 }
01533
01534 error= doUpdateRecord(old_data, new_data);
01535 if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
01536 {
01537 error= ER_EVENT_OBSERVER_PLUGIN;
01538 }
01539 }
01540
01541 ha_statistic_increment(&system_status_var::ha_update_count);
01542
01543 DRIZZLE_UPDATE_ROW_DONE(error);
01544
01545 if (unlikely(error))
01546 {
01547 return error;
01548 }
01549
01550 if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
01551 return HA_ERR_RBR_LOGGING_FAILED;
01552
01553 return 0;
01554 }
01555 TableShare *Cursor::getShare()
01556 {
01557 return getTable()->getMutableShare();
01558 }
01559
01560 int Cursor::deleteRecord(const unsigned char *buf)
01561 {
01562 int error;
01563
01564 DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
01565 setTransactionReadWrite();
01566 if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
01567 {
01568 error= ER_EVENT_OBSERVER_PLUGIN;
01569 }
01570 else
01571 {
01572 error= doDeleteRecord(buf);
01573 if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
01574 {
01575 error= ER_EVENT_OBSERVER_PLUGIN;
01576 }
01577 }
01578
01579 ha_statistic_increment(&system_status_var::ha_delete_count);
01580
01581 DRIZZLE_DELETE_ROW_DONE(error);
01582
01583 if (unlikely(error))
01584 return error;
01585
01586 if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
01587 return HA_ERR_RBR_LOGGING_FAILED;
01588
01589 return 0;
01590 }
01591
01592 }