Drizzled Public API Documentation

sql_insert.cc

00001 /* Copyright (C) 2000-2006 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 
00017 /* Insert of records */
00018 
00019 #include <config.h>
00020 #include <cstdio>
00021 #include <drizzled/sql_select.h>
00022 #include <drizzled/show.h>
00023 #include <drizzled/error.h>
00024 #include <drizzled/name_resolution_context_state.h>
00025 #include <drizzled/probes.h>
00026 #include <drizzled/sql_base.h>
00027 #include <drizzled/sql_load.h>
00028 #include <drizzled/field/epoch.h>
00029 #include <drizzled/lock.h>
00030 #include <drizzled/sql_table.h>
00031 #include <drizzled/pthread_globals.h>
00032 #include <drizzled/transaction_services.h>
00033 #include <drizzled/plugin/transactional_storage_engine.h>
00034 #include <drizzled/select_insert.h>
00035 #include <drizzled/select_create.h>
00036 #include <drizzled/table/shell.h>
00037 #include <drizzled/alter_info.h>
00038 #include <drizzled/sql_parse.h>
00039 #include <drizzled/sql_lex.h>
00040 
00041 namespace drizzled
00042 {
00043 
00044 extern plugin::StorageEngine *heap_engine;
00045 extern plugin::StorageEngine *myisam_engine;
00046 
00047 /*
00048   Check if insert fields are correct.
00049 
00050   SYNOPSIS
00051     check_insert_fields()
00052     session                         The current thread.
00053     table                       The table for insert.
00054     fields                      The insert fields.
00055     values                      The insert values.
00056     check_unique                If duplicate values should be rejected.
00057 
00058   NOTE
00059     Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
00060     or leaves it as is, depending on if timestamp should be updated or
00061     not.
00062 
00063   RETURN
00064     0           OK
00065     -1          Error
00066 */
00067 
00068 static int check_insert_fields(Session *session, TableList *table_list,
00069                                List<Item> &fields, List<Item> &values,
00070                                bool check_unique,
00071                                table_map *)
00072 {
00073   Table *table= table_list->table;
00074 
00075   if (fields.size() == 0 && values.size() != 0)
00076   {
00077     if (values.size() != table->getShare()->sizeFields())
00078     {
00079       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
00080       return -1;
00081     }
00082     clear_timestamp_auto_bits(table->timestamp_field_type,
00083                               TIMESTAMP_AUTO_SET_ON_INSERT);
00084     /*
00085       No fields are provided so all fields must be provided in the values.
00086       Thus we set all bits in the write set.
00087     */
00088     table->setWriteSet();
00089   }
00090   else
00091   {           // Part field list
00092     Select_Lex *select_lex= &session->lex().select_lex;
00093     Name_resolution_context *context= &select_lex->context;
00094     Name_resolution_context_state ctx_state;
00095     int res;
00096 
00097     if (fields.size() != values.size())
00098     {
00099       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
00100       return -1;
00101     }
00102 
00103     session->dup_field= 0;
00104 
00105     /* Save the state of the current name resolution context. */
00106     ctx_state.save_state(context, table_list);
00107 
00108     /*
00109       Perform name resolution only in the first table - 'table_list',
00110       which is the table that is inserted into.
00111     */
00112     table_list->next_local= 0;
00113     context->resolve_in_table_list_only(table_list);
00114     res= setup_fields(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
00115 
00116     /* Restore the current context. */
00117     ctx_state.restore_state(context, table_list);
00118 
00119     if (res)
00120       return -1;
00121 
00122     if (check_unique && session->dup_field)
00123     {
00124       my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), session->dup_field->field_name);
00125       return -1;
00126     }
00127     if (table->timestamp_field) // Don't automaticly set timestamp if used
00128     {
00129       if (table->timestamp_field->isWriteSet())
00130       {
00131         clear_timestamp_auto_bits(table->timestamp_field_type,
00132                                   TIMESTAMP_AUTO_SET_ON_INSERT);
00133       }
00134       else
00135       {
00136         table->setWriteSet(table->timestamp_field->position());
00137       }
00138     }
00139   }
00140 
00141   return 0;
00142 }
00143 
00144 
00145 /*
00146   Check update fields for the timestamp field.
00147 
00148   SYNOPSIS
00149     check_update_fields()
00150     session                         The current thread.
00151     insert_table_list           The insert table list.
00152     table                       The table for update.
00153     update_fields               The update fields.
00154 
00155   NOTE
00156     If the update fields include the timestamp field,
00157     remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.
00158 
00159   RETURN
00160     0           OK
00161     -1          Error
00162 */
00163 
00164 static int check_update_fields(Session *session, TableList *insert_table_list,
00165                                List<Item> &update_fields,
00166                                table_map *)
00167 {
00168   Table *table= insert_table_list->table;
00169   bool timestamp_mark= false;
00170 
00171   if (table->timestamp_field)
00172   {
00173     /*
00174       Unmark the timestamp field so that we can check if this is modified
00175       by update_fields
00176     */
00177     timestamp_mark= table->write_set->test(table->timestamp_field->position());
00178     table->write_set->reset(table->timestamp_field->position());
00179   }
00180 
00181   /* Check the fields we are going to modify */
00182   if (setup_fields(session, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
00183     return -1;
00184 
00185   if (table->timestamp_field)
00186   {
00187     /* Don't set timestamp column if this is modified. */
00188     if (table->timestamp_field->isWriteSet())
00189     {
00190       clear_timestamp_auto_bits(table->timestamp_field_type,
00191                                 TIMESTAMP_AUTO_SET_ON_UPDATE);
00192     }
00193 
00194     if (timestamp_mark)
00195     {
00196       table->setWriteSet(table->timestamp_field->position());
00197     }
00198   }
00199   return 0;
00200 }
00201 
00202 
00211 static
00212 void upgrade_lock_type(Session *,
00213                        thr_lock_type *lock_type,
00214                        enum_duplicates duplic,
00215                        bool )
00216 {
00217   if (duplic == DUP_UPDATE ||
00218       (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
00219   {
00220     *lock_type= TL_WRITE_DEFAULT;
00221     return;
00222   }
00223 }
00224 
00225 
00234 bool insert_query(Session *session,TableList *table_list,
00235                   List<Item> &fields,
00236                   List<List_item> &values_list,
00237                   List<Item> &update_fields,
00238                   List<Item> &update_values,
00239                   enum_duplicates duplic,
00240       bool ignore)
00241 {
00242   int error;
00243   bool transactional_table, joins_freed= false;
00244   bool changed;
00245   uint32_t value_count;
00246   ulong counter = 1;
00247   uint64_t id;
00248   CopyInfo info;
00249   Table *table= 0;
00250   List<List_item>::iterator its(values_list.begin());
00251   List_item *values;
00252   Name_resolution_context *context;
00253   Name_resolution_context_state ctx_state;
00254   thr_lock_type lock_type;
00255   Item *unused_conds= 0;
00256 
00257 
00258   /*
00259     Upgrade lock type if the requested lock is incompatible with
00260     the current connection mode or table operation.
00261   */
00262   upgrade_lock_type(session, &table_list->lock_type, duplic,
00263                     values_list.size() > 1);
00264 
00265   if (session->openTablesLock(table_list))
00266   {
00267     DRIZZLE_INSERT_DONE(1, 0);
00268     return true;
00269   }
00270 
00271   lock_type= table_list->lock_type;
00272 
00273   session->set_proc_info("init");
00274   session->used_tables=0;
00275   values= its++;
00276   value_count= values->size();
00277 
00278   if (prepare_insert(session, table_list, table, fields, values,
00279          update_fields, update_values, duplic, &unused_conds,
00280                            false,
00281                            (fields.size() || !value_count ||
00282                             (0) != 0), !ignore))
00283   {
00284     if (table != NULL)
00285       table->cursor->ha_release_auto_increment();
00286     if (!joins_freed)
00287       free_underlaid_joins(session, &session->lex().select_lex);
00288     session->setAbortOnWarning(false);
00289     DRIZZLE_INSERT_DONE(1, 0);
00290     return true;
00291   }
00292 
00293   /* mysql_prepare_insert set table_list->table if it was not set */
00294   table= table_list->table;
00295 
00296   context= &session->lex().select_lex.context;
00297   /*
00298     These three asserts test the hypothesis that the resetting of the name
00299     resolution context below is not necessary at all since the list of local
00300     tables for INSERT always consists of one table.
00301   */
00302   assert(!table_list->next_local);
00303   assert(!context->table_list->next_local);
00304   assert(!context->first_name_resolution_table->next_name_resolution_table);
00305 
00306   /* Save the state of the current name resolution context. */
00307   ctx_state.save_state(context, table_list);
00308 
00309   /*
00310     Perform name resolution only in the first table - 'table_list',
00311     which is the table that is inserted into.
00312   */
00313   table_list->next_local= 0;
00314   context->resolve_in_table_list_only(table_list);
00315 
00316   while ((values= its++))
00317   {
00318     counter++;
00319     if (values->size() != value_count)
00320     {
00321       my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
00322 
00323       if (table != NULL)
00324         table->cursor->ha_release_auto_increment();
00325       if (!joins_freed)
00326         free_underlaid_joins(session, &session->lex().select_lex);
00327       session->setAbortOnWarning(false);
00328       DRIZZLE_INSERT_DONE(1, 0);
00329 
00330       return true;
00331     }
00332     if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
00333     {
00334       if (table != NULL)
00335         table->cursor->ha_release_auto_increment();
00336       if (!joins_freed)
00337         free_underlaid_joins(session, &session->lex().select_lex);
00338       session->setAbortOnWarning(false);
00339       DRIZZLE_INSERT_DONE(1, 0);
00340       return true;
00341     }
00342   }
00343   its= values_list.begin();
00344 
00345   /* Restore the current context. */
00346   ctx_state.restore_state(context, table_list);
00347 
00348   /*
00349     Fill in the given fields and dump it to the table cursor
00350   */
00351   info.ignore= ignore;
00352   info.handle_duplicates=duplic;
00353   info.update_fields= &update_fields;
00354   info.update_values= &update_values;
00355 
00356   /*
00357     Count warnings for all inserts.
00358     For single line insert, generate an error if try to set a NOT NULL field
00359     to NULL.
00360   */
00361   session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
00362 
00363   session->cuted_fields = 0L;
00364   table->next_number_field=table->found_next_number_field;
00365 
00366   error=0;
00367   session->set_proc_info("update");
00368   if (duplic == DUP_REPLACE)
00369     table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
00370   if (duplic == DUP_UPDATE)
00371     table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
00372   {
00373     if (duplic != DUP_ERROR || ignore)
00374       table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
00375     table->cursor->ha_start_bulk_insert(values_list.size());
00376   }
00377 
00378 
00379   session->setAbortOnWarning(not ignore);
00380 
00381   table->mark_columns_needed_for_insert();
00382 
00383   while ((values= its++))
00384   {
00385     if (fields.size() || !value_count)
00386     {
00387       table->restoreRecordAsDefault();  // Get empty record
00388       if (fill_record(session, fields, *values))
00389       {
00390   if (values_list.size() != 1 && ! session->is_error())
00391   {
00392     info.records++;
00393     continue;
00394   }
00395   /*
00396     TODO: set session->abort_on_warning if values_list.elements == 1
00397     and check that all items return warning in case of problem with
00398     storing field.
00399         */
00400   error=1;
00401   break;
00402       }
00403     }
00404     else
00405     {
00406       table->restoreRecordAsDefault();  // Get empty record
00407 
00408       if (fill_record(session, table->getFields(), *values))
00409       {
00410   if (values_list.size() != 1 && ! session->is_error())
00411   {
00412     info.records++;
00413     continue;
00414   }
00415   error=1;
00416   break;
00417       }
00418     }
00419 
00420     // Release latches in case bulk insert takes a long time
00421     plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
00422 
00423     error=write_record(session, table ,&info);
00424     if (error)
00425       break;
00426     session->row_count++;
00427   }
00428 
00429   free_underlaid_joins(session, &session->lex().select_lex);
00430   joins_freed= true;
00431 
00432   /*
00433     Now all rows are inserted.  Time to update logs and sends response to
00434     user
00435   */
00436   {
00437     /*
00438       Do not do this release if this is a delayed insert, it would steal
00439       auto_inc values from the delayed_insert thread as they share Table.
00440     */
00441     table->cursor->ha_release_auto_increment();
00442     if (table->cursor->ha_end_bulk_insert() && !error)
00443     {
00444       table->print_error(errno,MYF(0));
00445       error=1;
00446     }
00447     if (duplic != DUP_ERROR || ignore)
00448       table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
00449 
00450     transactional_table= table->cursor->has_transactions();
00451 
00452     changed= (info.copied || info.deleted || info.updated);
00453     if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
00454     {
00455       if (session->transaction.stmt.hasModifiedNonTransData())
00456   session->transaction.all.markModifiedNonTransData();
00457     }
00458     assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
00459 
00460   }
00461   session->set_proc_info("end");
00462   /*
00463     We'll report to the client this id:
00464     - if the table contains an autoincrement column and we successfully
00465     inserted an autogenerated value, the autogenerated value.
00466     - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
00467     called, X.
00468     - if the table contains an autoincrement column, and some rows were
00469     inserted, the id of the last "inserted" row (if IGNORE, that value may not
00470     have been really inserted but ignored).
00471   */
00472   id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
00473     session->first_successful_insert_id_in_cur_stmt :
00474     (session->arg_of_last_insert_id_function ?
00475      session->first_successful_insert_id_in_prev_stmt :
00476      ((table->next_number_field && info.copied) ?
00477      table->next_number_field->val_int() : 0));
00478   table->next_number_field=0;
00479   session->count_cuted_fields= CHECK_FIELD_IGNORE;
00480   table->auto_increment_field_not_null= false;
00481   if (duplic == DUP_REPLACE)
00482     table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
00483 
00484   if (error)
00485   {
00486     if (table != NULL)
00487       table->cursor->ha_release_auto_increment();
00488     if (!joins_freed)
00489       free_underlaid_joins(session, &session->lex().select_lex);
00490     session->setAbortOnWarning(false);
00491     DRIZZLE_INSERT_DONE(1, 0);
00492     return true;
00493   }
00494 
00495   if (values_list.size() == 1 && (!(session->options & OPTION_WARNINGS) ||
00496             !session->cuted_fields))
00497   {
00498     session->row_count_func= info.copied + info.deleted + info.updated;
00499     session->my_ok((ulong) session->rowCount(),
00500                    info.copied + info.deleted + info.touched, id);
00501   }
00502   else
00503   {
00504     char buff[160];
00505     if (ignore)
00506       snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
00507               (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
00508     else
00509       snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
00510         (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
00511     session->row_count_func= info.copied + info.deleted + info.updated;
00512     session->my_ok((ulong) session->rowCount(),
00513                    info.copied + info.deleted + info.touched, id, buff);
00514   }
00515   session->status_var.inserted_row_count+= session->rowCount();
00516   session->setAbortOnWarning(false);
00517   DRIZZLE_INSERT_DONE(0, session->rowCount());
00518 
00519   return false;
00520 }
00521 
00522 
00523 /*
00524   Check if table can be updated
00525 
00526   SYNOPSIS
00527      prepare_insert_check_table()
00528      session    Thread handle
00529      table_list   Table list
00530      fields   List of fields to be updated
00531      where    Pointer to where clause
00532      select_insert      Check is making for SELECT ... INSERT
00533 
00534    RETURN
00535      false ok
00536      true  ERROR
00537 */
00538 
00539 static bool prepare_insert_check_table(Session *session, TableList *table_list,
00540                                              List<Item> &,
00541                                              bool select_insert)
00542 {
00543 
00544 
00545   /*
00546      first table in list is the one we'll INSERT into, requires INSERT_ACL.
00547      all others require SELECT_ACL only. the ACL requirement below is for
00548      new leaves only anyway (view-constituents), so check for SELECT rather
00549      than INSERT.
00550   */
00551 
00552   if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
00553                                     &session->lex().select_lex.top_join_list,
00554                                     table_list,
00555                                     &session->lex().select_lex.leaf_tables,
00556                                     select_insert))
00557     return(true);
00558 
00559   return(false);
00560 }
00561 
00562 
00563 /*
00564   Prepare items in INSERT statement
00565 
00566   SYNOPSIS
00567     prepare_insert()
00568     session     Thread handler
00569     table_list          Global/local table list
00570     table   Table to insert into (can be NULL if table should
00571       be taken from table_list->table)
00572     where   Where clause (for insert ... select)
00573     select_insert true if INSERT ... SELECT statement
00574     check_fields        true if need to check that all INSERT fields are
00575                         given values.
00576     abort_on_warning    whether to report if some INSERT field is not
00577                         assigned as an error (true) or as a warning (false).
00578 
00579   TODO (in far future)
00580     In cases of:
00581     INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
00582     ON DUPLICATE KEY ...
00583     we should be able to refer to sum1 in the ON DUPLICATE KEY part
00584 
00585   WARNING
00586     You MUST set table->insert_values to 0 after calling this function
00587     before releasing the table object.
00588 
00589   RETURN VALUE
00590     false OK
00591     true  error
00592 */
00593 
00594 bool prepare_insert(Session *session, TableList *table_list,
00595                           Table *table, List<Item> &fields, List_item *values,
00596                           List<Item> &update_fields, List<Item> &update_values,
00597                           enum_duplicates duplic,
00598                           COND **,
00599                           bool select_insert,
00600                           bool check_fields, bool abort_on_warning)
00601 {
00602   Select_Lex *select_lex= &session->lex().select_lex;
00603   Name_resolution_context *context= &select_lex->context;
00604   Name_resolution_context_state ctx_state;
00605   bool insert_into_view= (0 != 0);
00606   bool res= 0;
00607   table_map map= 0;
00608 
00609   /* INSERT should have a SELECT or VALUES clause */
00610   assert (!select_insert || !values);
00611 
00612   /*
00613     For subqueries in VALUES() we should not see the table in which we are
00614     inserting (for INSERT ... SELECT this is done by changing table_list,
00615     because INSERT ... SELECT share Select_Lex it with SELECT.
00616   */
00617   if (not select_insert)
00618   {
00619     for (Select_Lex_Unit *un= select_lex->first_inner_unit();
00620          un;
00621          un= un->next_unit())
00622     {
00623       for (Select_Lex *sl= un->first_select();
00624            sl;
00625            sl= sl->next_select())
00626       {
00627         sl->context.outer_context= 0;
00628       }
00629     }
00630   }
00631 
00632   if (duplic == DUP_UPDATE)
00633   {
00634     /* it should be allocated before Item::fix_fields() */
00635     if (table_list->set_insert_values(session->mem_root))
00636       return(true);
00637   }
00638 
00639   if (prepare_insert_check_table(session, table_list, fields, select_insert))
00640     return(true);
00641 
00642 
00643   /* Prepare the fields in the statement. */
00644   if (values)
00645   {
00646     /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
00647     assert (!select_lex->group_list.elements);
00648 
00649     /* Save the state of the current name resolution context. */
00650     ctx_state.save_state(context, table_list);
00651 
00652     /*
00653       Perform name resolution only in the first table - 'table_list',
00654       which is the table that is inserted into.
00655      */
00656     table_list->next_local= 0;
00657     context->resolve_in_table_list_only(table_list);
00658 
00659     res= check_insert_fields(session, context->table_list, fields, *values,
00660                              !insert_into_view, &map) ||
00661       setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0);
00662 
00663     if (!res && check_fields)
00664     {
00665       bool saved_abort_on_warning= session->abortOnWarning();
00666 
00667       session->setAbortOnWarning(abort_on_warning);
00668       res= check_that_all_fields_are_given_values(session,
00669                                                   table ? table :
00670                                                   context->table_list->table,
00671                                                   context->table_list);
00672       session->setAbortOnWarning(saved_abort_on_warning);
00673     }
00674 
00675     if (!res && duplic == DUP_UPDATE)
00676     {
00677       res= check_update_fields(session, context->table_list, update_fields, &map);
00678     }
00679 
00680     /* Restore the current context. */
00681     ctx_state.restore_state(context, table_list);
00682 
00683     if (not res)
00684       res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
00685   }
00686 
00687   if (res)
00688     return(res);
00689 
00690   if (not table)
00691     table= table_list->table;
00692 
00693   if (not select_insert)
00694   {
00695     TableList *duplicate;
00696     if ((duplicate= unique_table(table_list, table_list->next_global, true)))
00697     {
00698       my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
00699 
00700       return true;
00701     }
00702   }
00703 
00704   if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
00705     table->prepare_for_position();
00706 
00707   return false;
00708 }
00709 
00710 
00711   /* Check if there is more uniq keys after field */
00712 
00713 static int last_uniq_key(Table *table,uint32_t keynr)
00714 {
00715   while (++keynr < table->getShare()->sizeKeys())
00716     if (table->key_info[keynr].flags & HA_NOSAME)
00717       return 0;
00718   return 1;
00719 }
00720 
00721 
00722 /*
00723   Write a record to table with optional deleting of conflicting records,
00724   invoke proper triggers if needed.
00725 
00726   SYNOPSIS
00727      write_record()
00728       session   - thread context
00729       table - table to which record should be written
00730       info  - CopyInfo structure describing handling of duplicates
00731               and which is used for counting number of records inserted
00732               and deleted.
00733 
00734   NOTE
00735     Once this record will be written to table after insert trigger will
00736     be invoked. If instead of inserting new record we will update old one
00737     then both on update triggers will work instead. Similarly both on
00738     delete triggers will be invoked if we will delete conflicting records.
00739 
00740     Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
00741     transactions.
00742 
00743   RETURN VALUE
00744     0     - success
00745     non-0 - error
00746 */
00747 
00748 
00749 int write_record(Session *session, Table *table,CopyInfo *info)
00750 {
00751   int error;
00752   std::vector<unsigned char> key;
00753   boost::dynamic_bitset<> *save_read_set, *save_write_set;
00754   uint64_t prev_insert_id= table->cursor->next_insert_id;
00755   uint64_t insert_id_for_cur_row= 0;
00756 
00757 
00758   info->records++;
00759   save_read_set=  table->read_set;
00760   save_write_set= table->write_set;
00761 
00762   if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
00763   {
00764     while ((error=table->cursor->insertRecord(table->getInsertRecord())))
00765     {
00766       uint32_t key_nr;
00767       /*
00768         If we do more than one iteration of this loop, from the second one the
00769         row will have an explicit value in the autoinc field, which was set at
00770         the first call of handler::update_auto_increment(). So we must save
00771         the autogenerated value to avoid session->insert_id_for_cur_row to become
00772         0.
00773       */
00774       if (table->cursor->insert_id_for_cur_row > 0)
00775         insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
00776       else
00777         table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
00778       bool is_duplicate_key_error;
00779       if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
00780   goto err;
00781       is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
00782       if (!is_duplicate_key_error)
00783       {
00784         /*
00785           We come here when we had an ignorable error which is not a duplicate
00786           key error. In this we ignore error if ignore flag is set, otherwise
00787           report error as usual. We will not do any duplicate key processing.
00788         */
00789         if (info->ignore)
00790           goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
00791         goto err;
00792       }
00793       if ((int) (key_nr = table->get_dup_key(error)) < 0)
00794       {
00795   error= HA_ERR_FOUND_DUPP_KEY;         /* Database can't find key */
00796   goto err;
00797       }
00798       /* Read all columns for the row we are going to replace */
00799       table->use_all_columns();
00800       /*
00801   Don't allow REPLACE to replace a row when a auto_increment column
00802   was used.  This ensures that we don't get a problem when the
00803   whole range of the key has been used.
00804       */
00805       if (info->handle_duplicates == DUP_REPLACE &&
00806           table->next_number_field &&
00807           key_nr == table->getShare()->next_number_index &&
00808     (insert_id_for_cur_row > 0))
00809   goto err;
00810       if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
00811       {
00812   if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
00813     goto err;
00814       }
00815       else
00816       {
00817   if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
00818   {
00819     error=errno;
00820     goto err;
00821   }
00822 
00823   if (not key.size())
00824   {
00825           key.resize(table->getShare()->max_unique_length);
00826   }
00827   key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
00828   if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
00829                                                     &key[0], HA_WHOLE_KEY,
00830                                                     HA_READ_KEY_EXACT))))
00831     goto err;
00832       }
00833       if (info->handle_duplicates == DUP_UPDATE)
00834       {
00835         /*
00836           We don't check for other UNIQUE keys - the first row
00837           that matches, is updated. If update causes a conflict again,
00838           an error is returned
00839         */
00840   assert(table->insert_values.size());
00841         table->storeRecordAsInsert();
00842         table->restoreRecord();
00843         assert(info->update_fields->size() ==
00844                     info->update_values->size());
00845         if (fill_record(session, *info->update_fields,
00846                                                  *info->update_values,
00847                                                  info->ignore))
00848           goto before_err;
00849 
00850         table->cursor->restore_auto_increment(prev_insert_id);
00851         if (table->next_number_field)
00852           table->cursor->adjust_next_insert_id_after_explicit_value(
00853             table->next_number_field->val_int());
00854         info->touched++;
00855 
00856         if (! table->records_are_comparable() || table->compare_records())
00857         {
00858           if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
00859                                                 table->getInsertRecord())) &&
00860               error != HA_ERR_RECORD_IS_THE_SAME)
00861           {
00862             if (info->ignore &&
00863                 !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
00864             {
00865               goto gok_or_after_err;
00866             }
00867             goto err;
00868           }
00869 
00870           if (error != HA_ERR_RECORD_IS_THE_SAME)
00871             info->updated++;
00872           else
00873             error= 0;
00874           /*
00875             If ON DUP KEY UPDATE updates a row instead of inserting one, it's
00876             like a regular UPDATE statement: it should not affect the value of a
00877             next SELECT LAST_INSERT_ID() or insert_id().
00878             Except if LAST_INSERT_ID(#) was in the INSERT query, which is
00879             handled separately by Session::arg_of_last_insert_id_function.
00880           */
00881           insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
00882           info->copied++;
00883         }
00884 
00885         if (table->next_number_field)
00886           table->cursor->adjust_next_insert_id_after_explicit_value(
00887             table->next_number_field->val_int());
00888         info->touched++;
00889 
00890         goto gok_or_after_err;
00891       }
00892       else /* DUP_REPLACE */
00893       {
00894   /*
00895     The manual defines the REPLACE semantics that it is either
00896     an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
00897     InnoDB do not function in the defined way if we allow MySQL
00898     to convert the latter operation internally to an UPDATE.
00899           We also should not perform this conversion if we have
00900           timestamp field with ON UPDATE which is different from DEFAULT.
00901           Another case when conversion should not be performed is when
00902           we have ON DELETE trigger on table so user may notice that
00903           we cheat here. Note that it is ok to do such conversion for
00904           tables which have ON UPDATE but have no ON DELETE triggers,
00905           we just should not expose this fact to users by invoking
00906           ON UPDATE triggers.
00907   */
00908   if (last_uniq_key(table,key_nr) &&
00909       !table->cursor->referenced_by_foreign_key() &&
00910             (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
00911              table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
00912         {
00913           if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
00914                   table->getInsertRecord())) &&
00915               error != HA_ERR_RECORD_IS_THE_SAME)
00916             goto err;
00917           if (error != HA_ERR_RECORD_IS_THE_SAME)
00918             info->deleted++;
00919           else
00920             error= 0;
00921           session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00922           /*
00923             Since we pretend that we have done insert we should call
00924             its after triggers.
00925           */
00926           goto after_n_copied_inc;
00927         }
00928         else
00929         {
00930           if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
00931             goto err;
00932           info->deleted++;
00933           if (!table->cursor->has_transactions())
00934             session->transaction.stmt.markModifiedNonTransData();
00935           /* Let us attempt do write_row() once more */
00936         }
00937       }
00938     }
00939     session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00940     /*
00941       Restore column maps if they where replaced during an duplicate key
00942       problem.
00943     */
00944     if (table->read_set != save_read_set ||
00945         table->write_set != save_write_set)
00946       table->column_bitmaps_set(*save_read_set, *save_write_set);
00947   }
00948   else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
00949   {
00950     if (!info->ignore ||
00951         table->cursor->is_fatal_error(error, HA_CHECK_DUP))
00952       goto err;
00953     table->cursor->restore_auto_increment(prev_insert_id);
00954     goto gok_or_after_err;
00955   }
00956 
00957 after_n_copied_inc:
00958   info->copied++;
00959   session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00960 
00961 gok_or_after_err:
00962   if (!table->cursor->has_transactions())
00963     session->transaction.stmt.markModifiedNonTransData();
00964   return(0);
00965 
00966 err:
00967   info->last_errno= error;
00968   /* current_select is NULL if this is a delayed insert */
00969   if (session->lex().current_select)
00970     session->lex().current_select->no_error= 0;        // Give error
00971   table->print_error(error,MYF(0));
00972 
00973 before_err:
00974   table->cursor->restore_auto_increment(prev_insert_id);
00975   table->column_bitmaps_set(*save_read_set, *save_write_set);
00976   return 1;
00977 }
00978 
00979 
00980 /******************************************************************************
00981   Check that all fields with arn't null_fields are used
00982 ******************************************************************************/
00983 
00984 int check_that_all_fields_are_given_values(Session *session, Table *entry,
00985                                            TableList *)
00986 {
00987   int err= 0;
00988 
00989   for (Field **field=entry->getFields() ; *field ; field++)
00990   {
00991     if (((*field)->isWriteSet()) == false)
00992     {
00993       /*
00994        * If the field doesn't have any default value
00995        * and there is no actual value specified in the
00996        * INSERT statement, throw error ER_NO_DEFAULT_FOR_FIELD.
00997        */
00998       if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
00999         ((*field)->real_type() != DRIZZLE_TYPE_ENUM))
01000       {
01001         my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
01002         err= 1;
01003       }
01004     }
01005     else
01006     {
01007       /*
01008        * However, if an actual NULL value was specified
01009        * for the field and the field is a NOT NULL field,
01010        * throw ER_BAD_NULL_ERROR.
01011        *
01012        * Per the SQL standard, inserting NULL into a NOT NULL
01013        * field requires an error to be thrown.
01014        */
01015       if (((*field)->flags & NOT_NULL_FLAG) &&
01016           (*field)->is_null())
01017       {
01018         my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
01019         err= 1;
01020       }
01021     }
01022   }
01023   return session->abortOnWarning() ? err : 0;
01024 }
01025 
01026 /***************************************************************************
01027   Store records in INSERT ... SELECT *
01028 ***************************************************************************/
01029 
01030 
01031 /*
01032   make insert specific preparation and checks after opening tables
01033 
01034   SYNOPSIS
01035     insert_select_prepare()
01036     session         thread handler
01037 
01038   RETURN
01039     false OK
01040     true  Error
01041 */
01042 
01043 bool insert_select_prepare(Session *session)
01044 {
01045   LEX *lex= &session->lex();
01046   Select_Lex *select_lex= &lex->select_lex;
01047 
01048   /*
01049     Select_Lex do not belong to INSERT statement, so we can't add WHERE
01050     clause if table is VIEW
01051   */
01052 
01053   if (prepare_insert(session, lex->query_tables,
01054                            lex->query_tables->table, lex->field_list, 0,
01055                            lex->update_list, lex->value_list,
01056                            lex->duplicates,
01057                            &select_lex->where, true, false, false))
01058     return(true);
01059 
01060   /*
01061     exclude first table from leaf tables list, because it belong to
01062     INSERT
01063   */
01064   assert(select_lex->leaf_tables != 0);
01065   lex->leaf_tables_insert= select_lex->leaf_tables;
01066   /* skip all leaf tables belonged to view where we are insert */
01067   select_lex->leaf_tables= select_lex->leaf_tables->next_leaf;
01068   return(false);
01069 }
01070 
01071 
01072 select_insert::select_insert(TableList *table_list_par, Table *table_par,
01073                              List<Item> *fields_par,
01074                              List<Item> *update_fields,
01075                              List<Item> *update_values,
01076                              enum_duplicates duplic,
01077                              bool ignore_check_option_errors) :
01078   table_list(table_list_par), table(table_par), fields(fields_par),
01079   autoinc_value_of_last_inserted_row(0),
01080   insert_into_view(table_list_par && 0 != 0)
01081 {
01082   info.handle_duplicates= duplic;
01083   info.ignore= ignore_check_option_errors;
01084   info.update_fields= update_fields;
01085   info.update_values= update_values;
01086 }
01087 
01088 
01089 int
01090 select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
01091 {
01092   int res;
01093   table_map map= 0;
01094   Select_Lex *lex_current_select_save= session->lex().current_select;
01095 
01096 
01097   unit= u;
01098 
01099   /*
01100     Since table in which we are going to insert is added to the first
01101     select, LEX::current_select should point to the first select while
01102     we are fixing fields from insert list.
01103   */
01104   session->lex().current_select= &session->lex().select_lex;
01105   res= check_insert_fields(session, table_list, *fields, values,
01106                            !insert_into_view, &map) ||
01107        setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0);
01108 
01109   if (!res && fields->size())
01110   {
01111     bool saved_abort_on_warning= session->abortOnWarning();
01112     session->setAbortOnWarning(not info.ignore);
01113     res= check_that_all_fields_are_given_values(session, table_list->table,
01114                                                 table_list);
01115     session->setAbortOnWarning(saved_abort_on_warning);
01116   }
01117 
01118   if (info.handle_duplicates == DUP_UPDATE && !res)
01119   {
01120     Name_resolution_context *context= &session->lex().select_lex.context;
01121     Name_resolution_context_state ctx_state;
01122 
01123     /* Save the state of the current name resolution context. */
01124     ctx_state.save_state(context, table_list);
01125 
01126     /* Perform name resolution only in the first table - 'table_list'. */
01127     table_list->next_local= 0;
01128     context->resolve_in_table_list_only(table_list);
01129 
01130     res= res || check_update_fields(session, context->table_list,
01131                                     *info.update_fields, &map);
01132     /*
01133       When we are not using GROUP BY and there are no ungrouped aggregate functions
01134       we can refer to other tables in the ON DUPLICATE KEY part.
01135       We use next_name_resolution_table descructively, so check it first (views?)
01136     */
01137     assert (!table_list->next_name_resolution_table);
01138     if (session->lex().select_lex.group_list.elements == 0 and
01139         not session->lex().select_lex.with_sum_func)
01140       /*
01141         We must make a single context out of the two separate name resolution contexts :
01142         the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
01143         To do that we must concatenate the two lists
01144       */
01145       table_list->next_name_resolution_table=
01146         ctx_state.get_first_name_resolution_table();
01147 
01148     res= res || setup_fields(session, 0, *info.update_values,
01149                              MARK_COLUMNS_READ, 0, 0);
01150     if (!res)
01151     {
01152       /*
01153         Traverse the update values list and substitute fields from the
01154         select for references (Item_ref objects) to them. This is done in
01155         order to get correct values from those fields when the select
01156         employs a temporary table.
01157       */
01158       List<Item>::iterator li(info.update_values->begin());
01159       Item *item;
01160 
01161       while ((item= li++))
01162       {
01163         item->transform(&Item::update_value_transformer,
01164                         (unsigned char*)session->lex().current_select);
01165       }
01166     }
01167 
01168     /* Restore the current context. */
01169     ctx_state.restore_state(context, table_list);
01170   }
01171 
01172   session->lex().current_select= lex_current_select_save;
01173   if (res)
01174     return(1);
01175   /*
01176     if it is INSERT into join view then check_insert_fields already found
01177     real table for insert
01178   */
01179   table= table_list->table;
01180 
01181   /*
01182     Is table which we are changing used somewhere in other parts of
01183     query
01184   */
01185   if (unique_table(table_list, table_list->next_global))
01186   {
01187     /* Using same table for INSERT and SELECT */
01188     session->lex().current_select->options|= OPTION_BUFFER_RESULT;
01189     session->lex().current_select->join->select_options|= OPTION_BUFFER_RESULT;
01190   }
01191   else if (not (session->lex().current_select->options & OPTION_BUFFER_RESULT))
01192   {
01193     /*
01194       We must not yet prepare the result table if it is the same as one of the
01195       source tables (INSERT SELECT). The preparation may disable
01196       indexes on the result table, which may be used during the select, if it
01197       is the same table (Bug #6034). Do the preparation after the select phase
01198       in select_insert::prepare2().
01199       We won't start bulk inserts at all if this statement uses functions or
01200       should invoke triggers since they may access to the same table too.
01201     */
01202     table->cursor->ha_start_bulk_insert((ha_rows) 0);
01203   }
01204   table->restoreRecordAsDefault();    // Get empty record
01205   table->next_number_field=table->found_next_number_field;
01206 
01207   session->cuted_fields=0;
01208 
01209   if (info.ignore || info.handle_duplicates != DUP_ERROR)
01210     table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
01211 
01212   if (info.handle_duplicates == DUP_REPLACE)
01213     table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
01214 
01215   if (info.handle_duplicates == DUP_UPDATE)
01216     table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
01217 
01218   session->setAbortOnWarning(not info.ignore);
01219   table->mark_columns_needed_for_insert();
01220 
01221 
01222   return(res);
01223 }
01224 
01225 
01226 /*
01227   Finish the preparation of the result table.
01228 
01229   SYNOPSIS
01230     select_insert::prepare2()
01231     void
01232 
01233   DESCRIPTION
01234     If the result table is the same as one of the source tables (INSERT SELECT),
01235     the result table is not finally prepared at the join prepair phase.
01236     Do the final preparation now.
01237 
01238   RETURN
01239     0   OK
01240 */
01241 
01242 int select_insert::prepare2(void)
01243 {
01244   if (session->lex().current_select->options & OPTION_BUFFER_RESULT)
01245     table->cursor->ha_start_bulk_insert((ha_rows) 0);
01246 
01247   return(0);
01248 }
01249 
01250 
01251 void select_insert::cleanup()
01252 {
01253   /* select_insert/select_create are never re-used in prepared statement */
01254   assert(0);
01255 }
01256 
01257 select_insert::~select_insert()
01258 {
01259 
01260   if (table)
01261   {
01262     table->next_number_field=0;
01263     table->auto_increment_field_not_null= false;
01264     table->cursor->ha_reset();
01265   }
01266   session->count_cuted_fields= CHECK_FIELD_IGNORE;
01267   session->setAbortOnWarning(false);
01268   return;
01269 }
01270 
01271 
01272 bool select_insert::send_data(List<Item> &values)
01273 {
01274 
01275   bool error= false;
01276 
01277   if (unit->offset_limit_cnt)
01278   {           // using limit offset,count
01279     unit->offset_limit_cnt--;
01280     return false;
01281   }
01282 
01283   session->count_cuted_fields= CHECK_FIELD_WARN;  // Calculate cuted fields
01284   store_values(values);
01285   session->count_cuted_fields= CHECK_FIELD_IGNORE;
01286   if (session->is_error())
01287     return true;
01288 
01289   // Release latches in case bulk insert takes a long time
01290   plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
01291 
01292   error= write_record(session, table, &info);
01293   table->auto_increment_field_not_null= false;
01294 
01295   if (!error)
01296   {
01297     if (info.handle_duplicates == DUP_UPDATE)
01298     {
01299       /*
01300         Restore fields of the record since it is possible that they were
01301         changed by ON DUPLICATE KEY UPDATE clause.
01302 
01303         If triggers exist then whey can modify some fields which were not
01304         originally touched by INSERT ... SELECT, so we have to restore
01305         their original values for the next row.
01306       */
01307       table->restoreRecordAsDefault();
01308     }
01309     if (table->next_number_field)
01310     {
01311       /*
01312         If no value has been autogenerated so far, we need to remember the
01313         value we just saw, we may need to send it to client in the end.
01314       */
01315       if (session->first_successful_insert_id_in_cur_stmt == 0) // optimization
01316         autoinc_value_of_last_inserted_row=
01317           table->next_number_field->val_int();
01318       /*
01319         Clear auto-increment field for the next record, if triggers are used
01320         we will clear it twice, but this should be cheap.
01321       */
01322       table->next_number_field->reset();
01323     }
01324   }
01325   return(error);
01326 }
01327 
01328 
01329 void select_insert::store_values(List<Item> &values)
01330 {
01331   if (fields->size())
01332     fill_record(session, *fields, values, true);
01333   else
01334     fill_record(session, table->getFields(), values, true);
01335 }
01336 
01337 void select_insert::send_error(drizzled::error_t errcode,const char *err)
01338 {
01339   my_message(errcode, err, MYF(0));
01340 }
01341 
01342 
01343 bool select_insert::send_eof()
01344 {
01345   int error;
01346   bool const trans_table= table->cursor->has_transactions();
01347   uint64_t id;
01348   bool changed;
01349 
01350   error= table->cursor->ha_end_bulk_insert();
01351   table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01352   table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01353 
01354   if ((changed= (info.copied || info.deleted || info.updated)))
01355   {
01356     /*
01357       We must invalidate the table in the query cache before binlog writing
01358       and autocommitOrRollback.
01359     */
01360     if (session->transaction.stmt.hasModifiedNonTransData())
01361       session->transaction.all.markModifiedNonTransData();
01362   }
01363   assert(trans_table || !changed ||
01364               session->transaction.stmt.hasModifiedNonTransData());
01365 
01366   table->cursor->ha_release_auto_increment();
01367 
01368   if (error)
01369   {
01370     table->print_error(error,MYF(0));
01371     DRIZZLE_INSERT_SELECT_DONE(error, 0);
01372     return 1;
01373   }
01374   char buff[160];
01375   if (info.ignore)
01376     snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
01377       (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
01378   else
01379     snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
01380       (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
01381   session->row_count_func= info.copied + info.deleted + info.updated;
01382 
01383   id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
01384     session->first_successful_insert_id_in_cur_stmt :
01385     (session->arg_of_last_insert_id_function ?
01386      session->first_successful_insert_id_in_prev_stmt :
01387      (info.copied ? autoinc_value_of_last_inserted_row : 0));
01388   session->my_ok((ulong) session->rowCount(),
01389                  info.copied + info.deleted + info.touched, id, buff);
01390   session->status_var.inserted_row_count+= session->rowCount();
01391   DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
01392   return 0;
01393 }
01394 
01395 void select_insert::abort() {
01396 
01397 
01398   /*
01399     If the creation of the table failed (due to a syntax error, for
01400     example), no table will have been opened and therefore 'table'
01401     will be NULL. In that case, we still need to execute the rollback
01402     and the end of the function.
01403    */
01404   if (table)
01405   {
01406     bool changed, transactional_table;
01407 
01408     table->cursor->ha_end_bulk_insert();
01409 
01410     /*
01411       If at least one row has been inserted/modified and will stay in
01412       the table (the table doesn't have transactions) we must write to
01413       the binlog (and the error code will make the slave stop).
01414 
01415       For many errors (example: we got a duplicate key error while
01416       inserting into a MyISAM table), no row will be added to the table,
01417       so passing the error to the slave will not help since there will
01418       be an error code mismatch (the inserts will succeed on the slave
01419       with no error).
01420 
01421       If table creation failed, the number of rows modified will also be
01422       zero, so no check for that is made.
01423     */
01424     changed= (info.copied || info.deleted || info.updated);
01425     transactional_table= table->cursor->has_transactions();
01426     assert(transactional_table || !changed ||
01427     session->transaction.stmt.hasModifiedNonTransData());
01428     table->cursor->ha_release_auto_increment();
01429   }
01430 
01431   if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
01432   {
01433     DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
01434   }
01435 
01436   return;
01437 }
01438 
01439 
01440 /***************************************************************************
01441   CREATE TABLE (SELECT) ...
01442 ***************************************************************************/
01443 
01444 /*
01445   Create table from lists of fields and items (or just return Table
01446   object for pre-opened existing table).
01447 
01448   SYNOPSIS
01449     create_table_from_items()
01450       session          in     Thread object
01451       create_info  in     Create information (like MAX_ROWS, ENGINE or
01452                           temporary table flag)
01453       create_table in     Pointer to TableList object providing database
01454                           and name for table to be created or to be open
01455       alter_info   in/out Initial list of columns and indexes for the table
01456                           to be created
01457       items        in     List of items which should be used to produce rest
01458                           of fields for the table (corresponding fields will
01459                           be added to the end of alter_info->create_list)
01460       lock         out    Pointer to the DrizzleLock object for table created
01461                           (or open temporary table) will be returned in this
01462                           parameter. Since this table is not included in
01463                           Session::lock caller is responsible for explicitly
01464                           unlocking this table.
01465       hooks
01466 
01467   NOTES
01468     This function behaves differently for base and temporary tables:
01469     - For base table we assume that either table exists and was pre-opened
01470       and locked at openTablesLock() stage (and in this case we just
01471       emit error or warning and return pre-opened Table object) or special
01472       placeholder was put in table cache that guarantees that this table
01473       won't be created or opened until the placeholder will be removed
01474       (so there is an exclusive lock on this table).
01475     - We don't pre-open existing temporary table, instead we either open
01476       or create and then open table in this function.
01477 
01478     Since this function contains some logic specific to CREATE TABLE ...
01479     SELECT it should be changed before it can be used in other contexts.
01480 
01481   RETURN VALUES
01482     non-zero  Pointer to Table object for table created or opened
01483     0         Error
01484 */
01485 
01486 static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
01487                                       TableList *create_table,
01488               message::Table &table_proto,
01489                                       AlterInfo *alter_info,
01490                                       List<Item> *items,
01491                                       bool is_if_not_exists,
01492                                       DrizzleLock **lock,
01493               identifier::Table::const_reference identifier)
01494 {
01495   TableShare share(message::Table::INTERNAL);
01496   uint32_t select_field_count= items->size();
01497   /* Add selected items to field list */
01498   List<Item>::iterator it(items->begin());
01499   Item *item;
01500   Field *tmp_field;
01501 
01502   if (not (identifier.isTmp()) && create_table->table->db_stat)
01503   {
01504     /* Table already exists and was open at openTablesLock() stage. */
01505     if (is_if_not_exists)
01506     {
01507       create_info->table_existed= 1;    // Mark that table existed
01508       push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
01509                           ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01510                           create_table->getTableName());
01511       return create_table->table;
01512     }
01513 
01514     my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
01515     return NULL;
01516   }
01517 
01518   {
01519     table::Shell tmp_table(share);    // Used during 'CreateField()'
01520 
01521     if (not table_proto.engine().name().compare("MyISAM"))
01522       tmp_table.getMutableShare()->db_low_byte_first= true;
01523     else if (not table_proto.engine().name().compare("MEMORY"))
01524       tmp_table.getMutableShare()->db_low_byte_first= true;
01525 
01526     tmp_table.in_use= session;
01527 
01528     while ((item=it++))
01529     {
01530       CreateField *cr_field;
01531       Field *field, *def_field;
01532       if (item->type() == Item::FUNC_ITEM)
01533       {
01534         if (item->result_type() != STRING_RESULT)
01535         {
01536           field= item->tmp_table_field(&tmp_table);
01537         }
01538         else
01539         {
01540           field= item->tmp_table_field_from_field_type(&tmp_table, 0);
01541         }
01542       }
01543       else
01544       {
01545         field= create_tmp_field(session, &tmp_table, item, item->type(),
01546                                 (Item ***) 0, &tmp_field, &def_field, false,
01547                                 false, false, 0);
01548       }
01549 
01550       if (!field ||
01551           !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
01552                                             ((Item_field *)item)->field :
01553                                             (Field*) 0))))
01554       {
01555         return NULL;
01556       }
01557 
01558       if (item->maybe_null)
01559       {
01560         cr_field->flags &= ~NOT_NULL_FLAG;
01561       }
01562 
01563       alter_info->create_list.push_back(cr_field);
01564     }
01565   }
01566 
01567   /*
01568     Create and lock table.
01569 
01570     Note that we either creating (or opening existing) temporary table or
01571     creating base table on which name we have exclusive lock. So code below
01572     should not cause deadlocks or races.
01573   */
01574   Table *table= 0;
01575   {
01576     if (not create_table_no_lock(session,
01577          identifier,
01578          create_info,
01579          table_proto,
01580          alter_info,
01581          false,
01582          select_field_count,
01583          is_if_not_exists))
01584     {
01585       if (create_info->table_existed && not identifier.isTmp())
01586       {
01587         /*
01588           This means that someone created table underneath server
01589           or it was created via different mysqld front-end to the
01590           cluster. We don't have much options but throw an error.
01591         */
01592         my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
01593         return NULL;
01594       }
01595 
01596       if (not identifier.isTmp())
01597       {
01598         /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
01599         boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
01600 
01601         if (create_table->table)
01602         {
01603           table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
01604 
01605           if (concurrent_table->reopen_name_locked_table(create_table, session))
01606           {
01607             (void)plugin::StorageEngine::dropTable(*session, identifier);
01608           }
01609           else
01610           {
01611             table= create_table->table;
01612           }
01613         }
01614         else
01615         {
01616           (void)plugin::StorageEngine::dropTable(*session, identifier);
01617         }
01618       }
01619       else
01620       {
01621         if (not (table= session->openTable(create_table, (bool*) 0,
01622                                            DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
01623             not create_info->table_existed)
01624         {
01625           /*
01626             This shouldn't happen as creation of temporary table should make
01627             it preparable for open. But let us do close_temporary_table() here
01628             just in case.
01629           */
01630           session->drop_temporary_table(identifier);
01631         }
01632       }
01633     }
01634     if (not table)                                   // open failed
01635       return NULL;
01636   }
01637 
01638   table->reginfo.lock_type=TL_WRITE;
01639   if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
01640   {
01641     if (*lock)
01642     {
01643       session->unlockTables(*lock);
01644       *lock= 0;
01645     }
01646 
01647     if (not create_info->table_existed)
01648       session->drop_open_table(table, identifier);
01649 
01650     return NULL;
01651   }
01652 
01653   return table;
01654 }
01655 
01656 
01657 int
01658 select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
01659 {
01660   DrizzleLock *extra_lock= NULL;
01661   /*
01662     For replication, the CREATE-SELECT statement is written
01663     in two pieces: the first transaction messsage contains
01664     the CREATE TABLE statement as a CreateTableStatement message
01665     necessary to create the table.
01666 
01667     The second transaction message contains all the InsertStatement
01668     and associated InsertRecords that should go into the table.
01669    */
01670 
01671   unit= u;
01672 
01673   if (not (table= create_table_from_items(session, create_info, create_table,
01674             table_proto,
01675             alter_info, &values,
01676             is_if_not_exists,
01677             &extra_lock, identifier)))
01678   {
01679     return(-1);       // abort() deletes table
01680   }
01681 
01682   if (extra_lock)
01683   {
01684     assert(m_plock == NULL);
01685 
01686     if (identifier.isTmp())
01687       m_plock= &m_lock;
01688     else
01689       m_plock= &session->extra_lock;
01690 
01691     *m_plock= extra_lock;
01692   }
01693 
01694   if (table->getShare()->sizeFields() < values.size())
01695   {
01696     my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
01697     return(-1);
01698   }
01699 
01700  /* First field to copy */
01701   field= table->getFields() + table->getShare()->sizeFields() - values.size();
01702 
01703   /* Mark all fields that are given values */
01704   for (Field **f= field ; *f ; f++)
01705   {
01706     table->setWriteSet((*f)->position());
01707   }
01708 
01709   /* Don't set timestamp if used */
01710   table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
01711   table->next_number_field=table->found_next_number_field;
01712 
01713   table->restoreRecordAsDefault();      // Get empty record
01714   session->cuted_fields=0;
01715   if (info.ignore || info.handle_duplicates != DUP_ERROR)
01716     table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
01717 
01718   if (info.handle_duplicates == DUP_REPLACE)
01719     table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
01720 
01721   if (info.handle_duplicates == DUP_UPDATE)
01722     table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
01723 
01724   table->cursor->ha_start_bulk_insert((ha_rows) 0);
01725   session->setAbortOnWarning(not info.ignore);
01726   if (check_that_all_fields_are_given_values(session, table, table_list))
01727     return(1);
01728 
01729   table->mark_columns_needed_for_insert();
01730   table->cursor->extra(HA_EXTRA_WRITE_CACHE);
01731   return(0);
01732 }
01733 
01734 void select_create::store_values(List<Item> &values)
01735 {
01736   fill_record(session, field, values, true);
01737 }
01738 
01739 
01740 void select_create::send_error(drizzled::error_t errcode,const char *err)
01741 {
01742   /*
01743     This will execute any rollbacks that are necessary before writing
01744     the transcation cache.
01745 
01746     We disable the binary log since nothing should be written to the
01747     binary log.  This disabling is important, since we potentially do
01748     a "roll back" of non-transactional tables by removing the table,
01749     and the actual rollback might generate events that should not be
01750     written to the binary log.
01751 
01752   */
01753   select_insert::send_error(errcode, err);
01754 }
01755 
01756 
01757 bool select_create::send_eof()
01758 {
01759   bool tmp=select_insert::send_eof();
01760   if (tmp)
01761     abort();
01762   else
01763   {
01764     /*
01765       Do an implicit commit at end of statement for non-temporary
01766       tables.  This can fail, but we should unlock the table
01767       nevertheless.
01768     */
01769     if (!table->getShare()->getType())
01770     {
01771       TransactionServices &transaction_services= TransactionServices::singleton();
01772       transaction_services.autocommitOrRollback(*session, 0);
01773       (void) session->endActiveTransaction();
01774     }
01775 
01776     table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01777     table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01778     if (m_plock)
01779     {
01780       session->unlockTables(*m_plock);
01781       *m_plock= NULL;
01782       m_plock= NULL;
01783     }
01784   }
01785   return tmp;
01786 }
01787 
01788 
01789 void select_create::abort()
01790 {
01791   /*
01792     In select_insert::abort() we roll back the statement, including
01793     truncating the transaction cache of the binary log. To do this, we
01794     pretend that the statement is transactional, even though it might
01795     be the case that it was not.
01796 
01797     We roll back the statement prior to deleting the table and prior
01798     to releasing the lock on the table, since there might be potential
01799     for failure if the rollback is executed after the drop or after
01800     unlocking the table.
01801 
01802     We also roll back the statement regardless of whether the creation
01803     of the table succeeded or not, since we need to reset the binary
01804     log state.
01805   */
01806   select_insert::abort();
01807 
01808   if (m_plock)
01809   {
01810     session->unlockTables(*m_plock);
01811     *m_plock= NULL;
01812     m_plock= NULL;
01813   }
01814 
01815   if (table)
01816   {
01817     table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01818     table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01819     if (not create_info->table_existed)
01820       session->drop_open_table(table, identifier);
01821     table= NULL;                                    // Safety
01822   }
01823 }
01824 
01825 } /* namespace drizzled */