Drizzled Public API Documentation

sql_load.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2  Copyright (C) 2011 Stewart Smith
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 
17 
18 /* Copy data from a textfile to table */
19 
20 #include <config.h>
21 
22 #include <drizzled/sql_load.h>
23 #include <drizzled/error.h>
24 #include <drizzled/catalog/local.h>
25 #include <drizzled/session.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/field/epoch.h>
28 #include <drizzled/internal/my_sys.h>
29 #include <drizzled/internal/iocache.h>
30 #include <drizzled/plugin/storage_engine.h>
31 #include <drizzled/sql_lex.h>
32 #include <drizzled/copy_info.h>
33 #include <drizzled/file_exchange.h>
34 #include <drizzled/util/test.h>
35 #include <drizzled/session/transactions.h>
36 
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <algorithm>
40 #include <climits>
41 #include <boost/filesystem.hpp>
42 
43 namespace fs=boost::filesystem;
44 using namespace std;
45 namespace drizzled
46 {
47 
/*
 READ_INFO: buffered reader used by load() to split the input file into
 rows and fields according to the FIELDS/LINES options of LOAD DATA.

 NOTE(review): the methods below use a member named `cache`
 (cache.init_io_cache(), cache.end_io_cache(), cache.get(), cache.arg)
 whose declaration is not visible in this listing (original line 60 is
 missing) -- confirm against the real source.
*/
48 class READ_INFO {
 /* File descriptor given to the constructor; passed to cache.init_io_cache() */
49  int cursor;
50  unsigned char *buffer; /* Buffer for read text */
51  unsigned char *end_of_buff; /* buffer + buff_length: limit for data stored in buffer */
52  size_t buff_length; /* Length of buffer (one extra byte is allocated) */
53  size_t max_length; /* Max length of row */
 /* Start of the terminator / line-prefix strings; line_start_end is one past the prefix */
54  char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
55  uint field_term_length,line_term_length,enclosed_length;
 /* First byte of each delimiter, or INT_MAX when that delimiter is empty */
56  int field_term_char,line_term_char,enclosed_char,escape_char;
 /* Push-back stack used by the GET and PUSH macros */
57  int *stack,*stack_pos;
58  bool found_end_of_line,start_of_line,eof;
 /* Set once cache.init_io_cache() succeeded; cleared by end_io_cache() */
59  bool need_end_io_cache;
61 
62 public:
 /*
 error      - set by the constructor when allocation or cache setup fails
 line_cuted - set by next_line() when it had to skip data to reach the line end
 found_null - set by unescape() when the escaped character is 'N'
 enclosed   - set by read_field(): was the last field terminated by the
              enclosing character
 */
63  bool error,line_cuted,found_null,enclosed;
64  unsigned char *row_start, /* Found row starts here */
65  *row_end; /* Found row ends here */
66  const charset_info_st *read_charset;
67 
68  READ_INFO(int cursor, size_t tot_length, const charset_info_st * const cs,
69  String &field_term,String &line_start,String &line_term,
70  String &enclosed,int escape, bool is_fifo);
71  ~READ_INFO();
 /* All three readers return 0 when something was read and non-zero otherwise */
72  int read_field();
73  int read_fixed_length(void);
74  int next_line(void);
75  char unescape(char chr);
76  int terminator(char *ptr,uint32_t length);
77  bool find_start_of_fields();
78 
79  /*
80  We need to force cache close before destructor is invoked to log
81  the last read block
82  */
83  void end_io_cache()
84  {
85  cache.end_io_cache();
86  need_end_io_cache = 0;
87  }
88 
89  /*
90  Either this method, or we need to make cache public
91  Arg must be set from load() since constructor does not see
92  either the table or Session value
93  */
94  void set_io_cache_arg(void* arg) { cache.arg = arg; }
95 };
96 
97 static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
98  List<Item> &fields_vars, List<Item> &set_fields,
99  List<Item> &set_values, READ_INFO &read_info,
100  uint32_t skip_lines,
101  bool ignore_check_option_errors);
102 static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
103  List<Item> &fields_vars, List<Item> &set_fields,
104  List<Item> &set_values, READ_INFO &read_info,
105  String &enclosed, uint32_t skip_lines,
106  bool ignore_check_option_errors);
107 
108 
109 /*
110  Execute LOAD DATA query
111 
112  SYNOPSYS
113  load()
114  session - current thread
115  ex - file_exchange object representing source cursor and its parsing rules
116  table_list - list of tables to which we are loading data
117  fields_vars - list of fields and variables to which we read
118  data from cursor
119  set_fields - list of fields mentioned in set clause
120  set_values - expressions to assign to fields in previous list
121  handle_duplicates - indicates whenever we should emit error or
122  replace row if we will meet duplicates.
123  ignore - - indicates whenever we should ignore duplicates
124 
125  RETURN VALUES
126  true - error / false - success
127 */
128 
129 int load(Session *session,file_exchange *ex,TableList *table_list,
130  List<Item> &fields_vars, List<Item> &set_fields,
131  List<Item> &set_values,
132  enum enum_duplicates handle_duplicates, bool ignore)
133 {
134  int file;
135  Table *table= NULL;
136  int error;
137  String *field_term=ex->field_term,*escaped=ex->escaped;
138  String *enclosed=ex->enclosed;
139  bool is_fifo=0;
140 
141  assert(table_list->getSchemaName()); // This should never be null
142 
143  /*
144  If path for cursor is not defined, we will use the current database.
145  If this is not set, we will use the directory where the table to be
146  loaded is located
147  */
148  util::string::ptr schema(session->schema());
149  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
150  assert(tdb);
151  uint32_t skip_lines= ex->skip_lines;
152  bool transactional_table;
153 
154  /* Escape and enclosed character may be a utf8 4-byte character */
155  if (escaped->length() > 4 || enclosed->length() > 4)
156  {
157  my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
158  return true;
159  }
160 
161  if (session->openTablesLock(table_list))
162  return true;
163 
164  if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
165  &session->lex().select_lex.top_join_list,
166  table_list,
167  &session->lex().select_lex.leaf_tables, true))
168  return(-1);
169 
170  /*
171  Let us emit an error if we are loading data to table which is used
172  in subselect in SET clause like we do it for INSERT.
173 
174  The main thing to fix to remove this restriction is to ensure that the
175  table is marked to be 'used for insert' in which case we should never
176  mark this table as 'const table' (ie, one that has only one row).
177  */
178  if (unique_table(table_list, table_list->next_global))
179  {
180  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
181  return true;
182  }
183 
184  table= table_list->table;
185  transactional_table= table->cursor->has_transactions();
186 
187  if (!fields_vars.size())
188  {
189  Field **field;
190  for (field= table->getFields(); *field ; field++)
191  fields_vars.push_back(new Item_field(*field));
192  table->setWriteSet();
193  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
194  /*
195  Let us also prepare SET clause, altough it is probably empty
196  in this case.
197  */
198  if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
199  setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
200  return true;
201  }
202  else
203  { // Part field list
204  /* TODO: use this conds for 'WITH CHECK OPTIONS' */
205  if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
206  setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
207  check_that_all_fields_are_given_values(session, table, table_list))
208  return true;
209  /*
210  Check whenever TIMESTAMP field with auto-set feature specified
211  explicitly.
212  */
213  if (table->timestamp_field)
214  {
215  if (table->isWriteSet(table->timestamp_field->position()))
216  {
217  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
218  }
219  else
220  {
221  table->setWriteSet(table->timestamp_field->position());
222  }
223  }
224  /* Fix the expressions in SET clause */
225  if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
226  return true;
227  }
228 
229  table->mark_columns_needed_for_insert();
230 
231  size_t tot_length=0;
232  bool use_blobs= 0, use_vars= 0;
233  List<Item>::iterator it(fields_vars.begin());
234  Item *item;
235 
236  while ((item= it++))
237  {
238  Item *real_item= item->real_item();
239 
240  if (real_item->type() == Item::FIELD_ITEM)
241  {
242  Field *field= ((Item_field*)real_item)->field;
243  if (field->flags & BLOB_FLAG)
244  {
245  use_blobs= 1;
246  tot_length+= 256; // Will be extended if needed
247  }
248  else
249  tot_length+= field->field_length;
250  }
251  else if (item->type() == Item::STRING_ITEM)
252  use_vars= 1;
253  }
254  if (use_blobs && !ex->line_term->length() && !field_term->length())
255  {
256  my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
257  MYF(0));
258  return true;
259  }
260  if (use_vars && !field_term->length() && !enclosed->length())
261  {
262  my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
263  return true;
264  }
265 
266  fs::path to_file(ex->file_name);
267  fs::path target_path(fs::system_complete(catalog::local_identifier().getPath()));
268  if (not to_file.has_root_directory())
269  {
270  int count_elements= 0;
271  for (fs::path::iterator iter= to_file.begin();
272  iter != to_file.end();
273  ++iter, ++count_elements)
274  { }
275 
276  if (count_elements == 1)
277  {
278  target_path /= tdb;
279  }
280  target_path /= to_file;
281  }
282  else
283  {
284  target_path= to_file;
285  }
286 
287  if (not secure_file_priv.string().empty())
288  {
289  if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
290  {
291  /* Read only allowed from within dir specified by secure_file_priv */
292  my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
293  return true;
294  }
295  }
296 
297  struct stat stat_info;
298  if (stat(target_path.file_string().c_str(), &stat_info))
299  {
300  my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
301  return true;
302  }
303 
304  // if we are not in slave thread, the cursor must be:
305  if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
306  (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
307  ((stat_info.st_mode & S_IFREG) == S_IFREG ||
308  (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
309  {
310  my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
311  return true;
312  }
313  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
314  is_fifo = 1;
315 
316 
317  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
318  {
319  my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
320  return true;
321  }
322  CopyInfo info;
323  memset(&info, 0, sizeof(info));
324  info.ignore= ignore;
325  info.handle_duplicates=handle_duplicates;
326  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
327 
328  identifier::Schema identifier(session->catalog().identifier(),
329  *schema);
330  READ_INFO read_info(file, tot_length,
331  ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
332  *field_term, *ex->line_start, *ex->line_term, *enclosed,
333  info.escape_char, is_fifo);
334  if (read_info.error)
335  {
336  if (file >= 0)
337  internal::my_close(file,MYF(0)); // no files in net reading
338  return true; // Can't allocate buffers
339  }
340 
341  /*
342  * Per the SQL standard, inserting NULL into a NOT NULL
343  * field requires an error to be thrown.
344  *
345  * @NOTE
346  *
347  * NULL check and handling occurs in field_conv.cc
348  */
349  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
350  session->cuted_fields=0L;
351  /* Skip lines if there is a line terminator */
352  if (ex->line_term->length())
353  {
354  /* ex->skip_lines needs to be preserved for logging */
355  while (skip_lines > 0)
356  {
357  skip_lines--;
358  if (read_info.next_line())
359  break;
360  }
361  }
362 
363  if (!(error=test(read_info.error)))
364  {
365 
367  if (ignore ||
368  handle_duplicates == DUP_REPLACE)
369  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
370  if (handle_duplicates == DUP_REPLACE)
371  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
372  table->cursor->ha_start_bulk_insert((ha_rows) 0);
373  table->copy_blobs=1;
374 
375  session->setAbortOnWarning(true);
376 
377  if (!field_term->length() && !enclosed->length())
378  error= read_fixed_length(session, info, table_list, fields_vars,
379  set_fields, set_values, read_info,
380  skip_lines, ignore);
381  else
382  error= read_sep_field(session, info, table_list, fields_vars,
383  set_fields, set_values, read_info,
384  *enclosed, skip_lines, ignore);
385  if (table->cursor->ha_end_bulk_insert() && !error)
386  {
387  table->print_error(errno, MYF(0));
388  error= 1;
389  }
390  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
391  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
392  table->next_number_field=0;
393  }
394  if (file >= 0)
395  internal::my_close(file,MYF(0));
396  free_blobs(table); /* if pack_blob was used */
397  table->copy_blobs=0;
398  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
399 
400  if (error)
401  {
402  error= -1; // Error on read
403  goto err;
404  }
405 
406  char msg[FN_REFLEN];
407  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
408  (info.records - info.copied), session->cuted_fields);
409 
410  if (session->transaction.stmt.hasModifiedNonTransData())
411  session->transaction.all.markModifiedNonTransData();
412 
413  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
414 err:
415  (void)(transactional_table);
416  assert(transactional_table || !(info.copied || info.deleted) ||
417  session->transaction.stmt.hasModifiedNonTransData());
419  table->auto_increment_field_not_null= false;
420  session->setAbortOnWarning(false);
421 
422  return(error);
423 }
424 
425 
426 /****************************************************************************
427 ** Read of rows of fixed size + optional garage + optonal newline
428 ****************************************************************************/
429 
430 static int
431 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
432  List<Item> &fields_vars, List<Item> &set_fields,
433  List<Item> &set_values, READ_INFO &read_info,
434  uint32_t skip_lines, bool ignore_check_option_errors)
435 {
436  List<Item>::iterator it(fields_vars.begin());
437  Item_field *sql_field;
438  Table *table= table_list->table;
439  bool err;
440 
441  while (!read_info.read_fixed_length())
442  {
443  if (session->getKilled())
444  {
445  session->send_kill_message();
446  return 1;
447  }
448  if (skip_lines)
449  {
450  /*
451  We could implement this with a simple seek if:
452  - We are not using DATA INFILE LOCAL
453  - escape character is ""
454  - line starting prefix is ""
455  */
456  skip_lines--;
457  continue;
458  }
459  it= fields_vars.begin();
460  unsigned char *pos=read_info.row_start;
461 #ifdef HAVE_VALGRIND
462  read_info.row_end[0]=0;
463 #endif
464 
465  table->restoreRecordAsDefault();
466  /*
467  There is no variables in fields_vars list in this format so
468  this conversion is safe.
469  */
470  while ((sql_field= (Item_field*) it++))
471  {
472  Field *field= sql_field->field;
473  if (field == table->next_number_field)
474  table->auto_increment_field_not_null= true;
475  /*
476  No fields specified in fields_vars list can be null in this format.
477  Mark field as not null, we should do this for each row because of
478  restore_record...
479  */
480  field->set_notnull();
481 
482  if (pos == read_info.row_end)
483  {
484  session->cuted_fields++; /* Not enough fields */
485  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
486  ER_WARN_TOO_FEW_RECORDS,
487  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
488 
489  if (not field->maybe_null() and field->is_timestamp())
490  ((field::Epoch::pointer) field)->set_time();
491  }
492  else
493  {
494  uint32_t length;
495  unsigned char save_chr;
496  if ((length=(uint32_t) (read_info.row_end-pos)) >
497  field->field_length)
498  {
499  length=field->field_length;
500  }
501  save_chr=pos[length];
502  pos[length]='\0'; // Add temp null terminator for store()
503  field->store((char*) pos,length,read_info.read_charset);
504  pos[length]=save_chr;
505  if ((pos+=length) > read_info.row_end)
506  pos= read_info.row_end; /* Fills rest with space */
507  }
508  }
509  if (pos != read_info.row_end)
510  {
511  session->cuted_fields++; /* To long row */
512  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
513  ER_WARN_TOO_MANY_RECORDS,
514  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
515  }
516 
517  if (session->getKilled() ||
518  fill_record(session, set_fields, set_values,
519  ignore_check_option_errors))
520  return 1;
521 
522  err= write_record(session, table, &info);
523  table->auto_increment_field_not_null= false;
524  if (err)
525  return 1;
526 
527  /*
528  We don't need to reset auto-increment field since we are restoring
529  its default value at the beginning of each loop iteration.
530  */
531  if (read_info.next_line()) // Skip to next line
532  break;
533  if (read_info.line_cuted)
534  {
535  session->cuted_fields++; /* To long row */
536  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
537  ER_WARN_TOO_MANY_RECORDS,
538  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
539  }
540  session->row_count++;
541  }
542  return(test(read_info.error));
543 }
544 
545 
546 
/*
 Load rows whose fields are separated by a terminator and/or wrapped in
 an enclosing character.

 For every row, one field is fetched with READ_INFO::read_field() per
 item in fields_vars and stored either into the table field or into the
 user variable the item refers to. After the SET expressions are applied
 with fill_record(), the row is written with write_record().

 Returns 0 on success and 1 on error or when the session was killed.
*/
547 static int
548 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
549  List<Item> &fields_vars, List<Item> &set_fields,
550  List<Item> &set_values, READ_INFO &read_info,
551  String &enclosed, uint32_t skip_lines,
552  bool ignore_check_option_errors)
553 {
554  List<Item>::iterator it(fields_vars.begin());
555  Item *item;
556  Table *table= table_list->table;
557  uint32_t enclosed_length;
558  bool err;
559 
560  enclosed_length=enclosed.length();
561 
 /* One iteration per input row; the item iterator is rewound for each row */
562  for (;;it= fields_vars.begin())
563  {
564  if (session->getKilled())
565  {
566  session->send_kill_message();
567  return 1;
568  }
569 
570  table->restoreRecordAsDefault();
571 
572  while ((item= it++))
573  {
574  uint32_t length;
575  unsigned char *pos;
576  Item *real_item;
577 
 /* Non-zero: no further field on this line; item stays at the first unread one */
578  if (read_info.read_field())
579  break;
580 
581  /* If this line is to be skipped we don't want to fill field or var */
582  if (skip_lines)
583  continue;
584 
585  pos=read_info.row_start;
586  length=(uint32_t) (read_info.row_end-pos);
587 
588  real_item= item->real_item();
589 
 /*
 The field is NULL when it is the unenclosed word NULL (only if an
 enclosing character is in use) or a single character for which
 unescape() flagged found_null.
 */
590  if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
591  (length == 1 && read_info.found_null))
592  {
593 
594  if (real_item->type() == Item::FIELD_ITEM)
595  {
596  Field *field= ((Item_field *)real_item)->field;
597  if (field->reset())
598  {
599  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
600  session->row_count);
601  return 1;
602  }
603  field->set_null();
 /* NOT NULL column: a timestamp gets the current time, others a warning (except the auto-increment field) */
604  if (not field->maybe_null())
605  {
606  if (field->is_timestamp())
607  {
608  ((field::Epoch::pointer) field)->set_time();
609  }
610  else if (field != table->next_number_field)
611  {
612  field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
613  }
614  }
615  }
616  else if (item->type() == Item::STRING_ITEM)
617  {
618  ((Item_user_var_as_out_param *)item)->set_null_value(
619  read_info.read_charset);
620  }
621  else
622  {
623  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
624  return 1;
625  }
626 
627  continue;
628  }
629 
 /* Ordinary (non-NULL) value: store it into the field or the user variable */
630  if (real_item->type() == Item::FIELD_ITEM)
631  {
632  Field *field= ((Item_field *)real_item)->field;
633  field->set_notnull();
634  read_info.row_end[0]=0; // Safe to change end marker
635  if (field == table->next_number_field)
636  table->auto_increment_field_not_null= true;
637  field->store((char*) pos, length, read_info.read_charset);
638  }
639  else if (item->type() == Item::STRING_ITEM)
640  {
641  ((Item_user_var_as_out_param *)item)->set_value(str_ref((char*) pos, length), read_info.read_charset);
642  }
643  else
644  {
645  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
646  return 1;
647  }
648  }
649  if (read_info.error)
650  break;
 /* A skipped line is consumed without writing a row and without calling next_line() */
