Drizzled Public API Documentation

que0que.cc
1 /*****************************************************************************
2 
3 Copyright (C) 1996, 2009, Innobase Oy. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify it under
6 the terms of the GNU General Public License as published by the Free Software
7 Foundation; version 2 of the License.
8 
9 This program is distributed in the hope that it will be useful, but WITHOUT
10 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along with
14 this program; if not, write to the Free Software Foundation, Inc., 51 Franklin
15 St, Fifth Floor, Boston, MA 02110-1301 USA
16 
17 *****************************************************************************/
18 
19 /**************************************************/
26 #include "que0que.h"
27 
28 #ifdef UNIV_NONINL
29 #include "que0que.ic"
30 #endif
31 
32 #include "usr0sess.h"
33 #include "trx0trx.h"
34 #include "trx0roll.h"
35 #include "row0undo.h"
36 #include "row0ins.h"
37 #include "row0upd.h"
38 #include "row0sel.h"
39 #include "row0purge.h"
40 #include "dict0crea.h"
41 #include "log0log.h"
42 #include "eval0proc.h"
43 #include "eval0eval.h"
44 #include "pars0types.h"
45 
/* Numeric limits of the query graph module. QUE_PARALLELIZE_LIMIT and
QUE_ROUND_ROBIN_LIMIT are not referenced anywhere else in this listing. */
46 #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
47 #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
/* Value that que_run_threads_low() assigns to its loop counter. */
48 #define QUE_MAX_LOOPS_WITHOUT_CHECK 16
49 
50 #ifdef UNIV_DEBUG
51 /* If the following flag is set TRUE, the module will print trace info
52 of SQL execution in the UNIV_SQL_DEBUG version */
/* In this file the flag is compiled in under UNIV_DEBUG and is read by
que_thr_step(), which prints the node it is about to execute to stderr. */
53 UNIV_INTERN ibool que_trace_on = FALSE;
54 #endif /* UNIV_DEBUG */
55 
56 /* Short introduction to query graphs
57  ==================================
58 
59 A query graph consists of nodes linked to each other in various ways. The
60 execution starts at que_run_threads() which takes a que_thr_t parameter.
61 que_thr_t contains two fields that control query graph execution: run_node
62 and prev_node. run_node is the next node to execute and prev_node is the
63 last node executed.
64 
65 Each node has a pointer to a 'next' statement, i.e., its brother, and a
66 pointer to its parent node. The next pointer is NULL in the last statement
67 of a block.
68 
69 Loop nodes contain a link to the first statement of the enclosed statement
70 list. While the loop runs, que_thr_step() checks if execution to the loop
71 node came from its parent or from one of the statement nodes in the loop. If
72 it came from the parent of the loop node it starts executing the first
73 statement node in the loop. If it came from one of the statement nodes in
74 the loop, then it checks if the statement node has another statement node
75 following it, and runs it if so.
76 
77 To signify loop ending, the loop statements (see e.g. while_step()) set
78 que_thr_t->run_node to the loop node's parent node. This is noticed on the
79 next call of que_thr_step() and execution proceeds to the node pointed to by
80 the loop node's 'next' pointer.
81 
82 For example, the code:
83 
84 X := 1;
85 WHILE X < 5 LOOP
86  X := X + 1;
87  X := X + 1;
88 X := 5
89 
90 will result in the following node hierarchy, with the X-axis indicating
91 'next' links and the Y-axis indicating parent/child links:
92 
93 A - W - A
94  |
95  |
96  A - A
97 
98 A = assign_node_t, W = while_node_t. */
99 
100 /* How is a stored procedure containing COMMIT or ROLLBACK commands
101 executed?
102 
103 The commit or rollback can be seen as a subprocedure call.
104 The problem is that if there are several query threads
105 currently running within the transaction, their action could
106 mess the commit or rollback operation. Or, at the least, the
107 operation would be difficult to visualize and keep in control.
108 
109 Therefore the query thread requesting a commit or a rollback
110 sends to the transaction a signal, which moves the transaction
111 to TRX_QUE_SIGNALED state. All running query threads of the
112 transaction will eventually notice that the transaction is now in
113 this state and voluntarily suspend themselves. Only the last
114 query thread which suspends itself will trigger handling of
115 the signal.
116 
117 When the transaction starts to handle a rollback or commit
118 signal, it builds a query graph which, when executed, will
119 roll back or commit the incomplete transaction. The transaction
120 is moved to the TRX_QUE_ROLLING_BACK or TRX_QUE_COMMITTING state.
121 If specified, the SQL cursors opened by the transaction are closed.
122 When the execution of the graph completes, it is like returning
123 from a subprocedure: the query thread which requested the operation
124 starts running again. */
125 
126 /**********************************************************************/
/* Forward declaration: the function is defined further down in this file
but is called by functions that precede its definition. It puts a query
thread into the QUE_THR_RUNNING state. */
131 static
132 void
133 que_thr_move_to_run_state(
134 /*======================*/
135  que_thr_t* thr); /*!< in/out: query thread to mark as running */
137 /***********************************************************************/
/** Appends a query graph to the end of a session's list of graphs
(sess->graphs). A ut_ad() assertion checks that the caller owns
kernel_mutex.
NOTE(review): the line that carried the function name (listing line 141)
is absent from this extract; presumably que_graph_publish( -- confirm
against the original file. */
139 UNIV_INTERN
140 void
142 /*==============*/
143  que_t* graph, /*!< in: graph to append to the list */
144  sess_t* sess) /*!< in/out: session whose graphs list is extended */
145 {
146  ut_ad(mutex_own(&kernel_mutex));
147 
148  UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
149 }
150 
151 /***********************************************************************/
/** Allocates a fork node from a memory heap and initializes it: node type
QUE_NODE_FORK, state QUE_FORK_COMMAND_WAIT, zero active threads, an empty
thread list, and NULL caller, sym_tab and info pointers. Fields that are not
assigned below keep whatever mem_heap_alloc() returned.
NOTE(review): listing lines 156 (function name) and 159-160 are absent from
this extract; the name is presumably que_fork_create( -- confirm.
@return the newly allocated fork node */
154 UNIV_INTERN
155 que_fork_t*
157 /*============*/
158  que_t* graph, /*!< in: graph the fork belongs to; when NULL the
 new fork is recorded as its own graph */
161  que_node_t* parent, /*!< in: stored as the fork's common.parent */
162  ulint fork_type, /*!< in: stored in fork->fork_type */
163  mem_heap_t* heap) /*!< in: heap the node is allocated from; also
 remembered in fork->heap */
