Drizzled Public API Documentation

cursor.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2008 Sun Microsystems, Inc.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; version 2 of the License.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18  */
19 
26 #include <config.h>
27 #include <fcntl.h>
28 #include <drizzled/error.h>
29 #include <drizzled/field/epoch.h>
30 #include <drizzled/gettext.h>
31 #include <drizzled/internal/my_sys.h>
32 #include <drizzled/item/empty_string.h>
33 #include <drizzled/item/int.h>
34 #include <drizzled/lock.h>
35 #include <drizzled/message/table.h>
36 #include <drizzled/optimizer/cost_vector.h>
37 #include <drizzled/plugin/client.h>
38 #include <drizzled/plugin/event_observer.h>
39 #include <drizzled/plugin/storage_engine.h>
40 #include <drizzled/probes.h>
41 #include <drizzled/session.h>
42 #include <drizzled/sql_base.h>
43 #include <drizzled/sql_parse.h>
44 #include <drizzled/transaction_services.h>
45 #include <drizzled/key.h>
46 #include <drizzled/sql_lex.h>
47 #include <drizzled/resource_context.h>
48 #include <drizzled/statistics_variables.h>
49 #include <drizzled/system_variables.h>
50 
51 using namespace std;
52 
53 namespace drizzled {
54 
55 /****************************************************************************
56 ** General Cursor functions
57 ****************************************************************************/
58 Cursor::Cursor(plugin::StorageEngine &engine_arg,
59  Table &arg)
60  : table(arg),
61  engine(engine_arg),
62  estimation_rows_to_insert(0),
63  ref(0),
64  key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
65  ref_length(sizeof(internal::my_off_t)),
66  inited(NONE),
67  locked(false),
68  next_insert_id(0), insert_id_for_cur_row(0)
69 { }
70 
71 Cursor::~Cursor()
72 {
73  assert(not locked);
74  /* TODO: assert(inited == NONE); */
75 }
76 
77 
78 /*
79  * @note this is only used in
80  * optimizer::QuickRangeSelect::init_ror_merged_scan(bool reuse_handler) as
81  * of the writing of this comment. -Brian
82  */
83 Cursor *Cursor::clone(memory::Root *mem_root)
84 {
85  Cursor *new_handler= getTable()->getMutableShare()->db_type()->getCursor(*getTable());
86  /*
87  Allocate Cursor->ref here because otherwise ha_open will allocate it
88  on this->table->mem_root and we will not be able to reclaim that memory
89  when the clone Cursor object is destroyed.
90  */
91  new_handler->ref= mem_root->alloc(ALIGN_SIZE(ref_length)*2);
92  identifier::Table identifier(*getTable());
93  return new_handler->ha_open(identifier, getTable()->getDBStat(), HA_OPEN_IGNORE_IF_LOCKED) ? NULL : new_handler;
94 }
95 
96 /*
97  DESCRIPTION
98  given a buffer with a key value, and a map of keyparts
99  that are present in this value, returns the length of the value
100 */
101 uint32_t Cursor::calculate_key_len(uint32_t key_position, key_part_map keypart_map_arg)
102 {
103  /* works only with key prefixes */
104  assert(((keypart_map_arg + 1) & keypart_map_arg) == 0);
105 
106  const KeyPartInfo *key_part_found= getTable()->getShare()->getKeyInfo(key_position).key_part;
107  const KeyPartInfo *end_key_part_found= key_part_found + getTable()->getShare()->getKeyInfo(key_position).key_parts;
108  uint32_t length= 0;
109 
110  while (key_part_found < end_key_part_found && keypart_map_arg)
111  {
112  length+= key_part_found->store_length;
113  keypart_map_arg >>= 1;
114  key_part_found++;
115  }
116  return length;
117 }
118 
119 int Cursor::startIndexScan(uint32_t idx, bool sorted)
120 {
121  int result;
122  assert(inited == NONE);
123  if (!(result= doStartIndexScan(idx, sorted)))
124  inited=INDEX;
125  end_range= NULL;
126  return result;
127 }
128 
129 int Cursor::endIndexScan()
130 {
131  assert(inited==INDEX);
132  inited=NONE;
133  end_range= NULL;
134  return(doEndIndexScan());
135 }
136 
137 int Cursor::startTableScan(bool scan)
138 {
139  int result;
140  assert(inited==NONE || (inited==RND && scan));
141  inited= (result= doStartTableScan(scan)) ? NONE: RND;
142 
143  return result;
144 }
145 
146 int Cursor::endTableScan()
147 {
148  assert(inited==RND);
149  inited=NONE;
150  return(doEndTableScan());
151 }
152 
153 int Cursor::ha_index_or_rnd_end()
154 {
155  return inited == INDEX ? endIndexScan() : inited == RND ? endTableScan() : 0;
156 }
157 
158 void Cursor::ha_start_bulk_insert(ha_rows rows)
159 {
160  estimation_rows_to_insert= rows;
161  start_bulk_insert(rows);
162 }
163 
164 int Cursor::ha_end_bulk_insert()
165 {
166  estimation_rows_to_insert= 0;
167  return end_bulk_insert();
168 }
169 
170 const key_map *Cursor::keys_to_use_for_scanning()
171 {
172  return &key_map_empty;
173 }
174 
175 bool Cursor::has_transactions()
176 {
177  return (getTable()->getShare()->db_type()->check_flag(HTON_BIT_DOES_TRANSACTIONS));
178 }
179 
180 void Cursor::ha_statistic_increment(uint64_t system_status_var::*offset) const
181 {
182  (getTable()->in_use->status_var.*offset)++;
183 }
184 
185 void **Cursor::ha_data(Session *session) const
186 {
187  return session->getEngineData(getEngine());
188 }
189 
190 bool Cursor::is_fatal_error(int error, uint32_t flags)
191 {
192  if (!error ||
193  ((flags & HA_CHECK_DUP_KEY) &&
194  (error == HA_ERR_FOUND_DUPP_KEY ||
195  error == HA_ERR_FOUND_DUPP_UNIQUE)))
196  return false;
197  return true;
198 }
199 
200 
201 ha_rows Cursor::records() { return stats.records; }
202 uint64_t Cursor::tableSize() { return stats.index_file_length + stats.data_file_length; }
203 uint64_t Cursor::rowSize() { return getTable()->getRecordLength() + getTable()->sizeFields(); }
204 
205 int Cursor::doOpen(const identifier::Table &identifier, int mode, uint32_t test_if_locked)
206 {
207  return open(identifier.getPath().c_str(), mode, test_if_locked);
208 }
209 
216 int Cursor::ha_open(const identifier::Table &identifier,
217  int mode,
218  int test_if_locked)
219 {
220  int error;
221 
222  if ((error= doOpen(identifier, mode, test_if_locked)))
223  {
224  if ((error == EACCES || error == EROFS) && mode == O_RDWR &&
225  (getTable()->db_stat & HA_TRY_READ_ONLY))
226  {
227  getTable()->db_stat|=HA_READ_ONLY;
228  error= doOpen(identifier, O_RDONLY,test_if_locked);
229  }
230  }
231  if (error)
232  {
233  errno= error; /* Safeguard */
234  }
235  else
236  {
237  if (getTable()->getShare()->db_options_in_use & HA_OPTION_READ_ONLY_DATA)
238  getTable()->db_stat|=HA_READ_ONLY;
239  (void) extra(HA_EXTRA_NO_READCHECK); // Not needed in SQL
240 
241  /* ref is already allocated for us if we're called from Cursor::clone() */
242  if (!ref)
243  ref= getTable()->alloc(ALIGN_SIZE(ref_length)*2);
244  dup_ref=ref+ALIGN_SIZE(ref_length);
245  }
246  return error;
247 }
248 
255 int Cursor::read_first_row(unsigned char * buf, uint32_t primary_key)
256 {
257  int error;
258 
259  ha_statistic_increment(&system_status_var::ha_read_first_count);
260 
261  /*
262  If there is very few deleted rows in the table, find the first row by
263  scanning the table.
264  @todo remove the test for HA_READ_ORDER
265  */
266  if (stats.deleted < 10 || primary_key >= MAX_KEY ||
267  !(getTable()->index_flags(primary_key) & HA_READ_ORDER))
268  {
269  error= startTableScan(1);
270  if (error == 0)
271  {
272  while ((error= rnd_next(buf)) == HA_ERR_RECORD_DELETED) ;
273  (void) endTableScan();
274  }
275  }
276  else
277  {
278  /* Find the first row through the primary key */
279  error= startIndexScan(primary_key, 0);
280  if (error == 0)
281  {
282  error=index_first(buf);
283  (void) endIndexScan();
284  }
285  }
286  return error;
287 }
288 
300 inline uint64_t
302 {
303  if (variables->auto_increment_increment == 1)
304  return (nr+1); // optimization of the formula below
305  nr= (((nr+ variables->auto_increment_increment -
306  variables->auto_increment_offset)) /
307  (uint64_t) variables->auto_increment_increment);
308  return (nr* (uint64_t) variables->auto_increment_increment +
309  variables->auto_increment_offset);
310 }
311 
312 
313 void Cursor::adjust_next_insert_id_after_explicit_value(uint64_t nr)
314 {
315  /*
316  If we have set Session::next_insert_id previously and plan to insert an
317  explicitely-specified value larger than this, we need to increase
318  Session::next_insert_id to be greater than the explicit value.
319  */
320  if ((next_insert_id > 0) && (nr >= next_insert_id))
321  set_next_insert_id(compute_next_insert_id(nr, &getTable()->in_use->variables));
322 }
323 
324 
340 inline uint64_t
341 prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
342 {
343  if (unlikely(nr < variables->auto_increment_offset))
344  {
345  /*
346  There's nothing good we can do here. That is a pathological case, where
347  the offset is larger than the column's max possible value, i.e. not even
348  the first sequence value may be inserted. User will receive warning.
349  */
350  return nr;
351  }
352  if (variables->auto_increment_increment == 1)
353  return nr; // optimization of the formula below
354  nr= (((nr - variables->auto_increment_offset)) /
355  (uint64_t) variables->auto_increment_increment);
356  return (nr * (uint64_t) variables->auto_increment_increment +
357  variables->auto_increment_offset);
358 }
359 
360 
/* Base number of auto-increment values requested when no row estimate is used. */
422 #define AUTO_INC_DEFAULT_NB_ROWS 1 // Some prefer 1024 here
/* Bit width that bounds the default reservation size (see the maximum below). */
423 #define AUTO_INC_DEFAULT_NB_MAX_BITS 16
/* Upper bound on a default reservation: 2^AUTO_INC_DEFAULT_NB_MAX_BITS - 1. */
424 #define AUTO_INC_DEFAULT_NB_MAX ((1 << AUTO_INC_DEFAULT_NB_MAX_BITS) - 1)
425 
/*
  Fill in the table's auto-increment column (next_number_field) for the row
  about to be inserted.

  - If the field already holds a non-zero value, or
    auto_increment_field_not_null is set, the value is treated as explicit:
    next_insert_id is adjusted past it, insert_id_for_cur_row is set to 0 and
    the function returns 0.
  - Otherwise the value comes from next_insert_id. When that has reached the
    end of the currently reserved interval, a new interval is requested from
    get_auto_increment().
  - The value is stored into the field; if the store fails, the truncated
    value is rounded down with prev_insert_id() and stored again.
  - Finally insert_id_for_cur_row and next_insert_id are updated.

  Returns 0 on success, HA_ERR_AUTOINC_READ_FAILED when get_auto_increment()
  yields ~0, and HA_ERR_AUTOINC_ERANGE when the store failed and the session
  was killed with KILL_BAD_DATA.
*/
426 int Cursor::update_auto_increment()
427 {
428  uint64_t nr, nb_reserved_values;
429  bool append= false;
430  Session *session= getTable()->in_use;
431  drizzle_system_variables *variables= &session->variables;
432 
433  /*
434  next_insert_id is a "cursor" into the reserved interval, it may go greater
435  than the interval, but not smaller.
436  */
437  assert(next_insert_id >= auto_inc_interval_for_cur_row.minimum());
438 
439  /* We check if auto_increment_field_not_null is false
440  for an auto increment column, not a magic value like NULL is.
441  same as sql_mode=NO_AUTO_VALUE_ON_ZERO */
442 
443  if ((nr= getTable()->next_number_field->val_int()) != 0
444  || getTable()->auto_increment_field_not_null)
445  {
446  /*
447  Update next_insert_id if we had already generated a value in this
448  statement (case of INSERT VALUES(null),(3763),(null):
449  the last NULL needs to insert 3764, not the value of the first NULL plus
450  1).
451  */
452  adjust_next_insert_id_after_explicit_value(nr);
453  insert_id_for_cur_row= 0; // didn't generate anything
454 
455  return 0;
456  }
457 
 /* The reserved interval is exhausted (or none exists yet): reserve a new one. */
458  if ((nr= next_insert_id) >= auto_inc_interval_for_cur_row.maximum())
459  {
460  {
461  /*
462  Cursor::estimation_rows_to_insert was set by
463  Cursor::ha_start_bulk_insert(); if 0 it means "unknown".
464  */
 /*
 NOTE(review): nb_already_reserved_intervals is initialised to 0 here and
 never modified in this function, so the "increasing defaults" branch below
 is only reached when no estimate was given, and then always computes
 AUTO_INC_DEFAULT_NB_ROWS * 1.
 */
465  uint32_t nb_already_reserved_intervals= 0;
466  uint64_t nb_desired_values;
467  /*
468  If an estimation was given to the engine:
469  - use it.
470  - if we already reserved numbers, it means the estimation was
471  not accurate, then we'll reserve 2*AUTO_INC_DEFAULT_NB_ROWS the 2nd
472  time, twice that the 3rd time etc.
473  If no estimation was given, use those increasing defaults from the
474  start, starting from AUTO_INC_DEFAULT_NB_ROWS.
475  Don't go beyond a max to not reserve "way too much" (because
476  reservation means potentially losing unused values).
477  */
478  if (nb_already_reserved_intervals == 0 &&
479  (estimation_rows_to_insert > 0))
480  nb_desired_values= estimation_rows_to_insert;
481  else /* go with the increasing defaults */
482  {
483  /* avoid overflow in formula, with this if() */
484  if (nb_already_reserved_intervals <= AUTO_INC_DEFAULT_NB_MAX_BITS)
485  {
486  nb_desired_values= AUTO_INC_DEFAULT_NB_ROWS *
487  (1 << nb_already_reserved_intervals);
488  set_if_smaller(nb_desired_values, (uint64_t)AUTO_INC_DEFAULT_NB_MAX);
489  }
490  else
491  nb_desired_values= AUTO_INC_DEFAULT_NB_MAX;
492  }
493  /* This call ignores all its parameters but nr, currently */
494  get_auto_increment(variables->auto_increment_offset,
495  variables->auto_increment_increment,
496  nb_desired_values, &nr,
497  &nb_reserved_values);
 /* An all-ones value is treated as the engine's failure indicator. */
498  if (nr == ~(uint64_t) 0)
499  return HA_ERR_AUTOINC_READ_FAILED; // Mark failure
500 
501  /*
502  That rounding below should not be needed when all engines actually
503  respect offset and increment in get_auto_increment(). But they don't
504  so we still do it. Wonder if for the not-first-in-index we should do
505  it. Hope that this rounding didn't push us out of the interval; even
506  if it did we cannot do anything about it (calling the engine again
507  will not help as we inserted no row).
508  */
509  nr= compute_next_insert_id(nr-1, variables);
510  }
511 
512  if (getTable()->getShare()->next_number_keypart == 0)
513  {
514  /* We must defer the appending until "nr" has been possibly truncated */
515  append= true;
516  }
517  }
518 
519  if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
520  {
521  /*
522  first test if the query was aborted due to strict mode constraints
523  */
524  if (session->getKilled() == Session::KILL_BAD_DATA)
525  return HA_ERR_AUTOINC_ERANGE;
526 
527  /*
528  field refused this value (overflow) and truncated it, use the result of
529  the truncation (which is going to be inserted); however we try to
530  decrease it to honour auto_increment_* variables.
531  That will shift the left bound of the reserved interval, we don't
532  bother shifting the right bound (anyway any other value from this
533  interval will cause a duplicate key).
534  */
535  nr= prev_insert_id(getTable()->next_number_field->val_int(), variables);
536  if (unlikely(getTable()->next_number_field->store((int64_t) nr, true)))
537  nr= getTable()->next_number_field->val_int();
538  }
 /*
 append is only set inside the reservation branch above, so
 nb_reserved_values has been filled in by get_auto_increment() whenever
 it is read here.
 */
539  if (append)
540  auto_inc_interval_for_cur_row.replace(nr, nb_reserved_values, variables->auto_increment_increment);
541 
542  /*
543  Record this autogenerated value. If the caller then
544  succeeds to insert this value, it will call
545  record_first_successful_insert_id_in_cur_stmt()
546  which will set first_successful_insert_id_in_cur_stmt if it's not
547  already set.
548  */
549  insert_id_for_cur_row= nr;
550  /*
551  Set next insert id to point to next auto-increment value to be able to
552  handle multi-row statements.
553  */
554  set_next_insert_id(compute_next_insert_id(nr, variables));
555 
556  return 0;
557 }
558 
559 
576 void Cursor::ha_release_auto_increment()
577 {
578  release_auto_increment();
579  insert_id_for_cur_row= 0;
580  auto_inc_interval_for_cur_row.replace(0, 0, 0);
581  next_insert_id= 0;
582 }
583 
584 void Cursor::drop_table()
585 {
586  close();
587 }
588 
589 int Cursor::ha_check(Session*)
590 {
591  return HA_ADMIN_OK;
592 }
593 
599 inline
600 void
601 Cursor::setTransactionReadWrite()
602 {
603  /*
604  * If the cursor has not context for execution then there should be no
605  * possible resource to gain (and if there is... then there is a bug such
606  * that in_use should have been set.
607  */
608  if (not getTable()->in_use)
609  return;
610 
611  /*
612  When a storage engine method is called, the transaction must
613  have been started, unless it's a DDL call, for which the
614  storage engine starts the transaction internally, and commits
615  it internally, without registering in the ha_list.
616  Unfortunately here we can't know know for sure if the engine
617  has registered the transaction or not, so we must check.
618  */
619  ResourceContext& resource_context= getTable()->in_use->getResourceContext(*getEngine());
620  if (resource_context.isStarted())
621  resource_context.markModifiedData();
622 }
623 
624 
635 int
636 Cursor::ha_delete_all_rows()
637 {
638  setTransactionReadWrite();
639 
640  int result= delete_all_rows();
641 
642  if (result == 0)
643  {
650  Session& session= *getTable()->in_use;
651  TransactionServices::truncateTable(session, *getTable());
652  }
653 
654  return result;
655 }
656 
657 
664 int
665 Cursor::ha_reset_auto_increment(uint64_t value)
666 {
667  setTransactionReadWrite();
668 
669  return reset_auto_increment(value);
670 }
671 
672 
679 int
680 Cursor::ha_analyze(Session* session)
681 {
682  setTransactionReadWrite();
683 
684  return analyze(session);
685 }
686 
693 int
694 Cursor::ha_disable_indexes(uint32_t mode)
695 {
696  setTransactionReadWrite();
697 
698  return disable_indexes(mode);
699 }
700 
701 
708 int
709 Cursor::ha_enable_indexes(uint32_t mode)
710 {
711  setTransactionReadWrite();
712 
713  return enable_indexes(mode);
714 }
715 
716 
723 int Cursor::ha_discard_or_import_tablespace(bool discard)
724 {
725  setTransactionReadWrite();
726  return discard_or_import_tablespace(discard);
727 }
728 
735 void Cursor::closeMarkForDelete()
736 {
737  setTransactionReadWrite();
738  return drop_table();
739 }
740 
741 int Cursor::index_next_same(unsigned char *buf, const unsigned char *key, uint32_t keylen)
742 {
743  int error= index_next(buf);
744  if (error)
745  return error;
746 
747  ptrdiff_t ptrdiff= buf - getTable()->getInsertRecord();
748  unsigned char *save_record_0= NULL;
749  KeyInfo *key_info= NULL;
750  KeyPartInfo *key_part;
751  KeyPartInfo *key_part_end= NULL;
752 
753  /*
754  key_cmp_if_same() compares table->getInsertRecord() against 'key'.
755  In parts it uses table->getInsertRecord() directly, in parts it uses
756  field objects with their local pointers into table->getInsertRecord().
757  If 'buf' is distinct from table->getInsertRecord(), we need to move
758  all record references. This is table->getInsertRecord() itself and
759  the field pointers of the fields used in this key.
760  */
761  if (ptrdiff)
762  {
763  save_record_0= getTable()->getInsertRecord();
764  getTable()->record[0]= buf;
765  key_info= getTable()->key_info + active_index;
766  key_part= key_info->key_part;
767  key_part_end= key_part + key_info->key_parts;
768  for (; key_part < key_part_end; key_part++)
769  {
770  assert(key_part->field);
771  key_part->field->move_field_offset(ptrdiff);
772  }
773  }
774 
775  if (key_cmp_if_same(getTable(), key, active_index, keylen))
776  {
777  getTable()->status=STATUS_NOT_FOUND;
778  error= HA_ERR_END_OF_FILE;
779  }
780 
781  /* Move back if necessary. */
782  if (ptrdiff)
783  {
784  getTable()->record[0]= save_record_0;
785  for (key_part= key_info->key_part; key_part < key_part_end; key_part++)
786  key_part->field->move_field_offset(-ptrdiff);
787  }
788  return error;
789 }
790 
791 
792 /****************************************************************************
793 ** Some general functions that aren't in the Cursor class
794 ****************************************************************************/
795 
817 double Cursor::index_only_read_time(uint32_t keynr, double key_records)
818 {
819  uint32_t keys_per_block= (stats.block_size/2/
820  (getTable()->key_info[keynr].key_length + ref_length) + 1);
821  return ((double) (key_records + keys_per_block-1) /
822  (double) keys_per_block);
823 }
824 
825 
826 /****************************************************************************
827  * Default MRR implementation (MRR to non-MRR converter)
828  ***************************************************************************/
829 
861 ha_rows
862 Cursor::multi_range_read_info_const(uint32_t keyno, RANGE_SEQ_IF *seq,
863  void *seq_init_param,
864  uint32_t ,
865  uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
866 {
867  KEY_MULTI_RANGE range;
868  range_seq_t seq_it;
869  ha_rows rows, total_rows= 0;
870  uint32_t n_ranges=0;
871 
872  /* Default MRR implementation doesn't need buffer */
873  *bufsz= 0;
874 
875  seq_it= seq->init(seq_init_param, n_ranges, *flags);
876  while (!seq->next(seq_it, &range))
877  {
878  n_ranges++;
879  key_range *min_endp, *max_endp;
880  {
881  min_endp= range.start_key.length? &range.start_key : NULL;
882  max_endp= range.end_key.length? &range.end_key : NULL;
883  }
884  if ((range.range_flag & UNIQUE_RANGE) && !(range.range_flag & NULL_RANGE))
885  rows= 1; /* there can be at most one row */
886  else
887  {
888  if (HA_POS_ERROR == (rows= this->records_in_range(keyno, min_endp,
889  max_endp)))
890  {
891  /* Can't scan one range => can't do MRR scan at all */
892  total_rows= HA_POS_ERROR;
893  break;
894  }
895  }
896  total_rows += rows;
897  }
898 
899  if (total_rows != HA_POS_ERROR)
900  {
901  /* The following calculation is the same as in multi_range_read_info(): */
902  *flags |= HA_MRR_USE_DEFAULT_IMPL;
903  cost->zero();
904  cost->setAvgIOCost(1); /* assume random seeks */
905  if ((*flags & HA_MRR_INDEX_ONLY) && total_rows > 2)
906  cost->setIOCount(index_only_read_time(keyno, (uint32_t)total_rows));
907  else
908  cost->setIOCount(read_time(keyno, n_ranges, total_rows));
909  cost->setCpuCost((double) total_rows / TIME_FOR_COMPARE + 0.01);
910  }
911  return total_rows;
912 }
913 
914 
949 int Cursor::multi_range_read_info(uint32_t keyno, uint32_t n_ranges, uint32_t n_rows,
950  uint32_t *bufsz, uint32_t *flags, optimizer::CostVector *cost)
951 {
952  *bufsz= 0; /* Default implementation doesn't need a buffer */
953 
954  *flags |= HA_MRR_USE_DEFAULT_IMPL;
955 
956  cost->zero();
957  cost->setAvgIOCost(1); /* assume random seeks */
958 
959  /* Produce the same cost as non-MRR code does */
960  if (*flags & HA_MRR_INDEX_ONLY)
961  cost->setIOCount(index_only_read_time(keyno, n_rows));
962  else
963  cost->setIOCount(read_time(keyno, n_ranges, n_rows));
964  return 0;
965 }
966 
967 
1009 int
1010 Cursor::multi_range_read_init(RANGE_SEQ_IF *seq_funcs, void *seq_init_param,
1011  uint32_t n_ranges, uint32_t mode)
1012 {
1013  mrr_iter= seq_funcs->init(seq_init_param, n_ranges, mode);
1014  mrr_funcs= *seq_funcs;
1015  mrr_is_output_sorted= test(mode & HA_MRR_SORTED);
1016  mrr_have_range= false;
1017 
1018  return 0;
1019 }
1020 
1021 
/*
  Fetch the next row of the default multi-range-read scan.

  Rows are produced by read_range_first()/read_range_next() on the current
  range; when a range reports HA_ERR_END_OF_FILE the next range is taken
  from the sequence (mrr_funcs.next) until a row is found, a different
  error occurs, or the sequence is exhausted.

  On return *range_info is set to mrr_cur_range.ptr.

  Returns the result of the last read_range_first()/read_range_next() call,
  or HA_ERR_END_OF_FILE as set in the unique-range branch below.

  Control flow note: the function uses two labels inside the loops. "start"
  skips the "continue current range" part on the very first call;
  "scan_it_again" re-reads the current range without advancing the sequence.
*/
1035 int Cursor::multi_range_read_next(char **range_info)
1036 {
1037  int result= 0;
1038  int range_res= 0;
1039 
 /* First call after init: there is no current range to continue. */
1040  if (not mrr_have_range)
1041  {
1042  mrr_have_range= true;
1043  goto start;
1044  }
1045 
1046  do
1047  {
1048  /* Save a call if there can be only one row in range. */
1049  if (mrr_cur_range.range_flag != (UNIQUE_RANGE | EQ_RANGE))
1050  {
1051  result= read_range_next();
1052  /* On success or non-EOF errors jump to the end. */
1053  if (result != HA_ERR_END_OF_FILE)
1054  break;
1055  }
1056  else
1057  {
 /* was_semi_consistent_read() true: read the same range again. */
1058  if (was_semi_consistent_read())
1059  goto scan_it_again;
1060  /*
1061  We need to set this for the last range only, but checking this
1062  condition is more expensive than just setting the result code.
1063  */
1064  result= HA_ERR_END_OF_FILE;
1065  }
1066 
1067 start:
1068  /* Try the next range(s) until one matches a record. */
1069  while (!(range_res= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
1070  {
1071 scan_it_again:
 /* An end point whose keypart_map is 0 is passed as "no key". */
1072  result= read_range_first(mrr_cur_range.start_key.keypart_map ?
1073  &mrr_cur_range.start_key : 0,
1074  mrr_cur_range.end_key.keypart_map ?
1075  &mrr_cur_range.end_key : 0,
1076  test(mrr_cur_range.range_flag & EQ_RANGE),
1077  mrr_is_output_sorted);
1078  if (result != HA_ERR_END_OF_FILE)
1079  break;
1080  }
1081  }
 /* Loop again only while the last read hit EOF and the sequence returned a range. */
1082  while ((result == HA_ERR_END_OF_FILE) && !range_res);
1083 
1084  *range_info= mrr_cur_range.ptr;
1085  return result;
1086 }
1087 
1088 
1089 /* **************************************************************************
1090  * DS-MRR implementation ends
1091  ***************************************************************************/
1092 
1111 int Cursor::read_range_first(const key_range *start_key,
1112  const key_range *end_key,
1113  bool eq_range_arg,
1114  bool )
1115 {
1116  int result;
1117 
1118  eq_range= eq_range_arg;
1119  end_range= 0;
1120  if (end_key)
1121  {
1122  end_range= &save_end_range;
1123  save_end_range= *end_key;
1124  key_compare_result_on_equal= ((end_key->flag == HA_READ_BEFORE_KEY) ? 1 :
1125  (end_key->flag == HA_READ_AFTER_KEY) ? -1 : 0);
1126  }
1127  range_key_part= getTable()->key_info[active_index].key_part;
1128 
1129  if (!start_key) // Read first record
1130  result= index_first(getTable()->getInsertRecord());
1131  else
1132  result= index_read_map(getTable()->getInsertRecord(),
1133  start_key->key,
1134  start_key->keypart_map,
1135  start_key->flag);
1136  if (result)
1137  return((result == HA_ERR_KEY_NOT_FOUND)
1138  ? HA_ERR_END_OF_FILE
1139  : result);
1140 
1141  return (compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
1142 }
1143 
1144 
1158 int Cursor::read_range_next()
1159 {
1160  int result;
1161 
1162  if (eq_range)
1163  {
1164  /* We trust that index_next_same always gives a row in range */
1165  return(index_next_same(getTable()->getInsertRecord(),
1166  end_range->key,
1167  end_range->length));
1168  }
1169  result= index_next(getTable()->getInsertRecord());
1170  if (result)
1171  return result;
1172  return(compare_key(end_range) <= 0 ? 0 : HA_ERR_END_OF_FILE);
1173 }
1174 
1175 
1191 int Cursor::compare_key(key_range *range)
1192 {
1193  int cmp;
1194  if (not range)
1195  return 0; // No max range
1196  cmp= key_cmp(range_key_part, range->key, range->length);
1197  if (!cmp)
1198  cmp= key_compare_result_on_equal;
1199  return cmp;
1200 }
1201 
1202 int Cursor::index_read_idx_map(unsigned char * buf, uint32_t index,
1203  const unsigned char * key,
1204  key_part_map keypart_map,
1205  enum ha_rkey_function find_flag)
1206 {
1207  int error, error1;
1208  error= doStartIndexScan(index, 0);
1209  if (!error)
1210  {
1211  error= index_read_map(buf, key, keypart_map, find_flag);
1212  error1= doEndIndexScan();
1213  }
1214  return error ? error : error1;
1215 }
1216 
/*
  Hand a row change to TransactionServices, choosing the call from the
  session's current SQL command and from which of the two record pointers
  are NULL.

  table          table the row belongs to; its in_use session is used
  before_record  row image before the change, or NULL for an insert
  after_record   row image after the change, or NULL for a delete

  Nothing is logged (and false is returned) when the table share's
  getType() is non-zero or TransactionServices::shouldConstructMessages()
  is false.

  Returns the value of TransactionServices::insertRecord() where that is
  called; false in every other case (the updateRecord()/deleteRecord()
  calls do not contribute to the result). Callers in this file treat a true
  result as a failure.
*/
1217 static bool log_row_for_replication(Table* table,
1218  const unsigned char *before_record,
1219  const unsigned char *after_record)
1220 {
1221  Session *const session= table->in_use;
1222 
1223  if (table->getShare()->getType() || not TransactionServices::shouldConstructMessages())
1224  return false;
1225 
1226  bool result= false;
1227 
1228  switch (session->lex().sql_command)
1229  {
1230  case SQLCOM_CREATE_TABLE:
1231  /*
1232  * We are in a CREATE TABLE ... SELECT statement
1233  * and the kernel has already created the table
1234  * and put a CreateTableStatement in the active
1235  * Transaction message. Here, we add a new InsertRecord
1236  * to a new Transaction message (because the above
1237  * CREATE TABLE will commit the transaction containing
1238  * it).
1239  */
1240  result= TransactionServices::insertRecord(*session, *table);
1241  break;
1242  case SQLCOM_REPLACE:
1243  case SQLCOM_REPLACE_SELECT:
1244  /*
1245  * This is a total hack because of the code that is
1246  * in write_record() in sql_insert.cc. During
1247  * a REPLACE statement, a call to insertRecord() is
1248  * called. If it fails, then a call to deleteRecord()
1249  * is called, followed by a repeat of the original
1250  * call to insertRecord(). So, log_row_for_replication
1251  * could be called multiple times for a REPLACE
1252  * statement. The below looks at the values of before_record
1253  * and after_record to determine which call to this
1254  * function is for the delete or the insert, since NULL
1255  * is passed for after_record for the delete and NULL is
1256  * passed for before_record for the insert...
1257  *
1258  * In addition, there is an optimization that allows an
1259  * engine to convert the above delete + insert into an
1260  * update, so we must also check for this case below...
1261  */
1262  if (after_record == NULL)
1263  {
1264  /*
1265  * The storage engine is passed the record in table->record[1]
1266  * as the row to delete (this is the conflicting row), so
1267  * we need to notify TransactionService to use that row.
1268  */
1269  TransactionServices::deleteRecord(*session, *table, true);
1270  /*
1271  * We set the "current" statement message to NULL. This triggers
1272  * the replication services component to generate a new statement
1273  * message for the inserted record which will come next.
1274  */
1275  TransactionServices::finalizeStatementMessage(*session->getStatementMessage(), *session);
1276  }
1277  else
1278  {
 /* Insert half of the REPLACE, or the delete+insert folded into an update. */
1279  if (before_record == NULL)
1280  result= TransactionServices::insertRecord(*session, *table);
1281  else
1282  TransactionServices::updateRecord(*session, *table, before_record, after_record);
1283  }
1284  break;
1285  case SQLCOM_INSERT:
1286  case SQLCOM_INSERT_SELECT:
1287  case SQLCOM_LOAD:
1288  /*
1289  * The else block below represents an
1290  * INSERT ... ON DUPLICATE KEY UPDATE that
1291  * has hit a key conflict and actually done
1292  * an update.
1293  */
1294  if (before_record == NULL)
1295  result= TransactionServices::insertRecord(*session, *table);
1296  else
1297  TransactionServices::updateRecord(*session, *table, before_record, after_record);
1298  break;
1299 
1300  case SQLCOM_UPDATE:
1301  TransactionServices::updateRecord(*session, *table, before_record, after_record);
1302  break;
1303 
1304  case SQLCOM_DELETE:
1305  TransactionServices::deleteRecord(*session, *table);
1306  break;
 /* Any other command: nothing is logged. */
1307  default:
1308  break;
1309  }
1310 
1311  return result;
1312 }
1313 
1314 int Cursor::ha_external_lock(Session *session, int lock_type)
1315 {
1316  /*
1317  Whether this is lock or unlock, this should be true, and is to verify that
1318  if get_auto_increment() was called (thus may have reserved intervals or
1319  taken a table lock), ha_release_auto_increment() was too.
1320  */
1321  assert(next_insert_id == 0);
1322 
1323  if (DRIZZLE_CURSOR_RDLOCK_START_ENABLED() ||
1324  DRIZZLE_CURSOR_WRLOCK_START_ENABLED() ||
1325  DRIZZLE_CURSOR_UNLOCK_START_ENABLED())
1326  {
1327  if (lock_type == F_RDLCK)
1328  {
1329  DRIZZLE_CURSOR_RDLOCK_START(getTable()->getShare()->getSchemaName(),
1330  getTable()->getShare()->getTableName());
1331  }
1332  else if (lock_type == F_WRLCK)
1333  {
1334  DRIZZLE_CURSOR_WRLOCK_START(getTable()->getShare()->getSchemaName(),
1335  getTable()->getShare()->getTableName());
1336  }
1337  else if (lock_type == F_UNLCK)
1338  {
1339  DRIZZLE_CURSOR_UNLOCK_START(getTable()->getShare()->getSchemaName(),
1340  getTable()->getShare()->getTableName());
1341  }
1342  }
1343 
1344  /*
1345  We cache the table flags if the locking succeeded. Otherwise, we
1346  keep them as they were when they were fetched in ha_open().
1347  */
1348 
1349  int error= external_lock(session, lock_type);
1350 
1351  if (DRIZZLE_CURSOR_RDLOCK_DONE_ENABLED() ||
1352  DRIZZLE_CURSOR_WRLOCK_DONE_ENABLED() ||
1353  DRIZZLE_CURSOR_UNLOCK_DONE_ENABLED())
1354  {
1355  if (lock_type == F_RDLCK)
1356  {
1357  DRIZZLE_CURSOR_RDLOCK_DONE(error);
1358  }
1359  else if (lock_type == F_WRLCK)
1360  {
1361  DRIZZLE_CURSOR_WRLOCK_DONE(error);
1362  }
1363  else if (lock_type == F_UNLCK)
1364  {
1365  DRIZZLE_CURSOR_UNLOCK_DONE(error);
1366  }
1367  }
1368 
1369  return error;
1370 }
1371 
1372 
1376 int Cursor::ha_reset()
1377 {
1378  /* Check that we have called all proper deallocation functions */
1379  assert(! getTable()->getShare()->all_set.none());
1380  assert(getTable()->key_read == 0);
1381  /* ensure that ha_index_end / endTableScan has been called */
1382  assert(inited == NONE);
1383  /* Free cache used by filesort */
1384  getTable()->free_io_cache();
1385  /* reset the bitmaps to point to defaults */
1386  getTable()->default_column_bitmaps();
1387  return(reset());
1388 }
1389 
1390 
1391 int Cursor::insertRecord(unsigned char *buf)
1392 {
1393  int error;
1394 
1395  /*
1396  * If we have a timestamp column, update it to the current time
1397  *
1398  * @TODO Technically, the below two lines can be take even further out of the
1399  * Cursor interface and into the fill_record() method.
1400  */
1401  if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
1402  {
1403  getTable()->timestamp_field->set_time();
1404  }
1405 
1406  DRIZZLE_INSERT_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1407  setTransactionReadWrite();
1408 
1409  if (unlikely(plugin::EventObserver::beforeInsertRecord(*getTable(), buf)))
1410  {
1411  error= ER_EVENT_OBSERVER_PLUGIN;
1412  }
1413  else
1414  {
1415  error= doInsertRecord(buf);
1416  if (unlikely(plugin::EventObserver::afterInsertRecord(*getTable(), buf, error)))
1417  {
1418  error= ER_EVENT_OBSERVER_PLUGIN;
1419  }
1420  }
1421 
1422  ha_statistic_increment(&system_status_var::ha_write_count);
1423 
1424  DRIZZLE_INSERT_ROW_DONE(error);
1425 
1426  if (unlikely(error))
1427  {
1428  return error;
1429  }
1430 
1431  if (unlikely(log_row_for_replication(getTable(), NULL, buf)))
1432  return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;
1433 
1434  return 0;
1435 }
1436 
1437 
1438 int Cursor::updateRecord(const unsigned char *old_data, unsigned char *new_data)
1439 {
1440  int error;
1441 
1442  /*
1443  Some storage engines require that the new record is in getInsertRecord()
1444  (and the old record is in getUpdateRecord()).
1445  */
1446  assert(new_data == getTable()->getInsertRecord());
1447 
1448  DRIZZLE_UPDATE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1449  setTransactionReadWrite();
1450  if (unlikely(plugin::EventObserver::beforeUpdateRecord(*getTable(), old_data, new_data)))
1451  {
1452  error= ER_EVENT_OBSERVER_PLUGIN;
1453  }
1454  else
1455  {
1456  if (getTable()->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
1457  {
1458  getTable()->timestamp_field->set_time();
1459  }
1460 
1461  error= doUpdateRecord(old_data, new_data);
1462  if (unlikely(plugin::EventObserver::afterUpdateRecord(*getTable(), old_data, new_data, error)))
1463  {
1464  error= ER_EVENT_OBSERVER_PLUGIN;
1465  }
1466  }
1467 
1468  ha_statistic_increment(&system_status_var::ha_update_count);
1469 
1470  DRIZZLE_UPDATE_ROW_DONE(error);
1471 
1472  if (unlikely(error))
1473  {
1474  return error;
1475  }
1476 
1477  if (unlikely(log_row_for_replication(getTable(), old_data, new_data)))
1478  return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;
1479 
1480  return 0;
1481 }
1482 TableShare *Cursor::getShare()
1483 {
1484  return getTable()->getMutableShare();
1485 }
1486 
1487 int Cursor::deleteRecord(const unsigned char *buf)
1488 {
1489  int error;
1490 
1491  DRIZZLE_DELETE_ROW_START(getTable()->getShare()->getSchemaName(), getTable()->getShare()->getTableName());
1492  setTransactionReadWrite();
1493  if (unlikely(plugin::EventObserver::beforeDeleteRecord(*getTable(), buf)))
1494  {
1495  error= ER_EVENT_OBSERVER_PLUGIN;
1496  }
1497  else
1498  {
1499  error= doDeleteRecord(buf);
1500  if (unlikely(plugin::EventObserver::afterDeleteRecord(*getTable(), buf, error)))
1501  {
1502  error= ER_EVENT_OBSERVER_PLUGIN;
1503  }
1504  }
1505 
1506  ha_statistic_increment(&system_status_var::ha_delete_count);
1507 
1508  DRIZZLE_DELETE_ROW_DONE(error);
1509 
1510  if (unlikely(error))
1511  return error;
1512 
1513  if (unlikely(log_row_for_replication(getTable(), buf, NULL)))
1514  return HA_ERR_LOG_ROW_FOR_REPLICATION_FAILED;
1515 
1516  return 0;
1517 }
1518 
1519 } /* namespace drizzled */
message::Statement * getStatementMessage() const
Definition: session.h:896
uint64_t prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
Definition: cursor.cc:341
int key_cmp(KeyPartInfo *key_part, const unsigned char *key, uint32_t key_length)
Definition: key.cc:438
#define TIME_FOR_COMPARE
Definition: definitions.h:144
uint64_t compute_next_insert_id(uint64_t nr, drizzle_system_variables *variables)
Definition: cursor.cc:301
Session * in_use
Definition: table.h:123
bool key_cmp_if_same(Table *table, const unsigned char *key, uint32_t idx, uint32_t key_length)
Definition: key.cc:266