00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00027 #include "row0merge.h"
00028 #include "row0ext.h"
00029 #include "row0row.h"
00030 #include "row0upd.h"
00031 #include "row0ins.h"
00032 #include "row0sel.h"
00033 #include "dict0dict.h"
00034 #include "dict0mem.h"
00035 #include "dict0boot.h"
00036 #include "dict0crea.h"
00037 #include "dict0load.h"
00038 #include "btr0btr.h"
00039 #include "mach0data.h"
00040 #include "trx0rseg.h"
00041 #include "trx0trx.h"
00042 #include "trx0roll.h"
00043 #include "trx0undo.h"
00044 #include "trx0purge.h"
00045 #include "trx0rec.h"
00046 #include "que0que.h"
00047 #include "rem0cmp.h"
00048 #include "read0read.h"
00049 #include "os0file.h"
00050 #include "lock0lock.h"
00051 #include "data0data.h"
00052 #include "data0type.h"
00053 #include "que0que.h"
00054 #include "pars0pars.h"
00055 #include "mem0mem.h"
00056 #include "log0log.h"
00057 #include "ut0sort.h"
00058 #include "handler0alter.h"
00059 #include <unistd.h>
00060
00061
/* When __WIN__ is defined, every posix_fadvise() call in this file
expands to nothing. */
#ifdef __WIN__
# define posix_fadvise(fd, offset, len, advice)
#endif /* __WIN__ */
00065
#ifdef UNIV_DEBUG
/* Debug-only trace switches.  Each one gates a family of fprintf(stderr)
calls in this file.  They have static storage duration and therefore
start out as zero (tracing disabled). */

/** Trace the records compared in row_merge_cmp() */
static ibool	row_merge_print_cmp;
/** Trace the records read in row_merge_read_rec() */
static ibool	row_merge_print_read;
/** Trace the records written by row_merge_buf_write(),
row_merge_write_rec_low() and row_merge_write_eof() */
static ibool	row_merge_print_write;
/** Trace the block merges in row_merge_blocks() and
row_merge_blocks_copy() */
static ibool	row_merge_print_block;
/** Trace the block reads in row_merge_read() */
static ibool	row_merge_print_block_read;
/** Trace the block writes in row_merge_write() */
static ibool	row_merge_print_block_write;

