00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00026 #include "que0que.h"
00027
00028 #ifdef UNIV_NONINL
00029 #include "que0que.ic"
00030 #endif
00031
00032 #include "usr0sess.h"
00033 #include "trx0trx.h"
00034 #include "trx0roll.h"
00035 #include "row0undo.h"
00036 #include "row0ins.h"
00037 #include "row0upd.h"
00038 #include "row0sel.h"
00039 #include "row0purge.h"
00040 #include "dict0crea.h"
00041 #include "log0log.h"
00042 #include "eval0proc.h"
00043 #include "eval0eval.h"
00044 #include "pars0types.h"
00045
/* Numeric limits of the query graph module. The first two share the same
value, 64 * 256^3 (= 2^30). */
#define QUE_PARALLELIZE_LIMIT		(64 * 256 * 256 * 256)
#define QUE_ROUND_ROBIN_LIMIT		(64 * 256 * 256 * 256)

/* Start/reset value of the loop counter kept by que_run_threads_low() */
#define QUE_MAX_LOOPS_WITHOUT_CHECK	16
00049
#ifdef UNIV_DEBUG
/** Trace switch read by que_thr_step(): when TRUE, every node about to be
executed is reported on stderr. Exists only in UNIV_DEBUG builds. */
UNIV_INTERN ibool	que_trace_on	= FALSE;
#endif /* UNIV_DEBUG */
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00131 static
00132 void
00133 que_thr_move_to_run_state(
00134
00135 que_thr_t* thr);
00137
00139 UNIV_INTERN
00140 void
00141 que_graph_publish(
00142
00143 que_t* graph,
00144 sess_t* sess)
00145 {
00146 ut_ad(mutex_own(&kernel_mutex));
00147
00148 UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
00149 }
00150
00151
00154 UNIV_INTERN
00155 que_fork_t*
00156 que_fork_create(
00157
00158 que_t* graph,
00161 que_node_t* parent,
00162 ulint fork_type,
00163 mem_heap_t* heap)
00164 {
00165 que_fork_t* fork;
00166
00167 ut_ad(heap);
00168
00169 fork = static_cast<que_fork_t *>(mem_heap_alloc(heap, sizeof(que_fork_t)));
00170
00171 fork->common.type = QUE_NODE_FORK;
00172 fork->n_active_thrs = 0;
00173
00174 fork->state = QUE_FORK_COMMAND_WAIT;
00175
00176 if (graph != NULL) {
00177 fork->graph = graph;
00178 } else {
00179 fork->graph = fork;
00180 }
00181
00182 fork->common.parent = parent;
00183 fork->fork_type = fork_type;
00184
00185 fork->caller = NULL;
00186
00187 UT_LIST_INIT(fork->thrs);
00188
00189 fork->sym_tab = NULL;
00190 fork->info = NULL;
00191
00192 fork->heap = heap;
00193
00194 return(fork);
00195 }
00196
00197
00200 UNIV_INTERN
00201 que_thr_t*
00202 que_thr_create(
00203
00204 que_fork_t* parent,
00205 mem_heap_t* heap)
00206 {
00207 que_thr_t* thr;
00208
00209 ut_ad(parent && heap);
00210
00211 thr = static_cast<que_thr_t *>(mem_heap_alloc(heap, sizeof(que_thr_t)));
00212
00213 thr->common.type = QUE_NODE_THR;
00214 thr->common.parent = parent;
00215
00216 thr->magic_n = QUE_THR_MAGIC_N;
00217
00218 thr->graph = parent->graph;
00219
00220 thr->state = QUE_THR_COMMAND_WAIT;
00221
00222 thr->is_active = FALSE;
00223
00224 thr->run_node = NULL;
00225 thr->resource = 0;
00226 thr->lock_state = QUE_THR_LOCK_NOLOCK;
00227
00228 UT_LIST_ADD_LAST(thrs, parent->thrs, thr);
00229
00230 return(thr);
00231 }
00232
00233
00238 UNIV_INTERN
00239 void
00240 que_thr_end_wait(
00241
00242 que_thr_t* thr,
00246 que_thr_t** next_thr)
00252 {
00253 ibool was_active;
00254
00255 ut_ad(mutex_own(&kernel_mutex));
00256 ut_ad(thr);
00257 ut_ad((thr->state == QUE_THR_LOCK_WAIT)
00258 || (thr->state == QUE_THR_PROCEDURE_WAIT)
00259 || (thr->state == QUE_THR_SIG_REPLY_WAIT));
00260 ut_ad(thr->run_node);
00261
00262 thr->prev_node = thr->run_node;
00263
00264 was_active = thr->is_active;
00265
00266 que_thr_move_to_run_state(thr);
00267
00268 if (was_active) {
00269
00270 return;
00271 }
00272
00273 if (next_thr && *next_thr == NULL) {
00274 *next_thr = thr;
00275 } else {
00276 ut_a(0);
00277 srv_que_task_enqueue_low(thr);
00278 }
00279 }
00280
00281
00283 UNIV_INTERN
00284 void
00285 que_thr_end_wait_no_next_thr(
00286
00287 que_thr_t* thr)
00290 {
00291 ibool was_active;
00292
00293 ut_a(thr->state == QUE_THR_LOCK_WAIT);
00294
00295 ut_ad(mutex_own(&kernel_mutex));
00296 ut_ad(thr);
00297 ut_ad((thr->state == QUE_THR_LOCK_WAIT)
00298 || (thr->state == QUE_THR_PROCEDURE_WAIT)
00299 || (thr->state == QUE_THR_SIG_REPLY_WAIT));
00300
00301 was_active = thr->is_active;
00302
00303 que_thr_move_to_run_state(thr);
00304
00305 if (was_active) {
00306
00307 return;
00308 }
00309
00310
00311
00312
00313 srv_release_mysql_thread_if_suspended(thr);
00314
00315
00316 }
00317
00318
00320 UNIV_INLINE
00321 void
00322 que_thr_init_command(
00323
00324 que_thr_t* thr)
00325 {
00326 thr->run_node = thr;
00327 thr->prev_node = thr->common.parent;
00328
00329 que_thr_move_to_run_state(thr);
00330 }
00331
00332
00340 UNIV_INTERN
00341 que_thr_t*
00342 que_fork_start_command(
00343
00344 que_fork_t* fork)
00345 {
00346 que_thr_t* thr;
00347 que_thr_t* suspended_thr = NULL;
00348 que_thr_t* completed_thr = NULL;
00349
00350 fork->state = QUE_FORK_ACTIVE;
00351
00352 fork->last_sel_node = NULL;
00353
00354 suspended_thr = NULL;
00355 completed_thr = NULL;
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366 thr = UT_LIST_GET_FIRST(fork->thrs);
00367
00368
00369
00370 while (thr) {
00371 switch (thr->state) {
00372 case QUE_THR_COMMAND_WAIT:
00373
00374
00375
00376
00377 que_thr_init_command(thr);
00378
00379 return(thr);
00380
00381 case QUE_THR_SUSPENDED:
00382
00383
00384
00385 if (!suspended_thr) {
00386 suspended_thr = thr;
00387 }
00388
00389 break;
00390
00391 case QUE_THR_COMPLETED:
00392 if (!completed_thr) {
00393 completed_thr = thr;
00394 }
00395
00396 break;
00397
00398 case QUE_THR_LOCK_WAIT:
00399 ut_error;
00400
00401 }
00402
00403 thr = UT_LIST_GET_NEXT(thrs, thr);
00404 }
00405
00406 if (suspended_thr) {
00407
00408 thr = suspended_thr;
00409 que_thr_move_to_run_state(thr);
00410
00411 } else if (completed_thr) {
00412
00413 thr = completed_thr;
00414 que_thr_init_command(thr);
00415 }
00416
00417 return(thr);
00418 }
00419
00420
00424 UNIV_INTERN
00425 void
00426 que_fork_error_handle(
00427
00428 trx_t* ,
00429 que_t* fork)
00431 {
00432 que_thr_t* thr;
00433
00434 ut_ad(mutex_own(&kernel_mutex));
00435 ut_ad(trx->sess->state == SESS_ERROR);
00436 ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
00437 ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
00438
00439 thr = UT_LIST_GET_FIRST(fork->thrs);
00440
00441 while (thr != NULL) {
00442 ut_ad(!thr->is_active);
00443 ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
00444 ut_ad(thr->state != QUE_THR_LOCK_WAIT);
00445
00446 thr->run_node = thr;
00447 thr->prev_node = thr->child;
00448 thr->state = QUE_THR_COMPLETED;
00449
00450 thr = UT_LIST_GET_NEXT(thrs, thr);
00451 }
00452
00453 thr = UT_LIST_GET_FIRST(fork->thrs);
00454
00455 que_thr_move_to_run_state(thr);
00456
00457 ut_a(0);
00458 srv_que_task_enqueue_low(thr);
00459 }
00460
00461
00465 UNIV_INLINE
00466 ibool
00467 que_fork_all_thrs_in_state(
00468
00469 que_fork_t* fork,
00470 ulint state)
00471 {
00472 que_thr_t* thr_node;
00473
00474 thr_node = UT_LIST_GET_FIRST(fork->thrs);
00475
00476 while (thr_node != NULL) {
00477 if (thr_node->state != state) {
00478
00479 return(FALSE);
00480 }
00481
00482 thr_node = UT_LIST_GET_NEXT(thrs, thr_node);
00483 }
00484
00485 return(TRUE);
00486 }
00487
00488
00490 static
00491 void
00492 que_graph_free_stat_list(
00493
00494 que_node_t* node)
00495 {
00496 while (node) {
00497 que_graph_free_recursive(node);
00498
00499 node = que_node_get_next(node);
00500 }
00501 }
00502
00503
00506 UNIV_INTERN
00507 void
00508 que_graph_free_recursive(
00509
00510 que_node_t* node)
00511 {
00512 que_fork_t* fork;
00513 que_thr_t* thr;
00514 undo_node_t* undo;
00515 sel_node_t* sel;
00516 ins_node_t* ins;
00517 upd_node_t* upd;
00518 tab_node_t* cre_tab;
00519 ind_node_t* cre_ind;
00520 purge_node_t* purge;
00521
00522 if (node == NULL) {
00523
00524 return;
00525 }
00526
00527 switch (que_node_get_type(node)) {
00528
00529 case QUE_NODE_FORK:
00530 fork = static_cast<que_fork_t *>(node);
00531
00532 thr = UT_LIST_GET_FIRST(fork->thrs);
00533
00534 while (thr) {
00535 que_graph_free_recursive(thr);
00536
00537 thr = UT_LIST_GET_NEXT(thrs, thr);
00538 }
00539
00540 break;
00541 case QUE_NODE_THR:
00542
00543 thr = static_cast<que_thr_t *>(node);
00544
00545 if (thr->magic_n != QUE_THR_MAGIC_N) {
00546 fprintf(stderr,
00547 "que_thr struct appears corrupt;"
00548 " magic n %lu\n",
00549 (unsigned long) thr->magic_n);
00550 mem_analyze_corruption(thr);
00551 ut_error;
00552 }
00553
00554 thr->magic_n = QUE_THR_MAGIC_FREED;
00555
00556 que_graph_free_recursive(thr->child);
00557
00558 break;
00559 case QUE_NODE_UNDO:
00560
00561 undo = static_cast<undo_node_t *>(node);
00562
00563 mem_heap_free(undo->heap);
00564
00565 break;
00566 case QUE_NODE_SELECT:
00567
00568 sel = static_cast<sel_node_t *>(node);
00569
00570 sel_node_free_private(sel);
00571
00572 break;
00573 case QUE_NODE_INSERT:
00574
00575 ins = static_cast<ins_node_t *>(node);
00576
00577 que_graph_free_recursive(ins->select);
00578
00579 mem_heap_free(ins->entry_sys_heap);
00580
00581 break;
00582 case QUE_NODE_PURGE:
00583 purge = static_cast<purge_node_t *>(node);
00584
00585 mem_heap_free(purge->heap);
00586
00587 break;
00588
00589 case QUE_NODE_UPDATE:
00590
00591 upd = static_cast<upd_node_t *>(node);
00592
00593 if (upd->in_mysql_interface) {
00594
00595 btr_pcur_free_for_mysql(upd->pcur);
00596 }
00597
00598 que_graph_free_recursive(upd->cascade_node);
00599
00600 if (upd->cascade_heap) {
00601 mem_heap_free(upd->cascade_heap);
00602 }
00603
00604 que_graph_free_recursive(upd->select);
00605
00606 mem_heap_free(upd->heap);
00607
00608 break;
00609 case QUE_NODE_CREATE_TABLE:
00610 cre_tab = static_cast<tab_node_t *>(node);
00611
00612 que_graph_free_recursive(cre_tab->tab_def);
00613 que_graph_free_recursive(cre_tab->col_def);
00614 que_graph_free_recursive(cre_tab->commit_node);
00615
00616 mem_heap_free(cre_tab->heap);
00617
00618 break;
00619 case QUE_NODE_CREATE_INDEX:
00620 cre_ind = static_cast<ind_node_t *>(node);
00621
00622 que_graph_free_recursive(cre_ind->ind_def);
00623 que_graph_free_recursive(cre_ind->field_def);
00624 que_graph_free_recursive(cre_ind->commit_node);
00625
00626 mem_heap_free(cre_ind->heap);
00627
00628 break;
00629 case QUE_NODE_PROC:
00630 que_graph_free_stat_list(((proc_node_t*)node)->stat_list);
00631
00632 break;
00633 case QUE_NODE_IF:
00634 que_graph_free_stat_list(((if_node_t*)node)->stat_list);
00635 que_graph_free_stat_list(((if_node_t*)node)->else_part);
00636 que_graph_free_stat_list(((if_node_t*)node)->elsif_list);
00637
00638 break;
00639 case QUE_NODE_ELSIF:
00640 que_graph_free_stat_list(((elsif_node_t*)node)->stat_list);
00641
00642 break;
00643 case QUE_NODE_WHILE:
00644 que_graph_free_stat_list(((while_node_t*)node)->stat_list);
00645
00646 break;
00647 case QUE_NODE_FOR:
00648 que_graph_free_stat_list(((for_node_t*)node)->stat_list);
00649
00650 break;
00651
00652 case QUE_NODE_ASSIGNMENT:
00653 case QUE_NODE_EXIT:
00654 case QUE_NODE_RETURN:
00655 case QUE_NODE_COMMIT:
00656 case QUE_NODE_ROLLBACK:
00657 case QUE_NODE_LOCK:
00658 case QUE_NODE_FUNC:
00659 case QUE_NODE_ORDER:
00660 case QUE_NODE_ROW_PRINTF:
00661 case QUE_NODE_OPEN:
00662 case QUE_NODE_FETCH:
00663
00664
00665 break;
00666 default:
00667 fprintf(stderr,
00668 "que_node struct appears corrupt; type %lu\n",
00669 (unsigned long) que_node_get_type(node));
00670 mem_analyze_corruption(node);
00671 ut_error;
00672 }
00673 }
00674
00675
00677 UNIV_INTERN
00678 void
00679 que_graph_free(
00680
00681 que_t* graph)
00686 {
00687 ut_ad(graph);
00688
00689 if (graph->sym_tab) {
00690
00691
00692
00693
00694 sym_tab_free_private(graph->sym_tab);
00695 }
00696
00697 if (graph->info && graph->info->graph_owns_us) {
00698 pars_info_free(graph->info);
00699 }
00700
00701 que_graph_free_recursive(graph);
00702
00703 mem_heap_free(graph->heap);
00704 }
00705
00706
00709 static
00710 que_thr_t*
00711 que_thr_node_step(
00712
00713 que_thr_t* thr)
00715 {
00716 ut_ad(thr->run_node == thr);
00717
00718 if (thr->prev_node == thr->common.parent) {
00719
00720
00721
00722 thr->run_node = thr->child;
00723
00724 return(thr);
00725 }
00726
00727 mutex_enter(&kernel_mutex);
00728
00729 if (que_thr_peek_stop(thr)) {
00730
00731 mutex_exit(&kernel_mutex);
00732
00733 return(thr);
00734 }
00735
00736
00737
00738 thr->state = QUE_THR_COMPLETED;
00739
00740 mutex_exit(&kernel_mutex);
00741
00742 return(NULL);
00743 }
00744
00745
00751 static
00752 void
00753 que_thr_move_to_run_state(
00754
00755 que_thr_t* thr)
00756 {
00757 trx_t* trx;
00758
00759 ut_ad(thr->state != QUE_THR_RUNNING);
00760
00761 trx = thr_get_trx(thr);
00762
00763 if (!thr->is_active) {
00764
00765 (thr->graph)->n_active_thrs++;
00766
00767 trx->n_active_thrs++;
00768
00769 thr->is_active = TRUE;
00770
00771 ut_ad((thr->graph)->n_active_thrs == 1);
00772 ut_ad(trx->n_active_thrs == 1);
00773 }
00774
00775 thr->state = QUE_THR_RUNNING;
00776 }
00777
00778
00786 static
00787 void
00788 que_thr_dec_refer_count(
00789
00790 que_thr_t* thr,
00791 que_thr_t** next_thr)
00796 {
00797 que_fork_t* fork;
00798 trx_t* trx;
00799 ulint fork_type;
00800 ibool stopped;
00801
00802 fork = static_cast<que_fork_t *>(thr->common.parent);
00803 trx = thr_get_trx(thr);
00804
00805 mutex_enter(&kernel_mutex);
00806
00807 ut_a(thr->is_active);
00808
00809 if (thr->state == QUE_THR_RUNNING) {
00810
00811 stopped = que_thr_stop(thr);
00812
00813 if (!stopped) {
00814
00815
00816
00817
00818
00819
00820
00821 if (next_thr && *next_thr == NULL) {
00822
00823
00824
00825
00826 trx->error_state = DB_SUCCESS;
00827
00828 *next_thr = thr;
00829 } else {
00830 ut_error;
00831 srv_que_task_enqueue_low(thr);
00832 }
00833
00834 mutex_exit(&kernel_mutex);
00835
00836 return;
00837 }
00838 }
00839
00840 ut_ad(fork->n_active_thrs == 1);
00841 ut_ad(trx->n_active_thrs == 1);
00842
00843 fork->n_active_thrs--;
00844 trx->n_active_thrs--;
00845
00846 thr->is_active = FALSE;
00847
00848 if (trx->n_active_thrs > 0) {
00849
00850 mutex_exit(&kernel_mutex);
00851
00852 return;
00853 }
00854
00855 fork_type = fork->fork_type;
00856
00857
00858
00859 if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) {
00860
00861 switch (fork_type) {
00862 case QUE_FORK_ROLLBACK:
00863
00864
00865
00866 ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
00867 ut_ad(trx->handling_signals == TRUE);
00868
00869 trx_finish_rollback_off_kernel(fork, trx, next_thr);
00870 break;
00871
00872 case QUE_FORK_PURGE:
00873 case QUE_FORK_RECOVERY:
00874 case QUE_FORK_MYSQL_INTERFACE:
00875
00876
00877 break;
00878
00879 default:
00880 ut_error;
00881 }
00882 }
00883
00884 if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
00885
00886
00887
00888
00889
00890 trx_sig_start_handle(trx, next_thr);
00891 }
00892
00893 if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
00894
00895 trx_end_signal_handling(trx);
00896 }
00897
00898 mutex_exit(&kernel_mutex);
00899 }
00900
00901
00906 UNIV_INTERN
00907 ibool
00908 que_thr_stop(
00909
00910 que_thr_t* thr)
00911 {
00912 trx_t* trx;
00913 que_t* graph;
00914 ibool ret = TRUE;
00915
00916 ut_ad(mutex_own(&kernel_mutex));
00917
00918 graph = thr->graph;
00919 trx = graph->trx;
00920
00921 if (graph->state == QUE_FORK_COMMAND_WAIT) {
00922 thr->state = QUE_THR_SUSPENDED;
00923
00924 } else if (trx->que_state == TRX_QUE_LOCK_WAIT) {
00925
00926 UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr);
00927 thr->state = QUE_THR_LOCK_WAIT;
00928
00929 } else if (trx->error_state != DB_SUCCESS
00930 && trx->error_state != DB_LOCK_WAIT) {
00931
00932
00933 thr->state = QUE_THR_COMPLETED;
00934
00935 } else if (UT_LIST_GET_LEN(trx->signals) > 0
00936 && graph->fork_type != QUE_FORK_ROLLBACK) {
00937
00938 thr->state = QUE_THR_SUSPENDED;
00939 } else {
00940 ut_ad(graph->state == QUE_FORK_ACTIVE);
00941
00942 ret = FALSE;
00943 }
00944
00945 return(ret);
00946 }
00947
00948
00953 UNIV_INTERN
00954 void
00955 que_thr_stop_for_mysql(
00956
00957 que_thr_t* thr)
00958 {
00959 trx_t* trx;
00960
00961 trx = thr_get_trx(thr);
00962
00963 mutex_enter(&kernel_mutex);
00964
00965 if (thr->state == QUE_THR_RUNNING) {
00966
00967 if (trx->error_state != DB_SUCCESS
00968 && trx->error_state != DB_LOCK_WAIT) {
00969
00970
00971 thr->state = QUE_THR_COMPLETED;
00972 } else {
00973
00974
00975
00976
00977 mutex_exit(&kernel_mutex);
00978
00979 return;
00980 }
00981 }
00982
00983 ut_ad(thr->is_active == TRUE);
00984 ut_ad(trx->n_active_thrs == 1);
00985 ut_ad(thr->graph->n_active_thrs == 1);
00986
00987 thr->is_active = FALSE;
00988 (thr->graph)->n_active_thrs--;
00989
00990 trx->n_active_thrs--;
00991
00992 mutex_exit(&kernel_mutex);
00993 }
00994
00995
00999 UNIV_INTERN
01000 void
01001 que_thr_move_to_run_state_for_mysql(
01002
01003 que_thr_t* thr,
01004 trx_t* trx)
01005 {
01006 if (thr->magic_n != QUE_THR_MAGIC_N) {
01007 fprintf(stderr,
01008 "que_thr struct appears corrupt; magic n %lu\n",
01009 (unsigned long) thr->magic_n);
01010
01011 mem_analyze_corruption(thr);
01012
01013 ut_error;
01014 }
01015
01016 if (!thr->is_active) {
01017
01018 thr->graph->n_active_thrs++;
01019
01020 trx->n_active_thrs++;
01021
01022 thr->is_active = TRUE;
01023 }
01024
01025 thr->state = QUE_THR_RUNNING;
01026 }
01027
01028
01031 UNIV_INTERN
01032 void
01033 que_thr_stop_for_mysql_no_error(
01034
01035 que_thr_t* thr,
01036 trx_t* trx)
01037 {
01038 ut_ad(thr->state == QUE_THR_RUNNING);
01039 ut_ad(thr->is_active == TRUE);
01040 ut_ad(trx->n_active_thrs == 1);
01041 ut_ad(thr->graph->n_active_thrs == 1);
01042
01043 if (thr->magic_n != QUE_THR_MAGIC_N) {
01044 fprintf(stderr,
01045 "que_thr struct appears corrupt; magic n %lu\n",
01046 (unsigned long) thr->magic_n);
01047
01048 mem_analyze_corruption(thr);
01049
01050 ut_error;
01051 }
01052
01053 thr->state = QUE_THR_COMPLETED;
01054
01055 thr->is_active = FALSE;
01056 (thr->graph)->n_active_thrs--;
01057
01058 trx->n_active_thrs--;
01059 }
01060
01061
01065 UNIV_INTERN
01066 que_node_t*
01067 que_node_get_containing_loop_node(
01068
01069 que_node_t* node)
01070 {
01071 ut_ad(node);
01072
01073 for (;;) {
01074 ulint type;
01075
01076 node = que_node_get_parent(node);
01077
01078 if (!node) {
01079 break;
01080 }
01081
01082 type = que_node_get_type(node);
01083
01084 if ((type == QUE_NODE_FOR) || (type == QUE_NODE_WHILE)) {
01085 break;
01086 }
01087 }
01088
01089 return(node);
01090 }
01091
01092
01094 UNIV_INTERN
01095 void
01096 que_node_print_info(
01097
01098 que_node_t* node)
01099 {
01100 ulint type;
01101 const char* str;
01102
01103 type = que_node_get_type(node);
01104
01105 if (type == QUE_NODE_SELECT) {
01106 str = "SELECT";
01107 } else if (type == QUE_NODE_INSERT) {
01108 str = "INSERT";
01109 } else if (type == QUE_NODE_UPDATE) {
01110 str = "UPDATE";
01111 } else if (type == QUE_NODE_WHILE) {
01112 str = "WHILE";
01113 } else if (type == QUE_NODE_ASSIGNMENT) {
01114 str = "ASSIGNMENT";
01115 } else if (type == QUE_NODE_IF) {
01116 str = "IF";
01117 } else if (type == QUE_NODE_FETCH) {
01118 str = "FETCH";
01119 } else if (type == QUE_NODE_OPEN) {
01120 str = "OPEN";
01121 } else if (type == QUE_NODE_PROC) {
01122 str = "STORED PROCEDURE";
01123 } else if (type == QUE_NODE_FUNC) {
01124 str = "FUNCTION";
01125 } else if (type == QUE_NODE_LOCK) {
01126 str = "LOCK";
01127 } else if (type == QUE_NODE_THR) {
01128 str = "QUERY THREAD";
01129 } else if (type == QUE_NODE_COMMIT) {
01130 str = "COMMIT";
01131 } else if (type == QUE_NODE_UNDO) {
01132 str = "UNDO ROW";
01133 } else if (type == QUE_NODE_PURGE) {
01134 str = "PURGE ROW";
01135 } else if (type == QUE_NODE_ROLLBACK) {
01136 str = "ROLLBACK";
01137 } else if (type == QUE_NODE_CREATE_TABLE) {
01138 str = "CREATE TABLE";
01139 } else if (type == QUE_NODE_CREATE_INDEX) {
01140 str = "CREATE INDEX";
01141 } else if (type == QUE_NODE_FOR) {
01142 str = "FOR LOOP";
01143 } else if (type == QUE_NODE_RETURN) {
01144 str = "RETURN";
01145 } else if (type == QUE_NODE_EXIT) {
01146 str = "EXIT";
01147 } else {
01148 str = "UNKNOWN NODE TYPE";
01149 }
01150
01151 fprintf(stderr, "Node type %lu: %s, address %p\n",
01152 (ulong) type, str, (void*) node);
01153 }
01154
01155
01159 UNIV_INLINE
01160 que_thr_t*
01161 que_thr_step(
01162
01163 que_thr_t* thr)
01164 {
01165 que_node_t* node;
01166 que_thr_t* old_thr;
01167 trx_t* trx;
01168 ulint type;
01169
01170 trx = thr_get_trx(thr);
01171
01172 ut_ad(thr->state == QUE_THR_RUNNING);
01173 ut_a(trx->error_state == DB_SUCCESS);
01174
01175 thr->resource++;
01176
01177 node = thr->run_node;
01178 type = que_node_get_type(node);
01179
01180 old_thr = thr;
01181
01182 #ifdef UNIV_DEBUG
01183 if (que_trace_on) {
01184 fputs("To execute: ", stderr);
01185 que_node_print_info(node);
01186 }
01187 #endif
01188 if (type & QUE_NODE_CONTROL_STAT) {
01189 if ((thr->prev_node != que_node_get_parent(node))
01190 && que_node_get_next(thr->prev_node)) {
01191
01192
01193
01194
01195
01196 thr->run_node = que_node_get_next(thr->prev_node);
01197
01198 } else if (type == QUE_NODE_IF) {
01199 if_step(thr);
01200 } else if (type == QUE_NODE_FOR) {
01201 for_step(thr);
01202 } else if (type == QUE_NODE_PROC) {
01203
01204
01205
01206
01207
01208 if (thr->prev_node == que_node_get_parent(node)) {
01209 trx->last_sql_stat_start.least_undo_no
01210 = trx->undo_no;
01211 }
01212
01213 proc_step(thr);
01214 } else if (type == QUE_NODE_WHILE) {
01215 while_step(thr);
01216 } else {
01217 ut_error;
01218 }
01219 } else if (type == QUE_NODE_ASSIGNMENT) {
01220 assign_step(thr);
01221 } else if (type == QUE_NODE_SELECT) {
01222 thr = row_sel_step(thr);
01223 } else if (type == QUE_NODE_INSERT) {
01224 thr = row_ins_step(thr);
01225 } else if (type == QUE_NODE_UPDATE) {
01226 thr = row_upd_step(thr);
01227 } else if (type == QUE_NODE_FETCH) {
01228 thr = fetch_step(thr);
01229 } else if (type == QUE_NODE_OPEN) {
01230 thr = open_step(thr);
01231 } else if (type == QUE_NODE_FUNC) {
01232 proc_eval_step(thr);
01233
01234 } else if (type == QUE_NODE_LOCK) {
01235
01236 ut_error;
01237
01238
01239
01240 } else if (type == QUE_NODE_THR) {
01241 thr = que_thr_node_step(thr);
01242 } else if (type == QUE_NODE_COMMIT) {
01243 thr = trx_commit_step(thr);
01244 } else if (type == QUE_NODE_UNDO) {
01245 thr = row_undo_step(thr);
01246 } else if (type == QUE_NODE_PURGE) {
01247 thr = row_purge_step(thr);
01248 } else if (type == QUE_NODE_RETURN) {
01249 thr = return_step(thr);
01250 } else if (type == QUE_NODE_EXIT) {
01251 thr = exit_step(thr);
01252 } else if (type == QUE_NODE_ROLLBACK) {
01253 thr = trx_rollback_step(thr);
01254 } else if (type == QUE_NODE_CREATE_TABLE) {
01255 thr = dict_create_table_step(thr);
01256 } else if (type == QUE_NODE_CREATE_INDEX) {
01257 thr = dict_create_index_step(thr);
01258 } else if (type == QUE_NODE_ROW_PRINTF) {
01259 thr = row_printf_step(thr);
01260 } else {
01261 ut_error;
01262 }
01263
01264 if (type == QUE_NODE_EXIT) {
01265 old_thr->prev_node = que_node_get_containing_loop_node(node);
01266 } else {
01267 old_thr->prev_node = node;
01268 }
01269
01270 if (thr) {
01271 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01272 }
01273
01274 return(thr);
01275 }
01276
01277
01279 static
01280 void
01281 que_run_threads_low(
01282
01283 que_thr_t* thr)
01284 {
01285 que_thr_t* next_thr;
01286 ulint loop_count;
01287
01288 ut_ad(thr->state == QUE_THR_RUNNING);
01289 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01290 ut_ad(!mutex_own(&kernel_mutex));
01291
01292 loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
01293 loop:
01294
01295
01296
01297
01298
01299 log_free_check();
01300
01301
01302
01303
01304
01305 next_thr = que_thr_step(thr);
01306
01307
01308 ut_a(!next_thr || (thr_get_trx(next_thr)->error_state == DB_SUCCESS));
01309
01310 loop_count++;
01311
01312 if (next_thr != thr) {
01313 ut_a(next_thr == NULL);
01314
01315
01316
01317 que_thr_dec_refer_count(thr, &next_thr);
01318
01319 if (next_thr == NULL) {
01320
01321 return;
01322 }
01323
01324 loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
01325
01326 thr = next_thr;
01327 }
01328
01329 goto loop;
01330 }
01331
01332
01334 UNIV_INTERN
01335 void
01336 que_run_threads(
01337
01338 que_thr_t* thr)
01339 {
01340 loop:
01341 ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01342 que_run_threads_low(thr);
01343
01344 mutex_enter(&kernel_mutex);
01345
01346 switch (thr->state) {
01347
01348 case QUE_THR_RUNNING:
01349
01350
01351
01352 mutex_exit(&kernel_mutex);
01353
01354 goto loop;
01355
01356 case QUE_THR_LOCK_WAIT:
01357 mutex_exit(&kernel_mutex);
01358
01359
01360
01361
01362 srv_suspend_mysql_thread(thr);
01363
01364 if (thr_get_trx(thr)->error_state != DB_SUCCESS) {
01365
01366
01367
01368 que_thr_dec_refer_count(thr, NULL);
01369
01370 return;
01371 }
01372
01373 goto loop;
01374
01375 case QUE_THR_COMPLETED:
01376 case QUE_THR_COMMAND_WAIT:
01377
01378 break;
01379
01380 default:
01381 ut_error;
01382 }
01383
01384 mutex_exit(&kernel_mutex);
01385 }
01386
01387
01390 UNIV_INTERN
01391 ulint
01392 que_eval_sql(
01393
01394 pars_info_t* info,
01395 const char* sql,
01396 ibool reserve_dict_mutex,
01399 trx_t* trx)
01400 {
01401 que_thr_t* thr;
01402 que_t* graph;
01403
01404 ut_a(trx->error_state == DB_SUCCESS);
01405
01406 if (reserve_dict_mutex) {
01407 mutex_enter(&dict_sys->mutex);
01408 }
01409
01410 graph = pars_sql(info, sql);
01411
01412 if (reserve_dict_mutex) {
01413 mutex_exit(&dict_sys->mutex);
01414 }
01415
01416 ut_a(graph);
01417
01418 graph->trx = trx;
01419 trx->graph = NULL;
01420
01421 graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
01422
01423 ut_a(thr = que_fork_start_command(graph));
01424
01425 que_run_threads(thr);
01426
01427 que_graph_free(graph);
01428
01429 return(trx->error_state);
01430 }