Drizzled Public API Documentation

sql_load.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2  Copyright (C) 2011 Stewart Smith
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
16 
17 
18 /* Copy data from a textfile to table */
19 
20 #include <config.h>
21 
22 #include <drizzled/sql_load.h>
23 #include <drizzled/error.h>
24 #include <drizzled/catalog/local.h>
25 #include <drizzled/session.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/field/epoch.h>
28 #include <drizzled/internal/my_sys.h>
29 #include <drizzled/internal/iocache.h>
30 #include <drizzled/plugin/storage_engine.h>
31 #include <drizzled/sql_lex.h>
32 #include <drizzled/copy_info.h>
33 #include <drizzled/file_exchange.h>
34 #include <drizzled/util/test.h>
35 #include <drizzled/session/transactions.h>
36 
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <algorithm>
40 #include <climits>
41 #include <boost/filesystem.hpp>
42 
43 namespace fs=boost::filesystem;
44 using namespace std;
45 namespace drizzled
46 {
47 
48 class READ_INFO {
49  int cursor;
50  unsigned char *buffer; /* Buffer for read text */
51  unsigned char *end_of_buff; /* Data in bufferts ends here */
52  size_t buff_length; /* Length of buffert */
53  size_t max_length; /* Max length of row */
54  char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
55  uint field_term_length,line_term_length,enclosed_length;
56  int field_term_char,line_term_char,enclosed_char,escape_char;
57  int *stack,*stack_pos;
58  bool found_end_of_line,start_of_line,eof;
59  bool need_end_io_cache;
61 
62 public:
63  bool error,line_cuted,found_null,enclosed;
64  unsigned char *row_start, /* Found row starts here */
65  *row_end; /* Found row ends here */
66  const charset_info_st *read_charset;
67 
68  READ_INFO(int cursor, size_t tot_length, const charset_info_st * const cs,
69  String &field_term,String &line_start,String &line_term,
70  String &enclosed,int escape, bool is_fifo);
71  ~READ_INFO();
72  int read_field();
73  int read_fixed_length(void);
74  int next_line(void);
75  char unescape(char chr);
76  int terminator(char *ptr,uint32_t length);
77  bool find_start_of_fields();
78 
79  /*
80  We need to force cache close before destructor is invoked to log
81  the last read block
82  */
83  void end_io_cache()
84  {
85  cache.end_io_cache();
86  need_end_io_cache = 0;
87  }
88 
89  /*
90  Either this method, or we need to make cache public
91  Arg must be set from load() since constructor does not see
92  either the table or Session value
93  */
94  void set_io_cache_arg(void* arg) { cache.arg = arg; }
95 };
96 
97 static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
98  List<Item> &fields_vars, List<Item> &set_fields,
99  List<Item> &set_values, READ_INFO &read_info,
100  uint32_t skip_lines,
101  bool ignore_check_option_errors);
102 static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
103  List<Item> &fields_vars, List<Item> &set_fields,
104  List<Item> &set_values, READ_INFO &read_info,
105  String &enclosed, uint32_t skip_lines,
106  bool ignore_check_option_errors);
107 
108 
109 /*
110  Execute LOAD DATA query
111 
112  SYNOPSYS
113  load()
114  session - current thread
115  ex - file_exchange object representing source cursor and its parsing rules
116  table_list - list of tables to which we are loading data
117  fields_vars - list of fields and variables to which we read
118  data from cursor
119  set_fields - list of fields mentioned in set clause
120  set_values - expressions to assign to fields in previous list
121  handle_duplicates - indicates whenever we should emit error or
122  replace row if we will meet duplicates.
123  ignore - - indicates whenever we should ignore duplicates
124 
125  RETURN VALUES
126  true - error / false - success
127 */
128 
129 int load(Session *session,file_exchange *ex,TableList *table_list,
130  List<Item> &fields_vars, List<Item> &set_fields,
131  List<Item> &set_values,
132  enum enum_duplicates handle_duplicates, bool ignore)
133 {
134  int file;
135  Table *table= NULL;
136  int error;
137  String *field_term=ex->field_term,*escaped=ex->escaped;
138  String *enclosed=ex->enclosed;
139  bool is_fifo=0;
140 
141  assert(table_list->getSchemaName()); // This should never be null
142 
143  /*
144  If path for cursor is not defined, we will use the current database.
145  If this is not set, we will use the directory where the table to be
146  loaded is located
147  */
148  util::string::ptr schema(session->schema());
149  const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName(); // Result should never be null
150  assert(tdb);
151  uint32_t skip_lines= ex->skip_lines;
152  bool transactional_table;
153 
154  /* Escape and enclosed character may be a utf8 4-byte character */
155  if (escaped->length() > 4 || enclosed->length() > 4)
156  {
157  my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
158  return true;
159  }
160 
161  if (session->openTablesLock(table_list))
162  return true;
163 
164  if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
165  &session->lex().select_lex.top_join_list,
166  table_list,
167  &session->lex().select_lex.leaf_tables, true))
168  return(-1);
169 
170  /*
171  Let us emit an error if we are loading data to table which is used
172  in subselect in SET clause like we do it for INSERT.
173 
174  The main thing to fix to remove this restriction is to ensure that the
175  table is marked to be 'used for insert' in which case we should never
176  mark this table as 'const table' (ie, one that has only one row).
177  */
178  if (unique_table(table_list, table_list->next_global))
179  {
180  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
181  return true;
182  }
183 
184  table= table_list->table;
185  transactional_table= table->cursor->has_transactions();
186 
187  if (!fields_vars.size())
188  {
189  Field **field;
190  for (field= table->getFields(); *field ; field++)
191  fields_vars.push_back(new Item_field(*field));
192  table->setWriteSet();
193  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
194  /*
195  Let us also prepare SET clause, altough it is probably empty
196  in this case.
197  */
198  if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
199  setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
200  return true;
201  }
202  else
203  { // Part field list
204  /* TODO: use this conds for 'WITH CHECK OPTIONS' */
205  if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
206  setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
207  check_that_all_fields_are_given_values(session, table, table_list))
208  return true;
209  /*
210  Check whenever TIMESTAMP field with auto-set feature specified
211  explicitly.
212  */
213  if (table->timestamp_field)
214  {
215  if (table->isWriteSet(table->timestamp_field->position()))
216  {
217  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
218  }
219  else
220  {
221  table->setWriteSet(table->timestamp_field->position());
222  }
223  }
224  /* Fix the expressions in SET clause */
225  if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
226  return true;
227  }
228 
229  table->mark_columns_needed_for_insert();
230 
231  size_t tot_length=0;
232  bool use_blobs= 0, use_vars= 0;
233  List<Item>::iterator it(fields_vars.begin());
234  Item *item;
235 
236  while ((item= it++))
237  {
238  Item *real_item= item->real_item();
239 
240  if (real_item->type() == Item::FIELD_ITEM)
241  {
242  Field *field= ((Item_field*)real_item)->field;
243  if (field->flags & BLOB_FLAG)
244  {
245  use_blobs= 1;
246  tot_length+= 256; // Will be extended if needed
247  }
248  else
249  tot_length+= field->field_length;
250  }
251  else if (item->type() == Item::STRING_ITEM)
252  use_vars= 1;
253  }
254  if (use_blobs && !ex->line_term->length() && !field_term->length())
255  {
256  my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
257  MYF(0));
258  return true;
259  }
260  if (use_vars && !field_term->length() && !enclosed->length())
261  {
262  my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
263  return true;
264  }
265 
266  fs::path to_file(ex->file_name);
267  fs::path target_path(fs::system_complete(catalog::local_identifier().getPath()));
268  if (not to_file.has_root_directory())
269  {
270  int count_elements= 0;
271  for (fs::path::iterator iter= to_file.begin();
272  iter != to_file.end();
273  ++iter, ++count_elements)
274  { }
275 
276  if (count_elements == 1)
277  {
278  target_path /= tdb;
279  }
280  target_path /= to_file;
281  }
282  else
283  {
284  target_path= to_file;
285  }
286 
287  if (not secure_file_priv.string().empty())
288  {
289  if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
290  {
291  /* Read only allowed from within dir specified by secure_file_priv */
292  my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
293  return true;
294  }
295  }
296 
297  struct stat stat_info;
298  if (stat(target_path.file_string().c_str(), &stat_info))
299  {
300  my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
301  return true;
302  }
303 
304  // if we are not in slave thread, the cursor must be:
305  if (!((stat_info.st_mode & S_IROTH) == S_IROTH && // readable by others
306  (stat_info.st_mode & S_IFLNK) != S_IFLNK && // and not a symlink
307  ((stat_info.st_mode & S_IFREG) == S_IFREG ||
308  (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
309  {
310  my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
311  return true;
312  }
313  if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
314  is_fifo = 1;
315 
316 
317  if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
318  {
319  my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
320  return true;
321  }
322  CopyInfo info;
323  memset(&info, 0, sizeof(info));
324  info.ignore= ignore;
325  info.handle_duplicates=handle_duplicates;
326  info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
327 
328  identifier::Schema identifier(*schema);
329  READ_INFO read_info(file, tot_length,
330  ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
331  *field_term, *ex->line_start, *ex->line_term, *enclosed,
332  info.escape_char, is_fifo);
333  if (read_info.error)
334  {
335  if (file >= 0)
336  internal::my_close(file,MYF(0)); // no files in net reading
337  return true; // Can't allocate buffers
338  }
339 
340  /*
341  * Per the SQL standard, inserting NULL into a NOT NULL
342  * field requires an error to be thrown.
343  *
344  * @NOTE
345  *
346  * NULL check and handling occurs in field_conv.cc
347  */
348  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
349  session->cuted_fields=0L;
350  /* Skip lines if there is a line terminator */
351  if (ex->line_term->length())
352  {
353  /* ex->skip_lines needs to be preserved for logging */
354  while (skip_lines > 0)
355  {
356  skip_lines--;
357  if (read_info.next_line())
358  break;
359  }
360  }
361 
362  if (!(error=test(read_info.error)))
363  {
364 
366  if (ignore ||
367  handle_duplicates == DUP_REPLACE)
368  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
369  if (handle_duplicates == DUP_REPLACE)
370  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
371  table->cursor->ha_start_bulk_insert((ha_rows) 0);
372  table->copy_blobs=1;
373 
374  session->setAbortOnWarning(true);
375 
376  if (!field_term->length() && !enclosed->length())
377  error= read_fixed_length(session, info, table_list, fields_vars,
378  set_fields, set_values, read_info,
379  skip_lines, ignore);
380  else
381  error= read_sep_field(session, info, table_list, fields_vars,
382  set_fields, set_values, read_info,
383  *enclosed, skip_lines, ignore);
384  if (table->cursor->ha_end_bulk_insert() && !error)
385  {
386  table->print_error(errno, MYF(0));
387  error= 1;
388  }
389  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
390  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
391  table->next_number_field=0;
392  }
393  if (file >= 0)
394  internal::my_close(file,MYF(0));
395  free_blobs(table); /* if pack_blob was used */
396  table->copy_blobs=0;
397  session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
398 
399  if (error)
400  {
401  error= -1; // Error on read
402  goto err;
403  }
404 
405  char msg[FN_REFLEN];
406  snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
407  (info.records - info.copied), session->cuted_fields);
408 
409  if (session->transaction.stmt.hasModifiedNonTransData())
410  session->transaction.all.markModifiedNonTransData();
411 
412  session->my_ok(info.copied + info.deleted, 0, 0L, msg);
413 err:
414  assert(transactional_table || !(info.copied || info.deleted) ||
415  session->transaction.stmt.hasModifiedNonTransData());
417  table->auto_increment_field_not_null= false;
418  session->setAbortOnWarning(false);
419 
420  return(error);
421 }
422 
423 
424 /****************************************************************************
425 ** Read of rows of fixed size + optional garage + optonal newline
426 ****************************************************************************/
427 
428 static int
429 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
430  List<Item> &fields_vars, List<Item> &set_fields,
431  List<Item> &set_values, READ_INFO &read_info,
432  uint32_t skip_lines, bool ignore_check_option_errors)
433 {
434  List<Item>::iterator it(fields_vars.begin());
435  Item_field *sql_field;
436  Table *table= table_list->table;
437  bool err;
438 
439  while (!read_info.read_fixed_length())
440  {
441  if (session->getKilled())
442  {
443  session->send_kill_message();
444  return 1;
445  }
446  if (skip_lines)
447  {
448  /*
449  We could implement this with a simple seek if:
450  - We are not using DATA INFILE LOCAL
451  - escape character is ""
452  - line starting prefix is ""
453  */
454  skip_lines--;
455  continue;
456  }
457  it= fields_vars.begin();
458  unsigned char *pos=read_info.row_start;
459 #ifdef HAVE_VALGRIND
460  read_info.row_end[0]=0;
461 #endif
462 
463  table->restoreRecordAsDefault();
464  /*
465  There is no variables in fields_vars list in this format so
466  this conversion is safe.
467  */
468  while ((sql_field= (Item_field*) it++))
469  {
470  Field *field= sql_field->field;
471  if (field == table->next_number_field)
472  table->auto_increment_field_not_null= true;
473  /*
474  No fields specified in fields_vars list can be null in this format.
475  Mark field as not null, we should do this for each row because of
476  restore_record...
477  */
478  field->set_notnull();
479 
480  if (pos == read_info.row_end)
481  {
482  session->cuted_fields++; /* Not enough fields */
483  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
484  ER_WARN_TOO_FEW_RECORDS,
485  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
486 
487  if (not field->maybe_null() and field->is_timestamp())
488  ((field::Epoch::pointer) field)->set_time();
489  }
490  else
491  {
492  uint32_t length;
493  unsigned char save_chr;
494  if ((length=(uint32_t) (read_info.row_end-pos)) >
495  field->field_length)
496  {
497  length=field->field_length;
498  }
499  save_chr=pos[length];
500  pos[length]='\0'; // Add temp null terminator for store()
501  field->store((char*) pos,length,read_info.read_charset);
502  pos[length]=save_chr;
503  if ((pos+=length) > read_info.row_end)
504  pos= read_info.row_end; /* Fills rest with space */
505  }
506  }
507  if (pos != read_info.row_end)
508  {
509  session->cuted_fields++; /* To long row */
510  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
511  ER_WARN_TOO_MANY_RECORDS,
512  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
513  }
514 
515  if (session->getKilled() ||
516  fill_record(session, set_fields, set_values,
517  ignore_check_option_errors))
518  return 1;
519 
520  err= write_record(session, table, &info);
521  table->auto_increment_field_not_null= false;
522  if (err)
523  return 1;
524 
525  /*
526  We don't need to reset auto-increment field since we are restoring
527  its default value at the beginning of each loop iteration.
528  */
529  if (read_info.next_line()) // Skip to next line
530  break;
531  if (read_info.line_cuted)
532  {
533  session->cuted_fields++; /* To long row */
534  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
535  ER_WARN_TOO_MANY_RECORDS,
536  ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
537  }
538  session->row_count++;
539  }
540  return(test(read_info.error));
541 }
542 
543 
544 
545 static int
546 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
547  List<Item> &fields_vars, List<Item> &set_fields,
548  List<Item> &set_values, READ_INFO &read_info,
549  String &enclosed, uint32_t skip_lines,
550  bool ignore_check_option_errors)
551 {
552  List<Item>::iterator it(fields_vars.begin());
553  Item *item;
554  Table *table= table_list->table;
555  uint32_t enclosed_length;
556  bool err;
557 
558  enclosed_length=enclosed.length();
559 
560  for (;;it= fields_vars.begin())
561  {
562  if (session->getKilled())
563  {
564  session->send_kill_message();
565  return 1;
566  }
567 
568  table->restoreRecordAsDefault();
569 
570  while ((item= it++))
571  {
572  uint32_t length;
573  unsigned char *pos;
574  Item *real_item;
575 
576  if (read_info.read_field())
577  break;
578 
579  /* If this line is to be skipped we don't want to fill field or var */
580  if (skip_lines)
581  continue;
582 
583  pos=read_info.row_start;
584  length=(uint32_t) (read_info.row_end-pos);
585 
586  real_item= item->real_item();
587 
588  if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
589  (length == 1 && read_info.found_null))
590  {
591 
592  if (real_item->type() == Item::FIELD_ITEM)
593  {
594  Field *field= ((Item_field *)real_item)->field;
595  if (field->reset())
596  {
597  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
598  session->row_count);
599  return 1;
600  }
601  field->set_null();
602  if (not field->maybe_null())
603  {
604  if (field->is_timestamp())
605  {
606  ((field::Epoch::pointer) field)->set_time();
607  }
608  else if (field != table->next_number_field)
609  {
610  field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
611  }
612  }
613  }
614  else if (item->type() == Item::STRING_ITEM)
615  {
616  ((Item_user_var_as_out_param *)item)->set_null_value(
617  read_info.read_charset);
618  }
619  else
620  {
621  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
622  return 1;
623  }
624 
625  continue;
626  }
627 
628  if (real_item->type() == Item::FIELD_ITEM)
629  {
630  Field *field= ((Item_field *)real_item)->field;
631  field->set_notnull();
632  read_info.row_end[0]=0; // Safe to change end marker
633  if (field == table->next_number_field)
634  table->auto_increment_field_not_null= true;
635  field->store((char*) pos, length, read_info.read_charset);
636  }
637  else if (item->type() == Item::STRING_ITEM)
638  {
639  ((Item_user_var_as_out_param *)item)->set_value(str_ref((char*) pos, length), read_info.read_charset);
640  }
641  else
642  {
643  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
644  return 1;
645  }
646  }
647  if (read_info.error)
648  break;
649  if (skip_lines)
650  {
651  skip_lines--;
652  continue;
653  }
654  if (item)
655  {
656  /* Have not read any field, thus input cursor is simply ended */
657  if (item == &fields_vars.front())
658  break;
659  for (; item ; item= it++)
660  {
661  Item *real_item= item->real_item();
662  if (real_item->type() == Item::FIELD_ITEM)
663  {
664  Field *field= ((Item_field *)real_item)->field;
665  if (field->reset())
666  {
667  my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
668  session->row_count);
669  return 1;
670  }
671  if (not field->maybe_null() and field->is_timestamp())
672  ((field::Epoch::pointer) field)->set_time();
673  /*
674  QQ: We probably should not throw warning for each field.
675  But how about intention to always have the same number
676  of warnings in Session::cuted_fields (and get rid of cuted_fields
677  in the end ?)
678  */
679  session->cuted_fields++;
680  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
681  ER_WARN_TOO_FEW_RECORDS,
682  ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
683  }
684  else if (item->type() == Item::STRING_ITEM)
685  {
686  ((Item_user_var_as_out_param *)item)->set_null_value(
687  read_info.read_charset);
688  }
689  else
690  {
691  my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
692  return 1;
693  }
694  }
695  }
696 
697  if (session->getKilled() ||
698  fill_record(session, set_fields, set_values,
699  ignore_check_option_errors))
700  return 1;
701 
702  err= write_record(session, table, &info);
703  table->auto_increment_field_not_null= false;
704  if (err)
705  return 1;
706  /*
707  We don't need to reset auto-increment field since we are restoring
708  its default value at the beginning of each loop iteration.
709  */
710  if (read_info.next_line()) // Skip to next line
711  break;
712  if (read_info.line_cuted)
713  {
714  session->cuted_fields++; /* To long row */
715  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
716  ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
717  session->row_count);
718  if (session->getKilled())
719  return 1;
720  }
721  session->row_count++;
722  }
723  return(test(read_info.error));
724 }
725 
726 
727 /* Unescape all escape characters, mark \N as null */
728 
729 char
730 READ_INFO::unescape(char chr)
731 {
732  /* keep this switch synchornous with the ESCAPE_CHARS macro */
733  switch(chr) {
734  case 'n': return '\n';
735  case 't': return '\t';
736  case 'r': return '\r';
737  case 'b': return '\b';
738  case '0': return 0; // Ascii null
739  case 'Z': return '\032'; // Win32 end of cursor
740  case 'N': found_null=1;
741 
742  /* fall through */
743  default: return chr;
744  }
745 }
746 
747 
748 /*
749  Read a line using buffering
750  If last line is empty (in line mode) then it isn't outputed
751 */
752 
753 
754 READ_INFO::READ_INFO(int file_par, size_t tot_length,
755  const charset_info_st * const cs,
756  String &field_term, String &line_start, String &line_term,
757  String &enclosed_par, int escape, bool is_fifo)
758  :cursor(file_par),escape_char(escape)
759 {
760  read_charset= cs;
761  field_term_ptr=(char*) field_term.ptr();
762  field_term_length= field_term.length();
763  line_term_ptr=(char*) line_term.ptr();
764  line_term_length= line_term.length();
765  if (line_start.length() == 0)
766  {
767  line_start_ptr=0;
768  start_of_line= 0;
769  }
770  else
771  {
772  line_start_ptr=(char*) line_start.ptr();
773  line_start_end=line_start_ptr+line_start.length();
774  start_of_line= 1;
775  }
776  /* If field_terminator == line_terminator, don't use line_terminator */
777  if (field_term_length == line_term_length &&
778  !memcmp(field_term_ptr,line_term_ptr,field_term_length))
779  {
780  line_term_length=0;
781  line_term_ptr=(char*) "";
782  }
783  enclosed_char= (enclosed_length=enclosed_par.length()) ?
784  (unsigned char) enclosed_par[0] : INT_MAX;
785  field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
786  line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
787  error=eof=found_end_of_line=found_null=line_cuted=0;
788  buff_length=tot_length;
789 
790 
791  /* Set of a stack for unget if long terminators */
792  size_t length= max(field_term_length,line_term_length)+1;
793  set_if_bigger(length, line_start.length());
794  stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
795 
796  if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
797  error=1;
798  else
799  {
800  end_of_buff=buffer+buff_length;
801  if (cache.init_io_cache((false) ? -1 : cursor, 0,
802  (false) ? internal::READ_NET :
803  (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
804  MYF(MY_WME)))
805  {
806  free((unsigned char*) buffer);
807  error=1;
808  }
809  else
810  {
811  /*
812  init_io_cache() will not initialize read_function member
813  if the cache is READ_NET. So we work around the problem with a
814  manual assignment
815  */
816  need_end_io_cache = 1;
817  }
818  }
819 }
820 
821 
822 READ_INFO::~READ_INFO()
823 {
824  if (!error)
825  {
826  if (need_end_io_cache)
827  cache.end_io_cache();
828  free(buffer);
829  error=1;
830  }
831 }
832 
833 
/*
  Character source for the READ_INFO parsers.  Characters pushed back
  with PUSH() are returned first (last pushed, first returned); after
  that, characters come from the io cache.  PUSH() does no bounds
  checking, so 'stack' must be large enough for the longest push-back
  sequence.
*/
#define GET (stack_pos != stack ? *--stack_pos : cache.get())
#define PUSH(A) (*(stack_pos++)=(A))
836 
837 
838 inline int READ_INFO::terminator(char *ptr,uint32_t length)
839 {
840  int chr=0; // Keep gcc happy
841  uint32_t i;
842  for (i=1 ; i < length ; i++)
843  {
844  if ((chr=GET) != *++ptr)
845  {
846  break;
847  }
848  }
849  if (i == length)
850  return 1;
851  PUSH(chr);
852  while (i-- > 1)
853  PUSH((unsigned char) *--ptr);
854  return 0;
855 }
856 
857 
858 int READ_INFO::read_field()
859 {
860  int chr,found_enclosed_char;
861  unsigned char *to,*new_buffer;
862 
863  found_null=0;
864  if (found_end_of_line)
865  return 1; // One have to call next_line
866 
867  /* Skip until we find 'line_start' */
868 
869  if (start_of_line)
870  { // Skip until line_start
871  start_of_line=0;
872  if (find_start_of_fields())
873  return 1;
874  }
875  if ((chr=GET) == my_b_EOF)
876  {
877  found_end_of_line=eof=1;
878  return 1;
879  }
880  to=buffer;
881  if (chr == enclosed_char)
882  {
883  found_enclosed_char=enclosed_char;
884  *to++=(unsigned char) chr; // If error
885  }
886  else
887  {
888  found_enclosed_char= INT_MAX;
889  PUSH(chr);
890  }
891 
892  for (;;)
893  {
894  while ( to < end_of_buff)
895  {
896  chr = GET;
897  if ((my_mbcharlen(read_charset, chr) > 1) &&
898  to+my_mbcharlen(read_charset, chr) <= end_of_buff)
899  {
900  unsigned char* p = (unsigned char*)to;
901  *to++ = chr;
902  int ml = my_mbcharlen(read_charset, chr);
903  int i;
904  for (i=1; i<ml; i++) {
905  chr = GET;
906  if (chr == my_b_EOF)
907  goto found_eof;
908  *to++ = chr;
909  }
910  if (my_ismbchar(read_charset,
911  (const char *)p,
912  (const char *)to))
913  continue;
914  for (i=0; i<ml; i++)
915  PUSH((unsigned char) *--to);
916  chr = GET;
917  }
918  if (chr == my_b_EOF)
919  goto found_eof;
920  if (chr == escape_char)
921  {
922  if ((chr=GET) == my_b_EOF)
923  {
924  *to++= (unsigned char) escape_char;
925  goto found_eof;
926  }
927  /*
928  When escape_char == enclosed_char, we treat it like we do for
929  handling quotes in SQL parsing -- you can double-up the
930  escape_char to include it literally, but it doesn't do escapes
931  like \n. This allows: LOAD DATA ... ENCLOSED BY '"' ESCAPED BY '"'
932  with data like: "fie""ld1", "field2"
933  */
934  if (escape_char != enclosed_char || chr == escape_char)
935  {
936  *to++ = (unsigned char) unescape((char) chr);
937  continue;
938  }
939  PUSH(chr);
940  chr= escape_char;
941  }
942 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
943  if (chr == line_term_char)
944 #else
945  if (chr == line_term_char && found_enclosed_char == INT_MAX)
946 #endif
947  {
948  if (terminator(line_term_ptr,line_term_length))
949  { // Maybe unexpected linefeed
950  enclosed=0;
951  found_end_of_line=1;
952  row_start=buffer;
953  row_end= to;
954  return 0;
955  }
956  }
957  if (chr == found_enclosed_char)
958  {
959  if ((chr=GET) == found_enclosed_char)
960  { // Remove dupplicated
961  *to++ = (unsigned char) chr;
962  continue;
963  }
964  // End of enclosed field if followed by field_term or line_term
965  if (chr == my_b_EOF ||
966  (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
967  { // Maybe unexpected linefeed
968  enclosed=1;
969  found_end_of_line=1;
970  row_start=buffer+1;
971  row_end= to;
972  return 0;
973  }
974  if (chr == field_term_char &&
975  terminator(field_term_ptr,field_term_length))
976  {
977  enclosed=1;
978  row_start=buffer+1;
979  row_end= to;
980  return 0;
981  }
982  /*
983  The string didn't terminate yet.
984  Store back next character for the loop
985  */
986  PUSH(chr);
987  /* copy the found term character to 'to' */
988  chr= found_enclosed_char;
989  }
990  else if (chr == field_term_char && found_enclosed_char == INT_MAX)
991  {
992  if (terminator(field_term_ptr,field_term_length))
993  {
994  enclosed=0;
995  row_start=buffer;
996  row_end= to;
997  return 0;
998  }
999  }
1000  *to++ = (unsigned char) chr;
1001  }
1002  /*
1003  ** We come here if buffer is too small. Enlarge it and continue
1004  */
1005  new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE);
1006  to=new_buffer + (to-buffer);
1007  buffer=new_buffer;
1008  buff_length+=IO_SIZE;
1009  end_of_buff=buffer+buff_length;
1010  }
1011 
1012 found_eof:
1013  enclosed=0;
1014  found_end_of_line=eof=1;
1015  row_start=buffer;
1016  row_end=to;
1017  return 0;
1018 }
1019 
1020 /*
1021  Read a row with fixed length.
1022 
1023  NOTES
1024  The row may not be fixed size on disk if there are escape
1025  characters in the cursor.
1026 
1027  IMPLEMENTATION NOTE
1028  One can't use fixed length with multi-byte charset **
1029 
1030  RETURN
1031  0 ok
1032  1 error
1033 */
1034 
1035 int READ_INFO::read_fixed_length()
1036 {
1037  int chr;
1038  unsigned char *to;
1039  if (found_end_of_line)
1040  return 1; // One have to call next_line
1041 
1042  if (start_of_line)
1043  { // Skip until line_start
1044  start_of_line=0;
1045  if (find_start_of_fields())
1046  return 1;
1047  }
1048 
1049  to=row_start=buffer;
1050  while (to < end_of_buff)
1051  {
1052  if ((chr=GET) == my_b_EOF)
1053  goto found_eof;
1054  if (chr == escape_char)
1055  {
1056  if ((chr=GET) == my_b_EOF)
1057  {
1058  *to++= (unsigned char) escape_char;
1059  goto found_eof;
1060  }
1061  *to++ =(unsigned char) unescape((char) chr);
1062  continue;
1063  }
1064  if (chr == line_term_char)
1065  {
1066  if (terminator(line_term_ptr,line_term_length))
1067  { // Maybe unexpected linefeed
1068  found_end_of_line=1;
1069  row_end= to;
1070  return 0;
1071  }
1072  }
1073  *to++ = (unsigned char) chr;
1074  }
1075  row_end=to; // Found full line
1076  return 0;
1077 
1078 found_eof:
1079  found_end_of_line=eof=1;
1080  row_start=buffer;
1081  row_end=to;
1082  return to == buffer ? 1 : 0;
1083 }
1084 
1085 
1086 int READ_INFO::next_line()
1087 {
1088  line_cuted=0;
1089  start_of_line= line_start_ptr != 0;
1090  if (found_end_of_line || eof)
1091  {
1092  found_end_of_line=0;
1093  return eof;
1094  }
1095  found_end_of_line=0;
1096  if (!line_term_length)
1097  return 0; // No lines
1098  for (;;)
1099  {
1100  int chr = GET;
1101  if (my_mbcharlen(read_charset, chr) > 1)
1102  {
1103  for (uint32_t i=1;
1104  chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
1105  i++)
1106  chr = GET;
1107  if (chr == escape_char)
1108  continue;
1109  }
1110  if (chr == my_b_EOF)
1111  {
1112  eof=1;
1113  return 1;
1114  }
1115  if (chr == escape_char)
1116  {
1117  line_cuted=1;
1118  if (GET == my_b_EOF)
1119  return 1;
1120  continue;
1121  }
1122  if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
1123  return 0;
1124  line_cuted=1;
1125  }
1126 }
1127 
1128 
1129 bool READ_INFO::find_start_of_fields()
1130 {
1131  int chr;
1132  try_again:
1133  do
1134  {
1135  if ((chr=GET) == my_b_EOF)
1136  {
1137  found_end_of_line=eof=1;
1138  return 1;
1139  }
1140  } while ((char) chr != line_start_ptr[0]);
1141  for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
1142  {
1143  chr=GET; // Eof will be checked later
1144  if ((char) chr != *ptr)
1145  { // Can't be line_start
1146  PUSH(chr);
1147  while (--ptr != line_start_ptr)
1148  { // Restart with next char
1149  PUSH((unsigned char) *ptr);
1150  }
1151  goto try_again;
1152  }
1153  }
1154  return 0;
1155 }
1156 
1157 
1158 } /* namespace drizzled */