164 {
165  que_fork_t* fork;
166 
167  ut_ad(heap);
168 
169  fork = static_cast<que_fork_t *>(mem_heap_alloc(heap, sizeof(que_fork_t)));
170 
171  fork->common.type = QUE_NODE_FORK;
172  fork->n_active_thrs = 0;
173 
174  fork->state = QUE_FORK_COMMAND_WAIT;
175 
 /* A fork created without an enclosing graph is the root of its own
 graph. */
176  if (graph != NULL) {
177  fork->graph = graph;
178  } else {
179  fork->graph = fork;
180  }
181 
182  fork->common.parent = parent;
183  fork->fork_type = fork_type;
184 
185  fork->caller = NULL;
186 
187  UT_LIST_INIT(fork->thrs);
188 
189  fork->sym_tab = NULL;
190  fork->info = NULL;
191 
192  fork->heap = heap;
193 
194  return(fork);
195 }
196 
197 /***********************************************************************/
/** Allocates a query thread node from a memory heap, initializes it and
appends it to the parent fork's thread list (parent->thrs). The new thread
starts in the QUE_THR_COMMAND_WAIT state, inactive, with run_node NULL,
resource 0 and lock_state QUE_THR_LOCK_NOLOCK, and it shares the parent's
graph pointer. The prev_node and child fields are not assigned here.
NOTE(review): the function-name line (listing line 202) is absent from this
extract; presumably que_thr_create( -- confirm.
@return the newly allocated query thread node */
200 UNIV_INTERN
201 que_thr_t*
203 /*===========*/
204  que_fork_t* parent, /*!< in/out: fork that becomes the thread's parent */
205  mem_heap_t* heap) /*!< in: heap the node is allocated from */
206 {
207  que_thr_t* thr;
208 
209  ut_ad(parent && heap);
210 
211  thr = static_cast<que_thr_t *>(mem_heap_alloc(heap, sizeof(que_thr_t)));
212 
213  thr->common.type = QUE_NODE_THR;
214  thr->common.parent = parent;
215 
 /* The magic number is verified later, e.g. before the node is freed. */
216  thr->magic_n = QUE_THR_MAGIC_N;
217 
218  thr->graph = parent->graph;
219 
220  thr->state = QUE_THR_COMMAND_WAIT;
221 
222  thr->is_active = FALSE;
223 
224  thr->run_node = NULL;
225  thr->resource = 0;
226  thr->lock_state = QUE_THR_LOCK_NOLOCK;
227 
228  UT_LIST_ADD_LAST(thrs, parent->thrs, thr);
229 
230  return(thr);
231 }
232 
233 /**********************************************************************/
/** Ends the wait of a query thread and puts it back into the running state.
Assertions require that the caller owns kernel_mutex, that thr is in one of
the states QUE_THR_LOCK_WAIT, QUE_THR_PROCEDURE_WAIT or
QUE_THR_SIG_REPLY_WAIT, and that thr->run_node is set. If the thread was
already flagged active, nothing further is done. Otherwise the thread is
handed back through *next_thr, which must point to a NULL slot; any other
combination hits ut_a(0).
NOTE(review): listing lines 240 (function name), 243-245, 247-251 and 277
are absent from this extract. The name is presumably que_thr_end_wait( and
line 277 was a statement following ut_a(0) -- confirm. */
238 UNIV_INTERN
239 void
241 /*=============*/
242  que_thr_t* thr, /*!< in/out: query thread whose wait ends */
246  que_thr_t** next_thr) /*!< in/out: slot that receives thr when the
 thread was not already active */
