Drizzled Public API Documentation

sort.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 /*
17  Creates a index for a database by reading keys, sorting them and outputing
18  them in sorted order through SORT_INFO functions.
19 */
20 
21 #include "myisam_priv.h"
22 #include <stddef.h>
23 #include <queue>
24 #include <algorithm>
25 
26 /* static variables */
27 
28 #undef MIN_SORT_MEMORY
29 #undef MYF_RW
30 #undef DISK_BUFFER_SIZE
31 
32 #define MERGEBUFF 15
33 #define MERGEBUFF2 31
34 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
35 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
36 #define DISK_BUFFER_SIZE (IO_SIZE*16)
37 
38 using namespace std;
39 using namespace drizzled;
40 
41 
42 /*
43  Pointers of functions for store and read keys from temp file
44 */
45 
46 extern void print_error(const char *fmt,...);
47 
48 /* Functions defined in this file */
49 
50 static ha_rows find_all_keys(MI_SORT_PARAM *info,uint32_t keys,
51  unsigned char **sort_keys,
53  size_t *maxbuffer,
54  internal::io_cache_st *tempfile,
55  internal::io_cache_st *tempfile_for_exceptions);
56 static int write_keys(MI_SORT_PARAM *info,unsigned char **sort_keys,
57  uint32_t count, BUFFPEK *buffpek,internal::io_cache_st *tempfile);
58 static int write_key(MI_SORT_PARAM *info, unsigned char *key,
59  internal::io_cache_st *tempfile);
60 static int write_index(MI_SORT_PARAM *info,unsigned char * *sort_keys,
61  uint32_t count);
62 static int merge_many_buff(MI_SORT_PARAM *info,uint32_t keys,
63  unsigned char * *sort_keys,
64  BUFFPEK *buffpek,size_t *maxbuffer,
65  internal::io_cache_st *t_file);
66 static uint32_t read_to_buffer(internal::io_cache_st *fromfile,BUFFPEK *buffpek,
67  uint32_t sort_length);
68 static int merge_buffers(MI_SORT_PARAM *info,uint32_t keys,
69  internal::io_cache_st *from_file, internal::io_cache_st *to_file,
70  unsigned char * *sort_keys, BUFFPEK *lastbuff,
71  BUFFPEK *Fb, BUFFPEK *Tb);
72 static int merge_index(MI_SORT_PARAM *,uint,unsigned char **,BUFFPEK *, int,
74 static int write_keys_varlen(MI_SORT_PARAM *info,unsigned char **sort_keys,
75  uint32_t count, BUFFPEK *buffpek,
76  internal::io_cache_st *tempfile);
77 static uint32_t read_to_buffer_varlen(internal::io_cache_st *fromfile,BUFFPEK *buffpek,
78  uint32_t sort_length);
79 static int write_merge_key(MI_SORT_PARAM *info, internal::io_cache_st *to_file,
80  unsigned char *key, uint32_t sort_length, uint32_t count);
81 static int write_merge_key_varlen(MI_SORT_PARAM *info,
82  internal::io_cache_st *to_file,
83  unsigned char* key, uint32_t sort_length,
84  uint32_t count);
85 
86 inline int
87 my_var_write(MI_SORT_PARAM *info, internal::io_cache_st *to_file, unsigned char *bufs);
88 
89 /*
90  Creates a index of sorted keys
91 
92  SYNOPSIS
93  _create_index_by_sort()
94  info Sort parameters
95  no_messages Set to 1 if no output
96  sortbuff_size Size if sortbuffer to allocate
97 
98  RESULT
99  0 ok
100  <> 0 Error
101 */
102 
103 int _create_index_by_sort(MI_SORT_PARAM *info,bool no_messages,
104  size_t sortbuff_size)
105 {
106  size_t skr;
107  uint32_t memavl,keys;
109  internal::io_cache_st tempfile, tempfile_for_exceptions;
110 
111  if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
112  {
113  info->write_keys=write_keys_varlen;
114  info->read_to_buffer=read_to_buffer_varlen;
115  info->write_key= write_merge_key_varlen;
116  }
117  else
118  {
119  info->write_keys=write_keys;
120  info->read_to_buffer=read_to_buffer;
121  info->write_key=write_merge_key;
122  }
123 
124  tempfile.clear();
125  tempfile_for_exceptions.clear();
126  memset(&buffpek, 0, sizeof(buffpek));
127  unsigned char** sort_keys= (unsigned char **) NULL;
128  int error= 1;
129  size_t maxbuffer=1;
130 
131  memavl=max(sortbuff_size,(size_t)MIN_SORT_MEMORY);
132  ha_rows records= info->sort_info->max_records;
133  uint32_t sort_length= info->key_length;
134 
135  while (memavl >= MIN_SORT_MEMORY)
136  {
137  if ((records < UINT32_MAX) &&
138  ((internal::my_off_t) (records + 1) *
139  (sort_length + sizeof(char*)) <= (internal::my_off_t) memavl))
140  keys= (uint)records+1;
141  else
142  do
143  {
144  skr=maxbuffer;
145  if (memavl < sizeof(BUFFPEK)* maxbuffer ||
146  (keys=(memavl-sizeof(BUFFPEK)* maxbuffer) / (sort_length+sizeof(char*))) <= 1 ||
147  keys < maxbuffer)
148  {
149  mi_check_print_error(info->sort_info->param,
150  "myisam_sort_buffer_size is too small");
151  goto err;
152  }
153  }
154  while ((maxbuffer= (size_t)(records/(keys-1)+1)) != skr);
155 
156  sort_keys=(unsigned char **)malloc(keys*(sort_length+sizeof(char*)));
157  buffpek.init(sizeof(BUFFPEK), maxbuffer, maxbuffer/2);
158  break;
159  }
160  if (memavl < MIN_SORT_MEMORY)
161  {
162  mi_check_print_error(info->sort_info->param,"MyISAM sort buffer too small");
163  goto err;
164  }
165  (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
166 
167  if (!no_messages)
168  printf(" - Searching for keys, allocating buffer for %d keys\n",keys);
169 
170  if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
171  &tempfile,&tempfile_for_exceptions))
172  == HA_POS_ERROR)
173  goto err;
174  if (maxbuffer == 0)
175  {
176  if (!no_messages)
177  printf(" - Dumping %u keys\n", (uint32_t) records);
178  if (write_index(info,sort_keys, (uint) records))
179  goto err;
180  }
181  else
182  {
183  keys=(keys*(sort_length+sizeof(char*)))/sort_length;
184  if (maxbuffer >= MERGEBUFF2)
185  {
186  if (!no_messages)
187  printf(" - Merging %u keys\n", (uint32_t) records);
188  if (merge_many_buff(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, &maxbuffer, &tempfile))
189  goto err;
190  }
191  if (tempfile.flush() ||
192  tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
193  goto err;
194  if (!no_messages)
195  printf(" - Last merge and dumping keys\n");
196  if (merge_index(info,keys,sort_keys, (BUFFPEK*)buffpek.buffer, maxbuffer, &tempfile))
197  goto err;
198  }
199 
200  if (flush_pending_blocks(info))
201  goto err;
202 
203  if (tempfile_for_exceptions.inited())
204  {
205  MI_INFO *idx=info->sort_info->info;
206  uint32_t keyno=info->key;
207  uint32_t key_length, ref_length=idx->s->rec_reflength;
208 
209  if (not no_messages)
210  printf(" - Adding exceptions\n");
211 
212  if (tempfile_for_exceptions.flush() || tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
213  {
214  goto err;
215  }
216 
217  while (not tempfile_for_exceptions.read(&key_length, sizeof(key_length))
218  && not tempfile_for_exceptions.read(sort_keys, key_length))
219  {
220  if (_mi_ck_write(idx,keyno,(unsigned char*) sort_keys,key_length-ref_length))
221  goto err;
222  }
223  }
224 
225  error =0;
226 
227 err:
228  free(sort_keys);
229  buffpek.free();
230  tempfile.close_cached_file();
231  tempfile_for_exceptions.close_cached_file();
232 
233  return error ? -1 : 0;
234 } /* _create_index_by_sort */
235 
236 
237 /* Search after all keys and place them in a temp. file */
238 
239 static ha_rows find_all_keys(MI_SORT_PARAM *info, uint32_t keys,
240  unsigned char **sort_keys,
241  DYNAMIC_ARRAY *buffpek,
242  size_t *maxbuffer, internal::io_cache_st *tempfile,
243  internal::io_cache_st *tempfile_for_exceptions)
244 {
245  int error;
246  uint32_t idx;
247 
248  idx=error=0;
249  sort_keys[0]=(unsigned char*) (sort_keys+keys);
250 
251  while (!(error=(*info->key_read)(info,sort_keys[idx])))
252  {
253  if (info->real_key_length > info->key_length)
254  {
255  if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
256  return(HA_POS_ERROR);
257  continue;
258  }
259 
260  if (++idx == keys)
261  {
262  if (info->write_keys(info,sort_keys,idx-1,(BUFFPEK *)buffpek->alloc(), tempfile))
263  return HA_POS_ERROR;
264 
265  sort_keys[0]=(unsigned char*) (sort_keys+keys);
266  memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
267  idx=1;
268  }
269  sort_keys[idx]=sort_keys[idx-1]+info->key_length;
270  }
271  if (error > 0)
272  return(HA_POS_ERROR);
273  if (buffpek->size())
274  {
275  if (info->write_keys(info,sort_keys,idx,(BUFFPEK *)buffpek->alloc(), tempfile))
276  return HA_POS_ERROR;
277  *maxbuffer=buffpek->size() - 1;
278  }
279  else
280  *maxbuffer=0;
281 
282  return((*maxbuffer)*(keys-1)+idx);
283 } /* find_all_keys */
284 
285 
286 int thr_write_keys(MI_SORT_PARAM *sort_param)
287 {
288  SORT_INFO *sort_info= sort_param->sort_info;
289  MI_CHECK *param= sort_info->param;
290  uint32_t length= 0, keys;
291  ulong *rec_per_key_part= param->rec_per_key_part;
292  int got_error=sort_info->got_error;
293  uint32_t i;
294  MI_INFO *info=sort_info->info;
295  MYISAM_SHARE *share=info->s;
296  MI_SORT_PARAM *sinfo;
297  unsigned char *mergebuf=0;
298 
299  for (i= 0, sinfo= sort_param ;
300  i < sort_info->total_keys ;
301  i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
302  {
303  if (!sinfo->sort_keys)
304  {
305  got_error=1;
306  void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
307  free(rec_buff_ptr);
308  continue;
309  }
310  if (!got_error)
311  {
312  mi_set_key_active(share->state.key_map, sinfo->key);
313  if (!sinfo->buffpek.size())
314  {
315  if (param->testflag & T_VERBOSE)
316  {
317  printf("Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
318  fflush(stdout);
319  }
320  if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || flush_pending_blocks(sinfo))
321  got_error=1;
322  }
323  if (!got_error && param->testflag & T_STATISTICS)
324  update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
325  param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
326  sinfo->notnull: NULL,
327  (uint64_t) info->state->records);
328  }
329  free((unsigned char*) sinfo->sort_keys);
330  void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
331  free(rec_buff_ptr);
332  sinfo->sort_keys=0;
333  }
334 
335  for (i= 0, sinfo= sort_param; i < sort_info->total_keys; i++,
336  sinfo->buffpek.free(),
337  sinfo->tempfile.close_cached_file(),
338  sinfo->tempfile_for_exceptions.close_cached_file(),
339  sinfo++)
340  {
341  if (got_error)
342  continue;
343  if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
344  {
345  sinfo->write_keys=write_keys_varlen;
346  sinfo->read_to_buffer=read_to_buffer_varlen;
347  sinfo->write_key=write_merge_key_varlen;
348  }
349  else
350  {
351  sinfo->write_keys=write_keys;
352  sinfo->read_to_buffer=read_to_buffer;
353  sinfo->write_key=write_merge_key;
354  }
355  if (sinfo->buffpek.size())
356  {
357  size_t maxbuffer=sinfo->buffpek.size() - 1;
358  if (!mergebuf)
359  {
360  length=param->sort_buffer_length;
361  while (length >= MIN_SORT_MEMORY)
362  {
363  if ((mergebuf= (unsigned char *)malloc(length)))
364  break;
365  length=length*3/4;
366  }
367  if (!mergebuf)
368  {
369  got_error=1;
370  continue;
371  }
372  }
373  keys=length/sinfo->key_length;
374  if (maxbuffer >= MERGEBUFF2)
375  {
376  if (param->testflag & T_VERBOSE)
377  printf("Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
378  if (merge_many_buff(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
379  &maxbuffer, &sinfo->tempfile))
380  {
381  got_error=1;
382  continue;
383  }
384  }
385  if (sinfo->tempfile.flush() || sinfo->tempfile.reinit_io_cache(internal::READ_CACHE,0L,0,0))
386  {
387  got_error=1;
388  continue;
389  }
390  if (param->testflag & T_VERBOSE)
391  printf("Key %d - Last merge and dumping keys\n", sinfo->key+1);
392  if (merge_index(sinfo, keys, (unsigned char **)mergebuf, (BUFFPEK*)sinfo->buffpek.buffer,
393  maxbuffer,&sinfo->tempfile) ||
394  flush_pending_blocks(sinfo))
395  {
396  got_error=1;
397  continue;
398  }
399  }
400  if (sinfo->tempfile_for_exceptions.inited())
401  {
402  uint32_t key_length;
403 
404  if (param->testflag & T_VERBOSE)
405  printf("Key %d - Dumping 'long' keys\n", sinfo->key+1);
406 
407  if (sinfo->tempfile_for_exceptions.flush() || sinfo->tempfile_for_exceptions.reinit_io_cache(internal::READ_CACHE,0L,0,0))
408  {
409  got_error=1;
410  continue;
411  }
412 
413  while (!got_error && not sinfo->tempfile_for_exceptions.read(&key_length, sizeof(key_length)))
414  {
415  unsigned char ft_buf[10];
416  if (key_length > sizeof(ft_buf) || sinfo->tempfile_for_exceptions.read(ft_buf, (uint)key_length)
417  || _mi_ck_write(info, sinfo->key, (unsigned char*)ft_buf, key_length - info->s->rec_reflength))
418  got_error= 1;
419  }
420  }
421  }
422  free((unsigned char*) mergebuf);
423  return(got_error);
424 }
425 
426  /* Write all keys in memory to file for later merge */
427 
428 static int write_keys(MI_SORT_PARAM *info, register unsigned char **sort_keys,
429  uint32_t count, BUFFPEK *buffpek, internal::io_cache_st *tempfile)
430 {
431  unsigned char **end;
432  uint32_t sort_length=info->key_length;
433 
434  internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp, info);
435  if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
436  return(1);
437 
438  buffpek->file_pos= tempfile->tell();
439  buffpek->count=count;
440 
441  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
442  {
443  if (tempfile->write(*sort_keys, sort_length))
444  return(1);
445  }
446  return(0);
447 } /* write_keys */
448 
449 
450 inline int
451 my_var_write(MI_SORT_PARAM *info, internal::io_cache_st *to_file, unsigned char *bufs)
452 {
453  int err;
454  uint16_t len = _mi_keylength(info->keyinfo, (unsigned char*) bufs);
455 
456  /* The following is safe as this is a local file */
457  if ((err= to_file->write(&len, sizeof(len))))
458  return (err);
459  if ((err= to_file->write(bufs, len)))
460  return (err);
461  return (0);
462 }
463 
464 
465 static int write_keys_varlen(MI_SORT_PARAM *info,
466  register unsigned char **sort_keys,
467  uint32_t count, BUFFPEK *buffpek,
468  internal::io_cache_st *tempfile)
469 {
470  unsigned char **end;
471  int err;
472 
473  internal::my_qsort2((unsigned char*) sort_keys,count,sizeof(unsigned char*),(qsort2_cmp) info->key_cmp, info);
474  if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
475  return(1);
476 
477  buffpek->file_pos= tempfile->tell();
478  buffpek->count=count;
479  for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
480  {
481  if ((err= my_var_write(info,tempfile, (unsigned char*) *sort_keys)))
482  return(err);
483  }
484  return(0);
485 } /* write_keys_varlen */
486 
487 
488 static int write_key(MI_SORT_PARAM *info, unsigned char *key,
489  internal::io_cache_st *tempfile)
490 {
491  uint32_t key_length=info->real_key_length;
492 
493  if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
494  return(1);
495 
496  if (tempfile->write(&key_length, sizeof(key_length)) ||
497  tempfile->write(key, key_length))
498  return(1);
499  return(0);
500 } /* write_key */
501 
502 
503 /* Write index */
504 
505 static int write_index(MI_SORT_PARAM *info, register unsigned char **sort_keys,
506  register uint32_t count)
507 {
508  internal::my_qsort2((unsigned char*) sort_keys,(size_t) count,sizeof(unsigned char*),
509  (qsort2_cmp) info->key_cmp,info);
510  while (count--)
511  {
512  if ((*info->key_write)(info,*sort_keys++))
513  return(-1);
514  }
515  return(0);
516 } /* write_index */
517 
518 
519  /* Merge buffers to make < MERGEBUFF2 buffers */
520 
521 static int merge_many_buff(MI_SORT_PARAM *info, uint32_t keys,
522  unsigned char **sort_keys, BUFFPEK *buffpek,
523  size_t *maxbuffer, internal::io_cache_st *t_file)
524 {
525  uint32_t i;
526  internal::io_cache_st t_file2, *from_file, *to_file, *temp;
527  BUFFPEK *lastbuff;
528 
529  if (*maxbuffer < MERGEBUFF2)
530  return(0);
531  if (t_file->flush() || t_file2.open_cached_file(P_tmpdir, "ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
532  return(1);
533 
534  from_file= t_file ; to_file= &t_file2;
535  while (*maxbuffer >= MERGEBUFF2)
536  {
537  from_file->reinit_io_cache(internal::READ_CACHE,0L,0,0);
538  to_file->reinit_io_cache(internal::WRITE_CACHE,0L,0,0);
539  lastbuff=buffpek;
540  for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
541  {
542  if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
543  buffpek+i,buffpek+i+MERGEBUFF-1))
544  goto cleanup;
545  }
546  if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
547  buffpek+i,buffpek+ *maxbuffer))
548  break;
549  if (to_file->flush())
550  break;
551  temp=from_file; from_file=to_file; to_file=temp;
552  *maxbuffer= (int) (lastbuff-buffpek)-1;
553  }
554 cleanup:
555  to_file->close_cached_file(); /* This holds old result */
556  if (to_file == t_file)
557  *t_file=t_file2; /* Copy result file */
558 
559  return(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
560 } /* merge_many_buff */
561 
562 
563 /*
564  Read data to buffer
565 
566  SYNOPSIS
567  read_to_buffer()
568  fromfile File to read from
569  buffpek Where to read from
570  sort_length max length to read
571  RESULT
572  > 0 Ammount of bytes read
573  -1 Error
574 */
575 
576 static uint32_t read_to_buffer(internal::io_cache_st *fromfile, BUFFPEK *buffpek,
577  uint32_t sort_length)
578 {
579  register uint32_t count;
580  uint32_t length;
581 
582  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
583  {
584  if (my_pread(fromfile->file,(unsigned char*) buffpek->base,
585  (length= sort_length*count),buffpek->file_pos,MYF_RW))
586  return((uint) -1);
587  buffpek->key=buffpek->base;
588  buffpek->file_pos+= length; /* New filepos */
589  buffpek->count-= count;
590  buffpek->mem_count= count;
591  }
592  return (count*sort_length);
593 } /* read_to_buffer */
594 
595 static uint32_t read_to_buffer_varlen(internal::io_cache_st *fromfile, BUFFPEK *buffpek,
596  uint32_t sort_length)
597 {
598  register uint32_t count;
599  uint16_t length_of_key = 0;
600  uint32_t idx;
601  unsigned char *buffp;
602 
603  if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
604  {
605  buffp = buffpek->base;
606 
607  for (idx=1;idx<=count;idx++)
608  {
609  if (my_pread(fromfile->file,(unsigned char*)&length_of_key,sizeof(length_of_key),
610  buffpek->file_pos,MYF_RW))
611  return((uint) -1);
612  buffpek->file_pos+=sizeof(length_of_key);
613  if (my_pread(fromfile->file,(unsigned char*) buffp,length_of_key,
614  buffpek->file_pos,MYF_RW))
615  return((uint) -1);
616  buffpek->file_pos+=length_of_key;
617  buffp = buffp + sort_length;
618  }
619  buffpek->key=buffpek->base;
620  buffpek->count-= count;
621  buffpek->mem_count= count;
622  }
623  return (count*sort_length);
624 } /* read_to_buffer_varlen */
625 
626 
627 static int write_merge_key_varlen(MI_SORT_PARAM *info,
628  internal::io_cache_st *to_file, unsigned char* key,
629  uint32_t sort_length, uint32_t count)
630 {
631  uint32_t idx;
632  unsigned char *bufs = key;
633 
634  for (idx=1;idx<=count;idx++)
635  {
636  int err;
637  if ((err= my_var_write(info, to_file, bufs)))
638  return (err);
639  bufs=bufs+sort_length;
640  }
641  return(0);
642 }
643 
644 
645 static int write_merge_key(MI_SORT_PARAM*,
646  internal::io_cache_st *to_file, unsigned char *key,
647  uint32_t sort_length, uint32_t count)
648 {
649  return to_file->write(key, sort_length*count);
650 }
651 
652 /*
653  * Function object to be used as the comparison function
654  * for the priority queue in the merge_buffers method.
655  */
657 {
658  qsort2_cmp key_compare;
659  void *key_compare_arg;
660  public:
661  compare_functor(qsort2_cmp in_key_compare, void *in_compare_arg)
662  : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
663  inline bool operator()(const BUFFPEK *i, const BUFFPEK *j) const
664  {
665  int val= key_compare(key_compare_arg,
666  &i->key, &j->key);
667  return (val >= 0);
668  }
669 };
670 
671 /*
672  Merge buffers to one buffer
673  If to_file == 0 then use info->key_write
674 */
675 
676 static int
677 merge_buffers(MI_SORT_PARAM *info, uint32_t keys, internal::io_cache_st *from_file,
678  internal::io_cache_st *to_file, unsigned char **sort_keys, BUFFPEK *lastbuff,
679  BUFFPEK *Fb, BUFFPEK *Tb)
680 {
681  int error;
682  uint32_t sort_length,maxcount;
683  ha_rows count;
684  internal::my_off_t to_start_filepos= 0;
685  unsigned char *strpos;
686  BUFFPEK *buffpek;
687  priority_queue<BUFFPEK *, vector<BUFFPEK *>, compare_functor >
688  queue(compare_functor((qsort2_cmp) info->key_cmp, static_cast<void *>(info)));
689  volatile int *killed= killed_ptr(info->sort_info->param);
690 
691  count=error=0;
692  maxcount=keys/((uint) (Tb-Fb) +1);
693  assert(maxcount > 0);
694  if (to_file)
695  to_start_filepos= to_file->tell();
696  strpos=(unsigned char*) sort_keys;
697  sort_length=info->key_length;
698 
699  for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
700  {
701  count+= buffpek->count;
702  buffpek->base= strpos;
703  buffpek->max_keys=maxcount;
704  strpos+= (uint) (error=(int) info->read_to_buffer(from_file,buffpek,
705  sort_length));
706  if (error == -1)
707  goto err;
708  queue.push(buffpek);
709  }
710 
711  while (queue.size() > 1)
712  {
713  for (;;)
714  {
715  if (*killed)
716  {
717  error=1; goto err;
718  }
719  buffpek= queue.top();
720  if (to_file)
721  {
722  if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
723  (uint) sort_length,1))
724  {
725  error=1; goto err;
726  }
727  }
728  else
729  {
730  if ((*info->key_write)(info,(void*) buffpek->key))
731  {
732  error=1; goto err;
733  }
734  }
735  buffpek->key+=sort_length;
736  if (! --buffpek->mem_count)
737  {
738  if (!(error=(int) info->read_to_buffer(from_file,buffpek,sort_length)))
739  {
740  queue.pop();
741  break; /* One buffer have been removed */
742  }
743  }
744  else if (error == -1)
745  goto err;
746  /* Top element has been replaced */
747  queue.pop();
748  queue.push(buffpek);
749  }
750  }
751  buffpek= queue.top();
752  buffpek->base=(unsigned char *) sort_keys;
753  buffpek->max_keys=keys;
754  do
755  {
756  if (to_file)
757  {
758  if (info->write_key(info,to_file,(unsigned char*) buffpek->key,
759  sort_length,buffpek->mem_count))
760  {
761  error=1; goto err;
762  }
763  }
764  else
765  {
766  register unsigned char *end;
767  strpos= buffpek->key;
768  for (end=strpos+buffpek->mem_count*sort_length;
769  strpos != end ;
770  strpos+=sort_length)
771  {
772  if ((*info->key_write)(info,(void*) strpos))
773  {
774  error=1; goto err;
775  }
776  }
777  }
778  }
779  while ((error=(int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
780  error != 0);
781 
782  lastbuff->count=count;
783  if (to_file)
784  lastbuff->file_pos=to_start_filepos;
785 err:
786  return(error);
787 } /* merge_buffers */
788 
789 
790  /* Do a merge to output-file (save only positions) */
791 
792 static int
793 merge_index(MI_SORT_PARAM *info, uint32_t keys, unsigned char **sort_keys,
794  BUFFPEK *buffpek, int maxbuffer, internal::io_cache_st *tempfile)
795 {
796  if (merge_buffers(info,keys,tempfile,(internal::io_cache_st*) 0,sort_keys,buffpek,buffpek,
797  buffpek+maxbuffer))
798  return(1);
799  return(0);
800 } /* merge_index */
801