Drizzled Public API Documentation

sql_update.cc

00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 
00017 /*
00018   Single table and multi table updates of tables.
00019 */
00020 
00021 #include <config.h>
00022 
00023 #include <drizzled/sql_select.h>
00024 #include <drizzled/error.h>
00025 #include <drizzled/probes.h>
00026 #include <drizzled/sql_base.h>
00027 #include <drizzled/field/epoch.h>
00028 #include <drizzled/sql_parse.h>
00029 #include <drizzled/optimizer/range.h>
00030 #include <drizzled/records.h>
00031 #include <drizzled/internal/my_sys.h>
00032 #include <drizzled/internal/iocache.h>
00033 #include <drizzled/transaction_services.h>
00034 #include <drizzled/filesort.h>
00035 #include <drizzled/plugin/storage_engine.h>
00036 #include <drizzled/key.h>
00037 #include <drizzled/sql_lex.h>
00038 
00039 #include <boost/dynamic_bitset.hpp>
00040 #include <list>
00041 
00042 using namespace std;
00043 
00044 namespace drizzled
00045 {
00046 
00059 static void prepare_record_for_error_message(int error, Table *table)
00060 {
00061   Field **field_p= NULL;
00062   Field *field= NULL;
00063   uint32_t keynr= 0;
00064 
00065   /*
00066     Only duplicate key errors print the key value.
00067     If storage engine does always read all columns, we have the value alraedy.
00068   */
00069   if ((error != HA_ERR_FOUND_DUPP_KEY) ||
00070       ! (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ)))
00071     return;
00072 
00073   /*
00074     Get the number of the offended index.
00075     We will see MAX_KEY if the engine cannot determine the affected index.
00076   */
00077   if ((keynr= table->get_dup_key(error)) >= MAX_KEY)
00078     return;
00079 
00080   /* Create unique_map with all fields used by that index. */
00081   boost::dynamic_bitset<> unique_map(table->getShare()->sizeFields()); /* Fields in offended unique. */
00082   table->mark_columns_used_by_index_no_reset(keynr, unique_map);
00083 
00084   /* Subtract read_set and write_set. */
00085   unique_map-= *table->read_set;
00086   unique_map-= *table->write_set;
00087 
00088   /*
00089     If the unique index uses columns that are neither in read_set
00090     nor in write_set, we must re-read the record.
00091     Otherwise no need to do anything.
00092   */
00093   if (unique_map.none())
00094     return;
00095 
00096   /* Get identifier of last read record into table->cursor->ref. */
00097   table->cursor->position(table->getInsertRecord());
00098   /* Add all fields used by unique index to read_set. */
00099   *table->read_set|= unique_map;
00100   /* Read record that is identified by table->cursor->ref. */
00101   (void) table->cursor->rnd_pos(table->getUpdateRecord(), table->cursor->ref);
00102   /* Copy the newly read columns into the new record. */
00103   for (field_p= table->getFields(); (field= *field_p); field_p++)
00104   {
00105     if (unique_map.test(field->position()))
00106     {
00107       field->copy_from_tmp(table->getShare()->rec_buff_length);
00108     }
00109   }
00110 
00111   return;
00112 }
00113 
00114 
00115 /*
00116   Process usual UPDATE
00117 
00118   SYNOPSIS
00119     update_query()
00120     session     thread handler
00121     fields    fields for update
00122     values    values of fields for update
00123     conds   WHERE clause expression
00124     order_num   number of elemen in ORDER BY clause
00125     order   order_st BY clause list
00126     limit   limit clause
00127     handle_duplicates how to handle duplicates
00128 
00129   RETURN
00130     0  - OK
00131     1  - error
00132 */
00133 
00134 int update_query(Session *session, TableList *table_list,
00135                  List<Item> &fields, List<Item> &values, COND *conds,
00136                  uint32_t order_num, Order *order,
00137                  ha_rows limit, enum enum_duplicates,
00138                  bool ignore)
00139 {
00140   bool    using_limit= limit != HA_POS_ERROR;
00141   bool    used_key_is_modified;
00142   bool    transactional_table;
00143   int   error= 0;
00144   uint    used_index= MAX_KEY, dup_key_found;
00145   bool          need_sort= true;
00146   ha_rows updated, found;
00147   key_map old_covering_keys;
00148   Table   *table;
00149   optimizer::SqlSelect *select= NULL;
00150   ReadRecord  info;
00151   Select_Lex    *select_lex= &session->lex().select_lex;
00152   uint64_t     id;
00153   List<Item> all_fields;
00154   Session::killed_state_t killed_status= Session::NOT_KILLED;
00155 
00156   DRIZZLE_UPDATE_START(session->getQueryString()->c_str());
00157   if (session->openTablesLock(table_list))
00158   {
00159     DRIZZLE_UPDATE_DONE(1, 0, 0);
00160     return 1;
00161   }
00162 
00163   session->set_proc_info("init");
00164   table= table_list->table;
00165 
00166   /* Calculate "table->covering_keys" based on the WHERE */
00167   table->covering_keys= table->getShare()->keys_in_use;
00168   table->quick_keys.reset();
00169 
00170   if (prepare_update(session, table_list, &conds, order_num, order))
00171   {
00172     DRIZZLE_UPDATE_DONE(1, 0, 0);
00173     return 1;
00174   }
00175 
00176   old_covering_keys= table->covering_keys;    // Keys used in WHERE
00177   /* Check the fields we are going to modify */
00178   if (setup_fields_with_no_wrap(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0))
00179   {
00180     DRIZZLE_UPDATE_DONE(1, 0, 0);
00181     return 1;
00182   }
00183 
00184   if (table->timestamp_field)
00185   {
00186     // Don't set timestamp column if this is modified
00187     if (table->timestamp_field->isWriteSet())
00188     {
00189       table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00190     }
00191     else
00192     {
00193       if (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
00194           table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH)
00195       {
00196         table->setWriteSet(table->timestamp_field->position());
00197       }
00198     }
00199   }
00200 
00201   if (setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0))
00202   {
00203     free_underlaid_joins(session, select_lex);
00204     DRIZZLE_UPDATE_DONE(1, 0, 0);
00205 
00206     return 1;
00207   }
00208 
00209   if (select_lex->inner_refs_list.size() &&
00210     fix_inner_refs(session, all_fields, select_lex, select_lex->ref_pointer_array))
00211   {
00212     DRIZZLE_UPDATE_DONE(1, 0, 0);
00213     return 1;
00214   }
00215 
00216   if (conds)
00217   {
00218     Item::cond_result cond_value;
00219     conds= remove_eq_conds(session, conds, &cond_value);
00220     if (cond_value == Item::COND_FALSE)
00221       limit= 0;                                   // Impossible WHERE
00222   }
00223 
00224   /*
00225     If a timestamp field settable on UPDATE is present then to avoid wrong
00226     update force the table handler to retrieve write-only fields to be able
00227     to compare records and detect data change.
00228   */
00229   if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ) &&
00230       table->timestamp_field &&
00231       (table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_UPDATE ||
00232        table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
00233   {
00234     *table->read_set|= *table->write_set;
00235   }
00236   // Don't count on usage of 'only index' when calculating which key to use
00237   table->covering_keys.reset();
00238 
00239   /* Update the table->cursor->stats.records number */
00240   table->cursor->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
00241 
00242   select= optimizer::make_select(table, 0, 0, conds, 0, &error);
00243   if (error || !limit ||
00244       (select && select->check_quick(session, false, limit)))
00245   {
00246     delete select;
00251     session->main_da.reset_diagnostics_area();
00252     free_underlaid_joins(session, select_lex);
00253     if (error || session->is_error())
00254     {
00255       DRIZZLE_UPDATE_DONE(1, 0, 0);
00256       return 1;
00257     }
00258     DRIZZLE_UPDATE_DONE(0, 0, 0);
00259     session->my_ok();       // No matching records
00260     return 0;
00261   }
00262   if (!select && limit != HA_POS_ERROR)
00263   {
00264     if ((used_index= optimizer::get_index_for_order(table, order, limit)) != MAX_KEY)
00265       need_sort= false;
00266   }
00267   /* If running in safe sql mode, don't allow updates without keys */
00268   if (table->quick_keys.none())
00269   {
00270     session->server_status|=SERVER_QUERY_NO_INDEX_USED;
00271   }
00272 
00273   table->mark_columns_needed_for_update();
00274 
00275   /* Check if we are modifying a key that we are used to search with */
00276 
00277   if (select && select->quick)
00278   {
00279     used_index= select->quick->index;
00280     used_key_is_modified= (!select->quick->unique_key_range() &&
00281                           select->quick->is_keys_used(*table->write_set));
00282   }
00283   else
00284   {
00285     used_key_is_modified= 0;
00286     if (used_index == MAX_KEY)                  // no index for sort order
00287       used_index= table->cursor->key_used_on_scan;
00288     if (used_index != MAX_KEY)
00289       used_key_is_modified= is_key_used(table, used_index, *table->write_set);
00290   }
00291 
00292 
00293   if (used_key_is_modified || order)
00294   {
00295     /*
00296       We can't update table directly;  We must first search after all
00297       matching rows before updating the table!
00298     */
00299     if (used_index < MAX_KEY && old_covering_keys.test(used_index))
00300     {
00301       table->key_read=1;
00302       table->mark_columns_used_by_index(used_index);
00303     }
00304     else
00305     {
00306       table->use_all_columns();
00307     }
00308 
00309     /* note: We avoid sorting avoid if we sort on the used index */
00310     if (order && (need_sort || used_key_is_modified))
00311     {
00312       /*
00313   Doing an order_st BY;  Let filesort find and sort the rows we are going
00314   to update
00315         NOTE: filesort will call table->prepare_for_position()
00316       */
00317       uint32_t         length= 0;
00318       SortField  *sortorder;
00319       ha_rows examined_rows;
00320       FileSort filesort(*session);
00321 
00322       table->sort.io_cache= new internal::IO_CACHE;
00323 
00324       if (!(sortorder=make_unireg_sortorder(order, &length, NULL)) ||
00325     (table->sort.found_records= filesort.run(table, sortorder, length,
00326                select, limit, 1,
00327                examined_rows)) == HA_POS_ERROR)
00328       {
00329   goto err;
00330       }
00331       /*
00332   Filesort has already found and selected the rows we want to update,
00333   so we don't need the where clause
00334       */
00335       safe_delete(select);
00336     }
00337     else
00338     {
00339       /*
00340   We are doing a search on a key that is updated. In this case
00341   we go trough the matching rows, save a pointer to them and
00342   update these in a separate loop based on the pointer.
00343       */
00344 
00345       internal::IO_CACHE tempfile;
00346       if (tempfile.open_cached_file(drizzle_tmpdir.c_str(),TEMP_PREFIX, DISK_BUFFER_SIZE, MYF(MY_WME)))
00347       {
00348   goto err;
00349       }
00350 
00351       /* If quick select is used, initialize it before retrieving rows. */
00352       if (select && select->quick && select->quick->reset())
00353         goto err;
00354       table->cursor->try_semi_consistent_read(1);
00355 
00356       /*
00357         When we get here, we have one of the following options:
00358         A. used_index == MAX_KEY
00359            This means we should use full table scan, and start it with
00360            init_read_record call
00361         B. used_index != MAX_KEY
00362            B.1 quick select is used, start the scan with init_read_record
00363            B.2 quick select is not used, this is full index scan (with LIMIT)
00364                Full index scan must be started with init_read_record_idx
00365       */
00366 
00367       if (used_index == MAX_KEY || (select && select->quick))
00368       {
00369         if ((error= info.init_read_record(session, table, select, 0, true)))
00370           goto err;
00371       }
00372       else
00373       {
00374         if ((error= info.init_read_record_idx(session, table, 1, used_index)))
00375           goto err;
00376       }
00377 
00378       session->set_proc_info("Searching rows for update");
00379       ha_rows tmp_limit= limit;
00380 
00381       while (not(error= info.read_record(&info)) && not session->getKilled())
00382       {
00383   if (!(select && select->skip_record()))
00384   {
00385           if (table->cursor->was_semi_consistent_read())
00386       continue;  /* repeat the read of the same row if it still exists */
00387 
00388     table->cursor->position(table->getInsertRecord());
00389     if (my_b_write(&tempfile,table->cursor->ref,
00390        table->cursor->ref_length))
00391     {
00392       error=1;
00393       break;
00394     }
00395     if (!--limit && using_limit)
00396     {
00397       error= -1;
00398       break;
00399     }
00400   }
00401   else
00402     table->cursor->unlock_row();
00403       }
00404       if (session->getKilled() && not error)
00405   error= 1;       // Aborted
00406       limit= tmp_limit;
00407       table->cursor->try_semi_consistent_read(0);
00408       info.end_read_record();
00409 
00410       /* Change select to use tempfile */
00411       if (select)
00412       {
00413   safe_delete(select->quick);
00414   if (select->free_cond)
00415     delete select->cond;
00416   select->cond=0;
00417       }
00418       else
00419       {
00420   select= new optimizer::SqlSelect();
00421   select->head=table;
00422       }
00423       if (tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
00424   error=1;
00425       // Read row ptrs from this cursor
00426       memcpy(select->file, &tempfile, sizeof(tempfile));
00427       if (error >= 0)
00428   goto err;
00429     }
00430     if (table->key_read)
00431       table->restore_column_maps_after_mark_index();
00432   }
00433 
00434   if (ignore)
00435     table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
00436 
00437   if (select && select->quick && select->quick->reset())
00438     goto err;
00439   table->cursor->try_semi_consistent_read(1);
00440   if ((error= info.init_read_record(session, table, select, 0, true)))
00441   {
00442     goto err;
00443   }
00444 
00445   updated= found= 0;
00446   /*
00447    * Per the SQL standard, inserting NULL into a NOT NULL
00448    * field requires an error to be thrown.
00449    *
00450    * @NOTE
00451    *
00452    * NULL check and handling occurs in field_conv.cc
00453    */
00454   session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
00455   session->cuted_fields= 0L;
00456   session->set_proc_info("Updating");
00457 
00458   transactional_table= table->cursor->has_transactions();
00459   session->setAbortOnWarning(test(!ignore));
00460 
00461   /*
00462     Assure that we can use position()
00463     if we need to create an error message.
00464   */
00465   if (table->cursor->getEngine()->check_flag(HTON_BIT_PARTIAL_COLUMN_READ))
00466     table->prepare_for_position();
00467 
00468   while (not (error=info.read_record(&info)) && not session->getKilled())
00469   {
00470     if (not (select && select->skip_record()))
00471     {
00472       if (table->cursor->was_semi_consistent_read())
00473         continue;  /* repeat the read of the same row if it still exists */
00474 
00475       table->storeRecord();
00476       if (fill_record(session, fields, values))
00477         break;
00478 
00479       found++;
00480 
00481       if (! table->records_are_comparable() || table->compare_records())
00482       {
00483         /* Non-batched update */
00484         error= table->cursor->updateRecord(table->getUpdateRecord(),
00485                                             table->getInsertRecord());
00486 
00487         table->auto_increment_field_not_null= false;
00488 
00489         if (!error || error == HA_ERR_RECORD_IS_THE_SAME)
00490         {
00491           if (error != HA_ERR_RECORD_IS_THE_SAME)
00492             updated++;
00493           else
00494             error= 0;
00495         }
00496         else if (! ignore ||
00497                  table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
00498         {
00499           /*
00500             If (ignore && error is ignorable) we don't have to
00501             do anything; otherwise...
00502           */
00503           myf flags= 0;
00504 
00505           if (table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
00506             flags|= ME_FATALERROR; /* Other handler errors are fatal */
00507 
00508           prepare_record_for_error_message(error, table);
00509           table->print_error(error,MYF(flags));
00510           error= 1;
00511           break;
00512         }
00513       }
00514 
00515       if (!--limit && using_limit)
00516       {
00517         error= -1;        // Simulate end of cursor
00518         break;
00519       }
00520     }
00521     else
00522       table->cursor->unlock_row();
00523     session->row_count++;
00524   }
00525   dup_key_found= 0;
00526   /*
00527     Caching the killed status to pass as the arg to query event constuctor;
00528     The cached value can not change whereas the killed status can
00529     (externally) since this point and change of the latter won't affect
00530     binlogging.
00531     It's assumed that if an error was set in combination with an effective
00532     killed status then the error is due to killing.
00533   */
00534   killed_status= session->getKilled(); // get the status of the volatile
00535   // simulated killing after the loop must be ineffective for binlogging
00536   error= (killed_status == Session::NOT_KILLED)?  error : 1;
00537 
00538   updated-= dup_key_found;
00539   table->cursor->try_semi_consistent_read(0);
00540 
00541   if (!transactional_table && updated > 0)
00542     session->transaction.stmt.markModifiedNonTransData();
00543 
00544   info.end_read_record();
00545   delete select;
00546   session->set_proc_info("end");
00547   table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
00548 
00549   /*
00550     error < 0 means really no error at all: we processed all rows until the
00551     last one without error. error > 0 means an error (e.g. unique key
00552     violation and no IGNORE or REPLACE). error == 0 is also an error (if
00553     preparing the record or invoking before triggers fails). See
00554     autocommitOrRollback(error>=0) and return(error>=0) below.
00555     Sometimes we want to binlog even if we updated no rows, in case user used
00556     it to be sure master and slave are in same state.
00557   */
00558   if ((error < 0) || session->transaction.stmt.hasModifiedNonTransData())
00559   {
00560     if (session->transaction.stmt.hasModifiedNonTransData())
00561       session->transaction.all.markModifiedNonTransData();
00562   }
00563   assert(transactional_table || !updated || session->transaction.stmt.hasModifiedNonTransData());
00564   free_underlaid_joins(session, select_lex);
00565 
00566   /* If LAST_INSERT_ID(X) was used, report X */
00567   id= session->arg_of_last_insert_id_function ?
00568     session->first_successful_insert_id_in_prev_stmt : 0;
00569 
00570   if (error < 0)
00571   {
00572     char buff[STRING_BUFFER_USUAL_SIZE];
00573     snprintf(buff, sizeof(buff), ER(ER_UPDATE_INFO), (ulong) found, (ulong) updated,
00574       (ulong) session->cuted_fields);
00575     session->row_count_func= updated;
00580     session->main_da.reset_diagnostics_area();
00581     session->my_ok((ulong) session->rowCount(), found, id, buff);
00582     session->status_var.updated_row_count+= session->rowCount();
00583   }
00584   session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;    /* calc cuted fields */
00585   session->setAbortOnWarning(false);
00586   DRIZZLE_UPDATE_DONE((error >= 0 || session->is_error()), found, updated);
00587   return ((error >= 0 || session->is_error()) ? 1 : 0);
00588 
00589 err:
00590   if (error != 0)
00591     table->print_error(error,MYF(0));
00592 
00593   delete select;
00594   free_underlaid_joins(session, select_lex);
00595   if (table->key_read)
00596   {
00597     table->key_read=0;
00598     table->cursor->extra(HA_EXTRA_NO_KEYREAD);
00599   }
00600   session->setAbortOnWarning(false);
00601 
00602   DRIZZLE_UPDATE_DONE(1, 0, 0);
00603   return 1;
00604 }
00605 
00606 /*
00607   Prepare items in UPDATE statement
00608 
00609   SYNOPSIS
00610     prepare_update()
00611     session     - thread handler
00612     table_list    - global/local table list
00613     conds   - conditions
00614     order_num   - number of ORDER BY list entries
00615     order   - ORDER BY clause list
00616 
00617   RETURN VALUE
00618     false OK
00619     true  error
00620 */
00621 bool prepare_update(Session *session, TableList *table_list,
00622        Item **conds, uint32_t order_num, Order *order)
00623 {
00624   List<Item> all_fields;
00625   Select_Lex *select_lex= &session->lex().select_lex;
00626 
00627   session->lex().allow_sum_func= 0;
00628 
00629   if (setup_tables_and_check_access(session, &select_lex->context,
00630                                     &select_lex->top_join_list,
00631                                     table_list,
00632                                     &select_lex->leaf_tables,
00633                                     false) ||
00634       session->setup_conds(table_list, conds) ||
00635       select_lex->setup_ref_array(session, order_num) ||
00636       setup_order(session, select_lex->ref_pointer_array,
00637       table_list, all_fields, all_fields, order))
00638     return true;
00639 
00640   /* Check that we are not using table that we are updating in a sub select */
00641   {
00642     TableList *duplicate;
00643     if ((duplicate= unique_table(table_list, table_list->next_global)))
00644     {
00645       my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
00646       return true;
00647     }
00648   }
00649 
00650   return false;
00651 }
00652 
00653 } /* namespace drizzled */