Drizzled Public API Documentation

mi_write.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /* Write a row to a MyISAM table */
17 
18 #include "myisam_priv.h"
19 
20 #include <drizzled/internal/m_string.h>
21 #include <drizzled/util/test.h>
22 #include <drizzled/tree.h>
23 
24 using namespace drizzled;
25 
26 #define MAX_POINTER_LENGTH 8
27 
28  /* Functions declared in this file */
29 
30 static int w_search(MI_INFO *info,MI_KEYDEF *keyinfo,
31  uint32_t comp_flag, unsigned char *key,
32  uint32_t key_length, internal::my_off_t pos, unsigned char *father_buff,
33  unsigned char *father_keypos, internal::my_off_t father_page,
34  bool insert_last);
35 static int _mi_balance_page(MI_INFO *info,MI_KEYDEF *keyinfo,unsigned char *key,
36  unsigned char *curr_buff,unsigned char *father_buff,
37  unsigned char *father_keypos,internal::my_off_t father_page);
38 static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
39  unsigned char *key, uint32_t *return_key_length,
40  unsigned char **after_key);
41 int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
42  uint32_t key_length);
43 int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr,unsigned char *key,
44  uint32_t key_length);
45 
46  /* Write new record to database */
47 
48 int mi_write(MI_INFO *info, unsigned char *record)
49 {
50  MYISAM_SHARE *share=info->s;
51  uint32_t i;
52  int save_errno;
53  internal::my_off_t filepos;
54  unsigned char *buff;
55  bool lock_tree= share->concurrent_insert;
56 
57  if (share->options & HA_OPTION_READ_ONLY_DATA)
58  {
59  return(errno=EACCES);
60  }
61  if (_mi_readinfo(info,F_WRLCK,1))
62  return(errno);
63  filepos= ((share->state.dellink != HA_OFFSET_ERROR &&
64  !info->append_insert_at_end) ?
65  share->state.dellink :
66  info->state->data_file_length);
67 
68  if (share->base.reloc == (ha_rows) 1 &&
69  share->base.records == (ha_rows) 1 &&
70  info->state->records == (ha_rows) 1)
71  { /* System file */
72  errno=HA_ERR_RECORD_FILE_FULL;
73  goto err2;
74  }
75  if (info->state->key_file_length >= share->base.margin_key_file_length)
76  {
77  errno=HA_ERR_INDEX_FILE_FULL;
78  goto err2;
79  }
80  if (_mi_mark_file_changed(info))
81  goto err2;
82 
83  /* Calculate and check all unique constraints */
84  for (i=0 ; i < share->state.header.uniques ; i++)
85  {
86  if (mi_check_unique(info,share->uniqueinfo+i,record,
87  mi_unique_hash(share->uniqueinfo+i,record),
88  HA_OFFSET_ERROR))
89  goto err2;
90  }
91 
92  /* Write all keys to indextree */
93 
94  buff=info->lastkey2;
95  for (i=0 ; i < share->base.keys ; i++)
96  {
97  if (mi_is_key_active(share->state.key_map, i))
98  {
99  bool local_lock_tree= (lock_tree &&
100  !(info->bulk_insert &&
101  info->bulk_insert[i].is_inited()));
102  if (local_lock_tree)
103  {
104  share->keyinfo[i].version++;
105  }
106  {
107  if (share->keyinfo[i].ck_insert(info,i,buff,
108  _mi_make_key(info,i,buff,record,filepos)))
109  {
110  goto err;
111  }
112  }
113 
114  /* The above changed info->lastkey2. Inform mi_rnext_same(). */
115  info->update&= ~HA_STATE_RNEXT_SAME;
116  }
117  }
118  if (share->calc_checksum)
119  info->checksum=(*share->calc_checksum)(info,record);
120  if (!(info->opt_flag & OPT_NO_ROWS))
121  {
122  if ((*share->write_record)(info,record))
123  goto err;
124  info->state->checksum+=info->checksum;
125  }
126  if (share->base.auto_key)
127  set_if_bigger(info->s->state.auto_increment,
128  retrieve_auto_increment(info, record));
129  info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN |
130  HA_STATE_ROW_CHANGED);
131  info->state->records++;
132  info->lastpos=filepos;
133  _mi_writeinfo(info, WRITEINFO_UPDATE_KEYFILE);
134 
135  /*
136  Update status of the table. We need to do so after each row write
137  for the log tables, as we want the new row to become visible to
138  other threads as soon as possible. We don't lock mutex here
139  (as it is required by pthread memory visibility rules) as (1) it's
140  not critical to use outdated share->is_log_table value (2) locking
141  mutex here for every write is too expensive.
142  */
143  if (share->is_log_table) // Log table do not exist in Drizzle
144  assert(0);
145 
146  return(0);
147 
148 err:
149  save_errno=errno;
150  if (errno == HA_ERR_FOUND_DUPP_KEY || errno == HA_ERR_RECORD_FILE_FULL ||
151  errno == HA_ERR_NULL_IN_SPATIAL || errno == HA_ERR_OUT_OF_MEM)
152  {
153  if (info->bulk_insert)
154  {
155  uint32_t j;
156  for (j=0 ; j < share->base.keys ; j++)
157  mi_flush_bulk_insert(info, j);
158  }
159  info->errkey= (int) i;
160  while ( i-- > 0)
161  {
162  if (mi_is_key_active(share->state.key_map, i))
163  {
164  {
165  uint32_t key_length=_mi_make_key(info,i,buff,record,filepos);
166  if (_mi_ck_delete(info,i,buff,key_length))
167  {
168  break;
169  }
170  }
171  }
172  }
173  }
174  else
175  {
176  mi_print_error(info->s, HA_ERR_CRASHED);
177  mi_mark_crashed(info);
178  }
179  info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED);
180  errno=save_errno;
181 err2:
182  save_errno=errno;
183  _mi_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
184  return(errno=save_errno);
185 } /* mi_write */
186 
187 
188  /* Write one key to btree */
189 
190 int _mi_ck_write(MI_INFO *info, uint32_t keynr, unsigned char *key, uint32_t key_length)
191 {
192  if (info->bulk_insert && info->bulk_insert[keynr].is_inited())
193  {
194  return(_mi_ck_write_tree(info, keynr, key, key_length));
195  }
196  else
197  {
198  return(_mi_ck_write_btree(info, keynr, key, key_length));
199  }
200 } /* _mi_ck_write */
201 
202 
203 /**********************************************************************
204  * Normal insert code *
205  **********************************************************************/
206 
207 int _mi_ck_write_btree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
208  uint32_t key_length)
209 {
210  uint32_t error;
211  uint32_t comp_flag;
212  MI_KEYDEF *keyinfo=info->s->keyinfo+keynr;
213  internal::my_off_t *root=&info->s->state.key_root[keynr];
214 
215  if (keyinfo->flag & HA_SORT_ALLOWS_SAME)
216  comp_flag=SEARCH_BIGGER; /* Put after same key */
217  else if (keyinfo->flag & (HA_NOSAME))
218  {
219  comp_flag=SEARCH_FIND | SEARCH_UPDATE; /* No duplicates */
220  if (keyinfo->flag & HA_NULL_ARE_EQUAL)
221  comp_flag|= SEARCH_NULL_ARE_EQUAL;
222  }
223  else
224  comp_flag=SEARCH_SAME; /* Keys in rec-pos order */
225 
226  error=_mi_ck_real_write_btree(info, keyinfo, key, key_length,
227  root, comp_flag);
228  return(error);
229 } /* _mi_ck_write_btree */
230 
231 int _mi_ck_real_write_btree(MI_INFO *info, MI_KEYDEF *keyinfo,
232  unsigned char *key, uint32_t key_length, internal::my_off_t *root, uint32_t comp_flag)
233 {
234  int error;
235  /* key_length parameter is used only if comp_flag is SEARCH_FIND */
236  if (*root == HA_OFFSET_ERROR ||
237  (error=w_search(info, keyinfo, comp_flag, key, key_length,
238  *root, (unsigned char *) 0, (unsigned char*) 0,
239  (internal::my_off_t) 0, 1)) > 0)
240  error=_mi_enlarge_root(info,keyinfo,key,root);
241  return(error);
242 } /* _mi_ck_real_write_btree */
243 
244 
245  /* Make a new root with key as only pointer */
246 
247 int _mi_enlarge_root(MI_INFO *info, MI_KEYDEF *keyinfo, unsigned char *key,
248  internal::my_off_t *root)
249 {
250  uint32_t t_length,nod_flag;
251  MI_KEY_PARAM s_temp;
252  MYISAM_SHARE *share=info->s;
253 
254  nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
255  _mi_kpointer(info,info->buff+2,*root); /* if nod */
256  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char*) 0,
257  (unsigned char*) 0, (unsigned char*) 0, key,&s_temp);
258  mi_putint(info->buff,t_length+2+nod_flag,nod_flag);
259  (*keyinfo->store_key)(keyinfo,info->buff+2+nod_flag,&s_temp);
260  info->buff_used=info->page_changed=1; /* info->buff is used */
261  if ((*root= _mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR ||
262  _mi_write_keypage(info,keyinfo,*root,DFLT_INIT_HITS,info->buff))
263  return(-1);
264  return(0);
265 } /* _mi_enlarge_root */
266 
267 
268  /*
269  Search after a position for a key and store it there
270  Returns -1 = error
271  0 = ok
272  1 = key should be stored in higher tree
273  */
274 
275 static int w_search(register MI_INFO *info, register MI_KEYDEF *keyinfo,
276  uint32_t comp_flag, unsigned char *key, uint32_t key_length, internal::my_off_t page,
277  unsigned char *father_buff, unsigned char *father_keypos,
278  internal::my_off_t father_page, bool insert_last)
279 {
280  int error,flag;
281  uint32_t nod_flag, search_key_length;
282  unsigned char *temp_buff,*keypos;
283  unsigned char keybuff[MI_MAX_KEY_BUFF];
284  bool was_last_key;
285  internal::my_off_t next_page, dupp_key_pos;
286 
287  search_key_length= (comp_flag & SEARCH_FIND) ? key_length : USE_WHOLE_KEY;
288  if (!(temp_buff= (unsigned char*) malloc(keyinfo->block_length+
289  MI_MAX_KEY_BUFF*2)))
290  return(-1);
291  if (!_mi_fetch_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff,0))
292  goto err;
293 
294  flag=(*keyinfo->bin_search)(info,keyinfo,temp_buff,key,search_key_length,
295  comp_flag, &keypos, keybuff, &was_last_key);
296  nod_flag=mi_test_if_nod(temp_buff);
297  if (flag == 0)
298  {
299  uint32_t tmp_key_length;
300  /* get position to record with duplicated key */
301  tmp_key_length=(*keyinfo->get_key)(keyinfo,nod_flag,&keypos,keybuff);
302  if (tmp_key_length)
303  dupp_key_pos=_mi_dpos(info,0,keybuff+tmp_key_length);
304  else
305  dupp_key_pos= HA_OFFSET_ERROR;
306 
307  {
308  info->dupp_key_pos= dupp_key_pos;
309  free(temp_buff);
310  errno=HA_ERR_FOUND_DUPP_KEY;
311  return(-1);
312  }
313  }
314  if (flag == MI_FOUND_WRONG_KEY)
315  return(-1);
316  if (!was_last_key)
317  insert_last=0;
318  next_page=_mi_kpos(nod_flag,keypos);
319  if (next_page == HA_OFFSET_ERROR ||
320  (error=w_search(info, keyinfo, comp_flag, key, key_length, next_page,
321  temp_buff, keypos, page, insert_last)) >0)
322  {
323  error=_mi_insert(info,keyinfo,key,temp_buff,keypos,keybuff,father_buff,
324  father_keypos,father_page, insert_last);
325  if (_mi_write_keypage(info,keyinfo,page,DFLT_INIT_HITS,temp_buff))
326  goto err;
327  }
328  free(temp_buff);
329  return(error);
330 err:
331  free(temp_buff);
332  return (-1);
333 } /* w_search */
334 
335 
336 /*
337  Insert new key.
338 
339  SYNOPSIS
340  _mi_insert()
341  info Open table information.
342  keyinfo Key definition information.
343  key New key.
344  anc_buff Key page (beginning).
345  key_pos Position in key page where to insert.
346  key_buff Copy of previous key.
347  father_buff parent key page for balancing.
348  father_key_pos position in parent key page for balancing.
349  father_page position of parent key page in file.
350  insert_last If to append at end of page.
351 
352  DESCRIPTION
353  Insert new key at right of key_pos.
354 
355  RETURN
356  2 if key contains key to upper level.
357  0 OK.
358  < 0 Error.
359 */
360 
361 int _mi_insert(register MI_INFO *info, register MI_KEYDEF *keyinfo,
362  unsigned char *key, unsigned char *anc_buff, unsigned char *key_pos, unsigned char *key_buff,
363  unsigned char *father_buff, unsigned char *father_key_pos, internal::my_off_t father_page,
364  bool insert_last)
365 {
366  uint32_t a_length,nod_flag;
367  int t_length;
368  unsigned char *endpos, *prev_key;
369  MI_KEY_PARAM s_temp;
370 
371  nod_flag=mi_test_if_nod(anc_buff);
372  a_length=mi_getint(anc_buff);
373  endpos= anc_buff+ a_length;
374  prev_key=(key_pos == anc_buff+2+nod_flag ? (unsigned char*) 0 : key_buff);
375  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,
376  (key_pos == endpos ? (unsigned char*) 0 : key_pos),
377  prev_key, prev_key,
378  key,&s_temp);
379 
380  if (t_length > 0)
381  {
382  if (t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
383  {
384  mi_print_error(info->s, HA_ERR_CRASHED);
385  errno=HA_ERR_CRASHED;
386  return(-1);
387  }
388  internal::bmove_upp((unsigned char*) endpos+t_length,(unsigned char*) endpos,(uint) (endpos-key_pos));
389  }
390  else
391  {
392  if (-t_length >= keyinfo->maxlength*2+MAX_POINTER_LENGTH)
393  {
394  mi_print_error(info->s, HA_ERR_CRASHED);
395  errno=HA_ERR_CRASHED;
396  return(-1);
397  }
398  memmove(key_pos, key_pos - t_length, endpos - key_pos + t_length);
399  }
400  (*keyinfo->store_key)(keyinfo,key_pos,&s_temp);
401  a_length+=t_length;
402  mi_putint(anc_buff,a_length,nod_flag);
403  if (a_length <= keyinfo->block_length)
404  {
405  return(0); /* There is room on page */
406  }
407  /* Page is full */
408  if (nod_flag)
409  insert_last=0;
410  if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) &&
411  father_buff && !insert_last)
412  return(_mi_balance_page(info,keyinfo,key,anc_buff,father_buff,
413  father_key_pos,father_page));
414  return(_mi_split_page(info,keyinfo,key,anc_buff,key_buff, insert_last));
415 } /* _mi_insert */
416 
417 
418  /* split a full page in two and assign emerging item to key */
419 
420 int _mi_split_page(register MI_INFO *info, register MI_KEYDEF *keyinfo,
421  unsigned char *key, unsigned char *buff, unsigned char *key_buff,
422  bool insert_last_key)
423 {
424  uint32_t length,a_length,key_ref_length,t_length,nod_flag,key_length;
425  unsigned char *key_pos,*pos, *after_key= NULL;
426  internal::my_off_t new_pos;
427  MI_KEY_PARAM s_temp;
428 
429  if (info->s->keyinfo+info->lastinx == keyinfo)
430  info->page_changed=1; /* Info->buff is used */
431  info->buff_used=1;
432  nod_flag=mi_test_if_nod(buff);
433  key_ref_length=2+nod_flag;
434  if (insert_last_key)
435  key_pos=_mi_find_last_pos(keyinfo,buff,key_buff, &key_length, &after_key);
436  else
437  key_pos=_mi_find_half_pos(nod_flag,keyinfo,buff,key_buff, &key_length,
438  &after_key);
439  if (!key_pos)
440  return(-1);
441 
442  length=(uint) (key_pos-buff);
443  a_length=mi_getint(buff);
444  mi_putint(buff,length,nod_flag);
445 
446  key_pos=after_key;
447  if (nod_flag)
448  {
449  pos=key_pos-nod_flag;
450  memcpy(info->buff + 2, pos, nod_flag);
451  }
452 
453  /* Move middle item to key and pointer to new page */
454  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
455  return(-1);
456  _mi_kpointer(info,_mi_move_key(keyinfo,key,key_buff),new_pos);
457 
458  /* Store new page */
459  if (!(*keyinfo->get_key)(keyinfo,nod_flag,&key_pos,key_buff))
460  return(-1);
461 
462  t_length=(*keyinfo->pack_key)(keyinfo,nod_flag,(unsigned char *) 0,
463  (unsigned char*) 0, (unsigned char*) 0,
464  key_buff, &s_temp);
465  length=(uint) ((buff+a_length)-key_pos);
466  memcpy(info->buff+key_ref_length+t_length, key_pos, length);
467  (*keyinfo->store_key)(keyinfo,info->buff+key_ref_length,&s_temp);
468  mi_putint(info->buff,length+t_length+key_ref_length,nod_flag);
469 
470  if (_mi_write_keypage(info,keyinfo,new_pos,DFLT_INIT_HITS,info->buff))
471  return(-1);
472  return(2); /* Middle key up */
473 } /* _mi_split_page */
474 
475 
476  /*
477  Calculate how to much to move to split a page in two
478  Returns pointer to start of key.
479  key will contain the key.
480  return_key_length will contain the length of key
481  after_key will contain the position to where the next key starts
482  */
483 
484 unsigned char *_mi_find_half_pos(uint32_t nod_flag, MI_KEYDEF *keyinfo, unsigned char *page,
485  unsigned char *key, uint32_t *return_key_length,
486  unsigned char **after_key)
487 {
488  uint32_t keys,length,key_ref_length;
489  unsigned char *end,*lastpos;
490 
491  key_ref_length=2+nod_flag;
492  length=mi_getint(page)-key_ref_length;
493  page+=key_ref_length;
494  if (!(keyinfo->flag &
495  (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
496  HA_BINARY_PACK_KEY)))
497  {
498  key_ref_length=keyinfo->keylength+nod_flag;
499  keys=length/(key_ref_length*2);
500  *return_key_length=keyinfo->keylength;
501  end=page+keys*key_ref_length;
502  *after_key=end+key_ref_length;
503  memcpy(key,end,key_ref_length);
504  return(end);
505  }
506 
507  end=page+length/2-key_ref_length; /* This is aprox. half */
508  *key='\0';
509  do
510  {
511  lastpos=page;
512  if (!(length=(*keyinfo->get_key)(keyinfo,nod_flag,&page,key)))
513  return(0);
514  } while (page < end);
515  *return_key_length=length;
516  *after_key=page;
517  return(lastpos);
518 } /* _mi_find_half_pos */
519 
520 
521  /*
522  Split buffer at last key
523  Returns pointer to the start of the key before the last key
524  key will contain the last key
525  */
526 
527 static unsigned char *_mi_find_last_pos(MI_KEYDEF *keyinfo, unsigned char *page,
528  unsigned char *key, uint32_t *return_key_length,
529  unsigned char **after_key)
530 {
531  uint32_t keys;
532  uint32_t length;
533  uint32_t last_length= 0;
534  uint32_t key_ref_length;
535  unsigned char *end, *lastpos, *prevpos= NULL;
536  unsigned char key_buff[MI_MAX_KEY_BUFF];
537 
538  key_ref_length=2;
539  length=mi_getint(page)-key_ref_length;
540  page+=key_ref_length;
541  if (!(keyinfo->flag &
542  (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY |
543  HA_BINARY_PACK_KEY)))
544  {
545  keys=length/keyinfo->keylength-2;
546  *return_key_length=length=keyinfo->keylength;
547  end=page+keys*length;
548  *after_key=end+length;
549  memcpy(key,end,length);
550  return(end);
551  }
552 
553  end= page + length - key_ref_length;
554  *key='\0';
555  length=0;
556  lastpos=page;
557  while (page < end)
558  {
559  prevpos=lastpos; lastpos=page;
560  last_length=length;
561  memcpy(key, key_buff, length); /* previous key */
562  if (!(length=(*keyinfo->get_key)(keyinfo,0,&page,key_buff)))
563  {
564  mi_print_error(keyinfo->share, HA_ERR_CRASHED);
565  errno=HA_ERR_CRASHED;
566  return(0);
567  }
568  }
569  *return_key_length=last_length;
570  *after_key=lastpos;
571  return(prevpos);
572 } /* _mi_find_last_pos */
573 
574 
575  /* Balance page with not packed keys with page on right/left */
576  /* returns 0 if balance was done */
577 
578 static int _mi_balance_page(register MI_INFO *info, MI_KEYDEF *keyinfo,
579  unsigned char *key, unsigned char *curr_buff, unsigned char *father_buff,
580  unsigned char *father_key_pos, internal::my_off_t father_page)
581 {
582  bool right;
583  uint32_t k_length,father_length,father_keylength,nod_flag,curr_keylength,
584  right_length,left_length,new_right_length,new_left_length,extra_length,
585  length,keys;
586  unsigned char *pos,*buff,*extra_buff;
587  internal::my_off_t next_page,new_pos;
588  unsigned char tmp_part_key[MI_MAX_KEY_BUFF];
589 
590  k_length=keyinfo->keylength;
591  father_length=mi_getint(father_buff);
592  father_keylength=k_length+info->s->base.key_reflength;
593  nod_flag=mi_test_if_nod(curr_buff);
594  curr_keylength=k_length+nod_flag;
595  info->page_changed=1;
596 
597  if ((father_key_pos != father_buff+father_length &&
598  (info->state->records & 1)) ||
599  father_key_pos == father_buff+2+info->s->base.key_reflength)
600  {
601  right=1;
602  next_page= _mi_kpos(info->s->base.key_reflength,
603  father_key_pos+father_keylength);
604  buff=info->buff;
605  }
606  else
607  {
608  right=0;
609  father_key_pos-=father_keylength;
610  next_page= _mi_kpos(info->s->base.key_reflength,father_key_pos);
611  /* Fix that curr_buff is to left */
612  buff=curr_buff; curr_buff=info->buff;
613  } /* father_key_pos ptr to parting key */
614 
615  if (!_mi_fetch_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff,0))
616  goto err;
617 
618  /* Test if there is room to share keys */
619 
620  left_length=mi_getint(curr_buff);
621  right_length=mi_getint(buff);
622  keys=(left_length+right_length-4-nod_flag*2)/curr_keylength;
623 
624  if ((right ? right_length : left_length) + curr_keylength <=
625  keyinfo->block_length)
626  { /* Merge buffs */
627  new_left_length=2+nod_flag+(keys/2)*curr_keylength;
628  new_right_length=2+nod_flag+((keys+1)/2)*curr_keylength;
629  mi_putint(curr_buff,new_left_length,nod_flag);
630  mi_putint(buff,new_right_length,nod_flag);
631 
632  if (left_length < new_left_length)
633  { /* Move keys buff -> leaf */
634  pos=curr_buff+left_length;
635  memcpy(pos, father_key_pos, k_length);
636  length= new_left_length - left_length - k_length;
637  memcpy(pos+k_length, buff+2, length);
638  pos=buff+2+length;
639  memcpy(father_key_pos, pos, k_length);
640  memmove(buff+2, pos+k_length, new_right_length);
641  }
642  else
643  { /* Move keys -> buff */
644 
645  internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) buff+right_length,
646  right_length-2);
647  length=new_right_length-right_length-k_length;
648  memcpy(buff+2+length,father_key_pos, k_length);
649  pos=curr_buff+new_left_length;
650  memcpy(father_key_pos, pos, k_length);
651  memcpy(buff+2, pos+k_length, length);
652  }
653 
654  if (_mi_write_keypage(info,keyinfo,next_page,DFLT_INIT_HITS,info->buff) ||
655  _mi_write_keypage(info,keyinfo,father_page,DFLT_INIT_HITS,father_buff))
656  goto err;
657  return(0);
658  }
659 
660  /* curr_buff[] and buff[] are full, lets split and make new nod */
661 
662  extra_buff=info->buff+info->s->base.max_key_block_length;
663  new_left_length=new_right_length=2+nod_flag+(keys+1)/3*curr_keylength;
664  if (keys == 5) /* Too few keys to balance */
665  new_left_length-=curr_keylength;
666  extra_length=nod_flag+left_length+right_length-
667  new_left_length-new_right_length-curr_keylength;
668  mi_putint(curr_buff,new_left_length,nod_flag);
669  mi_putint(buff,new_right_length,nod_flag);
670  mi_putint(extra_buff,extra_length+2,nod_flag);
671 
672  /* move first largest keys to new page */
673  pos=buff+right_length-extra_length;
674  memcpy(extra_buff+2, pos, extra_length);
675  /* Save new parting key */
676  memcpy(tmp_part_key, pos-k_length,k_length);
677  /* Make place for new keys */
678  internal::bmove_upp((unsigned char*) buff+new_right_length,(unsigned char*) pos-k_length,
679  right_length-extra_length-k_length-2);
680  /* Copy keys from left page */
681  pos= curr_buff+new_left_length;
682  length= left_length - new_left_length - k_length;
683  memcpy(buff+2, pos+k_length, length);
684  /* Copy old parting key */
685  memcpy(buff+2+length, father_key_pos, k_length);
686 
687  /* Move new parting keys up to caller */
688  memcpy((right ? key : father_key_pos), pos, k_length);
689  memcpy((right ? father_key_pos : key), tmp_part_key, k_length);
690 
691  if ((new_pos=_mi_new(info,keyinfo,DFLT_INIT_HITS)) == HA_OFFSET_ERROR)
692  goto err;
693  _mi_kpointer(info,key+k_length,new_pos);
694  if (_mi_write_keypage(info,keyinfo,(right ? new_pos : next_page),
695  DFLT_INIT_HITS,info->buff) ||
696  _mi_write_keypage(info,keyinfo,(right ? next_page : new_pos),
697  DFLT_INIT_HITS,extra_buff))
698  goto err;
699 
700  return(1); /* Middle key up */
701 
702 err:
703  return(-1);
704 } /* _mi_balance_page */
705 
706 /**********************************************************************
707  * Bulk insert code *
708  **********************************************************************/
709 
710 typedef struct {
711  MI_INFO *info;
712  uint32_t keynr;
714 
715 int _mi_ck_write_tree(register MI_INFO *info, uint32_t keynr, unsigned char *key,
716  uint32_t key_length)
717 {
718  int error;
719 
720  error= info->bulk_insert[keynr].tree_insert(key,
721  key_length + info->s->rec_reflength,
722  info->bulk_insert[keynr].getCustomArg()) ? 0 : HA_ERR_OUT_OF_MEM ;
723 
724  return(error);
725 } /* _mi_ck_write_tree */
726 
727 
728 /* typeof(_mi_keys_compare)=qsort_cmp2 */
729 
730 static int keys_compare(bulk_insert_param *param, unsigned char *key1, unsigned char *key2)
731 {
732  uint32_t not_used[2];
733  return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg,
734  key1, key2, USE_WHOLE_KEY, SEARCH_SAME,
735  not_used);
736 }
737 
738 
739 static int keys_free(unsigned char *key, TREE_FREE mode, bulk_insert_param *param)
740 {
741  /*
742  Probably I can use info->lastkey here, but I'm not sure,
743  and to be safe I'd better use local lastkey.
744  */
745  unsigned char lastkey[MI_MAX_KEY_BUFF];
746  uint32_t keylen;
747  MI_KEYDEF *keyinfo;
748 
749  switch (mode) {
750  case free_init:
751  if (param->info->s->concurrent_insert)
752  {
753  param->info->s->keyinfo[param->keynr].version++;
754  }
755  return 0;
756  case free_free:
757  keyinfo=param->info->s->keyinfo+param->keynr;
758  keylen=_mi_keylength(keyinfo, key);
759  memcpy(lastkey, key, keylen);
760  return _mi_ck_write_btree(param->info,param->keynr,lastkey,
761  keylen - param->info->s->rec_reflength);
762  case free_end:
763  return 0;
764  }
765  return -1;
766 }
767 
768 
769 int mi_init_bulk_insert(MI_INFO *info, uint32_t cache_size, ha_rows rows)
770 {
771  MYISAM_SHARE *share=info->s;
772  MI_KEYDEF *key=share->keyinfo;
773  bulk_insert_param *params;
774  uint32_t i, num_keys, total_keylength;
775  uint64_t key_map;
776 
777  assert(!info->bulk_insert &&
778  (!rows || rows >= MI_MIN_ROWS_TO_USE_BULK_INSERT));
779 
780  mi_clear_all_keys_active(key_map);
781  for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++)
782  {
783  if (! (key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) &&
784  mi_is_key_active(share->state.key_map, i))
785  {
786  num_keys++;
787  mi_set_key_active(key_map, i);
788  total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE;
789  }
790  }
791 
792  if (num_keys==0 ||
793  num_keys * MI_MIN_SIZE_BULK_INSERT_TREE > cache_size)
794  return(0);
795 
796  if (rows && rows*total_keylength < cache_size)
797  cache_size= (uint32_t)rows;
798  else
799  cache_size/=total_keylength*16;
800 
801  info->bulk_insert=(Tree *) malloc((sizeof(Tree)*share->base.keys+
802  sizeof(bulk_insert_param)*num_keys));
803 
804  if (!info->bulk_insert)
805  return(HA_ERR_OUT_OF_MEM);
806 
807  params=(bulk_insert_param *)(info->bulk_insert+share->base.keys);
808  for (i=0 ; i < share->base.keys ; i++)
809  {
810  if (mi_is_key_active(key_map, i))
811  {
812  params->info=info;
813  params->keynr=i;
814  /* Only allocate a 16'th of the buffer at a time */
815  info->bulk_insert[i].init_tree(cache_size * key[i].maxlength,
816  cache_size * key[i].maxlength, 0,
817  (qsort_cmp2)keys_compare, false,
818  (tree_element_free) keys_free, (void *)params++);
819  }
820  else
821  info->bulk_insert[i].setRoot(0);
822  }
823 
824  return(0);
825 }
826 
827 void mi_flush_bulk_insert(MI_INFO *info, uint32_t inx)
828 {
829  if (info->bulk_insert)
830  {
831  if (info->bulk_insert[inx].is_inited())
832  info->bulk_insert[inx].reset_tree();
833  }
834 }
835 
836 void mi_end_bulk_insert(MI_INFO *info)
837 {
838  if (info->bulk_insert)
839  {
840  uint32_t i;
841  for (i=0 ; i < info->s->base.keys ; i++)
842  {
843  if (info->bulk_insert[i].is_inited())
844  {
845  info->bulk_insert[i].delete_tree();
846  }
847  }
848  free((void *)info->bulk_insert);
849  info->bulk_insert=0;
850  }
851 }