Drizzled Public API Documentation

row0ins.cc

00001 /*****************************************************************************
00002 
00003 Copyright (C) 1996, 2010, Innobase Oy. All Rights Reserved.
00004 
00005 This program is free software; you can redistribute it and/or modify it under
00006 the terms of the GNU General Public License as published by the Free Software
00007 Foundation; version 2 of the License.
00008 
00009 This program is distributed in the hope that it will be useful, but WITHOUT
00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00012 
00013 You should have received a copy of the GNU General Public License along with
00014 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
00015 St, Fifth Floor, Boston, MA 02110-1301 USA
00016 
00017 *****************************************************************************/
00018 
00019 /**************************************************/
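/* Insert into a table. */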
00026 #include "row0ins.h"
00027 
00028 #ifdef UNIV_NONINL
00029 #include "row0ins.ic"
00030 #endif
00031 
00032 #include "ha_prototypes.h"
00033 #include "dict0dict.h"
00034 #include "dict0boot.h"
00035 #include "trx0undo.h"
00036 #include "btr0btr.h"
00037 #include "btr0cur.h"
00038 #include "mach0data.h"
00039 #include "que0que.h"
00040 #include "row0upd.h"
00041 #include "row0sel.h"
00042 #include "row0row.h"
00043 #include "rem0cmp.h"
00044 #include "lock0lock.h"
00045 #include "log0log.h"
00046 #include "eval0eval.h"
00047 #include "data0data.h"
00048 #include "usr0sess.h"
00049 #include "buf0lru.h"
00050 
00051 #define ROW_INS_PREV  1
00052 #define ROW_INS_NEXT  2
00053 
00054 /*************************************************************************
00055 IMPORTANT NOTE: Any operation that generates redo MUST check that there
00056 is enough space in the redo log before performing that operation. This is
00057 done by calling log_free_check(). The reason for checking the
00058 availability of the redo log space before the start of the operation is
00059 that we MUST not hold any synchronization objects when performing the
00060 check.
00061 If you make a change in this module make sure that no codepath is
00062 introduced where a call to log_free_check() is bypassed. */
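/* A minimal sketch of the calling pattern described above, for a hypothetical
operation that generates redo (the variable mtr and the operation shown are
illustrative only, not a specific function in this file):

      log_free_check();    checks for free redo log space, possibly waiting;
                           no synchronization objects may be held here
      mtr_start(&mtr);     only now start the mini-transaction
      ...                  acquire page latches and modify pages,
                           generating redo records
      mtr_commit(&mtr);    appends the redo records to the log buffer
                           and releases the latches
*/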
00063 
00064 /*********************************************************************/
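/* Creates an insert node struct.
@return own: insert node struct */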
00067 UNIV_INTERN
00068 ins_node_t*
00069 ins_node_create(
00070 /*============*/
00071   ulint   ins_type, 
00072   dict_table_t* table,    
00073   mem_heap_t* heap)   
00074 {
00075   ins_node_t* node;
00076 
00077   node = static_cast<ins_node_t *>(mem_heap_alloc(heap, sizeof(ins_node_t)));
00078 
00079   node->common.type = QUE_NODE_INSERT;
00080 
00081   node->ins_type = ins_type;
00082 
00083   node->state = INS_NODE_SET_IX_LOCK;
00084   node->table = table;
00085   node->index = NULL;
00086   node->entry = NULL;
00087 
00088   node->select = NULL;
00089 
00090   node->trx_id = 0;
00091 
00092   node->entry_sys_heap = mem_heap_create(128);
00093 
00094   node->magic_n = INS_NODE_MAGIC_N;
00095 
00096   return(node);
00097 }
00098 
00099 /***********************************************************/
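/* Creates an index entry template in the entry list for each index of the
table. */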
00101 UNIV_INTERN
00102 void
00103 ins_node_create_entry_list(
00104 /*=======================*/
00105   ins_node_t* node) 
00106 {
00107   dict_index_t* index;
00108   dtuple_t* entry;
00109 
00110   ut_ad(node->entry_sys_heap);
00111 
00112   UT_LIST_INIT(node->entry_list);
00113 
00114   index = dict_table_get_first_index(node->table);
00115 
00116   while (index != NULL) {
00117     entry = row_build_index_entry(node->row, NULL, index,
00118                 node->entry_sys_heap);
00119     UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
00120 
00121     index = dict_table_get_next_index(index);
00122   }
00123 }
00124 
00125 /*****************************************************************/
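/* Allocates buffers in the entry system heap for the row id, trx id, and
roll ptr system fields of a row and points the corresponding row fields to
them. */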
00127 static
00128 void
00129 row_ins_alloc_sys_fields(
00130 /*=====================*/
00131   ins_node_t* node) 
00132 {
00133   dtuple_t*   row;
00134   dict_table_t*   table;
00135   mem_heap_t*   heap;
00136   const dict_col_t* col;
00137   dfield_t*   dfield;
00138   byte*     ptr;
00139 
00140   row = node->row;
00141   table = node->table;
00142   heap = node->entry_sys_heap;
00143 
00144   ut_ad(row && table && heap);
00145   ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
00146 
00147   /* 1. Allocate buffer for row id */
00148 
00149   col = dict_table_get_sys_col(table, DATA_ROW_ID);
00150 
00151   dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
00152 
00153   ptr = static_cast<byte *>(mem_heap_zalloc(heap, DATA_ROW_ID_LEN));
00154 
00155   dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
00156 
00157   node->row_id_buf = ptr;
00158 
00159   /* 2. Allocate buffer for trx id */
00160 
00161   col = dict_table_get_sys_col(table, DATA_TRX_ID);
00162 
00163   dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
00164   ptr = static_cast<byte *>(mem_heap_zalloc(heap, DATA_TRX_ID_LEN));
00165 
00166   dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
00167 
00168   node->trx_id_buf = ptr;
00169 
00170   /* 3. Allocate buffer for roll ptr */
00171 
00172   col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
00173 
00174   dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
00175   ptr = static_cast<byte *>(mem_heap_zalloc(heap, DATA_ROLL_PTR_LEN));
00176 
00177   dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
00178 }
00179 
00180 /*********************************************************************/
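/* Sets a new row to insert for an INSERT graph. The same row is used for all
subsequent inserts until a new row is set; the index entry templates and the
system field buffers are rebuilt from it. */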
00184 UNIV_INTERN
00185 void
00186 ins_node_set_new_row(
00187 /*=================*/
00188   ins_node_t* node, 
00189   dtuple_t* row)  
00190 {
00191   node->state = INS_NODE_SET_IX_LOCK;
00192   node->index = NULL;
00193   node->entry = NULL;
00194 
00195   node->row = row;
00196 
00197   mem_heap_empty(node->entry_sys_heap);
00198 
00199   /* Create templates for index entries */
00200 
00201   ins_node_create_entry_list(node);
00202 
00203   /* Allocate from entry_sys_heap buffers for sys fields */
00204 
00205   row_ins_alloc_sys_fields(node);
00206 
00207   /* As we allocated a new trx id buf, the trx id should be written
00208   there again: */
00209 
00210   node->trx_id = 0;
00211 }
00212 
00213 /*******************************************************************/
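/* Does an insert operation by updating a delete-marked existing record in a
secondary index. This situation can occur if the delete-marked record is kept
in the index for consistent reads.
@return DB_SUCCESS, DB_FAIL if a pessimistic retry is needed, or error code */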
00218 static
00219 ulint
00220 row_ins_sec_index_entry_by_modify(
00221 /*==============================*/
00222   ulint   mode, 
00225   btr_cur_t*  cursor, 
00226   const dtuple_t* entry,  
00227   que_thr_t*  thr,  
00228   mtr_t*    mtr)  
00230 {
00231   big_rec_t*  dummy_big_rec;
00232   mem_heap_t* heap;
00233   upd_t*    update;
00234   rec_t*    rec;
00235   ulint   err;
00236 
00237   rec = btr_cur_get_rec(cursor);
00238 
00239   ut_ad(!dict_index_is_clust(cursor->index));
00240   ut_ad(rec_get_deleted_flag(rec,
00241            dict_table_is_comp(cursor->index->table)));
00242 
00243   /* We know that in the alphabetical ordering, entry and rec are
00244   identical. But in their binary form there may be differences if
00245   there are char fields in them. Therefore we have to calculate the
00246   difference. */
00247 
00248   heap = mem_heap_create(1024);
00249 
00250   update = row_upd_build_sec_rec_difference_binary(
00251     cursor->index, entry, rec, thr_get_trx(thr), heap);
00252   if (mode == BTR_MODIFY_LEAF) {
00253     /* Try an optimistic updating of the record, keeping changes
00254     within the page */
00255 
00256     err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
00257             update, 0, thr, mtr);
00258     switch (err) {
00259     case DB_OVERFLOW:
00260     case DB_UNDERFLOW:
00261     case DB_ZIP_OVERFLOW:
00262       err = DB_FAIL;
00263     }
00264   } else {
00265     ut_a(mode == BTR_MODIFY_TREE);
00266     if (buf_LRU_buf_pool_running_out()) {
00267 
00268       err = DB_LOCK_TABLE_FULL;
00269 
00270       goto func_exit;
00271     }
00272 
00273     err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
00274              &heap, &dummy_big_rec, update,
00275              0, thr, mtr);
00276     ut_ad(!dummy_big_rec);
00277   }
00278 func_exit:
00279   mem_heap_free(heap);
00280 
00281   return(err);
00282 }
00283 
00284 /*******************************************************************/
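/* Does an insert operation by delete unmarking and updating a delete-marked
record that is already in the clustered index. This situation can occur if
the delete-marked record is kept in the index for consistent reads.
@return DB_SUCCESS, DB_FAIL if a pessimistic retry is needed, or error code */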
00289 static
00290 ulint
00291 row_ins_clust_index_entry_by_modify(
00292 /*================================*/
00293   ulint   mode, 
00296   btr_cur_t*  cursor, 
00297   mem_heap_t**  heap, 
00298   big_rec_t** big_rec,
00301   const dtuple_t* entry,  
00302   que_thr_t*  thr,  
00303   mtr_t*    mtr)  
00305 {
00306   rec_t*    rec;
00307   upd_t*    update;
00308   ulint   err;
00309 
00310   ut_ad(dict_index_is_clust(cursor->index));
00311 
00312   *big_rec = NULL;
00313 
00314   rec = btr_cur_get_rec(cursor);
00315 
00316   ut_ad(rec_get_deleted_flag(rec,
00317            dict_table_is_comp(cursor->index->table)));
00318 
00319   if (!*heap) {
00320     *heap = mem_heap_create(1024);
00321   }
00322 
00323   /* Build an update vector containing all the fields to be modified;
00324   NOTE that this vector may NOT contain system columns trx_id or
00325   roll_ptr */
00326 
00327   update = row_upd_build_difference_binary(cursor->index, entry, rec,
00328              thr_get_trx(thr), *heap);
00329   if (mode == BTR_MODIFY_LEAF) {
00330     /* Try optimistic updating of the record, keeping changes
00331     within the page */
00332 
00333     err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
00334             mtr);
00335     switch (err) {
00336     case DB_OVERFLOW:
00337     case DB_UNDERFLOW:
00338     case DB_ZIP_OVERFLOW:
00339       err = DB_FAIL;
00340     }
00341   } else {
00342     ut_a(mode == BTR_MODIFY_TREE);
00343     if (buf_LRU_buf_pool_running_out()) {
00344 
00345       return(DB_LOCK_TABLE_FULL);
00346 
00347     }
00348     err = btr_cur_pessimistic_update(0, cursor,
00349              heap, big_rec, update,
00350              0, thr, mtr);
00351   }
00352 
00353   return(err);
00354 }
00355 
00356 /*********************************************************************/
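/* Determines whether an ancestor node of the given update node is currently
performing a cascaded update (not a delete) on the given table.
@return TRUE if an ancestor updates 'table' in a cascaded operation */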
00360 static
00361 ibool
00362 row_ins_cascade_ancestor_updates_table(
00363 /*===================================*/
00364   que_node_t* node, 
00365   dict_table_t* table)  
00366 {
00367   que_node_t* parent;
00368   upd_node_t* upd_node;
00369 
00370   parent = que_node_get_parent(node);
00371 
00372   while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
00373 
00374     upd_node = static_cast<upd_node_t *>(parent);
00375 
00376     if (upd_node->table == table && upd_node->is_delete == FALSE) {
00377 
00378       return(TRUE);
00379     }
00380 
00381     parent = que_node_get_parent(parent);
00382 
00383     ut_a(parent);
00384   }
00385 
00386   return(FALSE);
00387 }
00388 
00389 /*********************************************************************/
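/* Counts the number of ancestor UPDATE or DELETE nodes of the given cascaded
update/delete node.
@return number of ancestors */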
00393 static
00394 ulint
00395 row_ins_cascade_n_ancestors(
00396 /*========================*/
00397   que_node_t* node) 
00398 {
00399   que_node_t* parent;
00400   ulint   n_ancestors = 0;
00401 
00402   parent = que_node_get_parent(node);
00403 
00404   while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
00405     n_ancestors++;
00406 
00407     parent = que_node_get_parent(parent);
00408 
00409     ut_a(parent);
00410   }
00411 
00412   return(n_ancestors);
00413 }
00414 
00415 /******************************************************************/
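/* Calculates the update vector node->cascade->update for a child table in a
cascaded update.
@return number of fields in the calculated update vector; the value can be 0
if no referenced column changed; ULINT_UNDEFINED is returned if the update is
not allowed, e.g. because a new value would not fit in a child column or
would set a NOT NULL column to NULL */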
00422 static
00423 ulint
00424 row_ins_cascade_calc_update_vec(
00425 /*============================*/
00426   upd_node_t* node,   
00428   dict_foreign_t* foreign,  
00430   mem_heap_t* heap)   
00432 {
00433   upd_node_t* cascade   = node->cascade_node;
00434   dict_table_t* table   = foreign->foreign_table;
00435   dict_index_t* index   = foreign->foreign_index;
00436   upd_t*    update;
00437   upd_field_t*  ufield;
00438   dict_table_t* parent_table;
00439   dict_index_t* parent_index;
00440   upd_t*    parent_update;
00441   upd_field_t*  parent_ufield;
00442   ulint   n_fields_updated;
00443   ulint   parent_field_no;
00444   ulint   i;
00445   ulint   j;
00446 
00447   ut_a(node);
00448   ut_a(foreign);
00449   ut_a(cascade);
00450   ut_a(table);
00451   ut_a(index);
00452 
00453   /* Calculate the appropriate update vector which will set the fields
00454   in the child index record to the same value (possibly padded with
00455   spaces if the column is a fixed length CHAR or FIXBINARY column) as
00456   the referenced index record will get in the update. */
00457 
00458   parent_table = node->table;
00459   ut_a(parent_table == foreign->referenced_table);
00460   parent_index = foreign->referenced_index;
00461   parent_update = node->update;
00462 
00463   update = cascade->update;
00464 
00465   update->info_bits = 0;
00466   update->n_fields = foreign->n_fields;
00467 
00468   n_fields_updated = 0;
00469 
00470   for (i = 0; i < foreign->n_fields; i++) {
00471 
00472     parent_field_no = dict_table_get_nth_col_pos(
00473       parent_table,
00474       dict_index_get_nth_col_no(parent_index, i));
00475 
00476     for (j = 0; j < parent_update->n_fields; j++) {
00477       parent_ufield = parent_update->fields + j;
00478 
00479       if (parent_ufield->field_no == parent_field_no) {
00480 
00481         ulint     min_size;
00482         const dict_col_t* col;
00483         ulint     ufield_len;
00484 
00485         col = dict_index_get_nth_col(index, i);
00486 
00487         /* A field in the parent index record is
00488         updated. Let us make the update vector
00489         field for the child table. */
00490 
00491         ufield = update->fields + n_fields_updated;
00492 
00493         ufield->field_no
00494           = dict_table_get_nth_col_pos(
00495           table, dict_col_get_no(col));
00496         ufield->exp = NULL;
00497 
00498         ufield->new_val = parent_ufield->new_val;
00499         ufield_len = dfield_get_len(&ufield->new_val);
00500 
00501         /* Clear the "external storage" flag */
00502         dfield_set_len(&ufield->new_val, ufield_len);
00503 
00504         /* Do not allow a NOT NULL column to be
00505         updated as NULL */
00506 
00507         if (dfield_is_null(&ufield->new_val)
00508             && (col->prtype & DATA_NOT_NULL)) {
00509 
00510           return(ULINT_UNDEFINED);
00511         }
00512 
00513         /* If the new value would not fit in the
00514         column, do not allow the update */
00515 
00516         if (!dfield_is_null(&ufield->new_val)
00517             && dtype_get_at_most_n_mbchars(
00518           col->prtype, col->mbminmaxlen,
00519           col->len,
00520           ufield_len,
00521           static_cast<const char *>(dfield_get_data(&ufield->new_val)))
00522             < ufield_len) {
00523 
00524           return(ULINT_UNDEFINED);
00525         }
00526 
00527         /* If the parent column type has a different
00528         length than the child column type, we may
00529         need to pad with spaces the new value of the
00530         child column */
00531 
00532         min_size = dict_col_get_min_size(col);
00533 
00534         /* Because UNIV_SQL_NULL (the marker
00535         of SQL NULL values) exceeds all possible
00536         values of min_size, the test below will
00537         not hold for SQL NULL columns. */
00538 
00539         if (min_size > ufield_len) {
00540 
00541           byte* pad;
00542           ulint pad_len;
00543           byte* padded_data;
00544           ulint mbminlen;
00545 
00546           padded_data = static_cast<unsigned char *>(mem_heap_alloc(
00547             heap, min_size));
00548 
00549           pad = padded_data + ufield_len;
00550           pad_len = min_size - ufield_len;
00551 
00552           memcpy(padded_data,
00553                  dfield_get_data(&ufield
00554                      ->new_val),
00555                  ufield_len);
00556 
00557           mbminlen = dict_col_get_mbminlen(col);
00558 
00559           ut_ad(!(ufield_len % mbminlen));
00560           ut_ad(!(min_size % mbminlen));
00561 
00562           if (mbminlen == 1
00563               && dtype_get_charset_coll(
00564                 col->prtype)
00565               == DATA_MYSQL_BINARY_CHARSET_COLL) {
00566             /* Do not pad BINARY columns */
00567             return(ULINT_UNDEFINED);
00568           }
00569 
00570           row_mysql_pad_col(mbminlen,
00571                 pad, pad_len);
00572           dfield_set_data(&ufield->new_val,
00573               padded_data, min_size);
00574         }
00575 
00576         n_fields_updated++;
00577       }
00578     }
00579   }
00580 
00581   update->n_fields = n_fields_updated;
00582 
00583   return(n_fields_updated);
00584 }
00585 
00586 /*********************************************************************/
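/* Sets a detailed error message for a foreign key error, writing the failing
constraint definition to srv_misc_tmpfile and attaching it to the
transaction. */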
00589 static
00590 void
00591 row_ins_set_detailed(
00592 /*=================*/
00593   trx_t*    trx,    
00594   dict_foreign_t* foreign)  
00595 {
00596   mutex_enter(&srv_misc_tmpfile_mutex);
00597   rewind(srv_misc_tmpfile);
00598 
00599   if (os_file_set_eof(srv_misc_tmpfile)) {
00600     ut_print_name(srv_misc_tmpfile, trx, TRUE,
00601             foreign->foreign_table_name);
00602     dict_print_info_on_foreign_key_in_create_format(
00603       srv_misc_tmpfile, trx, foreign, FALSE);
00604     trx_set_detailed_error_from_file(trx, srv_misc_tmpfile);
00605   } else {
00606     trx_set_detailed_error(trx, "temp file operation failed");
00607   }
00608 
00609   mutex_exit(&srv_misc_tmpfile_mutex);
00610 }
00611 
00612 /*********************************************************************/
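/* Reports a foreign key error to dict_foreign_err_file when a delete or
update of a parent table row fails because the row is referenced from a
child table. */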
00615 static
00616 void
00617 row_ins_foreign_report_err(
00618 /*=======================*/
00619   const char* errstr,   
00621   que_thr_t*  thr,    
00623   dict_foreign_t* foreign,  
00624   const rec_t*  rec,    
00626   const dtuple_t* entry)    
00628 {
00629   FILE* ef  = dict_foreign_err_file;
00630   trx_t*  trx = thr_get_trx(thr);
00631 
00632   row_ins_set_detailed(trx, foreign);
00633 
00634   mutex_enter(&dict_foreign_err_mutex);
00635   rewind(ef);
00636   ut_print_timestamp(ef);
00637   fputs(" Transaction:\n", ef);
00638   trx_print(ef, trx, 600);
00639 
00640   fputs("Foreign key constraint fails for table ", ef);
00641   ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
00642   fputs(":\n", ef);
00643   dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
00644               TRUE);
00645   putc('\n', ef);
00646   fputs(errstr, ef);
00647   fputs(" in parent table, in index ", ef);
00648   ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
00649   if (entry) {
00650     fputs(" tuple:\n", ef);
00651     dtuple_print(ef, entry);
00652   }
00653   fputs("\nBut in child table ", ef);
00654   ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
00655   fputs(", in index ", ef);
00656   ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
00657   if (rec) {
00658     fputs(", there is a record:\n", ef);
00659     rec_print(ef, rec, foreign->foreign_index);
00660   } else {
00661     fputs(", the record is not available\n", ef);
00662   }
00663   putc('\n', ef);
00664 
00665   mutex_exit(&dict_foreign_err_mutex);
00666 }
00667 
00668 /*********************************************************************/
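/* Reports a foreign key error to dict_foreign_err_file when an insert into a
child table fails because no matching row exists in the parent table. */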
00672 static
00673 void
00674 row_ins_foreign_report_add_err(
00675 /*===========================*/
00676   trx_t*    trx,    
00677   dict_foreign_t* foreign,  
00678   const rec_t*  rec,    
00681   const dtuple_t* entry)    
00683 {
00684   FILE* ef  = dict_foreign_err_file;
00685 
00686   row_ins_set_detailed(trx, foreign);
00687 
00688   mutex_enter(&dict_foreign_err_mutex);
00689   rewind(ef);
00690   ut_print_timestamp(ef);
00691   fputs(" Transaction:\n", ef);
00692   trx_print(ef, trx, 600);
00693   fputs("Foreign key constraint fails for table ", ef);
00694   ut_print_name(ef, trx, TRUE, foreign->foreign_table_name);
00695   fputs(":\n", ef);
00696   dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign,
00697               TRUE);
00698   fputs("\nTrying to add in child table, in index ", ef);
00699   ut_print_name(ef, trx, FALSE, foreign->foreign_index->name);
00700   if (entry) {
00701     fputs(" tuple:\n", ef);
00702     /* TODO: DB_TRX_ID and DB_ROLL_PTR may be uninitialized.
00703     It would be better to only display the user columns. */
00704     dtuple_print(ef, entry);
00705   }
00706   fputs("\nBut in parent table ", ef);
00707   ut_print_name(ef, trx, TRUE, foreign->referenced_table_name);
00708   fputs(", in index ", ef);
00709   ut_print_name(ef, trx, FALSE, foreign->referenced_index->name);
00710   fputs(",\nthe closest match we can find is record:\n", ef);
00711   if (rec && page_rec_is_supremum(rec)) {
00712     /* If the cursor ended on a supremum record, it is better
00713     to report the previous record in the error message, so that
00714     the user gets a more descriptive error message. */
00715     rec = page_rec_get_prev_const(rec);
00716   }
00717 
00718   if (rec) {
00719     rec_print(ef, rec, foreign->referenced_index);
00720   }
00721   putc('\n', ef);
00722 
00723   mutex_exit(&dict_foreign_err_mutex);
00724 }
00725 
00726 /*********************************************************************/
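/* Invalidates the query cache for the given table name. In this code path the
function only extracts the database name from 'name'; there is no query cache
to invalidate. */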
00728 static
00729 void
00730 row_ins_invalidate_query_cache(
00731 /*===========================*/
00732   que_thr_t*  unused,   
00734   const char* name)   
00736 {
00737   char* buf;
00738   char* ptr;
00739   ulint len = strlen(name) + 1;
00740 
00741         (void)unused;
00742 
00743   buf = mem_strdupl(name, len);
00744 
00745   ptr = strchr(buf, '/');
00746   ut_a(ptr);
00747   *ptr = '\0';
00748 
00749   mem_free(buf);
00750 }
00751 
00752 /*********************************************************************/
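/* Performs the referential action (CASCADE or SET NULL) defined by a foreign
key constraint when a matching row exists in the child table; reports an
error and returns DB_ROW_IS_REFERENCED if no such action is defined.
@return DB_SUCCESS, DB_LOCK_WAIT, DB_ROW_IS_REFERENCED, or another error
code */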
00757 static
00758 ulint
00759 row_ins_foreign_check_on_constraint(
00760 /*================================*/
00761   que_thr_t*  thr,    
00763   dict_foreign_t* foreign,  
00765   btr_pcur_t* pcur,   
00767   dtuple_t* entry,    
00769   mtr_t*    mtr)    
00771 {
00772   upd_node_t* node;
00773   upd_node_t* cascade;
00774   dict_table_t* table   = foreign->foreign_table;
00775   dict_index_t* index;
00776   dict_index_t* clust_index;
00777   dtuple_t* ref;
00778   mem_heap_t* upd_vec_heap  = NULL;
00779   const rec_t*  rec;
00780   const rec_t*  clust_rec;
00781   const buf_block_t* clust_block;
00782   upd_t*    update;
00783   ulint   n_to_update;
00784   ulint   err;
00785   ulint   i;
00786   trx_t*    trx;
00787   mem_heap_t* tmp_heap  = NULL;
00788 
00789   ut_a(thr);
00790   ut_a(foreign);
00791   ut_a(pcur);
00792   ut_a(mtr);
00793 
00794   trx = thr_get_trx(thr);
00795 
00796   /* Since we are going to delete or update a row, we have to invalidate
00797   the MySQL query cache for the table. A deadlock of threads is not possible
00798   here because the caller of this function does not hold any latches with
00799   the sync0sync.h rank above the kernel mutex. The query cache mutex has
00800   a rank just above the kernel mutex. */
00801 
00802   row_ins_invalidate_query_cache(thr, table->name);
00803 
00804   node = static_cast<upd_node_t *>(thr->run_node);
00805 
00806   if (node->is_delete && 0 == (foreign->type
00807              & (DICT_FOREIGN_ON_DELETE_CASCADE
00808           | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
00809 
00810     row_ins_foreign_report_err("Trying to delete",
00811              thr, foreign,
00812              btr_pcur_get_rec(pcur), entry);
00813 
00814     return(DB_ROW_IS_REFERENCED);
00815   }
00816 
00817   if (!node->is_delete && 0 == (foreign->type
00818               & (DICT_FOREIGN_ON_UPDATE_CASCADE
00819            | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
00820 
00821     /* This is an UPDATE */
00822 
00823     row_ins_foreign_report_err("Trying to update",
00824              thr, foreign,
00825              btr_pcur_get_rec(pcur), entry);
00826 
00827     return(DB_ROW_IS_REFERENCED);
00828   }
00829 
00830   if (node->cascade_node == NULL) {
00831     /* Extend our query graph by creating a child to current
00832     update node. The child is used in the cascade or set null
00833     operation. */
00834 
00835     node->cascade_heap = mem_heap_create(128);
00836     node->cascade_node = row_create_update_node_for_mysql(
00837       table, node->cascade_heap);
00838     que_node_set_parent(node->cascade_node, node);
00839   }
00840 
00841   /* Initialize cascade_node to do the operation we want. Note that we
00842   use the SAME cascade node to do all foreign key operations of the
00843   SQL DELETE: the table of the cascade node may change if there are
00844   several child tables to the table where the delete is done! */
00845 
00846   cascade = node->cascade_node;
00847 
00848   cascade->table = table;
00849 
00850   cascade->foreign = foreign;
00851 
00852   if (node->is_delete
00853       && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
00854     cascade->is_delete = TRUE;
00855   } else {
00856     cascade->is_delete = FALSE;
00857 
00858     if (foreign->n_fields > cascade->update_n_fields) {
00859       /* We have to make the update vector longer */
00860 
00861       cascade->update = upd_create(foreign->n_fields,
00862                  node->cascade_heap);
00863       cascade->update_n_fields = foreign->n_fields;
00864     }
00865   }
00866 
00867   /* We do not allow cyclic cascaded updating (DELETE is allowed,
00868   but not UPDATE) of the same table, as this can lead to an infinite
00869   cycle. Check that we are not updating the same table which is
00870   already being modified in this cascade chain. We have to check
00871   this also because the modification of the indexes of a 'parent'
00872   table may still be incomplete, and we must avoid seeing the indexes
00873   of the parent table in an inconsistent state! */
00874 
00875   if (!cascade->is_delete
00876       && row_ins_cascade_ancestor_updates_table(cascade, table)) {
00877 
00878     /* We do not know if this would break foreign key
00879     constraints, but play safe and return an error */
00880 
00881     err = DB_ROW_IS_REFERENCED;
00882 
00883     row_ins_foreign_report_err(
00884       "Trying an update, possibly causing a cyclic"
00885       " cascaded update\n"
00886       "in the child table,", thr, foreign,
00887       btr_pcur_get_rec(pcur), entry);
00888 
00889     goto nonstandard_exit_func;
00890   }
00891 
00892   if (row_ins_cascade_n_ancestors(cascade) >= 15) {
00893     err = DB_ROW_IS_REFERENCED;
00894 
00895     row_ins_foreign_report_err(
00896       "Trying a too deep cascaded delete or update\n",
00897       thr, foreign, btr_pcur_get_rec(pcur), entry);
00898 
00899     goto nonstandard_exit_func;
00900   }
00901 
00902   index = btr_pcur_get_btr_cur(pcur)->index;
00903 
00904   ut_a(index == foreign->foreign_index);
00905 
00906   rec = btr_pcur_get_rec(pcur);
00907 
00908   if (dict_index_is_clust(index)) {
00909     /* pcur is already positioned in the clustered index of
00910     the child table */
00911 
00912     clust_index = index;
00913     clust_rec = rec;
00914     clust_block = btr_pcur_get_block(pcur);
00915   } else {
00916     /* We have to look for the record in the clustered index
00917     in the child table */
00918 
00919     clust_index = dict_table_get_first_index(table);
00920 
00921     tmp_heap = mem_heap_create(256);
00922 
00923     ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
00924           tmp_heap);
00925     btr_pcur_open_with_no_init(clust_index, ref,
00926              PAGE_CUR_LE, BTR_SEARCH_LEAF,
00927              cascade->pcur, 0, mtr);
00928 
00929     clust_rec = btr_pcur_get_rec(cascade->pcur);
00930     clust_block = btr_pcur_get_block(cascade->pcur);
00931 
00932     if (!page_rec_is_user_rec(clust_rec)
00933         || btr_pcur_get_low_match(cascade->pcur)
00934         < dict_index_get_n_unique(clust_index)) {
00935 
00936       fputs("InnoDB: error in cascade of a foreign key op\n"
00937             "InnoDB: ", stderr);
00938       dict_index_name_print(stderr, trx, index);
00939 
00940       fputs("\n"
00941             "InnoDB: record ", stderr);
00942       rec_print(stderr, rec, index);
00943       fputs("\n"
00944             "InnoDB: clustered record ", stderr);
00945       rec_print(stderr, clust_rec, clust_index);
00946       fputs("\n"
00947             "InnoDB: Submit a detailed bug report to"
00948             " http://bugs.mysql.com\n", stderr);
00949 
00950       err = DB_SUCCESS;
00951 
00952       goto nonstandard_exit_func;
00953     }
00954   }
00955 
00956   /* Set an X-lock on the row to delete or update in the child table */
00957 
00958   err = lock_table(0, table, LOCK_IX, thr);
00959 
00960   if (err == DB_SUCCESS) {
00961     /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
00962     we already have a normal shared lock on the appropriate
00963     gap if the search criterion was not unique */
00964 
00965     err = lock_clust_rec_read_check_and_lock_alt(
00966       0, clust_block, clust_rec, clust_index,
00967       LOCK_X, LOCK_REC_NOT_GAP, thr);
00968   }
00969 
00970   if (err != DB_SUCCESS) {
00971 
00972     goto nonstandard_exit_func;
00973   }
00974 
00975   if (rec_get_deleted_flag(clust_rec, dict_table_is_comp(table))) {
00976     /* This can happen if there is a circular reference of
00977     rows such that cascading delete comes to delete a row
00978     already in the process of being delete marked */
00979     err = DB_SUCCESS;
00980 
00981     goto nonstandard_exit_func;
00982   }
00983 
00984   if ((node->is_delete
00985        && (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL))
00986       || (!node->is_delete
00987     && (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
00988 
00989     /* Build the appropriate update vector which sets
00990     foreign->n_fields first fields in rec to SQL NULL */
00991 
00992     update = cascade->update;
00993 
00994     update->info_bits = 0;
00995     update->n_fields = foreign->n_fields;
00996 
00997     for (i = 0; i < foreign->n_fields; i++) {
00998       upd_field_t*  ufield = &update->fields[i];
00999 
01000       ufield->field_no = dict_table_get_nth_col_pos(
01001         table,
01002         dict_index_get_nth_col_no(index, i));
01003       ufield->orig_len = 0;
01004       ufield->exp = NULL;
01005       dfield_set_null(&ufield->new_val);
01006     }
01007   }
01008 
01009   if (!node->is_delete
01010       && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
01011 
01012     /* Build the appropriate update vector which sets changing
01013     foreign->n_fields first fields in rec to new values */
01014 
01015     upd_vec_heap = mem_heap_create(256);
01016 
01017     n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
01018                     upd_vec_heap);
01019     if (n_to_update == ULINT_UNDEFINED) {
01020       err = DB_ROW_IS_REFERENCED;
01021 
01022       row_ins_foreign_report_err(
01023         "Trying a cascaded update where the"
01024         " updated value in the child\n"
01025         "table would not fit in the length"
01026         " of the column, or the value would\n"
01027         "be NULL and the column is"
01028         " declared as not NULL in the child table,",
01029         thr, foreign, btr_pcur_get_rec(pcur), entry);
01030 
01031       goto nonstandard_exit_func;
01032     }
01033 
01034     if (cascade->update->n_fields == 0) {
01035 
01036       /* The update does not change any columns referred
01037       to in this foreign key constraint: no need to do
01038       anything */
01039 
01040       err = DB_SUCCESS;
01041 
01042       goto nonstandard_exit_func;
01043     }
01044   }
01045 
01046   /* Store pcur position and initialize or store the cascade node
01047   pcur stored position */
01048 
01049   btr_pcur_store_position(pcur, mtr);
01050 
01051   if (index == clust_index) {
01052     btr_pcur_copy_stored_position(cascade->pcur, pcur);
01053   } else {
01054     btr_pcur_store_position(cascade->pcur, mtr);
01055   }
01056 
01057   mtr_commit(mtr);
01058 
01059   ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
01060 
01061   cascade->state = UPD_NODE_UPDATE_CLUSTERED;
01062 
01063   err = row_update_cascade_for_mysql(thr, cascade,
01064              foreign->foreign_table);
01065 
01066   if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
01067     fprintf(stderr,
01068       "InnoDB: error: table %s has the counter 0"
01069       " though there is\n"
01070       "InnoDB: a FOREIGN KEY check running on it.\n",
01071       foreign->foreign_table->name);
01072   }
01073 
01074   /* Release the data dictionary latch for a while, so that we do not
01075   starve other threads from doing CREATE TABLE etc. if we have a huge
01076   cascaded operation running. The counter n_foreign_key_checks_running
01077   will prevent other users from dropping or ALTERing the table when we
01078   release the latch. */
01079 
01080   row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
01081   row_mysql_freeze_data_dictionary(thr_get_trx(thr));
01082 
01083   mtr_start(mtr);
01084 
01085   /* Restore pcur position */
01086 
01087   btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
01088 
01089   if (tmp_heap) {
01090     mem_heap_free(tmp_heap);
01091   }
01092 
01093   if (upd_vec_heap) {
01094     mem_heap_free(upd_vec_heap);
01095   }
01096 
01097   return(err);
01098 
01099 nonstandard_exit_func:
01100   if (tmp_heap) {
01101     mem_heap_free(tmp_heap);
01102   }
01103 
01104   if (upd_vec_heap) {
01105     mem_heap_free(upd_vec_heap);
01106   }
01107 
01108   btr_pcur_store_position(pcur, mtr);
01109 
01110   mtr_commit(mtr);
01111   mtr_start(mtr);
01112 
01113   btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
01114 
01115   return(err);
01116 }
01117 
01118 /*********************************************************************/
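/* Sets a shared lock on a record. Used in locking possible duplicate key
records and in checking foreign key constraints.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */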
01122 static
01123 enum db_err
01124 row_ins_set_shared_rec_lock(
01125 /*========================*/
01126   ulint     type, 
01128   const buf_block_t*  block,  
01129   const rec_t*    rec,  
01130   dict_index_t*   index,  
01131   const ulint*    offsets,
01132   que_thr_t*    thr)  
01133 {
01134   enum db_err err;
01135 
01136   ut_ad(rec_offs_validate(rec, index, offsets));
01137 
01138   if (dict_index_is_clust(index)) {
01139     err = lock_clust_rec_read_check_and_lock(
01140       0, block, rec, index, offsets, LOCK_S, type, thr);
01141   } else {
01142     err = lock_sec_rec_read_check_and_lock(
01143       0, block, rec, index, offsets, LOCK_S, type, thr);
01144   }
01145 
01146   return(err);
01147 }
01148 
01149 /*********************************************************************/
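/* Sets an exclusive lock on a record. Used in locking possible duplicate key
records.
@return DB_SUCCESS, DB_SUCCESS_LOCKED_REC, or error code */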
01153 static
01154 enum db_err
01155 row_ins_set_exclusive_rec_lock(
01156 /*===========================*/
01157   ulint     type, 
01159   const buf_block_t*  block,  
01160   const rec_t*    rec,  
01161   dict_index_t*   index,  
01162   const ulint*    offsets,
01163   que_thr_t*    thr)  
01164 {
01165   enum db_err err;
01166 
01167   ut_ad(rec_offs_validate(rec, index, offsets));
01168 
01169   if (dict_index_is_clust(index)) {
01170     err = lock_clust_rec_read_check_and_lock(
01171       0, block, rec, index, offsets, LOCK_X, type, thr);
01172   } else {
01173     err = lock_sec_rec_read_check_and_lock(
01174       0, block, rec, index, offsets, LOCK_X, type, thr);
01175   }
01176 
01177   return(err);
01178 }
01179 
01180 /***************************************************************/
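/* Checks if a foreign key constraint fails for an index entry. If check_ref
is TRUE, the referenced (parent) table is searched; otherwise the foreign
(child) table is searched. Sets shared locks which lock either the success or
the failure of the constraint. NOTE that the caller must have a shared latch
on dict_operation_lock.
@return DB_SUCCESS, DB_NO_REFERENCED_ROW, or DB_ROW_IS_REFERENCED */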
01185 UNIV_INTERN
01186 ulint
01187 row_ins_check_foreign_constraint(
01188 /*=============================*/
01189   ibool   check_ref,
01192   dict_foreign_t* foreign,
01195   dict_table_t* table,  
01197   dtuple_t* entry,  
01198   que_thr_t*  thr)  
01199 {
01200   upd_node_t* upd_node;
01201   dict_table_t* check_table;
01202   dict_index_t* check_index;
01203   ulint   n_fields_cmp;
01204   btr_pcur_t  pcur;
01205   int   cmp;
01206   ulint   err;
01207   ulint   i;
01208   mtr_t   mtr;
01209   trx_t*    trx   = thr_get_trx(thr);
01210   mem_heap_t* heap    = NULL;
01211   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
01212   ulint*    offsets   = offsets_;
01213   rec_offs_init(offsets_);
01214 
01215 run_again:
01216 #ifdef UNIV_SYNC_DEBUG
01217   ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
01218 #endif /* UNIV_SYNC_DEBUG */
01219 
01220   err = DB_SUCCESS;
01221 
01222   if (trx->check_foreigns == FALSE) {
01223     /* The user has suppressed foreign key checks currently for
01224     this session */
01225     goto exit_func;
01226   }
01227 
01228   /* If any of the foreign key fields in entry is SQL NULL, we
01229   suppress the foreign key check: this is compatible with Oracle,
01230   for example */
01231 
01232   for (i = 0; i < foreign->n_fields; i++) {
01233     if (UNIV_SQL_NULL == dfield_get_len(
01234           dtuple_get_nth_field(entry, i))) {
01235 
01236       goto exit_func;
01237     }
01238   }
01239 
01240   if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
01241     upd_node = static_cast<upd_node_t *>(thr->run_node);
01242 
01243     if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
01244       /* If a cascaded update is done as defined by a
01245       foreign key constraint, do not check that
01246       constraint for the child row. In ON UPDATE CASCADE
01247       the update of the parent row is only half done when
01248       we come here: if we would check the constraint here
01249       for the child row it would fail.
01250 
01251       A QUESTION remains: if the child table has
01252       several constraints which refer to the same parent
01253       table, should we merge all updates to the child into
01254       one update? And the updates can be contradictory!
01255       Currently we just perform the update associated
01256       with each foreign key constraint, one after
01257       another, and the user cannot easily predict in
01258       which order they are performed. */
01259 
01260       goto exit_func;
01261     }
01262   }
01263 
01264   if (check_ref) {
01265     check_table = foreign->referenced_table;
01266     check_index = foreign->referenced_index;
01267   } else {
01268     check_table = foreign->foreign_table;
01269     check_index = foreign->foreign_index;
01270   }
01271 
01272   if (check_table == NULL || check_table->ibd_file_missing) {
01273     if (check_ref) {
01274       FILE* ef = dict_foreign_err_file;
01275 
01276       row_ins_set_detailed(trx, foreign);
01277 
01278       mutex_enter(&dict_foreign_err_mutex);
01279       rewind(ef);
01280       ut_print_timestamp(ef);
01281       fputs(" Transaction:\n", ef);
01282       trx_print(ef, trx, 600);
01283       fputs("Foreign key constraint fails for table ", ef);
01284       ut_print_name(ef, trx, TRUE,
01285               foreign->foreign_table_name);
01286       fputs(":\n", ef);
01287       dict_print_info_on_foreign_key_in_create_format(
01288         ef, trx, foreign, TRUE);
01289       fputs("\nTrying to add to index ", ef);
01290       ut_print_name(ef, trx, FALSE,
01291               foreign->foreign_index->name);
01292       fputs(" tuple:\n", ef);
01293       dtuple_print(ef, entry);
01294       fputs("\nBut the parent table ", ef);
01295       ut_print_name(ef, trx, TRUE,
01296               foreign->referenced_table_name);
01297       fputs("\nor its .ibd file does"
01298             " not currently exist!\n", ef);
01299       mutex_exit(&dict_foreign_err_mutex);
01300 
01301       err = DB_NO_REFERENCED_ROW;
01302     }
01303 
01304     goto exit_func;
01305   }
01306 
01307   ut_a(check_table);
01308   ut_a(check_index);
01309 
01310   if (check_table != table) {
01311     /* We already have a LOCK_IX on table, but not necessarily
01312     on check_table */
01313 
01314     err = lock_table(0, check_table, LOCK_IS, thr);
01315 
01316     if (err != DB_SUCCESS) {
01317 
01318       goto do_possible_lock_wait;
01319     }
01320   }
01321 
01322   mtr_start(&mtr);
01323 
01324   /* Store old value on n_fields_cmp */
01325 
01326   n_fields_cmp = dtuple_get_n_fields_cmp(entry);
01327 
01328   dtuple_set_n_fields_cmp(entry, foreign->n_fields);
01329 
01330   btr_pcur_open(check_index, entry, PAGE_CUR_GE,
01331           BTR_SEARCH_LEAF, &pcur, &mtr);
01332 
01333   /* Scan index records and check if there is a matching record */
01334 
01335   do {
01336     const rec_t*    rec = btr_pcur_get_rec(&pcur);
01337     const buf_block_t*  block = btr_pcur_get_block(&pcur);
01338 
01339     if (page_rec_is_infimum(rec)) {
01340 
01341       continue;
01342     }
01343 
01344     offsets = rec_get_offsets(rec, check_index,
01345             offsets, ULINT_UNDEFINED, &heap);
01346 
01347     if (page_rec_is_supremum(rec)) {
01348 
01349       err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, block,
01350                 rec, check_index,
01351                 offsets, thr);
01352       switch (err) {
01353       case DB_SUCCESS_LOCKED_REC:
01354       case DB_SUCCESS:
01355         continue;
01356       default:
01357         goto end_scan;
01358       }
01359     }
01360 
01361     cmp = cmp_dtuple_rec(entry, rec, offsets);
01362 
01363     if (cmp == 0) {
01364       if (rec_get_deleted_flag(rec,
01365              rec_offs_comp(offsets))) {
01366         err = row_ins_set_shared_rec_lock(
01367           LOCK_ORDINARY, block,
01368           rec, check_index, offsets, thr);
01369         switch (err) {
01370         case DB_SUCCESS_LOCKED_REC:
01371         case DB_SUCCESS:
01372           break;
01373         default:
01374           goto end_scan;
01375         }
01376       } else {
01377         /* Found a matching record. Lock only
01378         a record because we can allow inserts
01379         into gaps */
01380 
01381         err = row_ins_set_shared_rec_lock(
01382           LOCK_REC_NOT_GAP, block,
01383           rec, check_index, offsets, thr);
01384 
01385         switch (err) {
01386         case DB_SUCCESS_LOCKED_REC:
01387         case DB_SUCCESS:
01388           break;
01389         default:
01390           goto end_scan;
01391         }
01392 
01393         if (check_ref) {
01394           err = DB_SUCCESS;
01395 
01396           goto end_scan;
01397         } else if (foreign->type != 0) {
01398           /* There is an ON UPDATE or ON DELETE
01399           condition: check them in a separate
01400           function */
01401 
01402           err = row_ins_foreign_check_on_constraint(
01403             thr, foreign, &pcur, entry,
01404             &mtr);
01405           if (err != DB_SUCCESS) {
01406             /* Since reporting a plain
01407             "duplicate key" error
01408             message to the user in
01409             cases where a long CASCADE
01410             operation would lead to a
01411             duplicate key in some
01412             other table is very
01413             confusing, map duplicate
01414             key errors resulting from
01415             FK constraints to a
01416             separate error code. */
01417 
01418             if (err == DB_DUPLICATE_KEY) {
01419               err = DB_FOREIGN_DUPLICATE_KEY;
01420             }
01421 
01422             goto end_scan;
01423           }
01424 
01425           /* row_ins_foreign_check_on_constraint
01426           may have repositioned pcur on a
01427           different block */
01428           block = btr_pcur_get_block(&pcur);
01429         } else {
01430           row_ins_foreign_report_err(
01431             "Trying to delete or update",
01432             thr, foreign, rec, entry);
01433 
01434           err = DB_ROW_IS_REFERENCED;
01435           goto end_scan;
01436         }
01437       }
01438     } else {
01439       ut_a(cmp < 0);
01440 
01441       err = row_ins_set_shared_rec_lock(
01442         LOCK_GAP, block,
01443         rec, check_index, offsets, thr);
01444 
01445       switch (err) {
01446       case DB_SUCCESS_LOCKED_REC:
01447       case DB_SUCCESS:
01448         if (check_ref) {
01449           err = DB_NO_REFERENCED_ROW;
01450           row_ins_foreign_report_add_err(
01451             trx, foreign, rec, entry);
01452         } else {
01453           err = DB_SUCCESS;
01454         }
01455       }
01456 
01457       goto end_scan;
01458     }
01459   } while (btr_pcur_move_to_next(&pcur, &mtr));
01460 
01461   if (check_ref) {
01462     row_ins_foreign_report_add_err(
01463       trx, foreign, btr_pcur_get_rec(&pcur), entry);
01464     err = DB_NO_REFERENCED_ROW;
01465   } else {
01466     err = DB_SUCCESS;
01467   }
01468 
01469 end_scan:
01470   btr_pcur_close(&pcur);
01471 
01472   mtr_commit(&mtr);
01473 
01474   /* Restore old value */
01475   dtuple_set_n_fields_cmp(entry, n_fields_cmp);
01476 
01477 do_possible_lock_wait:
01478   if (err == DB_LOCK_WAIT) {
01479     trx->error_state = err;
01480 
01481     que_thr_stop_for_mysql(thr);
01482 
01483     srv_suspend_mysql_thread(thr);
01484 
01485     if (trx->error_state == DB_SUCCESS) {
01486 
01487       goto run_again;
01488     }
01489 
01490     err = trx->error_state;
01491   }
01492 
01493 exit_func:
01494   if (UNIV_LIKELY_NULL(heap)) {
01495     mem_heap_free(heap);
01496   }
01497   return(err);
01498 }
01499 
01500 /***************************************************************/
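/* Checks if foreign key constraints fail for an index entry. If the index is
not mentioned in any foreign key constraint, this function does nothing.
Otherwise it searches the indexes of the referenced tables and sets shared
locks which lock either the success or the failure of each constraint.
@return DB_SUCCESS or error code */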
01507 static
01508 ulint
01509 row_ins_check_foreign_constraints(
01510 /*==============================*/
01511   dict_table_t* table,  
01512   dict_index_t* index,  
01513   dtuple_t* entry,  
01514   que_thr_t*  thr)  
01515 {
01516   dict_foreign_t* foreign;
01517   ulint   err;
01518   trx_t*    trx;
01519   ibool   got_s_lock  = FALSE;
01520 
01521   trx = thr_get_trx(thr);
01522 
01523   foreign = UT_LIST_GET_FIRST(table->foreign_list);
01524 
01525   while (foreign) {
01526     if (foreign->foreign_index == index) {
01527 
01528       if (foreign->referenced_table == NULL) {
01529         dict_table_get(foreign->referenced_table_name,
01530                  FALSE);
01531       }
01532 
01533       if (0 == trx->dict_operation_lock_mode) {
01534         got_s_lock = TRUE;
01535 
01536         row_mysql_freeze_data_dictionary(trx);
01537       }
01538 
01539       if (foreign->referenced_table) {
01540         mutex_enter(&(dict_sys->mutex));
01541 
01542         (foreign->referenced_table
01543          ->n_foreign_key_checks_running)++;
01544 
01545         mutex_exit(&(dict_sys->mutex));
01546       }
01547 
01548       /* NOTE that if the thread ends up waiting for a lock
01549       we will release dict_operation_lock temporarily!
01550       But the counter on the table protects the referenced
01551       table from being dropped while the check is running. */
01552 
01553       err = row_ins_check_foreign_constraint(
01554         TRUE, foreign, table, entry, thr);
01555 
01556       if (foreign->referenced_table) {
01557         mutex_enter(&(dict_sys->mutex));
01558 
01559         ut_a(foreign->referenced_table
01560              ->n_foreign_key_checks_running > 0);
01561         (foreign->referenced_table
01562          ->n_foreign_key_checks_running)--;
01563 
01564         mutex_exit(&(dict_sys->mutex));
01565       }
01566 
01567       if (got_s_lock) {
01568         row_mysql_unfreeze_data_dictionary(trx);
01569       }
01570 
01571       if (err != DB_SUCCESS) {
01572         return(err);
01573       }
01574     }
01575 
01576     foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
01577   }
01578 
01579   return(DB_SUCCESS);
01580 }
01581 
01582 /***************************************************************/
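/* Checks if a unique key violation would occur if the index entry were
inserted next to the given record.
@return TRUE if a duplicate key error should be reported */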
01586 static
01587 ibool
01588 row_ins_dupl_error_with_rec(
01589 /*========================*/
01590   const rec_t*  rec,  
01593   const dtuple_t* entry,  
01594   dict_index_t* index,  
01595   const ulint*  offsets)
01596 {
01597   ulint matched_fields;
01598   ulint matched_bytes;
01599   ulint n_unique;
01600   ulint i;
01601 
01602   ut_ad(rec_offs_validate(rec, index, offsets));
01603 
01604   n_unique = dict_index_get_n_unique(index);
01605 
01606   matched_fields = 0;
01607   matched_bytes = 0;
01608 
01609   cmp_dtuple_rec_with_match(entry, rec, offsets,
01610           &matched_fields, &matched_bytes);
01611 
01612   if (matched_fields < n_unique) {
01613 
01614     return(FALSE);
01615   }
01616 
01617   /* In a unique secondary index we allow equal key values if they
01618   contain SQL NULLs */
01619 
01620   if (!dict_index_is_clust(index)) {
01621 
01622     for (i = 0; i < n_unique; i++) {
01623       if (UNIV_SQL_NULL == dfield_get_len(
01624             dtuple_get_nth_field(entry, i))) {
01625 
01626         return(FALSE);
01627       }
01628     }
01629   }
01630 
01631   return(!rec_get_deleted_flag(rec, rec_offs_comp(offsets)));
01632 }
01633 
01634 /***************************************************************/
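/* Scans a unique secondary index at the position given by the index entry to
determine whether a uniqueness violation has occurred for the key value of
the entry. Sets shared (or, for REPLACE-like statements, exclusive) locks on
the possible duplicate records.
@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */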
01639 static
01640 ulint
01641 row_ins_scan_sec_index_for_duplicate(
01642 /*=================================*/
01643   dict_index_t* index,  
01644   dtuple_t* entry,  
01645   que_thr_t*  thr)  
01646 {
01647   ulint   n_unique;
01648   ulint   i;
01649   int   cmp;
01650   ulint   n_fields_cmp;
01651   btr_pcur_t  pcur;
01652   ulint   err   = DB_SUCCESS;
01653   unsigned  allow_duplicates;
01654   mtr_t   mtr;
01655   mem_heap_t* heap    = NULL;
01656   ulint   offsets_[REC_OFFS_NORMAL_SIZE];
01657   ulint*    offsets   = offsets_;
01658   rec_offs_init(offsets_);
01659 
01660   n_unique = dict_index_get_n_unique(index);
01661 
01662   /* If the secondary index is unique, but one of the fields in the
01663   n_unique first fields is NULL, a unique key violation cannot occur,
01664   since we define NULL != NULL in this case */
01665 
01666   for (i = 0; i < n_unique; i++) {
01667     if (UNIV_SQL_NULL == dfield_get_len(
01668           dtuple_get_nth_field(entry, i))) {
01669 
01670       return(DB_SUCCESS);
01671     }
01672   }
01673 
01674   mtr_start(&mtr);
01675 
01676   /* Store old value on n_fields_cmp */
01677 
01678   n_fields_cmp = dtuple_get_n_fields_cmp(entry);
01679 
01680   dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
01681 
01682   btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
01683 
01684   allow_duplicates = thr_get_trx(thr)->duplicates & TRX_DUP_IGNORE;
01685 
01686   /* Scan index records and check if there is a duplicate */
01687 
01688   do {
01689     const rec_t*    rec = btr_pcur_get_rec(&pcur);
01690     const buf_block_t*  block = btr_pcur_get_block(&pcur);
01691 
01692     if (page_rec_is_infimum(rec)) {
01693 
01694       continue;
01695     }
01696 
01697     offsets = rec_get_offsets(rec, index, offsets,
01698             ULINT_UNDEFINED, &heap);
01699 
01700     if (allow_duplicates) {
01701 
01702       /* If the SQL query will update or replace a
01703       duplicate key, we take an X-lock on the
01704       duplicates (REPLACE, LOAD DATA INFILE REPLACE,
01705       INSERT ON DUPLICATE KEY UPDATE). */
01706 
01707       err = row_ins_set_exclusive_rec_lock(
01708         LOCK_ORDINARY, block,
01709         rec, index, offsets, thr);
01710     } else {
01711 
01712       err = row_ins_set_shared_rec_lock(
01713         LOCK_ORDINARY, block,
01714         rec, index, offsets, thr);
01715     }
01716 
01717     switch (err) {
01718     case DB_SUCCESS_LOCKED_REC:
01719       err = DB_SUCCESS;
01720     case DB_SUCCESS:
01721       break;
01722     default:
01723       goto end_scan;
01724     }
01725 
01726     if (page_rec_is_supremum(rec)) {
01727 
01728       continue;
01729     }
01730 
01731     cmp = cmp_dtuple_rec(entry, rec, offsets);
01732 
01733     if (cmp == 0) {
01734       if (row_ins_dupl_error_with_rec(rec, entry,
01735               index, offsets)) {
01736         err = DB_DUPLICATE_KEY;
01737 
01738         thr_get_trx(thr)->error_info = index;
01739 
01740         goto end_scan;
01741       }
01742     } else {
01743       ut_a(cmp < 0);
01744       goto end_scan;
01745     }
01746   } while (btr_pcur_move_to_next(&pcur, &mtr));
01747 
01748 end_scan:
01749   if (UNIV_LIKELY_NULL(heap)) {
01750     mem_heap_free(heap);
01751   }
01752   mtr_commit(&mtr);
01753 
01754   /* Restore old value */
01755   dtuple_set_n_fields_cmp(entry, n_fields_cmp);
01756 
01757   return(err);
01758 }
01759 
01760 /***************************************************************/
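/* Checks if a unique key violation error would occur at an index entry
insert. Sets shared locks on the possible duplicate records. Works only for a
clustered index.
@return DB_SUCCESS, DB_DUPLICATE_KEY, or DB_LOCK_WAIT */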
01767 static
01768 ulint
01769 row_ins_duplicate_error_in_clust(
01770 /*=============================*/
01771   btr_cur_t*  cursor, 
01772   const dtuple_t* entry,  
01773   que_thr_t*  thr,  
01774   mtr_t*    mtr)  
01775 {
01776   ulint err;
01777   rec_t*  rec;
01778   ulint n_unique;
01779   trx_t*  trx   = thr_get_trx(thr);
01780   mem_heap_t*heap   = NULL;
01781   ulint offsets_[REC_OFFS_NORMAL_SIZE];
01782   ulint*  offsets   = offsets_;
01783   rec_offs_init(offsets_);
01784 
01785   UT_NOT_USED(mtr);
01786 
01787   ut_a(dict_index_is_clust(cursor->index));
01788   ut_ad(dict_index_is_unique(cursor->index));
01789 
01790   /* NOTE: For unique non-clustered indexes there may be any number
01791   of delete marked records with the same value for the non-clustered
01792   index key (remember multiversioning), and which differ only in
01793   the row reference part of the index record, containing the
01794   clustered index key fields. For such a secondary index record,
01795   to avoid race condition, we must FIRST do the insertion and after
01796   that check that the uniqueness condition is not breached! */
01797 
01798   /* NOTE: A problem is that in the B-tree node pointers on an
01799   upper level may match more to the entry than the actual existing
01800   user records on the leaf level. So, even if low_match would suggest
01801   that a duplicate key violation may occur, this may not be the case. */
01802 
01803   n_unique = dict_index_get_n_unique(cursor->index);
01804 
01805   if (cursor->low_match >= n_unique) {
01806 
01807     rec = btr_cur_get_rec(cursor);
01808 
01809     if (!page_rec_is_infimum(rec)) {
01810       offsets = rec_get_offsets(rec, cursor->index, offsets,
01811               ULINT_UNDEFINED, &heap);
01812 
01813       /* We set a lock on the possible duplicate: this
01814       is needed in logical logging of MySQL to make
01815       sure that in roll-forward we get the same duplicate
01816       errors as in original execution */
01817 
01818       if (trx->duplicates & TRX_DUP_IGNORE) {
01819 
01820         /* If the SQL query will update or replace a
01821         duplicate key, we take an X-lock on the
01822         duplicates (REPLACE, LOAD DATA INFILE REPLACE,
01823         INSERT ON DUPLICATE KEY UPDATE). */
01824 
01825         err = row_ins_set_exclusive_rec_lock(
01826           LOCK_REC_NOT_GAP,
01827           btr_cur_get_block(cursor),
01828           rec, cursor->index, offsets, thr);
01829       } else {
01830 
01831         err = row_ins_set_shared_rec_lock(
01832           LOCK_REC_NOT_GAP,
01833           btr_cur_get_block(cursor), rec,
01834           cursor->index, offsets, thr);
01835       }
01836 
01837       switch (err) {
01838       case DB_SUCCESS_LOCKED_REC:
01839       case DB_SUCCESS:
01840         break;
01841       default:
01842         goto func_exit;
01843       }
01844 
01845       if (row_ins_dupl_error_with_rec(
01846             rec, entry, cursor->index, offsets)) {
01847         trx->error_info = cursor->index;
01848         err = DB_DUPLICATE_KEY;
01849         goto func_exit;
01850       }
01851     }
01852   }
01853 
01854   if (cursor->up_match >= n_unique) {
01855 
01856     rec = page_rec_get_next(btr_cur_get_rec(cursor));
01857 
01858     if (!page_rec_is_supremum(rec)) {
01859       offsets = rec_get_offsets(rec, cursor->index, offsets,
01860               ULINT_UNDEFINED, &heap);
01861 
01862       if (trx->duplicates & TRX_DUP_IGNORE) {
01863 
01864         /* If the SQL query will update or replace a
01865         duplicate key, we take an X-lock on the
01866         duplicates (REPLACE, LOAD DATA INFILE REPLACE,
01867         INSERT ON DUPLICATE KEY UPDATE). */
01868 
01869         err = row_ins_set_exclusive_rec_lock(
01870           LOCK_REC_NOT_GAP,
01871           btr_cur_get_block(cursor),
01872           rec, cursor->index, offsets, thr);
01873       } else {
01874 
01875         err = row_ins_set_shared_rec_lock(
01876           LOCK_REC_NOT_GAP,
01877           btr_cur_get_block(cursor),
01878           rec, cursor->index, offsets, thr);
01879       }
01880 
01881       switch (err) {
01882       case DB_SUCCESS_LOCKED_REC:
01883       case DB_SUCCESS:
01884         break;
01885       default:
01886         goto func_exit;
01887       }
01888 
01889       if (row_ins_dupl_error_with_rec(
01890             rec, entry, cursor->index, offsets)) {
01891         trx->error_info = cursor->index;
01892         err = DB_DUPLICATE_KEY;
01893         goto func_exit;
01894       }
01895     }
01896 
01897     ut_a(!dict_index_is_clust(cursor->index));
01898     /* This should never happen: this function is used only for the clustered index */
01899   }
01900 
01901   err = DB_SUCCESS;
01902 func_exit:
01903   if (UNIV_LIKELY_NULL(heap)) {
01904     mem_heap_free(heap);
01905   }
01906   return(err);
01907 }
01908 
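The branch on trx->duplicates above decides how strongly to lock a possible duplicate: a plain INSERT only needs a shared lock to observe a committed duplicate, while statements that will go on to modify the conflicting row (REPLACE, LOAD DATA INFILE REPLACE, INSERT ... ON DUPLICATE KEY UPDATE) take an exclusive lock up front. The following is a minimal standalone sketch of that decision; duplicate_lock_mode() and the LOCK_*_MODEL names are invented for illustration and are not part of row0ins.cc.

#include <cstdio>

enum lock_mode_model { LOCK_S_MODEL, LOCK_X_MODEL };

/* Models the branch on trx->duplicates & TRX_DUP_IGNORE: statements that
will later update or replace the conflicting row need an X-lock so the
follow-up modification is not blocked; a plain INSERT only needs an S-lock
to see whether a committed duplicate exists. */
static lock_mode_model
duplicate_lock_mode(bool will_update_or_replace_duplicate)
{
  return will_update_or_replace_duplicate ? LOCK_X_MODEL : LOCK_S_MODEL;
}

int main()
{
  std::printf("plain INSERT    -> %s\n",
              duplicate_lock_mode(false) == LOCK_X_MODEL ? "X" : "S");
  std::printf("REPLACE / IODKU -> %s\n",
              duplicate_lock_mode(true) == LOCK_X_MODEL ? "X" : "S");
  return 0;
}
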
01909 /***************************************************************/
01918 UNIV_INLINE
01919 ulint
01920 row_ins_must_modify(
01921 /*================*/
01922   btr_cur_t*  cursor) 
01923 {
01924   ulint enough_match;
01925   rec_t*  rec;
01926 
01927   /* NOTE: (compare to the note in row_ins_duplicate_error) Because node
01928   pointers on upper levels of the B-tree may match more to entry than
01929   to actual user records on the leaf level, we have to check if the
01930   candidate record is actually a user record. In a clustered index
01931   node pointers contain index->n_unique first fields, and in the case
01932   of a secondary index, all fields of the index. */
01933 
01934   enough_match = dict_index_get_n_unique_in_tree(cursor->index);
01935 
01936   if (cursor->low_match >= enough_match) {
01937 
01938     rec = btr_cur_get_rec(cursor);
01939 
01940     if (!page_rec_is_infimum(rec)) {
01941 
01942       return(ROW_INS_PREV);
01943     }
01944   }
01945 
01946   return(0);
01947 }
01948 
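row_ins_must_modify() above returns ROW_INS_PREV when the record that the PAGE_CUR_LE search left the cursor on already matches the entry on all fields kept in node pointers and is a real user record rather than the page infimum; in that case the insert has to become an update of that record. A standalone sketch of just that decision follows; decide_modify() and ROW_INS_PREV_MODEL are invented names.

#include <cstdio>

static const unsigned ROW_INS_PREV_MODEL = 1; /* update the record before the insert point */

/* Models row_ins_must_modify(): convert the insert into a modify only if
the cursor record matches on all unique-in-tree fields and is not the
page infimum. */
static unsigned
decide_modify(unsigned low_match, unsigned n_unique_in_tree, bool cursor_on_infimum)
{
  if (low_match >= n_unique_in_tree && !cursor_on_infimum) {
    return ROW_INS_PREV_MODEL;
  }
  return 0; /* plain insert */
}

int main()
{
  std::printf("%u\n", decide_modify(2, 2, false)); /* 1: convert to modify */
  std::printf("%u\n", decide_modify(1, 2, false)); /* 0: plain insert */
  std::printf("%u\n", decide_modify(2, 2, true));  /* 0: only the infimum matched */
  return 0;
}
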
01949 /***************************************************************/
01960 static
01961 ulint
01962 row_ins_index_entry_low(
01963 /*====================*/
01964   ulint   mode, 
01967   dict_index_t* index,  
01968   dtuple_t* entry,  
01969   ulint   n_ext,  
01970   que_thr_t*  thr)  
01971 {
01972   btr_cur_t cursor;
01973   ulint   search_mode;
01974   ulint   modify = 0; /* initialized only to silence a compiler warning */
01975   rec_t*    insert_rec;
01976   rec_t*    rec;
01977   ulint   err;
01978   ulint   n_unique;
01979   big_rec_t*  big_rec     = NULL;
01980   mtr_t   mtr;
01981   mem_heap_t* heap      = NULL;
01982 
01983   log_free_check();
01984 
01985   mtr_start(&mtr);
01986 
01987   cursor.thr = thr;
01988 
01989   /* Note that we use PAGE_CUR_LE as the search mode, because then
01990   the function returns sensible values in both low_match and
01991   up_match of the cursor */
01992 
01993   if (dict_index_is_clust(index)) {
01994     search_mode = mode;
01995   } else if (!(thr_get_trx(thr)->check_unique_secondary)) {
01996     search_mode = mode | BTR_INSERT | BTR_IGNORE_SEC_UNIQUE;
01997   } else {
01998     search_mode = mode | BTR_INSERT;
01999   }
02000 
02001   btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
02002             search_mode,
02003             &cursor, 0, __FILE__, __LINE__, &mtr);
02004 
02005   if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
02006     /* The insertion was already made to the insert buffer during
02007     the search: we are done */
02008 
02009     ut_ad(search_mode & BTR_INSERT);
02010     err = DB_SUCCESS;
02011 
02012     goto function_exit;
02013   }
02014 
02015 #ifdef UNIV_DEBUG
02016   {
02017     page_t* page = btr_cur_get_page(&cursor);
02018     rec_t*  first_rec = page_rec_get_next(
02019       page_get_infimum_rec(page));
02020 
02021     ut_ad(page_rec_is_supremum(first_rec)
02022           || rec_get_n_fields(first_rec, index)
02023           == dtuple_get_n_fields(entry));
02024   }
02025 #endif
02026 
02027   n_unique = dict_index_get_n_unique(index);
02028 
02029   if (dict_index_is_unique(index) && (cursor.up_match >= n_unique
02030               || cursor.low_match >= n_unique)) {
02031 
02032     if (dict_index_is_clust(index)) {
02033       /* Note that the following call may also
02034       return DB_LOCK_WAIT */
02035 
02036       err = row_ins_duplicate_error_in_clust(
02037         &cursor, entry, thr, &mtr);
02038       if (err != DB_SUCCESS) {
02039 
02040         goto function_exit;
02041       }
02042     } else {
02043       mtr_commit(&mtr);
02044       err = row_ins_scan_sec_index_for_duplicate(
02045         index, entry, thr);
02046       mtr_start(&mtr);
02047 
02048       if (err != DB_SUCCESS) {
02049 
02050         goto function_exit;
02051       }
02052 
02053       /* We did not find a duplicate and we have now
02054       locked with s-locks the necessary records to
02055       prevent any insertion of a duplicate by another
02056       transaction. Let us now reposition the cursor and
02057       continue the insertion. */
02058 
02059       btr_cur_search_to_nth_level(index, 0, entry,
02060                 PAGE_CUR_LE,
02061                 mode | BTR_INSERT,
02062                 &cursor, 0,
02063                 __FILE__, __LINE__, &mtr);
02064     }
02065   }
02066 
02067   modify = row_ins_must_modify(&cursor);
02068 
02069   if (modify != 0) {
02070     /* There is already an index entry with a long enough common
02071     prefix, so we must convert the insert into a modify of an
02072     existing record */
02073 
02074     if (modify == ROW_INS_NEXT) {
02075       rec = page_rec_get_next(btr_cur_get_rec(&cursor));
02076 
02077       btr_cur_position(index, rec,
02078            btr_cur_get_block(&cursor),&cursor);
02079     }
02080 
02081     if (dict_index_is_clust(index)) {
02082       err = row_ins_clust_index_entry_by_modify(
02083         mode, &cursor, &heap, &big_rec, entry,
02084         thr, &mtr);
02085     } else {
02086       ut_ad(!n_ext);
02087       err = row_ins_sec_index_entry_by_modify(
02088         mode, &cursor, entry, thr, &mtr);
02089     }
02090   } else {
02091     if (mode == BTR_MODIFY_LEAF) {
02092       err = btr_cur_optimistic_insert(
02093         0, &cursor, entry, &insert_rec, &big_rec,
02094         n_ext, thr, &mtr);
02095     } else {
02096       ut_a(mode == BTR_MODIFY_TREE);
02097       if (buf_LRU_buf_pool_running_out()) {
02098 
02099         err = DB_LOCK_TABLE_FULL;
02100 
02101         goto function_exit;
02102       }
02103       err = btr_cur_pessimistic_insert(
02104         0, &cursor, entry, &insert_rec, &big_rec,
02105         n_ext, thr, &mtr);
02106     }
02107   }
02108 
02109 function_exit:
02110   mtr_commit(&mtr);
02111 
02112   if (UNIV_LIKELY_NULL(big_rec)) {
02113     rec_t*  exit_rec;
02114     ulint*  offsets;
02115     mtr_start(&mtr);
02116 
02117     btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
02118               BTR_MODIFY_TREE, &cursor, 0,
02119               __FILE__, __LINE__, &mtr);
02120     exit_rec = btr_cur_get_rec(&cursor);
02121     offsets = rec_get_offsets(exit_rec, index, NULL,
02122             ULINT_UNDEFINED, &heap);
02123 
02124     err = btr_store_big_rec_extern_fields(
02125       index, btr_cur_get_block(&cursor),
02126       exit_rec, offsets, big_rec, &mtr);
02127 
02128     if (modify) {
02129       dtuple_big_rec_free(big_rec);
02130     } else {
02131       dtuple_convert_back_big_rec(index, entry, big_rec);
02132     }
02133 
02134     mtr_commit(&mtr);
02135   }
02136 
02137   if (UNIV_LIKELY_NULL(heap)) {
02138     mem_heap_free(heap);
02139   }
02140   return(err);
02141 }
02142 
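When the entry contains columns too long to store inline, the insert hands them back to row_ins_index_entry_low() as a big_rec, and the function writes them to external pages afterwards via btr_store_big_rec_extern_fields(). The standalone sketch below, with invented names, models only the splitting idea: oversized fields are pruned from the in-page record and queued for external storage, leaving a short inline part behind.

#include <cstdio>
#include <string>
#include <vector>

struct field_model {
  std::string name;
  std::string data;
  bool        stored_externally;
};

/* Split off every field longer than local_limit: only a short inline part
stays in the record, the full value is queued for external pages.  The
returned vector plays the role of big_rec. */
static std::vector<field_model>
convert_big_fields(std::vector<field_model>& rec, std::size_t local_limit)
{
  std::vector<field_model> big;
  for (std::size_t i = 0; i < rec.size(); i++) {
    if (rec[i].data.size() > local_limit) {
      big.push_back(rec[i]);            /* full value goes to external pages */
      rec[i].data.resize(local_limit);  /* inline part kept in the index record */
      rec[i].stored_externally = true;
    }
  }
  return big;
}

int main()
{
  std::vector<field_model> rec = {
    {"id", "42", false},
    {"blob_col", std::string(1000, 'x'), false},
  };

  std::vector<field_model> big = convert_big_fields(rec, 128);
  std::printf("fields stored externally: %u\n", (unsigned) big.size()); /* 1 */
  return 0;
}
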
02143 /***************************************************************/
02149 UNIV_INTERN
02150 ulint
02151 row_ins_index_entry(
02152 /*================*/
02153   dict_index_t* index,  
02154   dtuple_t* entry,  
02155   ulint   n_ext,  
02156   ibool   foreign,
02158   que_thr_t*  thr)  
02159 {
02160   enum db_err err;
02161 
02162   if (foreign && UT_LIST_GET_FIRST(index->table->foreign_list)) {
02163     err = static_cast<db_err>(row_ins_check_foreign_constraints(index->table, index,
02164               entry, thr));
02165     if (err != DB_SUCCESS) {
02166 
02167       return(err);
02168     }
02169   }
02170 
02171   /* Try first optimistic descent to the B-tree */
02172 
02173   err = static_cast<db_err>(row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
02174               n_ext, thr));
02175   if (err != DB_FAIL) {
02176 
02177     return(err);
02178   }
02179 
02180   /* Try then pessimistic descent to the B-tree */
02181 
02182   err = static_cast<db_err>(row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
02183               n_ext, thr));
02184   return(err);
02185 }
02186 
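row_ins_index_entry() implements a two-phase descent: first an optimistic BTR_MODIFY_LEAF attempt that only touches the leaf page, and only if that reports DB_FAIL (typically because the record does not fit and the page would have to be split) a pessimistic BTR_MODIFY_TREE retry. The standalone sketch below, with invented stand-ins for the two descents, shows the control flow: any result other than DB_FAIL from the optimistic pass is returned directly.

#include <cstdio>

enum err_model { OK_MODEL, FAIL_MODEL, DUPLICATE_MODEL };

/* Optimistic pass: only the leaf page is latched.  FAIL_MODEL stands in
for DB_FAIL, meaning the record did not fit without a page split. */
static err_model try_optimistic(bool page_full, bool duplicate)
{
  if (duplicate) {
    return DUPLICATE_MODEL;
  }
  return page_full ? FAIL_MODEL : OK_MODEL;
}

/* Pessimistic pass: allowed to split pages, so it does not fail for space. */
static err_model do_pessimistic(void)
{
  return OK_MODEL;
}

static err_model insert_entry(bool page_full, bool duplicate)
{
  err_model err = try_optimistic(page_full, duplicate);
  if (err != FAIL_MODEL) {
    return err;            /* success or a genuine error such as a duplicate */
  }
  return do_pessimistic(); /* retry with tree-modifying latches */
}

int main()
{
  std::printf("%d %d %d\n",
              (int) insert_entry(false, false),  /* 0: optimistic succeeds */
              (int) insert_entry(true, false),   /* 0: pessimistic retry */
              (int) insert_entry(false, true));  /* 2: duplicate reported */
  return 0;
}
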
02187 /***********************************************************/
02190 static
02191 void
02192 row_ins_index_entry_set_vals(
02193 /*=========================*/
02194   dict_index_t* index,  
02195   dtuple_t* entry,  
02196   const dtuple_t* row)  
02197 {
02198   ulint n_fields;
02199   ulint i;
02200 
02201   ut_ad(entry && row);
02202 
02203   n_fields = dtuple_get_n_fields(entry);
02204 
02205   for (i = 0; i < n_fields; i++) {
02206     dict_field_t* ind_field;
02207     dfield_t* field;
02208     const dfield_t* row_field;
02209     ulint   len;
02210 
02211     field = dtuple_get_nth_field(entry, i);
02212     ind_field = dict_index_get_nth_field(index, i);
02213     row_field = dtuple_get_nth_field(row, ind_field->col->ind);
02214     len = dfield_get_len(row_field);
02215 
02216     /* Check column prefix indexes */
02217     if (ind_field->prefix_len > 0
02218         && dfield_get_len(row_field) != UNIV_SQL_NULL) {
02219 
02220       const dict_col_t* col
02221         = dict_field_get_col(ind_field);
02222 
02223       len = dtype_get_at_most_n_mbchars(
02224         col->prtype, col->mbminmaxlen,
02225         ind_field->prefix_len,
02226         len, static_cast<const char *>(dfield_get_data(row_field)));
02227 
02228       ut_ad(!dfield_is_ext(row_field));
02229     }
02230 
02231     dfield_set_data(field, dfield_get_data(row_field), len);
02232     if (dfield_is_ext(row_field)) {
02233       ut_ad(dict_index_is_clust(index));
02234       dfield_set_ext(field);
02235     }
02236   }
02237 }
02238 
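For a column-prefix index, row_ins_index_entry_set_vals() above copies only the first prefix_len characters of the row value into the index entry, using dtype_get_at_most_n_mbchars() to count characters according to the column's charset. The sketch below is a simplified standalone version that assumes single-byte characters; prefix_for_index() is an invented helper, not an InnoDB function.

#include <algorithm>
#include <cstdio>
#include <string>

/* Truncate a row value to the first prefix_len characters for a
column-prefix index.  Single-byte characters are assumed here, so
characters and bytes coincide. */
static std::string
prefix_for_index(const std::string& row_value, std::size_t prefix_len)
{
  return row_value.substr(0, std::min(row_value.size(), prefix_len));
}

int main()
{
  /* e.g. an index declared as INDEX(city(4)) and a row value "Helsinki" */
  std::printf("%s\n", prefix_for_index("Helsinki", 4).c_str()); /* "Hels" */
  std::printf("%s\n", prefix_for_index("Oslo", 10).c_str());    /* "Oslo" */
  return 0;
}
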
02239 /***********************************************************/
02243 static
02244 ulint
02245 row_ins_index_entry_step(
02246 /*=====================*/
02247   ins_node_t* node, 
02248   que_thr_t*  thr)  
02249 {
02250   enum db_err err;
02251 
02252   ut_ad(dtuple_check_typed(node->row));
02253 
02254   row_ins_index_entry_set_vals(node->index, node->entry, node->row);
02255 
02256   ut_ad(dtuple_check_typed(node->entry));
02257 
02258   err = static_cast<db_err>(row_ins_index_entry(node->index, node->entry, 0, TRUE, thr));
02259 
02260   return(err);
02261 }
02262 
02263 /***********************************************************/
02265 UNIV_INLINE
02266 void
02267 row_ins_alloc_row_id_step(
02268 /*======================*/
02269   ins_node_t* node) 
02270 {
02271   row_id_t  row_id;
02272 
02273   ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
02274 
02275   if (dict_index_is_unique(dict_table_get_first_index(node->table))) {
02276 
02277     /* No row id is stored if the clustered index is unique */
02278 
02279     return;
02280   }
02281 
02282   /* Fill in row id value to row */
02283 
02284   row_id = dict_sys_get_new_row_id();
02285 
02286   dict_sys_write_row_id(node->row_id_buf, row_id);
02287 }
02288 
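row_ins_alloc_row_id_step() only generates a row id when the table's clustered index is not built on a user-supplied unique key; in that case the rows are ordered by an internal, monotonically increasing id. A minimal standalone sketch of that decision follows; the names and the plain counter are invented for illustration (the real value comes from dict_sys_get_new_row_id()).

#include <cstdio>

static unsigned long long next_row_id = 1; /* stands in for the shared counter */

/* Only tables whose clustered index is NOT built on a user-supplied unique
key get a hidden, monotonically increasing row id; otherwise the primary
key already orders the rows and nothing is stored. */
static bool
alloc_row_id_if_needed(bool clust_index_is_unique, unsigned long long* row_id)
{
  if (clust_index_is_unique) {
    return false;
  }
  *row_id = next_row_id++;
  return true;
}

int main()
{
  unsigned long long id = 0;

  std::printf("with PK:    allocated=%d\n",
              (int) alloc_row_id_if_needed(true, &id));
  std::printf("without PK: allocated=%d id=%llu\n",
              (int) alloc_row_id_if_needed(false, &id), id);
  return 0;
}
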
02289 /***********************************************************/
02291 UNIV_INLINE
02292 void
02293 row_ins_get_row_from_values(
02294 /*========================*/
02295   ins_node_t* node) 
02296 {
02297   que_node_t* list_node;
02298   dfield_t* dfield;
02299   dtuple_t* row;
02300   ulint   i;
02301 
02302   /* The field values are copied into the buffers of the value
02303   expressions and it is safe to use them until the expressions are
02304   evaluated again: therefore we can just copy the pointers */
02305 
02306   row = node->row;
02307 
02308   i = 0;
02309   list_node = node->values_list;
02310 
02311   while (list_node) {
02312     eval_exp(list_node);
02313 
02314     dfield = dtuple_get_nth_field(row, i);
02315     dfield_copy_data(dfield, que_node_get_val(list_node));
02316 
02317     i++;
02318     list_node = que_node_get_next(list_node);
02319   }
02320 }
02321 
02322 /***********************************************************/
02324 UNIV_INLINE
02325 void
02326 row_ins_get_row_from_select(
02327 /*========================*/
02328   ins_node_t* node) 
02329 {
02330   que_node_t* list_node;
02331   dfield_t* dfield;
02332   dtuple_t* row;
02333   ulint   i;
02334 
02335   /* The field values are copied into the buffers of the select node
02336   and it is safe to use them until we fetch from the select again:
02337   therefore we can just copy the pointers */
02338 
02339   row = node->row;
02340 
02341   i = 0;
02342   list_node = node->select->select_list;
02343 
02344   while (list_node) {
02345     dfield = dtuple_get_nth_field(row, i);
02346     dfield_copy_data(dfield, que_node_get_val(list_node));
02347 
02348     i++;
02349     list_node = que_node_get_next(list_node);
02350   }
02351 }
02352 
02353 /***********************************************************/
02357 static
02358 ulint
02359 row_ins(
02360 /*====*/
02361   ins_node_t* node, 
02362   que_thr_t*  thr)  
02363 {
02364   ulint err;
02365 
02366   ut_ad(node && thr);
02367 
02368   if (node->state == INS_NODE_ALLOC_ROW_ID) {
02369 
02370     row_ins_alloc_row_id_step(node);
02371 
02372     node->index = dict_table_get_first_index(node->table);
02373     node->entry = UT_LIST_GET_FIRST(node->entry_list);
02374 
02375     if (node->ins_type == INS_SEARCHED) {
02376 
02377       row_ins_get_row_from_select(node);
02378 
02379     } else if (node->ins_type == INS_VALUES) {
02380 
02381       row_ins_get_row_from_values(node);
02382     }
02383 
02384     node->state = INS_NODE_INSERT_ENTRIES;
02385   }
02386 
02387   ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
02388 
02389   while (node->index != NULL) {
02390     err = row_ins_index_entry_step(node, thr);
02391 
02392     if (err != DB_SUCCESS) {
02393 
02394       return(err);
02395     }
02396 
02397     node->index = dict_table_get_next_index(node->index);
02398     node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
02399   }
02400 
02401   ut_ad(node->entry == NULL);
02402 
02403   node->state = INS_NODE_ALLOC_ROW_ID;
02404 
02405   return(DB_SUCCESS);
02406 }
02407 
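row_ins() walks all indexes of the table, building and inserting one entry per index (clustered index first) and stopping at the first error; because node->index and node->entry record the current position, the step can be re-run later, for example after a lock wait, and continue where it left off. The standalone sketch below models just that resumable loop with invented types.

#include <cstdio>
#include <vector>

enum ins_err_model { INS_SUCCESS, INS_LOCK_WAIT };

/* Insert one entry per index, stopping at the first error; *pos is kept by
the caller (like node->index in the real code) so a later retry continues
with the index that failed. */
static ins_err_model
insert_into_all_indexes(const std::vector<ins_err_model>& outcomes, std::size_t* pos)
{
  while (*pos < outcomes.size()) {
    ins_err_model err = outcomes[*pos];
    if (err != INS_SUCCESS) {
      return err;
    }
    ++*pos;
  }
  return INS_SUCCESS;
}

int main()
{
  /* the clustered index inserts fine, the second index hits a lock wait */
  std::vector<ins_err_model> outcomes;
  outcomes.push_back(INS_SUCCESS);
  outcomes.push_back(INS_LOCK_WAIT);
  outcomes.push_back(INS_SUCCESS);

  std::size_t pos = 0;
  std::printf("first pass: err=%d, stopped at index %u\n",
              (int) insert_into_all_indexes(outcomes, &pos), (unsigned) pos);

  outcomes[1] = INS_SUCCESS; /* lock granted, the step is re-run */
  std::printf("retry:      err=%d, pos=%u\n",
              (int) insert_into_all_indexes(outcomes, &pos), (unsigned) pos);
  return 0;
}
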
02408 /***********************************************************/
02412 UNIV_INTERN
02413 que_thr_t*
02414 row_ins_step(
02415 /*=========*/
02416   que_thr_t*  thr)  
02417 {
02418   ins_node_t* node;
02419   que_node_t* parent;
02420   sel_node_t* sel_node;
02421   trx_t*    trx;
02422   ulint   err;
02423 
02424   ut_ad(thr);
02425 
02426   trx = thr_get_trx(thr);
02427 
02428   trx_start_if_not_started(trx);
02429 
02430   node = static_cast<ins_node_t *>(thr->run_node);
02431 
02432   ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
02433 
02434   parent = que_node_get_parent(node);
02435   sel_node = node->select;
02436 
02437   if (thr->prev_node == parent) {
02438     node->state = INS_NODE_SET_IX_LOCK;
02439   }
02440 
02441   /* If this is the first time this node is executed (or when
02442   execution resumes after a wait for the table IX lock), set an
02443   IX lock on the table and reset the possible select node. MySQL's
02444   partitioned table code may also call an insert within the same
02445   SQL statement AFTER it has used this table handle to do a search.
02446   This happens, for example, when a row update moves it to another
02447   partition. In that case, we have already set the IX lock on the
02448   table during the search operation, and there is no need to set
02449   it again here. But we must write trx->id to node->trx_id_buf. */
02450 
02451   trx_write_trx_id(node->trx_id_buf, trx->id);
02452 
02453   if (node->state == INS_NODE_SET_IX_LOCK) {
02454 
02455     /* It may be that the current session has not yet started
02456     its transaction, or it has been committed: */
02457 
02458     if (trx->id == node->trx_id) {
02459       /* No need to do IX-locking */
02460 
02461       goto same_trx;
02462     }
02463 
02464     err = lock_table(0, node->table, LOCK_IX, thr);
02465 
02466     if (err != DB_SUCCESS) {
02467 
02468       goto error_handling;
02469     }
02470 
02471     node->trx_id = trx->id;
02472 same_trx:
02473     node->state = INS_NODE_ALLOC_ROW_ID;
02474 
02475     if (node->ins_type == INS_SEARCHED) {
02476       /* Reset the cursor */
02477       sel_node->state = SEL_NODE_OPEN;
02478 
02479       /* Fetch a row to insert */
02480 
02481       thr->run_node = sel_node;
02482 
02483       return(thr);
02484     }
02485   }
02486 
02487   if ((node->ins_type == INS_SEARCHED)
02488       && (sel_node->state != SEL_NODE_FETCH)) {
02489 
02490     ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
02491 
02492     /* No more rows to insert */
02493     thr->run_node = parent;
02494 
02495     return(thr);
02496   }
02497 
02498   /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
02499 
02500   err = row_ins(node, thr);
02501 
02502 error_handling:
02503   trx->error_state = err;
02504 
02505   if (err != DB_SUCCESS) {
02506     /* err == DB_LOCK_WAIT or SQL error detected */
02507     return(NULL);
02508   }
02509 
02510   /* DO THE TRIGGER ACTIONS HERE */
02511 
02512   if (node->ins_type == INS_SEARCHED) {
02513     /* Fetch a row to insert */
02514 
02515     thr->run_node = sel_node;
02516   } else {
02517     thr->run_node = que_node_get_parent(node);
02518   }
02519 
02520   return(thr);
02521 }
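
row_ins_step() is the query-graph entry point and behaves as a small state machine: set the table IX lock, allocate a row id and build the row, insert the entries, and for INSERT ... SELECT hand control back to the select node between rows, returning NULL on a lock wait or error. The standalone sketch below mirrors those states with an invented enum; it omits locking, lock waits, error handling and the select node itself.

#include <cstdio>

enum ins_state_model { SET_IX_LOCK, ALLOC_ROW_ID, INSERT_ENTRIES };

struct ins_node_model {
  ins_state_model state;
  int             rows_left; /* rows the "select" part still yields */
};

/* One call models one visit by the query thread; returns false when the
statement is complete. */
static bool ins_step(ins_node_model* node)
{
  switch (node->state) {
  case SET_IX_LOCK:
    std::puts("take IX table lock");
    node->state = ALLOC_ROW_ID;
    return true;
  case ALLOC_ROW_ID:
    if (node->rows_left == 0) {
      std::puts("no more rows to insert");
      return false;
    }
    std::puts("allocate row id, build the row");
    node->state = INSERT_ENTRIES;
    return true;
  case INSERT_ENTRIES:
    std::puts("insert one entry per index");
    node->rows_left--;
    node->state = ALLOC_ROW_ID; /* go build/fetch the next row */
    return true;
  }
  return false;
}

int main()
{
  ins_node_model node = { SET_IX_LOCK, 2 };

  while (ins_step(&node)) {
    /* driven by the query thread until the node reports completion */
  }
  return 0;
}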