00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include <config.h>
00020 #include <cstdio>
00021 #include <drizzled/sql_select.h>
00022 #include <drizzled/show.h>
00023 #include <drizzled/error.h>
00024 #include <drizzled/name_resolution_context_state.h>
00025 #include <drizzled/probes.h>
00026 #include <drizzled/sql_base.h>
00027 #include <drizzled/sql_load.h>
00028 #include <drizzled/field/epoch.h>
00029 #include <drizzled/lock.h>
00030 #include <drizzled/sql_table.h>
00031 #include <drizzled/pthread_globals.h>
00032 #include <drizzled/transaction_services.h>
00033 #include <drizzled/plugin/transactional_storage_engine.h>
00034 #include <drizzled/select_insert.h>
00035 #include <drizzled/select_create.h>
00036 #include <drizzled/table/shell.h>
00037 #include <drizzled/alter_info.h>
00038 #include <drizzled/sql_parse.h>
00039 #include <drizzled/sql_lex.h>
00040
00041 namespace drizzled
00042 {
00043
00044 extern plugin::StorageEngine *heap_engine;
00045 extern plugin::StorageEngine *myisam_engine;
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068 static int check_insert_fields(Session *session, TableList *table_list,
00069 List<Item> &fields, List<Item> &values,
00070 bool check_unique,
00071 table_map *)
00072 {
00073 Table *table= table_list->table;
00074
00075 if (fields.size() == 0 && values.size() != 0)
00076 {
00077 if (values.size() != table->getShare()->sizeFields())
00078 {
00079 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
00080 return -1;
00081 }
00082 clear_timestamp_auto_bits(table->timestamp_field_type,
00083 TIMESTAMP_AUTO_SET_ON_INSERT);
00084
00085
00086
00087
00088 table->setWriteSet();
00089 }
00090 else
00091 {
00092 Select_Lex *select_lex= &session->lex().select_lex;
00093 Name_resolution_context *context= &select_lex->context;
00094 Name_resolution_context_state ctx_state;
00095 int res;
00096
00097 if (fields.size() != values.size())
00098 {
00099 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
00100 return -1;
00101 }
00102
00103 session->dup_field= 0;
00104
00105
00106 ctx_state.save_state(context, table_list);
00107
00108
00109
00110
00111
00112 table_list->next_local= 0;
00113 context->resolve_in_table_list_only(table_list);
00114 res= setup_fields(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
00115
00116
00117 ctx_state.restore_state(context, table_list);
00118
00119 if (res)
00120 return -1;
00121
00122 if (check_unique && session->dup_field)
00123 {
00124 my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), session->dup_field->field_name);
00125 return -1;
00126 }
00127 if (table->timestamp_field)
00128 {
00129 if (table->timestamp_field->isWriteSet())
00130 {
00131 clear_timestamp_auto_bits(table->timestamp_field_type,
00132 TIMESTAMP_AUTO_SET_ON_INSERT);
00133 }
00134 else
00135 {
00136 table->setWriteSet(table->timestamp_field->position());
00137 }
00138 }
00139 }
00140
00141 return 0;
00142 }
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164 static int check_update_fields(Session *session, TableList *insert_table_list,
00165 List<Item> &update_fields,
00166 table_map *)
00167 {
00168 Table *table= insert_table_list->table;
00169 bool timestamp_mark= false;
00170
00171 if (table->timestamp_field)
00172 {
00173
00174
00175
00176
00177 timestamp_mark= table->write_set->test(table->timestamp_field->position());
00178 table->write_set->reset(table->timestamp_field->position());
00179 }
00180
00181
00182 if (setup_fields(session, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
00183 return -1;
00184
00185 if (table->timestamp_field)
00186 {
00187
00188 if (table->timestamp_field->isWriteSet())
00189 {
00190 clear_timestamp_auto_bits(table->timestamp_field_type,
00191 TIMESTAMP_AUTO_SET_ON_UPDATE);
00192 }
00193
00194 if (timestamp_mark)
00195 {
00196 table->setWriteSet(table->timestamp_field->position());
00197 }
00198 }
00199 return 0;
00200 }
00201
00202
00211 static
00212 void upgrade_lock_type(Session *,
00213 thr_lock_type *lock_type,
00214 enum_duplicates duplic,
00215 bool )
00216 {
00217 if (duplic == DUP_UPDATE ||
00218 (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
00219 {
00220 *lock_type= TL_WRITE_DEFAULT;
00221 return;
00222 }
00223 }
00224
00225
00234 bool insert_query(Session *session,TableList *table_list,
00235 List<Item> &fields,
00236 List<List_item> &values_list,
00237 List<Item> &update_fields,
00238 List<Item> &update_values,
00239 enum_duplicates duplic,
00240 bool ignore)
00241 {
00242 int error;
00243 bool transactional_table, joins_freed= false;
00244 bool changed;
00245 uint32_t value_count;
00246 ulong counter = 1;
00247 uint64_t id;
00248 CopyInfo info;
00249 Table *table= 0;
00250 List<List_item>::iterator its(values_list.begin());
00251 List_item *values;
00252 Name_resolution_context *context;
00253 Name_resolution_context_state ctx_state;
00254 thr_lock_type lock_type;
00255 Item *unused_conds= 0;
00256
00257
00258
00259
00260
00261
00262 upgrade_lock_type(session, &table_list->lock_type, duplic,
00263 values_list.size() > 1);
00264
00265 if (session->openTablesLock(table_list))
00266 {
00267 DRIZZLE_INSERT_DONE(1, 0);
00268 return true;
00269 }
00270
00271 lock_type= table_list->lock_type;
00272
00273 session->set_proc_info("init");
00274 session->used_tables=0;
00275 values= its++;
00276 value_count= values->size();
00277
00278 if (prepare_insert(session, table_list, table, fields, values,
00279 update_fields, update_values, duplic, &unused_conds,
00280 false,
00281 (fields.size() || !value_count ||
00282 (0) != 0), !ignore))
00283 {
00284 if (table != NULL)
00285 table->cursor->ha_release_auto_increment();
00286 if (!joins_freed)
00287 free_underlaid_joins(session, &session->lex().select_lex);
00288 session->setAbortOnWarning(false);
00289 DRIZZLE_INSERT_DONE(1, 0);
00290 return true;
00291 }
00292
00293
00294 table= table_list->table;
00295
00296 context= &session->lex().select_lex.context;
00297
00298
00299
00300
00301
00302 assert(!table_list->next_local);
00303 assert(!context->table_list->next_local);
00304 assert(!context->first_name_resolution_table->next_name_resolution_table);
00305
00306
00307 ctx_state.save_state(context, table_list);
00308
00309
00310
00311
00312
00313 table_list->next_local= 0;
00314 context->resolve_in_table_list_only(table_list);
00315
00316 while ((values= its++))
00317 {
00318 counter++;
00319 if (values->size() != value_count)
00320 {
00321 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
00322
00323 if (table != NULL)
00324 table->cursor->ha_release_auto_increment();
00325 if (!joins_freed)
00326 free_underlaid_joins(session, &session->lex().select_lex);
00327 session->setAbortOnWarning(false);
00328 DRIZZLE_INSERT_DONE(1, 0);
00329
00330 return true;
00331 }
00332 if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
00333 {
00334 if (table != NULL)
00335 table->cursor->ha_release_auto_increment();
00336 if (!joins_freed)
00337 free_underlaid_joins(session, &session->lex().select_lex);
00338 session->setAbortOnWarning(false);
00339 DRIZZLE_INSERT_DONE(1, 0);
00340 return true;
00341 }
00342 }
00343 its= values_list.begin();
00344
00345
00346 ctx_state.restore_state(context, table_list);
00347
00348
00349
00350
00351 info.ignore= ignore;
00352 info.handle_duplicates=duplic;
00353 info.update_fields= &update_fields;
00354 info.update_values= &update_values;
00355
00356
00357
00358
00359
00360
00361 session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
00362
00363 session->cuted_fields = 0L;
00364 table->next_number_field=table->found_next_number_field;
00365
00366 error=0;
00367 session->set_proc_info("update");
00368 if (duplic == DUP_REPLACE)
00369 table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
00370 if (duplic == DUP_UPDATE)
00371 table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
00372 {
00373 if (duplic != DUP_ERROR || ignore)
00374 table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
00375 table->cursor->ha_start_bulk_insert(values_list.size());
00376 }
00377
00378
00379 session->setAbortOnWarning(not ignore);
00380
00381 table->mark_columns_needed_for_insert();
00382
00383 while ((values= its++))
00384 {
00385 if (fields.size() || !value_count)
00386 {
00387 table->restoreRecordAsDefault();
00388 if (fill_record(session, fields, *values))
00389 {
00390 if (values_list.size() != 1 && ! session->is_error())
00391 {
00392 info.records++;
00393 continue;
00394 }
00395
00396
00397
00398
00399
00400 error=1;
00401 break;
00402 }
00403 }
00404 else
00405 {
00406 table->restoreRecordAsDefault();
00407
00408 if (fill_record(session, table->getFields(), *values))
00409 {
00410 if (values_list.size() != 1 && ! session->is_error())
00411 {
00412 info.records++;
00413 continue;
00414 }
00415 error=1;
00416 break;
00417 }
00418 }
00419
00420
00421 plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
00422
00423 error=write_record(session, table ,&info);
00424 if (error)
00425 break;
00426 session->row_count++;
00427 }
00428
00429 free_underlaid_joins(session, &session->lex().select_lex);
00430 joins_freed= true;
00431
00432
00433
00434
00435
00436 {
00437
00438
00439
00440
00441 table->cursor->ha_release_auto_increment();
00442 if (table->cursor->ha_end_bulk_insert() && !error)
00443 {
00444 table->print_error(errno,MYF(0));
00445 error=1;
00446 }
00447 if (duplic != DUP_ERROR || ignore)
00448 table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
00449
00450 transactional_table= table->cursor->has_transactions();
00451
00452 changed= (info.copied || info.deleted || info.updated);
00453 if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
00454 {
00455 if (session->transaction.stmt.hasModifiedNonTransData())
00456 session->transaction.all.markModifiedNonTransData();
00457 }
00458 assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
00459
00460 }
00461 session->set_proc_info("end");
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472 id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
00473 session->first_successful_insert_id_in_cur_stmt :
00474 (session->arg_of_last_insert_id_function ?
00475 session->first_successful_insert_id_in_prev_stmt :
00476 ((table->next_number_field && info.copied) ?
00477 table->next_number_field->val_int() : 0));
00478 table->next_number_field=0;
00479 session->count_cuted_fields= CHECK_FIELD_IGNORE;
00480 table->auto_increment_field_not_null= false;
00481 if (duplic == DUP_REPLACE)
00482 table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
00483
00484 if (error)
00485 {
00486 if (table != NULL)
00487 table->cursor->ha_release_auto_increment();
00488 if (!joins_freed)
00489 free_underlaid_joins(session, &session->lex().select_lex);
00490 session->setAbortOnWarning(false);
00491 DRIZZLE_INSERT_DONE(1, 0);
00492 return true;
00493 }
00494
00495 if (values_list.size() == 1 && (!(session->options & OPTION_WARNINGS) ||
00496 !session->cuted_fields))
00497 {
00498 session->row_count_func= info.copied + info.deleted + info.updated;
00499 session->my_ok((ulong) session->rowCount(),
00500 info.copied + info.deleted + info.touched, id);
00501 }
00502 else
00503 {
00504 char buff[160];
00505 if (ignore)
00506 snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
00507 (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
00508 else
00509 snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
00510 (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
00511 session->row_count_func= info.copied + info.deleted + info.updated;
00512 session->my_ok((ulong) session->rowCount(),
00513 info.copied + info.deleted + info.touched, id, buff);
00514 }
00515 session->status_var.inserted_row_count+= session->rowCount();
00516 session->setAbortOnWarning(false);
00517 DRIZZLE_INSERT_DONE(0, session->rowCount());
00518
00519 return false;
00520 }
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539 static bool prepare_insert_check_table(Session *session, TableList *table_list,
00540 List<Item> &,
00541 bool select_insert)
00542 {
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
00553 &session->lex().select_lex.top_join_list,
00554 table_list,
00555 &session->lex().select_lex.leaf_tables,
00556 select_insert))
00557 return(true);
00558
00559 return(false);
00560 }
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594 bool prepare_insert(Session *session, TableList *table_list,
00595 Table *table, List<Item> &fields, List_item *values,
00596 List<Item> &update_fields, List<Item> &update_values,
00597 enum_duplicates duplic,
00598 COND **,
00599 bool select_insert,
00600 bool check_fields, bool abort_on_warning)
00601 {
00602 Select_Lex *select_lex= &session->lex().select_lex;
00603 Name_resolution_context *context= &select_lex->context;
00604 Name_resolution_context_state ctx_state;
00605 bool insert_into_view= (0 != 0);
00606 bool res= 0;
00607 table_map map= 0;
00608
00609
00610 assert (!select_insert || !values);
00611
00612
00613
00614
00615
00616
00617 if (not select_insert)
00618 {
00619 for (Select_Lex_Unit *un= select_lex->first_inner_unit();
00620 un;
00621 un= un->next_unit())
00622 {
00623 for (Select_Lex *sl= un->first_select();
00624 sl;
00625 sl= sl->next_select())
00626 {
00627 sl->context.outer_context= 0;
00628 }
00629 }
00630 }
00631
00632 if (duplic == DUP_UPDATE)
00633 {
00634
00635 if (table_list->set_insert_values(session->mem_root))
00636 return(true);
00637 }
00638
00639 if (prepare_insert_check_table(session, table_list, fields, select_insert))
00640 return(true);
00641
00642
00643
00644 if (values)
00645 {
00646
00647 assert (!select_lex->group_list.elements);
00648
00649
00650 ctx_state.save_state(context, table_list);
00651
00652
00653
00654
00655
00656 table_list->next_local= 0;
00657 context->resolve_in_table_list_only(table_list);
00658
00659 res= check_insert_fields(session, context->table_list, fields, *values,
00660 !insert_into_view, &map) ||
00661 setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0);
00662
00663 if (!res && check_fields)
00664 {
00665 bool saved_abort_on_warning= session->abortOnWarning();
00666
00667 session->setAbortOnWarning(abort_on_warning);
00668 res= check_that_all_fields_are_given_values(session,
00669 table ? table :
00670 context->table_list->table,
00671 context->table_list);
00672 session->setAbortOnWarning(saved_abort_on_warning);
00673 }
00674
00675 if (!res && duplic == DUP_UPDATE)
00676 {
00677 res= check_update_fields(session, context->table_list, update_fields, &map);
00678 }
00679
00680
00681 ctx_state.restore_state(context, table_list);
00682
00683 if (not res)
00684 res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
00685 }
00686
00687 if (res)
00688 return(res);
00689
00690 if (not table)
00691 table= table_list->table;
00692
00693 if (not select_insert)
00694 {
00695 TableList *duplicate;
00696 if ((duplicate= unique_table(table_list, table_list->next_global, true)))
00697 {
00698 my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
00699
00700 return true;
00701 }
00702 }
00703
00704 if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
00705 table->prepare_for_position();
00706
00707 return false;
00708 }
00709
00710
00711
00712
00713 static int last_uniq_key(Table *table,uint32_t keynr)
00714 {
00715 while (++keynr < table->getShare()->sizeKeys())
00716 if (table->key_info[keynr].flags & HA_NOSAME)
00717 return 0;
00718 return 1;
00719 }
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749 int write_record(Session *session, Table *table,CopyInfo *info)
00750 {
00751 int error;
00752 std::vector<unsigned char> key;
00753 boost::dynamic_bitset<> *save_read_set, *save_write_set;
00754 uint64_t prev_insert_id= table->cursor->next_insert_id;
00755 uint64_t insert_id_for_cur_row= 0;
00756
00757
00758 info->records++;
00759 save_read_set= table->read_set;
00760 save_write_set= table->write_set;
00761
00762 if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
00763 {
00764 while ((error=table->cursor->insertRecord(table->getInsertRecord())))
00765 {
00766 uint32_t key_nr;
00767
00768
00769
00770
00771
00772
00773
00774 if (table->cursor->insert_id_for_cur_row > 0)
00775 insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
00776 else
00777 table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
00778 bool is_duplicate_key_error;
00779 if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
00780 goto err;
00781 is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
00782 if (!is_duplicate_key_error)
00783 {
00784
00785
00786
00787
00788
00789 if (info->ignore)
00790 goto gok_or_after_err;
00791 goto err;
00792 }
00793 if ((int) (key_nr = table->get_dup_key(error)) < 0)
00794 {
00795 error= HA_ERR_FOUND_DUPP_KEY;
00796 goto err;
00797 }
00798
00799 table->use_all_columns();
00800
00801
00802
00803
00804
00805 if (info->handle_duplicates == DUP_REPLACE &&
00806 table->next_number_field &&
00807 key_nr == table->getShare()->next_number_index &&
00808 (insert_id_for_cur_row > 0))
00809 goto err;
00810 if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
00811 {
00812 if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
00813 goto err;
00814 }
00815 else
00816 {
00817 if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE))
00818 {
00819 error=errno;
00820 goto err;
00821 }
00822
00823 if (not key.size())
00824 {
00825 key.resize(table->getShare()->max_unique_length);
00826 }
00827 key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
00828 if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
00829 &key[0], HA_WHOLE_KEY,
00830 HA_READ_KEY_EXACT))))
00831 goto err;
00832 }
00833 if (info->handle_duplicates == DUP_UPDATE)
00834 {
00835
00836
00837
00838
00839
00840 assert(table->insert_values.size());
00841 table->storeRecordAsInsert();
00842 table->restoreRecord();
00843 assert(info->update_fields->size() ==
00844 info->update_values->size());
00845 if (fill_record(session, *info->update_fields,
00846 *info->update_values,
00847 info->ignore))
00848 goto before_err;
00849
00850 table->cursor->restore_auto_increment(prev_insert_id);
00851 if (table->next_number_field)
00852 table->cursor->adjust_next_insert_id_after_explicit_value(
00853 table->next_number_field->val_int());
00854 info->touched++;
00855
00856 if (! table->records_are_comparable() || table->compare_records())
00857 {
00858 if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
00859 table->getInsertRecord())) &&
00860 error != HA_ERR_RECORD_IS_THE_SAME)
00861 {
00862 if (info->ignore &&
00863 !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
00864 {
00865 goto gok_or_after_err;
00866 }
00867 goto err;
00868 }
00869
00870 if (error != HA_ERR_RECORD_IS_THE_SAME)
00871 info->updated++;
00872 else
00873 error= 0;
00874
00875
00876
00877
00878
00879
00880
00881 insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
00882 info->copied++;
00883 }
00884
00885 if (table->next_number_field)
00886 table->cursor->adjust_next_insert_id_after_explicit_value(
00887 table->next_number_field->val_int());
00888 info->touched++;
00889
00890 goto gok_or_after_err;
00891 }
00892 else
00893 {
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908 if (last_uniq_key(table,key_nr) &&
00909 !table->cursor->referenced_by_foreign_key() &&
00910 (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
00911 table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
00912 {
00913 if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
00914 table->getInsertRecord())) &&
00915 error != HA_ERR_RECORD_IS_THE_SAME)
00916 goto err;
00917 if (error != HA_ERR_RECORD_IS_THE_SAME)
00918 info->deleted++;
00919 else
00920 error= 0;
00921 session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00922
00923
00924
00925
00926 goto after_n_copied_inc;
00927 }
00928 else
00929 {
00930 if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
00931 goto err;
00932 info->deleted++;
00933 if (!table->cursor->has_transactions())
00934 session->transaction.stmt.markModifiedNonTransData();
00935
00936 }
00937 }
00938 }
00939 session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00940
00941
00942
00943
00944 if (table->read_set != save_read_set ||
00945 table->write_set != save_write_set)
00946 table->column_bitmaps_set(*save_read_set, *save_write_set);
00947 }
00948 else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
00949 {
00950 if (!info->ignore ||
00951 table->cursor->is_fatal_error(error, HA_CHECK_DUP))
00952 goto err;
00953 table->cursor->restore_auto_increment(prev_insert_id);
00954 goto gok_or_after_err;
00955 }
00956
00957 after_n_copied_inc:
00958 info->copied++;
00959 session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
00960
00961 gok_or_after_err:
00962 if (!table->cursor->has_transactions())
00963 session->transaction.stmt.markModifiedNonTransData();
00964 return(0);
00965
00966 err:
00967 info->last_errno= error;
00968
00969 if (session->lex().current_select)
00970 session->lex().current_select->no_error= 0;
00971 table->print_error(error,MYF(0));
00972
00973 before_err:
00974 table->cursor->restore_auto_increment(prev_insert_id);
00975 table->column_bitmaps_set(*save_read_set, *save_write_set);
00976 return 1;
00977 }
00978
00979
00980
00981
00982
00983
00984 int check_that_all_fields_are_given_values(Session *session, Table *entry,
00985 TableList *)
00986 {
00987 int err= 0;
00988
00989 for (Field **field=entry->getFields() ; *field ; field++)
00990 {
00991 if (((*field)->isWriteSet()) == false)
00992 {
00993
00994
00995
00996
00997
00998 if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
00999 ((*field)->real_type() != DRIZZLE_TYPE_ENUM))
01000 {
01001 my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
01002 err= 1;
01003 }
01004 }
01005 else
01006 {
01007
01008
01009
01010
01011
01012
01013
01014
01015 if (((*field)->flags & NOT_NULL_FLAG) &&
01016 (*field)->is_null())
01017 {
01018 my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
01019 err= 1;
01020 }
01021 }
01022 }
01023 return session->abortOnWarning() ? err : 0;
01024 }
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043 bool insert_select_prepare(Session *session)
01044 {
01045 LEX *lex= &session->lex();
01046 Select_Lex *select_lex= &lex->select_lex;
01047
01048
01049
01050
01051
01052
01053 if (prepare_insert(session, lex->query_tables,
01054 lex->query_tables->table, lex->field_list, 0,
01055 lex->update_list, lex->value_list,
01056 lex->duplicates,
01057 &select_lex->where, true, false, false))
01058 return(true);
01059
01060
01061
01062
01063
01064 assert(select_lex->leaf_tables != 0);
01065 lex->leaf_tables_insert= select_lex->leaf_tables;
01066
01067 select_lex->leaf_tables= select_lex->leaf_tables->next_leaf;
01068 return(false);
01069 }
01070
01071
01072 select_insert::select_insert(TableList *table_list_par, Table *table_par,
01073 List<Item> *fields_par,
01074 List<Item> *update_fields,
01075 List<Item> *update_values,
01076 enum_duplicates duplic,
01077 bool ignore_check_option_errors) :
01078 table_list(table_list_par), table(table_par), fields(fields_par),
01079 autoinc_value_of_last_inserted_row(0),
01080 insert_into_view(table_list_par && 0 != 0)
01081 {
01082 info.handle_duplicates= duplic;
01083 info.ignore= ignore_check_option_errors;
01084 info.update_fields= update_fields;
01085 info.update_values= update_values;
01086 }
01087
01088
01089 int
01090 select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
01091 {
01092 int res;
01093 table_map map= 0;
01094 Select_Lex *lex_current_select_save= session->lex().current_select;
01095
01096
01097 unit= u;
01098
01099
01100
01101
01102
01103
01104 session->lex().current_select= &session->lex().select_lex;
01105 res= check_insert_fields(session, table_list, *fields, values,
01106 !insert_into_view, &map) ||
01107 setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0);
01108
01109 if (!res && fields->size())
01110 {
01111 bool saved_abort_on_warning= session->abortOnWarning();
01112 session->setAbortOnWarning(not info.ignore);
01113 res= check_that_all_fields_are_given_values(session, table_list->table,
01114 table_list);
01115 session->setAbortOnWarning(saved_abort_on_warning);
01116 }
01117
01118 if (info.handle_duplicates == DUP_UPDATE && !res)
01119 {
01120 Name_resolution_context *context= &session->lex().select_lex.context;
01121 Name_resolution_context_state ctx_state;
01122
01123
01124 ctx_state.save_state(context, table_list);
01125
01126
01127 table_list->next_local= 0;
01128 context->resolve_in_table_list_only(table_list);
01129
01130 res= res || check_update_fields(session, context->table_list,
01131 *info.update_fields, &map);
01132
01133
01134
01135
01136
01137 assert (!table_list->next_name_resolution_table);
01138 if (session->lex().select_lex.group_list.elements == 0 and
01139 not session->lex().select_lex.with_sum_func)
01140
01141
01142
01143
01144
01145 table_list->next_name_resolution_table=
01146 ctx_state.get_first_name_resolution_table();
01147
01148 res= res || setup_fields(session, 0, *info.update_values,
01149 MARK_COLUMNS_READ, 0, 0);
01150 if (!res)
01151 {
01152
01153
01154
01155
01156
01157
01158 List<Item>::iterator li(info.update_values->begin());
01159 Item *item;
01160
01161 while ((item= li++))
01162 {
01163 item->transform(&Item::update_value_transformer,
01164 (unsigned char*)session->lex().current_select);
01165 }
01166 }
01167
01168
01169 ctx_state.restore_state(context, table_list);
01170 }
01171
01172 session->lex().current_select= lex_current_select_save;
01173 if (res)
01174 return(1);
01175
01176
01177
01178
01179 table= table_list->table;
01180
01181
01182
01183
01184
01185 if (unique_table(table_list, table_list->next_global))
01186 {
01187
01188 session->lex().current_select->options|= OPTION_BUFFER_RESULT;
01189 session->lex().current_select->join->select_options|= OPTION_BUFFER_RESULT;
01190 }
01191 else if (not (session->lex().current_select->options & OPTION_BUFFER_RESULT))
01192 {
01193
01194
01195
01196
01197
01198
01199
01200
01201
01202 table->cursor->ha_start_bulk_insert((ha_rows) 0);
01203 }
01204 table->restoreRecordAsDefault();
01205 table->next_number_field=table->found_next_number_field;
01206
01207 session->cuted_fields=0;
01208
01209 if (info.ignore || info.handle_duplicates != DUP_ERROR)
01210 table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
01211
01212 if (info.handle_duplicates == DUP_REPLACE)
01213 table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
01214
01215 if (info.handle_duplicates == DUP_UPDATE)
01216 table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
01217
01218 session->setAbortOnWarning(not info.ignore);
01219 table->mark_columns_needed_for_insert();
01220
01221
01222 return(res);
01223 }
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234
01235
01236
01237
01238
01239
01240
01241
01242 int select_insert::prepare2(void)
01243 {
01244 if (session->lex().current_select->options & OPTION_BUFFER_RESULT)
01245 table->cursor->ha_start_bulk_insert((ha_rows) 0);
01246
01247 return(0);
01248 }
01249
01250
01251 void select_insert::cleanup()
01252 {
01253
01254 assert(0);
01255 }
01256
01257 select_insert::~select_insert()
01258 {
01259
01260 if (table)
01261 {
01262 table->next_number_field=0;
01263 table->auto_increment_field_not_null= false;
01264 table->cursor->ha_reset();
01265 }
01266 session->count_cuted_fields= CHECK_FIELD_IGNORE;
01267 session->setAbortOnWarning(false);
01268 return;
01269 }
01270
01271
01272 bool select_insert::send_data(List<Item> &values)
01273 {
01274
01275 bool error= false;
01276
01277 if (unit->offset_limit_cnt)
01278 {
01279 unit->offset_limit_cnt--;
01280 return false;
01281 }
01282
01283 session->count_cuted_fields= CHECK_FIELD_WARN;
01284 store_values(values);
01285 session->count_cuted_fields= CHECK_FIELD_IGNORE;
01286 if (session->is_error())
01287 return true;
01288
01289
01290 plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
01291
01292 error= write_record(session, table, &info);
01293 table->auto_increment_field_not_null= false;
01294
01295 if (!error)
01296 {
01297 if (info.handle_duplicates == DUP_UPDATE)
01298 {
01299
01300
01301
01302
01303
01304
01305
01306
01307 table->restoreRecordAsDefault();
01308 }
01309 if (table->next_number_field)
01310 {
01311
01312
01313
01314
01315 if (session->first_successful_insert_id_in_cur_stmt == 0)
01316 autoinc_value_of_last_inserted_row=
01317 table->next_number_field->val_int();
01318
01319
01320
01321
01322 table->next_number_field->reset();
01323 }
01324 }
01325 return(error);
01326 }
01327
01328
01329 void select_insert::store_values(List<Item> &values)
01330 {
01331 if (fields->size())
01332 fill_record(session, *fields, values, true);
01333 else
01334 fill_record(session, table->getFields(), values, true);
01335 }
01336
01337 void select_insert::send_error(drizzled::error_t errcode,const char *err)
01338 {
01339 my_message(errcode, err, MYF(0));
01340 }
01341
01342
01343 bool select_insert::send_eof()
01344 {
01345 int error;
01346 bool const trans_table= table->cursor->has_transactions();
01347 uint64_t id;
01348 bool changed;
01349
01350 error= table->cursor->ha_end_bulk_insert();
01351 table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01352 table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01353
01354 if ((changed= (info.copied || info.deleted || info.updated)))
01355 {
01356
01357
01358
01359
01360 if (session->transaction.stmt.hasModifiedNonTransData())
01361 session->transaction.all.markModifiedNonTransData();
01362 }
01363 assert(trans_table || !changed ||
01364 session->transaction.stmt.hasModifiedNonTransData());
01365
01366 table->cursor->ha_release_auto_increment();
01367
01368 if (error)
01369 {
01370 table->print_error(error,MYF(0));
01371 DRIZZLE_INSERT_SELECT_DONE(error, 0);
01372 return 1;
01373 }
01374 char buff[160];
01375 if (info.ignore)
01376 snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
01377 (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
01378 else
01379 snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
01380 (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
01381 session->row_count_func= info.copied + info.deleted + info.updated;
01382
01383 id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
01384 session->first_successful_insert_id_in_cur_stmt :
01385 (session->arg_of_last_insert_id_function ?
01386 session->first_successful_insert_id_in_prev_stmt :
01387 (info.copied ? autoinc_value_of_last_inserted_row : 0));
01388 session->my_ok((ulong) session->rowCount(),
01389 info.copied + info.deleted + info.touched, id, buff);
01390 session->status_var.inserted_row_count+= session->rowCount();
01391 DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
01392 return 0;
01393 }
01394
01395 void select_insert::abort() {
01396
01397
01398
01399
01400
01401
01402
01403
01404 if (table)
01405 {
01406 bool changed, transactional_table;
01407
01408 table->cursor->ha_end_bulk_insert();
01409
01410
01411
01412
01413
01414
01415
01416
01417
01418
01419
01420
01421
01422
01423
01424 changed= (info.copied || info.deleted || info.updated);
01425 transactional_table= table->cursor->has_transactions();
01426 assert(transactional_table || !changed ||
01427 session->transaction.stmt.hasModifiedNonTransData());
01428 table->cursor->ha_release_auto_increment();
01429 }
01430
01431 if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
01432 {
01433 DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
01434 }
01435
01436 return;
01437 }
01438
01439
01440
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470
01471
01472
01473
01474
01475
01476
01477
01478
01479
01480
01481
01482
01483
01484
01485
01486 static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
01487 TableList *create_table,
01488 message::Table &table_proto,
01489 AlterInfo *alter_info,
01490 List<Item> *items,
01491 bool is_if_not_exists,
01492 DrizzleLock **lock,
01493 identifier::Table::const_reference identifier)
01494 {
01495 TableShare share(message::Table::INTERNAL);
01496 uint32_t select_field_count= items->size();
01497
01498 List<Item>::iterator it(items->begin());
01499 Item *item;
01500 Field *tmp_field;
01501
01502 if (not (identifier.isTmp()) && create_table->table->db_stat)
01503 {
01504
01505 if (is_if_not_exists)
01506 {
01507 create_info->table_existed= 1;
01508 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
01509 ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
01510 create_table->getTableName());
01511 return create_table->table;
01512 }
01513
01514 my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
01515 return NULL;
01516 }
01517
01518 {
01519 table::Shell tmp_table(share);
01520
01521 if (not table_proto.engine().name().compare("MyISAM"))
01522 tmp_table.getMutableShare()->db_low_byte_first= true;
01523 else if (not table_proto.engine().name().compare("MEMORY"))
01524 tmp_table.getMutableShare()->db_low_byte_first= true;
01525
01526 tmp_table.in_use= session;
01527
01528 while ((item=it++))
01529 {
01530 CreateField *cr_field;
01531 Field *field, *def_field;
01532 if (item->type() == Item::FUNC_ITEM)
01533 {
01534 if (item->result_type() != STRING_RESULT)
01535 {
01536 field= item->tmp_table_field(&tmp_table);
01537 }
01538 else
01539 {
01540 field= item->tmp_table_field_from_field_type(&tmp_table, 0);
01541 }
01542 }
01543 else
01544 {
01545 field= create_tmp_field(session, &tmp_table, item, item->type(),
01546 (Item ***) 0, &tmp_field, &def_field, false,
01547 false, false, 0);
01548 }
01549
01550 if (!field ||
01551 !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
01552 ((Item_field *)item)->field :
01553 (Field*) 0))))
01554 {
01555 return NULL;
01556 }
01557
01558 if (item->maybe_null)
01559 {
01560 cr_field->flags &= ~NOT_NULL_FLAG;
01561 }
01562
01563 alter_info->create_list.push_back(cr_field);
01564 }
01565 }
01566
01567
01568
01569
01570
01571
01572
01573
01574 Table *table= 0;
01575 {
01576 if (not create_table_no_lock(session,
01577 identifier,
01578 create_info,
01579 table_proto,
01580 alter_info,
01581 false,
01582 select_field_count,
01583 is_if_not_exists))
01584 {
01585 if (create_info->table_existed && not identifier.isTmp())
01586 {
01587
01588
01589
01590
01591
01592 my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
01593 return NULL;
01594 }
01595
01596 if (not identifier.isTmp())
01597 {
01598
01599 boost::mutex::scoped_lock scopedLock(table::Cache::singleton().mutex());
01600
01601 if (create_table->table)
01602 {
01603 table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
01604
01605 if (concurrent_table->reopen_name_locked_table(create_table, session))
01606 {
01607 (void)plugin::StorageEngine::dropTable(*session, identifier);
01608 }
01609 else
01610 {
01611 table= create_table->table;
01612 }
01613 }
01614 else
01615 {
01616 (void)plugin::StorageEngine::dropTable(*session, identifier);
01617 }
01618 }
01619 else
01620 {
01621 if (not (table= session->openTable(create_table, (bool*) 0,
01622 DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
01623 not create_info->table_existed)
01624 {
01625
01626
01627
01628
01629
01630 session->drop_temporary_table(identifier);
01631 }
01632 }
01633 }
01634 if (not table)
01635 return NULL;
01636 }
01637
01638 table->reginfo.lock_type=TL_WRITE;
01639 if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
01640 {
01641 if (*lock)
01642 {
01643 session->unlockTables(*lock);
01644 *lock= 0;
01645 }
01646
01647 if (not create_info->table_existed)
01648 session->drop_open_table(table, identifier);
01649
01650 return NULL;
01651 }
01652
01653 return table;
01654 }
01655
01656
01657 int
01658 select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
01659 {
01660 DrizzleLock *extra_lock= NULL;
01661
01662
01663
01664
01665
01666
01667
01668
01669
01670
01671 unit= u;
01672
01673 if (not (table= create_table_from_items(session, create_info, create_table,
01674 table_proto,
01675 alter_info, &values,
01676 is_if_not_exists,
01677 &extra_lock, identifier)))
01678 {
01679 return(-1);
01680 }
01681
01682 if (extra_lock)
01683 {
01684 assert(m_plock == NULL);
01685
01686 if (identifier.isTmp())
01687 m_plock= &m_lock;
01688 else
01689 m_plock= &session->extra_lock;
01690
01691 *m_plock= extra_lock;
01692 }
01693
01694 if (table->getShare()->sizeFields() < values.size())
01695 {
01696 my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
01697 return(-1);
01698 }
01699
01700
01701 field= table->getFields() + table->getShare()->sizeFields() - values.size();
01702
01703
01704 for (Field **f= field ; *f ; f++)
01705 {
01706 table->setWriteSet((*f)->position());
01707 }
01708
01709
01710 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
01711 table->next_number_field=table->found_next_number_field;
01712
01713 table->restoreRecordAsDefault();
01714 session->cuted_fields=0;
01715 if (info.ignore || info.handle_duplicates != DUP_ERROR)
01716 table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
01717
01718 if (info.handle_duplicates == DUP_REPLACE)
01719 table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
01720
01721 if (info.handle_duplicates == DUP_UPDATE)
01722 table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
01723
01724 table->cursor->ha_start_bulk_insert((ha_rows) 0);
01725 session->setAbortOnWarning(not info.ignore);
01726 if (check_that_all_fields_are_given_values(session, table, table_list))
01727 return(1);
01728
01729 table->mark_columns_needed_for_insert();
01730 table->cursor->extra(HA_EXTRA_WRITE_CACHE);
01731 return(0);
01732 }
01733
01734 void select_create::store_values(List<Item> &values)
01735 {
01736 fill_record(session, field, values, true);
01737 }
01738
01739
01740 void select_create::send_error(drizzled::error_t errcode,const char *err)
01741 {
01742
01743
01744
01745
01746
01747
01748
01749
01750
01751
01752
01753 select_insert::send_error(errcode, err);
01754 }
01755
01756
01757 bool select_create::send_eof()
01758 {
01759 bool tmp=select_insert::send_eof();
01760 if (tmp)
01761 abort();
01762 else
01763 {
01764
01765
01766
01767
01768
01769 if (!table->getShare()->getType())
01770 {
01771 TransactionServices &transaction_services= TransactionServices::singleton();
01772 transaction_services.autocommitOrRollback(*session, 0);
01773 (void) session->endActiveTransaction();
01774 }
01775
01776 table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01777 table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01778 if (m_plock)
01779 {
01780 session->unlockTables(*m_plock);
01781 *m_plock= NULL;
01782 m_plock= NULL;
01783 }
01784 }
01785 return tmp;
01786 }
01787
01788
01789 void select_create::abort()
01790 {
01791
01792
01793
01794
01795
01796
01797
01798
01799
01800
01801
01802
01803
01804
01805
01806 select_insert::abort();
01807
01808 if (m_plock)
01809 {
01810 session->unlockTables(*m_plock);
01811 *m_plock= NULL;
01812 m_plock= NULL;
01813 }
01814
01815 if (table)
01816 {
01817 table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
01818 table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
01819 if (not create_info->table_existed)
01820 session->drop_open_table(table, identifier);
01821 table= NULL;
01822 }
01823 }
01824
01825 }