Drizzled Public API Documentation

row0merge.cc

00001 /*****************************************************************************
00002 
00003 Copyright (C) 2005, 2010, Innobase Oy. All Rights Reserved.
00004 
00005 This program is free software; you can redistribute it and/or modify it under
00006 the terms of the GNU General Public License as published by the Free Software
00007 Foundation; version 2 of the License.
00008 
00009 This program is distributed in the hope that it will be useful, but WITHOUT
00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00012 
00013 You should have received a copy of the GNU General Public License along with
00014 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
00015 St, Fifth Floor, Boston, MA 02110-1301 USA
00016 
00017 *****************************************************************************/
00018 
00019 /**************************************************//**
@file row/row0merge.c
New index creation routines using a merge sort.
*******************************************************/
00027 #include "row0merge.h"
00028 #include "row0ext.h"
00029 #include "row0row.h"
00030 #include "row0upd.h"
00031 #include "row0ins.h"
00032 #include "row0sel.h"
00033 #include "dict0dict.h"
00034 #include "dict0mem.h"
00035 #include "dict0boot.h"
00036 #include "dict0crea.h"
00037 #include "dict0load.h"
00038 #include "btr0btr.h"
00039 #include "mach0data.h"
00040 #include "trx0rseg.h"
00041 #include "trx0trx.h"
00042 #include "trx0roll.h"
00043 #include "trx0undo.h"
00044 #include "trx0purge.h"
00045 #include "trx0rec.h"
00046 #include "que0que.h"
00047 #include "rem0cmp.h"
00048 #include "read0read.h"
00049 #include "os0file.h"
00050 #include "lock0lock.h"
00051 #include "data0data.h"
00052 #include "data0type.h"
00054 #include "pars0pars.h"
00055 #include "mem0mem.h"
00056 #include "log0log.h"
00057 #include "ut0sort.h"
00058 #include "handler0alter.h"
00059 #include <unistd.h>
00060 
00061 /* Ignore posix_fadvise() on those platforms where it does not exist */
00062 #if defined __WIN__
00063 # define posix_fadvise(fd, offset, len, advice) /* nothing */
00064 #endif /* __WIN__ */
00065 
00066 #ifdef UNIV_DEBUG
00067 
00068 /** Debug printout toggles; set to TRUE to enable. @{ */
00070 static ibool  row_merge_print_cmp;    /*!< log record comparisons */
00072 static ibool  row_merge_print_read;   /*!< log records read from temp files */
00074 static ibool  row_merge_print_write;  /*!< log records written to temp files */
00077 static ibool  row_merge_print_block;  /*!< log row_merge_blocks() calls */
00079 static ibool  row_merge_print_block_read;  /*!< log blocks read from temp files */
00081 static ibool  row_merge_print_block_write; /*!< log blocks written to temp files */
00082 /* @} */
00083 #endif /* UNIV_DEBUG */
00084 
00093 typedef byte  row_merge_block_t[1048576]; /*!< 1 MiB block for merge-sort file I/O */
00094 
00101 typedef byte  mrec_buf_t[UNIV_PAGE_SIZE]; /*!< secondary buffer for a merge record that spans two row_merge_block_t */
00102 
00107 typedef byte  mrec_t; /*!< merge record: a ROW_FORMAT=COMPACT record without the REC_N_NEW_EXTRA_BYTES header */
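/* Illustrative layout sketch (not from the original source): inside a
row_merge_block_t, each merge record is stored as

	length byte(s)  1 byte if extra_size + 1 < 0x80, else 2 bytes
	                with the high bit of the first byte set
	extra bytes     extra_size bytes (null flags, variable lengths)
	data bytes      as in ROW_FORMAT=COMPACT

and a length byte of 0 marks the end of the list in a block.  See
row_merge_buf_write() and row_merge_read_rec() below for the exact
encoding and decoding. */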
00108 
00110 struct row_merge_buf_struct {
00111   mem_heap_t* heap;   /*!< memory heap where allocated */
00112   dict_index_t* index;    /*!< the index the tuples belong to */
00113   ulint   total_size; /*!< total amount of data bytes */
00114   ulint   n_tuples; /*!< number of data tuples */
00115   ulint   max_tuples; /*!< maximum number of data tuples */
00116   const dfield_t**tuples;   /*!< arrays of fields that form the data tuples */
00119   const dfield_t**tmp_tuples; /*!< temporary copy of tuples, for sorting */
00121 };
00122 
00124 typedef struct row_merge_buf_struct row_merge_buf_t;
00125 
00127 struct merge_file_struct {
00128   int   fd;   /*!< file descriptor */
00129   ulint   offset;   /*!< file offset (end of file), in row_merge_block_t units */
00130   ib_uint64_t n_rec;    /*!< number of records in the file */
00131 };
00132 
00134 typedef struct merge_file_struct merge_file_t;
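/* Illustrative sketch (not from the original source): initializing a
merge_file_t by hand.  `my_tmpfd` is a hypothetical descriptor of an
already-created temporary file; note that `offset` counts
row_merge_block_t units, not bytes.
@code
	merge_file_t	file;

	file.fd = my_tmpfd;	// hypothetical temporary file descriptor
	file.offset = 0;	// no blocks written yet
	file.n_rec = 0;		// no merge records written yet
@endcode */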
00135 
00136 #ifdef UNIV_DEBUG
00137 /******************************************************//** Display a merge tuple. */
00139 static
00140 void
00141 row_merge_tuple_print(
00142 /*==================*/
00143   FILE*   f,  /*!< in: output stream */
00144   const dfield_t* entry,  /*!< in: tuple to print */
00145   ulint   n_fields)/*!< in: number of fields */
00146 {
00147   ulint j;
00148 
00149   for (j = 0; j < n_fields; j++) {
00150     const dfield_t* field = &entry[j];
00151 
00152     if (dfield_is_null(field)) {
00153       fputs("\n NULL;", f);
00154     } else {
00155       ulint field_len = dfield_get_len(field);
00156       ulint len   = ut_min(field_len, 20);
00157       if (dfield_is_ext(field)) {
00158         fputs("\nE", f);
00159       } else {
00160         fputs("\n ", f);
00161       }
00162       ut_print_buf(f, dfield_get_data(field), len);
00163       if (len != field_len) {
00164         fprintf(f, " (total %lu bytes)", field_len);
00165       }
00166     }
00167   }
00168   putc('\n', f);
00169 }
00170 #endif /* UNIV_DEBUG */
00171 
00172 /******************************************************//** Allocate a sort buffer. @return own: sort buffer */
00175 static
00176 row_merge_buf_t*
00177 row_merge_buf_create_low(
00178 /*=====================*/
00179   mem_heap_t* heap,   /*!< in: heap where allocated */
00180   dict_index_t* index,    /*!< in: secondary index */
00181   ulint   max_tuples, /*!< in: maximum number of data tuples */
00182   ulint   buf_size) /*!< in: size of the buffer, in bytes */
00183 {
00184   row_merge_buf_t*  buf;
00185 
00186   ut_ad(max_tuples > 0);
00187   ut_ad(max_tuples <= sizeof(row_merge_block_t));
00188   ut_ad(max_tuples < buf_size);
00189 
00190   buf = static_cast<row_merge_buf_t *>(mem_heap_zalloc(heap, buf_size));
00191   buf->heap = heap;
00192   buf->index = index;
00193   buf->max_tuples = max_tuples;
00194   buf->tuples = static_cast<const dfield_t **>(mem_heap_alloc(heap,
00195              2 * max_tuples * sizeof *buf->tuples));
00196   buf->tmp_tuples = buf->tuples + max_tuples;
00197 
00198   return(buf);
00199 }
00200 
00201 /******************************************************//** Allocate a sort buffer. @return own: sort buffer */
00204 static
00205 row_merge_buf_t*
00206 row_merge_buf_create(
00207 /*=================*/
00208   dict_index_t* index)  /*!< in: secondary index */
00209 {
00210   row_merge_buf_t*  buf;
00211   ulint     max_tuples;
00212   ulint     buf_size;
00213   mem_heap_t*   heap;
00214 
00215   max_tuples = sizeof(row_merge_block_t)
00216     / ut_max(1, dict_index_get_min_size(index));
00217 
00218   buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
00219 
00220   heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
00221 
00222   buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
00223 
00224   return(buf);
00225 }
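/* Worked example (not from the original source): the sizing arithmetic
above, with a hypothetical number.  If dict_index_get_min_size() returns
20 bytes for the index, then

	max_tuples = 1048576 / 20 = 52428 tuples

and the heap is created large enough for the buffer struct plus one
row_merge_block_t worth of tuple data; the tuples[] and tmp_tuples[]
pointer arrays are then allocated from the same heap in
row_merge_buf_create_low(). */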
00226 
00227 /******************************************************//** Empty a sort buffer. @return sort buffer */
00230 static
00231 row_merge_buf_t*
00232 row_merge_buf_empty(
00233 /*================*/
00234   row_merge_buf_t*  buf)  /*!< in,own: sort buffer */
00235 {
00236   ulint   buf_size;
00237   ulint   max_tuples  = buf->max_tuples;
00238   mem_heap_t* heap    = buf->heap;
00239   dict_index_t* index   = buf->index;
00240 
00241   buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
00242 
00243   mem_heap_empty(heap);
00244 
00245   return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
00246 }
00247 
00248 /******************************************************//** Deallocate a sort buffer. */
00250 static
00251 void
00252 row_merge_buf_free(
00253 /*===============*/
00254   row_merge_buf_t*  buf)  /*!< in,own: sort buffer */
00255 {
00256   mem_heap_free(buf->heap);
00257 }
00258 
00259 /******************************************************//** Insert a data tuple into a sort buffer. @return TRUE if added, FALSE if out of space */
00262 static
00263 ibool
00264 row_merge_buf_add(
00265 /*==============*/
00266   row_merge_buf_t*  buf,  /*!< in/out: sort buffer */
00267   const dtuple_t*   row,  /*!< in: row in clustered index */
00268   const row_ext_t*  ext)  /*!< in: cache of externally stored column prefixes, or NULL */
00270 {
00271   ulint     i;
00272   ulint     n_fields;
00273   ulint     data_size;
00274   ulint     extra_size;
00275   const dict_index_t* index;
00276   dfield_t*   entry;
00277   dfield_t*   field;
00278   const dict_field_t* ifield;
00279 
00280   if (buf->n_tuples >= buf->max_tuples) {
00281     return(FALSE);
00282   }
00283 
00284   UNIV_PREFETCH_R(row->fields);
00285 
00286   index = buf->index;
00287 
00288   n_fields = dict_index_get_n_fields(index);
00289 
00290   entry = static_cast<dfield_t *>(mem_heap_alloc(buf->heap, n_fields * sizeof *entry));
00291   buf->tuples[buf->n_tuples] = entry;
00292   field = entry;
00293 
00294   data_size = 0;
00295   extra_size = UT_BITS_IN_BYTES(index->n_nullable);
00296 
00297   ifield = dict_index_get_nth_field(index, 0);
00298 
00299   for (i = 0; i < n_fields; i++, field++, ifield++) {
00300     const dict_col_t* col;
00301     ulint     col_no;
00302     const dfield_t*   row_field;
00303     ulint     len;
00304 
00305     col = ifield->col;
00306     col_no = dict_col_get_no(col);
00307     row_field = dtuple_get_nth_field(row, col_no);
00308     dfield_copy(field, row_field);
00309     len = dfield_get_len(field);
00310 
00311     if (dfield_is_null(field)) {
00312       ut_ad(!(col->prtype & DATA_NOT_NULL));
00313       continue;
00314     } else if (UNIV_LIKELY(!ext)) {
00315     } else if (dict_index_is_clust(index)) {
00316       /* Flag externally stored fields. */
00317       const byte* row_buf = row_ext_lookup(ext, col_no,
00318                    &len);
00319       if (UNIV_LIKELY_NULL(row_buf)) {
00320         ut_a(row_buf != field_ref_zero);
00321         if (i < dict_index_get_n_unique(index)) {
00322           dfield_set_data(field, row_buf, len);
00323         } else {
00324           dfield_set_ext(field);
00325           len = dfield_get_len(field);
00326         }
00327       }
00328     } else {
00329       const byte* row_buf = row_ext_lookup(ext, col_no,
00330                    &len);
00331       if (UNIV_LIKELY_NULL(row_buf)) {
00332         ut_a(row_buf != field_ref_zero);
00333         dfield_set_data(field, row_buf, len);
00334       }
00335     }
00336 
00337     /* If a column prefix index, take only the prefix */
00338 
00339     if (ifield->prefix_len) {
00340       len = dtype_get_at_most_n_mbchars(
00341         col->prtype,
00342         col->mbminmaxlen,
00343         ifield->prefix_len,
00344         len, static_cast<const char *>(dfield_get_data(field)));
00345       dfield_set_len(field, len);
00346     }
00347 
00348     ut_ad(len <= col->len || col->mtype == DATA_BLOB);
00349 
00350     if (ifield->fixed_len) {
00351       ut_ad(len == ifield->fixed_len);
00352       ut_ad(!dfield_is_ext(field));
00353     } else if (dfield_is_ext(field)) {
00354       extra_size += 2;
00355     } else if (len < 128
00356          || (col->len < 256 && col->mtype != DATA_BLOB)) {
00357       extra_size++;
00358     } else {
00359       /* For variable-length columns, we look up the
00360       maximum length from the column itself.  If this
00361       is a prefix index column shorter than 256 bytes,
00362       this will waste one byte. */
00363       extra_size += 2;
00364     }
00365     data_size += len;
00366   }
00367 
00368 #ifdef UNIV_DEBUG
00369   {
00370     ulint size;
00371     ulint extra;
00372 
00373     size = rec_get_converted_size_comp(index,
00374                REC_STATUS_ORDINARY,
00375                entry, n_fields, &extra);
00376 
00377     ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
00378     ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
00379   }
00380 #endif /* UNIV_DEBUG */
00381 
00382   /* Add to the total size of the record in row_merge_block_t
00383   the encoded length of extra_size and the extra bytes (extra_size).
00384   See row_merge_buf_write() for the variable-length encoding
00385   of extra_size. */
00386   data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
00387 
00388   /* The following assertion may fail if row_merge_block_t is
00389   declared very small and a PRIMARY KEY is being created with
00390   many prefix columns.  In that case, the record may exceed the
00391   page_zip_rec_needs_ext() limit.  However, no further columns
00392   will be moved to external storage until the record is inserted
00393   to the clustered index B-tree. */
00394   ut_ad(data_size < sizeof(row_merge_block_t));
00395 
00396   /* Reserve one byte for the end marker of row_merge_block_t. */
00397   if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
00398     return(FALSE);
00399   }
00400 
00401   buf->total_size += data_size;
00402   buf->n_tuples++;
00403 
00404   field = entry;
00405 
00406   /* Copy the data fields. */
00407 
00408   do {
00409     dfield_dup(field++, buf->heap);
00410   } while (--n_fields);
00411 
00412   return(TRUE);
00413 }
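/* Usage sketch (not from the original source): the caller pattern for
the sort buffer, mirroring what row_merge_read_clustered_index() does
below.  `row` and `ext` come from row_build(); `buf`, `file` and `block`
belong to the caller; error handling is abbreviated.
@code
	if (!row_merge_buf_add(buf, row, ext)) {
		// Buffer full: sort it, flush it to disk, then retry.
		row_merge_buf_sort(buf, NULL);
		row_merge_buf_write(buf, file, block);

		if (!row_merge_write(file->fd, file->offset++, block)) {
			// handle DB_OUT_OF_FILE_SPACE
		}

		buf = row_merge_buf_empty(buf);
		ut_a(row_merge_buf_add(buf, row, ext));
	}

	file->n_rec++;
@endcode */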
00414 
00416 struct row_merge_dup_struct {
00417   const dict_index_t* index;    /*!< index being sorted */
00418   TABLE*    table;    /*!< MySQL table object */
00419   ulint     n_dup;    /*!< number of duplicates */
00420 };
00421 
00423 typedef struct row_merge_dup_struct row_merge_dup_t;
00424 
00425 /*************************************************************//** Report a duplicate key. */
00427 static
00428 void
00429 row_merge_dup_report(
00430 /*=================*/
00431   row_merge_dup_t*  dup,  /*!< in/out: for reporting duplicates */
00432   const dfield_t*   entry)  /*!< in: duplicate index entry */
00433 {
00434   mrec_buf_t*     buf;
00435   const dtuple_t*   tuple;
00436   dtuple_t    tuple_store;
00437   const rec_t*    rec;
00438   const dict_index_t* index = dup->index;
00439   ulint     n_fields = dict_index_get_n_fields(index);
00440   mem_heap_t*   heap;
00441   ulint*      offsets;
00442   ulint     n_ext;
00443 
00444   if (dup->n_dup++) {
00445     /* Only report the first duplicate record,
00446     but count all duplicate records. */
00447     return;
00448   }
00449 
00450   /* Convert the tuple to a record and then to MySQL format. */
00451   heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
00452              * sizeof *offsets
00453              + sizeof *buf);
00454 
00455   buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof *buf));
00456 
00457   tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
00458   n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;
00459 
00460   rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
00461   offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
00462 
00463   innobase_rec_to_mysql(dup->table, rec, index, offsets);
00464 
00465   mem_heap_free(heap);
00466 }
00467 
00468 /*************************************************************//** Compare two tuples. @return 1, 0, -1 if a is greater, equal, less, respectively, than b */
00471 static
00472 int
00473 row_merge_tuple_cmp(
00474 /*================*/
00475   ulint     n_field,/*!< in: number of fields */
00476   const dfield_t*   a,  /*!< in: first tuple to be compared */
00477   const dfield_t*   b,  /*!< in: second tuple to be compared */
00478   row_merge_dup_t*  dup)  /*!< in/out: for reporting duplicates */
00479 {
00480   int   cmp;
00481   const dfield_t* field = a;
00482 
00483   /* Compare the fields of the tuples until a difference is
00484   found or we run out of fields to compare.  If !cmp at the
00485   end, the tuples are equal. */
00486   do {
00487     cmp = cmp_dfield_dfield(a++, b++);
00488   } while (!cmp && --n_field);
00489 
00490   if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
00491     /* Report a duplicate value error if the tuples are
00492     logically equal.  NULL columns are logically inequal,
00493     although they are equal in the sorting order.  Find
00494     out if any of the fields are NULL. */
00495     for (b = field; b != a; b++) {
00496       if (dfield_is_null(b)) {
00497 
00498         goto func_exit;
00499       }
00500     }
00501 
00502     row_merge_dup_report(dup, field);
00503   }
00504 
00505 func_exit:
00506   return(cmp);
00507 }
00508 
00515 #define row_merge_tuple_sort_ctx(a,b,c,d) \
00516   row_merge_tuple_sort(n_field, dup, a, b, c, d)
00517 
00522 #define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
00523 
00524 /**********************************************************************//** Merge sort the tuple buffer in main memory. */
00526 static
00527 void
00528 row_merge_tuple_sort(
00529 /*=================*/
00530   ulint     n_field,/*!< in: number of fields */
00531   row_merge_dup_t*  dup,  /*!< in/out: for reporting duplicates */
00532   const dfield_t**  tuples, /*!< in/out: tuples to be sorted */
00533   const dfield_t**  aux,  /*!< in/out: work area */
00534   ulint     low,  /*!< in: lower bound of the sorting area, inclusive */
00536   ulint     high) /*!< in: upper bound of the sorting area, exclusive */
00538 {
00539   UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
00540             tuples, aux, low, high, row_merge_tuple_cmp_ctx);
00541 }
00542 
00543 /******************************************************//** Sort a buffer. */
00545 static
00546 void
00547 row_merge_buf_sort(
00548 /*===============*/
00549   row_merge_buf_t*  buf,  /*!< in/out: sort buffer */
00550   row_merge_dup_t*  dup)  /*!< in/out: for reporting duplicates */
00551 {
00552   row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
00553            buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
00554 }
00555 
00556 /******************************************************//** Write a buffer to a block. */
00558 static
00559 void
00560 row_merge_buf_write(
00561 /*================*/
00562   const row_merge_buf_t*  buf,  /*!< in: sorted buffer */
00563 #ifdef UNIV_DEBUG
00564   const merge_file_t* of, /*!< in: output file */
00565 #endif /* UNIV_DEBUG */
00566   row_merge_block_t*  block)  /*!< out: buffer for writing to file */
00567 #ifndef UNIV_DEBUG
00568 # define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
00569 #endif /* !UNIV_DEBUG */
00570 {
00571   const dict_index_t* index = buf->index;
00572   ulint     n_fields = dict_index_get_n_fields(index);
00573   byte*     b = &(*block)[0];
00574 
00575   ulint   i;
00576 
00577   for (i = 0; i < buf->n_tuples; i++) {
00578     ulint   size;
00579     ulint   extra_size;
00580     const dfield_t* entry   = buf->tuples[i];
00581 
00582     size = rec_get_converted_size_comp(index,
00583                REC_STATUS_ORDINARY,
00584                entry, n_fields,
00585                &extra_size);
00586     ut_ad(size > extra_size);
00587     ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
00588     extra_size -= REC_N_NEW_EXTRA_BYTES;
00589     size -= REC_N_NEW_EXTRA_BYTES;
00590 
00591     /* Encode extra_size + 1 */
00592     if (extra_size + 1 < 0x80) {
00593       *b++ = (byte) (extra_size + 1);
00594     } else {
00595       ut_ad((extra_size + 1) < 0x8000);
00596       *b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
00597       *b++ = (byte) (extra_size + 1);
00598     }
00599 
00600     ut_ad(b + size < block[1]);
00601 
00602     rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
00603                  REC_STATUS_ORDINARY,
00604                  entry, n_fields);
00605 
00606     b += size;
00607 
00608 #ifdef UNIV_DEBUG
00609     if (row_merge_print_write) {
00610       fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
00611         (void*) b, of->fd, (ulong) of->offset,
00612         (ulong) i);
00613       row_merge_tuple_print(stderr, entry, n_fields);
00614     }
00615 #endif /* UNIV_DEBUG */
00616   }
00617 
00618   /* Write an "end-of-chunk" marker. */
00619   ut_a(b < block[1]);
00620   ut_a(b == block[0] + buf->total_size);
00621   *b++ = 0;
00622 #ifdef UNIV_DEBUG_VALGRIND
00623   /* The rest of the block is uninitialized.  Initialize it
00624   to avoid bogus warnings. */
00625   memset(b, 0xff, block[1] - b);
00626 #endif /* UNIV_DEBUG_VALGRIND */
00627 #ifdef UNIV_DEBUG
00628   if (row_merge_print_write) {
00629     fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
00630       (void*) b, of->fd, (ulong) of->offset);
00631   }
00632 #endif /* UNIV_DEBUG */
00633 }
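/* Illustrative sketch (not from the original source): the 1-or-2-byte
encoding of extra_size + 1 used above, and its inverse as performed by
row_merge_read_rec() below.
@code
	ulint	e = extra_size + 1;	// 0 is reserved as end-of-list marker

	// encode
	if (e < 0x80) {
		*b++ = (byte) e;
	} else {
		ut_ad(e < 0x8000);
		*b++ = (byte) (0x80 | (e >> 8));	// high byte, flag bit set
		*b++ = (byte) e;			// low byte
	}

	// decode (as done by a reader scanning the block at b2)
	e = *b2++;
	if (e >= 0x80) {
		e = ((e & 0x7f) << 8) | *b2++;
	}
@endcode */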
00634 
00635 /******************************************************//** Create a memory heap and allocate space for three mrec_buf_t and two record offset arrays. @return memory heap */
00639 static
00640 mem_heap_t*
00641 row_merge_heap_create(
00642 /*==================*/
00643   const dict_index_t* index,    /*!< in: record descriptor */
00644   mrec_buf_t**    buf,    /*!< out: 3 buffers */
00645   ulint**     offsets1, /*!< out: offsets */
00646   ulint**     offsets2) /*!< out: offsets */
00647 {
00648   ulint   i = 1 + REC_OFFS_HEADER_SIZE
00649     + dict_index_get_n_fields(index);
00650   mem_heap_t* heap  = mem_heap_create(2 * i * sizeof **offsets1
00651               + 3 * sizeof **buf);
00652 
00653   *buf = static_cast<mrec_buf_t*>(mem_heap_alloc(heap, 3 * sizeof **buf));
00654   *offsets1 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets1));
00655   *offsets2 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets2));
00656 
00657   (*offsets1)[0] = (*offsets2)[0] = i;
00658   (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
00659 
00660   return(heap);
00661 }
00662 
00663 /**********************************************************************//** Search an index object by name and column names. @return index object if found, else NULL */
00667 static
00668 dict_index_t*
00669 row_merge_dict_table_get_index(
00670 /*===========================*/
00671   dict_table_t*   table,    /*!< in: table */
00672   const merge_index_def_t*index_def)  /*!< in: index definition */
00673 {
00674   ulint   i;
00675   dict_index_t* index;
00676   const char**  column_names;
00677 
00678   column_names = static_cast<const char **>(mem_alloc(index_def->n_fields * sizeof *column_names));
00679 
00680   for (i = 0; i < index_def->n_fields; ++i) {
00681     column_names[i] = index_def->fields[i].field_name;
00682   }
00683 
00684   index = dict_table_get_index_by_max_id(
00685     table, index_def->name, column_names, index_def->n_fields);
00686 
00687   mem_free((void*) column_names);
00688 
00689   return(index);
00690 }
00691 
00692 /********************************************************************//** Read a merge block from the file system. @return TRUE if the request was successful, FALSE if it failed */
00695 static
00696 ibool
00697 row_merge_read(
00698 /*===========*/
00699   int     fd, /*!< in: file descriptor */
00700   ulint     offset, /*!< in: offset where to read, in row_merge_block_t units */
00703   row_merge_block_t*  buf)  /*!< out: data */
00704 {
00705   ib_uint64_t ofs = ((ib_uint64_t) offset) * sizeof *buf;
00706   ibool   success;
00707 
00708 #ifdef UNIV_DEBUG
00709   if (row_merge_print_block_read) {
00710     fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
00711       fd, (ulong) offset);
00712   }
00713 #endif /* UNIV_DEBUG */
00714 
00715   success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
00716              (ulint) (ofs & 0xFFFFFFFF),
00717              (ulint) (ofs >> 32),
00718              sizeof *buf);
00719 #ifdef POSIX_FADV_DONTNEED
00720   /* Each block is read exactly once.  Free up the file cache. */
00721   posix_fadvise(fd, ofs, sizeof *buf, POSIX_FADV_DONTNEED);
00722 #endif /* POSIX_FADV_DONTNEED */
00723 
00724   if (UNIV_UNLIKELY(!success)) {
00725     ut_print_timestamp(stderr);
00726     fprintf(stderr,
00727       "  InnoDB: failed to read merge block at %"PRIu64"\n", ofs);
00728   }
00729 
00730   return(UNIV_LIKELY(success));
00731 }
00732 
00733 /********************************************************************//** Write a merge block to the file system. @return TRUE if the request was successful, FALSE if it failed */
00736 static
00737 ibool
00738 row_merge_write(
00739 /*============*/
00740   int   fd, /*!< in: file descriptor */
00741   ulint   offset, /*!< in: offset where to write, in row_merge_block_t units */
00743   const void* buf)  /*!< in: data */
00744 {
00745   size_t    buf_len = sizeof(row_merge_block_t);
00746   ib_uint64_t ofs = buf_len * (ib_uint64_t) offset;
00747   ibool   ret;
00748 
00749   ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
00750           (ulint) (ofs & 0xFFFFFFFF),
00751           (ulint) (ofs >> 32),
00752           buf_len);
00753 
00754 #ifdef UNIV_DEBUG
00755   if (row_merge_print_block_write) {
00756     fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
00757       fd, (ulong) offset);
00758   }
00759 #endif /* UNIV_DEBUG */
00760 
00761 #ifdef POSIX_FADV_DONTNEED
00762   /* The block will be needed on the next merge pass,
00763   but it can be evicted from the file cache meanwhile. */
00764   posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
00765 #endif /* POSIX_FADV_DONTNEED */
00766 
00767   return(UNIV_LIKELY(ret));
00768 }
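/* Worked example (not from the original source): file offsets are in
units of sizeof(row_merge_block_t) = 1 MiB, and the 64-bit byte offset
is split into 32-bit halves for os_file_write()/os_file_read():
@code
	ulint		offset = 5;	// block number within the file
	ib_uint64_t	ofs = (ib_uint64_t) offset * sizeof(row_merge_block_t);

	ulint	lo = (ulint) (ofs & 0xFFFFFFFF);	// 0x00500000
	ulint	hi = (ulint) (ofs >> 32);		// 0
@endcode */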
00769 
00770 /********************************************************************//** Read a merge record. @return pointer to next record, or NULL on I/O error or end of list */
00773 static __attribute__((nonnull))
00774 const byte*
00775 row_merge_read_rec(
00776 /*===============*/
00777   row_merge_block_t*  block,  /*!< in/out: file buffer */
00778   mrec_buf_t*   buf,  /*!< in/out: secondary buffer */
00779   const byte*   b,  /*!< in: pointer to record */
00780   const dict_index_t* index,  /*!< in: index of the record */
00781   int     fd, /*!< in: file descriptor */
00782   ulint*      foffs,  /*!< in/out: file offset */
00783   const mrec_t**    mrec, /*!< out: pointer to merge record, or NULL on end of list (non-NULL on I/O error) */
00786   ulint*      offsets)/*!< out: offsets of mrec */
00787 {
00788   ulint extra_size;
00789   ulint data_size;
00790   ulint avail_size;
00791 
00792   ut_ad(block);
00793   ut_ad(buf);
00794   ut_ad(b >= block[0]);
00795   ut_ad(b < block[1]);
00796   ut_ad(index);
00797   ut_ad(foffs);
00798   ut_ad(mrec);
00799   ut_ad(offsets);
00800 
00801   ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
00802         + dict_index_get_n_fields(index));
00803 
00804   extra_size = *b++;
00805 
00806   if (UNIV_UNLIKELY(!extra_size)) {
00807     /* End of list */
00808     *mrec = NULL;
00809 #ifdef UNIV_DEBUG
00810     if (row_merge_print_read) {
00811       fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
00812         (const void*) b, (const void*) block,
00813         fd, (ulong) *foffs);
00814     }
00815 #endif /* UNIV_DEBUG */
00816     return(NULL);
00817   }
00818 
00819   if (extra_size >= 0x80) {
00820     /* Read another byte of extra_size. */
00821 
00822     if (UNIV_UNLIKELY(b >= block[1])) {
00823       if (!row_merge_read(fd, ++(*foffs), block)) {
00824 err_exit:
00825         /* Signal I/O error. */
00826         *mrec = b;
00827         return(NULL);
00828       }
00829 
00830       /* Wrap around to the beginning of the buffer. */
00831       b = block[0];
00832     }
00833 
00834     extra_size = (extra_size & 0x7f) << 8;
00835     extra_size |= *b++;
00836   }
00837 
00838   /* Normalize extra_size.  Above, value 0 signals "end of list". */
00839   extra_size--;
00840 
00841   /* Read the extra bytes. */
00842 
00843   if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
00844     /* The record spans two blocks.  Copy the entire record
00845     to the auxiliary buffer and handle this as a special
00846     case. */
00847 
00848     avail_size = block[1] - b;
00849 
00850     memcpy(*buf, b, avail_size);
00851 
00852     if (!row_merge_read(fd, ++(*foffs), block)) {
00853 
00854       goto err_exit;
00855     }
00856 
00857     /* Wrap around to the beginning of the buffer. */
00858     b = block[0];
00859 
00860     /* Copy the record. */
00861     memcpy(*buf + avail_size, b, extra_size - avail_size);
00862     b += extra_size - avail_size;
00863 
00864     *mrec = *buf + extra_size;
00865 
00866     rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
00867 
00868     data_size = rec_offs_data_size(offsets);
00869 
00870     /* These overflows should be impossible given that
00871     records are much smaller than either buffer, and
00872     the record starts near the beginning of each buffer. */
00873     ut_a(extra_size + data_size < sizeof *buf);
00874     ut_a(b + data_size < block[1]);
00875 
00876     /* Copy the data bytes. */
00877     memcpy(*buf + extra_size, b, data_size);
00878     b += data_size;
00879 
00880     goto func_exit;
00881   }
00882 
00883   *mrec = b + extra_size;
00884 
00885   rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
00886 
00887   data_size = rec_offs_data_size(offsets);
00888   ut_ad(extra_size + data_size < sizeof *buf);
00889 
00890   b += extra_size + data_size;
00891 
00892   if (UNIV_LIKELY(b < block[1])) {
00893     /* The record fits entirely in the block.
00894     This is the normal case. */
00895     goto func_exit;
00896   }
00897 
00898   /* The record spans two blocks.  Copy it to buf. */
00899 
00900   b -= extra_size + data_size;
00901   avail_size = block[1] - b;
00902   memcpy(*buf, b, avail_size);
00903   *mrec = *buf + extra_size;
00904 #ifdef UNIV_DEBUG
00905   /* We cannot invoke rec_offs_make_valid() here, because there
00906   are no REC_N_NEW_EXTRA_BYTES between extra_size and data_size.
00907   Similarly, rec_offs_validate() would fail, because it invokes
00908   rec_get_status(). */
00909   offsets[2] = (ulint) *mrec;
00910   offsets[3] = (ulint) index;
00911 #endif /* UNIV_DEBUG */
00912 
00913   if (!row_merge_read(fd, ++(*foffs), block)) {
00914 
00915     goto err_exit;
00916   }
00917 
00918   /* Wrap around to the beginning of the buffer. */
00919   b = block[0];
00920 
00921   /* Copy the rest of the record. */
00922   memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
00923   b += extra_size + data_size - avail_size;
00924 
00925 func_exit:
00926 #ifdef UNIV_DEBUG
00927   if (row_merge_print_read) {
00928     fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
00929       (const void*) b, (const void*) block,
00930       fd, (ulong) *foffs);
00931     rec_print_comp(stderr, *mrec, offsets);
00932     putc('\n', stderr);
00933   }
00934 #endif /* UNIV_DEBUG */
00935 
00936   return(b);
00937 }
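/* Illustrative sketch (not from the original source): when a record
crosses a block boundary, the two fragments are stitched together in the
mrec_buf_t before the record is used, as in the spanning cases above.
@code
	avail_size = block[1] - b;		// bytes left in this block
	memcpy(*buf, b, avail_size);		// copy the head fragment
	row_merge_read(fd, ++*foffs, block);	// read the next 1 MiB block
	b = block[0];
	memcpy(*buf + avail_size,		// append the tail fragment
	       b, rec_size - avail_size);
@endcode
Here `rec_size` stands for extra_size + data_size; error handling is
omitted. */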
00938 
00939 /********************************************************************//** Write a merge record. */
00941 static
00942 void
00943 row_merge_write_rec_low(
00944 /*====================*/
00945   byte*   b,  /*!< out: buffer */
00946   ulint   e,  /*!< in: encoded extra_size */
00947 #ifdef UNIV_DEBUG
00948   ulint   size, /*!< in: total size to write */
00949   int   fd, /*!< in: file descriptor */
00950   ulint   foffs,  /*!< in: file offset */
00951 #endif /* UNIV_DEBUG */
00952   const mrec_t* mrec, /*!< in: record to write */
00953   const ulint*  offsets)/*!< in: offsets of mrec */
00954 #ifndef UNIV_DEBUG
00955 # define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets)  \
00956   row_merge_write_rec_low(b, e, mrec, offsets)
00957 #endif /* !UNIV_DEBUG */
00958 {
00959 #ifdef UNIV_DEBUG
00960   const byte* const end = b + size;
00961   ut_ad(e == rec_offs_extra_size(offsets) + 1);
00962 
00963   if (row_merge_print_write) {
00964     fprintf(stderr, "row_merge_write %p,%d,%lu ",
00965       (void*) b, fd, (ulong) foffs);
00966     rec_print_comp(stderr, mrec, offsets);
00967     putc('\n', stderr);
00968   }
00969 #endif /* UNIV_DEBUG */
00970 
00971   if (e < 0x80) {
00972     *b++ = (byte) e;
00973   } else {
00974     *b++ = (byte) (0x80 | (e >> 8));
00975     *b++ = (byte) e;
00976   }
00977 
00978   memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
00979   ut_ad(b + rec_offs_size(offsets) == end);
00980 }
00981 
00982 /********************************************************************//** Write a merge record. @return pointer to end of block, or NULL on error */
00985 static
00986 byte*
00987 row_merge_write_rec(
00988 /*================*/
00989   row_merge_block_t*  block,  /*!< in/out: file buffer */
00990   mrec_buf_t*   buf,  /*!< in/out: secondary buffer */
00991   byte*     b,  /*!< in: pointer to end of block */
00992   int     fd, /*!< in: file descriptor */
00993   ulint*      foffs,  /*!< in/out: file offset */
00994   const mrec_t*   mrec, /*!< in: record to write */
00995   const ulint*    offsets)/*!< in: offsets of mrec */
00996 {
00997   ulint extra_size;
00998   ulint size;
00999   ulint avail_size;
01000 
01001   ut_ad(block);
01002   ut_ad(buf);
01003   ut_ad(b >= block[0]);
01004   ut_ad(b < block[1]);
01005   ut_ad(mrec);
01006   ut_ad(foffs);
01007   ut_ad(mrec < block[0] || mrec > block[1]);
01008   ut_ad(mrec < buf[0] || mrec > buf[1]);
01009 
01010   /* Normalize extra_size.  Value 0 signals "end of list". */
01011   extra_size = rec_offs_extra_size(offsets) + 1;
01012 
01013   size = extra_size + (extra_size >= 0x80)
01014     + rec_offs_data_size(offsets);
01015 
01016   if (UNIV_UNLIKELY(b + size >= block[1])) {
01017     /* The record spans two blocks.
01018     Copy it to the temporary buffer first. */
01019     avail_size = block[1] - b;
01020 
01021     row_merge_write_rec_low(buf[0],
01022           extra_size, size, fd, *foffs,
01023           mrec, offsets);
01024 
01025     /* Copy the head of the temporary buffer, write
01026     the completed block, and copy the tail of the
01027     record to the head of the new block. */
01028     memcpy(b, buf[0], avail_size);
01029 
01030     if (!row_merge_write(fd, (*foffs)++, block)) {
01031       return(NULL);
01032     }
01033 
01034     UNIV_MEM_INVALID(block[0], sizeof block[0]);
01035 
01036     /* Copy the rest. */
01037     b = block[0];
01038     memcpy(b, buf[0] + avail_size, size - avail_size);
01039     b += size - avail_size;
01040   } else {
01041     row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
01042           mrec, offsets);
01043     b += size;
01044   }
01045 
01046   return(b);
01047 }
01048 
01049 /********************************************************************//** Write an end-of-list marker. @return pointer to end of block, or NULL on error */
01052 static
01053 byte*
01054 row_merge_write_eof(
01055 /*================*/
01056   row_merge_block_t*  block,  /*!< in/out: file buffer */
01057   byte*     b,  /*!< in: pointer to end of block */
01058   int     fd, /*!< in: file descriptor */
01059   ulint*      foffs)  /*!< in/out: file offset */
01060 {
01061   ut_ad(block);
01062   ut_ad(b >= block[0]);
01063   ut_ad(b < block[1]);
01064   ut_ad(foffs);
01065 #ifdef UNIV_DEBUG
01066   if (row_merge_print_write) {
01067     fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
01068       (void*) b, (void*) block, fd, (ulong) *foffs);
01069   }
01070 #endif /* UNIV_DEBUG */
01071 
01072   *b++ = 0;
01073   UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
01074   UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
01075 #ifdef UNIV_DEBUG_VALGRIND
01076   /* The rest of the block is uninitialized.  Initialize it
01077   to avoid bogus warnings. */
01078   memset(b, 0xff, block[1] - b);
01079 #endif /* UNIV_DEBUG_VALGRIND */
01080 
01081   if (!row_merge_write(fd, (*foffs)++, block)) {
01082     return(NULL);
01083   }
01084 
01085   UNIV_MEM_INVALID(block[0], sizeof block[0]);
01086   return(block[0]);
01087 }
01088 
01089 /*************************************************************//** Compare two merge records. @return 1, 0, -1 if mrec1 is greater, equal, less, respectively, than mrec2 */
01092 static
01093 int
01094 row_merge_cmp(
01095 /*==========*/
01096   const mrec_t*   mrec1,    /*!< in: first merge record to be compared */
01098   const mrec_t*   mrec2,    /*!< in: second merge record to be compared */
01100   const ulint*    offsets1, /*!< in: first record offsets */
01101   const ulint*    offsets2, /*!< in: second record offsets */
01102   const dict_index_t* index,    /*!< in: index */
01103   ibool*      null_eq)  /*!< out: set to TRUE if the matching records contain NULL values */
01105 {
01106   int cmp;
01107 
01108   cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
01109          null_eq);
01110 
01111 #ifdef UNIV_DEBUG
01112   if (row_merge_print_cmp) {
01113     fputs("row_merge_cmp1 ", stderr);
01114     rec_print_comp(stderr, mrec1, offsets1);
01115     fputs("\nrow_merge_cmp2 ", stderr);
01116     rec_print_comp(stderr, mrec2, offsets2);
01117     fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
01118   }
01119 #endif /* UNIV_DEBUG */
01120 
01121   return(cmp);
01122 }
01123 
01124 /********************************************************************//** Reads the clustered index of the table and creates temporary files containing the index entries for the indexes to be built. @return DB_SUCCESS or error */
01128 static __attribute__((nonnull))
01129 ulint
01130 row_merge_read_clustered_index(
01131 /*===========================*/
01132   trx_t*      trx,  /*!< in: transaction */
01133   TABLE*    table,  /*!< in/out: MySQL table object, for reporting erroneous records */
01135   const dict_table_t* old_table,/*!< in: table where rows are read from */
01137   const dict_table_t* new_table,/*!< in: table where indexes are created; identical to old_table unless creating a PRIMARY KEY */
01140   dict_index_t**    index,  /*!< in: indexes to be created */
01141   merge_file_t*   files,  /*!< in: temporary files */
01142   ulint     n_index,/*!< in: number of indexes to create */
01143   row_merge_block_t*  block)  /*!< in/out: file buffer */
01144 {
01145   dict_index_t*   clust_index;  /* Clustered index */
01146   mem_heap_t*   row_heap; /* Heap memory to create
01147             clustered index records */
01148   row_merge_buf_t** merge_buf;  /* Temporary list for records*/
01149   btr_pcur_t    pcur;   /* Persistent cursor on the
01150             clustered index */
01151   mtr_t     mtr;    /* Mini transaction */
01152   ulint     err = DB_SUCCESS;/* Return code */
01153   ulint     i;
01154   ulint     n_nonnull = 0;  /* number of columns
01155             changed to NOT NULL */
01156   ulint*      nonnull = NULL; /* NOT NULL columns */
01157 
01158   trx->op_info = "reading clustered index";
01159 
01160   ut_ad(trx);
01161   ut_ad(old_table);
01162   ut_ad(new_table);
01163   ut_ad(index);
01164   ut_ad(files);
01165 
01166   /* Create and initialize memory for record buffers */
01167 
01168   merge_buf = static_cast<row_merge_buf_t **>(mem_alloc(n_index * sizeof *merge_buf));
01169 
01170   for (i = 0; i < n_index; i++) {
01171     merge_buf[i] = row_merge_buf_create(index[i]);
01172   }
01173 
01174   mtr_start(&mtr);
01175 
01176   /* Find the clustered index and create a persistent cursor
01177   based on that. */
01178 
01179   clust_index = dict_table_get_first_index(old_table);
01180 
01181   btr_pcur_open_at_index_side(
01182     TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
01183 
01184   if (UNIV_UNLIKELY(old_table != new_table)) {
01185     ulint n_cols = dict_table_get_n_cols(old_table);
01186 
01187     /* A primary key will be created.  Identify the
01188     columns that were flagged NOT NULL in the new table,
01189     so that we can quickly check that the records in the
01190     (old) clustered index do not violate the added NOT
01191     NULL constraints. */
01192 
01193     ut_a(n_cols == dict_table_get_n_cols(new_table));
01194 
01195     nonnull = static_cast<ulint*>(mem_alloc(n_cols * sizeof *nonnull));
01196 
01197     for (i = 0; i < n_cols; i++) {
01198       if (dict_table_get_nth_col(old_table, i)->prtype
01199           & DATA_NOT_NULL) {
01200 
01201         continue;
01202       }
01203 
01204       if (dict_table_get_nth_col(new_table, i)->prtype
01205           & DATA_NOT_NULL) {
01206 
01207         nonnull[n_nonnull++] = i;
01208       }
01209     }
01210 
01211     if (!n_nonnull) {
01212       mem_free(nonnull);
01213       nonnull = NULL;
01214     }
01215   }
01216 
01217   row_heap = mem_heap_create(sizeof(mrec_buf_t));
01218 
01219   /* Scan the clustered index. */
01220   for (;;) {
01221     const rec_t*  rec;
01222     ulint*    offsets;
01223     dtuple_t* row   = NULL;
01224     row_ext_t*  ext;
01225     ibool   has_next  = TRUE;
01226 
01227     btr_pcur_move_to_next_on_page(&pcur);
01228 
01229     /* When switching pages, commit the mini-transaction
01230     in order to release the latch on the old page. */
01231 
01232     if (btr_pcur_is_after_last_on_page(&pcur)) {
01233       if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01234         err = DB_INTERRUPTED;
01235         trx->error_key_num = 0;
01236         goto func_exit;
01237       }
01238 
01239       btr_pcur_store_position(&pcur, &mtr);
01240       mtr_commit(&mtr);
01241       mtr_start(&mtr);
01242       btr_pcur_restore_position(BTR_SEARCH_LEAF,
01243               &pcur, &mtr);
01244       has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
01245     }
01246 
01247     if (UNIV_LIKELY(has_next)) {
01248       rec = btr_pcur_get_rec(&pcur);
01249       offsets = rec_get_offsets(rec, clust_index, NULL,
01250               ULINT_UNDEFINED, &row_heap);
01251 
01252       /* Skip delete marked records. */
01253       if (rec_get_deleted_flag(
01254             rec, dict_table_is_comp(old_table))) {
01255         continue;
01256       }
01257 
01258       srv_n_rows_inserted++;
01259 
01260       /* Build a row based on the clustered index. */
01261 
01262       row = row_build(ROW_COPY_POINTERS, clust_index,
01263           rec, offsets,
01264           new_table, &ext, row_heap);
01265 
01266       if (UNIV_LIKELY_NULL(nonnull)) {
01267         for (i = 0; i < n_nonnull; i++) {
01268           dfield_t* field
01269             = &row->fields[nonnull[i]];
01270           dtype_t*  field_type
01271             = dfield_get_type(field);
01272 
01273           ut_a(!(field_type->prtype
01274                  & DATA_NOT_NULL));
01275 
01276           if (dfield_is_null(field)) {
01277             err = DB_PRIMARY_KEY_IS_NULL;
01278             trx->error_key_num = 0;
01279             goto func_exit;
01280           }
01281 
01282           field_type->prtype |= DATA_NOT_NULL;
01283         }
01284       }
01285     }
01286 
01287     /* Build all entries for all the indexes to be created
01288     in a single scan of the clustered index. */
01289 
01290     for (i = 0; i < n_index; i++) {
01291       row_merge_buf_t*  buf = merge_buf[i];
01292       merge_file_t*   file  = &files[i];
01293       const dict_index_t* buf_index = buf->index;
01294 
01295       if (UNIV_LIKELY
01296           (row && row_merge_buf_add(buf, row, ext))) {
01297         file->n_rec++;
01298         continue;
01299       }
01300 
01301       /* The buffer must be sufficiently large
01302       to hold at least one record. */
01303       ut_ad(buf->n_tuples || !has_next);
01304 
01305       /* We have enough data tuples to form a block.
01306       Sort them and write to disk. */
01307 
01308       if (buf->n_tuples) {
01309         if (dict_index_is_unique(buf_index)) {
01310           row_merge_dup_t dup;
01311           dup.index = buf->index;
01312           dup.table = table;
01313           dup.n_dup = 0;
01314 
01315           row_merge_buf_sort(buf, &dup);
01316 
01317           if (dup.n_dup) {
01318             err = DB_DUPLICATE_KEY;
01319             trx->error_key_num = i;
01320             goto func_exit;
01321           }
01322         } else {
01323           row_merge_buf_sort(buf, NULL);
01324         }
01325       }
01326 
01327       row_merge_buf_write(buf, file, block);
01328 
01329       if (!row_merge_write(file->fd, file->offset++,
01330                block)) {
01331         err = DB_OUT_OF_FILE_SPACE;
01332         trx->error_key_num = i;
01333         goto func_exit;
01334       }
01335 
01336       UNIV_MEM_INVALID(block[0], sizeof block[0]);
01337       merge_buf[i] = row_merge_buf_empty(buf);
01338 
01339       if (UNIV_LIKELY(row != NULL)) {
01340         /* Try writing the record again, now
01341         that the buffer has been written out
01342         and emptied. */
01343 
01344         if (UNIV_UNLIKELY
01345             (!row_merge_buf_add(buf, row, ext))) {
01346           /* An empty buffer should have enough
01347           room for at least one record. */
01348           ut_error;
01349         }
01350 
01351         file->n_rec++;
01352       }
01353     }
01354 
01355     mem_heap_empty(row_heap);
01356 
01357     if (UNIV_UNLIKELY(!has_next)) {
01358       goto func_exit;
01359     }
01360   }
01361 
01362 func_exit:
01363   btr_pcur_close(&pcur);
01364   mtr_commit(&mtr);
01365   mem_heap_free(row_heap);
01366 
01367   if (UNIV_LIKELY_NULL(nonnull)) {
01368     mem_free(nonnull);
01369   }
01370 
01371   for (i = 0; i < n_index; i++) {
01372     row_merge_buf_free(merge_buf[i]);
01373   }
01374 
01375   mem_free(merge_buf);
01376 
01377   trx->op_info = "";
01378 
01379   return(err);
01380 }
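/* Overview sketch (not from the original source): how the routines in
this file combine when building indexes; row_merge_build_indexes(),
defined later in this file, follows roughly this shape.  `indexes`,
`files`, `n_indexes`, `tmpfd` and `block` are the caller's, and error
handling is omitted.
@code
	// 1. Scan the clustered index once, producing sorted blocks
	// in a temporary file for every index to be created.
	err = row_merge_read_clustered_index(trx, table, old_table,
					     new_table, indexes, files,
					     n_indexes, block);

	// 2. For each index: merge the runs until one sorted run is
	// left, then insert the result into the index B-tree.
	for (i = 0; i < n_indexes; i++) {
		err = row_merge_sort(trx, indexes[i], &files[i],
				     block, &tmpfd, table);

		err = row_merge_insert_index_tuples(
			trx, indexes[i], new_table,
			dict_table_zip_size(old_table),
			files[i].fd, block);
	}
@endcode */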
01381 
01385 #define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)       \
01386   do {                \
01387     b2 = row_merge_write_rec(&block[2], &buf[2], b2,  \
01388            of->fd, &of->offset,   \
01389            mrec##N, offsets##N);    \
01390     if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {  \
01391       goto corrupt;         \
01392     }             \
01393     b##N = row_merge_read_rec(&block[N], &buf[N],   \
01394             b##N, index,      \
01395             file->fd, foffs##N,   \
01396             &mrec##N, offsets##N);  \
01397     if (UNIV_UNLIKELY(!b##N)) {       \
01398       if (mrec##N) {          \
01399         goto corrupt;       \
01400       }           \
01401       AT_END;           \
01402     }             \
01403   } while (0)
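/* Illustrative expansion (not from the original source): for N == 0,
ROW_MERGE_WRITE_GET_NEXT(0, goto merged) writes mrec0 to the output
buffer &block[2], then reads the next record of input run 0 into
mrec0/offsets0, executing AT_END when run 0 is exhausted and jumping to
`corrupt' on an I/O error or when the output record count exceeds the
input record count. */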
01404 
01405 /*************************************************************//** Merge two blocks of records on disk and write a bigger block. @return DB_SUCCESS or error code */
01408 static
01409 ulint
01410 row_merge_blocks(
01411 /*=============*/
01412   const dict_index_t* index,  /*!< in: index being created */
01413   const merge_file_t* file, /*!< in: file containing index entries */
01415   row_merge_block_t*  block,  /*!< in/out: 3 buffers */
01416   ulint*      foffs0, /*!< in/out: offset of first source list in the file */
01418   ulint*      foffs1, /*!< in/out: offset of second source list in the file */
01420   merge_file_t*   of, /*!< in/out: output file */
01421   TABLE*    table)  /*!< in/out: MySQL table, for reporting erroneous key value if applicable */
01424 {
01425   mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
01427   mrec_buf_t* buf;  /*!< buffer for handling split mrec in block[] */
01429   const byte* b0; /*!< pointer to block[0] */
01430   const byte* b1; /*!< pointer to block[1] */
01431   byte*   b2; /*!< pointer to block[2] */
01432   const mrec_t* mrec0;  /*!< merge rec, points to block[0] or buf[0] */
01433   const mrec_t* mrec1;  /*!< merge rec, points to block[1] or buf[1] */
01434   ulint*    offsets0;/* offsets of mrec0 */
01435   ulint*    offsets1;/* offsets of mrec1 */
01436 
01437 #ifdef UNIV_DEBUG
01438   if (row_merge_print_block) {
01439     fprintf(stderr,
01440       "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
01441       " = fd=%d ofs=%lu\n",
01442       file->fd, (ulong) *foffs0,
01443       file->fd, (ulong) *foffs1,
01444       of->fd, (ulong) of->offset);
01445   }
01446 #endif /* UNIV_DEBUG */
01447 
01448   heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
01449 
01451 
01452   /* Write a record and read the next record.  Split the output
01453   file in two halves, which can be merged on the following pass. */
01454 
01455   if (!row_merge_read(file->fd, *foffs0, &block[0])
01456       || !row_merge_read(file->fd, *foffs1, &block[1])) {
01457 corrupt:
01458     mem_heap_free(heap);
01459     return(DB_CORRUPTION);
01460   }
01461 
01462   b0 = block[0];
01463   b1 = block[1];
01464   b2 = block[2];
01465 
01466   b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
01467         foffs0, &mrec0, offsets0);
01468   b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
01469         foffs1, &mrec1, offsets1);
01470   if (UNIV_UNLIKELY(!b0 && mrec0)
01471       || UNIV_UNLIKELY(!b1 && mrec1)) {
01472 
01473     goto corrupt;
01474   }
01475 
01476   while (mrec0 && mrec1) {
01477     ibool null_eq = FALSE;
01478     switch (row_merge_cmp(mrec0, mrec1,
01479               offsets0, offsets1, index,
01480               &null_eq)) {
01481     case 0:
01482       if (UNIV_UNLIKELY
01483           (dict_index_is_unique(index) && !null_eq)) {
01484         innobase_rec_to_mysql(table, mrec0,
01485                   index, offsets0);
01486         mem_heap_free(heap);
01487         return(DB_DUPLICATE_KEY);
01488       }
01489       /* fall through */
01490     case -1:
01491       ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
01492       break;
01493     case 1:
01494       ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
01495       break;
01496     default:
01497       ut_error;
01498     }
01499 
01500   }
01501 
01502 merged:
01503   if (mrec0) {
01504     /* append all mrec0 to output */
01505     for (;;) {
01506       ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
01507     }
01508   }
01509 done0:
01510   if (mrec1) {
01511     /* append all mrec1 to output */
01512     for (;;) {
01513       ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
01514     }
01515   }
01516 done1:
01517 
01518   mem_heap_free(heap);
01519   b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
01520   return(b2 ? DB_SUCCESS : DB_CORRUPTION);
01521 }
01522 
01523 /*************************************************************//** Copy a block of index entries. @return TRUE on success, FALSE on failure */
01526 static __attribute__((nonnull))
01527 ibool
01528 row_merge_blocks_copy(
01529 /*==================*/
01530   const dict_index_t* index,  /*!< in: index being created */
01531   const merge_file_t* file, /*!< in: input file */
01532   row_merge_block_t*  block,  /*!< in/out: 3 buffers */
01533   ulint*      foffs0, /*!< in/out: input file offset */
01534   merge_file_t*   of) /*!< in/out: output file */
01535 {
01536   mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
01538   mrec_buf_t* buf;  /*!< buffer for handling split mrec in block[] */
01540   const byte* b0; /*!< pointer to block[0] */
01541   byte*   b2; /*!< pointer to block[2] */
01542   const mrec_t* mrec0;  /*!< merge rec, points to block[0] or buf[0] */
01543   ulint*    offsets0;/* offsets of mrec0 */
01544   ulint*    offsets1;/* dummy offsets */
01545 
01546 #ifdef UNIV_DEBUG
01547   if (row_merge_print_block) {
01548     fprintf(stderr,
01549       "row_merge_blocks_copy fd=%d ofs=%lu"
01550       " = fd=%d ofs=%lu\n",
01551       file->fd, (ulong) *foffs0,
01552       of->fd, (ulong) of->offset);
01553   }
01554 #endif /* UNIV_DEBUG */
01555 
01556   heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
01558 
01559   /* Copy the records of one input run to the output file. */
01561 
01562   if (!row_merge_read(file->fd, *foffs0, &block[0])) {
01563 corrupt:
01564     mem_heap_free(heap);
01565     return(FALSE);
01566   }
01567 
01568   b0 = block[0];
01569   b2 = block[2];
01570 
01571   b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
01572         foffs0, &mrec0, offsets0);
01573   if (UNIV_UNLIKELY(!b0 && mrec0)) {
01574 
01575     goto corrupt;
01576   }
01577 
01578   if (mrec0) {
01579     /* append all mrec0 to output */
01580     for (;;) {
01581       ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
01582     }
01583   }
01584 done0:
01585 
01586   /* The file offset points to the beginning of the last page
01587   that has been read.  Update it to point to the next block. */
01588   (*foffs0)++;
01589 
01590   mem_heap_free(heap);
01591   return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
01592          != NULL);
01593 }
01594 
01595 /*************************************************************//** Merge disk files. @return DB_SUCCESS or error code */
01598 static __attribute__((nonnull))
01599 ulint
01600 row_merge(
01601 /*======*/
01602   trx_t*      trx,  /*!< in: transaction */
01603   const dict_index_t* index,  /*!< in: index being created */
01604   merge_file_t*   file, /*!< in/out: file containing index entries */
01606   row_merge_block_t*  block,  /*!< in/out: 3 buffers */
01607   int*      tmpfd,  /*!< in/out: temporary file handle */
01608   TABLE*    table,  /*!< in/out: MySQL table, for reporting erroneous key value if applicable */
01611   ulint*      num_run,/*!< in/out: number of runs remaining to be merged */
01613   ulint*      run_offset) /*!< in/out: array of first offsets of the merge runs */
01616 {
01617   ulint   foffs0; /*!< first input offset */
01618   ulint   foffs1; /*!< second input offset */
01619   ulint   error;  /*!< error code */
01620   merge_file_t  of; /*!< output file */
01621   const ulint ihalf = run_offset[*num_run / 2]; /*!< half the input file */
01623   ulint   n_run = 0; /*!< number of runs generated from this merge */

01627   UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
01628   ut_ad(ihalf < file->offset);
01629 
01630   of.fd = *tmpfd;
01631   of.offset = 0;
01632   of.n_rec = 0;
01633 
01634 #ifdef POSIX_FADV_SEQUENTIAL
01635   /* The input file will be read sequentially, starting from the
01636   beginning and the middle.  In Linux, the POSIX_FADV_SEQUENTIAL
01637   affects the entire file.  Each block will be read exactly once. */
01638   posix_fadvise(file->fd, 0, 0,
01639           POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
01640 #endif /* POSIX_FADV_SEQUENTIAL */
01641 
01642   /* Merge blocks to the output file. */
01643   foffs0 = 0;
01644   foffs1 = ihalf;
01645 
01646   UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);
01647 
01648   for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
01649 
01650     if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01651       return(DB_INTERRUPTED);
01652     }
01653 
01654     /* Remember the offset number for this run */
01655     run_offset[n_run++] = of.offset;
01656 
01657     error = row_merge_blocks(index, file, block,
01658            &foffs0, &foffs1, &of, table);
01659 
01660     if (error != DB_SUCCESS) {
01661       return(error);
01662     }
01663 
01664   }
01665 
01666   /* Copy the last blocks, if there are any. */
01667 
01668   while (foffs0 < ihalf) {
01669     if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01670       return(DB_INTERRUPTED);
01671     }
01672 
01673     /* Remember the offset number for this run */
01674     run_offset[n_run++] = of.offset;
01675 
01676     if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
01677       return(DB_CORRUPTION);
01678     }
01679   }
01680 
01681   ut_ad(foffs0 == ihalf);
01682 
01683   while (foffs1 < file->offset) {
01684     if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01685       return(DB_INTERRUPTED);
01686     }
01687 
01688     /* Remember the offset number for this run */
01689     run_offset[n_run++] = of.offset;
01690 
01691     if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
01692       return(DB_CORRUPTION);
01693     }
01694   }
01695 
01696   ut_ad(foffs1 == file->offset);
01697 
01698   if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
01699     return(DB_CORRUPTION);
01700   }
01701 
01702   ut_ad(n_run <= *num_run);
01703 
01704   *num_run = n_run;
01705 
01706   /* Each run can contain one or more offsets. As merge goes on,
01707   the number of runs (to merge) will reduce until we have one
01708   single run. So the number of runs will always be smaller than
01709   the number of offsets in file */
01710   ut_ad((*num_run) <= file->offset);
01711 
01712   /* The number of offsets in output file is always equal or
01713   smaller than input file */
01714   ut_ad(of.offset <= file->offset);
01715 
01716   /* Swap file descriptors for the next pass. */
01717   *tmpfd = file->fd;
01718   *file = of;
01719 
01720   UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
01721 
01722   return(DB_SUCCESS);
01723 }
01724 
01725 /*************************************************************//** Merge disk files repeatedly until one sorted run remains. @return DB_SUCCESS or error code */
01728 static
01729 ulint
01730 row_merge_sort(
01731 /*===========*/
01732   trx_t*      trx,  /*!< in: transaction */
01733   const dict_index_t* index,  /*!< in: index being created */
01734   merge_file_t*   file, /*!< in/out: file containing index entries */
01736   row_merge_block_t*  block,  /*!< in/out: 3 buffers */
01737   int*      tmpfd,  /*!< in/out: temporary file handle */
01738   TABLE*    table)  /*!< in/out: MySQL table, for reporting erroneous key value if applicable */
01741 {
01742   ulint half = file->offset / 2;
01743   ulint num_runs;
01744   ulint*  run_offset;
01745   ulint error = DB_SUCCESS;
01746 
01747   /* Record the number of merge runs we need to perform */
01748   num_runs = file->offset;
01749 
01750   /* If there is at most one run, there is nothing to merge */
01751   if (num_runs <= 1) {
01752     return(error);
01753   }
01754 
01755   /* "run_offset" records each run's first offset number */
01756   run_offset = (ulint*) mem_alloc(file->offset * sizeof(ulint));
01757 
01758   /* This tells row_merge() where to start for the first round
01759   of merge. */
01760   run_offset[half] = half;
01761 
01762   /* The file should always contain at least one byte (the end
01763   of file marker).  Thus, it must be at least one block. */
01764   ut_ad(file->offset > 0);
01765 
01766   /* Merge the runs until we have one big run */
01767   do {
01768     error = row_merge(trx, index, file, block, tmpfd,
01769           table, &num_runs, run_offset);
01770 
01771     UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
01772 
01773     if (error != DB_SUCCESS) {
01774       break;
01775     }
01776   } while (num_runs > 1);
01777 
01778   mem_free(run_offset);
01779 
01780   return(error);
01781 }
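/* Worked example (not from the original source): with num_runs == 8
initial runs, each row_merge() pass pairs a run from the first half of
the file with one from the second half, so the run count shrinks
8 -> 4 -> 2 -> 1: about log2(num_runs) passes, each reading and writing
the whole file once. */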
01782 
01783 /*************************************************************//** Copy externally stored columns to the data tuple. */
01785 static
01786 void
01787 row_merge_copy_blobs(
01788 /*=================*/
01789   const mrec_t* mrec, /*!< in: merge record */
01790   const ulint*  offsets,/*!< in: offsets of mrec */
01791   ulint   zip_size,/*!< in: compressed page size in bytes, or 0 */
01792   dtuple_t* tuple,  /*!< in/out: data tuple */
01793   mem_heap_t* heap) /*!< in/out: memory heap */
01794 {
01795   ulint i;
01796   ulint n_fields = dtuple_get_n_fields(tuple);
01797 
01798   for (i = 0; i < n_fields; i++) {
01799     ulint   len;
01800     const void* data;
01801     dfield_t* field = dtuple_get_nth_field(tuple, i);
01802 
01803     if (!dfield_is_ext(field)) {
01804       continue;
01805     }
01806 
01807     ut_ad(!dfield_is_null(field));
01808 
01809     /* The table is locked during index creation.
01810     Therefore, externally stored columns cannot possibly
01811     be freed between the time the BLOB pointers are read
01812     (row_merge_read_clustered_index()) and dereferenced
01813     (below). */
01814     data = btr_rec_copy_externally_stored_field(
01815       mrec, offsets, zip_size, i, &len, heap);
01816     /* Because we have locked the table, any records
01817     written by incomplete transactions must have been
01818     rolled back already. There must not be any incomplete
01819     BLOB columns. */
01820     ut_a(data);
01821 
01822     dfield_set_data(field, data, len);
01823   }
01824 }
01825 
01826 /********************************************************************//** Read a sorted file containing index data tuples and insert these data tuples into the index. @return DB_SUCCESS or error number */
01830 static
01831 ulint
01832 row_merge_insert_index_tuples(
01833 /*==========================*/
01834   trx_t*      trx,  /*!< in: transaction */
01835   dict_index_t*   index,  /*!< in: index */
01836   dict_table_t*   table,  /*!< in: new table */
01837   ulint     zip_size,/*!< in: compressed page size of the old table, or 0 */
01839   int     fd, /*!< in: file descriptor */
01840   row_merge_block_t*  block)  /*!< in/out: file buffer */
01841 {
01842   const byte*   b;
01843   que_thr_t*    thr;
01844   ins_node_t*   node;
01845   mem_heap_t*   tuple_heap;
01846   mem_heap_t*   graph_heap;
01847   ulint     error = DB_SUCCESS;
01848   ulint     foffs = 0;
01849   ulint*      offsets;
01850 
01851   ut_ad(trx);
01852   ut_ad(index);
01853   ut_ad(table);
01854 
01855   /* We use the insert query graph as the dummy graph
01856   needed in the row module call */
01857 
01858   trx->op_info = "inserting index entries";
01859 
01860   graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
01861   node = ins_node_create(INS_DIRECT, table, graph_heap);
01862 
01863   thr = pars_complete_graph_for_exec(node, trx, graph_heap);
01864 
01865   que_thr_move_to_run_state_for_mysql(thr, trx);
01866 
01867   tuple_heap = mem_heap_create(1000);
01868 
01869   {
01870     ulint i = 1 + REC_OFFS_HEADER_SIZE
01871       + dict_index_get_n_fields(index);
01872     offsets = static_cast<ulint *>(mem_heap_alloc(graph_heap, i * sizeof *offsets));
01873     offsets[0] = i;
01874     offsets[1] = dict_index_get_n_fields(index);
01875   }
01876 
01877   b = *block;
01878 
01879   if (!row_merge_read(fd, foffs, block)) {
01880     error = DB_CORRUPTION;
01881   } else {
01882     mrec_buf_t* buf = static_cast<mrec_buf_t *>(mem_heap_alloc(graph_heap, sizeof *buf));
01883 
01884     for (;;) {
01885       const mrec_t* mrec;
01886       dtuple_t* dtuple;
01887       ulint   n_ext;
01888 
01889       b = row_merge_read_rec(block, buf, b, index,
01890                  fd, &foffs, &mrec, offsets);
01891       if (UNIV_UNLIKELY(!b)) {
01892         /* End of list, or I/O error */
01893         if (mrec) {
01894           error = DB_CORRUPTION;
01895         }
01896         break;
01897       }
01898 
01899       dtuple = row_rec_to_index_entry_low(
01900         mrec, index, offsets, &n_ext, tuple_heap);
01901 
01902       if (UNIV_UNLIKELY(n_ext)) {
01903         row_merge_copy_blobs(mrec, offsets, zip_size,
01904                  dtuple, tuple_heap);
01905       }
01906 
01907       node->row = dtuple;
01908       node->table = table;
01909       node->trx_id = trx->id;
01910 
01911       ut_ad(dtuple_validate(dtuple));
01912 
01913       do {
01914         thr->run_node = thr;
01915         thr->prev_node = thr->common.parent;
01916 
01917         error = row_ins_index_entry(index, dtuple,
01918                   0, FALSE, thr);
01919 
01920         if (UNIV_LIKELY(error == DB_SUCCESS)) {
01921 
01922           goto next_rec;
01923         }
01924 
01925         thr->lock_state = QUE_THR_LOCK_ROW;
01926         trx->error_state = error;
01927         que_thr_stop_for_mysql(thr);
01928         thr->lock_state = QUE_THR_LOCK_NOLOCK;
01929       } while (row_mysql_handle_errors(&error, trx,
01930                thr, NULL));
01931 
01932       goto err_exit;
01933 next_rec:
01934       mem_heap_empty(tuple_heap);
01935     }
01936   }
01937 
01938   que_thr_stop_for_mysql_no_error(thr, trx);
01939 err_exit:
01940   que_graph_free(thr->graph);
01941 
01942   trx->op_info = "";
01943 
01944   mem_heap_free(tuple_heap);
01945 
01946   return(error);
01947 }
01948 
01949 /*********************************************************************//** Sets an exclusive lock on a table, for the duration of creating indexes. @return error code or DB_SUCCESS */
01952 UNIV_INTERN
01953 ulint
01954 row_merge_lock_table(
01955 /*=================*/
01956   trx_t*    trx,    /*!< in/out: transaction */
01957   dict_table_t* table,    /*!< in: table to lock */
01958   enum lock_mode  mode)   /*!< in: LOCK_X or LOCK_S */
01959 {
01960   mem_heap_t* heap;
01961   que_thr_t*  thr;
01962   ulint   err;
01963   sel_node_t* node;
01964 
01965   ut_ad(trx);
01966   ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
01967   ut_ad(mode == LOCK_X || mode == LOCK_S);
01968 
01969   heap = mem_heap_create(512);
01970 
01971   trx->op_info = "setting table lock for creating or dropping index";
01972 
01973   node = sel_node_create(heap);
01974   thr = pars_complete_graph_for_exec(node, trx, heap);
01975   thr->graph->state = QUE_FORK_ACTIVE;
01976 
01977   /* We use the select query graph as the dummy graph needed
01978   in the lock module call */
01979 
01980   thr = que_fork_get_first_thr(static_cast<que_fork_t *>(que_node_get_parent(thr)));
01981   que_thr_move_to_run_state_for_mysql(thr, trx);
01982 
01983 run_again:
01984   thr->run_node = thr;
01985   thr->prev_node = thr->common.parent;
01986 
01987   err = lock_table(0, table, mode, thr);
01988 
01989   trx->error_state = err;
01990 
01991   if (UNIV_LIKELY(err == DB_SUCCESS)) {
01992     que_thr_stop_for_mysql_no_error(thr, trx);
01993   } else {
01994     que_thr_stop_for_mysql(thr);
01995 
01996     if (err != DB_QUE_THR_SUSPENDED) {
01997       ibool was_lock_wait;
01998 
01999       was_lock_wait = row_mysql_handle_errors(
02000         &err, trx, thr, NULL);
02001 
02002       if (was_lock_wait) {
02003         goto run_again;
02004       }
02005     } else {
02006       que_thr_t*  run_thr;
02007       que_node_t* parent;
02008 
02009       parent = que_node_get_parent(thr);
02010       run_thr = que_fork_start_command(static_cast<que_fork_t *>(parent));
02011 
02012       ut_a(run_thr == thr);
02013 
02014       /* There was a lock wait, but the thread was not
02015       in a ready-to-run or running state. */
02016       trx->error_state = DB_LOCK_WAIT;
02017 
02018       goto run_again;
02019     }
02020   }
02021 
02022   que_graph_free(thr->graph);
02023   trx->op_info = "";
02024 
02025   return(err);
02026 }
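
The run_again label above covers two distinct retry causes: a handled lock wait (row_mysql_handle_errors() returned TRUE) and a suspended query thread (DB_QUE_THR_SUSPENDED), which is first restarted through que_fork_start_command(). A sketch of the same logic as a structured loop, again with hypothetical model_* stand-ins:

#include <cstdio>

/* Hypothetical stand-ins for the error codes involved. */
enum model_err { M_SUCCESS, M_LOCK_WAIT, M_THR_SUSPENDED, M_FATAL };

/* Models lock_table(): suspended once, lock-waits once, then succeeds. */
static model_err model_lock_table()
{
  static const model_err script[]
    = { M_THR_SUSPENDED, M_LOCK_WAIT, M_SUCCESS };
  static int call = 0;
  return(script[call++]);
}

int main()
{
  model_err err;

  for (;;) {
    err = model_lock_table();

    if (err == M_SUCCESS) {
      break;    /* locked */
    } else if (err == M_THR_SUSPENDED) {
      continue; /* restart the fork command, then retry */
    } else if (err == M_LOCK_WAIT) {
      continue; /* lock wait was handled, retry */
    } else {
      break;    /* fatal error */
    }
  }

  std::printf("%s\n", err == M_SUCCESS ? "locked" : "failed");
  return(err == M_SUCCESS ? 0 : 1);
}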
02027 
02028 /*********************************************************************/
02032 UNIV_INTERN
02033 void
02034 row_merge_drop_index(
02035 /*=================*/
02036   dict_index_t* index,  
02037   dict_table_t* table,  
02038   trx_t*    trx)  
02039 {
02040   ulint   err;
02041   pars_info_t*  info = pars_info_create();
02042 
02043   /* We use the private SQL parser of Innobase to generate the
02044   query graphs needed to delete the dictionary data from the
02045   Innobase system tables. Deleting a row from the SYS_INDEXES
02046   table also frees the file segments of the index's B-tree. */
02047 
02048   static const char str1[] =
02049     "PROCEDURE DROP_INDEX_PROC () IS\n"
02050     "BEGIN\n"
02051     /* Rename the index, so that it will be dropped by
02052     row_merge_drop_temp_indexes() at crash recovery
02053     if the server crashes before this trx is committed. */
02054     "UPDATE SYS_INDEXES SET NAME=CONCAT('"
02055     TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
02056     "COMMIT WORK;\n"
02057     /* Drop the field definitions of the index. */
02058     "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
02059     /* Drop the index definition and the B-tree. */
02060     "DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
02061     "END;\n";
02062 
02063   ut_ad(index && table && trx);
02064 
02065   pars_info_add_ull_literal(info, "indexid", index->id);
02066 
02067   trx_start_if_not_started(trx);
02068   trx->op_info = "dropping index";
02069 
02070   ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02071 
02072   err = que_eval_sql(info, str1, FALSE, trx);
02073 
02074   ut_a(err == DB_SUCCESS);
02075 
02076   /* Replace this index with another equivalent index for all
02077   foreign key constraints on this table where this index is used */
02078 
02079   dict_table_replace_index_in_foreign_list(table, index, trx);
02080   dict_index_remove_from_cache(table, index);
02081 
02082   trx->op_info = "";
02083 }
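
As the comment inside DROP_INDEX_PROC notes, the drop is crash-safe because it is two-phase: the SYS_INDEXES row is first renamed under TEMP_INDEX_PREFIX and that rename is committed, so a crash before the deletes leaves behind a marked name that row_merge_drop_temp_indexes() will finish dropping at recovery. A sketch of the protocol against a single name; the '#' marker and model_* names are illustrative stand-ins, not the real values:

#include <cstdio>
#include <string>

/* Stand-in for TEMP_INDEX_PREFIX (the real marker is one reserved byte). */
static const char MODEL_TEMP_PREFIX = '#';

/* Models the two phases of row_merge_drop_index() on a SYS_INDEXES name. */
static void model_drop_index(std::string& name, bool crash_between)
{
  /* Phase 1: rename under the marker, then COMMIT WORK. */
  name.insert(name.begin(), MODEL_TEMP_PREFIX);

  if (crash_between) {
    return; /* recovery recognises the prefix and completes the drop */
  }

  /* Phase 2: DELETE FROM SYS_FIELDS / SYS_INDEXES. */
  name.clear();
}

int main()
{
  std::string name = "idx_a";
  model_drop_index(name, /* crash_between = */ true);
  std::printf("left for recovery: '%s'\n", name.c_str()); /* "#idx_a" */
}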
02084 
02085 /*********************************************************************/
02090 UNIV_INTERN
02091 void
02092 row_merge_drop_indexes(
02093 /*===================*/
02094   trx_t*    trx,    
02095   dict_table_t* table,    
02096   dict_index_t**  index,    
02097   ulint   num_created)  
02098 {
02099   ulint key_num;
02100 
02101   for (key_num = 0; key_num < num_created; key_num++) {
02102     row_merge_drop_index(index[key_num], table, trx);
02103   }
02104 }
02105 
02106 /*********************************************************************/
02108 UNIV_INTERN
02109 void
02110 row_merge_drop_temp_indexes(void)
02111 /*=============================*/
02112 {
02113   trx_t*    trx;
02114   btr_pcur_t  pcur;
02115   mtr_t   mtr;
02116 
02117   /* Load the table definitions that contain partially defined
02118   indexes, so that the data dictionary information can be checked
02119   when accessing the tablename.ibd files. */
02120 
02121   trx = trx_allocate_for_background();
02122   trx->op_info = "dropping partially created indexes";
02123   row_mysql_lock_data_dictionary(trx);
02124 
02125   mtr_start(&mtr);
02126 
02127   btr_pcur_open_at_index_side(
02128     TRUE,
02129     dict_table_get_first_index(dict_sys->sys_indexes),
02130     BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
02131 
02132   for (;;) {
02133     const rec_t*  rec;
02134     const byte* field;
02135     ulint   len;
02136     table_id_t  table_id;
02137     dict_table_t* table;
02138 
02139     btr_pcur_move_to_next_user_rec(&pcur, &mtr);
02140 
02141     if (!btr_pcur_is_on_user_rec(&pcur)) {
02142       break;
02143     }
02144 
02145     rec = btr_pcur_get_rec(&pcur);
02146     field = rec_get_nth_field_old(rec, DICT_SYS_INDEXES_NAME_FIELD,
02147                 &len);
02148     if (len == UNIV_SQL_NULL || len == 0
02149         || (char) *field != TEMP_INDEX_PREFIX) {
02150       continue;
02151     }
02152 
02153     /* This is a temporary index. */
02154 
02155     field = rec_get_nth_field_old(rec, 0/*TABLE_ID*/, &len);
02156     if (len != 8) {
02157       /* Corrupted TABLE_ID */
02158       continue;
02159     }
02160 
02161     table_id = mach_read_from_8(field);
02162 
02163     btr_pcur_store_position(&pcur, &mtr);
02164     btr_pcur_commit_specify_mtr(&pcur, &mtr);
02165 
02166     table = dict_table_get_on_id_low(table_id);
02167 
02168     if (table) {
02169       dict_index_t* index;
02170       dict_index_t* next_index;
02171 
02172       for (index = dict_table_get_first_index(table);
02173            index; index = next_index) {
02174 
02175         next_index = dict_table_get_next_index(index);
02176 
02177         if (*index->name == TEMP_INDEX_PREFIX) {
02178           row_merge_drop_index(index, table, trx);
02179           trx_commit_for_mysql(trx);
02180         }
02181       }
02182     }
02183 
02184     mtr_start(&mtr);
02185     btr_pcur_restore_position(BTR_SEARCH_LEAF,
02186             &pcur, &mtr);
02187   }
02188 
02189   btr_pcur_close(&pcur);
02190   mtr_commit(&mtr);
02191   row_mysql_unlock_data_dictionary(trx);
02192   trx_free_for_background(trx);
02193 }
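
The scan above treats every SYS_INDEXES record whose NAME starts with TEMP_INDEX_PREFIX as a leftover from an interrupted index build, and reads its 8-byte TABLE_ID with mach_read_from_8(). The sketch below models that read; InnoDB's mach0data routines store multi-byte values most-significant byte first, and model_read_from_8 is a hypothetical stand-in:

#include <cstdint>
#include <cstdio>

/* Models mach_read_from_8(): big-endian, MSB first. */
static uint64_t model_read_from_8(const unsigned char* b)
{
  uint64_t v = 0;
  for (int i = 0; i < 8; i++) {
    v = (v << 8) | b[i];
  }
  return(v);
}

int main()
{
  /* A serialized TABLE_ID of 12345 (0x3039). */
  const unsigned char field[8] = {0, 0, 0, 0, 0, 0, 0x30, 0x39};
  std::printf("table_id=%llu\n",
              (unsigned long long) model_read_from_8(field));
}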
02194 
02195 /*********************************************************************/
02197 static
02198 void
02199 row_merge_file_create(
02200 /*==================*/
02201   merge_file_t* merge_file) 
02202 {
02203 #ifdef UNIV_PFS_IO
02204   /* This temp file open does not go through the normal
02205   file APIs; add instrumentation to register it with the
02206   performance schema. */
02207   struct PSI_file_locker* locker = NULL;
02208   PSI_file_locker_state state;
02209   register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
02210              PSI_FILE_OPEN,
02211              "Innodb Merge Temp File",
02212              __FILE__, __LINE__);
02213 #endif
02214   merge_file->fd = innobase_mysql_tmpfile();
02215   merge_file->offset = 0;
02216   merge_file->n_rec = 0;
02217 #ifdef UNIV_PFS_IO
02218   register_pfs_file_open_end(locker, merge_file->fd);
02219 #endif
02220 }
02221 
02222 /*********************************************************************/
02224 static
02225 void
02226 row_merge_file_destroy(
02227 /*===================*/
02228   merge_file_t* merge_file) 
02229 {
02230 #ifdef UNIV_PFS_IO
02231   struct PSI_file_locker* locker = NULL;
02232   PSI_file_locker_state state;
02233   register_pfs_file_io_begin(&state, locker, merge_file->fd, 0, PSI_FILE_CLOSE,
02234            __FILE__, __LINE__);
02235 #endif
02236   if (merge_file->fd != -1) {
02237     close(merge_file->fd);
02238     merge_file->fd = -1;
02239   }
02240 
02241 #ifdef UNIV_PFS_IO
02242   register_pfs_file_io_end(locker, 0);
02243 #endif
02244 }
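
Taken together, the two functions above define the merge file lifecycle: an anonymous temporary file plus a block offset and a record count, with an idempotent close that resets fd to -1. A self-contained sketch of that lifecycle, in which mkstemp()+unlink() stands in for innobase_mysql_tmpfile() (an assumption, not the actual implementation):

#include <cstdio>
#include <cstdlib>
#include <unistd.h>

/* Simplified model of merge_file_t. */
struct model_merge_file {
  int fd;                   /* file descriptor, or -1 when closed */
  unsigned long offset;     /* file offset in blocks */
  unsigned long long n_rec; /* number of records written */
};

/* Models row_merge_file_create(): an unlinked temp file is deleted
by the OS as soon as the descriptor is closed. */
static void model_file_create(model_merge_file* f)
{
  char name[] = "/tmp/model_mergeXXXXXX";
  f->fd = mkstemp(name);
  if (f->fd != -1) {
    unlink(name);
  }
  f->offset = 0;
  f->n_rec = 0;
}

/* Models row_merge_file_destroy(): safe to call more than once. */
static void model_file_destroy(model_merge_file* f)
{
  if (f->fd != -1) {
    close(f->fd);
    f->fd = -1;
  }
}

int main()
{
  model_merge_file f;
  model_file_create(&f);
  std::printf("fd=%d offset=%lu n_rec=%llu\n", f.fd, f.offset, f.n_rec);
  model_file_destroy(&f);
  model_file_destroy(&f); /* idempotent */
}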
02245 
02246 /*********************************************************************/
02250 UNIV_INLINE
02251 ulint
02252 row_merge_col_prtype(
02253 /*=================*/
02254   const dict_col_t* col,    
02255   const char*   col_name, 
02256   const merge_index_def_t*index_def)  
02258 {
02259   ulint prtype = col->prtype;
02260   ulint i;
02261 
02262   ut_ad(index_def->ind_type & DICT_CLUSTERED);
02263 
02264   if (prtype & DATA_NOT_NULL) {
02265 
02266     return(prtype);
02267   }
02268 
02269   /* All columns that are included
02270   in the PRIMARY KEY must be NOT NULL. */
02271 
02272   for (i = 0; i < index_def->n_fields; i++) {
02273     if (!strcmp(col_name, index_def->fields[i].field_name)) {
02274       return(prtype | DATA_NOT_NULL);
02275     }
02276   }
02277 
02278   return(prtype);
02279 }
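
In other words, row_merge_col_prtype() simply ORs DATA_NOT_NULL into the precise type of every column that appears in the new PRIMARY KEY definition and leaves all other columns untouched. A standalone sketch, with a hypothetical flag value and plain string arrays in place of the dictionary structures:

#include <cstdio>
#include <cstring>

/* Hypothetical stand-in for the DATA_NOT_NULL precise-type flag. */
static const unsigned MODEL_NOT_NULL = 1U << 8;

/* Models row_merge_col_prtype(). */
static unsigned model_col_prtype(unsigned prtype, const char* col_name,
                                 const char* const* pk_fields, int n_fields)
{
  if (prtype & MODEL_NOT_NULL) {
    return(prtype);
  }

  for (int i = 0; i < n_fields; i++) {
    if (!std::strcmp(col_name, pk_fields[i])) {
      return(prtype | MODEL_NOT_NULL);
    }
  }

  return(prtype);
}

int main()
{
  const char* pk[] = { "id", "created_at" };
  std::printf("%x\n", model_col_prtype(0, "id", pk, 2));   /* 100: promoted */
  std::printf("%x\n", model_col_prtype(0, "note", pk, 2)); /* 0: unchanged */
}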
02280 
02281 /*********************************************************************/
02285 UNIV_INTERN
02286 dict_table_t*
02287 row_merge_create_temporary_table(
02288 /*=============================*/
02289   const char*   table_name, 
02290   const merge_index_def_t*index_def,  
02292   const dict_table_t* table,    
02293   trx_t*      trx)    
02295 {
02296   ulint   i;
02297   dict_table_t* new_table = NULL;
02298   ulint   n_cols = dict_table_get_n_user_cols(table);
02299   ulint   error;
02300   mem_heap_t* heap = mem_heap_create(1000);
02301 
02302   ut_ad(table_name);
02303   ut_ad(index_def);
02304   ut_ad(table);
02305   ut_ad(mutex_own(&dict_sys->mutex));
02306 
02307   new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);
02308 
02309   for (i = 0; i < n_cols; i++) {
02310     const dict_col_t* col;
02311     const char*   col_name;
02312 
02313     col = dict_table_get_nth_col(table, i);
02314     col_name = dict_table_get_col_name(table, i);
02315 
02316     dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
02317                row_merge_col_prtype(col, col_name,
02318                   index_def),
02319                col->len);
02320   }
02321 
02322   error = row_create_table_for_mysql(new_table, trx);
02323   mem_heap_free(heap);
02324 
02325   if (error != DB_SUCCESS) {
02326     trx->error_state = error;
02327     new_table = NULL;
02328   }
02329 
02330   return(new_table);
02331 }
02332 
02333 /*********************************************************************/
02338 UNIV_INTERN
02339 ulint
02340 row_merge_rename_indexes(
02341 /*=====================*/
02342   trx_t*    trx,    
02343   dict_table_t* table)    
02344 {
02345   ulint   err = DB_SUCCESS;
02346   pars_info_t*  info = pars_info_create();
02347 
02348   /* We use the private SQL parser of Innobase to generate the
02349   query graphs needed in renaming indexes. */
02350 
02351   static const char rename_indexes[] =
02352     "PROCEDURE RENAME_INDEXES_PROC () IS\n"
02353     "BEGIN\n"
02354     "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
02355     "WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
02356     TEMP_INDEX_PREFIX_STR "';\n"
02357     "END;\n";
02358 
02359   ut_ad(table);
02360   ut_ad(trx);
02361   ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02362 
02363   trx->op_info = "renaming indexes";
02364 
02365   pars_info_add_ull_literal(info, "tableid", table->id);
02366 
02367   err = que_eval_sql(info, rename_indexes, FALSE, trx);
02368 
02369   if (err == DB_SUCCESS) {
02370     dict_index_t* index = dict_table_get_first_index(table);
02371     do {
02372       if (*index->name == TEMP_INDEX_PREFIX) {
02373         index->name++;
02374       }
02375       index = dict_table_get_next_index(index);
02376     } while (index);
02377   }
02378 
02379   trx->op_info = "";
02380 
02381   return(err);
02382 }
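
The in-memory half of the rename is worth noting: because the cached name string keeps its one-byte marker in the buffer, stripping TEMP_INDEX_PREFIX from the dictionary cache is just the pointer increment index->name++, with no reallocation. A sketch using '#' as a stand-in marker:

#include <cstdio>

/* Stand-in for TEMP_INDEX_PREFIX. */
static const char MODEL_TEMP_PREFIX = '#';

struct model_index {
  const char* name;
};

/* Models the loop in row_merge_rename_indexes() that fixes up the
dictionary cache after the SYS_INDEXES rows have been updated. */
static void model_strip_prefix(model_index* index)
{
  if (*index->name == MODEL_TEMP_PREFIX) {
    index->name++; /* skip the marker byte; the buffer is unchanged */
  }
}

int main()
{
  model_index idx = { "#idx_new" };
  model_strip_prefix(&idx);
  std::printf("%s\n", idx.name); /* idx_new */
}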
02383 
02384 /*********************************************************************/
02389 UNIV_INTERN
02390 ulint
02391 row_merge_rename_tables(
02392 /*====================*/
02393   dict_table_t* old_table,  
02395   dict_table_t* new_table,  
02397   const char* tmp_name, 
02398   trx_t*    trx)    
02399 {
02400   ulint   err = DB_ERROR;
02401   pars_info_t*  info;
02402   char    old_name[MAX_TABLE_NAME_LEN + 1];
02403 
02404   ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
02405   ut_ad(old_table != new_table);
02406   ut_ad(mutex_own(&dict_sys->mutex));
02407 
02408   ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02409 
02410   /* store the old/current name in an automatic variable */
02411   if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
02412     memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
02413   } else {
02414     ut_print_timestamp(stderr);
02415     fprintf(stderr, "InnoDB: too long table name: '%s', "
02416       "max length is %d\n", old_table->name,
02417       MAX_TABLE_NAME_LEN);
02418     ut_error;
02419   }
02420 
02432   trx->op_info = "renaming tables";
02433 
02434   /* We use the private SQL parser of Innobase to generate the query
02435   graphs needed in updating the dictionary data in system tables. */
02436 
02437   info = pars_info_create();
02438 
02439   pars_info_add_str_literal(info, "new_name", new_table->name);
02440   pars_info_add_str_literal(info, "old_name", old_name);
02441   pars_info_add_str_literal(info, "tmp_name", tmp_name);
02442 
02443   err = que_eval_sql(info,
02444          "PROCEDURE RENAME_TABLES () IS\n"
02445          "BEGIN\n"
02446          "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
02447          " WHERE NAME = :old_name;\n"
02448          "UPDATE SYS_TABLES SET NAME = :old_name\n"
02449          " WHERE NAME = :new_name;\n"
02450          "END;\n", FALSE, trx);
02451 
02452   if (err != DB_SUCCESS) {
02453 
02454     goto err_exit;
02455   }
02456 
02457   /* The following calls will also rename the .ibd data files if
02458   the tables are stored in single-table tablespaces. */
02459 
02460   if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
02461       || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {
02462 
02463     err = DB_ERROR;
02464     goto err_exit;
02465   }
02466 
02467   err = dict_load_foreigns(old_name, FALSE, TRUE);
02468 
02469   if (err != DB_SUCCESS) {
02470 err_exit:
02471     trx->error_state = DB_SUCCESS;
02472     trx_general_rollback_for_mysql(trx, NULL);
02473     trx->error_state = DB_SUCCESS;
02474   }
02475 
02476   trx->op_info = "";
02477 
02478   return(err);
02479 }
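
The RENAME_TABLES procedure swaps the tables in a fixed order: the original table first moves aside under tmp_name, then the rebuilt table takes over old_name; because the two UPDATEs run sequentially, the second one cannot touch the row the first one just renamed. A sketch that models SYS_TABLES as a name-to-id map (all names hypothetical):

#include <cstdio>
#include <map>
#include <string>

/* Models the two UPDATE statements in RENAME_TABLES. */
static void model_rename_tables(std::map<std::string, int>& sys_tables,
                                const std::string& old_name,
                                const std::string& new_name,
                                const std::string& tmp_name)
{
  int old_id = sys_tables[old_name];
  int new_id = sys_tables[new_name];

  sys_tables.erase(old_name);
  sys_tables.erase(new_name);

  sys_tables[tmp_name] = old_id; /* NAME = :tmp_name WHERE NAME = :old_name */
  sys_tables[old_name] = new_id; /* NAME = :old_name WHERE NAME = :new_name */
}

int main()
{
  std::map<std::string, int> sys_tables
    = { { "test/t", 1 }, { "test/#sql-rebuilt", 2 } };

  model_rename_tables(sys_tables, "test/t", "test/#sql-rebuilt",
                      "test/#sql-old");

  for (const auto& e : sys_tables) {
    std::printf("%s -> table id %d\n", e.first.c_str(), e.second);
  }
}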
02480 
02481 /*********************************************************************/
02484 static
02485 ulint
02486 row_merge_create_index_graph(
02487 /*=========================*/
02488   trx_t*    trx,    
02489   dict_table_t* table,    
02490   dict_index_t* index)    
02491 {
02492   ind_node_t* node;   
02493   mem_heap_t* heap;   
02494   que_thr_t*  thr;    
02495   ulint   err;
02496 
02497   ut_ad(trx);
02498   ut_ad(table);
02499   ut_ad(index);
02500 
02501   heap = mem_heap_create(512);
02502 
02503   index->table = table;
02504   node = ind_create_graph_create(index, heap);
02505   thr = pars_complete_graph_for_exec(node, trx, heap);
02506 
02507   ut_a(thr == que_fork_start_command(static_cast<que_fork_t *>(que_node_get_parent(thr))));
02508 
02509   que_run_threads(thr);
02510 
02511   err = trx->error_state;
02512 
02513   que_graph_free((que_t*) que_node_get_parent(thr));
02514 
02515   return(err);
02516 }
02517 
02518 /*********************************************************************/
02521 UNIV_INTERN
02522 dict_index_t*
02523 row_merge_create_index(
02524 /*===================*/
02525   trx_t*      trx,  
02526   dict_table_t*   table,  
02527   const merge_index_def_t*index_def)
02529 {
02530   dict_index_t* index;
02531   ulint   err;
02532   ulint   n_fields = index_def->n_fields;
02533   ulint   i;
02534 
02535   /* Create the index prototype using the passed-in definition; this
02536   is not a persistent operation. We pass 0 as the space id and
02537   determine at a lower level the space id where to store the table. */
02538 
02539   index = dict_mem_index_create(table->name, index_def->name,
02540               0, index_def->ind_type, n_fields);
02541 
02542   ut_a(index);
02543 
02544   for (i = 0; i < n_fields; i++) {
02545     merge_index_field_t*  ifield = &index_def->fields[i];
02546 
02547     dict_mem_index_add_field(index, ifield->field_name,
02548            ifield->prefix_len);
02549   }
02550 
02551   /* Add the index to SYS_INDEXES, using the index prototype. */
02552   err = row_merge_create_index_graph(trx, table, index);
02553 
02554   if (err == DB_SUCCESS) {
02555 
02556     index = row_merge_dict_table_get_index(
02557       table, index_def);
02558 
02559     ut_a(index);
02560 
02561     /* Note the id of the transaction that created this
02562     index; we use it to restrict readers from accessing
02563     this index, to ensure read consistency. */
02564     index->trx_id = trx->id;
02565   } else {
02566     index = NULL;
02567   }
02568 
02569   return(index);
02570 }
02571 
02572 /*********************************************************************/
02574 UNIV_INTERN
02575 ibool
02576 row_merge_is_index_usable(
02577 /*======================*/
02578   const trx_t*    trx,  
02579   const dict_index_t* index)  
02580 {
02581   return(!trx->read_view
02582          || read_view_sees_trx_id(trx->read_view, index->trx_id));
02583 }
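
This is the read-consistency check paired with the trx_id stamped in row_merge_create_index(): a reader without a consistent read view may use any index, while a reader with a snapshot may only use indexes whose creating transaction the snapshot already sees as committed. A deliberately coarse sketch (the real read_view_sees_trx_id() also consults the view's list of transactions that were active at snapshot time):

#include <cstdio>

struct model_read_view {
  /* transactions with an id below this bound were committed before
  the snapshot was taken (coarse model of read_view_sees_trx_id()) */
  unsigned long long up_limit_id;
};

struct model_trx {
  const model_read_view* read_view; /* NULL if no consistent snapshot */
};

/* Models row_merge_is_index_usable(). */
static bool model_index_usable(const model_trx* trx,
                               unsigned long long index_trx_id)
{
  return(!trx->read_view
         || index_trx_id < trx->read_view->up_limit_id);
}

int main()
{
  model_read_view view = { 100 };
  model_trx with_view = { &view };
  model_trx without_view = { NULL };

  std::printf("%d\n", model_index_usable(&with_view, 42));     /* 1 */
  std::printf("%d\n", model_index_usable(&with_view, 150));    /* 0 */
  std::printf("%d\n", model_index_usable(&without_view, 150)); /* 1 */
}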
02584 
02585 /*********************************************************************/
02588 UNIV_INTERN
02589 ulint
02590 row_merge_drop_table(
02591 /*=================*/
02592   trx_t*    trx,    
02593   dict_table_t* table)    
02594 {
02595   /* There must be no open transactions on the table. */
02596   ut_a(table->n_mysql_handles_opened == 0);
02597 
02598   return(row_drop_table_for_mysql(table->name, trx, FALSE));
02599 }
02600 
02601 /*********************************************************************/
02606 UNIV_INTERN
02607 ulint
02608 row_merge_build_indexes(
02609 /*====================*/
02610   trx_t*    trx,    
02611   dict_table_t* old_table,  
02613   dict_table_t* new_table,  
02616   dict_index_t**  indexes,  
02617   ulint   n_indexes,  
02618   TABLE*  table)    
02621 {
02622   merge_file_t*   merge_files;
02623   row_merge_block_t*  block;
02624   ulint     block_size;
02625   ulint     i;
02626   ulint     error;
02627   int     tmpfd;
02628 
02629   ut_ad(trx);
02630   ut_ad(old_table);
02631   ut_ad(new_table);
02632   ut_ad(indexes);
02633   ut_ad(n_indexes);
02634 
02635   trx_start_if_not_started(trx);
02636 
02637   /* Allocate memory for the merge file data structures and
02638   initialize their fields */
02639 
02640   merge_files = static_cast<merge_file_t *>(mem_alloc(n_indexes * sizeof *merge_files));
02641   block_size = 3 * sizeof *block;
02642   block = static_cast<row_merge_block_t *>(os_mem_alloc_large(&block_size));
02643 
02644   for (i = 0; i < n_indexes; i++) {
02645 
02646     row_merge_file_create(&merge_files[i]);
02647   }
02648 
02649   tmpfd = innobase_mysql_tmpfile();
02650 
02651   /* Reset the MySQL row buffer that is used when reporting
02652   duplicate keys. */
02653   innobase_rec_reset(table);
02654 
02655   /* Read clustered index of the table and create files for
02656   secondary index entries for merge sort */
02657 
02658   error = row_merge_read_clustered_index(
02659     trx, table, old_table, new_table, indexes,
02660     merge_files, n_indexes, block);
02661 
02662   if (error != DB_SUCCESS) {
02663 
02664     goto func_exit;
02665   }
02666 
02667   /* Now we have files containing index entries ready for
02668   sorting and inserting. */
02669 
02670   for (i = 0; i < n_indexes; i++) {
02671     error = row_merge_sort(trx, indexes[i], &merge_files[i],
02672                block, &tmpfd, table);
02673 
02674     if (error == DB_SUCCESS) {
02675       error = row_merge_insert_index_tuples(
02676         trx, indexes[i], new_table,
02677         dict_table_zip_size(old_table),
02678         merge_files[i].fd, block);
02679     }
02680 
02681     /* Close the temporary file to free up space. */
02682     row_merge_file_destroy(&merge_files[i]);
02683 
02684     if (error != DB_SUCCESS) {
02685       trx->error_key_num = i;
02686       goto func_exit;
02687     }
02688   }
02689 
02690 func_exit:
02691   close(tmpfd);
02692 
02693   for (i = 0; i < n_indexes; i++) {
02694     row_merge_file_destroy(&merge_files[i]);
02695   }
02696 
02697   mem_free(merge_files);
02698   os_mem_free_large(block, block_size);
02699 
02700   return(error);
02701 }
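
End to end, row_merge_build_indexes() is a three-stage pipeline: one scan of the clustered index fans entries out into one temporary merge file per new index, each file is merge-sorted, and the sorted stream is bulk-inserted into its index. A toy sketch of that data flow, with std::vector and std::sort standing in for the temporary files and the external merge sort:

#include <algorithm>
#include <cstdio>
#include <vector>

int main()
{
  const int rows[] = { 7, 3, 9, 1 }; /* "clustered index" */
  const int n_indexes = 2;
  std::vector<int> merge_files[n_indexes];

  /* Stage 1, cf. row_merge_read_clustered_index(): a single scan
  producing one entry per row for every index being built (here the
  second index simply sorts descending via negated keys). */
  for (int r : rows) {
    for (int i = 0; i < n_indexes; i++) {
      merge_files[i].push_back(i == 0 ? r : -r);
    }
  }

  for (int i = 0; i < n_indexes; i++) {
    /* Stage 2, cf. row_merge_sort(). */
    std::sort(merge_files[i].begin(), merge_files[i].end());

    /* Stage 3, cf. row_merge_insert_index_tuples(): insert in key
    order, then free the "file" as early as possible. */
    std::printf("index %d:", i);
    for (int k : merge_files[i]) {
      std::printf(" %d", k);
    }
    std::printf("\n");
    merge_files[i].clear(); /* cf. row_merge_file_destroy() */
  }
}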