252 {
253  ibool was_active;
254 
255  ut_ad(mutex_own(&kernel_mutex));
256  ut_ad(thr);
257  ut_ad((thr->state == QUE_THR_LOCK_WAIT)
258  || (thr->state == QUE_THR_PROCEDURE_WAIT)
259  || (thr->state == QUE_THR_SIG_REPLY_WAIT));
260  ut_ad(thr->run_node);
261 
262  thr->prev_node = thr->run_node;
263 
 /* Sample the flag first: the call below sets is_active to TRUE. */
264  was_active = thr->is_active;
265 
266  que_thr_move_to_run_state(thr);
267 
268  if (was_active) {
269 
270  return;
271  }
272 
273  if (next_thr && *next_thr == NULL) {
274  *next_thr = thr;
275  } else {
276  ut_a(0);
278  }
279 }
280 
281 /**********************************************************************/
/** Variant of ending a query thread's wait that has no next_thr slot: the
thread must be in the QUE_THR_LOCK_WAIT state (enforced with ut_a) and is
moved to the running state. In the code visible here nothing else happens
when the thread was not previously active; the only remaining statement is
commented out.
NOTE(review): thr is dereferenced by the ut_a() check before ut_ad(thr)
tests it for NULL, so the NULL assertion can never fire first.
NOTE(review): listing lines 285 (function name), 288-289 and 313 are absent
from this extract; the name is presumably que_thr_end_wait_no_next_thr( --
confirm. */
283 UNIV_INTERN
284 void
286 /*=========================*/
287  que_thr_t* thr) /*!< in/out: query thread in the lock wait state */
290 {
291  ibool was_active;
292 
293  ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
294  only possible state here */
295  ut_ad(mutex_own(&kernel_mutex));
296  ut_ad(thr);
297  ut_ad((thr->state == QUE_THR_LOCK_WAIT)
298  || (thr->state == QUE_THR_PROCEDURE_WAIT)
299  || (thr->state == QUE_THR_SIG_REPLY_WAIT));
300 
 /* Sample the flag first: the call below sets is_active to TRUE. */
301  was_active = thr->is_active;
302 
303  que_thr_move_to_run_state(thr);
304 
305  if (was_active) {
306 
307  return;
308  }
309 
310  /* In MySQL we let the OS thread (not just the query thread) to wait
311  for the lock to be released: */
312 
314 
315  /* srv_que_task_enqueue_low(thr); */
316 }
317 
318 /**********************************************************************/
/** Prepares a query thread to start executing from its own thread node and
moves it to the running state. run_node is pointed at the thread itself and
prev_node at its parent, which is the condition que_thr_node_step() tests
to decide that control arrived from above. */
320 UNIV_INLINE
321 void
322 que_thr_init_command(
323 /*=================*/
324  que_thr_t* thr) /*!< in/out: query thread to (re)start */
325 {
326  thr->run_node = thr;
327  thr->prev_node = thr->common.parent;
328 
329  que_thr_move_to_run_state(thr);
330 }
331 
332 /**********************************************************************/
/** Marks a fork active, clears its last_sel_node and selects one of its
query threads to run. Selection order: the first thread found in the
QUE_THR_COMMAND_WAIT state is initialized and returned immediately;
otherwise the first QUE_THR_SUSPENDED thread is moved to the running state;
otherwise the first QUE_THR_COMPLETED thread is re-initialized. Meeting a
thread in the QUE_THR_LOCK_WAIT state triggers ut_error. Threads in any
other state are skipped because the switch has no default label.
NOTE(review): the function-name line (listing line 342) is absent from this
extract; presumably que_fork_start_command( -- confirm.
@return the selected query thread, or NULL when the list holds no thread in
one of the three startable states */
340 UNIV_INTERN
341 que_thr_t*
343 /*===================*/
344  que_fork_t* fork) /*!< in/out: fork whose threads are examined */
345 {
346  que_thr_t* thr;
347  que_thr_t* suspended_thr = NULL;
348  que_thr_t* completed_thr = NULL;
349 
350  fork->state = QUE_FORK_ACTIVE;
351 
352  fork->last_sel_node = NULL;
353 
 /* NOTE(review): both variables were already set to NULL at their
 declarations, so these two assignments are redundant. */
354  suspended_thr = NULL;
355  completed_thr = NULL;
356 
357  /* Choose the query thread to run: usually there is just one thread,
358  but in a parallelized select, which necessarily is non-scrollable,
359  there may be several to choose from */
360 
361  /* First we try to find a query thread in the QUE_THR_COMMAND_WAIT
362  state. Then we try to find a query thread in the QUE_THR_SUSPENDED
363  state, finally we try to find a query thread in the QUE_THR_COMPLETED
364  state */
365 
366  thr = UT_LIST_GET_FIRST(fork->thrs);
367 
368  /* We make a single pass over the thr list within which we note which
369  threads are ready to run. */
370  while (thr) {
371  switch (thr->state) {
372  case QUE_THR_COMMAND_WAIT:
373 
374  /* We have to send the initial message to query thread
375  to start it */
376 
377  que_thr_init_command(thr);
378 
379  return(thr);
380 
381  case QUE_THR_SUSPENDED:
382  /* In this case the execution of the thread was
383  suspended: no initial message is needed because
384  execution can continue from where it was left */
385  if (!suspended_thr) {
386  suspended_thr = thr;
387  }
388 
389  break;
390 
391  case QUE_THR_COMPLETED:
392  if (!completed_thr) {
393  completed_thr = thr;
394  }
395 
396  break;
397 
398  case QUE_THR_LOCK_WAIT:
399  ut_error;
400 
401  }
402 
403  thr = UT_LIST_GET_NEXT(thrs, thr);
404  }
405 
 /* The loop ended with thr == NULL; it stays NULL unless one of the
 remembered candidates is picked below. */
406  if (suspended_thr) {
407 
408  thr = suspended_thr;
409  que_thr_move_to_run_state(thr);
410 
411  } else if (completed_thr) {
412 
413  thr = completed_thr;
414  que_thr_init_command(thr);
415  }
416 
417  return(thr);
418 }
419 
420 /**********************************************************************/
/** Error handling for a fork: every query thread of the fork is reset so
that run_node points at the thread itself, prev_node at its child, and the
state is QUE_THR_COMPLETED; the first thread of the list is then moved to
the running state. The function ends in an unconditional ut_a(0).
NOTE(review): the first parameter's name is commented out, yet the three
ut_ad() checks below refer to trx; if ut_ad() expands to real code in some
build configuration, that configuration cannot compile this function.
NOTE(review): listing lines 426 (function name), 430 and 458 are absent from
this extract; the name is presumably que_fork_error_handle( -- confirm. */
424 UNIV_INTERN
425 void
427 /*==================*/
428  trx_t* /*trx __attribute__((unused))*/,
429  que_t* fork) /*!< in/out: fork whose threads are reset */
431 {
432  que_thr_t* thr;
433 
434  ut_ad(mutex_own(&kernel_mutex));
435  ut_ad(trx->sess->state == SESS_ERROR);
436  ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
437  ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
438 
439  thr = UT_LIST_GET_FIRST(fork->thrs);
440 
441  while (thr != NULL) {
442  ut_ad(!thr->is_active);
443  ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
444  ut_ad(thr->state != QUE_THR_LOCK_WAIT);
445 
446  thr->run_node = thr;
447  thr->prev_node = thr->child;
448  thr->state = QUE_THR_COMPLETED;
449 
450  thr = UT_LIST_GET_NEXT(thrs, thr);
451  }
452 
453  thr = UT_LIST_GET_FIRST(fork->thrs);
454 
455  que_thr_move_to_run_state(thr);
456 
457  ut_a(0);
459 }
460 
461 /****************************************************************/
/** Tests whether every query thread in a fork's thread list is in the given
state. The scan stops at the first thread whose state differs.
@return TRUE if all threads match (also when the list is empty), FALSE
otherwise */
465 UNIV_INLINE
466 ibool
467 que_fork_all_thrs_in_state(
468 /*=======================*/
469  que_fork_t* fork, /*!< in: fork whose thrs list is scanned */
470  ulint state) /*!< in: state value compared with thr->state */
471 {
472  que_thr_t* thr_node;
473 
474  thr_node = UT_LIST_GET_FIRST(fork->thrs);
475 
476  while (thr_node != NULL) {
477  if (thr_node->state != state) {
478 
479  return(FALSE);
480  }
481 
482  thr_node = UT_LIST_GET_NEXT(thrs, thr_node);
483  }
484 
485  return(TRUE);
486 }
487 
488 /**********************************************************************/
/** Walks a statement list from the given node to the end by following
que_node_get_next().
NOTE(review): listing line 497, the first statement of the loop body, is
absent from this extract; presumably it frees the current node before the
walk advances -- confirm against the original file. */
490 static
491 void
492 que_graph_free_stat_list(
493 /*=====================*/
494  que_node_t* node) /*!< in: first node of the statement list; may be NULL */
495 {
496  while (node) {
498 
499  node = que_node_get_next(node);
500  }
501 }
502 
503 /**********************************************************************/
/** Releases the resources owned by one query graph node, dispatching on the
node type returned by que_node_get_type(). A NULL node is accepted and
ignored. For the node types that own memory heaps the heaps are released
with mem_heap_free(); statement lists of PROC, IF, ELSIF, WHILE and FOR
nodes go through que_graph_free_stat_list(); the definition and commit
sub-nodes of CREATE TABLE and CREATE INDEX nodes, and the cascade node of an
UPDATE node, are released by recursive calls. A query thread node with a bad
magic number, or a node of an unknown type, is reported on stderr and ends
in ut_error.
NOTE(review): listing lines 508 (function name), 535, 550, 556, 570, 577,
604 and 670 are absent from this extract, so the FORK, THR, SELECT and
INSERT cases and parts of the UPDATE case look emptier here than they
presumably are. The name is presumably que_graph_free_recursive( (the name
used by the recursive calls below) -- confirm. */
506 UNIV_INTERN
507 void
509 /*=====================*/
510  que_node_t* node) /*!< in: node to release; may be NULL */
511 {
512  que_fork_t* fork;
513  que_thr_t* thr;
514  undo_node_t* undo;
515  sel_node_t* sel;
516  ins_node_t* ins;
517  upd_node_t* upd;
518  tab_node_t* cre_tab;
519  ind_node_t* cre_ind;
520  purge_node_t* purge;
521 
522  if (node == NULL) {
523 
524  return;
525  }
526 
527  switch (que_node_get_type(node)) {
528 
529  case QUE_NODE_FORK:
530  fork = static_cast<que_fork_t *>(node);
531 
532  thr = UT_LIST_GET_FIRST(fork->thrs);
533 
 /* Visits every query thread of the fork; the per-thread statement
 (listing line 535) is missing from this extract. */
534  while (thr) {
536 
537  thr = UT_LIST_GET_NEXT(thrs, thr);
538  }
539 
540  break;
541  case QUE_NODE_THR:
542 
543  thr = static_cast<que_thr_t *>(node);
544 
545  if (thr->magic_n != QUE_THR_MAGIC_N) {
546  fprintf(stderr,
547  "que_thr struct appears corrupt;"
548  " magic n %lu\n",
549  (unsigned long) thr->magic_n);
551  ut_error;
552  }
553 
 /* Stamp the node as freed so that a later magic-number check on it
 fails. */
554  thr->magic_n = QUE_THR_MAGIC_FREED;
555 
557 
558  break;
559  case QUE_NODE_UNDO:
560 
561  undo = static_cast<undo_node_t *>(node);
562 
563  mem_heap_free(undo->heap);
564 
565  break;
566  case QUE_NODE_SELECT:
567 
568  sel = static_cast<sel_node_t *>(node);
569 
571 
572  break;
573  case QUE_NODE_INSERT:
574 
575  ins = static_cast<ins_node_t *>(node);
576 
578 
579  mem_heap_free(ins->entry_sys_heap);
580 
581  break;
582  case QUE_NODE_PURGE:
583  purge = static_cast<purge_node_t *>(node);
584 
585  mem_heap_free(purge->heap);
586 
587  break;
588 
589  case QUE_NODE_UPDATE:
590 
591  upd = static_cast<upd_node_t *>(node);
592 
593  if (upd->in_mysql_interface) {
594 
595  btr_pcur_free_for_mysql(upd->pcur);
596  }
597 
598  que_graph_free_recursive(upd->cascade_node);
599 
 /* The cascade heap is optional and is released only when present. */
600  if (upd->cascade_heap) {
601  mem_heap_free(upd->cascade_heap);
602  }
603 
605 
606  mem_heap_free(upd->heap);
607 
608  break;
609  case QUE_NODE_CREATE_TABLE:
610  cre_tab = static_cast<tab_node_t *>(node);
611 
612  que_graph_free_recursive(cre_tab->tab_def);
613  que_graph_free_recursive(cre_tab->col_def);
614  que_graph_free_recursive(cre_tab->commit_node);
615 
616  mem_heap_free(cre_tab->heap);
617 
618  break;
619  case QUE_NODE_CREATE_INDEX:
620  cre_ind = static_cast<ind_node_t *>(node);
621 
622  que_graph_free_recursive(cre_ind->ind_def);
623  que_graph_free_recursive(cre_ind->field_def);
624  que_graph_free_recursive(cre_ind->commit_node);
625 
626  mem_heap_free(cre_ind->heap);
627 
628  break;
629  case QUE_NODE_PROC:
630  que_graph_free_stat_list(((proc_node_t*)node)->stat_list);
631 
632  break;
633  case QUE_NODE_IF:
634  que_graph_free_stat_list(((if_node_t*)node)->stat_list);
635  que_graph_free_stat_list(((if_node_t*)node)->else_part);
636  que_graph_free_stat_list(((if_node_t*)node)->elsif_list);
637 
638  break;
639  case QUE_NODE_ELSIF:
640  que_graph_free_stat_list(((elsif_node_t*)node)->stat_list);
641 
642  break;
643  case QUE_NODE_WHILE:
644  que_graph_free_stat_list(((while_node_t*)node)->stat_list);
645 
646  break;
647  case QUE_NODE_FOR:
648  que_graph_free_stat_list(((for_node_t*)node)->stat_list);
649 
650  break;
651 
652  case QUE_NODE_ASSIGNMENT:
653  case QUE_NODE_EXIT:
654  case QUE_NODE_RETURN:
655  case QUE_NODE_COMMIT:
656  case QUE_NODE_ROLLBACK:
657  case QUE_NODE_LOCK:
658  case QUE_NODE_FUNC:
659  case QUE_NODE_ORDER:
660  case QUE_NODE_ROW_PRINTF:
661  case QUE_NODE_OPEN:
662  case QUE_NODE_FETCH:
663  /* No need to do anything */
664 
665  break;
666  default:
667  fprintf(stderr,
668  "que_node struct appears corrupt; type %lu\n",
669  (unsigned long) que_node_get_type(node));
671  ut_error;
672  }
673 }
674 
675 /**********************************************************************/
/** Frees a query graph. In the code visible here: if the graph has info
whose graph_owns_us flag is set, the info is released with pars_info_free();
finally the graph's memory heap is released with mem_heap_free().
NOTE(review): listing lines 679 (function name), 682-685, 694 and 701 are
absent from this extract, so the body of the sym_tab branch and the
statement before the final mem_heap_free() cannot be seen. The name is
presumably que_graph_free( -- confirm. */
677 UNIV_INTERN
678 void
680 /*===========*/
681  que_t* graph) /*!< in: graph to free; must not be NULL (ut_ad) */
686 {
687  ut_ad(graph);
688 
689  if (graph->sym_tab) {
690  /* The following call frees dynamic memory allocated
691  for variables etc. during execution. Frees also explicit
692  cursor definitions. */
693 
695  }
696 
 /* The info is released here only when it is flagged as owned by the
 graph. */
697  if (graph->info && graph->info->graph_owns_us) {
698  pars_info_free(graph->info);
699  }
700 
702 
703  mem_heap_free(graph->heap);
704 }
705 
706 /****************************************************************/
/** Performs an execution step on a query thread node; thr->run_node must be
the thread node itself (ut_ad). If control arrived from the thread's parent,
it is passed down by pointing run_node at the thread's child. Otherwise
que_thr_peek_stop() is consulted while holding kernel_mutex: if it reports a
stop the thread is returned unchanged, else the thread is marked
QUE_THR_COMPLETED.
@return thr if it should keep being processed by the caller, or NULL when
the thread has been marked completed */
709 static
710 que_thr_t*
711 que_thr_node_step(
712 /*==============*/
713  que_thr_t* thr) /*!< in/out: query thread positioned on its own node */
715 {
716  ut_ad(thr->run_node == thr);
717 
718  if (thr->prev_node == thr->common.parent) {
719  /* If control to the node came from above, it is just passed
720  on */
721 
722  thr->run_node = thr->child;
723 
724  return(thr);
725  }
726 
727  mutex_enter(&kernel_mutex);
728 
729  if (que_thr_peek_stop(thr)) {
730 
731  mutex_exit(&kernel_mutex);
732 
733  return(thr);
734  }
735 
736  /* Thread execution completed */
737 
738  thr->state = QUE_THR_COMPLETED;
739 
740  mutex_exit(&kernel_mutex);
741 
742  return(NULL);
743 }
744 
745 /**********************************************************************/
/** Moves a query thread, which must not already be running (ut_ad), to the
QUE_THR_RUNNING state. If the thread is not yet flagged active, the active
thread counters of its graph and of its transaction are incremented and the
flag is set; assertions then require both counters to be exactly 1. */
751 static
752 void
753 que_thr_move_to_run_state(
754 /*======================*/
755  que_thr_t* thr) /*!< in/out: query thread to mark as running */
756 {
757  trx_t* trx;
758 
759  ut_ad(thr->state != QUE_THR_RUNNING);
760 
761  trx = thr_get_trx(thr);
762 
763  if (!thr->is_active) {
764 
765  (thr->graph)->n_active_thrs++;
766 
767  trx->n_active_thrs++;
768 
769  thr->is_active = TRUE;
770 
771  ut_ad((thr->graph)->n_active_thrs == 1);
772  ut_ad(trx->n_active_thrs == 1);
773  }
774 
775  thr->state = QUE_THR_RUNNING;
776 }
777 
778 /**********************************************************************/
/** Gives up the "active" reference that a query thread holds on its fork
and its transaction. All work is done while holding kernel_mutex, which is
acquired and released inside this function.

If the thread is still QUE_THR_RUNNING, que_thr_stop() is asked to stop it.
When that returns FALSE the thread is not deactivated: trx->error_state is
reset to DB_SUCCESS and the thread is handed back through *next_thr, which
must point to a NULL slot (otherwise ut_error).

In all other cases the active-thread counters of the fork and the
transaction are decremented and thr->is_active is cleared. If the
transaction then has no active threads left: when every thread of the fork
is QUE_THR_COMPLETED, a QUE_FORK_ROLLBACK fork is finished with
trx_finish_rollback_off_kernel(), the PURGE, RECOVERY and MYSQL_INTERFACE
fork types need nothing, and any other fork type is an error; afterwards
pending transaction signals are started with trx_sig_start_handle().
NOTE(review): listing lines 792-795, 831 and 895 are absent from this
extract: a statement after the ut_error in the else branch and the body of
the final if cannot be seen. */
786 static
787 void
788 que_thr_dec_refer_count(
789 /*====================*/
790  que_thr_t* thr, /*!< in/out: active query thread */
791  que_thr_t** next_thr) /*!< in/out: slot that may receive a thread to
 run next; callers in this file also pass NULL */
