00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include <config.h>
00020
00021 #include <drizzled/sql_load.h>
00022 #include <drizzled/error.h>
00023 #include <drizzled/data_home.h>
00024 #include <drizzled/session.h>
00025 #include <drizzled/sql_base.h>
00026 #include <drizzled/field/epoch.h>
00027 #include <drizzled/internal/my_sys.h>
00028 #include <drizzled/internal/iocache.h>
00029 #include <drizzled/plugin/storage_engine.h>
00030 #include <drizzled/sql_lex.h>
00031
#include <sys/stat.h>
#include <fcntl.h>
#include <algorithm>
#include <climits>
#include <cstdlib>
#include <string>
#include <boost/filesystem.hpp>
00037
00038 namespace fs=boost::filesystem;
00039 using namespace std;
00040 namespace drizzled
00041 {
00042
00043 class READ_INFO {
00044 int cursor;
00045 unsigned char *buffer;
00046 unsigned char *end_of_buff;
00047 size_t buff_length;
00048 size_t max_length;
00049 char *field_term_ptr,*line_term_ptr,*line_start_ptr,*line_start_end;
00050 uint field_term_length,line_term_length,enclosed_length;
00051 int field_term_char,line_term_char,enclosed_char,escape_char;
00052 int *stack,*stack_pos;
00053 bool found_end_of_line,start_of_line,eof;
00054 bool need_end_io_cache;
00055 internal::IO_CACHE cache;
00056
00057 public:
00058 bool error,line_cuted,found_null,enclosed;
00059 unsigned char *row_start,
00060 *row_end;
00061 const CHARSET_INFO *read_charset;
00062
00063 READ_INFO(int cursor, size_t tot_length, const CHARSET_INFO * const cs,
00064 String &field_term,String &line_start,String &line_term,
00065 String &enclosed,int escape, bool is_fifo);
00066 ~READ_INFO();
00067 int read_field();
00068 int read_fixed_length(void);
00069 int next_line(void);
00070 char unescape(char chr);
00071 int terminator(char *ptr,uint32_t length);
00072 bool find_start_of_fields();
00073
00074
00075
00076
00077
00078 void end_io_cache()
00079 {
00080 cache.end_io_cache();
00081 need_end_io_cache = 0;
00082 }
00083
00084
00085
00086
00087
00088
00089 void set_io_cache_arg(void* arg) { cache.arg = arg; }
00090 };
00091
00092 static int read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
00093 List<Item> &fields_vars, List<Item> &set_fields,
00094 List<Item> &set_values, READ_INFO &read_info,
00095 uint32_t skip_lines,
00096 bool ignore_check_option_errors);
00097 static int read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
00098 List<Item> &fields_vars, List<Item> &set_fields,
00099 List<Item> &set_values, READ_INFO &read_info,
00100 String &enclosed, uint32_t skip_lines,
00101 bool ignore_check_option_errors);
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124 int load(Session *session,file_exchange *ex,TableList *table_list,
00125 List<Item> &fields_vars, List<Item> &set_fields,
00126 List<Item> &set_values,
00127 enum enum_duplicates handle_duplicates, bool ignore)
00128 {
00129 int file;
00130 Table *table= NULL;
00131 int error;
00132 String *field_term=ex->field_term,*escaped=ex->escaped;
00133 String *enclosed=ex->enclosed;
00134 bool is_fifo=0;
00135
00136 assert(table_list->getSchemaName());
00137
00138
00139
00140
00141
00142
00143 util::string::const_shared_ptr schema(session->schema());
00144 const char *tdb= (schema and not schema->empty()) ? schema->c_str() : table_list->getSchemaName();
00145 assert(tdb);
00146 uint32_t skip_lines= ex->skip_lines;
00147 bool transactional_table;
00148 Session::killed_state_t killed_status= Session::NOT_KILLED;
00149
00150
00151 if (escaped->length() > 4 || enclosed->length() > 4)
00152 {
00153 my_error(ER_WRONG_FIELD_TERMINATORS,MYF(0),enclosed->c_ptr(), enclosed->length());
00154 return(true);
00155 }
00156
00157 if (session->openTablesLock(table_list))
00158 return(true);
00159
00160 if (setup_tables_and_check_access(session, &session->lex().select_lex.context,
00161 &session->lex().select_lex.top_join_list,
00162 table_list,
00163 &session->lex().select_lex.leaf_tables, true))
00164 return(-1);
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174 if (unique_table(table_list, table_list->next_global))
00175 {
00176 my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->getTableName());
00177 return(true);
00178 }
00179
00180 table= table_list->table;
00181 transactional_table= table->cursor->has_transactions();
00182
00183 if (!fields_vars.size())
00184 {
00185 Field **field;
00186 for (field= table->getFields(); *field ; field++)
00187 fields_vars.push_back(new Item_field(*field));
00188 table->setWriteSet();
00189 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00190
00191
00192
00193
00194 if (setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
00195 setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
00196 return(true);
00197 }
00198 else
00199 {
00200
00201 if (setup_fields(session, 0, fields_vars, MARK_COLUMNS_WRITE, 0, 0) ||
00202 setup_fields(session, 0, set_fields, MARK_COLUMNS_WRITE, 0, 0) ||
00203 check_that_all_fields_are_given_values(session, table, table_list))
00204 return(true);
00205
00206
00207
00208
00209 if (table->timestamp_field)
00210 {
00211 if (table->isWriteSet(table->timestamp_field->position()))
00212 {
00213 table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
00214 }
00215 else
00216 {
00217 table->setWriteSet(table->timestamp_field->position());
00218 }
00219 }
00220
00221 if (setup_fields(session, 0, set_values, MARK_COLUMNS_READ, 0, 0))
00222 return(true);
00223 }
00224
00225 table->mark_columns_needed_for_insert();
00226
00227 size_t tot_length=0;
00228 bool use_blobs= 0, use_vars= 0;
00229 List<Item>::iterator it(fields_vars.begin());
00230 Item *item;
00231
00232 while ((item= it++))
00233 {
00234 Item *real_item= item->real_item();
00235
00236 if (real_item->type() == Item::FIELD_ITEM)
00237 {
00238 Field *field= ((Item_field*)real_item)->field;
00239 if (field->flags & BLOB_FLAG)
00240 {
00241 use_blobs= 1;
00242 tot_length+= 256;
00243 }
00244 else
00245 tot_length+= field->field_length;
00246 }
00247 else if (item->type() == Item::STRING_ITEM)
00248 use_vars= 1;
00249 }
00250 if (use_blobs && !ex->line_term->length() && !field_term->length())
00251 {
00252 my_message(ER_BLOBS_AND_NO_TERMINATED,ER(ER_BLOBS_AND_NO_TERMINATED),
00253 MYF(0));
00254 return(true);
00255 }
00256 if (use_vars && !field_term->length() && !enclosed->length())
00257 {
00258 my_error(ER_LOAD_FROM_FIXED_SIZE_ROWS_TO_VAR, MYF(0));
00259 return(true);
00260 }
00261
00262 fs::path to_file(ex->file_name);
00263 fs::path target_path(fs::system_complete(getDataHomeCatalog()));
00264 if (not to_file.has_root_directory())
00265 {
00266 int count_elements= 0;
00267 for (fs::path::iterator iter= to_file.begin();
00268 iter != to_file.end();
00269 ++iter, ++count_elements)
00270 { }
00271
00272 if (count_elements == 1)
00273 {
00274 target_path /= tdb;
00275 }
00276 target_path /= to_file;
00277 }
00278 else
00279 {
00280 target_path= to_file;
00281 }
00282
00283 if (not secure_file_priv.string().empty())
00284 {
00285 if (target_path.file_string().substr(0, secure_file_priv.file_string().size()) != secure_file_priv.file_string())
00286 {
00287
00288 my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--secure-file-priv");
00289 return(true);
00290 }
00291 }
00292
00293 struct stat stat_info;
00294 if (stat(target_path.file_string().c_str(), &stat_info))
00295 {
00296 my_error(ER_FILE_NOT_FOUND, MYF(0), target_path.file_string().c_str(), errno);
00297 return(true);
00298 }
00299
00300
00301 if (!((stat_info.st_mode & S_IROTH) == S_IROTH &&
00302 (stat_info.st_mode & S_IFLNK) != S_IFLNK &&
00303 ((stat_info.st_mode & S_IFREG) == S_IFREG ||
00304 (stat_info.st_mode & S_IFIFO) == S_IFIFO)))
00305 {
00306 my_error(ER_TEXTFILE_NOT_READABLE, MYF(0), target_path.file_string().c_str());
00307 return(true);
00308 }
00309 if ((stat_info.st_mode & S_IFIFO) == S_IFIFO)
00310 is_fifo = 1;
00311
00312
00313 if ((file=internal::my_open(target_path.file_string().c_str(), O_RDONLY,MYF(MY_WME))) < 0)
00314 {
00315 my_error(ER_CANT_OPEN_FILE, MYF(0), target_path.file_string().c_str(), errno);
00316 return(true);
00317 }
00318 CopyInfo info;
00319 memset(&info, 0, sizeof(info));
00320 info.ignore= ignore;
00321 info.handle_duplicates=handle_duplicates;
00322 info.escape_char=escaped->length() ? (*escaped)[0] : INT_MAX;
00323
00324 identifier::Schema identifier(*schema);
00325 READ_INFO read_info(file, tot_length,
00326 ex->cs ? ex->cs : plugin::StorageEngine::getSchemaCollation(identifier),
00327 *field_term, *ex->line_start, *ex->line_term, *enclosed,
00328 info.escape_char, is_fifo);
00329 if (read_info.error)
00330 {
00331 if (file >= 0)
00332 internal::my_close(file,MYF(0));
00333 return(true);
00334 }
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344 session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
00345 session->cuted_fields=0L;
00346
00347 if (ex->line_term->length())
00348 {
00349
00350 while (skip_lines > 0)
00351 {
00352 skip_lines--;
00353 if (read_info.next_line())
00354 break;
00355 }
00356 }
00357
00358 if (!(error=test(read_info.error)))
00359 {
00360
00361 table->next_number_field=table->found_next_number_field;
00362 if (ignore ||
00363 handle_duplicates == DUP_REPLACE)
00364 table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
00365 if (handle_duplicates == DUP_REPLACE)
00366 table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
00367 table->cursor->ha_start_bulk_insert((ha_rows) 0);
00368 table->copy_blobs=1;
00369
00370 session->setAbortOnWarning(true);
00371
00372 if (!field_term->length() && !enclosed->length())
00373 error= read_fixed_length(session, info, table_list, fields_vars,
00374 set_fields, set_values, read_info,
00375 skip_lines, ignore);
00376 else
00377 error= read_sep_field(session, info, table_list, fields_vars,
00378 set_fields, set_values, read_info,
00379 *enclosed, skip_lines, ignore);
00380 if (table->cursor->ha_end_bulk_insert() && !error)
00381 {
00382 table->print_error(errno, MYF(0));
00383 error= 1;
00384 }
00385 table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
00386 table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
00387 table->next_number_field=0;
00388 }
00389 if (file >= 0)
00390 internal::my_close(file,MYF(0));
00391 free_blobs(table);
00392 table->copy_blobs=0;
00393 session->count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
00394
00395
00396
00397
00398 killed_status= (error == 0)? Session::NOT_KILLED : session->getKilled();
00399 if (error)
00400 {
00401 error= -1;
00402 goto err;
00403 }
00404
00405 char msg[FN_REFLEN];
00406 snprintf(msg, sizeof(msg), ER(ER_LOAD_INFO), info.records, info.deleted,
00407 (info.records - info.copied), session->cuted_fields);
00408
00409 if (session->transaction.stmt.hasModifiedNonTransData())
00410 session->transaction.all.markModifiedNonTransData();
00411
00412
00413 session->my_ok(info.copied + info.deleted, 0, 0L, msg);
00414 err:
00415 assert(transactional_table || !(info.copied || info.deleted) ||
00416 session->transaction.stmt.hasModifiedNonTransData());
00417 table->cursor->ha_release_auto_increment();
00418 table->auto_increment_field_not_null= false;
00419 session->setAbortOnWarning(false);
00420
00421 return(error);
00422 }
00423
00424
00425
00426
00427
00428
00429 static int
00430 read_fixed_length(Session *session, CopyInfo &info, TableList *table_list,
00431 List<Item> &fields_vars, List<Item> &set_fields,
00432 List<Item> &set_values, READ_INFO &read_info,
00433 uint32_t skip_lines, bool ignore_check_option_errors)
00434 {
00435 List<Item>::iterator it(fields_vars.begin());
00436 Item_field *sql_field;
00437 Table *table= table_list->table;
00438 uint64_t id;
00439 bool err;
00440
00441 id= 0;
00442
00443 while (!read_info.read_fixed_length())
00444 {
00445 if (session->getKilled())
00446 {
00447 session->send_kill_message();
00448 return(1);
00449 }
00450 if (skip_lines)
00451 {
00452
00453
00454
00455
00456
00457
00458 skip_lines--;
00459 continue;
00460 }
00461 it= fields_vars.begin();
00462 unsigned char *pos=read_info.row_start;
00463 #ifdef HAVE_VALGRIND
00464 read_info.row_end[0]=0;
00465 #endif
00466
00467 table->restoreRecordAsDefault();
00468
00469
00470
00471
00472 while ((sql_field= (Item_field*) it++))
00473 {
00474 Field *field= sql_field->field;
00475 if (field == table->next_number_field)
00476 table->auto_increment_field_not_null= true;
00477
00478
00479
00480
00481
00482 field->set_notnull();
00483
00484 if (pos == read_info.row_end)
00485 {
00486 session->cuted_fields++;
00487 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00488 ER_WARN_TOO_FEW_RECORDS,
00489 ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
00490
00491 if (not field->maybe_null() and field->is_timestamp())
00492 ((field::Epoch::pointer) field)->set_time();
00493 }
00494 else
00495 {
00496 uint32_t length;
00497 unsigned char save_chr;
00498 if ((length=(uint32_t) (read_info.row_end-pos)) >
00499 field->field_length)
00500 {
00501 length=field->field_length;
00502 }
00503 save_chr=pos[length];
00504 pos[length]='\0';
00505 field->store((char*) pos,length,read_info.read_charset);
00506 pos[length]=save_chr;
00507 if ((pos+=length) > read_info.row_end)
00508 pos= read_info.row_end;
00509 }
00510 }
00511 if (pos != read_info.row_end)
00512 {
00513 session->cuted_fields++;
00514 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00515 ER_WARN_TOO_MANY_RECORDS,
00516 ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
00517 }
00518
00519 if (session->getKilled() ||
00520 fill_record(session, set_fields, set_values,
00521 ignore_check_option_errors))
00522 return(1);
00523
00524 err= write_record(session, table, &info);
00525 table->auto_increment_field_not_null= false;
00526 if (err)
00527 return(1);
00528
00529
00530
00531
00532
00533 if (read_info.next_line())
00534 break;
00535 if (read_info.line_cuted)
00536 {
00537 session->cuted_fields++;
00538 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00539 ER_WARN_TOO_MANY_RECORDS,
00540 ER(ER_WARN_TOO_MANY_RECORDS), session->row_count);
00541 }
00542 session->row_count++;
00543 }
00544 return(test(read_info.error));
00545 }
00546
00547
00548
00549 static int
00550 read_sep_field(Session *session, CopyInfo &info, TableList *table_list,
00551 List<Item> &fields_vars, List<Item> &set_fields,
00552 List<Item> &set_values, READ_INFO &read_info,
00553 String &enclosed, uint32_t skip_lines,
00554 bool ignore_check_option_errors)
00555 {
00556 List<Item>::iterator it(fields_vars.begin());
00557 Item *item;
00558 Table *table= table_list->table;
00559 uint32_t enclosed_length;
00560 uint64_t id;
00561 bool err;
00562
00563 enclosed_length=enclosed.length();
00564 id= 0;
00565
00566 for (;;it= fields_vars.begin())
00567 {
00568 if (session->getKilled())
00569 {
00570 session->send_kill_message();
00571 return(1);
00572 }
00573
00574 table->restoreRecordAsDefault();
00575
00576 while ((item= it++))
00577 {
00578 uint32_t length;
00579 unsigned char *pos;
00580 Item *real_item;
00581
00582 if (read_info.read_field())
00583 break;
00584
00585
00586 if (skip_lines)
00587 continue;
00588
00589 pos=read_info.row_start;
00590 length=(uint32_t) (read_info.row_end-pos);
00591
00592 real_item= item->real_item();
00593
00594 if ((!read_info.enclosed && (enclosed_length && length == 4 && !memcmp(pos, STRING_WITH_LEN("NULL")))) ||
00595 (length == 1 && read_info.found_null))
00596 {
00597
00598 if (real_item->type() == Item::FIELD_ITEM)
00599 {
00600 Field *field= ((Item_field *)real_item)->field;
00601 if (field->reset())
00602 {
00603 my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0), field->field_name,
00604 session->row_count);
00605 return(1);
00606 }
00607 field->set_null();
00608 if (not field->maybe_null())
00609 {
00610 if (field->is_timestamp())
00611 {
00612 ((field::Epoch::pointer) field)->set_time();
00613 }
00614 else if (field != table->next_number_field)
00615 {
00616 field->set_warning(DRIZZLE_ERROR::WARN_LEVEL_WARN, ER_WARN_NULL_TO_NOTNULL, 1);
00617 }
00618 }
00619 }
00620 else if (item->type() == Item::STRING_ITEM)
00621 {
00622 ((Item_user_var_as_out_param *)item)->set_null_value(
00623 read_info.read_charset);
00624 }
00625 else
00626 {
00627 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00628 return(1);
00629 }
00630
00631 continue;
00632 }
00633
00634 if (real_item->type() == Item::FIELD_ITEM)
00635 {
00636 Field *field= ((Item_field *)real_item)->field;
00637 field->set_notnull();
00638 read_info.row_end[0]=0;
00639 if (field == table->next_number_field)
00640 table->auto_increment_field_not_null= true;
00641 field->store((char*) pos, length, read_info.read_charset);
00642 }
00643 else if (item->type() == Item::STRING_ITEM)
00644 {
00645 ((Item_user_var_as_out_param *)item)->set_value((char*) pos, length,
00646 read_info.read_charset);
00647 }
00648 else
00649 {
00650 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00651 return(1);
00652 }
00653 }
00654 if (read_info.error)
00655 break;
00656 if (skip_lines)
00657 {
00658 skip_lines--;
00659 continue;
00660 }
00661 if (item)
00662 {
00663
00664 if (item == &fields_vars.front())
00665 break;
00666 for (; item ; item= it++)
00667 {
00668 Item *real_item= item->real_item();
00669 if (real_item->type() == Item::FIELD_ITEM)
00670 {
00671 Field *field= ((Item_field *)real_item)->field;
00672 if (field->reset())
00673 {
00674 my_error(ER_WARN_NULL_TO_NOTNULL, MYF(0),field->field_name,
00675 session->row_count);
00676 return(1);
00677 }
00678 if (not field->maybe_null() and field->is_timestamp())
00679 ((field::Epoch::pointer) field)->set_time();
00680
00681
00682
00683
00684
00685
00686 session->cuted_fields++;
00687 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00688 ER_WARN_TOO_FEW_RECORDS,
00689 ER(ER_WARN_TOO_FEW_RECORDS), session->row_count);
00690 }
00691 else if (item->type() == Item::STRING_ITEM)
00692 {
00693 ((Item_user_var_as_out_param *)item)->set_null_value(
00694 read_info.read_charset);
00695 }
00696 else
00697 {
00698 my_error(ER_LOAD_DATA_INVALID_COLUMN, MYF(0), item->full_name());
00699 return(1);
00700 }
00701 }
00702 }
00703
00704 if (session->getKilled() ||
00705 fill_record(session, set_fields, set_values,
00706 ignore_check_option_errors))
00707 return(1);
00708
00709 err= write_record(session, table, &info);
00710 table->auto_increment_field_not_null= false;
00711 if (err)
00712 return(1);
00713
00714
00715
00716
00717 if (read_info.next_line())
00718 break;
00719 if (read_info.line_cuted)
00720 {
00721 session->cuted_fields++;
00722 push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_WARN,
00723 ER_WARN_TOO_MANY_RECORDS, ER(ER_WARN_TOO_MANY_RECORDS),
00724 session->row_count);
00725 if (session->getKilled())
00726 return(1);
00727 }
00728 session->row_count++;
00729 }
00730 return(test(read_info.error));
00731 }
00732
00733
00734
00735
00736 char
00737 READ_INFO::unescape(char chr)
00738 {
00739
00740 switch(chr) {
00741 case 'n': return '\n';
00742 case 't': return '\t';
00743 case 'r': return '\r';
00744 case 'b': return '\b';
00745 case '0': return 0;
00746 case 'Z': return '\032';
00747 case 'N': found_null=1;
00748
00749
00750 default: return chr;
00751 }
00752 }
00753
00754
00755
00756
00757
00758
00759
00760
00761 READ_INFO::READ_INFO(int file_par, size_t tot_length,
00762 const CHARSET_INFO * const cs,
00763 String &field_term, String &line_start, String &line_term,
00764 String &enclosed_par, int escape, bool is_fifo)
00765 :cursor(file_par),escape_char(escape)
00766 {
00767 read_charset= cs;
00768 field_term_ptr=(char*) field_term.ptr();
00769 field_term_length= field_term.length();
00770 line_term_ptr=(char*) line_term.ptr();
00771 line_term_length= line_term.length();
00772 if (line_start.length() == 0)
00773 {
00774 line_start_ptr=0;
00775 start_of_line= 0;
00776 }
00777 else
00778 {
00779 line_start_ptr=(char*) line_start.ptr();
00780 line_start_end=line_start_ptr+line_start.length();
00781 start_of_line= 1;
00782 }
00783
00784 if (field_term_length == line_term_length &&
00785 !memcmp(field_term_ptr,line_term_ptr,field_term_length))
00786 {
00787 line_term_length=0;
00788 line_term_ptr=(char*) "";
00789 }
00790 enclosed_char= (enclosed_length=enclosed_par.length()) ?
00791 (unsigned char) enclosed_par[0] : INT_MAX;
00792 field_term_char= field_term_length ? (unsigned char) field_term_ptr[0] : INT_MAX;
00793 line_term_char= line_term_length ? (unsigned char) line_term_ptr[0] : INT_MAX;
00794 error=eof=found_end_of_line=found_null=line_cuted=0;
00795 buff_length=tot_length;
00796
00797
00798
00799 size_t length= max(field_term_length,line_term_length)+1;
00800 set_if_bigger(length, line_start.length());
00801 stack= stack_pos= (int*) memory::sql_alloc(sizeof(int)*length);
00802
00803 if (!(buffer=(unsigned char*) calloc(1, buff_length+1)))
00804 error=1;
00805 else
00806 {
00807 end_of_buff=buffer+buff_length;
00808 if (cache.init_io_cache((false) ? -1 : cursor, 0,
00809 (false) ? internal::READ_NET :
00810 (is_fifo ? internal::READ_FIFO : internal::READ_CACHE),0L,1,
00811 MYF(MY_WME)))
00812 {
00813 free((unsigned char*) buffer);
00814 error=1;
00815 }
00816 else
00817 {
00818
00819
00820
00821
00822
00823 need_end_io_cache = 1;
00824 }
00825 }
00826 }
00827
00828
00829 READ_INFO::~READ_INFO()
00830 {
00831 if (!error)
00832 {
00833 if (need_end_io_cache)
00834 cache.end_io_cache();
00835 free(buffer);
00836 error=1;
00837 }
00838 }
00839
00840
/*
  Byte input for the READ_INFO parsers; only usable inside READ_INFO
  members, where 'stack', 'stack_pos' and 'cache' are in scope.
    GET     - the most recently pushed-back value if any, otherwise the
              next byte from the IO cache.
    PUSH(A) - un-read A so the next GET returns it (LIFO).
*/
#define GET ((stack_pos != stack) ? *--stack_pos : my_b_get(&cache))
#define PUSH(A) (*stack_pos++ = (A))
00843
00844
00845 inline int READ_INFO::terminator(char *ptr,uint32_t length)
00846 {
00847 int chr=0;
00848 uint32_t i;
00849 for (i=1 ; i < length ; i++)
00850 {
00851 if ((chr=GET) != *++ptr)
00852 {
00853 break;
00854 }
00855 }
00856 if (i == length)
00857 return 1;
00858 PUSH(chr);
00859 while (i-- > 1)
00860 PUSH((unsigned char) *--ptr);
00861 return 0;
00862 }
00863
00864
00865 int READ_INFO::read_field()
00866 {
00867 int chr,found_enclosed_char;
00868 unsigned char *to,*new_buffer;
00869
00870 found_null=0;
00871 if (found_end_of_line)
00872 return 1;
00873
00874
00875
00876 if (start_of_line)
00877 {
00878 start_of_line=0;
00879 if (find_start_of_fields())
00880 return 1;
00881 }
00882 if ((chr=GET) == my_b_EOF)
00883 {
00884 found_end_of_line=eof=1;
00885 return 1;
00886 }
00887 to=buffer;
00888 if (chr == enclosed_char)
00889 {
00890 found_enclosed_char=enclosed_char;
00891 *to++=(unsigned char) chr;
00892 }
00893 else
00894 {
00895 found_enclosed_char= INT_MAX;
00896 PUSH(chr);
00897 }
00898
00899 for (;;)
00900 {
00901 while ( to < end_of_buff)
00902 {
00903 chr = GET;
00904 if ((my_mbcharlen(read_charset, chr) > 1) &&
00905 to+my_mbcharlen(read_charset, chr) <= end_of_buff)
00906 {
00907 unsigned char* p = (unsigned char*)to;
00908 *to++ = chr;
00909 int ml = my_mbcharlen(read_charset, chr);
00910 int i;
00911 for (i=1; i<ml; i++) {
00912 chr = GET;
00913 if (chr == my_b_EOF)
00914 goto found_eof;
00915 *to++ = chr;
00916 }
00917 if (my_ismbchar(read_charset,
00918 (const char *)p,
00919 (const char *)to))
00920 continue;
00921 for (i=0; i<ml; i++)
00922 PUSH((unsigned char) *--to);
00923 chr = GET;
00924 }
00925 if (chr == my_b_EOF)
00926 goto found_eof;
00927 if (chr == escape_char)
00928 {
00929 if ((chr=GET) == my_b_EOF)
00930 {
00931 *to++= (unsigned char) escape_char;
00932 goto found_eof;
00933 }
00934
00935
00936
00937
00938
00939
00940
00941 if (escape_char != enclosed_char || chr == escape_char)
00942 {
00943 *to++ = (unsigned char) unescape((char) chr);
00944 continue;
00945 }
00946 PUSH(chr);
00947 chr= escape_char;
00948 }
00949 #ifdef ALLOW_LINESEPARATOR_IN_STRINGS
00950 if (chr == line_term_char)
00951 #else
00952 if (chr == line_term_char && found_enclosed_char == INT_MAX)
00953 #endif
00954 {
00955 if (terminator(line_term_ptr,line_term_length))
00956 {
00957 enclosed=0;
00958 found_end_of_line=1;
00959 row_start=buffer;
00960 row_end= to;
00961 return 0;
00962 }
00963 }
00964 if (chr == found_enclosed_char)
00965 {
00966 if ((chr=GET) == found_enclosed_char)
00967 {
00968 *to++ = (unsigned char) chr;
00969 continue;
00970 }
00971
00972 if (chr == my_b_EOF ||
00973 (chr == line_term_char && terminator(line_term_ptr, line_term_length)))
00974 {
00975 enclosed=1;
00976 found_end_of_line=1;
00977 row_start=buffer+1;
00978 row_end= to;
00979 return 0;
00980 }
00981 if (chr == field_term_char &&
00982 terminator(field_term_ptr,field_term_length))
00983 {
00984 enclosed=1;
00985 row_start=buffer+1;
00986 row_end= to;
00987 return 0;
00988 }
00989
00990
00991
00992
00993 PUSH(chr);
00994
00995 chr= found_enclosed_char;
00996 }
00997 else if (chr == field_term_char && found_enclosed_char == INT_MAX)
00998 {
00999 if (terminator(field_term_ptr,field_term_length))
01000 {
01001 enclosed=0;
01002 row_start=buffer;
01003 row_end= to;
01004 return 0;
01005 }
01006 }
01007 *to++ = (unsigned char) chr;
01008 }
01009
01010
01011
01012 if (!(new_buffer=(unsigned char*) realloc(buffer, buff_length+1+IO_SIZE)))
01013 return (error=1);
01014 to=new_buffer + (to-buffer);
01015 buffer=new_buffer;
01016 buff_length+=IO_SIZE;
01017 end_of_buff=buffer+buff_length;
01018 }
01019
01020 found_eof:
01021 enclosed=0;
01022 found_end_of_line=eof=1;
01023 row_start=buffer;
01024 row_end=to;
01025 return 0;
01026 }
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043 int READ_INFO::read_fixed_length()
01044 {
01045 int chr;
01046 unsigned char *to;
01047 if (found_end_of_line)
01048 return 1;
01049
01050 if (start_of_line)
01051 {
01052 start_of_line=0;
01053 if (find_start_of_fields())
01054 return 1;
01055 }
01056
01057 to=row_start=buffer;
01058 while (to < end_of_buff)
01059 {
01060 if ((chr=GET) == my_b_EOF)
01061 goto found_eof;
01062 if (chr == escape_char)
01063 {
01064 if ((chr=GET) == my_b_EOF)
01065 {
01066 *to++= (unsigned char) escape_char;
01067 goto found_eof;
01068 }
01069 *to++ =(unsigned char) unescape((char) chr);
01070 continue;
01071 }
01072 if (chr == line_term_char)
01073 {
01074 if (terminator(line_term_ptr,line_term_length))
01075 {
01076 found_end_of_line=1;
01077 row_end= to;
01078 return 0;
01079 }
01080 }
01081 *to++ = (unsigned char) chr;
01082 }
01083 row_end=to;
01084 return 0;
01085
01086 found_eof:
01087 found_end_of_line=eof=1;
01088 row_start=buffer;
01089 row_end=to;
01090 return to == buffer ? 1 : 0;
01091 }
01092
01093
01094 int READ_INFO::next_line()
01095 {
01096 line_cuted=0;
01097 start_of_line= line_start_ptr != 0;
01098 if (found_end_of_line || eof)
01099 {
01100 found_end_of_line=0;
01101 return eof;
01102 }
01103 found_end_of_line=0;
01104 if (!line_term_length)
01105 return 0;
01106 for (;;)
01107 {
01108 int chr = GET;
01109 if (my_mbcharlen(read_charset, chr) > 1)
01110 {
01111 for (uint32_t i=1;
01112 chr != my_b_EOF && i<my_mbcharlen(read_charset, chr);
01113 i++)
01114 chr = GET;
01115 if (chr == escape_char)
01116 continue;
01117 }
01118 if (chr == my_b_EOF)
01119 {
01120 eof=1;
01121 return 1;
01122 }
01123 if (chr == escape_char)
01124 {
01125 line_cuted=1;
01126 if (GET == my_b_EOF)
01127 return 1;
01128 continue;
01129 }
01130 if (chr == line_term_char && terminator(line_term_ptr,line_term_length))
01131 return 0;
01132 line_cuted=1;
01133 }
01134 }
01135
01136
01137 bool READ_INFO::find_start_of_fields()
01138 {
01139 int chr;
01140 try_again:
01141 do
01142 {
01143 if ((chr=GET) == my_b_EOF)
01144 {
01145 found_end_of_line=eof=1;
01146 return 1;
01147 }
01148 } while ((char) chr != line_start_ptr[0]);
01149 for (char *ptr=line_start_ptr+1 ; ptr != line_start_end ; ptr++)
01150 {
01151 chr=GET;
01152 if ((char) chr != *ptr)
01153 {
01154 PUSH(chr);
01155 while (--ptr != line_start_ptr)
01156 {
01157 PUSH((unsigned char) *ptr);
01158 }
01159 goto try_again;
01160 }
01161 }
01162 return 0;
01163 }
01164
01165
01166 }