Drizzled Public API Documentation

base.cc

00001 /* -*- mode: c++; c-basic-offset: 2; i/dent-tabs-mode: nil; -*-
00002  *  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
00003  *
00004  *  Copyright (C) 2010 Brian Aker
00005  *  Copyright (C) 2009 Sun Microsystems, Inc.
00006  *
00007  *  This program is free software; you can redistribute it and/or modify
00008  *  it under the terms of the GNU General Public License as published by
00009  *  the Free Software Foundation; either version 2 of the License, or
00010  *  (at your option) any later version.
00011  *
00012  *  This program is distributed in the hope that it will be useful,
00013  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  *  GNU General Public License for more details.
00016  *
00017  *  You should have received a copy of the GNU General Public License
00018  *  along with this program; if not, write to the Free Software
00019  *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
00020  */
00021 
00022 /*
00023   This class is shared between different table objects. There is one
00024   instance of table share per one table in the database.
00025 */
00026 
00027 /* Basic functions needed by many modules */
00028 #include <config.h>
00029 
00030 #include <pthread.h>
00031 #include <float.h>
00032 
00033 #include <sys/types.h>
00034 #include <sys/stat.h>
00035 #include <fcntl.h>
00036 
00037 
00038 #include <cassert>
00039 
00040 #include <drizzled/error.h>
00041 #include <drizzled/gettext.h>
00042 #include <drizzled/sql_base.h>
00043 #include <drizzled/pthread_globals.h>
00044 #include <drizzled/internal/my_pthread.h>
00045 
00046 #include <drizzled/table.h>
00047 #include <drizzled/table/shell.h>
00048 
00049 #include <drizzled/session.h>
00050 
00051 #include <drizzled/charset.h>
00052 #include <drizzled/internal/m_string.h>
00053 #include <drizzled/internal/my_sys.h>
00054 
00055 #include <drizzled/item/string.h>
00056 #include <drizzled/item/int.h>
00057 #include <drizzled/item/decimal.h>
00058 #include <drizzled/item/float.h>
00059 #include <drizzled/item/null.h>
00060 #include <drizzled/temporal.h>
00061 
00062 #include <drizzled/field.h>
00063 #include <drizzled/field/str.h>
00064 #include <drizzled/field/num.h>
00065 #include <drizzled/field/blob.h>
00066 #include <drizzled/field/boolean.h>
00067 #include <drizzled/field/enum.h>
00068 #include <drizzled/field/null.h>
00069 #include <drizzled/field/date.h>
00070 #include <drizzled/field/decimal.h>
00071 #include <drizzled/field/real.h>
00072 #include <drizzled/field/double.h>
00073 #include <drizzled/field/int32.h>
00074 #include <drizzled/field/int64.h>
00075 #include <drizzled/field/size.h>
00076 #include <drizzled/field/num.h>
00077 #include <drizzled/field/time.h>
00078 #include <drizzled/field/epoch.h>
00079 #include <drizzled/field/datetime.h>
00080 #include <drizzled/field/microtime.h>
00081 #include <drizzled/field/varstring.h>
00082 #include <drizzled/field/uuid.h>
00083 #include <drizzled/plugin/storage_engine.h>
00084 #include <drizzled/definition/cache.h>
00085 #include <drizzled/typelib.h>
00086 #include <drizzled/refresh_version.h>
00087 #include <drizzled/key.h>
00088 
00089 using namespace std;
00090 
00091 namespace drizzled
00092 {
00093 
00094 extern size_t table_def_size;
00095 
00096 
00097 static enum_field_types proto_field_type_to_drizzle_type(const message::Table::Field &field)
00098 {
00099   switch(field.type())
00100   {
00101   case message::Table::Field::INTEGER:
00102     return DRIZZLE_TYPE_LONG;
00103 
00104   case message::Table::Field::DOUBLE:
00105     return DRIZZLE_TYPE_DOUBLE;
00106 
00107   case message::Table::Field::EPOCH:
00108     if (field.has_time_options() and field.time_options().microseconds())
00109       return DRIZZLE_TYPE_MICROTIME;
00110 
00111     return DRIZZLE_TYPE_TIMESTAMP;
00112 
00113   case message::Table::Field::BIGINT:
00114     return DRIZZLE_TYPE_LONGLONG;
00115 
00116   case message::Table::Field::DATETIME:
00117     return DRIZZLE_TYPE_DATETIME;
00118 
00119   case message::Table::Field::DATE:
00120     return DRIZZLE_TYPE_DATE;
00121 
00122   case message::Table::Field::VARCHAR:
00123     return DRIZZLE_TYPE_VARCHAR;
00124 
00125   case message::Table::Field::DECIMAL:
00126     return DRIZZLE_TYPE_DECIMAL;
00127 
00128   case message::Table::Field::ENUM:
00129     return DRIZZLE_TYPE_ENUM;
00130 
00131   case message::Table::Field::BLOB:
00132     return DRIZZLE_TYPE_BLOB;
00133 
00134   case message::Table::Field::UUID:
00135     return  DRIZZLE_TYPE_UUID;
00136 
00137   case message::Table::Field::BOOLEAN:
00138     return DRIZZLE_TYPE_BOOLEAN;
00139 
00140   case message::Table::Field::TIME:
00141     return DRIZZLE_TYPE_TIME;
00142   }
00143 
00144   abort();
00145 }
00146 
00147 static Item *default_value_item(enum_field_types field_type,
00148                                 const CHARSET_INFO *charset,
00149                                 bool default_null, const string *default_value,
00150                                 const string *default_bin_value)
00151 {
00152   Item *default_item= NULL;
00153   int error= 0;
00154 
00155   if (default_null)
00156   {
00157     return new Item_null();
00158   }
00159 
00160   switch(field_type)
00161   {
00162   case DRIZZLE_TYPE_LONG:
00163   case DRIZZLE_TYPE_LONGLONG:
00164     default_item= new Item_int(default_value->c_str(),
00165                                (int64_t) internal::my_strtoll10(default_value->c_str(),
00166                                                                 NULL,
00167                                                                 &error),
00168                                default_value->length());
00169     break;
00170   case DRIZZLE_TYPE_DOUBLE:
00171     default_item= new Item_float(default_value->c_str(),
00172                                  default_value->length());
00173     break;
00174   case DRIZZLE_TYPE_NULL:
00175     assert(0);
00176     abort();
00177   case DRIZZLE_TYPE_TIMESTAMP:
00178   case DRIZZLE_TYPE_DATETIME:
00179   case DRIZZLE_TYPE_TIME:
00180   case DRIZZLE_TYPE_DATE:
00181   case DRIZZLE_TYPE_ENUM:
00182   case DRIZZLE_TYPE_UUID:
00183   case DRIZZLE_TYPE_MICROTIME:
00184   case DRIZZLE_TYPE_BOOLEAN:
00185     default_item= new Item_string(default_value->c_str(),
00186                                   default_value->length(),
00187                                   system_charset_info);
00188     break;
00189   case DRIZZLE_TYPE_VARCHAR:
00190   case DRIZZLE_TYPE_BLOB: /* Blob is here due to TINYTEXT. Feel the hate. */
00191     if (charset==&my_charset_bin)
00192     {
00193       default_item= new Item_string(default_bin_value->c_str(),
00194                                     default_bin_value->length(),
00195                                     &my_charset_bin);
00196     }
00197     else
00198     {
00199       default_item= new Item_string(default_value->c_str(),
00200                                     default_value->length(),
00201                                     system_charset_info);
00202     }
00203     break;
00204   case DRIZZLE_TYPE_DECIMAL:
00205     default_item= new Item_decimal(default_value->c_str(),
00206                                    default_value->length(),
00207                                    system_charset_info);
00208     break;
00209   }
00210 
00211   return default_item;
00212 }
00213 
00214 
00215 
00221 bool TableShare::fieldInPrimaryKey(Field *in_field) const
00222 {
00223   assert(getTableMessage());
00224 
00225   size_t num_indexes= getTableMessage()->indexes_size();
00226 
00227   for (size_t x= 0; x < num_indexes; ++x)
00228   {
00229     const message::Table::Index &index= getTableMessage()->indexes(x);
00230     if (index.is_primary())
00231     {
00232       size_t num_parts= index.index_part_size();
00233       for (size_t y= 0; y < num_parts; ++y)
00234       {
00235         if (index.index_part(y).fieldnr() == in_field->position())
00236           return true;
00237       }
00238     }
00239   }
00240   return false;
00241 }
00242 
00243 TableShare::TableShare(const identifier::Table::Type type_arg) :
00244   table_category(TABLE_UNKNOWN_CATEGORY),
00245   found_next_number_field(NULL),
00246   timestamp_field(NULL),
00247   key_info(NULL),
00248   mem_root(TABLE_ALLOC_BLOCK_SIZE),
00249   all_set(),
00250   db(NULL_LEX_STRING),
00251   table_name(NULL_LEX_STRING),
00252   path(NULL_LEX_STRING),
00253   normalized_path(NULL_LEX_STRING),
00254   block_size(0),
00255   version(0),
00256   timestamp_offset(0),
00257   reclength(0),
00258   stored_rec_length(0),
00259   max_rows(0),
00260   _table_message(NULL),
00261   storage_engine(NULL),
00262   tmp_table(type_arg),
00263   _ref_count(0),
00264   null_bytes(0),
00265   last_null_bit_pos(0),
00266   _field_size(0),
00267   rec_buff_length(0),
00268   keys(0),
00269   key_parts(0),
00270   max_key_length(0),
00271   max_unique_length(0),
00272   total_key_length(0),
00273   uniques(0),
00274   null_fields(0),
00275   blob_fields(0),
00276   has_variable_width(false),
00277   db_create_options(0),
00278   db_options_in_use(0),
00279   db_record_offset(0),
00280   rowid_field_offset(0),
00281   primary_key(MAX_KEY),
00282   next_number_index(0),
00283   next_number_key_offset(0),
00284   next_number_keypart(0),
00285   error(0),
00286   open_errno(0),
00287   errarg(0),
00288   blob_ptr_size(portable_sizeof_char_ptr),
00289   db_low_byte_first(false),
00290   keys_in_use(0),
00291   keys_for_keyread(0)
00292 {
00293   if (type_arg == message::Table::INTERNAL)
00294   {
00295     identifier::Table::build_tmptable_filename(private_key_for_cache.vectorPtr());
00296     init(private_key_for_cache.vector(), private_key_for_cache.vector());
00297   }
00298   else
00299   {
00300     init("", "");
00301   }
00302 }
00303 
00304 TableShare::TableShare(const identifier::Table &identifier, const identifier::Table::Key &key) :// Used by placeholder
00305   table_category(TABLE_UNKNOWN_CATEGORY),
00306   found_next_number_field(NULL),
00307   timestamp_field(NULL),
00308   key_info(NULL),
00309   mem_root(TABLE_ALLOC_BLOCK_SIZE),
00310   table_charset(0),
00311   all_set(),
00312   db(NULL_LEX_STRING),
00313   table_name(NULL_LEX_STRING),
00314   path(NULL_LEX_STRING),
00315   normalized_path(NULL_LEX_STRING),
00316   block_size(0),
00317   version(0),
00318   timestamp_offset(0),
00319   reclength(0),
00320   stored_rec_length(0),
00321   max_rows(0),
00322   _table_message(NULL),
00323   storage_engine(NULL),
00324   tmp_table(message::Table::INTERNAL),
00325   _ref_count(0),
00326   null_bytes(0),
00327   last_null_bit_pos(0),
00328   _field_size(0),
00329   rec_buff_length(0),
00330   keys(0),
00331   key_parts(0),
00332   max_key_length(0),
00333   max_unique_length(0),
00334   total_key_length(0),
00335   uniques(0),
00336   null_fields(0),
00337   blob_fields(0),
00338   has_variable_width(false),
00339   db_create_options(0),
00340   db_options_in_use(0),
00341   db_record_offset(0),
00342   rowid_field_offset(0),
00343   primary_key(MAX_KEY),
00344   next_number_index(0),
00345   next_number_key_offset(0),
00346   next_number_keypart(0),
00347   error(0),
00348   open_errno(0),
00349   errarg(0),
00350   blob_ptr_size(portable_sizeof_char_ptr),
00351   db_low_byte_first(false),
00352   keys_in_use(0),
00353   keys_for_keyread(0)
00354 {
00355   assert(identifier.getKey() == key);
00356 
00357   private_key_for_cache= key;
00358 
00359   table_category=         TABLE_CATEGORY_TEMPORARY;
00360   tmp_table=              message::Table::INTERNAL;
00361 
00362   db.str= const_cast<char *>(private_key_for_cache.vector());
00363   db.length= strlen(private_key_for_cache.vector());
00364 
00365   table_name.str= const_cast<char *>(private_key_for_cache.vector()) + strlen(private_key_for_cache.vector()) + 1;
00366   table_name.length= strlen(table_name.str);
00367   path.str= (char *)"";
00368   normalized_path.str= path.str;
00369   path.length= normalized_path.length= 0;
00370 
00371   std::string tb_name(identifier.getTableName());
00372   std::transform(tb_name.begin(), tb_name.end(), tb_name.begin(), ::tolower);
00373   assert(strcmp(tb_name.c_str(), table_name.str) == 0);
00374 
00375   assert(strcmp(identifier.getSchemaName().c_str(), db.str) == 0);
00376 }
00377 
00378 
00379 TableShare::TableShare(const identifier::Table &identifier) : // Just used during createTable()
00380   table_category(TABLE_UNKNOWN_CATEGORY),
00381   found_next_number_field(NULL),
00382   timestamp_field(NULL),
00383   key_info(NULL),
00384   mem_root(TABLE_ALLOC_BLOCK_SIZE),
00385   table_charset(0),
00386   all_set(),
00387   db(NULL_LEX_STRING),
00388   table_name(NULL_LEX_STRING),
00389   path(NULL_LEX_STRING),
00390   normalized_path(NULL_LEX_STRING),
00391   block_size(0),
00392   version(0),
00393   timestamp_offset(0),
00394   reclength(0),
00395   stored_rec_length(0),
00396   max_rows(0),
00397   _table_message(NULL),
00398   storage_engine(NULL),
00399   tmp_table(identifier.getType()),
00400   _ref_count(0),
00401   null_bytes(0),
00402   last_null_bit_pos(0),
00403   _field_size(0),
00404   rec_buff_length(0),
00405   keys(0),
00406   key_parts(0),
00407   max_key_length(0),
00408   max_unique_length(0),
00409   total_key_length(0),
00410   uniques(0),
00411   null_fields(0),
00412   blob_fields(0),
00413   has_variable_width(false),
00414   db_create_options(0),
00415   db_options_in_use(0),
00416   db_record_offset(0),
00417   rowid_field_offset(0),
00418   primary_key(MAX_KEY),
00419   next_number_index(0),
00420   next_number_key_offset(0),
00421   next_number_keypart(0),
00422   error(0),
00423   open_errno(0),
00424   errarg(0),
00425   blob_ptr_size(portable_sizeof_char_ptr),
00426   db_low_byte_first(false),
00427   keys_in_use(0),
00428   keys_for_keyread(0)
00429 {
00430   private_key_for_cache= identifier.getKey();
00431   assert(identifier.getPath().size()); // Since we are doing a create table, this should be a positive value
00432   private_normalized_path.resize(identifier.getPath().size() + 1);
00433   memcpy(&private_normalized_path[0], identifier.getPath().c_str(), identifier.getPath().size());
00434 
00435   {
00436     table_category=         TABLE_CATEGORY_TEMPORARY;
00437     tmp_table=              message::Table::INTERNAL;
00438     db.str= const_cast<char *>(private_key_for_cache.vector());
00439     db.length= strlen(private_key_for_cache.vector());
00440     table_name.str= db.str + 1;
00441     table_name.length= strlen(table_name.str);
00442     path.str= &private_normalized_path[0];
00443     normalized_path.str= path.str;
00444     path.length= normalized_path.length= private_normalized_path.size();
00445   }
00446 }
00447 
00448 
00449 /*
00450   Used for shares that will go into the cache.
00451 */
00452 TableShare::TableShare(const identifier::Table::Type type_arg,
00453                        const identifier::Table &identifier,
00454                        char *path_arg,
00455                        uint32_t path_length_arg) :
00456   table_category(TABLE_UNKNOWN_CATEGORY),
00457   found_next_number_field(NULL),
00458   timestamp_field(NULL),
00459   key_info(NULL),
00460   mem_root(TABLE_ALLOC_BLOCK_SIZE),
00461   table_charset(0),
00462   all_set(),
00463   db(NULL_LEX_STRING),
00464   table_name(NULL_LEX_STRING),
00465   path(NULL_LEX_STRING),
00466   normalized_path(NULL_LEX_STRING),
00467   block_size(0),
00468   version(0),
00469   timestamp_offset(0),
00470   reclength(0),
00471   stored_rec_length(0),
00472   max_rows(0),
00473   _table_message(NULL),
00474   storage_engine(NULL),
00475   tmp_table(type_arg),
00476   _ref_count(0),
00477   null_bytes(0),
00478   last_null_bit_pos(0),
00479   _field_size(0),
00480   rec_buff_length(0),
00481   keys(0),
00482   key_parts(0),
00483   max_key_length(0),
00484   max_unique_length(0),
00485   total_key_length(0),
00486   uniques(0),
00487   null_fields(0),
00488   blob_fields(0),
00489   has_variable_width(false),
00490   db_create_options(0),
00491   db_options_in_use(0),
00492   db_record_offset(0),
00493   rowid_field_offset(0),
00494   primary_key(MAX_KEY),
00495   next_number_index(0),
00496   next_number_key_offset(0),
00497   next_number_keypart(0),
00498   error(0),
00499   open_errno(0),
00500   errarg(0),
00501   blob_ptr_size(portable_sizeof_char_ptr),
00502   db_low_byte_first(false),
00503   keys_in_use(0),
00504   keys_for_keyread(0)
00505 {
00506   char *path_buff;
00507   std::string _path;
00508 
00509   private_key_for_cache= identifier.getKey();
00510   /*
00511     Let us use the fact that the key is "db/0/table_name/0" + optional
00512     part for temporary tables.
00513   */
00514   db.str= const_cast<char *>(private_key_for_cache.vector());
00515   db.length=         strlen(db.str);
00516   table_name.str=    db.str + db.length + 1;
00517   table_name.length= strlen(table_name.str);
00518 
00519   if (path_arg)
00520   {
00521     _path.append(path_arg, path_length_arg);
00522   }
00523   else
00524   {
00525     identifier::Table::build_table_filename(_path, db.str, table_name.str, false);
00526   }
00527 
00528   if ((path_buff= (char *)mem_root.alloc_root(_path.length() + 1)))
00529   {
00530     setPath(path_buff, _path.length());
00531     strcpy(path_buff, _path.c_str());
00532     setNormalizedPath(path_buff, _path.length());
00533 
00534     version= refresh_version;
00535   }
00536   else
00537   {
00538     assert(0); // We should throw here.
00539     abort();
00540   }
00541 }
00542 
00543 void TableShare::init(const char *new_table_name,
00544                       const char *new_path)
00545 {
00546 
00547   table_category=         TABLE_CATEGORY_TEMPORARY;
00548   tmp_table=              message::Table::INTERNAL;
00549   db.str= (char *)"";
00550   db.length= 0;
00551   table_name.str=         (char*) new_table_name;
00552   table_name.length=      strlen(new_table_name);
00553   path.str=               (char*) new_path;
00554   normalized_path.str=    (char*) new_path;
00555   path.length= normalized_path.length= strlen(new_path);
00556 }
00557 
00558 TableShare::~TableShare() 
00559 {
00560   storage_engine= NULL;
00561 
00562   mem_root.free_root(MYF(0));                 // Free's share
00563 }
00564 
00565 void TableShare::setIdentifier(const identifier::Table &identifier_arg)
00566 {
00567   private_key_for_cache= identifier_arg.getKey();
00568 
00569   /*
00570     Let us use the fact that the key is "db/0/table_name/0" + optional
00571     part for temporary tables.
00572   */
00573   db.str= const_cast<char *>(private_key_for_cache.vector());
00574   db.length=         strlen(db.str);
00575   table_name.str=    db.str + db.length + 1;
00576   table_name.length= strlen(table_name.str);
00577 
00578   getTableMessage()->set_name(identifier_arg.getTableName());
00579   getTableMessage()->set_schema(identifier_arg.getSchemaName());
00580 }
00581 
00582 bool TableShare::parse_table_proto(Session& session, const message::Table &table)
00583 {
00584   drizzled::error_t local_error= EE_OK;
00585 
00586   if (! table.IsInitialized())
00587   {
00588     my_error(ER_CORRUPT_TABLE_DEFINITION, MYF(0),
00589              table.name().empty() ? " " :  table.name().c_str(),
00590              table.InitializationErrorString().c_str());
00591 
00592     return ER_CORRUPT_TABLE_DEFINITION;
00593   }
00594 
00595   setTableMessage(table);
00596 
00597   storage_engine= plugin::StorageEngine::findByName(session, table.engine().name());
00598   assert(storage_engine); // We use an assert() here because we should never get this far and still have no suitable engine.
00599 
00600   message::Table::TableOptions table_options;
00601 
00602   if (table.has_options())
00603     table_options= table.options();
00604 
00605   uint32_t local_db_create_options= 0;
00606 
00607   if (table_options.pack_record())
00608     local_db_create_options|= HA_OPTION_PACK_RECORD;
00609 
00610   /* local_db_create_options was stored as 2 bytes in FRM
00611     Any HA_OPTION_ that doesn't fit into 2 bytes was silently truncated away.
00612   */
00613   db_create_options= (local_db_create_options & 0x0000FFFF);
00614   db_options_in_use= db_create_options;
00615 
00616   block_size= table_options.has_block_size() ?
00617     table_options.block_size() : 0;
00618 
00619   table_charset= get_charset(table_options.collation_id());
00620 
00621   if (not table_charset)
00622   {
00623     my_error(ER_CORRUPT_TABLE_DEFINITION_UNKNOWN_COLLATION, MYF(0),
00624              table_options.collation().c_str(),
00625              table.name().c_str());
00626 
00627     return ER_CORRUPT_TABLE_DEFINITION; // Historical
00628   }
00629 
00630   db_record_offset= 1;
00631 
00632   keys= table.indexes_size();
00633 
00634   key_parts= 0;
00635   for (int indx= 0; indx < table.indexes_size(); indx++)
00636     key_parts+= table.indexes(indx).index_part_size();
00637 
00638   key_info= (KeyInfo*) alloc_root( table.indexes_size() * sizeof(KeyInfo) +key_parts*sizeof(KeyPartInfo));
00639 
00640   KeyPartInfo *key_part;
00641 
00642   key_part= reinterpret_cast<KeyPartInfo*>
00643     (key_info+table.indexes_size());
00644 
00645 
00646   ulong *rec_per_key= (ulong*) alloc_root(sizeof(ulong*)*key_parts);
00647 
00648   KeyInfo* keyinfo= key_info;
00649   for (int keynr= 0; keynr < table.indexes_size(); keynr++, keyinfo++)
00650   {
00651     message::Table::Index indx= table.indexes(keynr);
00652 
00653     keyinfo->table= 0;
00654     keyinfo->flags= 0;
00655 
00656     if (indx.is_unique())
00657       keyinfo->flags|= HA_NOSAME;
00658 
00659     if (indx.has_options())
00660     {
00661       message::Table::Index::Options indx_options= indx.options();
00662       if (indx_options.pack_key())
00663         keyinfo->flags|= HA_PACK_KEY;
00664 
00665       if (indx_options.var_length_key())
00666         keyinfo->flags|= HA_VAR_LENGTH_PART;
00667 
00668       if (indx_options.null_part_key())
00669         keyinfo->flags|= HA_NULL_PART_KEY;
00670 
00671       if (indx_options.binary_pack_key())
00672         keyinfo->flags|= HA_BINARY_PACK_KEY;
00673 
00674       if (indx_options.has_partial_segments())
00675         keyinfo->flags|= HA_KEY_HAS_PART_KEY_SEG;
00676 
00677       if (indx_options.auto_generated_key())
00678         keyinfo->flags|= HA_GENERATED_KEY;
00679 
00680       if (indx_options.has_key_block_size())
00681       {
00682         keyinfo->flags|= HA_USES_BLOCK_SIZE;
00683         keyinfo->block_size= indx_options.key_block_size();
00684       }
00685       else
00686       {
00687         keyinfo->block_size= 0;
00688       }
00689     }
00690 
00691     switch (indx.type())
00692     {
00693     case message::Table::Index::UNKNOWN_INDEX:
00694       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
00695       break;
00696     case message::Table::Index::BTREE:
00697       keyinfo->algorithm= HA_KEY_ALG_BTREE;
00698       break;
00699     case message::Table::Index::HASH:
00700       keyinfo->algorithm= HA_KEY_ALG_HASH;
00701       break;
00702 
00703     default:
00704       /* TODO: suitable warning ? */
00705       keyinfo->algorithm= HA_KEY_ALG_UNDEF;
00706       break;
00707     }
00708 
00709     keyinfo->key_length= indx.key_length();
00710 
00711     keyinfo->key_parts= indx.index_part_size();
00712 
00713     keyinfo->key_part= key_part;
00714     keyinfo->rec_per_key= rec_per_key;
00715 
00716     for (unsigned int partnr= 0;
00717          partnr < keyinfo->key_parts;
00718          partnr++, key_part++)
00719     {
00720       message::Table::Index::IndexPart part;
00721       part= indx.index_part(partnr);
00722 
00723       *rec_per_key++= 0;
00724 
00725       key_part->field= NULL;
00726       key_part->fieldnr= part.fieldnr() + 1; // start from 1.
00727       key_part->null_bit= 0;
00728       /* key_part->null_offset is only set if null_bit (see later) */
00729       /* key_part->key_type= */ /* I *THINK* this may be okay.... */
00730       /* key_part->type ???? */
00731       key_part->key_part_flag= 0;
00732       if (part.has_in_reverse_order())
00733         key_part->key_part_flag= part.in_reverse_order()? HA_REVERSE_SORT : 0;
00734 
00735       key_part->length= part.compare_length();
00736 
00737       int mbmaxlen= 1;
00738 
00739       if (table.field(part.fieldnr()).type() == message::Table::Field::VARCHAR
00740           || table.field(part.fieldnr()).type() == message::Table::Field::BLOB)
00741       {
00742         uint32_t collation_id;
00743 
00744         if (table.field(part.fieldnr()).string_options().has_collation_id())
00745           collation_id= table.field(part.fieldnr()).string_options().collation_id();
00746         else
00747           collation_id= table.options().collation_id();
00748 
00749         const CHARSET_INFO *cs= get_charset(collation_id);
00750 
00751         mbmaxlen= cs->mbmaxlen;
00752       }
00753       key_part->length*= mbmaxlen;
00754 
00755       key_part->store_length= key_part->length;
00756 
00757       /* key_part->offset is set later */
00758       key_part->key_type= 0;
00759     }
00760 
00761     if (! indx.has_comment())
00762     {
00763       keyinfo->comment.length= 0;
00764       keyinfo->comment.str= NULL;
00765     }
00766     else
00767     {
00768       keyinfo->flags|= HA_USES_COMMENT;
00769       keyinfo->comment.length= indx.comment().length();
00770       keyinfo->comment.str= strmake_root(indx.comment().c_str(), keyinfo->comment.length);
00771     }
00772 
00773     keyinfo->name= strmake_root(indx.name().c_str(), indx.name().length());
00774 
00775     addKeyName(string(keyinfo->name, indx.name().length()));
00776   }
00777 
00778   keys_for_keyread.reset();
00779   set_prefix(keys_in_use, keys);
00780 
00781   _field_size= table.field_size();
00782 
00783   setFields(_field_size + 1);
00784   _fields[_field_size]= NULL;
00785 
00786   uint32_t local_null_fields= 0;
00787   reclength= 0;
00788 
00789   std::vector<uint32_t> field_offsets;
00790   std::vector<uint32_t> field_pack_length;
00791 
00792   field_offsets.resize(_field_size);
00793   field_pack_length.resize(_field_size);
00794 
00795   uint32_t interval_count= 0;
00796   uint32_t interval_parts= 0;
00797 
00798   uint32_t stored_columns_reclength= 0;
00799 
00800   for (unsigned int fieldnr= 0; fieldnr < _field_size; fieldnr++)
00801   {
00802     message::Table::Field pfield= table.field(fieldnr);
00803     if (pfield.constraints().is_nullable()) // Historical reference
00804     {
00805       local_null_fields++;
00806     }
00807     else if (not pfield.constraints().is_notnull())
00808     {
00809       local_null_fields++;
00810     }
00811 
00812     enum_field_types drizzle_field_type= proto_field_type_to_drizzle_type(pfield);
00813 
00814     field_offsets[fieldnr]= stored_columns_reclength;
00815 
00816     /* the below switch is very similar to
00817       CreateField::create_length_to_internal_length in field.cc
00818       (which should one day be replace by just this code)
00819     */
00820     switch(drizzle_field_type)
00821     {
00822     case DRIZZLE_TYPE_BLOB:
00823     case DRIZZLE_TYPE_VARCHAR:
00824       {
00825         message::Table::Field::StringFieldOptions field_options= pfield.string_options();
00826 
00827         const CHARSET_INFO *cs= get_charset(field_options.has_collation_id() ?
00828                                             field_options.collation_id() : 0);
00829 
00830         if (! cs)
00831           cs= default_charset_info;
00832 
00833         field_pack_length[fieldnr]= calc_pack_length(drizzle_field_type,
00834                                                      field_options.length() * cs->mbmaxlen);
00835       }
00836       break;
00837     case DRIZZLE_TYPE_ENUM:
00838       {
00839         message::Table::Field::EnumerationValues field_options= pfield.enumeration_values();
00840 
00841         field_pack_length[fieldnr]= 4;
00842 
00843         interval_count++;
00844         interval_parts+= field_options.field_value_size();
00845       }
00846       break;
00847     case DRIZZLE_TYPE_DECIMAL:
00848       {
00849         message::Table::Field::NumericFieldOptions fo= pfield.numeric_options();
00850 
00851         field_pack_length[fieldnr]= class_decimal_get_binary_size(fo.precision(), fo.scale());
00852       }
00853       break;
00854     default:
00855       /* Zero is okay here as length is fixed for other types. */
00856       field_pack_length[fieldnr]= calc_pack_length(drizzle_field_type, 0);
00857     }
00858 
00859     reclength+= field_pack_length[fieldnr];
00860     stored_columns_reclength+= field_pack_length[fieldnr];
00861   }
00862 
00863   /* data_offset added to stored_rec_length later */
00864   stored_rec_length= stored_columns_reclength;
00865 
00866   null_fields= local_null_fields;
00867 
00868   ulong null_bits= local_null_fields;
00869   if (! table_options.pack_record())
00870     null_bits++;
00871   ulong data_offset= (null_bits + 7)/8;
00872 
00873 
00874   reclength+= data_offset;
00875   stored_rec_length+= data_offset;
00876 
00877   ulong local_rec_buff_length;
00878 
00879   local_rec_buff_length= ALIGN_SIZE(reclength + 1);
00880   rec_buff_length= local_rec_buff_length;
00881 
00882   resizeDefaultValues(local_rec_buff_length);
00883   unsigned char* record= getDefaultValues();
00884   int null_count= 0;
00885 
00886   if (! table_options.pack_record())
00887   {
00888     null_count++; // one bit for delete mark.
00889     *record|= 1;
00890   }
00891 
00892 
00893   intervals.resize(interval_count);
00894 
00895   /* Now fix the TYPELIBs for the intervals (enum values)
00896     and field names.
00897   */
00898 
00899   uint32_t interval_nr= 0;
00900 
00901   for (unsigned int fieldnr= 0; fieldnr < _field_size; fieldnr++)
00902   {
00903     message::Table::Field pfield= table.field(fieldnr);
00904 
00905     /* enum typelibs */
00906     if (pfield.type() != message::Table::Field::ENUM)
00907       continue;
00908 
00909     message::Table::Field::EnumerationValues field_options= pfield.enumeration_values();
00910 
00911     if (field_options.field_value_size() > Field_enum::max_supported_elements)
00912     {
00913       my_error(ER_CORRUPT_TABLE_DEFINITION_ENUM, MYF(0), table.name().c_str());
00914 
00915       return ER_CORRUPT_TABLE_DEFINITION_ENUM; // Historical
00916     }
00917 
00918 
00919     const CHARSET_INFO *charset= get_charset(field_options.has_collation_id() ?
00920                                              field_options.collation_id() : 0);
00921 
00922     if (! charset)
00923       charset= default_charset_info;
00924 
00925     TYPELIB *t= (&intervals[interval_nr]);
00926 
00927     t->type_names= (const char**)alloc_root((field_options.field_value_size() + 1) * sizeof(char*));
00928 
00929     t->type_lengths= (unsigned int*) alloc_root((field_options.field_value_size() + 1) * sizeof(unsigned int));
00930 
00931     t->type_names[field_options.field_value_size()]= NULL;
00932     t->type_lengths[field_options.field_value_size()]= 0;
00933 
00934     t->count= field_options.field_value_size();
00935     t->name= NULL;
00936 
00937     for (int n= 0; n < field_options.field_value_size(); n++)
00938     {
00939       t->type_names[n]= strmake_root(field_options.field_value(n).c_str(), field_options.field_value(n).length());
00940 
00941       /* 
00942        * Go ask the charset what the length is as for "" length=1
00943        * and there's stripping spaces or some other crack going on.
00944      */
00945       uint32_t lengthsp;
00946       lengthsp= charset->cset->lengthsp(charset,
00947                                         t->type_names[n],
00948                                         field_options.field_value(n).length());
00949       t->type_lengths[n]= lengthsp;
00950     }
00951     interval_nr++;
00952   }
00953 
00954 
00955   /* and read the fields */
00956   interval_nr= 0;
00957 
00958   bool use_hash= _field_size >= MAX_FIELDS_BEFORE_HASH;
00959 
00960   unsigned char* null_pos= getDefaultValues();
00961   int null_bit_pos= (table_options.pack_record()) ? 0 : 1;
00962 
00963   for (unsigned int fieldnr= 0; fieldnr < _field_size; fieldnr++)
00964   {
00965     message::Table::Field pfield= table.field(fieldnr);
00966 
00967     Field::utype unireg_type= Field::NONE;
00968 
00969     if (pfield.has_numeric_options() &&
00970         pfield.numeric_options().is_autoincrement())
00971     {
00972       unireg_type= Field::NEXT_NUMBER;
00973     }
00974 
00975     if (pfield.has_options() &&
00976         pfield.options().has_default_expression() &&
00977         pfield.options().default_expression().compare("CURRENT_TIMESTAMP") == 0)
00978     {
00979       if (pfield.options().has_update_expression() &&
00980           pfield.options().update_expression().compare("CURRENT_TIMESTAMP") == 0)
00981       {
00982         unireg_type= Field::TIMESTAMP_DNUN_FIELD;
00983       }
00984       else if (! pfield.options().has_update_expression())
00985       {
00986         unireg_type= Field::TIMESTAMP_DN_FIELD;
00987       }
00988       else
00989       {
00990         assert(0); // Invalid update value.
00991         abort();
00992       }
00993     }
00994     else if (pfield.has_options() &&
00995              pfield.options().has_update_expression() &&
00996              pfield.options().update_expression().compare("CURRENT_TIMESTAMP") == 0)
00997     {
00998       unireg_type= Field::TIMESTAMP_UN_FIELD;
00999     }
01000 
01001     LEX_STRING comment;
01002     if (!pfield.has_comment())
01003     {
01004       comment.str= (char*)"";
01005       comment.length= 0;
01006     }
01007     else
01008     {
01009       size_t len= pfield.comment().length();
01010       const char* str= pfield.comment().c_str();
01011 
01012       comment.str= strmake_root(str, len);
01013       comment.length= len;
01014     }
01015 
01016     enum_field_types field_type;
01017 
01018     field_type= proto_field_type_to_drizzle_type(pfield);
01019 
01020     const CHARSET_INFO *charset= &my_charset_bin;
01021 
01022     if (field_type == DRIZZLE_TYPE_BLOB ||
01023         field_type == DRIZZLE_TYPE_VARCHAR)
01024     {
01025       message::Table::Field::StringFieldOptions field_options= pfield.string_options();
01026 
01027       charset= get_charset(field_options.has_collation_id() ?
01028                            field_options.collation_id() : 0);
01029 
01030       if (! charset)
01031         charset= default_charset_info;
01032     }
01033 
01034     if (field_type == DRIZZLE_TYPE_ENUM)
01035     {
01036       message::Table::Field::EnumerationValues field_options= pfield.enumeration_values();
01037 
01038       charset= get_charset(field_options.has_collation_id()?
01039                            field_options.collation_id() : 0);
01040 
01041       if (! charset)
01042         charset= default_charset_info;
01043     }
01044 
01045     uint8_t decimals= 0;
01046     if (field_type == DRIZZLE_TYPE_DECIMAL
01047         || field_type == DRIZZLE_TYPE_DOUBLE)
01048     {
01049       message::Table::Field::NumericFieldOptions fo= pfield.numeric_options();
01050 
01051       if (! pfield.has_numeric_options() || ! fo.has_scale())
01052       {
01053         /*
01054           We don't write the default to table proto so
01055           if no decimals specified for DOUBLE, we use the default.
01056         */
01057         decimals= NOT_FIXED_DEC;
01058       }
01059       else
01060       {
01061         if (fo.scale() > DECIMAL_MAX_SCALE)
01062         {
01063           local_error= ER_NOT_FORM_FILE;
01064 
01065           return true;
01066         }
01067         decimals= static_cast<uint8_t>(fo.scale());
01068       }
01069     }
01070 
01071     Item *default_value= NULL;
01072 
01073     if (pfield.options().has_default_value() ||
01074         pfield.options().default_null()  ||
01075         pfield.options().has_default_bin_value())
01076     {
01077       default_value= default_value_item(field_type,
01078                                         charset,
01079                                         pfield.options().default_null(),
01080                                         &pfield.options().default_value(),
01081                                         &pfield.options().default_bin_value());
01082     }
01083 
01084 
01085     uint32_t field_length= 0; //Assignment is for compiler complaint.
01086 
01087     // We set field_length in this loop.
01088     switch (field_type)
01089     {
01090     case DRIZZLE_TYPE_BLOB:
01091     case DRIZZLE_TYPE_VARCHAR:
01092       {
01093         message::Table::Field::StringFieldOptions field_options= pfield.string_options();
01094 
01095         charset= get_charset(field_options.has_collation_id() ?
01096                              field_options.collation_id() : 0);
01097 
01098         if (! charset)
01099           charset= default_charset_info;
01100 
01101         field_length= field_options.length() * charset->mbmaxlen;
01102       }
01103       break;
01104     case DRIZZLE_TYPE_DOUBLE:
01105       {
01106         message::Table::Field::NumericFieldOptions fo= pfield.numeric_options();
01107         if (!fo.has_precision() && !fo.has_scale())
01108         {
01109           field_length= DBL_DIG+7;
01110         }
01111         else
01112         {
01113           field_length= fo.precision();
01114         }
01115         if (field_length < decimals &&
01116             decimals != NOT_FIXED_DEC)
01117         {
01118           my_error(ER_M_BIGGER_THAN_D, MYF(0), pfield.name().c_str());
01119           local_error= ER_M_BIGGER_THAN_D;
01120           return true;
01121         }
01122         break;
01123       }
01124     case DRIZZLE_TYPE_DECIMAL:
01125       {
01126         message::Table::Field::NumericFieldOptions fo= pfield.numeric_options();
01127 
01128         field_length= class_decimal_precision_to_length(fo.precision(), fo.scale(),
01129                                                      false);
01130         break;
01131       }
01132     case DRIZZLE_TYPE_DATETIME:
01133       field_length= DateTime::MAX_STRING_LENGTH;
01134       break;
01135     case DRIZZLE_TYPE_DATE:
01136       field_length= Date::MAX_STRING_LENGTH;
01137       break;
01138     case DRIZZLE_TYPE_ENUM:
01139       {
01140         field_length= 0;
01141 
01142         message::Table::Field::EnumerationValues fo= pfield.enumeration_values();
01143 
01144         for (int valnr= 0; valnr < fo.field_value_size(); valnr++)
01145         {
01146           if (fo.field_value(valnr).length() > field_length)
01147           {
01148             field_length= charset->cset->numchars(charset,
01149                                                   fo.field_value(valnr).c_str(),
01150                                                   fo.field_value(valnr).c_str()
01151                                                   + fo.field_value(valnr).length())
01152               * charset->mbmaxlen;
01153           }
01154         }
01155       }
01156       break;
01157     case DRIZZLE_TYPE_LONG:
01158       {
01159         uint32_t sign_len= pfield.constraints().is_unsigned() ? 0 : 1;
01160         field_length= MAX_INT_WIDTH+sign_len;
01161       }
01162       break;
01163     case DRIZZLE_TYPE_LONGLONG:
01164       {
01165         uint32_t sign_len= pfield.constraints().is_unsigned() ? 0 : 1;
01166         field_length= MAX_BIGINT_WIDTH+sign_len;
01167       }
01168       break;
01169     case DRIZZLE_TYPE_UUID:
01170       field_length= field::Uuid::max_string_length();
01171       break;
01172     case DRIZZLE_TYPE_BOOLEAN:
01173       field_length= field::Boolean::max_string_length();
01174       break;
01175     case DRIZZLE_TYPE_MICROTIME:
01176       field_length= field::Microtime::max_string_length();
01177       break;
01178     case DRIZZLE_TYPE_TIMESTAMP:
01179       field_length= field::Epoch::max_string_length();
01180       break;
01181     case DRIZZLE_TYPE_TIME:
01182       field_length= field::Time::max_string_length();
01183       break;
01184     case DRIZZLE_TYPE_NULL:
01185       abort(); // Programming error
01186     }
01187 
01188     bool is_not_null= false;
01189 
01190     if (not pfield.constraints().is_nullable())
01191     {
01192       is_not_null= true;
01193     }
01194     else if (pfield.constraints().is_notnull())
01195     {
01196       is_not_null= true;
01197     }
01198 
01199     Field* f= make_field(pfield,
01200                          record + field_offsets[fieldnr] + data_offset,
01201                          field_length,
01202                          not is_not_null,
01203                          null_pos,
01204                          null_bit_pos,
01205                          decimals,
01206                          field_type,
01207                          charset,
01208                          MTYP_TYPENR(unireg_type),
01209                          ((field_type == DRIZZLE_TYPE_ENUM) ?  &intervals[interval_nr++] : (TYPELIB*) 0),
01210                          getTableMessage()->field(fieldnr).name().c_str());
01211 
01212     _fields[fieldnr]= f;
01213 
01214     // Insert post make_field code here.
01215     switch (field_type)
01216     {
01217     case DRIZZLE_TYPE_BLOB:
01218     case DRIZZLE_TYPE_VARCHAR:
01219     case DRIZZLE_TYPE_DOUBLE:
01220     case DRIZZLE_TYPE_DECIMAL:
01221     case DRIZZLE_TYPE_TIMESTAMP:
01222     case DRIZZLE_TYPE_TIME:
01223     case DRIZZLE_TYPE_DATETIME:
01224     case DRIZZLE_TYPE_MICROTIME:
01225     case DRIZZLE_TYPE_DATE:
01226     case DRIZZLE_TYPE_ENUM:
01227     case DRIZZLE_TYPE_LONG:
01228     case DRIZZLE_TYPE_LONGLONG:
01229     case DRIZZLE_TYPE_NULL:
01230     case DRIZZLE_TYPE_UUID:
01231     case DRIZZLE_TYPE_BOOLEAN:
01232       break;
01233     }
01234 
01235     // This needs to go, we should be setting the "use" on the field so that
01236     // it does not reference the share/table.
01237     table::Shell temp_table(*this); /* Use this so that BLOB DEFAULT '' works */
01238     temp_table.in_use= &session;
01239 
01240     f->init(&temp_table); /* blob default values need table obj */
01241 
01242     if (! (f->flags & NOT_NULL_FLAG))
01243     {
01244       *f->null_ptr|= f->null_bit;
01245       if (! (null_bit_pos= (null_bit_pos + 1) & 7)) /* @TODO Ugh. */
01246         null_pos++;
01247       null_count++;
01248     }
01249 
01250     if (default_value)
01251     {
01252       enum_check_fields old_count_cuted_fields= session.count_cuted_fields;
01253       session.count_cuted_fields= CHECK_FIELD_ERROR_FOR_NULL;
01254       int res= default_value->save_in_field(f, 1);
01255       session.count_cuted_fields= old_count_cuted_fields;
01256       if (res != 0 && res != 3) /* @TODO Huh? */
01257       {
01258         my_error(ER_INVALID_DEFAULT, MYF(0), f->field_name);
01259         local_error= ER_INVALID_DEFAULT;
01260 
01261         return true;
01262       }
01263     }
01264     else if (f->real_type() == DRIZZLE_TYPE_ENUM && (f->flags & NOT_NULL_FLAG))
01265     {
01266       f->set_notnull();
01267       f->store((int64_t) 1, true);
01268     }
01269     else
01270     {
01271       f->reset();
01272     }
01273 
01274     /* hack to undo f->init() */
01275     f->setTable(NULL);
01276     f->orig_table= NULL;
01277 
01278     f->setPosition(fieldnr);
01279     f->comment= comment;
01280     if (not default_value &&
01281         not (f->unireg_check==Field::NEXT_NUMBER) &&
01282         (f->flags & NOT_NULL_FLAG) &&
01283         (not f->is_timestamp()))
01284     {
01285       f->flags|= NO_DEFAULT_VALUE_FLAG;
01286     }
01287 
01288     if (f->unireg_check == Field::NEXT_NUMBER)
01289       found_next_number_field= &(_fields[fieldnr]);
01290 
01291     if (use_hash) /* supposedly this never fails... but comments lie */
01292     {
01293       const char *local_field_name= _fields[fieldnr]->field_name;
01294       name_hash.insert(make_pair(local_field_name, &(_fields[fieldnr])));
01295     }
01296   }
01297 
01298   keyinfo= key_info;
01299   for (unsigned int keynr= 0; keynr < keys; keynr++, keyinfo++)
01300   {
01301     key_part= keyinfo->key_part;
01302 
01303     for (unsigned int partnr= 0;
01304          partnr < keyinfo->key_parts;
01305          partnr++, key_part++)
01306     {
01307       /* 
01308        * Fix up key_part->offset by adding data_offset.
01309        * We really should compute offset as well.
01310        * But at least this way we are a little better.
01311      */
01312       key_part->offset= field_offsets[key_part->fieldnr-1] + data_offset;
01313     }
01314   }
01315 
01316   /*
01317     We need to set the unused bits to 1. If the number of bits is a multiple
01318     of 8 there are no unused bits.
01319   */
01320   if (null_count & 7)
01321     *(record + null_count / 8)|= ~(((unsigned char) 1 << (null_count & 7)) - 1);
01322 
01323   null_bytes= (null_pos - (unsigned char*) record + (null_bit_pos + 7) / 8);
01324 
01325   last_null_bit_pos= null_bit_pos;
01326 
01327   /* Fix key stuff */
01328   if (key_parts)
01329   {
01330     uint32_t local_primary_key= 0;
01331     doesKeyNameExist("PRIMARY", local_primary_key);
01332 
01333     keyinfo= key_info;
01334     key_part= keyinfo->key_part;
01335 
01336     for (uint32_t key= 0; key < keys; key++,keyinfo++)
01337     {
01338       uint32_t usable_parts= 0;
01339 
01340       if (local_primary_key >= MAX_KEY && (keyinfo->flags & HA_NOSAME))
01341       {
01342         /*
01343           If the UNIQUE key doesn't have NULL columns and is not a part key
01344           declare this as a primary key.
01345         */
01346         local_primary_key=key;
01347         for (uint32_t i= 0; i < keyinfo->key_parts; i++)
01348         {
01349           uint32_t fieldnr= key_part[i].fieldnr;
01350           if (not fieldnr ||
01351               _fields[fieldnr-1]->null_ptr ||
01352               _fields[fieldnr-1]->key_length() != key_part[i].length)
01353           {
01354             local_primary_key= MAX_KEY; // Can't be used
01355             break;
01356           }
01357         }
01358       }
01359 
01360       for (uint32_t i= 0 ; i < keyinfo->key_parts ; key_part++,i++)
01361       {
01362         Field *local_field;
01363         if (! key_part->fieldnr)
01364         {
01365           return ENOMEM;
01366         }
01367         local_field= key_part->field= _fields[key_part->fieldnr-1];
01368         key_part->type= local_field->key_type();
01369         if (local_field->null_ptr)
01370         {
01371           key_part->null_offset=(uint32_t) ((unsigned char*) local_field->null_ptr - getDefaultValues());
01372           key_part->null_bit= local_field->null_bit;
01373           key_part->store_length+=HA_KEY_NULL_LENGTH;
01374           keyinfo->flags|=HA_NULL_PART_KEY;
01375           keyinfo->extra_length+= HA_KEY_NULL_LENGTH;
01376           keyinfo->key_length+= HA_KEY_NULL_LENGTH;
01377         }
01378         if (local_field->type() == DRIZZLE_TYPE_BLOB ||
01379             local_field->real_type() == DRIZZLE_TYPE_VARCHAR)
01380         {
01381           if (local_field->type() == DRIZZLE_TYPE_BLOB)
01382             key_part->key_part_flag|= HA_BLOB_PART;
01383           else
01384             key_part->key_part_flag|= HA_VAR_LENGTH_PART;
01385           keyinfo->extra_length+=HA_KEY_BLOB_LENGTH;
01386           key_part->store_length+=HA_KEY_BLOB_LENGTH;
01387           keyinfo->key_length+= HA_KEY_BLOB_LENGTH;
01388         }
01389         if (i == 0 && key != local_primary_key)
01390           local_field->flags |= (((keyinfo->flags & HA_NOSAME) &&
01391                                   (keyinfo->key_parts == 1)) ?
01392                                  UNIQUE_KEY_FLAG : MULTIPLE_KEY_FLAG);
01393         if (i == 0)
01394           local_field->key_start.set(key);
01395         if (local_field->key_length() == key_part->length &&
01396             !(local_field->flags & BLOB_FLAG))
01397         {
01398           enum ha_key_alg algo= key_info[key].algorithm;
01399           if (db_type()->index_flags(algo) & HA_KEYREAD_ONLY)
01400           {
01401             keys_for_keyread.set(key);
01402             local_field->part_of_key.set(key);
01403             local_field->part_of_key_not_clustered.set(key);
01404           }
01405           if (db_type()->index_flags(algo) & HA_READ_ORDER)
01406             local_field->part_of_sortkey.set(key);
01407         }
01408         if (!(key_part->key_part_flag & HA_REVERSE_SORT) &&
01409             usable_parts == i)
01410           usable_parts++;     // For FILESORT
01411         local_field->flags|= PART_KEY_FLAG;
01412         if (key == local_primary_key)
01413         {
01414           local_field->flags|= PRI_KEY_FLAG;
01415           /*
01416             If this field is part of the primary key and all keys contains
01417             the primary key, then we can use any key to find this column
01418           */
01419           if (storage_engine->check_flag(HTON_BIT_PRIMARY_KEY_IN_READ_INDEX))
01420           {
01421             local_field->part_of_key= keys_in_use;
01422             if (local_field->part_of_sortkey.test(key))
01423               local_field->part_of_sortkey= keys_in_use;
01424           }
01425         }
01426         if (local_field->key_length() != key_part->length)
01427         {
01428           key_part->key_part_flag|= HA_PART_KEY_SEG;
01429         }
01430       }
01431       keyinfo->usable_key_parts= usable_parts; // Filesort
01432 
01433       set_if_bigger(max_key_length,keyinfo->key_length+
01434                     keyinfo->key_parts);
01435       total_key_length+= keyinfo->key_length;
01436 
01437       if (keyinfo->flags & HA_NOSAME)
01438       {
01439         set_if_bigger(max_unique_length,keyinfo->key_length);
01440       }
01441     }
01442     if (local_primary_key < MAX_KEY &&
01443         (keys_in_use.test(local_primary_key)))
01444     {
01445       primary_key= local_primary_key;
01446       /*
01447         If we are using an integer as the primary key then allow the user to
01448         refer to it as '_rowid'
01449       */
01450       if (key_info[local_primary_key].key_parts == 1)
01451       {
01452         Field *local_field= key_info[local_primary_key].key_part[0].field;
01453         if (local_field && local_field->result_type() == INT_RESULT)
01454         {
01455           /* note that fieldnr here (and rowid_field_offset) starts from 1 */
01456           rowid_field_offset= (key_info[local_primary_key].key_part[0].
01457                                       fieldnr);
01458         }
01459       }
01460     }
01461   }
01462 
01463   if (found_next_number_field)
01464   {
01465     Field *reg_field= *found_next_number_field;
01466     if ((int) (next_number_index= (uint32_t)
01467                find_ref_key(key_info, keys,
01468                             getDefaultValues(), reg_field,
01469                             &next_number_key_offset,
01470                             &next_number_keypart)) < 0)
01471     {
01472       /* Wrong field definition */
01473       local_error= ER_NOT_FORM_FILE;
01474 
01475       return true;
01476     }
01477     else
01478     {
01479       reg_field->flags |= AUTO_INCREMENT_FLAG;
01480     }
01481   }
01482 
01483   if (blob_fields)
01484   {
01485     /* Store offsets to blob fields to find them fast */
01486     blob_field.resize(blob_fields);
01487     uint32_t *save= &blob_field[0];
01488     uint32_t k= 0;
01489     for (Fields::iterator iter= _fields.begin(); iter != _fields.end()-1; iter++, k++)
01490     {
01491       if ((*iter)->flags & BLOB_FLAG)
01492         (*save++)= k;
01493     }
01494   }
01495 
01496   all_set.clear();
01497   all_set.resize(_field_size);
01498   all_set.set();
01499 
01500   return local_error != EE_OK;
01501 }
01502 
01503 /*
01504   Read table definition from a binary / text based .frm cursor
01505 
01506   SYNOPSIS
01507   open_table_def()
01508   session   Thread Cursor
01509   share   Fill this with table definition
01510 
01511   NOTES
01512   This function is called when the table definition is not cached in
01513   definition::Cache::singleton().getCache()
01514   The data is returned in 'share', which is alloced by
01515   alloc_table_share().. The code assumes that share is initialized.
01516 
01517   RETURN VALUES
01518   0 ok
01519   1 Error (see open_table_error)
01520   2    Error (see open_table_error)
01521   3    Wrong data in .frm cursor
01522   4    Error (see open_table_error)
01523   5    Error (see open_table_error: charset unavailable)
01524   6    Unknown .frm version
01525 */
01526 
01527 int TableShare::open_table_def(Session& session, const identifier::Table &identifier)
01528 {
01529   drizzled::error_t local_error= EE_OK;
01530 
01531   message::table::shared_ptr table= plugin::StorageEngine::getTableMessage(session, identifier, local_error);
01532 
01533   if (table and table->IsInitialized())
01534   {
01535     if (parse_table_proto(session, *table))
01536     {
01537       local_error= ER_CORRUPT_TABLE_DEFINITION_UNKNOWN;
01538       my_error(ER_CORRUPT_TABLE_DEFINITION_UNKNOWN, identifier);
01539     }
01540     else
01541     {
01542       setTableCategory(TABLE_CATEGORY_USER);
01543       local_error= EE_OK;
01544     }
01545   }
01546   else if (table and not table->IsInitialized())
01547   {
01548     local_error= ER_CORRUPT_TABLE_DEFINITION_UNKNOWN;
01549     my_error(ER_CORRUPT_TABLE_DEFINITION_UNKNOWN, identifier);
01550   }
01551   else
01552   {
01553     local_error= ER_TABLE_UNKNOWN;
01554     my_error(ER_TABLE_UNKNOWN, identifier);
01555   }
01556 
01557   return static_cast<int>(local_error);
01558 }
01559 
01560 
01561 /*
01562   Open a table based on a TableShare
01563 
01564   SYNOPSIS
01565   open_table_from_share()
01566   session     Thread Cursor
01567   share   Table definition
01568   alias         Alias for table
01569   db_stat   open flags (for example HA_OPEN_KEYFILE|
01570   HA_OPEN_RNDFILE..) can be 0 (example in
01571   ha_example_table)
01572   ha_open_flags HA_OPEN_ABORT_IF_LOCKED etc..
01573   outparam        result table
01574 
01575   RETURN VALUES
01576   0 ok
01577   1 Error (see open_table_error)
01578   2    Error (see open_table_error)
01579   3    Wrong data in .frm cursor
01580   4    Error (see open_table_error)
01581   5    Error (see open_table_error: charset unavailable)
01582   7    Table definition has changed in engine
01583 */
01584 int TableShare::open_table_from_share(Session *session,
01585                                       const identifier::Table &identifier,
01586                                       const char *alias,
01587                                       uint32_t db_stat, uint32_t ha_open_flags,
01588                                       Table &outparam)
01589 {
01590   bool error_reported= false;
01591   int ret= open_table_from_share_inner(session, alias, db_stat, outparam);
01592 
01593   if (not ret)
01594     ret= open_table_cursor_inner(identifier, db_stat, ha_open_flags, outparam, error_reported);
01595 
01596   if (not ret)
01597     return ret;
01598 
01599   if (not error_reported)
01600     open_table_error(ret, errno, 0);
01601 
01602   boost::checked_delete(outparam.cursor);
01603   outparam.cursor= 0;       // For easier error checking
01604   outparam.db_stat= 0;
01605   outparam.getMemRoot()->free_root(MYF(0));       // Safe to call on zeroed root
01606   outparam.clearAlias();
01607 
01608   return ret;
01609 }
01610 
01611 int TableShare::open_table_from_share_inner(Session *session,
01612                                             const char *alias,
01613                                             uint32_t db_stat,
01614                                             Table &outparam)
01615 {
01616   int local_error;
01617   uint32_t records;
01618   unsigned char *record= NULL;
01619   Field **field_ptr;
01620 
01621   local_error= 1;
01622   outparam.resetTable(session, this, db_stat);
01623 
01624   outparam.setAlias(alias);
01625 
01626   /* Allocate Cursor */
01627   if (not (outparam.cursor= db_type()->getCursor(outparam)))
01628     return local_error;
01629 
01630   local_error= 4;
01631   records= 0;
01632   if ((db_stat & HA_OPEN_KEYFILE))
01633     records=1;
01634 
01635   records++;
01636 
01637   if (!(record= (unsigned char*) outparam.alloc_root(rec_buff_length * records)))
01638     return local_error;
01639 
01640   if (records == 0)
01641   {
01642     /* We are probably in hard repair, and the buffers should not be used */
01643     outparam.record[0]= outparam.record[1]= getDefaultValues();
01644   }
01645   else
01646   {
01647     outparam.record[0]= record;
01648     if (records > 1)
01649       outparam.record[1]= record+ rec_buff_length;
01650     else
01651       outparam.record[1]= outparam.getInsertRecord();   // Safety
01652   }
01653 
01654 #ifdef HAVE_VALGRIND
01655   /*
01656     We need this because when we read var-length rows, we are not updating
01657     bytes after end of varchar
01658   */
01659   if (records > 1)
01660   {
01661     memcpy(outparam.getInsertRecord(), getDefaultValues(), rec_buff_length);
01662     memcpy(outparam.getUpdateRecord(), getDefaultValues(), null_bytes);
01663     if (records > 2)
01664       memcpy(outparam.getUpdateRecord(), getDefaultValues(), rec_buff_length);
01665   }
01666 #endif
01667   if (records > 1)
01668   {
01669     memcpy(outparam.getUpdateRecord(), getDefaultValues(), null_bytes);
01670   }
01671 
01672   if (!(field_ptr = (Field **) outparam.alloc_root( (uint32_t) ((_field_size+1)* sizeof(Field*)))))
01673   {
01674     return local_error;
01675   }
01676 
01677   outparam.setFields(field_ptr);
01678 
01679   record= (unsigned char*) outparam.getInsertRecord()-1;  /* Fieldstart = 1 */
01680 
01681   outparam.null_flags= (unsigned char*) record+1;
01682 
01683   /* Setup copy of fields from share, but use the right alias and record */
01684   for (uint32_t i= 0 ; i < _field_size; i++, field_ptr++)
01685   {
01686     if (!((*field_ptr)= _fields[i]->clone(outparam.getMemRoot(), &outparam)))
01687       return local_error;
01688   }
01689   (*field_ptr)= 0;                              // End marker
01690 
01691   if (found_next_number_field)
01692     outparam.found_next_number_field=
01693       outparam.getField(positionFields(found_next_number_field));
01694   if (timestamp_field)
01695     outparam.timestamp_field= (field::Epoch*) outparam.getField(timestamp_field->position());
01696 
01697   /* Fix key->name and key_part->field */
01698   if (key_parts)
01699   {
01700     KeyInfo *local_key_info, *key_info_end;
01701     KeyPartInfo *key_part;
01702     uint32_t n_length;
01703     n_length= keys*sizeof(KeyInfo) + key_parts*sizeof(KeyPartInfo);
01704     if (!(local_key_info= (KeyInfo*) outparam.alloc_root(n_length)))
01705       return local_error;
01706     outparam.key_info= local_key_info;
01707     key_part= (reinterpret_cast<KeyPartInfo*> (local_key_info+keys));
01708 
01709     memcpy(local_key_info, key_info, sizeof(*local_key_info)*keys);
01710     memcpy(key_part, key_info[0].key_part, (sizeof(*key_part) *
01711                                             key_parts));
01712 
01713     for (key_info_end= local_key_info + keys ;
01714          local_key_info < key_info_end ;
01715          local_key_info++)
01716     {
01717       KeyPartInfo *key_part_end;
01718 
01719       local_key_info->table= &outparam;
01720       local_key_info->key_part= key_part;
01721 
01722       for (key_part_end= key_part+ local_key_info->key_parts ;
01723            key_part < key_part_end ;
01724            key_part++)
01725       {
01726         Field *local_field= key_part->field= outparam.getField(key_part->fieldnr-1);
01727 
01728         if (local_field->key_length() != key_part->length &&
01729             !(local_field->flags & BLOB_FLAG))
01730         {
01731           /*
01732             We are using only a prefix of the column as a key:
01733             Create a new field for the key part that matches the index
01734           */
01735           local_field= key_part->field= local_field->new_field(outparam.getMemRoot(), &outparam, 0);
01736           local_field->field_length= key_part->length;
01737         }
01738       }
01739     }
01740   }
01741 
01742   /* Allocate bitmaps */
01743 
01744   outparam.def_read_set.resize(_field_size);
01745   outparam.def_write_set.resize(_field_size);
01746   outparam.tmp_set.resize(_field_size);
01747   outparam.default_column_bitmaps();
01748 
01749   return 0;
01750 }
01751 
01752 int TableShare::open_table_cursor_inner(const identifier::Table &identifier,
01753                                         uint32_t db_stat, uint32_t ha_open_flags,
01754                                         Table &outparam,
01755                                         bool &error_reported)
01756 {
01757   /* The table struct is now initialized;  Open the table */
01758   int local_error= 2;
01759   if (db_stat)
01760   {
01761     assert(!(db_stat & HA_WAIT_IF_LOCKED));
01762     int ha_err;
01763 
01764     if ((ha_err= (outparam.cursor->ha_open(identifier,
01765                                            (db_stat & HA_READ_ONLY ? O_RDONLY : O_RDWR),
01766                                            (db_stat & HA_OPEN_TEMPORARY ? HA_OPEN_TMP_TABLE : HA_OPEN_IGNORE_IF_LOCKED) | ha_open_flags))))
01767     {
01768       switch (ha_err)
01769       {
01770       case HA_ERR_NO_SUCH_TABLE:
01771         /*
01772           The table did not exists in storage engine, use same error message
01773           as if the .frm cursor didn't exist
01774         */
01775         local_error= 1;
01776         errno= ENOENT;
01777         break;
01778       case EMFILE:
01779         /*
01780           Too many files opened, use same error message as if the .frm
01781           cursor can't open
01782         */
01783         local_error= 1;
01784         errno= EMFILE;
01785         break;
01786       default:
01787         outparam.print_error(ha_err, MYF(0));
01788         error_reported= true;
01789         if (ha_err == HA_ERR_TABLE_DEF_CHANGED)
01790           local_error= 7;
01791         break;
01792       }
01793       return local_error;
01794     }
01795   }
01796 
01797   return 0;
01798 }
01799 
01800 /* error message when opening a form cursor */
01801 void TableShare::open_table_error(int pass_error, int db_errno, int pass_errarg)
01802 {
01803   char buff[FN_REFLEN];
01804   myf errortype= ME_ERROR+ME_WAITTANG;
01805 
01806   switch (pass_error) {
01807   case 7:
01808   case 1:
01809     if (db_errno == ENOENT)
01810     {
01811       identifier::Table identifier(db.str, table_name.str);
01812       my_error(ER_TABLE_UNKNOWN, identifier);
01813     }
01814     else
01815     {
01816       snprintf(buff, sizeof(buff), "%s",normalized_path.str);
01817       my_error((db_errno == EMFILE) ? ER_CANT_OPEN_FILE : ER_FILE_NOT_FOUND,
01818                errortype, buff, db_errno);
01819     }
01820     break;
01821   case 2:
01822     {
01823       drizzled::error_t err_no;
01824 
01825       err_no= (db_errno == ENOENT) ? ER_FILE_NOT_FOUND : (db_errno == EAGAIN) ?
01826         ER_FILE_USED : ER_CANT_OPEN_FILE;
01827 
01828       my_error(err_no, errortype, normalized_path.str, db_errno);
01829       break;
01830     }
01831   case 5:
01832     {
01833       const char *csname= get_charset_name((uint32_t) pass_errarg);
01834       char tmp[10];
01835       if (!csname || csname[0] =='?')
01836       {
01837         snprintf(tmp, sizeof(tmp), "#%d", pass_errarg);
01838         csname= tmp;
01839       }
01840       my_printf_error(ER_UNKNOWN_COLLATION,
01841                       _("Unknown collation '%s' in table '%-.64s' definition"),
01842                       MYF(0), csname, table_name.str);
01843       break;
01844     }
01845   case 6:
01846     snprintf(buff, sizeof(buff), "%s", normalized_path.str);
01847     my_printf_error(ER_NOT_FORM_FILE,
01848                     _("Table '%-.64s' was created with a different version "
01849                       "of Drizzle and cannot be read"),
01850                     MYF(0), buff);
01851     break;
01852   case 8:
01853     break;
01854   default:        /* Better wrong error than none */
01855   case 4:
01856     snprintf(buff, sizeof(buff), "%s", normalized_path.str);
01857     my_error(ER_NOT_FORM_FILE, errortype, buff, 0);
01858     break;
01859   }
01860   return;
01861 } /* open_table_error */
01862 
01863 Field *TableShare::make_field(const message::Table::Field &pfield,
01864                               unsigned char *ptr,
01865                               uint32_t field_length,
01866                               bool is_nullable,
01867                               unsigned char *null_pos,
01868                               unsigned char null_bit,
01869                               uint8_t decimals,
01870                               enum_field_types field_type,
01871                               const CHARSET_INFO * field_charset,
01872                               Field::utype unireg_check,
01873                               TYPELIB *interval,
01874                               const char *field_name)
01875 {
01876   return make_field(pfield,
01877                     ptr,
01878                     field_length,
01879                     is_nullable,
01880                     null_pos,
01881                     null_bit,
01882                     decimals,
01883                     field_type,
01884                     field_charset,
01885                     unireg_check,
01886                     interval,
01887                     field_name,
01888                     pfield.constraints().is_unsigned());
01889 }
01890 
01891 Field *TableShare::make_field(const message::Table::Field &,
01892                               unsigned char *ptr,
01893                               uint32_t field_length,
01894                               bool is_nullable,
01895                               unsigned char *null_pos,
01896                               unsigned char null_bit,
01897                               uint8_t decimals,
01898                               enum_field_types field_type,
01899                               const CHARSET_INFO * field_charset,
01900                               Field::utype unireg_check,
01901                               TYPELIB *interval,
01902                               const char *field_name, 
01903                               bool is_unsigned)
01904 {
01905   if (! is_nullable)
01906   {
01907     null_pos=0;
01908     null_bit=0;
01909   }
01910   else
01911   {
01912     null_bit= ((unsigned char) 1) << null_bit;
01913   }
01914 
01915   switch (field_type)
01916   {
01917   case DRIZZLE_TYPE_ENUM:
01918     return new (&mem_root) Field_enum(ptr,
01919                                       field_length,
01920                                       null_pos,
01921                                       null_bit,
01922                                       field_name,
01923                                       interval,
01924                                       field_charset);
01925   case DRIZZLE_TYPE_VARCHAR:
01926     setVariableWidth();
01927     return new (&mem_root) Field_varstring(ptr,field_length,
01928                                       ha_varchar_packlength(field_length),
01929                                       null_pos,null_bit,
01930                                       field_name,
01931                                       field_charset);
01932   case DRIZZLE_TYPE_BLOB:
01933     return new (&mem_root) Field_blob(ptr,
01934                                       null_pos,
01935                                       null_bit,
01936                                       field_name,
01937                                       this,
01938                                       field_charset);
01939   case DRIZZLE_TYPE_DECIMAL:
01940     return new (&mem_root) Field_decimal(ptr,
01941                                          field_length,
01942                                          null_pos,
01943                                          null_bit,
01944                                          unireg_check,
01945                                          field_name,
01946                                          decimals);
01947   case DRIZZLE_TYPE_DOUBLE:
01948     return new (&mem_root) Field_double(ptr,
01949                                    field_length,
01950                                    null_pos,
01951                                    null_bit,
01952                                    unireg_check,
01953                                    field_name,
01954                                    decimals,
01955                                    false,
01956                                    false /* is_unsigned */);
01957   case DRIZZLE_TYPE_UUID:
01958     return new (&mem_root) field::Uuid(ptr,
01959                                        field_length,
01960                                        null_pos,
01961                                        null_bit,
01962                                        field_name);
01963   case DRIZZLE_TYPE_BOOLEAN:
01964     return new (&mem_root) field::Boolean(ptr,
01965                                           field_length,
01966                                           null_pos,
01967                                           null_bit,
01968                                           field_name,
01969                                           is_unsigned);
01970   case DRIZZLE_TYPE_LONG:
01971     return new (&mem_root) field::Int32(ptr,
01972                                         field_length,
01973                                         null_pos,
01974                                         null_bit,
01975                                         unireg_check,
01976                                         field_name);
01977   case DRIZZLE_TYPE_LONGLONG:
01978     {
01979       if (is_unsigned)
01980       {
01981         return new (&mem_root) field::Size(ptr,
01982                                            field_length,
01983                                            null_pos,
01984                                            null_bit,
01985                                            unireg_check,
01986                                            field_name);
01987       }
01988 
01989       return new (&mem_root) field::Int64(ptr,
01990                                           field_length,
01991                                           null_pos,
01992                                           null_bit,
01993                                           unireg_check,
01994                                           field_name);
01995     }
01996   case DRIZZLE_TYPE_MICROTIME:
01997     return new (&mem_root) field::Microtime(ptr,
01998                                             null_pos,
01999                                             null_bit,
02000                                             unireg_check,
02001                                             field_name,
02002                                             this);
02003   case DRIZZLE_TYPE_TIMESTAMP:
02004     return new (&mem_root) field::Epoch(ptr,
02005                                         null_pos,
02006                                         null_bit,
02007                                         unireg_check,
02008                                         field_name,
02009                                         this);
02010   case DRIZZLE_TYPE_TIME:
02011     return new (&mem_root) field::Time(ptr,
02012                                        field_length,
02013                                        null_pos,
02014                                        null_bit,
02015                                        field_name);
02016   case DRIZZLE_TYPE_DATE:
02017     return new (&mem_root) Field_date(ptr,
02018                                  null_pos,
02019                                  null_bit,
02020                                  field_name);
02021   case DRIZZLE_TYPE_DATETIME:
02022     return new (&mem_root) Field_datetime(ptr,
02023                                      null_pos,
02024                                      null_bit,
02025                                      field_name);
02026   case DRIZZLE_TYPE_NULL:
02027     return new (&mem_root) Field_null(ptr,
02028                                       field_length,
02029                                       field_name);
02030   }
02031   assert(0);
02032   abort();
02033 }
02034 
02035 void TableShare::refreshVersion()
02036 {
02037   version= refresh_version;
02038 }
02039 
02040 
02041 } /* namespace drizzled */