796 {
797  que_fork_t* fork;
798  trx_t* trx;
799  ulint fork_type;
800  ibool stopped;
801 
802  fork = static_cast<que_fork_t *>(thr->common.parent);
803  trx = thr_get_trx(thr);
804 
805  mutex_enter(&kernel_mutex);
806 
807  ut_a(thr->is_active);
808 
809  if (thr->state == QUE_THR_RUNNING) {
810 
811  stopped = que_thr_stop(thr);
812 
813  if (!stopped) {
814  /* The reason for the thr suspension or wait was
815  already canceled before we came here: continue
816  running the thread */
817 
818  /* fputs("!!!!!!!! Wait already ended: continue thr\n",
819  stderr); */
820 
821  if (next_thr && *next_thr == NULL) {
822  /* Normally srv_suspend_mysql_thread resets
823  the state to DB_SUCCESS before waiting, but
824  in this case we have to do it here,
825  otherwise nobody does it. */
826  trx->error_state = DB_SUCCESS;
827 
828  *next_thr = thr;
829  } else {
830  ut_error;
832  }
833 
834  mutex_exit(&kernel_mutex);
835 
836  return;
837  }
838  }
839 
840  ut_ad(fork->n_active_thrs == 1);
841  ut_ad(trx->n_active_thrs == 1);
842 
843  fork->n_active_thrs--;
844  trx->n_active_thrs--;
845 
846  thr->is_active = FALSE;
847 
 /* Other threads of the transaction are still active: nothing more
 to do for now. */
848  if (trx->n_active_thrs > 0) {
849 
850  mutex_exit(&kernel_mutex);
851 
852  return;
853  }
854 
855  fork_type = fork->fork_type;
856 
857  /* Check if all query threads in the same fork are completed */
858 
859  if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) {
860 
861  switch (fork_type) {
862  case QUE_FORK_ROLLBACK:
863  /* This is really the undo graph used in rollback,
864  no roll_node in this graph */
865 
866  ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
867  ut_ad(trx->handling_signals == TRUE);
868 
869  trx_finish_rollback_off_kernel(fork, trx, next_thr);
870  break;
871 
872  case QUE_FORK_PURGE:
873  case QUE_FORK_RECOVERY:
874  case QUE_FORK_MYSQL_INTERFACE:
875 
876  /* Do nothing */
877  break;
878 
879  default:
880  ut_error;
881  }
882  }
883 
884  if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
885 
886  /* If the trx is signaled and its query thread count drops to
887  zero, then we start processing a signal; from it we may get
888  a new query thread to run */
889 
890  trx_sig_start_handle(trx, next_thr);
891  }
892 
893  if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
894 
896  }
897 
898  mutex_exit(&kernel_mutex);
899 }
900 
901 /**********************************************************************/
/** Decides whether a query thread has to stop and, if so, records the new
state in thr->state. The caller must own kernel_mutex (ut_ad). The
conditions are tested in this order:
- the graph is in QUE_FORK_COMMAND_WAIT: thr becomes QUE_THR_SUSPENDED;
- the transaction is in TRX_QUE_LOCK_WAIT: thr is put at the front of
  trx->wait_thrs and becomes QUE_THR_LOCK_WAIT;
- trx->error_state is neither DB_SUCCESS nor DB_LOCK_WAIT: thr becomes
  QUE_THR_COMPLETED;
- the transaction has pending signals and the graph is not a
  QUE_FORK_ROLLBACK fork: thr becomes QUE_THR_SUSPENDED;
- otherwise nothing is changed.
NOTE(review): the function-name line (listing line 908) is absent from this
extract; presumably que_thr_stop(, the name used by the caller in this file
-- confirm.
@return TRUE if one of the stop conditions applied, FALSE if the thread
should continue running */
906 UNIV_INTERN
907 ibool
909 /*=========*/
910  que_thr_t* thr) /*!< in/out: query thread to examine */
911 {
912  trx_t* trx;
913  que_t* graph;
914  ibool ret = TRUE;
915 
916  ut_ad(mutex_own(&kernel_mutex));
917 
918  graph = thr->graph;
919  trx = graph->trx;
920 
921  if (graph->state == QUE_FORK_COMMAND_WAIT) {
922  thr->state = QUE_THR_SUSPENDED;
923 
924  } else if (trx->que_state == TRX_QUE_LOCK_WAIT) {
925 
926  UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr);
927  thr->state = QUE_THR_LOCK_WAIT;
928 
929  } else if (trx->error_state != DB_SUCCESS
930  && trx->error_state != DB_LOCK_WAIT) {
931 
932  /* Error handling built for the MySQL interface */
933  thr->state = QUE_THR_COMPLETED;
934 
935  } else if (UT_LIST_GET_LEN(trx->signals) > 0
936  && graph->fork_type != QUE_FORK_ROLLBACK) {
937 
938  thr->state = QUE_THR_SUSPENDED;
939  } else {
940  ut_ad(graph->state == QUE_FORK_ACTIVE);
941 
942  ret = FALSE;
943  }
944 
945  return(ret);
946 }
947 
948 /**********************************************************************/
/** Deactivates a query thread on behalf of the MySQL interface. kernel_mutex
is acquired and released inside this function. If the thread is still
QUE_THR_RUNNING and the transaction's error_state is DB_SUCCESS or
DB_LOCK_WAIT, the function returns without changing anything (the thread
stays active). If the thread is running and the transaction carries any
other error, the thread is first marked QUE_THR_COMPLETED. In every
remaining case the thread's is_active flag is cleared and the active-thread
counters of its graph and transaction are decremented.
NOTE(review): the function-name line (listing line 955) is absent from this
extract; presumably que_thr_stop_for_mysql( -- confirm. */
953 UNIV_INTERN
954 void
956 /*===================*/
957  que_thr_t* thr) /*!< in/out: query thread to stop */
958 {
959  trx_t* trx;
960 
961  trx = thr_get_trx(thr);
962 
963  mutex_enter(&kernel_mutex);
964 
965  if (thr->state == QUE_THR_RUNNING) {
966 
967  if (trx->error_state != DB_SUCCESS
968  && trx->error_state != DB_LOCK_WAIT) {
969 
970  /* Error handling built for the MySQL interface */
971  thr->state = QUE_THR_COMPLETED;
972  } else {
973  /* It must have been a lock wait but the lock was
974  already released, or this transaction was chosen
975  as a victim in selective deadlock resolution */
976 
977  mutex_exit(&kernel_mutex);
978 
979  return;
980  }
981  }
982 
983  ut_ad(thr->is_active == TRUE);
984  ut_ad(trx->n_active_thrs == 1);
985  ut_ad(thr->graph->n_active_thrs == 1);
986 
987  thr->is_active = FALSE;
988  (thr->graph)->n_active_thrs--;
989 
990  trx->n_active_thrs--;
991 
992  mutex_exit(&kernel_mutex);
993 }
994 
995 /**********************************************************************/
/** Puts a query thread into the QUE_THR_RUNNING state on behalf of the
MySQL interface. The thread's magic number is verified first; a mismatch is
reported on stderr and ends in ut_error. If the thread is not yet flagged
active, the active-thread counters of its graph and of the given transaction
are incremented and the flag is set. No mutex is acquired inside this
function, and the counters are not asserted to be 1 here.
NOTE(review): listing lines 1001 (function name) and 1011 are absent from
this extract; the name is presumably que_thr_move_to_run_state_for_mysql( --
confirm. */
999 UNIV_INTERN
1000 void
1002 /*================================*/
1003  que_thr_t* thr, /*!< in/out: query thread to mark as running */
1004  trx_t* trx) /*!< in/out: transaction whose n_active_thrs is updated */
1005 {
1006  if (thr->magic_n != QUE_THR_MAGIC_N) {
1007  fprintf(stderr,
1008  "que_thr struct appears corrupt; magic n %lu\n",
1009  (unsigned long) thr->magic_n);
1010 
1012 
1013  ut_error;
1014  }
1015 
1016  if (!thr->is_active) {
1017 
1018  thr->graph->n_active_thrs++;
1019 
1020  trx->n_active_thrs++;
1021 
1022  thr->is_active = TRUE;
1023  }
1024 
1025  thr->state = QUE_THR_RUNNING;
1026 }
1027 
1028 /**********************************************************************/
/** Deactivates a running query thread when no error handling is needed: the
thread is marked QUE_THR_COMPLETED, its is_active flag is cleared and the
active-thread counters of its graph and of the given transaction are
decremented. Assertions require the thread to be running and active and both
counters to be 1. The thread's magic number is verified; a mismatch is
reported on stderr and ends in ut_error. No mutex is acquired inside this
function.
NOTE(review): the ut_ad() checks read thr's fields before the magic-number
check has validated the struct.
NOTE(review): listing lines 1033 (function name) and 1048 are absent from
this extract; the name is presumably que_thr_stop_for_mysql_no_error( --
confirm. */
1031 UNIV_INTERN
1032 void
1034 /*============================*/
1035  que_thr_t* thr, /*!< in/out: running, active query thread */
1036  trx_t* trx) /*!< in/out: transaction whose n_active_thrs is updated */
1037 {
1038  ut_ad(thr->state == QUE_THR_RUNNING);
1039  ut_ad(thr->is_active == TRUE);
1040  ut_ad(trx->n_active_thrs == 1);
1041  ut_ad(thr->graph->n_active_thrs == 1);
1042 
1043  if (thr->magic_n != QUE_THR_MAGIC_N) {
1044  fprintf(stderr,
1045  "que_thr struct appears corrupt; magic n %lu\n",
1046  (unsigned long) thr->magic_n);
1047 
1049 
1050  ut_error;
1051  }
1052 
1053  thr->state = QUE_THR_COMPLETED;
1054 
1055  thr->is_active = FALSE;
1056  (thr->graph)->n_active_thrs--;
1057 
1058  trx->n_active_thrs--;
1059 }
1060 
1061 /****************************************************************/
/** Climbs the parent chain of a node, starting with the node's parent, until
a node of type QUE_NODE_FOR or QUE_NODE_WHILE is found. The start node itself
is never considered.
NOTE(review): the function-name line (listing line 1067) is absent from this
extract; presumably que_node_get_containing_loop_node( -- confirm.
@return the nearest enclosing FOR or WHILE node, or NULL if the chain ends
without one */
1065 UNIV_INTERN
1066 que_node_t*
1068 /*==============================*/
1069  que_node_t* node) /*!< in: node whose ancestors are searched; must
 not be NULL (ut_ad) */