#endif /* UNIV_DEBUG */
00084
00093 typedef byte row_merge_block_t[1048576];
00094
00101 typedef byte mrec_buf_t[UNIV_PAGE_SIZE];
00102
00107 typedef byte mrec_t;
00108
00110 struct row_merge_buf_struct {
00111 mem_heap_t* heap;
00112 dict_index_t* index;
00113 ulint total_size;
00114 ulint n_tuples;
00115 ulint max_tuples;
00116 const dfield_t**tuples;
00119 const dfield_t**tmp_tuples;
00121 };
00122
00124 typedef struct row_merge_buf_struct row_merge_buf_t;
00125
00127 struct merge_file_struct {
00128 int fd;
00129 ulint offset;
00130 ib_uint64_t n_rec;
00131 };
00132
00134 typedef struct merge_file_struct merge_file_t;
00135
#ifdef UNIV_DEBUG
/******************************************************//**
Print a merge tuple to a stream (debug builds only).
Each field is printed on its own line: SQL NULL fields as " NULL;",
other fields as at most the first 20 bytes of their data, prefixed by
'E' when the field is flagged as externally stored.  When the data was
truncated, the full length is appended. */
static
void
row_merge_tuple_print(
/*==================*/
	FILE*		f,	/*!< in: output stream */
	const dfield_t*	entry,	/*!< in: array of n_fields data fields */
	ulint		n_fields)/*!< in: number of fields in entry */
{
	ulint	j;

	for (j = 0; j < n_fields; j++) {
		const dfield_t*	field = &entry[j];

		if (dfield_is_null(field)) {
			fputs("\n NULL;", f);
		} else {
			ulint	field_len	= dfield_get_len(field);
			ulint	len		= ut_min(field_len, 20);
			if (dfield_is_ext(field)) {
				fputs("\nE", f);
			} else {
				fputs("\n ", f);
			}
			ut_print_buf(f, dfield_get_data(field), len);
			if (len != field_len) {
				/* ulint is not necessarily unsigned long;
				cast explicitly to match "%lu", as the
				other traces in this file do. */
				fprintf(f, " (total %lu bytes)",
					(ulong) field_len);
			}
		}
	}
	putc('\n', f);
}
#endif /* UNIV_DEBUG */
00171
00172
00175 static
00176 row_merge_buf_t*
00177 row_merge_buf_create_low(
00178
00179 mem_heap_t* heap,
00180 dict_index_t* index,
00181 ulint max_tuples,
00182 ulint buf_size)
00183 {
00184 row_merge_buf_t* buf;
00185
00186 ut_ad(max_tuples > 0);
00187 ut_ad(max_tuples <= sizeof(row_merge_block_t));
00188 ut_ad(max_tuples < buf_size);
00189
00190 buf = static_cast<row_merge_buf_t *>(mem_heap_zalloc(heap, buf_size));
00191 buf->heap = heap;
00192 buf->index = index;
00193 buf->max_tuples = max_tuples;
00194 buf->tuples = static_cast<const dfield_t **>(mem_heap_alloc(heap,
00195 2 * max_tuples * sizeof *buf->tuples));
00196 buf->tmp_tuples = buf->tuples + max_tuples;
00197
00198 return(buf);
00199 }
00200
00201
00204 static
00205 row_merge_buf_t*
00206 row_merge_buf_create(
00207
00208 dict_index_t* index)
00209 {
00210 row_merge_buf_t* buf;
00211 ulint max_tuples;
00212 ulint buf_size;
00213 mem_heap_t* heap;
00214
00215 max_tuples = sizeof(row_merge_block_t)
00216 / ut_max(1, dict_index_get_min_size(index));
00217
00218 buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
00219
00220 heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
00221
00222 buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
00223
00224 return(buf);
00225 }
00226
00227
00230 static
00231 row_merge_buf_t*
00232 row_merge_buf_empty(
00233
00234 row_merge_buf_t* buf)
00235 {
00236 ulint buf_size;
00237 ulint max_tuples = buf->max_tuples;
00238 mem_heap_t* heap = buf->heap;
00239 dict_index_t* index = buf->index;
00240
00241 buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
00242
00243 mem_heap_empty(heap);
00244
00245 return(row_merge_buf_create_low(heap, index, max_tuples, buf_size));
00246 }
00247
00248
00250 static
00251 void
00252 row_merge_buf_free(
00253
00254 row_merge_buf_t* buf)
00255 {
00256 mem_heap_free(buf->heap);
00257 }
00258
00259
00262 static
00263 ibool
00264 row_merge_buf_add(
00265
00266 row_merge_buf_t* buf,
00267 const dtuple_t* row,
00268 const row_ext_t* ext)
00270 {
00271 ulint i;
00272 ulint n_fields;
00273 ulint data_size;
00274 ulint extra_size;
00275 const dict_index_t* index;
00276 dfield_t* entry;
00277 dfield_t* field;
00278 const dict_field_t* ifield;
00279
00280 if (buf->n_tuples >= buf->max_tuples) {
00281 return(FALSE);
00282 }
00283
00284 UNIV_PREFETCH_R(row->fields);
00285
00286 index = buf->index;
00287
00288 n_fields = dict_index_get_n_fields(index);
00289
00290 entry = static_cast<dfield_t *>(mem_heap_alloc(buf->heap, n_fields * sizeof *entry));
00291 buf->tuples[buf->n_tuples] = entry;
00292 field = entry;
00293
00294 data_size = 0;
00295 extra_size = UT_BITS_IN_BYTES(index->n_nullable);
00296
00297 ifield = dict_index_get_nth_field(index, 0);
00298
00299 for (i = 0; i < n_fields; i++, field++, ifield++) {
00300 const dict_col_t* col;
00301 ulint col_no;
00302 const dfield_t* row_field;
00303 ulint len;
00304
00305 col = ifield->col;
00306 col_no = dict_col_get_no(col);
00307 row_field = dtuple_get_nth_field(row, col_no);
00308 dfield_copy(field, row_field);
00309 len = dfield_get_len(field);
00310
00311 if (dfield_is_null(field)) {
00312 ut_ad(!(col->prtype & DATA_NOT_NULL));
00313 continue;
00314 } else if (UNIV_LIKELY(!ext)) {
00315 } else if (dict_index_is_clust(index)) {
00316
00317 const byte* row_buf = row_ext_lookup(ext, col_no,
00318 &len);
00319 if (UNIV_LIKELY_NULL(row_buf)) {
00320 ut_a(row_buf != field_ref_zero);
00321 if (i < dict_index_get_n_unique(index)) {
00322 dfield_set_data(field, row_buf, len);
00323 } else {
00324 dfield_set_ext(field);
00325 len = dfield_get_len(field);
00326 }
00327 }
00328 } else {
00329 const byte* row_buf = row_ext_lookup(ext, col_no,
00330 &len);
00331 if (UNIV_LIKELY_NULL(row_buf)) {
00332 ut_a(row_buf != field_ref_zero);
00333 dfield_set_data(field, row_buf, len);
00334 }
00335 }
00336
00337
00338
00339 if (ifield->prefix_len) {
00340 len = dtype_get_at_most_n_mbchars(
00341 col->prtype,
00342 col->mbminmaxlen,
00343 ifield->prefix_len,
00344 len, static_cast<const char *>(dfield_get_data(field)));
00345 dfield_set_len(field, len);
00346 }
00347
00348 ut_ad(len <= col->len || col->mtype == DATA_BLOB);
00349
00350 if (ifield->fixed_len) {
00351 ut_ad(len == ifield->fixed_len);
00352 ut_ad(!dfield_is_ext(field));
00353 } else if (dfield_is_ext(field)) {
00354 extra_size += 2;
00355 } else if (len < 128
00356 || (col->len < 256 && col->mtype != DATA_BLOB)) {
00357 extra_size++;
00358 } else {
00359
00360
00361
00362
00363 extra_size += 2;
00364 }
00365 data_size += len;
00366 }
00367
00368 #ifdef UNIV_DEBUG
00369 {
00370 ulint size;
00371 ulint extra;
00372
00373 size = rec_get_converted_size_comp(index,
00374 REC_STATUS_ORDINARY,
00375 entry, n_fields, &extra);
00376
00377 ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
00378 ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
00379 }
00380 #endif
00381
00382
00383
00384
00385
00386 data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
00387
00388
00389
00390
00391
00392
00393
00394 ut_ad(data_size < sizeof(row_merge_block_t));
00395
00396
00397 if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
00398 return(FALSE);
00399 }
00400
00401 buf->total_size += data_size;
00402 buf->n_tuples++;
00403
00404 field = entry;
00405
00406
00407
00408 do {
00409 dfield_dup(field++, buf->heap);
00410 } while (--n_fields);
00411
00412 return(TRUE);
00413 }
00414
00416 struct row_merge_dup_struct {
00417 const dict_index_t* index;
00418 TABLE* table;
00419 ulint n_dup;
00420 };
00421
00423 typedef struct row_merge_dup_struct row_merge_dup_t;
00424
00425
00427 static
00428 void
00429 row_merge_dup_report(
00430
00431 row_merge_dup_t* dup,
00432 const dfield_t* entry)
00433 {
00434 mrec_buf_t* buf;
00435 const dtuple_t* tuple;
00436 dtuple_t tuple_store;
00437 const rec_t* rec;
00438 const dict_index_t* index = dup->index;
00439 ulint n_fields= dict_index_get_n_fields(index);
00440 mem_heap_t* heap;
00441 ulint* offsets;
00442 ulint n_ext;
00443
00444 if (dup->n_dup++) {
00445
00446
00447 return;
00448 }
00449
00450
00451 heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
00452 * sizeof *offsets
00453 + sizeof *buf);
00454
00455 buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof *buf));
00456
00457 tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
00458 n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;
00459
00460 rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
00461 offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
00462
00463 innobase_rec_to_mysql(dup->table, rec, index, offsets);
00464
00465 mem_heap_free(heap);
00466 }
00467
00468
00471 static
00472 int
00473 row_merge_tuple_cmp(
00474
00475 ulint n_field,
00476 const dfield_t* a,
00477 const dfield_t* b,
00478 row_merge_dup_t* dup)
00479 {
00480 int cmp;
00481 const dfield_t* field = a;
00482
00483
00484
00485
00486 do {
00487 cmp = cmp_dfield_dfield(a++, b++);
00488 } while (!cmp && --n_field);
00489
00490 if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
00491
00492
00493
00494
00495 for (b = field; b != a; b++) {
00496 if (dfield_is_null(b)) {
00497
00498 goto func_exit;
00499 }
00500 }
00501
00502 row_merge_dup_report(dup, field);
00503 }
00504
00505 func_exit:
00506 return(cmp);
00507 }
00508
00515 #define row_merge_tuple_sort_ctx(a,b,c,d) \
00516 row_merge_tuple_sort(n_field, dup, a, b, c, d)
00517
00522 #define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
00523
00524
00526 static
00527 void
00528 row_merge_tuple_sort(
00529
00530 ulint n_field,
00531 row_merge_dup_t* dup,
00532 const dfield_t** tuples,
00533 const dfield_t** aux,
00534 ulint low,
00536 ulint high)
00538 {
00539 UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
00540 tuples, aux, low, high, row_merge_tuple_cmp_ctx);
00541 }
00542
00543
00545 static
00546 void
00547 row_merge_buf_sort(
00548
00549 row_merge_buf_t* buf,
00550 row_merge_dup_t* dup)
00551 {
00552 row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
00553 buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
00554 }
00555
00556
00558 static
00559 void
00560 row_merge_buf_write(
00561
00562 const row_merge_buf_t* buf,
00563 #ifdef UNIV_DEBUG
00564 const merge_file_t* of,
00565 #endif
00566 row_merge_block_t* block)
00567 #ifndef UNIV_DEBUG
00568 # define row_merge_buf_write(buf, of, block) row_merge_buf_write(buf, block)
00569 #endif
00570 {
00571 const dict_index_t* index = buf->index;
00572 ulint n_fields= dict_index_get_n_fields(index);
00573 byte* b = &(*block)[0];
00574
00575 ulint i;
00576
00577 for (i = 0; i < buf->n_tuples; i++) {
00578 ulint size;
00579 ulint extra_size;
00580 const dfield_t* entry = buf->tuples[i];
00581
00582 size = rec_get_converted_size_comp(index,
00583 REC_STATUS_ORDINARY,
00584 entry, n_fields,
00585 &extra_size);
00586 ut_ad(size > extra_size);
00587 ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
00588 extra_size -= REC_N_NEW_EXTRA_BYTES;
00589 size -= REC_N_NEW_EXTRA_BYTES;
00590
00591
00592 if (extra_size + 1 < 0x80) {
00593 *b++ = (byte) (extra_size + 1);
00594 } else {
00595 ut_ad((extra_size + 1) < 0x8000);
00596 *b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
00597 *b++ = (byte) (extra_size + 1);
00598 }
00599
00600 ut_ad(b + size < block[1]);
00601
00602 rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
00603 REC_STATUS_ORDINARY,
00604 entry, n_fields);
00605
00606 b += size;
00607
00608 #ifdef UNIV_DEBUG
00609 if (row_merge_print_write) {
00610 fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
00611 (void*) b, of->fd, (ulong) of->offset,
00612 (ulong) i);
00613 row_merge_tuple_print(stderr, entry, n_fields);
00614 }
00615 #endif
00616 }
00617
00618
00619 ut_a(b < block[1]);
00620 ut_a(b == block[0] + buf->total_size);
00621 *b++ = 0;
00622 #ifdef UNIV_DEBUG_VALGRIND
00623
00624
00625 memset(b, 0xff, block[1] - b);
00626 #endif
00627 #ifdef UNIV_DEBUG
00628 if (row_merge_print_write) {
00629 fprintf(stderr, "row_merge_buf_write %p,%d,%lu EOF\n",
00630 (void*) b, of->fd, (ulong) of->offset);
00631 }
00632 #endif
00633 }
00634
00635
00639 static
00640 mem_heap_t*
00641 row_merge_heap_create(
00642
00643 const dict_index_t* index,
00644 mrec_buf_t** buf,
00645 ulint** offsets1,
00646 ulint** offsets2)
00647 {
00648 ulint i = 1 + REC_OFFS_HEADER_SIZE
00649 + dict_index_get_n_fields(index);
00650 mem_heap_t* heap = mem_heap_create(2 * i * sizeof **offsets1
00651 + 3 * sizeof **buf);
00652
00653 *buf = static_cast<mrec_buf_t*>(mem_heap_alloc(heap, 3 * sizeof **buf));
00654 *offsets1 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets1));
00655 *offsets2 = static_cast<ulint*>(mem_heap_alloc(heap, i * sizeof **offsets2));
00656
00657 (*offsets1)[0] = (*offsets2)[0] = i;
00658 (*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
00659
00660 return(heap);
00661 }
00662
00663
00667 static
00668 dict_index_t*
00669 row_merge_dict_table_get_index(
00670
00671 dict_table_t* table,
00672 const merge_index_def_t*index_def)
00673 {
00674 ulint i;
00675 dict_index_t* index;
00676 const char** column_names;
00677
00678 column_names = static_cast<const char **>(mem_alloc(index_def->n_fields * sizeof *column_names));
00679
00680 for (i = 0; i < index_def->n_fields; ++i) {
00681 column_names[i] = index_def->fields[i].field_name;
00682 }
00683
00684 index = dict_table_get_index_by_max_id(
00685 table, index_def->name, column_names, index_def->n_fields);
00686
00687 mem_free((void*) column_names);
00688
00689 return(index);
00690 }
00691
00692
00695 static
00696 ibool
00697 row_merge_read(
00698
00699 int fd,
00700 ulint offset,
00703 row_merge_block_t* buf)
00704 {
00705 ib_uint64_t ofs = ((ib_uint64_t) offset) * sizeof *buf;
00706 ibool success;
00707
00708 #ifdef UNIV_DEBUG
00709 if (row_merge_print_block_read) {
00710 fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
00711 fd, (ulong) offset);
00712 }
00713 #endif
00714
00715 success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
00716 (ulint) (ofs & 0xFFFFFFFF),
00717 (ulint) (ofs >> 32),
00718 sizeof *buf);
00719 #ifdef POSIX_FADV_DONTNEED
00720
00721 posix_fadvise(fd, ofs, sizeof *buf, POSIX_FADV_DONTNEED);
00722 #endif
00723
00724 if (UNIV_UNLIKELY(!success)) {
00725 ut_print_timestamp(stderr);
00726 fprintf(stderr,
00727 " InnoDB: failed to read merge block at %"PRIu64"\n", ofs);
00728 }
00729
00730 return(UNIV_LIKELY(success));
00731 }
00732
00733
00736 static
00737 ibool
00738 row_merge_write(
00739
00740 int fd,
00741 ulint offset,
00743 const void* buf)
00744 {
00745 size_t buf_len = sizeof(row_merge_block_t);
00746 ib_uint64_t ofs = buf_len * (ib_uint64_t) offset;
00747 ibool ret;
00748
00749 ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
00750 (ulint) (ofs & 0xFFFFFFFF),
00751 (ulint) (ofs >> 32),
00752 buf_len);
00753
00754 #ifdef UNIV_DEBUG
00755 if (row_merge_print_block_write) {
00756 fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
00757 fd, (ulong) offset);
00758 }
00759 #endif
00760
00761 #ifdef POSIX_FADV_DONTNEED
00762
00763
00764 posix_fadvise(fd, ofs, buf_len, POSIX_FADV_DONTNEED);
00765 #endif
00766
00767 return(UNIV_LIKELY(ret));
00768 }
00769
00770
00773 static __attribute__((nonnull))
00774 const byte*
00775 row_merge_read_rec(
00776
00777 row_merge_block_t* block,
00778 mrec_buf_t* buf,
00779 const byte* b,
00780 const dict_index_t* index,
00781 int fd,
00782 ulint* foffs,
00783 const mrec_t** mrec,
00786 ulint* offsets)
00787 {
00788 ulint extra_size;
00789 ulint data_size;
00790 ulint avail_size;
00791
00792 ut_ad(block);
00793 ut_ad(buf);
00794 ut_ad(b >= block[0]);
00795 ut_ad(b < block[1]);
00796 ut_ad(index);
00797 ut_ad(foffs);
00798 ut_ad(mrec);
00799 ut_ad(offsets);
00800
00801 ut_ad(*offsets == 1 + REC_OFFS_HEADER_SIZE
00802 + dict_index_get_n_fields(index));
00803
00804 extra_size = *b++;
00805
00806 if (UNIV_UNLIKELY(!extra_size)) {
00807
00808 *mrec = NULL;
00809 #ifdef UNIV_DEBUG
00810 if (row_merge_print_read) {
00811 fprintf(stderr, "row_merge_read %p,%p,%d,%lu EOF\n",
00812 (const void*) b, (const void*) block,
00813 fd, (ulong) *foffs);
00814 }
00815 #endif
00816 return(NULL);
00817 }
00818
00819 if (extra_size >= 0x80) {
00820
00821
00822 if (UNIV_UNLIKELY(b >= block[1])) {
00823 if (!row_merge_read(fd, ++(*foffs), block)) {
00824 err_exit:
00825
00826 *mrec = b;
00827 return(NULL);
00828 }
00829
00830
00831 b = block[0];
00832 }
00833
00834 extra_size = (extra_size & 0x7f) << 8;
00835 extra_size |= *b++;
00836 }
00837
00838
00839 extra_size--;
00840
00841
00842
00843 if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
00844
00845
00846
00847
00848 avail_size = block[1] - b;
00849
00850 memcpy(*buf, b, avail_size);
00851
00852 if (!row_merge_read(fd, ++(*foffs), block)) {
00853
00854 goto err_exit;
00855 }
00856
00857
00858 b = block[0];
00859
00860
00861 memcpy(*buf + avail_size, b, extra_size - avail_size);
00862 b += extra_size - avail_size;
00863
00864 *mrec = *buf + extra_size;
00865
00866 rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
00867
00868 data_size = rec_offs_data_size(offsets);
00869
00870
00871
00872
00873 ut_a(extra_size + data_size < sizeof *buf);
00874 ut_a(b + data_size < block[1]);
00875
00876
00877 memcpy(*buf + extra_size, b, data_size);
00878 b += data_size;
00879
00880 goto func_exit;
00881 }
00882
00883 *mrec = b + extra_size;
00884
00885 rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
00886
00887 data_size = rec_offs_data_size(offsets);
00888 ut_ad(extra_size + data_size < sizeof *buf);
00889
00890 b += extra_size + data_size;
00891
00892 if (UNIV_LIKELY(b < block[1])) {
00893
00894
00895 goto func_exit;
00896 }
00897
00898
00899
00900 b -= extra_size + data_size;
00901 avail_size = block[1] - b;
00902 memcpy(*buf, b, avail_size);
00903 *mrec = *buf + extra_size;
00904 #ifdef UNIV_DEBUG
00905
00906
00907
00908
00909 offsets[2] = (ulint) *mrec;
00910 offsets[3] = (ulint) index;
00911 #endif
00912
00913 if (!row_merge_read(fd, ++(*foffs), block)) {
00914
00915 goto err_exit;
00916 }
00917
00918
00919 b = block[0];
00920
00921
00922 memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
00923 b += extra_size + data_size - avail_size;
00924
00925 func_exit:
00926 #ifdef UNIV_DEBUG
00927 if (row_merge_print_read) {
00928 fprintf(stderr, "row_merge_read %p,%p,%d,%lu ",
00929 (const void*) b, (const void*) block,
00930 fd, (ulong) *foffs);
00931 rec_print_comp(stderr, *mrec, offsets);
00932 putc('\n', stderr);
00933 }
00934 #endif
00935
00936 return(b);
00937 }
00938
00939
00941 static
00942 void
00943 row_merge_write_rec_low(
00944
00945 byte* b,
00946 ulint e,
00947 #ifdef UNIV_DEBUG
00948 ulint size,
00949 int fd,
00950 ulint foffs,
00951 #endif
00952 const mrec_t* mrec,
00953 const ulint* offsets)
00954 #ifndef UNIV_DEBUG
00955 # define row_merge_write_rec_low(b, e, size, fd, foffs, mrec, offsets) \
00956 row_merge_write_rec_low(b, e, mrec, offsets)
00957 #endif
00958 {
00959 #ifdef UNIV_DEBUG
00960 const byte* const end = b + size;
00961 ut_ad(e == rec_offs_extra_size(offsets) + 1);
00962
00963 if (row_merge_print_write) {
00964 fprintf(stderr, "row_merge_write %p,%d,%lu ",
00965 (void*) b, fd, (ulong) foffs);
00966 rec_print_comp(stderr, mrec, offsets);
00967 putc('\n', stderr);
00968 }
00969 #endif
00970
00971 if (e < 0x80) {
00972 *b++ = (byte) e;
00973 } else {
00974 *b++ = (byte) (0x80 | (e >> 8));
00975 *b++ = (byte) e;
00976 }
00977
00978 memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
00979 ut_ad(b + rec_offs_size(offsets) == end);
00980 }
00981
00982
00985 static
00986 byte*
00987 row_merge_write_rec(
00988
00989 row_merge_block_t* block,
00990 mrec_buf_t* buf,
00991 byte* b,
00992 int fd,
00993 ulint* foffs,
00994 const mrec_t* mrec,
00995 const ulint* offsets)
00996 {
00997 ulint extra_size;
00998 ulint size;
00999 ulint avail_size;
01000
01001 ut_ad(block);
01002 ut_ad(buf);
01003 ut_ad(b >= block[0]);
01004 ut_ad(b < block[1]);
01005 ut_ad(mrec);
01006 ut_ad(foffs);
01007 ut_ad(mrec < block[0] || mrec > block[1]);
01008 ut_ad(mrec < buf[0] || mrec > buf[1]);
01009
01010
01011 extra_size = rec_offs_extra_size(offsets) + 1;
01012
01013 size = extra_size + (extra_size >= 0x80)
01014 + rec_offs_data_size(offsets);
01015
01016 if (UNIV_UNLIKELY(b + size >= block[1])) {
01017
01018
01019 avail_size = block[1] - b;
01020
01021 row_merge_write_rec_low(buf[0],
01022 extra_size, size, fd, *foffs,
01023 mrec, offsets);
01024
01025
01026
01027
01028 memcpy(b, buf[0], avail_size);
01029
01030 if (!row_merge_write(fd, (*foffs)++, block)) {
01031 return(NULL);
01032 }
01033
01034 UNIV_MEM_INVALID(block[0], sizeof block[0]);
01035
01036
01037 b = block[0];
01038 memcpy(b, buf[0] + avail_size, size - avail_size);
01039 b += size - avail_size;
01040 } else {
01041 row_merge_write_rec_low(b, extra_size, size, fd, *foffs,
01042 mrec, offsets);
01043 b += size;
01044 }
01045
01046 return(b);
01047 }
01048
01049
01052 static
01053 byte*
01054 row_merge_write_eof(
01055
01056 row_merge_block_t* block,
01057 byte* b,
01058 int fd,
01059 ulint* foffs)
01060 {
01061 ut_ad(block);
01062 ut_ad(b >= block[0]);
01063 ut_ad(b < block[1]);
01064 ut_ad(foffs);
01065 #ifdef UNIV_DEBUG
01066 if (row_merge_print_write) {
01067 fprintf(stderr, "row_merge_write %p,%p,%d,%lu EOF\n",
01068 (void*) b, (void*) block, fd, (ulong) *foffs);
01069 }
01070 #endif
01071
01072 *b++ = 0;
01073 UNIV_MEM_ASSERT_RW(block[0], b - block[0]);
01074 UNIV_MEM_ASSERT_W(block[0], sizeof block[0]);
01075 #ifdef UNIV_DEBUG_VALGRIND
01076
01077
01078 memset(b, 0xff, block[1] - b);
01079 #endif
01080
01081 if (!row_merge_write(fd, (*foffs)++, block)) {
01082 return(NULL);
01083 }
01084
01085 UNIV_MEM_INVALID(block[0], sizeof block[0]);
01086 return(block[0]);
01087 }
01088
01089
01092 static
01093 int
01094 row_merge_cmp(
01095
01096 const mrec_t* mrec1,
01098 const mrec_t* mrec2,
01100 const ulint* offsets1,
01101 const ulint* offsets2,
01102 const dict_index_t* index,
01103 ibool* null_eq)
01105 {
01106 int cmp;
01107
01108 cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
01109 null_eq);
01110
01111 #ifdef UNIV_DEBUG
01112 if (row_merge_print_cmp) {
01113 fputs("row_merge_cmp1 ", stderr);
01114 rec_print_comp(stderr, mrec1, offsets1);
01115 fputs("\nrow_merge_cmp2 ", stderr);
01116 rec_print_comp(stderr, mrec2, offsets2);
01117 fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
01118 }
01119 #endif
01120
01121 return(cmp);
01122 }
01123
01124
01128 static __attribute__((nonnull))
01129 ulint
01130 row_merge_read_clustered_index(
01131
01132 trx_t* trx,
01133 TABLE* table,
01135 const dict_table_t* old_table,
01137 const dict_table_t* new_table,
01140 dict_index_t** index,
01141 merge_file_t* files,
01142 ulint n_index,
01143 row_merge_block_t* block)
01144 {
01145 dict_index_t* clust_index;
01146 mem_heap_t* row_heap;
01147
01148 row_merge_buf_t** merge_buf;
01149 btr_pcur_t pcur;
01150
01151 mtr_t mtr;
01152 ulint err = DB_SUCCESS;
01153 ulint i;
01154 ulint n_nonnull = 0;
01155
01156 ulint* nonnull = NULL;
01157
01158 trx->op_info = "reading clustered index";
01159
01160 ut_ad(trx);
01161 ut_ad(old_table);
01162 ut_ad(new_table);
01163 ut_ad(index);
01164 ut_ad(files);
01165
01166
01167
01168 merge_buf = static_cast<row_merge_buf_t **>(mem_alloc(n_index * sizeof *merge_buf));
01169
01170 for (i = 0; i < n_index; i++) {
01171 merge_buf[i] = row_merge_buf_create(index[i]);
01172 }
01173
01174 mtr_start(&mtr);
01175
01176
01177
01178
01179 clust_index = dict_table_get_first_index(old_table);
01180
01181 btr_pcur_open_at_index_side(
01182 TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
01183
01184 if (UNIV_UNLIKELY(old_table != new_table)) {
01185 ulint n_cols = dict_table_get_n_cols(old_table);
01186
01187
01188
01189
01190
01191
01192
01193 ut_a(n_cols == dict_table_get_n_cols(new_table));
01194
01195 nonnull = static_cast<ulint*>(mem_alloc(n_cols * sizeof *nonnull));
01196
01197 for (i = 0; i < n_cols; i++) {
01198 if (dict_table_get_nth_col(old_table, i)->prtype
01199 & DATA_NOT_NULL) {
01200
01201 continue;
01202 }
01203
01204 if (dict_table_get_nth_col(new_table, i)->prtype
01205 & DATA_NOT_NULL) {
01206
01207 nonnull[n_nonnull++] = i;
01208 }
01209 }
01210
01211 if (!n_nonnull) {
01212 mem_free(nonnull);
01213 nonnull = NULL;
01214 }
01215 }
01216
01217 row_heap = mem_heap_create(sizeof(mrec_buf_t));
01218
01219
01220 for (;;) {
01221 const rec_t* rec;
01222 ulint* offsets;
01223 dtuple_t* row = NULL;
01224 row_ext_t* ext;
01225 ibool has_next = TRUE;
01226
01227 btr_pcur_move_to_next_on_page(&pcur);
01228
01229
01230
01231
01232 if (btr_pcur_is_after_last_on_page(&pcur)) {
01233 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01234 err = DB_INTERRUPTED;
01235 trx->error_key_num = 0;
01236 goto func_exit;
01237 }
01238
01239 btr_pcur_store_position(&pcur, &mtr);
01240 mtr_commit(&mtr);
01241 mtr_start(&mtr);
01242 btr_pcur_restore_position(BTR_SEARCH_LEAF,
01243 &pcur, &mtr);
01244 has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
01245 }
01246
01247 if (UNIV_LIKELY(has_next)) {
01248 rec = btr_pcur_get_rec(&pcur);
01249 offsets = rec_get_offsets(rec, clust_index, NULL,
01250 ULINT_UNDEFINED, &row_heap);
01251
01252
01253 if (rec_get_deleted_flag(
01254 rec, dict_table_is_comp(old_table))) {
01255 continue;
01256 }
01257
01258 srv_n_rows_inserted++;
01259
01260
01261
01262 row = row_build(ROW_COPY_POINTERS, clust_index,
01263 rec, offsets,
01264 new_table, &ext, row_heap);
01265
01266 if (UNIV_LIKELY_NULL(nonnull)) {
01267 for (i = 0; i < n_nonnull; i++) {
01268 dfield_t* field
01269 = &row->fields[nonnull[i]];
01270 dtype_t* field_type
01271 = dfield_get_type(field);
01272
01273 ut_a(!(field_type->prtype
01274 & DATA_NOT_NULL));
01275
01276 if (dfield_is_null(field)) {
01277 err = DB_PRIMARY_KEY_IS_NULL;
01278 trx->error_key_num = 0;
01279 goto func_exit;
01280 }
01281
01282 field_type->prtype |= DATA_NOT_NULL;
01283 }
01284 }
01285 }
01286
01287
01288
01289
01290 for (i = 0; i < n_index; i++) {
01291 row_merge_buf_t* buf = merge_buf[i];
01292 merge_file_t* file = &files[i];
01293 const dict_index_t* buf_index = buf->index;
01294
01295 if (UNIV_LIKELY
01296 (row && row_merge_buf_add(buf, row, ext))) {
01297 file->n_rec++;
01298 continue;
01299 }
01300
01301
01302
01303 ut_ad(buf->n_tuples || !has_next);
01304
01305
01306
01307
01308 if (buf->n_tuples) {
01309 if (dict_index_is_unique(buf_index)) {
01310 row_merge_dup_t dup;
01311 dup.index = buf->index;
01312 dup.table = table;
01313 dup.n_dup = 0;
01314
01315 row_merge_buf_sort(buf, &dup);
01316
01317 if (dup.n_dup) {
01318 err = DB_DUPLICATE_KEY;
01319 trx->error_key_num = i;
01320 goto func_exit;
01321 }
01322 } else {
01323 row_merge_buf_sort(buf, NULL);
01324 }
01325 }
01326
01327 row_merge_buf_write(buf, file, block);
01328
01329 if (!row_merge_write(file->fd, file->offset++,
01330 block)) {
01331 err = DB_OUT_OF_FILE_SPACE;
01332 trx->error_key_num = i;
01333 goto func_exit;
01334 }
01335
01336 UNIV_MEM_INVALID(block[0], sizeof block[0]);
01337 merge_buf[i] = row_merge_buf_empty(buf);
01338
01339 if (UNIV_LIKELY(row != NULL)) {
01340
01341
01342
01343
01344 if (UNIV_UNLIKELY
01345 (!row_merge_buf_add(buf, row, ext))) {
01346
01347
01348 ut_error;
01349 }
01350
01351 file->n_rec++;
01352 }
01353 }
01354
01355 mem_heap_empty(row_heap);
01356
01357 if (UNIV_UNLIKELY(!has_next)) {
01358 goto func_exit;
01359 }
01360 }
01361
01362 func_exit:
01363 btr_pcur_close(&pcur);
01364 mtr_commit(&mtr);
01365 mem_heap_free(row_heap);
01366
01367 if (UNIV_LIKELY_NULL(nonnull)) {
01368 mem_free(nonnull);
01369 }
01370
01371 for (i = 0; i < n_index; i++) {
01372 row_merge_buf_free(merge_buf[i]);
01373 }
01374
01375 mem_free(merge_buf);
01376
01377 trx->op_info = "";
01378
01379 return(err);
01380 }
01381
/** Write record mrec<N> to the output block and fetch the next record
of input stream N.  The macro relies on these names in the expanding
function: block, buf, b2, of, file, index, b<N>, mrec<N>, offsets<N>,
foffs<N>, and a label "corrupt".
It jumps to "corrupt" when the write fails, when more records have been
written than file->n_rec, or when reading fails (NULL position with a
non-NULL record pointer).
@param N	input stream number (0 or 1)
@param AT_END	statement to execute at the end of input stream N */
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END)				\
	do {								\
		b2 = row_merge_write_rec(&block[2], &buf[2], b2,	\
					 of->fd, &of->offset,		\
					 mrec##N, offsets##N);		\
		if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) {	\
			goto corrupt;					\
		}							\
		b##N = row_merge_read_rec(&block[N], &buf[N],		\
					  b##N, index,			\
					  file->fd, foffs##N,		\
					  &mrec##N, offsets##N);	\
		if (UNIV_UNLIKELY(!b##N)) {				\
			if (mrec##N) {					\
				goto corrupt;				\
			}						\
			AT_END;						\
		}							\
	} while (0)
01404
01405
01408 static
01409 ulint
01410 row_merge_blocks(
01411
01412 const dict_index_t* index,
01413 const merge_file_t* file,
01415 row_merge_block_t* block,
01416 ulint* foffs0,
01418 ulint* foffs1,
01420 merge_file_t* of,
01421 TABLE* table)
01424 {
01425 mem_heap_t* heap;
01427 mrec_buf_t* buf;
01429 const byte* b0;
01430 const byte* b1;
01431 byte* b2;
01432 const mrec_t* mrec0;
01433 const mrec_t* mrec1;
01434 ulint* offsets0;
01435 ulint* offsets1;
01436
01437 #ifdef UNIV_DEBUG
01438 if (row_merge_print_block) {
01439 fprintf(stderr,
01440 "row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
01441 " = fd=%d ofs=%lu\n",
01442 file->fd, (ulong) *foffs0,
01443 file->fd, (ulong) *foffs1,
01444 of->fd, (ulong) of->offset);
01445 }
01446 #endif
01447
01448 heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
01449
01450 buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3));
01451
01452
01453
01454
01455 if (!row_merge_read(file->fd, *foffs0, &block[0])
01456 || !row_merge_read(file->fd, *foffs1, &block[1])) {
01457 corrupt:
01458 mem_heap_free(heap);
01459 return(DB_CORRUPTION);
01460 }
01461
01462 b0 = block[0];
01463 b1 = block[1];
01464 b2 = block[2];
01465
01466 b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
01467 foffs0, &mrec0, offsets0);
01468 b1 = row_merge_read_rec(&block[1], &buf[1], b1, index, file->fd,
01469 foffs1, &mrec1, offsets1);
01470 if (UNIV_UNLIKELY(!b0 && mrec0)
01471 || UNIV_UNLIKELY(!b1 && mrec1)) {
01472
01473 goto corrupt;
01474 }
01475
01476 while (mrec0 && mrec1) {
01477 ibool null_eq = FALSE;
01478 switch (row_merge_cmp(mrec0, mrec1,
01479 offsets0, offsets1, index,
01480 &null_eq)) {
01481 case 0:
01482 if (UNIV_UNLIKELY
01483 (dict_index_is_unique(index) && !null_eq)) {
01484 innobase_rec_to_mysql(table, mrec0,
01485 index, offsets0);
01486 mem_heap_free(heap);
01487 return(DB_DUPLICATE_KEY);
01488 }
01489
01490 case -1:
01491 ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
01492 break;
01493 case 1:
01494 ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
01495 break;
01496 default:
01497 ut_error;
01498 }
01499
01500 }
01501
01502 merged:
01503 if (mrec0) {
01504
01505 for (;;) {
01506 ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
01507 }
01508 }
01509 done0:
01510 if (mrec1) {
01511
01512 for (;;) {
01513 ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
01514 }
01515 }
01516 done1:
01517
01518 mem_heap_free(heap);
01519 b2 = row_merge_write_eof(&block[2], b2, of->fd, &of->offset);
01520 return(b2 ? DB_SUCCESS : DB_CORRUPTION);
01521 }
01522
01523
01526 static __attribute__((nonnull))
01527 ibool
01528 row_merge_blocks_copy(
01529
01530 const dict_index_t* index,
01531 const merge_file_t* file,
01532 row_merge_block_t* block,
01533 ulint* foffs0,
01534 merge_file_t* of)
01535 {
01536 mem_heap_t* heap;
01538 mrec_buf_t* buf;
01540 const byte* b0;
01541 byte* b2;
01542 const mrec_t* mrec0;
01543 ulint* offsets0;
01544 ulint* offsets1;
01545
01546 #ifdef UNIV_DEBUG
01547 if (row_merge_print_block) {
01548 fprintf(stderr,
01549 "row_merge_blocks_copy fd=%d ofs=%lu"
01550 " = fd=%d ofs=%lu\n",
01551 file->fd, (ulong) foffs0,
01552 of->fd, (ulong) of->offset);
01553 }
01554 #endif
01555
01556 heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
01557 buf = static_cast<mrec_buf_t *>(mem_heap_alloc(heap, sizeof(mrec_buf_t) * 3));
01558
01559
01560
01561
01562 if (!row_merge_read(file->fd, *foffs0, &block[0])) {
01563 corrupt:
01564 mem_heap_free(heap);
01565 return(FALSE);
01566 }
01567
01568 b0 = block[0];
01569 b2 = block[2];
01570
01571 b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
01572 foffs0, &mrec0, offsets0);
01573 if (UNIV_UNLIKELY(!b0 && mrec0)) {
01574
01575 goto corrupt;
01576 }
01577
01578 if (mrec0) {
01579
01580 for (;;) {
01581 ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
01582 }
01583 }
01584 done0:
01585
01586
01587
01588 (*foffs0)++;
01589
01590 mem_heap_free(heap);
01591 return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
01592 != NULL);
01593 }
01594
01595
01598 static __attribute__((nonnull))
01599 ulint
01600 row_merge(
01601
01602 trx_t* trx,
01603 const dict_index_t* index,
01604 merge_file_t* file,
01606 row_merge_block_t* block,
01607 int* tmpfd,
01608 TABLE* table,
01611 ulint* num_run,
01613 ulint* run_offset)
01616 {
01617 ulint foffs0;
01618 ulint foffs1;
01619 ulint error;
01620 merge_file_t of;
01621 const ulint ihalf = run_offset[*num_run / 2];
01623 ulint n_run = 0;
01627 UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
01628 ut_ad(ihalf < file->offset);
01629
01630 of.fd = *tmpfd;
01631 of.offset = 0;
01632 of.n_rec = 0;
01633
01634 #ifdef POSIX_FADV_SEQUENTIAL
01635
01636
01637
01638 posix_fadvise(file->fd, 0, 0,
01639 POSIX_FADV_SEQUENTIAL | POSIX_FADV_NOREUSE);
01640 #endif
01641
01642
01643 foffs0 = 0;
01644 foffs1 = ihalf;
01645
01646 UNIV_MEM_INVALID(run_offset, *num_run * sizeof *run_offset);
01647
01648 for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
01649
01650 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01651 return(DB_INTERRUPTED);
01652 }
01653
01654
01655 run_offset[n_run++] = of.offset;
01656
01657 error = row_merge_blocks(index, file, block,
01658 &foffs0, &foffs1, &of, table);
01659
01660 if (error != DB_SUCCESS) {
01661 return(error);
01662 }
01663
01664 }
01665
01666
01667
01668 while (foffs0 < ihalf) {
01669 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01670 return(DB_INTERRUPTED);
01671 }
01672
01673
01674 run_offset[n_run++] = of.offset;
01675
01676 if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
01677 return(DB_CORRUPTION);
01678 }
01679 }
01680
01681 ut_ad(foffs0 == ihalf);
01682
01683 while (foffs1 < file->offset) {
01684 if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
01685 return(DB_INTERRUPTED);
01686 }
01687
01688
01689 run_offset[n_run++] = of.offset;
01690
01691 if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
01692 return(DB_CORRUPTION);
01693 }
01694 }
01695
01696 ut_ad(foffs1 == file->offset);
01697
01698 if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
01699 return(DB_CORRUPTION);
01700 }
01701
01702 ut_ad(n_run <= *num_run);
01703
01704 *num_run = n_run;
01705
01706
01707
01708
01709
01710 ut_ad((*num_run) <= file->offset);
01711
01712
01713
01714 ut_ad(of.offset <= file->offset);
01715
01716
01717 *tmpfd = file->fd;
01718 *file = of;
01719
01720 UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
01721
01722 return(DB_SUCCESS);
01723 }
01724
01725
01728 static
01729 ulint
01730 row_merge_sort(
01731
01732 trx_t* trx,
01733 const dict_index_t* index,
01734 merge_file_t* file,
01736 row_merge_block_t* block,
01737 int* tmpfd,
01738 TABLE* table)
01741 {
01742 ulint half = file->offset / 2;
01743 ulint num_runs;
01744 ulint* run_offset;
01745 ulint error = DB_SUCCESS;
01746
01747
01748 num_runs = file->offset;
01749
01750
01751 if (num_runs <= 1) {
01752 return(error);
01753 }
01754
01755
01756 run_offset = (ulint*) mem_alloc(file->offset * sizeof(ulint));
01757
01758
01759
01760 run_offset[half] = half;
01761
01762
01763
01764 ut_ad(file->offset > 0);
01765
01766
01767 do {
01768 error = row_merge(trx, index, file, block, tmpfd,
01769 table, &num_runs, run_offset);
01770
01771 UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
01772
01773 if (error != DB_SUCCESS) {
01774 break;
01775 }
01776 } while (num_runs > 1);
01777
01778 mem_free(run_offset);
01779
01780 return(error);
01781 }
01782
01783
01785 static
01786 void
01787 row_merge_copy_blobs(
01788
01789 const mrec_t* mrec,
01790 const ulint* offsets,
01791 ulint zip_size,
01792 dtuple_t* tuple,
01793 mem_heap_t* heap)
01794 {
01795 ulint i;
01796 ulint n_fields = dtuple_get_n_fields(tuple);
01797
01798 for (i = 0; i < n_fields; i++) {
01799 ulint len;
01800 const void* data;
01801 dfield_t* field = dtuple_get_nth_field(tuple, i);
01802
01803 if (!dfield_is_ext(field)) {
01804 continue;
01805 }
01806
01807 ut_ad(!dfield_is_null(field));
01808
01809
01810
01811
01812
01813
01814 data = btr_rec_copy_externally_stored_field(
01815 mrec, offsets, zip_size, i, &len, heap);
01816
01817
01818
01819
01820 ut_a(data);
01821
01822 dfield_set_data(field, data, len);
01823 }
01824 }
01825
01826
01830 static
01831 ulint
01832 row_merge_insert_index_tuples(
01833
01834 trx_t* trx,
01835 dict_index_t* index,
01836 dict_table_t* table,
01837 ulint zip_size,
01839 int fd,
01840 row_merge_block_t* block)
01841 {
01842 const byte* b;
01843 que_thr_t* thr;
01844 ins_node_t* node;
01845 mem_heap_t* tuple_heap;
01846 mem_heap_t* graph_heap;
01847 ulint error = DB_SUCCESS;
01848 ulint foffs = 0;
01849 ulint* offsets;
01850
01851 ut_ad(trx);
01852 ut_ad(index);
01853 ut_ad(table);
01854
01855
01856
01857
01858 trx->op_info = "inserting index entries";
01859
01860 graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
01861 node = ins_node_create(INS_DIRECT, table, graph_heap);
01862
01863 thr = pars_complete_graph_for_exec(node, trx, graph_heap);
01864
01865 que_thr_move_to_run_state_for_mysql(thr, trx);
01866
01867 tuple_heap = mem_heap_create(1000);
01868
01869 {
01870 ulint i = 1 + REC_OFFS_HEADER_SIZE
01871 + dict_index_get_n_fields(index);
01872 offsets = static_cast<ulint *>(mem_heap_alloc(graph_heap, i * sizeof *offsets));
01873 offsets[0] = i;
01874 offsets[1] = dict_index_get_n_fields(index);
01875 }
01876
01877 b = *block;
01878
01879 if (!row_merge_read(fd, foffs, block)) {
01880 error = DB_CORRUPTION;
01881 } else {
01882 mrec_buf_t* buf = static_cast<mrec_buf_t *>(mem_heap_alloc(graph_heap, sizeof *buf));
01883
01884 for (;;) {
01885 const mrec_t* mrec;
01886 dtuple_t* dtuple;
01887 ulint n_ext;
01888
01889 b = row_merge_read_rec(block, buf, b, index,
01890 fd, &foffs, &mrec, offsets);
01891 if (UNIV_UNLIKELY(!b)) {
01892
01893 if (mrec) {
01894 error = DB_CORRUPTION;
01895 }
01896 break;
01897 }
01898
01899 dtuple = row_rec_to_index_entry_low(
01900 mrec, index, offsets, &n_ext, tuple_heap);
01901
01902 if (UNIV_UNLIKELY(n_ext)) {
01903 row_merge_copy_blobs(mrec, offsets, zip_size,
01904 dtuple, tuple_heap);
01905 }
01906
01907 node->row = dtuple;
01908 node->table = table;
01909 node->trx_id = trx->id;
01910
01911 ut_ad(dtuple_validate(dtuple));
01912
01913 do {
01914 thr->run_node = thr;
01915 thr->prev_node = thr->common.parent;
01916
01917 error = row_ins_index_entry(index, dtuple,
01918 0, FALSE, thr);
01919
01920 if (UNIV_LIKELY(error == DB_SUCCESS)) {
01921
01922 goto next_rec;
01923 }
01924
01925 thr->lock_state = QUE_THR_LOCK_ROW;
01926 trx->error_state = error;
01927 que_thr_stop_for_mysql(thr);
01928 thr->lock_state = QUE_THR_LOCK_NOLOCK;
01929 } while (row_mysql_handle_errors(&error, trx,
01930 thr, NULL));
01931
01932 goto err_exit;
01933 next_rec:
01934 mem_heap_empty(tuple_heap);
01935 }
01936 }
01937
01938 que_thr_stop_for_mysql_no_error(thr, trx);
01939 err_exit:
01940 que_graph_free(thr->graph);
01941
01942 trx->op_info = "";
01943
01944 mem_heap_free(tuple_heap);
01945
01946 return(error);
01947 }
01948
01949
01952 UNIV_INTERN
01953 ulint
01954 row_merge_lock_table(
01955
01956 trx_t* trx,
01957 dict_table_t* table,
01958 enum lock_mode mode)
01959 {
01960 mem_heap_t* heap;
01961 que_thr_t* thr;
01962 ulint err;
01963 sel_node_t* node;
01964
01965 ut_ad(trx);
01966 ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
01967 ut_ad(mode == LOCK_X || mode == LOCK_S);
01968
01969 heap = mem_heap_create(512);
01970
01971 trx->op_info = "setting table lock for creating or dropping index";
01972
01973 node = sel_node_create(heap);
01974 thr = pars_complete_graph_for_exec(node, trx, heap);
01975 thr->graph->state = QUE_FORK_ACTIVE;
01976
01977
01978
01979
01980 thr = que_fork_get_first_thr(static_cast<que_fork_t *>(que_node_get_parent(thr)));
01981 que_thr_move_to_run_state_for_mysql(thr, trx);
01982
01983 run_again:
01984 thr->run_node = thr;
01985 thr->prev_node = thr->common.parent;
01986
01987 err = lock_table(0, table, mode, thr);
01988
01989 trx->error_state = err;
01990
01991 if (UNIV_LIKELY(err == DB_SUCCESS)) {
01992 que_thr_stop_for_mysql_no_error(thr, trx);
01993 } else {
01994 que_thr_stop_for_mysql(thr);
01995
01996 if (err != DB_QUE_THR_SUSPENDED) {
01997 ibool was_lock_wait;
01998
01999 was_lock_wait = row_mysql_handle_errors(
02000 &err, trx, thr, NULL);
02001
02002 if (was_lock_wait) {
02003 goto run_again;
02004 }
02005 } else {
02006 que_thr_t* run_thr;
02007 que_node_t* parent;
02008
02009 parent = que_node_get_parent(thr);
02010 run_thr = que_fork_start_command(static_cast<que_fork_t *>(parent));
02011
02012 ut_a(run_thr == thr);
02013
02014
02015
02016 trx->error_state = DB_LOCK_WAIT;
02017
02018 goto run_again;
02019 }
02020 }
02021
02022 que_graph_free(thr->graph);
02023 trx->op_info = "";
02024
02025 return(err);
02026 }
02027
02028
02032 UNIV_INTERN
02033 void
02034 row_merge_drop_index(
02035
02036 dict_index_t* index,
02037 dict_table_t* table,
02038 trx_t* trx)
02039 {
02040 ulint err;
02041 pars_info_t* info = pars_info_create();
02042
02043
02044
02045
02046
02047
02048 static const char str1[] =
02049 "PROCEDURE DROP_INDEX_PROC () IS\n"
02050 "BEGIN\n"
02051
02052
02053
02054 "UPDATE SYS_INDEXES SET NAME=CONCAT('"
02055 TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
02056 "COMMIT WORK;\n"
02057
02058 "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
02059
02060 "DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
02061 "END;\n";
02062
02063 ut_ad(index && table && trx);
02064
02065 pars_info_add_ull_literal(info, "indexid", index->id);
02066
02067 trx_start_if_not_started(trx);
02068 trx->op_info = "dropping index";
02069
02070 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02071
02072 err = que_eval_sql(info, str1, FALSE, trx);
02073
02074 ut_a(err == DB_SUCCESS);
02075
02076
02077
02078
02079 dict_table_replace_index_in_foreign_list(table, index, trx);
02080 dict_index_remove_from_cache(table, index);
02081
02082 trx->op_info = "";
02083 }
02084
02085
02090 UNIV_INTERN
02091 void
02092 row_merge_drop_indexes(
02093
02094 trx_t* trx,
02095 dict_table_t* table,
02096 dict_index_t** index,
02097 ulint num_created)
02098 {
02099 ulint key_num;
02100
02101 for (key_num = 0; key_num < num_created; key_num++) {
02102 row_merge_drop_index(index[key_num], table, trx);
02103 }
02104 }
02105
02106
02108 UNIV_INTERN
02109 void
02110 row_merge_drop_temp_indexes(void)
02111
02112 {
02113 trx_t* trx;
02114 btr_pcur_t pcur;
02115 mtr_t mtr;
02116
02117
02118
02119
02120
02121 trx = trx_allocate_for_background();
02122 trx->op_info = "dropping partially created indexes";
02123 row_mysql_lock_data_dictionary(trx);
02124
02125 mtr_start(&mtr);
02126
02127 btr_pcur_open_at_index_side(
02128 TRUE,
02129 dict_table_get_first_index(dict_sys->sys_indexes),
02130 BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
02131
02132 for (;;) {
02133 const rec_t* rec;
02134 const byte* field;
02135 ulint len;
02136 table_id_t table_id;
02137 dict_table_t* table;
02138
02139 btr_pcur_move_to_next_user_rec(&pcur, &mtr);
02140
02141 if (!btr_pcur_is_on_user_rec(&pcur)) {
02142 break;
02143 }
02144
02145 rec = btr_pcur_get_rec(&pcur);
02146 field = rec_get_nth_field_old(rec, DICT_SYS_INDEXES_NAME_FIELD,
02147 &len);
02148 if (len == UNIV_SQL_NULL || len == 0
02149 || (char) *field != TEMP_INDEX_PREFIX) {
02150 continue;
02151 }
02152
02153
02154
02155 field = rec_get_nth_field_old(rec, 0, &len);
02156 if (len != 8) {
02157
02158 continue;
02159 }
02160
02161 table_id = mach_read_from_8(field);
02162
02163 btr_pcur_store_position(&pcur, &mtr);
02164 btr_pcur_commit_specify_mtr(&pcur, &mtr);
02165
02166 table = dict_table_get_on_id_low(table_id);
02167
02168 if (table) {
02169 dict_index_t* index;
02170 dict_index_t* next_index;
02171
02172 for (index = dict_table_get_first_index(table);
02173 index; index = next_index) {
02174
02175 next_index = dict_table_get_next_index(index);
02176
02177 if (*index->name == TEMP_INDEX_PREFIX) {
02178 row_merge_drop_index(index, table, trx);
02179 trx_commit_for_mysql(trx);
02180 }
02181 }
02182 }
02183
02184 mtr_start(&mtr);
02185 btr_pcur_restore_position(BTR_SEARCH_LEAF,
02186 &pcur, &mtr);
02187 }
02188
02189 btr_pcur_close(&pcur);
02190 mtr_commit(&mtr);
02191 row_mysql_unlock_data_dictionary(trx);
02192 trx_free_for_background(trx);
02193 }
02194
02195
02197 static
02198 void
02199 row_merge_file_create(
02200
02201 merge_file_t* merge_file)
02202 {
02203 #ifdef UNIV_PFS_IO
02204
02205
02206
02207 struct PSI_file_locker* locker = NULL;
02208 PSI_file_locker_state state;
02209 register_pfs_file_open_begin(&state, locker, innodb_file_temp_key,
02210 PSI_FILE_OPEN,
02211 "Innodb Merge Temp File",
02212 __FILE__, __LINE__);
02213 #endif
02214 merge_file->fd = innobase_mysql_tmpfile();
02215 merge_file->offset = 0;
02216 merge_file->n_rec = 0;
02217 #ifdef UNIV_PFS_IO
02218 register_pfs_file_open_end(locker, merge_file->fd);
02219 #endif
02220 }
02221
02222
02224 static
02225 void
02226 row_merge_file_destroy(
02227
02228 merge_file_t* merge_file)
02229 {
02230 #ifdef UNIV_PFS_IO
02231 struct PSI_file_locker* locker = NULL;
02232 PSI_file_locker_state state;
02233 register_pfs_file_io_begin(&state, locker, merge_file->fd, 0, PSI_FILE_CLOSE,
02234 __FILE__, __LINE__);
02235 #endif
02236 if (merge_file->fd != -1) {
02237 close(merge_file->fd);
02238 merge_file->fd = -1;
02239 }
02240
02241 #ifdef UNIV_PFS_IO
02242 register_pfs_file_io_end(locker, 0);
02243 #endif
02244 }
02245
02246
02250 UNIV_INLINE
02251 ulint
02252 row_merge_col_prtype(
02253
02254 const dict_col_t* col,
02255 const char* col_name,
02256 const merge_index_def_t*index_def)
02258 {
02259 ulint prtype = col->prtype;
02260 ulint i;
02261
02262 ut_ad(index_def->ind_type & DICT_CLUSTERED);
02263
02264 if (prtype & DATA_NOT_NULL) {
02265
02266 return(prtype);
02267 }
02268
02269
02270
02271
02272 for (i = 0; i < index_def->n_fields; i++) {
02273 if (!strcmp(col_name, index_def->fields[i].field_name)) {
02274 return(prtype | DATA_NOT_NULL);
02275 }
02276 }
02277
02278 return(prtype);
02279 }
02280
02281
02285 UNIV_INTERN
02286 dict_table_t*
02287 row_merge_create_temporary_table(
02288
02289 const char* table_name,
02290 const merge_index_def_t*index_def,
02292 const dict_table_t* table,
02293 trx_t* trx)
02295 {
02296 ulint i;
02297 dict_table_t* new_table = NULL;
02298 ulint n_cols = dict_table_get_n_user_cols(table);
02299 ulint error;
02300 mem_heap_t* heap = mem_heap_create(1000);
02301
02302 ut_ad(table_name);
02303 ut_ad(index_def);
02304 ut_ad(table);
02305 ut_ad(mutex_own(&dict_sys->mutex));
02306
02307 new_table = dict_mem_table_create(table_name, 0, n_cols, table->flags);
02308
02309 for (i = 0; i < n_cols; i++) {
02310 const dict_col_t* col;
02311 const char* col_name;
02312
02313 col = dict_table_get_nth_col(table, i);
02314 col_name = dict_table_get_col_name(table, i);
02315
02316 dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
02317 row_merge_col_prtype(col, col_name,
02318 index_def),
02319 col->len);
02320 }
02321
02322 error = row_create_table_for_mysql(new_table, trx);
02323 mem_heap_free(heap);
02324
02325 if (error != DB_SUCCESS) {
02326 trx->error_state = error;
02327 new_table = NULL;
02328 }
02329
02330 return(new_table);
02331 }
02332
02333
02338 UNIV_INTERN
02339 ulint
02340 row_merge_rename_indexes(
02341
02342 trx_t* trx,
02343 dict_table_t* table)
02344 {
02345 ulint err = DB_SUCCESS;
02346 pars_info_t* info = pars_info_create();
02347
02348
02349
02350
02351 static const char rename_indexes[] =
02352 "PROCEDURE RENAME_INDEXES_PROC () IS\n"
02353 "BEGIN\n"
02354 "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
02355 "WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
02356 TEMP_INDEX_PREFIX_STR "';\n"
02357 "END;\n";
02358
02359 ut_ad(table);
02360 ut_ad(trx);
02361 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02362
02363 trx->op_info = "renaming indexes";
02364
02365 pars_info_add_ull_literal(info, "tableid", table->id);
02366
02367 err = que_eval_sql(info, rename_indexes, FALSE, trx);
02368
02369 if (err == DB_SUCCESS) {
02370 dict_index_t* index = dict_table_get_first_index(table);
02371 do {
02372 if (*index->name == TEMP_INDEX_PREFIX) {
02373 index->name++;
02374 }
02375 index = dict_table_get_next_index(index);
02376 } while (index);
02377 }
02378
02379 trx->op_info = "";
02380
02381 return(err);
02382 }
02383
02384
02389 UNIV_INTERN
02390 ulint
02391 row_merge_rename_tables(
02392
02393 dict_table_t* old_table,
02395 dict_table_t* new_table,
02397 const char* tmp_name,
02398 trx_t* trx)
02399 {
02400 ulint err = DB_ERROR;
02401 pars_info_t* info;
02402 char old_name[MAX_TABLE_NAME_LEN + 1];
02403
02404 ut_ad(trx->mysql_thread_id == os_thread_get_curr_id());
02405 ut_ad(old_table != new_table);
02406 ut_ad(mutex_own(&dict_sys->mutex));
02407
02408 ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
02409
02410
02411 if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
02412 memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
02413 } else {
02414 ut_print_timestamp(stderr);
02415 fprintf(stderr, "InnoDB: too long table name: '%s', "
02416 "max length is %d\n", old_table->name,
02417 MAX_TABLE_NAME_LEN);
02418 ut_error;
02419 }
02420
02421
02422 if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
02423 memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
02424 } else {
02425 ut_print_timestamp(stderr);
02426 fprintf(stderr, "InnoDB: too long table name: '%s', "
02427 "max length is %d\n", old_table->name,
02428 MAX_TABLE_NAME_LEN);
02429 ut_error;
02430 }
02431
02432 trx->op_info = "renaming tables";
02433
02434
02435
02436
02437 info = pars_info_create();
02438
02439 pars_info_add_str_literal(info, "new_name", new_table->name);
02440 pars_info_add_str_literal(info, "old_name", old_name);
02441 pars_info_add_str_literal(info, "tmp_name", tmp_name);
02442
02443 err = que_eval_sql(info,
02444 "PROCEDURE RENAME_TABLES () IS\n"
02445 "BEGIN\n"
02446 "UPDATE SYS_TABLES SET NAME = :tmp_name\n"
02447 " WHERE NAME = :old_name;\n"
02448 "UPDATE SYS_TABLES SET NAME = :old_name\n"
02449 " WHERE NAME = :new_name;\n"
02450 "END;\n", FALSE, trx);
02451
02452 if (err != DB_SUCCESS) {
02453
02454 goto err_exit;
02455 }
02456
02457
02458
02459
02460 if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
02461 || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {
02462
02463 err = DB_ERROR;
02464 goto err_exit;
02465 }
02466
02467 err = dict_load_foreigns(old_name, FALSE, TRUE);
02468
02469 if (err != DB_SUCCESS) {
02470 err_exit:
02471 trx->error_state = DB_SUCCESS;
02472 trx_general_rollback_for_mysql(trx, NULL);
02473 trx->error_state = DB_SUCCESS;
02474 }
02475
02476 trx->op_info = "";
02477
02478 return(err);
02479 }
02480
02481
02484 static
02485 ulint
02486 row_merge_create_index_graph(
02487
02488 trx_t* trx,
02489 dict_table_t* table,
02490 dict_index_t* index)
02491 {
02492 ind_node_t* node;
02493 mem_heap_t* heap;
02494 que_thr_t* thr;
02495 ulint err;
02496
02497 ut_ad(trx);
02498 ut_ad(table);
02499 ut_ad(index);
02500
02501 heap = mem_heap_create(512);
02502
02503 index->table = table;
02504 node = ind_create_graph_create(index, heap);
02505 thr = pars_complete_graph_for_exec(node, trx, heap);
02506
02507 ut_a(thr == que_fork_start_command(static_cast<que_fork_t *>(que_node_get_parent(thr))));
02508
02509 que_run_threads(thr);
02510
02511 err = trx->error_state;
02512
02513 que_graph_free((que_t*) que_node_get_parent(thr));
02514
02515 return(err);
02516 }
02517
02518
02521 UNIV_INTERN
02522 dict_index_t*
02523 row_merge_create_index(
02524
02525 trx_t* trx,
02526 dict_table_t* table,
02527 const merge_index_def_t*index_def)
02529 {
02530 dict_index_t* index;
02531 ulint err;
02532 ulint n_fields = index_def->n_fields;
02533 ulint i;
02534
02535
02536
02537
02538
02539 index = dict_mem_index_create(table->name, index_def->name,
02540 0, index_def->ind_type, n_fields);
02541
02542 ut_a(index);
02543
02544 for (i = 0; i < n_fields; i++) {
02545 merge_index_field_t* ifield = &index_def->fields[i];
02546
02547 dict_mem_index_add_field(index, ifield->field_name,
02548 ifield->prefix_len);
02549 }
02550
02551
02552 err = row_merge_create_index_graph(trx, table, index);
02553
02554 if (err == DB_SUCCESS) {
02555
02556 index = row_merge_dict_table_get_index(
02557 table, index_def);
02558
02559 ut_a(index);
02560
02561
02562
02563
02564 index->trx_id = trx->id;
02565 } else {
02566 index = NULL;
02567 }
02568
02569 return(index);
02570 }
02571
02572
02574 UNIV_INTERN
02575 ibool
02576 row_merge_is_index_usable(
02577
02578 const trx_t* trx,
02579 const dict_index_t* index)
02580 {
02581 return(!trx->read_view
02582 || read_view_sees_trx_id(trx->read_view, index->trx_id));
02583 }
02584
02585
02588 UNIV_INTERN
02589 ulint
02590 row_merge_drop_table(
02591
02592 trx_t* trx,
02593 dict_table_t* table)
02594 {
02595
02596 ut_a(table->n_mysql_handles_opened == 0);
02597
02598 return(row_drop_table_for_mysql(table->name, trx, FALSE));
02599 }
02600
02601
02606 UNIV_INTERN
02607 ulint
02608 row_merge_build_indexes(
02609
02610 trx_t* trx,
02611 dict_table_t* old_table,
02613 dict_table_t* new_table,
02616 dict_index_t** indexes,
02617 ulint n_indexes,
02618 TABLE* table)
02621 {
02622 merge_file_t* merge_files;
02623 row_merge_block_t* block;
02624 ulint block_size;
02625 ulint i;
02626 ulint error;
02627 int tmpfd;
02628
02629 ut_ad(trx);
02630 ut_ad(old_table);
02631 ut_ad(new_table);
02632 ut_ad(indexes);
02633 ut_ad(n_indexes);
02634
02635 trx_start_if_not_started(trx);
02636
02637
02638
02639
02640 merge_files = static_cast<merge_file_t *>(mem_alloc(n_indexes * sizeof *merge_files));
02641 block_size = 3 * sizeof *block;
02642 block = static_cast<row_merge_block_t *>(os_mem_alloc_large(&block_size));
02643
02644 for (i = 0; i < n_indexes; i++) {
02645
02646 row_merge_file_create(&merge_files[i]);
02647 }
02648
02649 tmpfd = innobase_mysql_tmpfile();
02650
02651
02652
02653 innobase_rec_reset(table);
02654
02655
02656
02657
02658 error = row_merge_read_clustered_index(
02659 trx, table, old_table, new_table, indexes,
02660 merge_files, n_indexes, block);
02661
02662 if (error != DB_SUCCESS) {
02663
02664 goto func_exit;
02665 }
02666
02667
02668
02669
02670 for (i = 0; i < n_indexes; i++) {
02671 error = row_merge_sort(trx, indexes[i], &merge_files[i],
02672 block, &tmpfd, table);
02673
02674 if (error == DB_SUCCESS) {
02675 error = row_merge_insert_index_tuples(
02676 trx, indexes[i], new_table,
02677 dict_table_zip_size(old_table),
02678 merge_files[i].fd, block);
02679 }
02680
02681
02682 row_merge_file_destroy(&merge_files[i]);
02683
02684 if (error != DB_SUCCESS) {
02685 trx->error_key_num = i;
02686 goto func_exit;
02687 }
02688 }
02689
02690 func_exit:
02691 close(tmpfd);
02692
02693 for (i = 0; i < n_indexes; i++) {
02694 row_merge_file_destroy(&merge_files[i]);
02695 }
02696
02697 mem_free(merge_files);
02698 os_mem_free_large(block, block_size);
02699
02700 return(error);
02701 }