651  if (skip_lines)
652  {
653  skip_lines--;
654  continue;
655  }
 /* item is non-null when the line ended before every item received a value */
656  if (item)
657  {
658  /* Have not read any field, thus input cursor is simply ended */
659  if (item == &fields_vars.front())
660  break;
 /* Reset the remaining fields / variables and warn once per missing field */
661  for (; item ; item= it++)
662  {
663  Item *real_item= item->real_item();
664  if (real_item->type() == Item::FIELD_ITEM)
665  {
666  Field *field= ((Item_field *)real_item)->field;
667  if (field->reset())
668  {
669  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
670  session->row_count);
671  return 1;
672  }
673  if (not field->maybe_null() and field->is_timestamp())
674  ((field::Epoch::pointer) field)->set_time();
675  /*
676  QQ: We probably should not throw warning for each field.
677  But how about intention to always have the same number
678  of warnings in Session::cuted_fields (and get rid of cuted_fields
679  in the end ?)
680  */
681  session->cuted_fields++;
682  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
683  ER_WARN_TOO_FEW_RECORDS,
684  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
685  }
686  else if (item->type() == Item::STRING_ITEM)
687  {
688  ((Item_user_var_as_out_param *)item)->set_null_value(
689  read_info.read_charset);
690  }
691  else
692  {
693  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
694  return 1;
695  }
696  }
697  }
698 
 /* Apply the SET clause, then write the row */
