Drizzled Public API Documentation

que0que.cc

00001 /*****************************************************************************
00002 
00003 Copyright (C) 1996, 2009, Innobase Oy. All Rights Reserved.
00004 
00005 This program is free software; you can redistribute it and/or modify it under
00006 the terms of the GNU General Public License as published by the Free Software
00007 Foundation; version 2 of the License.
00008 
00009 This program is distributed in the hope that it will be useful, but WITHOUT
00010 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
00011 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
00012 
00013 You should have received a copy of the GNU General Public License along with
00014 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
00015 St, Fifth Floor, Boston, MA 02110-1301 USA
00016 
00017 *****************************************************************************/
00018 
00019 /**************************************************/
00026 #include "que0que.h"
00027 
00028 #ifdef UNIV_NONINL
00029 #include "que0que.ic"
00030 #endif
00031 
00032 #include "usr0sess.h"
00033 #include "trx0trx.h"
00034 #include "trx0roll.h"
00035 #include "row0undo.h"
00036 #include "row0ins.h"
00037 #include "row0upd.h"
00038 #include "row0sel.h"
00039 #include "row0purge.h"
00040 #include "dict0crea.h"
00041 #include "log0log.h"
00042 #include "eval0proc.h"
00043 #include "eval0eval.h"
00044 #include "pars0types.h"
00045 
00046 #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
00047 #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
00048 #define QUE_MAX_LOOPS_WITHOUT_CHECK 16
00049 
00050 #ifdef UNIV_DEBUG
00051 /* If the following flag is set TRUE, the module will print trace info
00052 of SQL execution in UNIV_DEBUG builds */
00053 UNIV_INTERN ibool que_trace_on    = FALSE;
00054 #endif /* UNIV_DEBUG */
00055 
00056 /* Short introduction to query graphs
00057    ==================================
00058 
00059 A query graph consists of nodes linked to each other in various ways. The
00060 execution starts at que_run_threads() which takes a que_thr_t parameter.
00061 que_thr_t contains two fields that control query graph execution: run_node
00062 and prev_node. run_node is the next node to execute and prev_node is the
00063 last node executed.
00064 
00065 Each node has a pointer to a 'next' statement, i.e., its brother, and a
00066 pointer to its parent node. The next pointer is NULL in the last statement
00067 of a block.
00068 
00069 Loop nodes contain a link to the first statement of the enclosed statement
00070 list. While the loop runs, que_thr_step() checks if execution to the loop
00071 node came from its parent or from one of the statement nodes in the loop. If
00072 it came from the parent of the loop node it starts executing the first
00073 statement node in the loop. If it came from one of the statement nodes in
00074 the loop, then it checks if the statement node has another statement node
00075 following it, and runs it if so.
00076 
00077 To signify loop ending, the loop statements (see e.g. while_step()) set
00078 que_thr_t->run_node to the loop node's parent node. This is noticed on the
00079 next call of que_thr_step() and execution proceeds to the node pointed to by
00080 the loop node's 'next' pointer.
00081 
00082 For example, the code:
00083 
00084 X := 1;
00085 WHILE X < 5 LOOP
00086  X := X + 1;
00087  X := X + 1;
00088 X := 5
00089 
00090 will result in the following node hierarchy, with the X-axis indicating
00091 'next' links and the Y-axis indicating parent/child links:
00092 
00093 A - W - A
00094     |
00095     |
00096     A - A
00097 
00098 A = assign_node_t, W = while_node_t. */
00099 
00100 /* How is a stored procedure containing COMMIT or ROLLBACK commands
00101 executed?
00102 
00103 The commit or rollback can be seen as a subprocedure call.
00104 The problem is that if there are several query threads
00105 currently running within the transaction, their action could
00106 mess the commit or rollback operation. Or, at the least, the
00107 operation would be difficult to visualize and keep in control.
00108 
00109 Therefore the query thread requesting a commit or a rollback
00110 sends to the transaction a signal, which moves the transaction
00111 to TRX_QUE_SIGNALED state. All running query threads of the
00112 transaction will eventually notice that the transaction is now in
00113 this state and voluntarily suspend themselves. Only the last
00114 query thread which suspends itself will trigger handling of
00115 the signal.
00116 
00117 When the transaction starts to handle a rollback or commit
00118 signal, it builds a query graph which, when executed, will
00119 roll back or commit the incomplete transaction. The transaction
00120 is moved to the TRX_QUE_ROLLING_BACK or TRX_QUE_COMMITTING state.
00121 If specified, the SQL cursors opened by the transaction are closed.
00122 When the execution of the graph completes, it is like returning
00123 from a subprocedure: the query thread which requested the operation
00124 starts running again. */
00125 
00126 /**********************************************************************/
00131 static
00132 void
00133 que_thr_move_to_run_state(
00134 /*======================*/
00135   que_thr_t*  thr); 
00137 /***********************************************************************/
00139 UNIV_INTERN
00140 void
00141 que_graph_publish(
00142 /*==============*/
00143   que_t*  graph,  
00144   sess_t* sess) 
00145 {
00146   ut_ad(mutex_own(&kernel_mutex));
00147 
00148   UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
00149 }
00150 
00151 /***********************************************************************/
00154 UNIV_INTERN
00155 que_fork_t*
00156 que_fork_create(
00157 /*============*/
00158   que_t*    graph,    
00161   que_node_t* parent,   
00162   ulint   fork_type,  
00163   mem_heap_t* heap)   
00164 {
00165   que_fork_t* fork;
00166 
00167   ut_ad(heap);
00168 
00169   fork = static_cast<que_fork_t *>(mem_heap_alloc(heap, sizeof(que_fork_t)));
00170 
00171   fork->common.type = QUE_NODE_FORK;
00172   fork->n_active_thrs = 0;
00173 
00174   fork->state = QUE_FORK_COMMAND_WAIT;
00175 
00176   if (graph != NULL) {
00177     fork->graph = graph;
00178   } else {
00179     fork->graph = fork;
00180   }
00181 
00182   fork->common.parent = parent;
00183   fork->fork_type = fork_type;
00184 
00185   fork->caller = NULL;
00186 
00187   UT_LIST_INIT(fork->thrs);
00188 
00189   fork->sym_tab = NULL;
00190   fork->info = NULL;
00191 
00192   fork->heap = heap;
00193 
00194   return(fork);
00195 }
00196 
00197 /***********************************************************************/
00200 UNIV_INTERN
00201 que_thr_t*
00202 que_thr_create(
00203 /*===========*/
00204   que_fork_t* parent, 
00205   mem_heap_t* heap) 
00206 {
00207   que_thr_t*  thr;
00208 
00209   ut_ad(parent && heap);
00210 
00211   thr = static_cast<que_thr_t *>(mem_heap_alloc(heap, sizeof(que_thr_t)));
00212 
00213   thr->common.type = QUE_NODE_THR;
00214   thr->common.parent = parent;
00215 
00216   thr->magic_n = QUE_THR_MAGIC_N;
00217 
00218   thr->graph = parent->graph;
00219 
00220   thr->state = QUE_THR_COMMAND_WAIT;
00221 
00222   thr->is_active = FALSE;
00223 
00224   thr->run_node = NULL;
00225   thr->resource = 0;
00226   thr->lock_state = QUE_THR_LOCK_NOLOCK;
00227 
00228   UT_LIST_ADD_LAST(thrs, parent->thrs, thr);
00229 
00230   return(thr);
00231 }
00232 
00233 /**********************************************************************/
00238 UNIV_INTERN
00239 void
00240 que_thr_end_wait(
00241 /*=============*/
00242   que_thr_t*  thr,    
00246   que_thr_t** next_thr) 
00252 {
00253   ibool was_active;
00254 
00255   ut_ad(mutex_own(&kernel_mutex));
00256   ut_ad(thr);
00257   ut_ad((thr->state == QUE_THR_LOCK_WAIT)
00258         || (thr->state == QUE_THR_PROCEDURE_WAIT)
00259         || (thr->state == QUE_THR_SIG_REPLY_WAIT));
00260   ut_ad(thr->run_node);
00261 
00262   thr->prev_node = thr->run_node;
00263 
00264   was_active = thr->is_active;
00265 
00266   que_thr_move_to_run_state(thr);
00267 
00268   if (was_active) {
00269 
00270     return;
00271   }
00272 
00273   if (next_thr && *next_thr == NULL) {
00274     *next_thr = thr;
00275   } else {
00276     ut_a(0);
00277     srv_que_task_enqueue_low(thr);
00278   }
00279 }
00280 
00281 /**********************************************************************/
00283 UNIV_INTERN
00284 void
00285 que_thr_end_wait_no_next_thr(
00286 /*=========================*/
00287   que_thr_t*  thr)  
00290 {
00291   ibool was_active;
00292 
00293   ut_a(thr->state == QUE_THR_LOCK_WAIT);  /* In MySQL this is the
00294             only possible state here */
00295   ut_ad(mutex_own(&kernel_mutex));
00296   ut_ad(thr);
00297   ut_ad((thr->state == QUE_THR_LOCK_WAIT)
00298         || (thr->state == QUE_THR_PROCEDURE_WAIT)
00299         || (thr->state == QUE_THR_SIG_REPLY_WAIT));
00300 
00301   was_active = thr->is_active;
00302 
00303   que_thr_move_to_run_state(thr);
00304 
00305   if (was_active) {
00306 
00307     return;
00308   }
00309 
00310   /* In MySQL we let the OS thread (not just the query thread) to wait
00311   for the lock to be released: */
00312 
00313   srv_release_mysql_thread_if_suspended(thr);
00314 
00315   /* srv_que_task_enqueue_low(thr); */
00316 }
00317 
00318 /**********************************************************************/
00320 UNIV_INLINE
00321 void
00322 que_thr_init_command(
00323 /*=================*/
00324   que_thr_t*  thr)  
00325 {
00326   thr->run_node = thr;
00327   thr->prev_node = thr->common.parent;
00328 
00329   que_thr_move_to_run_state(thr);
00330 }
00331 
00332 /**********************************************************************/
00340 UNIV_INTERN
00341 que_thr_t*
00342 que_fork_start_command(
00343 /*===================*/
00344   que_fork_t* fork) 
00345 {
00346   que_thr_t*  thr;
00347   que_thr_t*  suspended_thr = NULL;
00348   que_thr_t*  completed_thr = NULL;
00349 
00350   fork->state = QUE_FORK_ACTIVE;
00351 
00352   fork->last_sel_node = NULL;
00353 
00354   suspended_thr = NULL;
00355   completed_thr = NULL;
00356 
00357   /* Choose the query thread to run: usually there is just one thread,
00358   but in a parallelized select, which necessarily is non-scrollable,
00359   there may be several to choose from */
00360 
00361   /* First we try to find a query thread in the QUE_THR_COMMAND_WAIT
00362   state. Then we try to find a query thread in the QUE_THR_SUSPENDED
00363   state, finally we try to find a query thread in the QUE_THR_COMPLETED
00364   state */
00365 
00366   thr = UT_LIST_GET_FIRST(fork->thrs);
00367 
00368   /* We make a single pass over the thr list within which we note which
00369   threads are ready to run. */
00370   while (thr) {
00371     switch (thr->state) {
00372     case QUE_THR_COMMAND_WAIT:
00373 
00374       /* We have to send the initial message to query thread
00375       to start it */
00376 
00377       que_thr_init_command(thr);
00378 
00379       return(thr);
00380 
00381     case QUE_THR_SUSPENDED:
00382       /* In this case the execution of the thread was
00383       suspended: no initial message is needed because
00384       execution can continue from where it was left */
00385       if (!suspended_thr) {
00386         suspended_thr = thr;
00387       }
00388 
00389       break;
00390 
00391     case QUE_THR_COMPLETED:
00392       if (!completed_thr) {
00393         completed_thr = thr;
00394       }
00395 
00396       break;
00397 
00398     case QUE_THR_LOCK_WAIT:
00399       ut_error;
00400 
00401     }
00402 
00403     thr = UT_LIST_GET_NEXT(thrs, thr);
00404   }
00405 
00406   if (suspended_thr) {
00407 
00408     thr = suspended_thr;
00409     que_thr_move_to_run_state(thr);
00410 
00411   } else if (completed_thr) {
00412 
00413     thr = completed_thr;
00414     que_thr_init_command(thr);
00415   }
00416 
00417   return(thr);
00418 }
00419 
00420 /**********************************************************************/
00424 UNIV_INTERN
00425 void
00426 que_fork_error_handle(
00427 /*==================*/
00428   trx_t*  /*trx __attribute__((unused))*/,  
00429   que_t*  fork) 
00431 {
00432   que_thr_t*  thr;
00433 
00434   ut_ad(mutex_own(&kernel_mutex));
00435   ut_ad(trx->sess->state == SESS_ERROR);
00436   ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
00437   ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
00438 
00439   thr = UT_LIST_GET_FIRST(fork->thrs);
00440 
00441   while (thr != NULL) {
00442     ut_ad(!thr->is_active);
00443     ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
00444     ut_ad(thr->state != QUE_THR_LOCK_WAIT);
00445 
00446     thr->run_node = thr;
00447     thr->prev_node = thr->child;
00448     thr->state = QUE_THR_COMPLETED;
00449 
00450     thr = UT_LIST_GET_NEXT(thrs, thr);
00451   }
00452 
00453   thr = UT_LIST_GET_FIRST(fork->thrs);
00454 
00455   que_thr_move_to_run_state(thr);
00456 
00457   ut_a(0);
00458   srv_que_task_enqueue_low(thr);
00459 }
00460 
00461 /****************************************************************/
00465 UNIV_INLINE
00466 ibool
00467 que_fork_all_thrs_in_state(
00468 /*=======================*/
00469   que_fork_t* fork, 
00470   ulint   state)  
00471 {
00472   que_thr_t*  thr_node;
00473 
00474   thr_node = UT_LIST_GET_FIRST(fork->thrs);
00475 
00476   while (thr_node != NULL) {
00477     if (thr_node->state != state) {
00478 
00479       return(FALSE);
00480     }
00481 
00482     thr_node = UT_LIST_GET_NEXT(thrs, thr_node);
00483   }
00484 
00485   return(TRUE);
00486 }
00487 
00488 /**********************************************************************/
00490 static
00491 void
00492 que_graph_free_stat_list(
00493 /*=====================*/
00494   que_node_t* node) 
00495 {
00496   while (node) {
00497     que_graph_free_recursive(node);
00498 
00499     node = que_node_get_next(node);
00500   }
00501 }
00502 
00503 /**********************************************************************/
00506 UNIV_INTERN
00507 void
00508 que_graph_free_recursive(
00509 /*=====================*/
00510   que_node_t* node) 
00511 {
00512   que_fork_t* fork;
00513   que_thr_t*  thr;
00514   undo_node_t*  undo;
00515   sel_node_t* sel;
00516   ins_node_t* ins;
00517   upd_node_t* upd;
00518   tab_node_t* cre_tab;
00519   ind_node_t* cre_ind;
00520   purge_node_t* purge;
00521 
00522   if (node == NULL) {
00523 
00524     return;
00525   }
00526 
00527   switch (que_node_get_type(node)) {
00528 
00529   case QUE_NODE_FORK:
00530     fork = static_cast<que_fork_t *>(node);
00531 
00532     thr = UT_LIST_GET_FIRST(fork->thrs);
00533 
00534     while (thr) {
00535       que_graph_free_recursive(thr);
00536 
00537       thr = UT_LIST_GET_NEXT(thrs, thr);
00538     }
00539 
00540     break;
00541   case QUE_NODE_THR:
00542 
00543     thr = static_cast<que_thr_t *>(node);
00544 
00545     if (thr->magic_n != QUE_THR_MAGIC_N) {
00546       fprintf(stderr,
00547         "que_thr struct appears corrupt;"
00548         " magic n %lu\n",
00549         (unsigned long) thr->magic_n);
00550       mem_analyze_corruption(thr);
00551       ut_error;
00552     }
00553 
00554     thr->magic_n = QUE_THR_MAGIC_FREED;
00555 
00556     que_graph_free_recursive(thr->child);
00557 
00558     break;
00559   case QUE_NODE_UNDO:
00560 
00561     undo = static_cast<undo_node_t *>(node);
00562 
00563     mem_heap_free(undo->heap);
00564 
00565     break;
00566   case QUE_NODE_SELECT:
00567 
00568     sel = static_cast<sel_node_t *>(node);
00569 
00570     sel_node_free_private(sel);
00571 
00572     break;
00573   case QUE_NODE_INSERT:
00574 
00575     ins = static_cast<ins_node_t *>(node);
00576 
00577     que_graph_free_recursive(ins->select);
00578 
00579     mem_heap_free(ins->entry_sys_heap);
00580 
00581     break;
00582   case QUE_NODE_PURGE:
00583     purge = static_cast<purge_node_t *>(node);
00584 
00585     mem_heap_free(purge->heap);
00586 
00587     break;
00588 
00589   case QUE_NODE_UPDATE:
00590 
00591     upd = static_cast<upd_node_t *>(node);
00592 
00593     if (upd->in_mysql_interface) {
00594 
00595       btr_pcur_free_for_mysql(upd->pcur);
00596     }
00597 
00598     que_graph_free_recursive(upd->cascade_node);
00599 
00600     if (upd->cascade_heap) {
00601       mem_heap_free(upd->cascade_heap);
00602     }
00603 
00604     que_graph_free_recursive(upd->select);
00605 
00606     mem_heap_free(upd->heap);
00607 
00608     break;
00609   case QUE_NODE_CREATE_TABLE:
00610     cre_tab = static_cast<tab_node_t *>(node);
00611 
00612     que_graph_free_recursive(cre_tab->tab_def);
00613     que_graph_free_recursive(cre_tab->col_def);
00614     que_graph_free_recursive(cre_tab->commit_node);
00615 
00616     mem_heap_free(cre_tab->heap);
00617 
00618     break;
00619   case QUE_NODE_CREATE_INDEX:
00620     cre_ind = static_cast<ind_node_t *>(node);
00621 
00622     que_graph_free_recursive(cre_ind->ind_def);
00623     que_graph_free_recursive(cre_ind->field_def);
00624     que_graph_free_recursive(cre_ind->commit_node);
00625 
00626     mem_heap_free(cre_ind->heap);
00627 
00628     break;
00629   case QUE_NODE_PROC:
00630     que_graph_free_stat_list(((proc_node_t*)node)->stat_list);
00631 
00632     break;
00633   case QUE_NODE_IF:
00634     que_graph_free_stat_list(((if_node_t*)node)->stat_list);
00635     que_graph_free_stat_list(((if_node_t*)node)->else_part);
00636     que_graph_free_stat_list(((if_node_t*)node)->elsif_list);
00637 
00638     break;
00639   case QUE_NODE_ELSIF:
00640     que_graph_free_stat_list(((elsif_node_t*)node)->stat_list);
00641 
00642     break;
00643   case QUE_NODE_WHILE:
00644     que_graph_free_stat_list(((while_node_t*)node)->stat_list);
00645 
00646     break;
00647   case QUE_NODE_FOR:
00648     que_graph_free_stat_list(((for_node_t*)node)->stat_list);
00649 
00650     break;
00651 
00652   case QUE_NODE_ASSIGNMENT:
00653   case QUE_NODE_EXIT:
00654   case QUE_NODE_RETURN:
00655   case QUE_NODE_COMMIT:
00656   case QUE_NODE_ROLLBACK:
00657   case QUE_NODE_LOCK:
00658   case QUE_NODE_FUNC:
00659   case QUE_NODE_ORDER:
00660   case QUE_NODE_ROW_PRINTF:
00661   case QUE_NODE_OPEN:
00662   case QUE_NODE_FETCH:
00663     /* No need to do anything */
00664 
00665     break;
00666   default:
00667     fprintf(stderr,
00668       "que_node struct appears corrupt; type %lu\n",
00669       (unsigned long) que_node_get_type(node));
00670     mem_analyze_corruption(node);
00671     ut_error;
00672   }
00673 }
00674 
00675 /**********************************************************************/
00677 UNIV_INTERN
00678 void
00679 que_graph_free(
00680 /*===========*/
00681   que_t*  graph)  
00686 {
00687   ut_ad(graph);
00688 
00689   if (graph->sym_tab) {
00690     /* The following call frees dynamic memory allocated
00691     for variables etc. during execution. Frees also explicit
00692     cursor definitions. */
00693 
00694     sym_tab_free_private(graph->sym_tab);
00695   }
00696 
00697   if (graph->info && graph->info->graph_owns_us) {
00698     pars_info_free(graph->info);
00699   }
00700 
00701   que_graph_free_recursive(graph);
00702 
00703   mem_heap_free(graph->heap);
00704 }
00705 
00706 /****************************************************************/
00709 static
00710 que_thr_t*
00711 que_thr_node_step(
00712 /*==============*/
00713   que_thr_t*  thr)  
00715 {
00716   ut_ad(thr->run_node == thr);
00717 
00718   if (thr->prev_node == thr->common.parent) {
00719     /* If control to the node came from above, it is just passed
00720     on */
00721 
00722     thr->run_node = thr->child;
00723 
00724     return(thr);
00725   }
00726 
00727   mutex_enter(&kernel_mutex);
00728 
00729   if (que_thr_peek_stop(thr)) {
00730 
00731     mutex_exit(&kernel_mutex);
00732 
00733     return(thr);
00734   }
00735 
00736   /* Thread execution completed */
00737 
00738   thr->state = QUE_THR_COMPLETED;
00739 
00740   mutex_exit(&kernel_mutex);
00741 
00742   return(NULL);
00743 }
00744 
00745 /**********************************************************************/
00751 static
00752 void
00753 que_thr_move_to_run_state(
00754 /*======================*/
00755   que_thr_t*  thr)  
00756 {
00757   trx_t*  trx;
00758 
00759   ut_ad(thr->state != QUE_THR_RUNNING);
00760 
00761   trx = thr_get_trx(thr);
00762 
00763   if (!thr->is_active) {
00764 
00765     (thr->graph)->n_active_thrs++;
00766 
00767     trx->n_active_thrs++;
00768 
00769     thr->is_active = TRUE;
00770 
00771     ut_ad((thr->graph)->n_active_thrs == 1);
00772     ut_ad(trx->n_active_thrs == 1);
00773   }
00774 
00775   thr->state = QUE_THR_RUNNING;
00776 }
00777 
00778 /**********************************************************************/
00786 static
00787 void
00788 que_thr_dec_refer_count(
00789 /*====================*/
00790   que_thr_t*  thr,    
00791   que_thr_t** next_thr) 
00796 {
00797   que_fork_t* fork;
00798   trx_t*    trx;
00799   ulint   fork_type;
00800   ibool   stopped;
00801 
00802   fork = static_cast<que_fork_t *>(thr->common.parent);
00803   trx = thr_get_trx(thr);
00804 
00805   mutex_enter(&kernel_mutex);
00806 
00807   ut_a(thr->is_active);
00808 
00809   if (thr->state == QUE_THR_RUNNING) {
00810 
00811     stopped = que_thr_stop(thr);
00812 
00813     if (!stopped) {
00814       /* The reason for the thr suspension or wait was
00815       already canceled before we came here: continue
00816       running the thread */
00817 
00818       /* fputs("!!!!!!!! Wait already ended: continue thr\n",
00819       stderr); */
00820 
00821       if (next_thr && *next_thr == NULL) {
00822         /* Normally srv_suspend_mysql_thread resets
00823         the state to DB_SUCCESS before waiting, but
00824         in this case we have to do it here,
00825         otherwise nobody does it. */
00826         trx->error_state = DB_SUCCESS;
00827 
00828         *next_thr = thr;
00829       } else {
00830         ut_error;
00831         srv_que_task_enqueue_low(thr);
00832       }
00833 
00834       mutex_exit(&kernel_mutex);
00835 
00836       return;
00837     }
00838   }
00839 
00840   ut_ad(fork->n_active_thrs == 1);
00841   ut_ad(trx->n_active_thrs == 1);
00842 
00843   fork->n_active_thrs--;
00844   trx->n_active_thrs--;
00845 
00846   thr->is_active = FALSE;
00847 
00848   if (trx->n_active_thrs > 0) {
00849 
00850     mutex_exit(&kernel_mutex);
00851 
00852     return;
00853   }
00854 
00855   fork_type = fork->fork_type;
00856 
00857   /* Check if all query threads in the same fork are completed */
00858 
00859   if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) {
00860 
00861     switch (fork_type) {
00862     case QUE_FORK_ROLLBACK:
00863       /* This is really the undo graph used in rollback,
00864       no roll_node in this graph */
00865 
00866       ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
00867       ut_ad(trx->handling_signals == TRUE);
00868 
00869       trx_finish_rollback_off_kernel(fork, trx, next_thr);
00870       break;
00871 
00872     case QUE_FORK_PURGE:
00873     case QUE_FORK_RECOVERY:
00874     case QUE_FORK_MYSQL_INTERFACE:
00875 
00876       /* Do nothing */
00877       break;
00878 
00879     default:
00880       ut_error; 
00881     }
00882   }
00883 
00884   if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
00885 
00886     /* If the trx is signaled and its query thread count drops to
00887     zero, then we start processing a signal; from it we may get
00888     a new query thread to run */
00889 
00890     trx_sig_start_handle(trx, next_thr);
00891   }
00892 
00893   if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
00894 
00895     trx_end_signal_handling(trx);
00896   }
00897 
00898   mutex_exit(&kernel_mutex);
00899 }
00900 
00901 /**********************************************************************/
00906 UNIV_INTERN
00907 ibool
00908 que_thr_stop(
00909 /*=========*/
00910   que_thr_t*  thr)  
00911 {
00912   trx_t*  trx;
00913   que_t*  graph;
00914   ibool ret = TRUE;
00915 
00916   ut_ad(mutex_own(&kernel_mutex));
00917 
00918   graph = thr->graph;
00919   trx = graph->trx;
00920 
00921   if (graph->state == QUE_FORK_COMMAND_WAIT) {
00922     thr->state = QUE_THR_SUSPENDED;
00923 
00924   } else if (trx->que_state == TRX_QUE_LOCK_WAIT) {
00925 
00926     UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr);
00927     thr->state = QUE_THR_LOCK_WAIT;
00928 
00929   } else if (trx->error_state != DB_SUCCESS
00930        && trx->error_state != DB_LOCK_WAIT) {
00931 
00932     /* Error handling built for the MySQL interface */
00933     thr->state = QUE_THR_COMPLETED;
00934 
00935   } else if (UT_LIST_GET_LEN(trx->signals) > 0
00936        && graph->fork_type != QUE_FORK_ROLLBACK) {
00937 
00938     thr->state = QUE_THR_SUSPENDED;
00939   } else {
00940     ut_ad(graph->state == QUE_FORK_ACTIVE);
00941 
00942     ret = FALSE;
00943   }
00944 
00945   return(ret);
00946 }
00947 
00948 /**********************************************************************/
00953 UNIV_INTERN
00954 void
00955 que_thr_stop_for_mysql(
00956 /*===================*/
00957   que_thr_t*  thr)  
00958 {
00959   trx_t*  trx;
00960 
00961   trx = thr_get_trx(thr);
00962 
00963   mutex_enter(&kernel_mutex);
00964 
00965   if (thr->state == QUE_THR_RUNNING) {
00966 
00967     if (trx->error_state != DB_SUCCESS
00968         && trx->error_state != DB_LOCK_WAIT) {
00969 
00970       /* Error handling built for the MySQL interface */
00971       thr->state = QUE_THR_COMPLETED;
00972     } else {
00973       /* It must have been a lock wait but the lock was
00974       already released, or this transaction was chosen
00975       as a victim in selective deadlock resolution */
00976 
00977       mutex_exit(&kernel_mutex);
00978 
00979       return;
00980     }
00981   }
00982 
00983   ut_ad(thr->is_active == TRUE);
00984   ut_ad(trx->n_active_thrs == 1);
00985   ut_ad(thr->graph->n_active_thrs == 1);
00986 
00987   thr->is_active = FALSE;
00988   (thr->graph)->n_active_thrs--;
00989 
00990   trx->n_active_thrs--;
00991 
00992   mutex_exit(&kernel_mutex);
00993 }
00994 
00995 /**********************************************************************/
00999 UNIV_INTERN
01000 void
01001 que_thr_move_to_run_state_for_mysql(
01002 /*================================*/
01003   que_thr_t*  thr,  
01004   trx_t*    trx)  
01005 {
01006   if (thr->magic_n != QUE_THR_MAGIC_N) {
01007     fprintf(stderr,
01008       "que_thr struct appears corrupt; magic n %lu\n",
01009       (unsigned long) thr->magic_n);
01010 
01011     mem_analyze_corruption(thr);
01012 
01013     ut_error;
01014   }
01015 
01016   if (!thr->is_active) {
01017 
01018     thr->graph->n_active_thrs++;
01019 
01020     trx->n_active_thrs++;
01021 
01022     thr->is_active = TRUE;
01023   }
01024 
01025   thr->state = QUE_THR_RUNNING;
01026 }
01027 
01028 /**********************************************************************/
01031 UNIV_INTERN
01032 void
01033 que_thr_stop_for_mysql_no_error(
01034 /*============================*/
01035   que_thr_t*  thr,  
01036   trx_t*    trx)  
01037 {
01038   ut_ad(thr->state == QUE_THR_RUNNING);
01039   ut_ad(thr->is_active == TRUE);
01040   ut_ad(trx->n_active_thrs == 1);
01041   ut_ad(thr->graph->n_active_thrs == 1);
01042 
01043   if (thr->magic_n != QUE_THR_MAGIC_N) {
01044     fprintf(stderr,
01045       "que_thr struct appears corrupt; magic n %lu\n",
01046       (unsigned long) thr->magic_n);
01047 
01048     mem_analyze_corruption(thr);
01049 
01050     ut_error;
01051   }
01052 
01053   thr->state = QUE_THR_COMPLETED;
01054 
01055   thr->is_active = FALSE;
01056   (thr->graph)->n_active_thrs--;
01057 
01058   trx->n_active_thrs--;
01059 }
01060 
01061 /****************************************************************/
01065 UNIV_INTERN
01066 que_node_t*
01067 que_node_get_containing_loop_node(
01068 /*==============================*/
01069   que_node_t* node) 
01070 {
01071   ut_ad(node);
01072 
01073   for (;;) {
01074     ulint type;
01075 
01076     node = que_node_get_parent(node);
01077 
01078     if (!node) {
01079       break;
01080     }
01081 
01082     type = que_node_get_type(node);
01083 
01084     if ((type == QUE_NODE_FOR) || (type == QUE_NODE_WHILE)) {
01085       break;
01086     }
01087   }
01088 
01089   return(node);
01090 }
01091 
01092 /**********************************************************************/
01094 UNIV_INTERN
01095 void
01096 que_node_print_info(
01097 /*================*/
01098   que_node_t* node) 
01099 {
01100   ulint   type;
01101   const char* str;
01102 
01103   type = que_node_get_type(node);
01104 
01105   if (type == QUE_NODE_SELECT) {
01106     str = "SELECT";
01107   } else if (type == QUE_NODE_INSERT) {
01108     str = "INSERT";
01109   } else if (type == QUE_NODE_UPDATE) {
01110     str = "UPDATE";
01111   } else if (type == QUE_NODE_WHILE) {
01112     str = "WHILE";
01113   } else if (type == QUE_NODE_ASSIGNMENT) {
01114     str = "ASSIGNMENT";
01115   } else if (type == QUE_NODE_IF) {
01116     str = "IF";
01117   } else if (type == QUE_NODE_FETCH) {
01118     str = "FETCH";
01119   } else if (type == QUE_NODE_OPEN) {
01120     str = "OPEN";
01121   } else if (type == QUE_NODE_PROC) {
01122     str = "STORED PROCEDURE";
01123   } else if (type == QUE_NODE_FUNC) {
01124     str = "FUNCTION";
01125   } else if (type == QUE_NODE_LOCK) {
01126     str = "LOCK";
01127   } else if (type == QUE_NODE_THR) {
01128     str = "QUERY THREAD";
01129   } else if (type == QUE_NODE_COMMIT) {
01130     str = "COMMIT";
01131   } else if (type == QUE_NODE_UNDO) {
01132     str = "UNDO ROW";
01133   } else if (type == QUE_NODE_PURGE) {
01134     str = "PURGE ROW";
01135   } else if (type == QUE_NODE_ROLLBACK) {
01136     str = "ROLLBACK";
01137   } else if (type == QUE_NODE_CREATE_TABLE) {
01138     str = "CREATE TABLE";
01139   } else if (type == QUE_NODE_CREATE_INDEX) {
01140     str = "CREATE INDEX";
01141   } else if (type == QUE_NODE_FOR) {
01142     str = "FOR LOOP";
01143   } else if (type == QUE_NODE_RETURN) {
01144     str = "RETURN";
01145   } else if (type == QUE_NODE_EXIT) {
01146     str = "EXIT";
01147   } else {
01148     str = "UNKNOWN NODE TYPE";
01149   }
01150 
01151   fprintf(stderr, "Node type %lu: %s, address %p\n",
01152     (ulong) type, str, (void*) node);
01153 }
01154 
01155 /**********************************************************************/
01159 UNIV_INLINE
01160 que_thr_t*
01161 que_thr_step(
01162 /*=========*/
01163   que_thr_t*  thr)  
01164 {
01165   que_node_t* node;
01166   que_thr_t*  old_thr;
01167   trx_t*    trx;
01168   ulint   type;
01169 
01170   trx = thr_get_trx(thr);
01171 
01172   ut_ad(thr->state == QUE_THR_RUNNING);
01173   ut_a(trx->error_state == DB_SUCCESS);
01174 
01175   thr->resource++;
01176 
01177   node = thr->run_node;
01178   type = que_node_get_type(node);
01179 
01180   old_thr = thr;
01181 
01182 #ifdef UNIV_DEBUG
01183   if (que_trace_on) {
01184     fputs("To execute: ", stderr);
01185     que_node_print_info(node);
01186   }
01187 #endif
01188   if (type & QUE_NODE_CONTROL_STAT) {
01189     if ((thr->prev_node != que_node_get_parent(node))
01190         && que_node_get_next(thr->prev_node)) {
01191 
01192       /* The control statements, like WHILE, always pass the
01193       control to the next child statement if there is any
01194       child left */
01195 
01196       thr->run_node = que_node_get_next(thr->prev_node);
01197 
01198     } else if (type == QUE_NODE_IF) {
01199       if_step(thr);
01200     } else if (type == QUE_NODE_FOR) {
01201       for_step(thr);
01202     } else if (type == QUE_NODE_PROC) {
01203 
01204       /* We can access trx->undo_no without reserving
01205       trx->undo_mutex, because there cannot be active query
01206       threads doing updating or inserting at the moment! */
01207 
01208       if (thr->prev_node == que_node_get_parent(node)) {
01209         trx->last_sql_stat_start.least_undo_no
01210           = trx->undo_no;
01211       }
01212 
01213       proc_step(thr);
01214     } else if (type == QUE_NODE_WHILE) {
01215       while_step(thr);
01216     } else {
01217       ut_error;
01218     }
01219   } else if (type == QUE_NODE_ASSIGNMENT) {
01220     assign_step(thr);
01221   } else if (type == QUE_NODE_SELECT) {
01222     thr = row_sel_step(thr);
01223   } else if (type == QUE_NODE_INSERT) {
01224     thr = row_ins_step(thr);
01225   } else if (type == QUE_NODE_UPDATE) {
01226     thr = row_upd_step(thr);
01227   } else if (type == QUE_NODE_FETCH) {
01228     thr = fetch_step(thr);
01229   } else if (type == QUE_NODE_OPEN) {
01230     thr = open_step(thr);
01231   } else if (type == QUE_NODE_FUNC) {
01232     proc_eval_step(thr);
01233 
01234   } else if (type == QUE_NODE_LOCK) {
01235 
01236     ut_error;
01237     /*
01238     thr = que_lock_step(thr);
01239     */
01240   } else if (type == QUE_NODE_THR) {
01241     thr = que_thr_node_step(thr);
01242   } else if (type == QUE_NODE_COMMIT) {
01243     thr = trx_commit_step(thr);
01244   } else if (type == QUE_NODE_UNDO) {
01245     thr = row_undo_step(thr);
01246   } else if (type == QUE_NODE_PURGE) {
01247     thr = row_purge_step(thr);
01248   } else if (type == QUE_NODE_RETURN) {
01249     thr = return_step(thr);
01250   } else if (type == QUE_NODE_EXIT) {
01251     thr = exit_step(thr);
01252   } else if (type == QUE_NODE_ROLLBACK) {
01253     thr = trx_rollback_step(thr);
01254   } else if (type == QUE_NODE_CREATE_TABLE) {
01255     thr = dict_create_table_step(thr);
01256   } else if (type == QUE_NODE_CREATE_INDEX) {
01257     thr = dict_create_index_step(thr);
01258   } else if (type == QUE_NODE_ROW_PRINTF) {
01259     thr = row_printf_step(thr);
01260   } else {
01261     ut_error;
01262   }
01263 
01264   if (type == QUE_NODE_EXIT) {
01265     old_thr->prev_node = que_node_get_containing_loop_node(node);
01266   } else {
01267     old_thr->prev_node = node;
01268   }
01269 
01270   if (thr) {
01271     ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01272   }
01273 
01274   return(thr);
01275 }
01276 
01277 /**********************************************************************/
01279 static
01280 void
01281 que_run_threads_low(
01282 /*================*/
01283   que_thr_t*  thr)  
01284 {
01285   que_thr_t*  next_thr;
01286   ulint   loop_count;
01287 
01288   ut_ad(thr->state == QUE_THR_RUNNING);
01289   ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01290   ut_ad(!mutex_own(&kernel_mutex));
01291 
01292   loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
01293 loop:
01294   /* Check that there is enough space in the log to accommodate
01295   possible log entries by this query step; if the operation can touch
01296   more than about 4 pages, checks must be made also within the query
01297   step! */
01298 
01299   log_free_check();
01300 
01301   /* Perform the actual query step: note that the query thread
01302   may change if, e.g., a subprocedure call is made */
01303 
01304   /*-------------------------*/
01305   next_thr = que_thr_step(thr);
01306   /*-------------------------*/
01307 
01308   ut_a(!next_thr || (thr_get_trx(next_thr)->error_state == DB_SUCCESS));
01309 
01310   loop_count++;
01311 
01312   if (next_thr != thr) {
01313     ut_a(next_thr == NULL);
01314 
01315     /* This can change next_thr to a non-NULL value if there was
01316     a lock wait that already completed. */
01317     que_thr_dec_refer_count(thr, &next_thr);
01318 
01319     if (next_thr == NULL) {
01320 
01321       return;
01322     }
01323 
01324     loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
01325 
01326     thr = next_thr;
01327   }
01328 
01329   goto loop;
01330 }
01331 
01332 /**********************************************************************/
01334 UNIV_INTERN
01335 void
01336 que_run_threads(
01337 /*============*/
01338   que_thr_t*  thr)  
01339 {
01340 loop:
01341   ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
01342   que_run_threads_low(thr);
01343 
01344   mutex_enter(&kernel_mutex);
01345 
01346   switch (thr->state) {
01347 
01348   case QUE_THR_RUNNING:
01349     /* There probably was a lock wait, but it already ended
01350     before we came here: continue running thr */
01351 
01352     mutex_exit(&kernel_mutex);
01353 
01354     goto loop;
01355 
01356   case QUE_THR_LOCK_WAIT:
01357     mutex_exit(&kernel_mutex);
01358 
01359     /* The ..._mysql_... function works also for InnoDB's
01360     internal threads. Let us wait that the lock wait ends. */
01361 
01362     srv_suspend_mysql_thread(thr);
01363 
01364     if (thr_get_trx(thr)->error_state != DB_SUCCESS) {
01365       /* thr was chosen as a deadlock victim or there was
01366       a lock wait timeout */
01367 
01368       que_thr_dec_refer_count(thr, NULL);
01369 
01370       return;
01371     }
01372 
01373     goto loop;
01374 
01375   case QUE_THR_COMPLETED:
01376   case QUE_THR_COMMAND_WAIT:
01377     /* Do nothing */
01378     break;
01379 
01380   default:
01381     ut_error;
01382   }
01383 
01384   mutex_exit(&kernel_mutex);
01385 }
01386 
01387 /*********************************************************************/
01390 UNIV_INTERN
01391 ulint
01392 que_eval_sql(
01393 /*=========*/
01394   pars_info_t*  info, 
01395   const char* sql,  
01396   ibool   reserve_dict_mutex,
01399   trx_t*    trx)  
01400 {
01401   que_thr_t*  thr;
01402   que_t*    graph;
01403 
01404   ut_a(trx->error_state == DB_SUCCESS);
01405 
01406   if (reserve_dict_mutex) {
01407     mutex_enter(&dict_sys->mutex);
01408   }
01409 
01410   graph = pars_sql(info, sql);
01411 
01412   if (reserve_dict_mutex) {
01413     mutex_exit(&dict_sys->mutex);
01414   }
01415 
01416   ut_a(graph);
01417 
01418   graph->trx = trx;
01419   trx->graph = NULL;
01420 
01421   graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
01422 
01423   ut_a(thr = que_fork_start_command(graph));
01424 
01425   que_run_threads(thr);
01426 
01427   que_graph_free(graph);
01428 
01429   return(trx->error_state);
01430 }