21 #include "myisam_priv.h"
28 #undef MIN_SORT_MEMORY
30 #undef DISK_BUFFER_SIZE
34 #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
35 #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
36 #define DISK_BUFFER_SIZE (IO_SIZE*16)
39 using namespace drizzled;
46 extern void print_error(
const char *fmt,...);
50 static ha_rows find_all_keys(
MI_SORT_PARAM *info,uint32_t keys,
51 unsigned char **sort_keys,
56 static int write_keys(
MI_SORT_PARAM *info,
unsigned char **sort_keys,
60 static int write_index(
MI_SORT_PARAM *info,
unsigned char * *sort_keys,
63 unsigned char * *sort_keys,
67 uint32_t sort_length);
70 unsigned char * *sort_keys,
BUFFPEK *lastbuff,
74 static int write_keys_varlen(
MI_SORT_PARAM *info,
unsigned char **sort_keys,
78 uint32_t sort_length);
80 unsigned char *key, uint32_t sort_length, uint32_t count);
83 unsigned char* key, uint32_t sort_length,
103 int _create_index_by_sort(
MI_SORT_PARAM *info,
bool no_messages,
104 size_t sortbuff_size)
107 uint32_t memavl,keys;
111 if (info->keyinfo->flag & HA_VAR_LENGTH_KEY)
113 info->write_keys=write_keys_varlen;
114 info->read_to_buffer=read_to_buffer_varlen;
115 info->write_key= write_merge_key_varlen;
119 info->write_keys=write_keys;
120 info->read_to_buffer=read_to_buffer;
121 info->write_key=write_merge_key;
125 tempfile_for_exceptions.clear();
126 memset(&buffpek, 0,
sizeof(buffpek));
127 unsigned char** sort_keys= (
unsigned char **) NULL;
131 memavl=max(sortbuff_size,(
size_t)MIN_SORT_MEMORY);
132 ha_rows records= info->sort_info->max_records;
133 uint32_t sort_length= info->key_length;
135 while (memavl >= MIN_SORT_MEMORY)
137 if ((records < UINT32_MAX) &&
138 ((internal::my_off_t) (records + 1) *
139 (sort_length +
sizeof(
char*)) <= (internal::my_off_t) memavl))
140 keys= (uint)records+1;
145 if (memavl <
sizeof(
BUFFPEK)* maxbuffer ||
146 (keys=(memavl-
sizeof(
BUFFPEK)* maxbuffer) / (sort_length+
sizeof(
char*))) <= 1 ||
149 mi_check_print_error(info->sort_info->param,
150 "myisam_sort_buffer_size is too small");
154 while ((maxbuffer= (
size_t)(records/(keys-1)+1)) != skr);
156 sort_keys=(
unsigned char **)malloc(keys*(sort_length+
sizeof(
char*)));
157 buffpek.init(
sizeof(
BUFFPEK), maxbuffer, maxbuffer/2);
160 if (memavl < MIN_SORT_MEMORY)
162 mi_check_print_error(info->sort_info->param,
"MyISAM sort buffer too small");
165 (*info->lock_in_memory)(info->sort_info->param);
168 printf(
" - Searching for keys, allocating buffer for %d keys\n",keys);
170 if ((records=find_all_keys(info,keys,sort_keys,&buffpek,&maxbuffer,
171 &tempfile,&tempfile_for_exceptions))
177 printf(
" - Dumping %u keys\n", (uint32_t) records);
178 if (write_index(info,sort_keys, (uint) records))
183 keys=(keys*(sort_length+
sizeof(
char*)))/sort_length;
184 if (maxbuffer >= MERGEBUFF2)
187 printf(
" - Merging %u keys\n", (uint32_t) records);
188 if (merge_many_buff(info,keys,sort_keys, (
BUFFPEK*)buffpek.buffer, &maxbuffer, &tempfile))
191 if (tempfile.flush() ||
195 printf(
" - Last merge and dumping keys\n");
196 if (merge_index(info,keys,sort_keys, (
BUFFPEK*)buffpek.buffer, maxbuffer, &tempfile))
200 if (flush_pending_blocks(info))
203 if (tempfile_for_exceptions.inited())
205 MI_INFO *idx=info->sort_info->info;
206 uint32_t keyno=info->key;
207 uint32_t key_length, ref_length=idx->s->rec_reflength;
210 printf(
" - Adding exceptions\n");
212 if (tempfile_for_exceptions.flush() || tempfile_for_exceptions.
reinit_io_cache(internal::READ_CACHE,0L,0,0))
217 while (not tempfile_for_exceptions.read(&key_length,
sizeof(key_length))
218 && not tempfile_for_exceptions.read(sort_keys, key_length))
220 if (_mi_ck_write(idx,keyno,(
unsigned char*) sort_keys,key_length-ref_length))
230 tempfile.close_cached_file();
231 tempfile_for_exceptions.close_cached_file();
233 return error ? -1 : 0;
239 static ha_rows find_all_keys(
MI_SORT_PARAM *info, uint32_t keys,
240 unsigned char **sort_keys,
249 sort_keys[0]=(
unsigned char*) (sort_keys+keys);
251 while (!(error=(*info->key_read)(info,sort_keys[idx])))
253 if (info->real_key_length > info->key_length)
255 if (write_key(info,sort_keys[idx],tempfile_for_exceptions))
256 return(HA_POS_ERROR);
262 if (info->write_keys(info,sort_keys,idx-1,(
BUFFPEK *)buffpek->alloc(), tempfile))
265 sort_keys[0]=(
unsigned char*) (sort_keys+keys);
266 memcpy(sort_keys[0],sort_keys[idx-1],(
size_t) info->key_length);
269 sort_keys[idx]=sort_keys[idx-1]+info->key_length;
272 return(HA_POS_ERROR);
275 if (info->write_keys(info,sort_keys,idx,(
BUFFPEK *)buffpek->alloc(), tempfile))
277 *maxbuffer=buffpek->size() - 1;
282 return((*maxbuffer)*(keys-1)+idx);
288 SORT_INFO *sort_info= sort_param->sort_info;
290 uint32_t length= 0, keys;
291 ulong *rec_per_key_part= param->rec_per_key_part;
292 int got_error=sort_info->got_error;
297 unsigned char *mergebuf=0;
299 for (i= 0, sinfo= sort_param ;
300 i < sort_info->total_keys ;
301 i++, rec_per_key_part+=sinfo->keyinfo->keysegs, sinfo++)
303 if (!sinfo->sort_keys)
306 void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
312 mi_set_key_active(share->state.key_map, sinfo->key);
313 if (!sinfo->buffpek.size())
315 if (param->testflag & T_VERBOSE)
317 printf(
"Key %d - Dumping %u keys\n",sinfo->key+1, sinfo->keys);
320 if (write_index(sinfo, sinfo->sort_keys, sinfo->keys) || flush_pending_blocks(sinfo))
323 if (!got_error && param->testflag & T_STATISTICS)
324 update_key_parts(sinfo->keyinfo, rec_per_key_part, sinfo->unique,
325 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
326 sinfo->notnull: NULL,
327 (uint64_t) info->state->records);
329 free((
unsigned char*) sinfo->sort_keys);
330 void * rec_buff_ptr= mi_get_rec_buff_ptr(info, sinfo->rec_buff);
335 for (i= 0, sinfo= sort_param; i < sort_info->total_keys; i++,
336 sinfo->buffpek.free(),
337 sinfo->tempfile.close_cached_file(),
338 sinfo->tempfile_for_exceptions.close_cached_file(),
343 if (sinfo->keyinfo->flag & HA_VAR_LENGTH_KEY)
345 sinfo->write_keys=write_keys_varlen;
346 sinfo->read_to_buffer=read_to_buffer_varlen;
347 sinfo->write_key=write_merge_key_varlen;
351 sinfo->write_keys=write_keys;
352 sinfo->read_to_buffer=read_to_buffer;
353 sinfo->write_key=write_merge_key;
355 if (sinfo->buffpek.size())
357 size_t maxbuffer=sinfo->buffpek.size() - 1;
360 length=param->sort_buffer_length;
361 while (length >= MIN_SORT_MEMORY)
363 if ((mergebuf= (
unsigned char *)malloc(length)))
373 keys=length/sinfo->key_length;
374 if (maxbuffer >= MERGEBUFF2)
376 if (param->testflag & T_VERBOSE)
377 printf(
"Key %d - Merging %u keys\n",sinfo->key+1, sinfo->keys);
378 if (merge_many_buff(sinfo, keys, (
unsigned char **)mergebuf, (
BUFFPEK*)sinfo->buffpek.buffer,
379 &maxbuffer, &sinfo->tempfile))
385 if (sinfo->tempfile.flush() || sinfo->tempfile.
reinit_io_cache(internal::READ_CACHE,0L,0,0))
390 if (param->testflag & T_VERBOSE)
391 printf(
"Key %d - Last merge and dumping keys\n", sinfo->key+1);
392 if (merge_index(sinfo, keys, (
unsigned char **)mergebuf, (
BUFFPEK*)sinfo->buffpek.buffer,
393 maxbuffer,&sinfo->tempfile) ||
394 flush_pending_blocks(sinfo))
400 if (sinfo->tempfile_for_exceptions.inited())
404 if (param->testflag & T_VERBOSE)
405 printf(
"Key %d - Dumping 'long' keys\n", sinfo->key+1);
407 if (sinfo->tempfile_for_exceptions.flush() || sinfo->tempfile_for_exceptions.
reinit_io_cache(internal::READ_CACHE,0L,0,0))
413 while (!got_error && not sinfo->tempfile_for_exceptions.read(&key_length,
sizeof(key_length)))
415 unsigned char ft_buf[10];
416 if (key_length >
sizeof(ft_buf) || sinfo->tempfile_for_exceptions.read(ft_buf, (uint)key_length)
417 || _mi_ck_write(info, sinfo->key, (
unsigned char*)ft_buf, key_length - info->s->rec_reflength))
422 free((
unsigned char*) mergebuf);
428 static int write_keys(
MI_SORT_PARAM *info,
register unsigned char **sort_keys,
432 uint32_t sort_length=info->key_length;
434 internal::my_qsort2((
unsigned char*) sort_keys,count,
sizeof(
unsigned char*),(qsort2_cmp) info->key_cmp, info);
435 if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir,
"ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
438 buffpek->file_pos= tempfile->tell();
439 buffpek->count=count;
441 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
443 if (tempfile->write(*sort_keys, sort_length))
454 uint16_t len = _mi_keylength(info->keyinfo, (
unsigned char*) bufs);
457 if ((err= to_file->write(&len,
sizeof(len))))
459 if ((err= to_file->write(bufs, len)))
466 register unsigned char **sort_keys,
467 uint32_t count,
BUFFPEK *buffpek,
473 internal::my_qsort2((
unsigned char*) sort_keys,count,
sizeof(
unsigned char*),(qsort2_cmp) info->key_cmp, info);
474 if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir,
"ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
477 buffpek->file_pos= tempfile->tell();
478 buffpek->count=count;
479 for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
481 if ((err= my_var_write(info,tempfile, (
unsigned char*) *sort_keys)))
488 static int write_key(
MI_SORT_PARAM *info,
unsigned char *key,
491 uint32_t key_length=info->real_key_length;
493 if (not tempfile->inited() && tempfile->open_cached_file(P_tmpdir,
"ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
496 if (tempfile->write(&key_length,
sizeof(key_length)) ||
497 tempfile->write(key, key_length))
505 static int write_index(
MI_SORT_PARAM *info,
register unsigned char **sort_keys,
506 register uint32_t count)
508 internal::my_qsort2((
unsigned char*) sort_keys,(
size_t) count,
sizeof(
unsigned char*),
509 (qsort2_cmp) info->key_cmp,info);
512 if ((*info->key_write)(info,*sort_keys++))
521 static int merge_many_buff(
MI_SORT_PARAM *info, uint32_t keys,
522 unsigned char **sort_keys,
BUFFPEK *buffpek,
529 if (*maxbuffer < MERGEBUFF2)
531 if (t_file->flush() || t_file2.open_cached_file(P_tmpdir,
"ST", DISK_BUFFER_SIZE, info->sort_info->param->myf_rw))
534 from_file= t_file ; to_file= &t_file2;
535 while (*maxbuffer >= MERGEBUFF2)
540 for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
542 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
543 buffpek+i,buffpek+i+MERGEBUFF-1))
546 if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
547 buffpek+i,buffpek+ *maxbuffer))
549 if (to_file->flush())
551 temp=from_file; from_file=to_file; to_file=temp;
552 *maxbuffer= (int) (lastbuff-buffpek)-1;
555 to_file->close_cached_file();
556 if (to_file == t_file)
559 return(*maxbuffer >= MERGEBUFF2);
577 uint32_t sort_length)
579 register uint32_t count;
582 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
584 if (my_pread(fromfile->file,(
unsigned char*) buffpek->base,
585 (length= sort_length*count),buffpek->file_pos,MYF_RW))
587 buffpek->key=buffpek->base;
588 buffpek->file_pos+= length;
589 buffpek->count-= count;
590 buffpek->mem_count= count;
592 return (count*sort_length);
596 uint32_t sort_length)
598 register uint32_t count;
599 uint16_t length_of_key = 0;
601 unsigned char *buffp;
603 if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
605 buffp = buffpek->base;
607 for (idx=1;idx<=count;idx++)
609 if (my_pread(fromfile->file,(
unsigned char*)&length_of_key,
sizeof(length_of_key),
610 buffpek->file_pos,MYF_RW))
612 buffpek->file_pos+=
sizeof(length_of_key);
613 if (my_pread(fromfile->file,(
unsigned char*) buffp,length_of_key,
614 buffpek->file_pos,MYF_RW))
616 buffpek->file_pos+=length_of_key;
617 buffp = buffp + sort_length;
619 buffpek->key=buffpek->base;
620 buffpek->count-= count;
621 buffpek->mem_count= count;
623 return (count*sort_length);
629 uint32_t sort_length, uint32_t count)
632 unsigned char *bufs = key;
634 for (idx=1;idx<=count;idx++)
637 if ((err= my_var_write(info, to_file, bufs)))
639 bufs=bufs+sort_length;
647 uint32_t sort_length, uint32_t count)
649 return to_file->write(key, sort_length*count);
658 qsort2_cmp key_compare;
659 void *key_compare_arg;
662 : key_compare(in_key_compare), key_compare_arg(in_compare_arg) { }
665 int val= key_compare(key_compare_arg,
682 uint32_t sort_length,maxcount;
684 internal::my_off_t to_start_filepos= 0;
685 unsigned char *strpos;
688 queue(
compare_functor((qsort2_cmp) info->key_cmp, static_cast<void *>(info)));
689 volatile int *killed= killed_ptr(info->sort_info->param);
692 maxcount=keys/((uint) (Tb-Fb) +1);
693 assert(maxcount > 0);
695 to_start_filepos= to_file->tell();
696 strpos=(
unsigned char*) sort_keys;
697 sort_length=info->key_length;
699 for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
701 count+= buffpek->count;
702 buffpek->base= strpos;
703 buffpek->max_keys=maxcount;
704 strpos+= (uint) (error=(
int) info->read_to_buffer(from_file,buffpek,
711 while (queue.size() > 1)
719 buffpek= queue.top();
722 if (info->write_key(info,to_file,(
unsigned char*) buffpek->key,
723 (uint) sort_length,1))
730 if ((*info->key_write)(info,(
void*) buffpek->key))
735 buffpek->key+=sort_length;
736 if (! --buffpek->mem_count)
738 if (!(error=(
int) info->read_to_buffer(from_file,buffpek,sort_length)))
744 else if (error == -1)
751 buffpek= queue.top();
752 buffpek->base=(
unsigned char *) sort_keys;
753 buffpek->max_keys=keys;
758 if (info->write_key(info,to_file,(
unsigned char*) buffpek->key,
759 sort_length,buffpek->mem_count))
766 register unsigned char *end;
767 strpos= buffpek->key;
768 for (end=strpos+buffpek->mem_count*sort_length;
772 if ((*info->key_write)(info,(
void*) strpos))
779 while ((error=(
int) info->read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
782 lastbuff->count=count;
784 lastbuff->file_pos=to_start_filepos;
793 merge_index(
MI_SORT_PARAM *info, uint32_t keys,
unsigned char **sort_keys,
bool reinit_io_cache(cache_type type_arg, my_off_t seek_offset, bool use_async_io, bool clear_cache)
Reset the cache.