22 #include <drizzled/sql_load.h>
23 #include <drizzled/error.h>
24 #include <drizzled/catalog/local.h>
25 #include <drizzled/session.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/field/epoch.h>
28 #include <drizzled/internal/my_sys.h>
29 #include <drizzled/internal/iocache.h>
30 #include <drizzled/plugin/storage_engine.h>
31 #include <drizzled/sql_lex.h>
32 #include <drizzled/copy_info.h>
33 #include <drizzled/file_exchange.h>
34 #include <drizzled/util/test.h>
35 #include <drizzled/session/transactions.h>
41 #include <boost/filesystem.hpp>
43 namespace fs=boost::filesystem;
// Fragment of the READ_INFO reader state. This listing omits the class header
// and several members, so only what is visible here is described.
//
// Row buffer: the constructor allocates `buffer` with calloc and sets
// end_of_buff to buffer + buff_length.
50 unsigned char *buffer;
51 unsigned char *end_of_buff;
// Raw pointers and lengths for the terminator / line-start strings that the
// constructor receives as String arguments.
54 char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
55 uint field_term_length,line_term_length,enclosed_length;
// First byte of each delimiter; the constructor stores INT_MAX when the
// corresponding delimiter string is empty. escape_char comes straight from the
// constructor's `escape` argument.
56 int field_term_char,line_term_char,enclosed_char,escape_char;
// Push-back stack consumed by the GET macro and filled by the PUSH macro.
57 int *stack,*stack_pos;
58 bool found_end_of_line,start_of_line,eof;
// Set to 1 at the end of the constructor and tested by the destructor.
59 bool need_end_io_cache;
// found_null is raised by unescape() when it sees the escape letter 'N'.
63 bool error,line_cuted,found_null,enclosed;
64 unsigned char *row_start,
70 String &enclosed,
int escape,
bool is_fifo);
73 int read_fixed_length(
void);
75 char unescape(
char chr);
76 int terminator(
char *ptr,uint32_t length);
77 bool find_start_of_fields();
// Clears the flag that the destructor checks (the enclosing method is not
// shown in this listing).
86 need_end_io_cache = 0;
// Stores an opaque argument pointer on the underlying cache object.
94 void set_io_cache_arg(
void* arg) { cache.arg = arg; }
101 bool ignore_check_option_errors);
105 String &enclosed, uint32_t skip_lines,
106 bool ignore_check_option_errors);
132 enum enum_duplicates handle_duplicates,
bool ignore)
137 String *field_term=ex->field_term,*escaped=ex->escaped;
138 String *enclosed=ex->enclosed;
141 assert(table_list->getSchemaName());
148 util::string::ptr schema(session->schema());
149 const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName();
151 uint32_t skip_lines= ex->skip_lines;
152 bool transactional_table;
155 if (escaped->length() > 4 || enclosed->length() > 4)
157 my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
164 if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
165 &session->lex().select_lex.top_join_list,
167 &session->lex().select_lex.leaf_tables,
true))
178 if (unique_table(table_list, table_list->
next_global))
180 my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
184 table= table_list->
table;
185 transactional_table= table->
cursor->has_transactions();
187 if (!fields_vars.size())
190 for (field= table->getFields(); *field ; field++)
191 fields_vars.push_back(
new Item_field(*field));
192 table->setWriteSet();
193 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
198 if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
199 setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
205 if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
206 setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
207 check_that_all_fields_are_given_values(session, table, table_list))
217 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
225 if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
229 table->mark_columns_needed_for_insert();
232 bool use_blobs= 0, use_vars= 0;
233 List<Item>::iterator it(fields_vars.begin());
238 Item *real_item= item->real_item();
240 if (real_item->type() == Item::FIELD_ITEM)
242 Field *field= ((Item_field*)real_item)->field;
243 if (field->flags & BLOB_FLAG)
249 tot_length+= field->field_length;
251 else if (item->type() == Item::STRING_ITEM)
254 if (use_blobs && !ex->line_term->length() && !field_term->length())
256 my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
260 if (use_vars && !field_term->length() && !enclosed->length())
262 my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
266 fs::path to_file(ex->file_name);
267 fs::path target_path(fs::system_complete(catalog::local_identifier().getPath()));
268 if (not to_file.has_root_directory())
270 int count_elements= 0;
271 for (fs::path::iterator iter= to_file.begin();
272 iter != to_file.end();
273 ++iter, ++count_elements)
276 if (count_elements == 1)
280 target_path /= to_file;
284 target_path= to_file;
287 if (not secure_file_priv.string().empty())
289 if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
292 my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0),
"--secure-file-priv");
297 struct stat stat_info;
298 if (stat(target_path.file_string().c_str(), &stat_info))
300 my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
305 if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&
306 (stat_info.st_mode & S_IFLNK) != S_IFLNK &&
307 ((stat_info.st_mode & S_IFREG) == S_IFREG ||
308 (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
310 my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
313 if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
317 if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
319 my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
323 memset(&info, 0,
sizeof(info));
325 info.handle_duplicates=handle_duplicates;
326 info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
328 identifier::Schema identifier(session->catalog().identifier(),
330 READ_INFO read_info(file, tot_length,
331 ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
332 *field_term, *ex->line_start, *ex->line_term, *enclosed,
333 info.escape_char, is_fifo);
337 internal::my_close(file,MYF(0));
349 session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
352 if (ex->line_term->length())
355 while (skip_lines > 0)
358 if (read_info.next_line())
363 if (!(error=test(read_info.error)))
368 handle_duplicates == DUP_REPLACE)
369 table->
cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
370 if (handle_duplicates == DUP_REPLACE)
371 table->
cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
372 table->
cursor->ha_start_bulk_insert((ha_rows) 0);
375 session->setAbortOnWarning(
true);
377 if (!field_term->length() && !enclosed->length())
378 error= read_fixed_length(session, info, table_list, fields_vars,
379 set_fields, set_values, read_info,
382 error= read_sep_field(session, info, table_list, fields_vars,
383 set_fields, set_values, read_info,
384 *enclosed, skip_lines, ignore);
385 if (table->
cursor->ha_end_bulk_insert() && !error)
387 table->print_error(errno, MYF(0));
390 table->
cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
391 table->
cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
395 internal::my_close(file,MYF(0));
398 session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
407 snprintf(msg,
sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
413 session->
my_ok(info.copied + info.deleted, 0, 0L, msg);
415 (void)(transactional_table);
416 assert(transactional_table || !(info.copied || info.deleted) ||
419 table->auto_increment_field_not_null=
false;
420 session->setAbortOnWarning(
false);
// Loads rows whose columns are delimited purely by position: each target
// field consumes up to field->field_length bytes of the row returned by
// READ_INFO::read_fixed_length().
//
// NOTE: this listing is incomplete (braces, branches and some statements are
// missing), so the comments below cover only the visible statements.
//
// Returns test(read_info.error) on the visible exit path.
431 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
432 List<Item> &fields_vars, List<Item> &set_fields,
433 List<Item> &set_values, READ_INFO &read_info,
434 uint32_t skip_lines,
bool ignore_check_option_errors)
436 List<Item>::iterator it(fields_vars.begin());
437 Item_field *sql_field;
438 Table *table= table_list->table;
// One iteration per input row; the loop ends when the reader returns non-zero.
441 while (!read_info.read_fixed_length())
443 if (session->getKilled())
445 session->send_kill_message();
// Rewind the column iterator and start slicing at the beginning of the row.
459 it= fields_vars.begin();
460 unsigned char *pos=read_info.row_start;
// NUL-terminate the row in place.
462 read_info.row_end[0]=0;
465 table->restoreRecordAsDefault();
470 while ((sql_field= (Item_field*) it++))
472 Field *field= sql_field->field;
473 if (field == table->next_number_field)
474 table->auto_increment_field_not_null=
true;
480 field->set_notnull();
// Row ran out of data before all columns were filled: count it and warn.
482 if (pos == read_info.row_end)
484 session->cuted_fields++;
485 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
486 ER_WARN_TOO_FEW_RECORDS,
487 ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
// A non-nullable timestamp column that got no data is set via set_time().
489 if (not field->maybe_null() and field->is_timestamp())
490 ((field::Epoch::pointer) field)->set_time();
495 unsigned char save_chr;
496 if ((length=(uint32_t) (read_info.row_end-pos)) >
499 length=field->field_length;
// The byte just past the slice is saved here and restored after store();
// presumably it is overwritten with a terminator in a line this listing
// omits -- confirm against the full source.
501 save_chr=pos[length];
503 field->store((
char*) pos,length,read_info.read_charset);
504 pos[length]=save_chr;
// Advance past the consumed slice, clamping to the end of the row.
505 if ((pos+=length) > read_info.row_end)
506 pos= read_info.row_end;
// Data left over after the last column: count it and warn.
509 if (pos != read_info.row_end)
511 session->cuted_fields++;
512 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
513 ER_WARN_TOO_MANY_RECORDS,
514 ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
// Apply the SET column=value assignments, then write the row.
517 if (session->getKilled() ||
518 fill_record(session, set_fields, set_values,
519 ignore_check_option_errors))
522 err= write_record(session, table, &info);
523 table->auto_increment_field_not_null=
false;
// Skip to the next input line; a truncated line is reported as excess data.
531 if (read_info.next_line())
533 if (read_info.line_cuted)
535 session->cuted_fields++;
536 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
537 ER_WARN_TOO_MANY_RECORDS,
538 ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
540 session->row_count++;
542 return(test(read_info.error));
// Loads rows whose columns are separated by terminators and/or enclosure
// characters. Each call to READ_INFO::read_field() yields one field, which is
// stored into the matching target: a table Field (Item::FIELD_ITEM) or a user
// variable (Item::STRING_ITEM, cast to Item_user_var_as_out_param).
//
// NOTE: this listing is incomplete (braces, branches and some statements are
// missing), so the comments below cover only the visible statements.
//
// Returns test(read_info.error) on the visible exit path.
548 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
549 List<Item> &fields_vars, List<Item> &set_fields,
550 List<Item> &set_values, READ_INFO &read_info,
551 String &enclosed, uint32_t skip_lines,
552 bool ignore_check_option_errors)
554 List<Item>::iterator it(fields_vars.begin());
556 Table *table= table_list->table;
557 uint32_t enclosed_length;
560 enclosed_length=enclosed.length();
// One iteration per input row; the column iterator is rewound each time.
562 for (;;it= fields_vars.begin())
564 if (session->getKilled())
566 session->send_kill_message();
570 table->restoreRecordAsDefault();
578 if (read_info.read_field())
585 pos=read_info.row_start;
586 length=(uint32_t) (read_info.row_end-pos);
588 real_item= item->real_item();
// A field is treated as SQL NULL when it is the unenclosed 4-byte literal
// "NULL" (only if an enclosure string is configured), or when it is one byte
// long and the reader flagged found_null.
590 if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN(
"NULL")))) ||
591 (length == 1 && read_info.found_null))
594 if (real_item->type() == Item::FIELD_ITEM)
596 Field *field= ((Item_field *)real_item)->field;
599 my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
// NULL into a non-nullable column: timestamps are set via set_time(); any
// other column except the auto-increment one gets a warning.
604 if (not field->maybe_null())
606 if (field->is_timestamp())
608 ((field::Epoch::pointer) field)->set_time();
610 else if (field != table->next_number_field)
612 field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
616 else if (item->type() == Item::STRING_ITEM)
618 ((Item_user_var_as_out_param *)item)->set_null_value(
619 read_info.read_charset);
623 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
// Non-NULL value: store it into the field or the user variable.
630 if (real_item->type() == Item::FIELD_ITEM)
632 Field *field= ((Item_field *)real_item)->field;
633 field->set_notnull();
// NUL-terminate the field text in place before storing it.
634 read_info.row_end[0]=0;
635 if (field == table->next_number_field)
636 table->auto_increment_field_not_null=
true;
637 field->store((
char*) pos, length, read_info.read_charset);
639 else if (item->type() == Item::STRING_ITEM)
641 ((Item_user_var_as_out_param *)item)->set_value(
str_ref((
char*) pos, length), read_info.read_charset);
645 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
// The visible loop below walks the remaining targets (from `item` onward),
// emitting a too-few-records warning for fields and nulling user variables.
// The branch taken when item is the first list element is not shown here.
659 if (item == &fields_vars.front())
661 for (; item ; item= it++)
663 Item *real_item= item->real_item();
664 if (real_item->type() == Item::FIELD_ITEM)
666 Field *field= ((Item_field *)real_item)->field;
669 my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
673 if (not field->maybe_null() and field->is_timestamp())
674 ((field::Epoch::pointer) field)->set_time();
681 session->cuted_fields++;
682 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
683 ER_WARN_TOO_FEW_RECORDS,
684 ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
686 else if (item->type() == Item::STRING_ITEM)
688 ((Item_user_var_as_out_param *)item)->set_null_value(
689 read_info.read_charset);
693 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
// Apply the SET column=value assignments, then write the row.
699 if (session->getKilled() ||
700 fill_record(session, set_fields, set_values,
701 ignore_check_option_errors))
704 err= write_record(session, table, &info);
705 table->auto_increment_field_not_null=
false;
// Skip to the next input line; a truncated line is reported as excess data.
712 if (read_info.next_line())
714 if (read_info.line_cuted)
716 session->cuted_fields++;
717 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
718 ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
720 if (session->getKilled())
723 session->row_count++;
725 return(test(read_info.error));
// Translates the character that follows an escape character into the byte it
// stands for. The visible cases map n, t, r, b to the usual C control
// characters and Z to '\032' (octal 32, i.e. byte value 26).
// 'N' sets found_null, which read_sep_field() later uses to recognise a NULL
// field. The rest of the switch (including the default case and the value
// returned for 'N') is not shown in this listing.
732 READ_INFO::unescape(
char chr)
736 case 'n':
return '\n';
737 case 't':
return '\t';
738 case 'r':
return '\r';
739 case 'b':
return '\b';
741 case 'Z':
return '\032';
742 case 'N': found_null=1;
// Builds the reader state for one input file: caches delimiter pointers,
// lengths and first bytes, allocates the push-back stack and the row buffer,
// and (in lines only partly visible here) initialises the I/O cache.
//
// file_par     file descriptor stored in `cursor`
// tot_length   initial row-buffer length (copied to buff_length)
// escape       escape character, stored in escape_char
// is_fifo      selects internal::READ_FIFO instead of internal::READ_CACHE
//
// NOTE: this listing is incomplete; statements between the numbered lines are
// missing.
756 READ_INFO::READ_INFO(
int file_par,
size_t tot_length,
757 const charset_info_st *
const cs,
758 String &field_term, String &line_start, String &line_term,
759 String &enclosed_par,
int escape,
bool is_fifo)
760 :cursor(file_par),escape_char(escape)
// Keep raw pointers into the caller's terminator strings.
763 field_term_ptr=(
char*) field_term.ptr();
764 field_term_length= field_term.length();
765 line_term_ptr=(
char*) line_term.ptr();
766 line_term_length= line_term.length();
// The branch taken for an empty line-start string is not shown; next_line()
// tests line_start_ptr != 0, so presumably it is set to null there -- confirm.
767 if (line_start.length() == 0)
774 line_start_ptr=(
char*) line_start.ptr();
775 line_start_end=line_start_ptr+line_start.length();
// When the field and line terminators are byte-for-byte identical, the line
// terminator pointer is redirected to an empty string.
779 if (field_term_length == line_term_length &&
780 !memcmp(field_term_ptr,line_term_ptr,field_term_length))
783 line_term_ptr=(
char*)
"";
// First byte of each delimiter, or INT_MAX when that delimiter is empty.
785 enclosed_char= (enclosed_length=enclosed_par.length()) ?
786 (
unsigned char) enclosed_par[0] : INT_MAX;
787 field_term_char= field_term_length ? (
unsigned char) field_term_ptr[0] : INT_MAX;
788 line_term_char= line_term_length ? (
unsigned char) line_term_ptr[0] : INT_MAX;
789 error=eof=found_end_of_line=found_null=line_cuted=0;
790 buff_length=tot_length;
// Size the push-back stack as the longest terminator plus one, raised to the
// line-start length if that is larger.
794 size_t length= max(field_term_length,line_term_length)+1;
795 set_if_bigger(length, line_start.length());
796 stack= stack_pos= (
int*) memory::sql_alloc(
sizeof(
int)*length);
// Zero-initialised row buffer with one spare byte past buff_length.
798 if (!(buffer=(
unsigned char*) calloc(1, buff_length+1)))
802 end_of_buff=buffer+buff_length;
804 (
false) ? internal::READ_NET :
805 (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
// Releases the row buffer; the guarding condition is not shown in this listing
// (presumably the cache-initialisation failure path -- confirm).
808 free((
unsigned char*) buffer);
// Tells the destructor that the I/O cache needs tearing down.
818 need_end_io_cache = 1;
// Destructor. The only visible statement guards cleanup on need_end_io_cache,
// which the constructor sets to 1 as its last visible step; the cleanup body
// itself is not shown in this listing.
824 READ_INFO::~READ_INFO()
828 if (need_end_io_cache)
// GET yields the next input value: the most recently pushed-back value if the
// push-back stack is non-empty, otherwise a fresh value from cache.get().
836 #define GET (stack_pos != stack ? *--stack_pos : cache.get())
// PUSH(A) stores A on top of the push-back stack so that a later GET returns
// it. No bounds check is performed here; the stack is sized in the constructor.
837 #define PUSH(A) *(stack_pos++)=(A)
// Checks whether the upcoming input continues with the rest of the terminator
// string `ptr` of `length` bytes. The loop starts at index 1, so the first
// byte is assumed to have been matched by the caller already. The visible
// PUSH walks `ptr` backwards, putting terminator bytes back on the push-back
// stack; the surrounding mismatch handling and the return statements are not
// shown in this listing.
840 inline int READ_INFO::terminator(
char *ptr,uint32_t length)
844 for (i=1 ; i < length ; i++)
846 if ((chr=GET) != *++ptr)
855 PUSH((
unsigned char) *--ptr);
// Reads one delimited field from the input into the row buffer, handling the
// enclosure character, the escape character, multi-byte characters and the
// field/line terminators. The buffer is grown by IO_SIZE when it fills up.
//
// NOTE: this listing is incomplete (braces, returns and several statements
// are missing), so the comments below cover only the visible statements.
860 int READ_INFO::read_field()
862 int chr,found_enclosed_char;
863 unsigned char *to,*new_buffer;
866 if (found_end_of_line)
874 if (find_start_of_fields())
// End of input before any field data.
877 if ((chr=GET) == my_b_EOF)
879 found_end_of_line=eof=1;
// Remember whether the field opens with the enclosure character; INT_MAX
// marks "not enclosed".
883 if (chr == enclosed_char)
885 found_enclosed_char=enclosed_char;
886 *to++=(
unsigned char) chr;
890 found_enclosed_char= INT_MAX;
896 while ( to < end_of_buff)
// Multi-byte lead byte: handled only when the whole character fits in the
// space left in the buffer.
899 if ((my_mbcharlen(read_charset, chr) > 1) &&
900 to+my_mbcharlen(read_charset, chr) <= end_of_buff)
902 unsigned char* p = (
unsigned char*)to;
904 int ml = my_mbcharlen(read_charset, chr);
906 for (i=1; i<ml; i++) {
912 if (my_ismbchar(read_charset,
// Bytes already copied are pushed back, last-written first.
917 PUSH((
unsigned char) *--to);
922 if (chr == escape_char)
// Escape character followed by end of input: keep the escape byte itself.
924 if ((chr=GET) == my_b_EOF)
926 *to++= (
unsigned char) escape_char;
// When escape and enclosure characters coincide, only a doubled escape is
// unescaped here.
936 if (escape_char != enclosed_char || chr == escape_char)
938 *to++ = (
unsigned char) unescape((
char) chr);
944 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
945 if (chr == line_term_char)
// This variant treats a line terminator as such only outside an enclosure.
947 if (chr == line_term_char && found_enclosed_char == INT_MAX)
950 if (terminator(line_term_ptr,line_term_length))
// Possible closing enclosure: a doubled enclosure character is stored as one
// literal character.
959 if (chr == found_enclosed_char)
961 if ((chr=GET) == found_enclosed_char)
963 *to++ = (
unsigned char) chr;
967 if (chr == my_b_EOF ||
968 (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
976 if (chr == field_term_char &&
977 terminator(field_term_ptr,field_term_length))
990 chr= found_enclosed_char;
992 else if (chr == field_term_char && found_enclosed_char == INT_MAX)
994 if (terminator(field_term_ptr,field_term_length))
1002 *to++ = (
unsigned char) chr;
// Buffer full: grow it by IO_SIZE and rebase the write pointer.
// NOTE(review): no null check on realloc's result is visible in this listing,
// nor is the assignment of new_buffer back to `buffer` -- confirm both against
// the full source.
1007 new_buffer=(
unsigned char*) realloc(buffer, buff_length+1+IO_SIZE);
1008 to=new_buffer + (to-buffer);
1010 buff_length+=IO_SIZE;
1011 end_of_buff=buffer+buff_length;
1016 found_end_of_line=eof=1;
// Reads one fixed-length row into the row buffer, copying bytes until the
// buffer is full, the line terminator is seen, or input ends. Escape sequences
// are translated through unescape().
// The visible final return yields 1 when nothing was written to the buffer and
// 0 otherwise; earlier return paths are not shown in this listing.
1037 int READ_INFO::read_fixed_length()
1041 if (found_end_of_line)
1047 if (find_start_of_fields())
1051 to=row_start=buffer;
1052 while (to < end_of_buff)
1054 if ((chr=GET) == my_b_EOF)
1056 if (chr == escape_char)
// Escape character followed by end of input: keep the escape byte itself.
1058 if ((chr=GET) == my_b_EOF)
1060 *to++= (
unsigned char) escape_char;
1063 *to++ =(
unsigned char) unescape((
char) chr);
// The first terminator byte matched; terminator() checks the remainder.
1066 if (chr == line_term_char)
1068 if (terminator(line_term_ptr,line_term_length))
1070 found_end_of_line=1;
1075 *to++ = (
unsigned char) chr;
1081 found_end_of_line=eof=1;
1084 return to == buffer ? 1 : 0;
// Advances the reader to the start of the next input line, consuming any
// remaining bytes of the current one. Multi-byte characters and escaped
// characters are skipped as units so that their bytes are not mistaken for a
// line terminator.
// NOTE: return statements and loop structure are not shown in this listing.
1088 int READ_INFO::next_line()
// A line-start prefix must be searched for only when one was configured.
1091 start_of_line= line_start_ptr != 0;
// The end of line was already consumed by the field reader: just reset flags.
1092 if (found_end_of_line || eof)
1094 found_end_of_line=0;
1097 found_end_of_line=0;
1098 if (!line_term_length)
1103 if (my_mbcharlen(read_charset, chr) > 1)
1106 chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1109 if (chr == escape_char)
1112 if (chr == my_b_EOF)
1117 if (chr == escape_char)
// Discard the escaped character, stopping if input ends there.
1120 if (GET == my_b_EOF)
1124 if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
// Scans the input for the configured line-start prefix
// (line_start_ptr .. line_start_end). Input is consumed until a byte equal to
// the first prefix byte is found, then the following bytes are compared with
// the rest of the prefix. On a mismatch the visible code pushes prefix bytes
// back onto the push-back stack, walking `ptr` backwards.
// Reaching end of input sets found_end_of_line and eof. Return statements and
// the retry structure are not shown in this listing.
1131 bool READ_INFO::find_start_of_fields()
1137 if ((chr=GET) == my_b_EOF)
1139 found_end_of_line=eof=1;
1142 }
while ((
char) chr != line_start_ptr[0]);
1143 for (
char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
1146 if ((
char) chr != *ptr)
1149 while (--ptr != line_start_ptr)
1151 PUSH((
unsigned char) *ptr);
void my_ok(ha_rows affected_rows=0, ha_rows found_rows_arg=0, uint64_t passed_id=0, const char *message=NULL)
void ha_release_auto_increment()
bool hasModifiedNonTransData() const
Table * table
opened table
Field * found_next_number_field
int init_io_cache(int file, size_t cachesize, cache_type type, my_off_t seek_offset, bool use_async_io, myf cache_myflags)
Initialize an io_cache_st object.
void markModifiedNonTransData()
Field * next_number_field
field::Epoch * timestamp_field
int end_io_cache()
Free an io_cache_st object.
bool openTablesLock(TableList *)