1070 {
1071  ut_ad(node);
1072 
1073  for (;;) {
1074  ulint type;
1075 
1076  node = que_node_get_parent(node);
1077 
 /* Ran out of ancestors: NULL is what gets returned. */
1078  if (!node) {
1079  break;
1080  }
1081 
1082  type = que_node_get_type(node);
1083 
1084  if ((type == QUE_NODE_FOR) || (type == QUE_NODE_WHILE)) {
1085  break;
1086  }
1087  }
1088 
1089  return(node);
1090 }
1091 
1092 /**********************************************************************/
/** Prints one line describing a query graph node to stderr: the numeric node
type, a readable name for that type, and the node's address. Node types
without an entry in the chain below are printed as "UNKNOWN NODE TYPE";
QUE_NODE_FORK, QUE_NODE_ELSIF, QUE_NODE_ORDER and QUE_NODE_ROW_PRINTF, which
appear elsewhere in this file, are among the types that have no entry.
NOTE(review): the function-name line (listing line 1096) is absent from this
extract; presumably que_node_print_info(, the name used by the caller in
this file -- confirm. */
1094 UNIV_INTERN
1095 void
1097 /*================*/
1098  que_node_t* node) /*!< in: node to describe */
1099 {
1100  ulint type;
1101  const char* str;
1102 
1103  type = que_node_get_type(node);
1104 
1105  if (type == QUE_NODE_SELECT) {
1106  str = "SELECT";
1107  } else if (type == QUE_NODE_INSERT) {
1108  str = "INSERT";
1109  } else if (type == QUE_NODE_UPDATE) {
1110  str = "UPDATE";
1111  } else if (type == QUE_NODE_WHILE) {
1112  str = "WHILE";
1113  } else if (type == QUE_NODE_ASSIGNMENT) {
1114  str = "ASSIGNMENT";
1115  } else if (type == QUE_NODE_IF) {
1116  str = "IF";
1117  } else if (type == QUE_NODE_FETCH) {
1118  str = "FETCH";
1119  } else if (type == QUE_NODE_OPEN) {
1120  str = "OPEN";
1121  } else if (type == QUE_NODE_PROC) {
1122  str = "STORED PROCEDURE";
1123  } else if (type == QUE_NODE_FUNC) {
1124  str = "FUNCTION";
1125  } else if (type == QUE_NODE_LOCK) {
1126  str = "LOCK";
1127  } else if (type == QUE_NODE_THR) {
1128  str = "QUERY THREAD";
1129  } else if (type == QUE_NODE_COMMIT) {
1130  str = "COMMIT";
1131  } else if (type == QUE_NODE_UNDO) {
1132  str = "UNDO ROW";
1133  } else if (type == QUE_NODE_PURGE) {
1134  str = "PURGE ROW";
1135  } else if (type == QUE_NODE_ROLLBACK) {
1136  str = "ROLLBACK";
1137  } else if (type == QUE_NODE_CREATE_TABLE) {
1138  str = "CREATE TABLE";
1139  } else if (type == QUE_NODE_CREATE_INDEX) {
1140  str = "CREATE INDEX";
1141  } else if (type == QUE_NODE_FOR) {
1142  str = "FOR LOOP";
1143  } else if (type == QUE_NODE_RETURN) {
1144  str = "RETURN";
1145  } else if (type == QUE_NODE_EXIT) {
1146  str = "EXIT";
1147  } else {
1148  str = "UNKNOWN NODE TYPE";
1149  }
1150 
1151  fprintf(stderr, "Node type %lu: %s, address %p\n",
1152  (ulong) type, str, (void*) node);
1153 }
1154 
1155 /**********************************************************************/
/** Performs one execution step on a query thread: the node in thr->run_node
is dispatched to the step function that matches its type, and thr->resource
is incremented. The thread must be running and its transaction must have
error_state DB_SUCCESS on entry; the same is asserted for the returned
thread when it is not NULL.

Node types with the QUE_NODE_CONTROL_STAT bit set are handled first: if
control did not arrive from the node's parent and the previously executed
node has a successor, that successor becomes the next run_node; otherwise
the IF, FOR, PROC or WHILE step function is called. For a PROC node entered
from its parent, trx->undo_no is first saved in
trx->last_sql_stat_start.least_undo_no. A QUE_NODE_LOCK node and any
unrecognised type end in ut_error.

After the dispatch, prev_node of the thread that entered the function is set
to the node just executed, except for EXIT nodes.
NOTE(review): listing line 1265, the statement inside the EXIT branch, is
absent from this extract; it presumably assigns prev_node for that case --
confirm against the original file.
@return the query thread to continue with, as returned by the step function
(for the step functions whose result is not assigned, thr itself); the
que_thr_node_step() path returns NULL when the thread has completed */
1159 UNIV_INLINE
1160 que_thr_t*
1161 que_thr_step(
1162 /*=========*/
1163  que_thr_t* thr) /*!< in/out: running query thread */
1164 {
1165  que_node_t* node;
1166  que_thr_t* old_thr;
1167  trx_t* trx;
1168  ulint type;
1169 
1170  trx = thr_get_trx(thr);
1171 
1172  ut_ad(thr->state == QUE_THR_RUNNING);
1173  ut_a(trx->error_state == DB_SUCCESS);
1174 
1175  thr->resource++;
1176 
1177  node = thr->run_node;
1178  type = que_node_get_type(node);
1179 
 /* thr may be replaced by a step function's return value below; keep
 the entry thread so that its prev_node can be updated afterwards. */
1180  old_thr = thr;
1181 
1182 #ifdef UNIV_DEBUG
1183  if (que_trace_on) {
1184  fputs("To execute: ", stderr);
1185  que_node_print_info(node);
1186  }
1187 #endif
1188  if (type & QUE_NODE_CONTROL_STAT) {
1189  if ((thr->prev_node != que_node_get_parent(node))
1190  && que_node_get_next(thr->prev_node)) {
1191 
1192  /* The control statements, like WHILE, always pass the
1193  control to the next child statement if there is any
1194  child left */
1195 
1196  thr->run_node = que_node_get_next(thr->prev_node);
1197 
1198  } else if (type == QUE_NODE_IF) {
1199  if_step(thr);
1200  } else if (type == QUE_NODE_FOR) {
1201  for_step(thr);
1202  } else if (type == QUE_NODE_PROC) {
1203 
1204  /* We can access trx->undo_no without reserving
1205  trx->undo_mutex, because there cannot be active query
1206  threads doing updating or inserting at the moment! */
1207 
1208  if (thr->prev_node == que_node_get_parent(node)) {
1209  trx->last_sql_stat_start.least_undo_no
1210  = trx->undo_no;
1211  }
1212 
1213  proc_step(thr);
1214  } else if (type == QUE_NODE_WHILE) {
1215  while_step(thr);
1216  } else {
1217  ut_error;
1218  }
1219  } else if (type == QUE_NODE_ASSIGNMENT) {
1220  assign_step(thr);
1221  } else if (type == QUE_NODE_SELECT) {
1222  thr = row_sel_step(thr);
1223  } else if (type == QUE_NODE_INSERT) {
1224  thr = row_ins_step(thr);
1225  } else if (type == QUE_NODE_UPDATE) {
1226  thr = row_upd_step(thr);
1227  } else if (type == QUE_NODE_FETCH) {
1228  thr = fetch_step(thr);
1229  } else if (type == QUE_NODE_OPEN) {
1230  thr = open_step(thr);
1231  } else if (type == QUE_NODE_FUNC) {
1232  proc_eval_step(thr);
1233 
1234  } else if (type == QUE_NODE_LOCK) {
1235 
1236  ut_error;
1237  /*
1238  thr = que_lock_step(thr);
1239  */
1240  } else if (type == QUE_NODE_THR) {
1241  thr = que_thr_node_step(thr);
1242  } else if (type == QUE_NODE_COMMIT) {
1243  thr = trx_commit_step(thr);
1244  } else if (type == QUE_NODE_UNDO) {
1245  thr = row_undo_step(thr);
1246  } else if (type == QUE_NODE_PURGE) {
1247  thr = row_purge_step(thr);
1248  } else if (type == QUE_NODE_RETURN) {
1249  thr = return_step(thr);
1250  } else if (type == QUE_NODE_EXIT) {
1251  thr = exit_step(thr);
1252  } else if (type == QUE_NODE_ROLLBACK) {
1253  thr = trx_rollback_step(thr);
1254  } else if (type == QUE_NODE_CREATE_TABLE) {
1255  thr = dict_create_table_step(thr);
1256  } else if (type == QUE_NODE_CREATE_INDEX) {
1257  thr = dict_create_index_step(thr);
1258  } else if (type == QUE_NODE_ROW_PRINTF) {
1259  thr = row_printf_step(thr);
1260  } else {
1261  ut_error;
1262  }
1263 
1264  if (type == QUE_NODE_EXIT) {
1266  } else {
1267  old_thr->prev_node = node;
1268  }
1269 
1270  if (thr) {
1271  ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
1272  }
1273 
1274  return(thr);
1275 }
1276 
1277 /**********************************************************************/
/** Runs a query thread step by step until there is no thread left to
continue with. Each iteration first calls log_free_check() and then
que_thr_step(). While the step returns the same thread, the loop simply
repeats. When it returns something else, that value must be NULL (ut_a);
que_thr_dec_refer_count() is then called and may hand back a thread through
next_thr, in which case execution continues with that thread, otherwise the
function returns. The caller must not own kernel_mutex (ut_ad).
NOTE(review): loop_count is assigned and incremented but never read in this
function, so it has no effect on the control flow visible here. */
1279 static
1280 void
1281 que_run_threads_low(
1282 /*================*/
1283  que_thr_t* thr) /*!< in/out: running query thread to execute */
1284 {
1285  que_thr_t* next_thr;
1286  ulint loop_count;
1287 
1288  ut_ad(thr->state == QUE_THR_RUNNING);
1289  ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
1290  ut_ad(!mutex_own(&kernel_mutex));
1291 
1292  loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
1293 loop:
1294  /* Check that there is enough space in the log to accommodate
1295  possible log entries by this query step; if the operation can touch
1296  more than about 4 pages, checks must be made also within the query
1297  step! */
1298 
1299  log_free_check();
1300 
1301  /* Perform the actual query step: note that the query thread
1302  may change if, e.g., a subprocedure call is made */
1303 
1304  /*-------------------------*/
1305  next_thr = que_thr_step(thr);
1306  /*-------------------------*/
1307 
1308  ut_a(!next_thr || (thr_get_trx(next_thr)->error_state == DB_SUCCESS));
1309 
1310  loop_count++;
1311 
1312  if (next_thr != thr) {
1313  ut_a(next_thr == NULL);
1314 
1315  /* This can change next_thr to a non-NULL value if there was
1316  a lock wait that already completed. */
1317  que_thr_dec_refer_count(thr, &next_thr);
1318 
1319  if (next_thr == NULL) {
1320 
1321  return;
1322  }
1323 
1324  loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
1325 
1326  thr = next_thr;
1327  }
1328 
1329  goto loop;
1330 }
1331 
1332 /**********************************************************************/
/** Runs a query thread until it completes, goes back to waiting for a
command, or its transaction reports an error after a lock wait. The thread is
executed with que_run_threads_low(); afterwards its state is examined while
holding kernel_mutex:
- QUE_THR_RUNNING: the mutex is released and execution is repeated;
- QUE_THR_LOCK_WAIT: the mutex is released; if the transaction's error_state
  is then not DB_SUCCESS, que_thr_dec_refer_count(thr, NULL) is called and
  the function returns, otherwise execution is repeated;
- QUE_THR_COMPLETED or QUE_THR_COMMAND_WAIT: the function returns;
- any other state ends in ut_error.
NOTE(review): listing lines 1336 (function name) and 1362 are absent from
this extract. Line 1362 presumably is the call that waits for the lock wait
to end, as the comment above it describes; the name is presumably
que_run_threads(, the name used by the caller in this file -- confirm. */
1334 UNIV_INTERN
1335 void
1337 /*============*/
1338  que_thr_t* thr) /*!< in/out: query thread to run */
1339 {
1340 loop:
1341  ut_a(thr_get_trx(thr)->error_state == DB_SUCCESS);
1342  que_run_threads_low(thr);
1343 
1344  mutex_enter(&kernel_mutex);
1345 
1346  switch (thr->state) {
1347 
1348  case QUE_THR_RUNNING:
1349  /* There probably was a lock wait, but it already ended
1350  before we came here: continue running thr */
1351 
1352  mutex_exit(&kernel_mutex);
1353 
1354  goto loop;
1355 
1356  case QUE_THR_LOCK_WAIT:
1357  mutex_exit(&kernel_mutex);
1358 
1359  /* The ..._mysql_... function works also for InnoDB's
1360  internal threads. Let us wait that the lock wait ends. */
1361 
1363 
1364  if (thr_get_trx(thr)->error_state != DB_SUCCESS) {
1365  /* thr was chosen as a deadlock victim or there was
1366  a lock wait timeout */
1367 
1368  que_thr_dec_refer_count(thr, NULL);
1369 
1370  return;
1371  }
1372 
1373  goto loop;
1374 
1375  case QUE_THR_COMPLETED:
1376  case QUE_THR_COMMAND_WAIT:
1377  /* Do nothing */
1378  break;
1379 
1380  default:
1381  ut_error;
1382  }
1383 
1384  mutex_exit(&kernel_mutex);
1385 }
1386 
1387 /*********************************************************************/
/** Parses an SQL string into a query graph, executes it in the given
transaction and frees the graph. The transaction must have error_state
DB_SUCCESS on entry (ut_a). The parse call pars_sql() is wrapped in
dict_sys->mutex when reserve_dict_mutex is set. The resulting graph gets trx
as its transaction and QUE_FORK_MYSQL_INTERFACE as its fork type, and
trx->graph is set to NULL. A thread is obtained with
que_fork_start_command(), run with que_run_threads(), and the graph is
released with que_graph_free().
NOTE(review): the thr assignment is written inside the ut_a() argument, so
it only takes place if ut_a() evaluates its argument in every build --
confirm.
NOTE(review): listing lines 1392 (function name) and 1397-1398 are absent
from this extract; the name is presumably que_eval_sql( -- confirm.
@return trx->error_state as it stands after the graph has been run and
freed */
1390 UNIV_INTERN
1391 ulint
1393 /*=========*/
1394  pars_info_t* info, /*!< in: passed through to pars_sql() */
1395  const char* sql, /*!< in: SQL text passed to pars_sql() */
1396  ibool reserve_dict_mutex, /*!< in: if set, dict_sys->mutex is
 held around the pars_sql() call */
1399  trx_t* trx) /*!< in/out: transaction the graph is run in */
1400 {
1401  que_thr_t* thr;
1402  que_t* graph;
1403 
1404  ut_a(trx->error_state == DB_SUCCESS);
1405 
1406  if (reserve_dict_mutex) {
1407  mutex_enter(&dict_sys->mutex);
1408  }
1409 
1410  graph = pars_sql(info, sql);
1411 
1412  if (reserve_dict_mutex) {
1413  mutex_exit(&dict_sys->mutex);
1414  }
1415 
1416  ut_a(graph);
1417 
1418  graph->trx = trx;
1419  trx->graph = NULL;
1420 
1421  graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
1422 
1423  ut_a(thr = que_fork_start_command(graph));
1424 
1425  que_run_threads(thr);
1426 
1427  que_graph_free(graph);
1428 
1429  return(trx->error_state);
1430 }