699  if (session->getKilled() ||
700  fill_record(session, set_fields, set_values,
701  ignore_check_option_errors))
702  return 1;
703 
704  err= write_record(session, table, &info);
705  table->auto_increment_field_not_null= false;
706  if (err)
707  return 1;
708  /*
709  We don't need to reset auto-increment field since we are restoring
710  its default value at the beginning of each loop iteration.
711  */
712  if (read_info.next_line()) // Skip to next line
713  break;
 /* next_line() had to discard data before the terminator: the row had too many fields */
714  if (read_info.line_cuted)
715  {
716  session->cuted_fields++; /* To long row */
717  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
718  ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
719  session->row_count);
720  if (session->getKilled())
721  return 1;
722  }
723  session->row_count++;
724  }
725  return(test(read_info.error));
726 }
727 
728 
729 /* Unescape all escape characters, mark \N as null */
730 
731 char
732 READ_INFO::unescape(char chr)
733 {
734  /* keep this switch synchornous with the ESCAPE_CHARS macro */
735  switch(chr) {
736  case 'n': return '\n';
737  case 't': return '\t';
738  case 'r': return '\r';
739  case 'b': return '\b';
740  case '0': return 0; // Ascii null
741  case 'Z': return '\032'; // Win32 end of cursor
742  case 'N': found_null=1;
743 
744  /* fall through */
745  default: return chr;
746  }
747 }
748 
749 
750 /*
751  Read a line using buffering
752  If last line is empty (in line mode) then it isn't outputed
753 */
754 
755 
756 READ_INFO::READ_INFO(int file_par, size_t tot_length,
757  const charset_info_st * const cs,
758  String &field_term, String &line_start, String &line_term,
759  String &enclosed_par, int escape, bool is_fifo)
760  :cursor(file_par),escape_char(escape)
761 {
762  read_charset= cs;
763  field_term_ptr=(char*) field_term.ptr();
764  field_term_length= field_term.length();
765  line_term_ptr=(char*) line_term.ptr();
766  line_term_length= line_term.length();
767  if (line_start.length() == 0)
768  {
769  line_start_ptr=0;
770  start_of_line= 0;
771  }
772  else
773  {
774  line_start_ptr=(char*) line_start.ptr();
775  line_start_end=line_start_ptr+line_start.length();
776  start_of_line= 1;
777  }
778  /* If field_terminator == line_terminator, don't use line_terminator */
779  if (field_term_length == line_term_length &&
780  !memcmp(field_term_ptr,line_term_ptr,field_term_length))
781  {
782  line_term_length=0;
783  line_term_ptr=(char*) "";
784  }
785  enclosed_char= (enclosed_length=enclosed_par.length()) ?
786  (unsigned char) enclosed_par[0] : INT_MAX;
787  field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
788  line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
789  error=eof=found_end_of_line=found_null=line_cuted=0;
790  buff_length=tot_length;
791 
792 
793  /* Set of a stack for unget if long terminators */
794  size_t length= max(field_term_length,line_term_length)+1;
795  set_if_bigger(length, line_start.length());
796  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
797 
798  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
799  error=1;
800  else
801  {
802  end_of_buff=buffer+buff_length;
803  if (cache.init_io_cache((false) ? -1 : cursor, 0,
804  (false) ? internal::READ_NET :
805  (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
806  MYF(MY_WME)))
807  {
808  free((unsigned char*) buffer);
809  error=1;
810  }
811  else
812  {
813  /*
814  init_io_cache() will not initialize read_function member
815  if the cache is READ_NET. So we work around the problem with a
816  manual assignment
817  */
818  need_end_io_cache = 1;
819  }
820  }
821 }
822 
823 
824 READ_INFO::~READ_INFO()
825 {
826  if (!error)
827  {
828  if (need_end_io_cache)
829  cache.end_io_cache();
830  free(buffer);
831  error=1;
832  }
833 }
834 
835 
/*
 Character-level input helpers for the READ_INFO methods below.
 GET yields the most recently pushed-back value if the push-back stack is
 not empty, otherwise the next value from cache.get().
 PUSH(A) puts a value back so that the next GET returns it; no bounds
 check is done against the size of the stack.
*/
836 #define GET (stack_pos != stack ? *--stack_pos : cache.get())
837 #define PUSH(A) *(stack_pos++)=(A)
838 
839 
840 inline int READ_INFO::terminator(char *ptr,uint32_t length)
841 {
842  int chr=0; // Keep gcc happy
843  uint32_t i;
844  for (i=1 ; i < length ; i++)
845  {
846  if ((chr=GET) != *++ptr)
847  {
848  break;
849  }
850  }
851  if (i == length)
852  return 1;
853  PUSH(chr);
854  while (i-- > 1)
855  PUSH((unsigned char) *--ptr);
856  return 0;
857 }
858 
859 
860 int READ_INFO::read_field()
861 {
862  int chr,found_enclosed_char;
863  unsigned char *to,*new_buffer;
864 
865  found_null=0;
866  if (found_end_of_line)
867  return 1; // One have to call next_line
868 
869  /* Skip until we find 'line_start' */
870 
871  if (start_of_line)
872  { // Skip until line_start
873  start_of_line=0;
874  if (find_start_of_fields())
875  return 1;
876  }
877  if ((chr=GET) == my_b_EOF)
878  {
879  found_end_of_line=eof=1;
880  return 1;
881  }
882  to=buffer;
883  if (chr == enclosed_char)
884  {
885  found_enclosed_char=enclosed_char;
886  *to++=(unsigned char) chr; // If error
887  }
888  else
889  {
890  found_enclosed_char= INT_MAX;
891  PUSH(chr);
892  }
893 
894  for (;;)
895  {
896  while ( to < end_of_buff)
897  {
898  chr = GET;
899  if ((my_mbcharlen(read_charset, chr) > 1) &&
900  to+my_mbcharlen(read_charset, chr) <= end_of_buff)
901  {
902  unsigned char* p = (unsigned char*)to;
903  *to++ = chr;
904  int ml = my_mbcharlen(read_charset, chr);
905  int i;
906  for (i=1; i<ml; i++) {
907  chr = GET;
908  if (chr == my_b_EOF)
909  goto found_eof;
910  *to++ = chr;
911  }
912  if (my_ismbchar(read_charset,
913  (const char *)p,
914  (const char *)to))
915  continue;
916  for (i=0; i<ml; i++)
917  PUSH((unsigned char) *--to);
918  chr = GET;
919  }
920  if (chr == my_b_EOF)
921  goto found_eof;
922  if (chr == escape_char)
923  {
924  if ((chr=GET) == my_b_EOF)
925  {
926  *to++= (unsigned char) escape_char;
927  goto found_eof;
928  }
929  /*
930  When escape_char == enclosed_char, we treat it like we do for
931  handling quotes in SQL parsing -- you can double-up the
932  escape_char to include it literally, but it doesn't do escapes
933  like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
934  with data like: "fie""ld1", "field2"
935  */
936  if (escape_char != enclosed_char || chr == escape_char)
937  {
938  *to++ = (unsigned char) unescape((char) chr);
939  continue;
940  }
941  PUSH(chr);
942  chr= escape_char;
943  }
944 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
945  if (chr == line_term_char)
946 #else
947  if (chr == line_term_char && found_enclosed_char == INT_MAX)
948 #endif
949  {
950  if (terminator(line_term_ptr,line_term_length))
951  { // Maybe unexpected linefeed
952  enclosed=0;
953  found_end_of_line=1;
954  row_start=buffer;
955  row_end= to;
956  return 0;
957  }
958  }
959  if (chr == found_enclosed_char)
960  {
961  if ((chr=GET) == found_enclosed_char)
962  { // Remove dupplicated
963  *to++ = (unsigned char) chr;
964  continue;
965  }
966  // End of enclosed field if followed by field_term or line_term
967  if (chr == my_b_EOF ||
968  (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
969  { // Maybe unexpected linefeed
970  enclosed=1;
971  found_end_of_line=1;
972  row_start=buffer+1;
973  row_end= to;
974  return 0;
975  }
976  if (chr == field_term_char &&
977  terminator(field_term_ptr,field_term_length))
978  {
979  enclosed=1;
980  row_start=buffer+1;
981  row_end= to;
982  return 0;
983  }
984  /*
985  The string didn't terminate yet.
986  Store back next character for the loop
987  */
988  PUSH(chr);
989  /* copy the found term character to 'to' */
990  chr= found_enclosed_char;
991  }
992  else if (chr == field_term_char && found_enclosed_char == INT_MAX)
993  {
994  if (terminator(field_term_ptr,field_term_length))
995  {
996  enclosed=0;
997  row_start=buffer;
998  row_end= to;
999  return 0;
1000  }
1001  }
1002  *to++ = (unsigned char) chr;
1003  }
1004  /*
1005  ** We come here if buffer is too small. Enlarge it and continue
1006  */
1007  new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE);
1008  to=new_buffer + (to-buffer);
1009  buffer=new_buffer;
1010  buff_length+=IO_SIZE;
1011  end_of_buff=buffer+buff_length;
1012  }
1013 
1014 found_eof:
1015  enclosed=0;
1016  found_end_of_line=eof=1;
1017  row_start=buffer;
1018  row_end=to;
1019  return 0;
1020 }
1021 
1022 /*
1023  Read a row with fixed length.
1024 
1025  NOTES
1026  The row may not be fixed size on disk if there are escape
1027  characters in the cursor.
1028 
1029  IMPLEMENTATION NOTE
1030  One can't use fixed length with multi-byte charset **
1031 
1032  RETURN
1033  0 ok
1034  1 error
1035 */
1036 
1037 int READ_INFO::read_fixed_length()
1038 {
1039  int chr;
1040  unsigned char *to;
1041  if (found_end_of_line)
1042  return 1; // One have to call next_line
1043 
1044  if (start_of_line)
1045  { // Skip until line_start
1046  start_of_line=0;
1047  if (find_start_of_fields())
1048  return 1;
1049  }
1050 
1051  to=row_start=buffer;
1052  while (to < end_of_buff)
1053  {
1054  if ((chr=GET) == my_b_EOF)
1055  goto found_eof;
1056  if (chr == escape_char)
1057  {
1058  if ((chr=GET) == my_b_EOF)
1059  {
1060  *to++= (unsigned char) escape_char;
1061  goto found_eof;
1062  }
1063  *to++ =(unsigned char) unescape((char) chr);
1064  continue;
1065  }
1066  if (chr == line_term_char)
1067  {
1068  if (terminator(line_term_ptr,line_term_length))
1069  { // Maybe unexpected linefeed
1070  found_end_of_line=1;
1071  row_end= to;
1072  return 0;
1073  }
1074  }
1075  *to++ = (unsigned char) chr;
1076  }
1077  row_end=to; // Found full line
1078  return 0;
1079 
1080 found_eof:
1081  found_end_of_line=eof=1;
1082  row_start=buffer;
1083  row_end=to;
1084  return to == buffer ? 1 : 0;
1085 }
1086 
1087 
/*
 Advance the input to the start of the next line.

 If the previous read already stopped at a line end (or at end of input)
 only the state flags are reset; otherwise input is discarded up to and
 including the next line terminator and line_cuted is set when anything
 other than the terminator had to be discarded.

 Returns 0 when positioned at a new line, non-zero at end of input.
*/
1088 int READ_INFO::next_line()
1089 {
1090  line_cuted=0;
 /* A line prefix has to be searched again on the next read, if one is configured */
1091  start_of_line= line_start_ptr != 0;
1092  if (found_end_of_line || eof)
1093  {
1094  found_end_of_line=0;
1095  return eof;
1096  }
1097  found_end_of_line=0;
1098  if (!line_term_length)
1099  return 0; // No lines
1100  for (;;)
1101  {
1102  int chr = GET;
 /*
 Skip the continuation bytes of a multi-byte character.
 NOTE(review): the loop bound re-evaluates my_mbcharlen() on the byte
 just read instead of on the lead byte, and my_mbcharlen() may be
 called with my_b_EOF -- confirm this is intended.
 */
1103  if (my_mbcharlen(read_charset, chr) > 1)
1104  {
1105  for (uint32_t i=1;
1106  chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1107  i++)
1108  chr = GET;
1109  if (chr == escape_char)
1110  continue;
1111  }
1112  if (chr == my_b_EOF)
1113  {
1114  eof=1;
1115  return 1;
1116  }
 /* An escaped character can never start a terminator: drop it together with the escape */
1117  if (chr == escape_char)
1118  {
1119  line_cuted=1;
 /* NOTE(review): unlike the branch above, this EOF return leaves `eof` unset */
1120  if (GET == my_b_EOF)
1121  return 1;
1122  continue;
1123  }
1124  if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1125  return 0;
1126  line_cuted=1;
1127  }
1128 }
1129 
1130 
1131 bool READ_INFO::find_start_of_fields()
1132 {
1133  int chr;
1134  try_again:
1135  do
1136  {
1137  if ((chr=GET) == my_b_EOF)
1138  {
1139  found_end_of_line=eof=1;
1140  return 1;
1141  }
1142  } while ((char) chr != line_start_ptr[0]);
1143  for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
1144  {
1145  chr=GET; // Eof will be checked later
1146  if ((char) chr != *ptr)
1147  { // Can't be line_start
1148  PUSH(chr);
1149  while (--ptr != line_start_ptr)
1150  { // Restart with next char
1151  PUSH((unsigned char) *ptr);
1152  }
1153  goto try_again;
1154  }
1155  }
1156  return 0;
1157 }
1158 
1159 
1160 } /* namespace drizzled */
void my_ok(ha_rows affected_rows=0, ha_rows found_rows_arg=0, uint64_t passed_id=0, const char *message=NULL)
Definition: session.cc:1877
void ha_release_auto_increment()
Definition: cursor.cc:576
TableList * next_global
Definition: table_list.h:110
bool copy_blobs
Definition: table.h:178
ha_rows cuted_fields
Definition: session.h:408
Table * table
opened table
Definition: table_list.h:145
Field * found_next_number_field
Definition: table.h:143
int init_io_cache(int file, size_t cachesize, cache_type type, my_off_t seek_offset, bool use_async_io, myf cache_myflags)
Initialize an io_cache_st object.
Definition: mf_iocache.cc:174
Field * next_number_field
Definition: table.h:142
field::Epoch * timestamp_field
Definition: table.h:144
int end_io_cache()
Free an io_cache_st object.
Definition: mf_iocache.cc:685
Cursor * cursor
Definition: table.h:68
bool openTablesLock(TableList *)
Definition: session.cc:1767