Drizzled Public API Documentation

mf_iocache.cc

00001 /* Copyright (C) 2000 MySQL AB
00002 
00003    This program is free software; you can redistribute it and/or modify
00004    it under the terms of the GNU General Public License as published by
00005    the Free Software Foundation; version 2 of the License.
00006 
00007    This program is distributed in the hope that it will be useful,
00008    but WITHOUT ANY WARRANTY; without even the implied warranty of
00009    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00010    GNU General Public License for more details.
00011 
00012    You should have received a copy of the GNU General Public License
00013    along with this program; if not, write to the Free Software
00014    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
00015 
00016 /*
00017   Caching of files that only do (sequential) reads or writes of fixed-
00018   length records. A read isn't allowed to go over file-length. A read is ok
00019   if it ends at file-length and next read can try to read after file-length
00020   (and get a EOF-error).
00021   Possibly uses asynchronous io.
00022   macros for read and writes for faster io.
00023   Used instead of FILE when reading or writing whole files.
00024   This code makes mf_rec_cache obsolete (currently only used by ISAM)
00025   One can change info->pos_in_file to a higher value to skip bytes in file if
00026   also info->read_pos is set to info->read_end.
00027   If called through open_cached_file(), then the temporary file will
00028   only be created if a write exceeds the file buffer or if one calls
00029   my_b_flush_io_cache().
00030 
00031   If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
00032   reading and another for writing.  Reads are first done from disk and
00033   then done from the write buffer.  This is an efficient way to read
00034   from a log file when one is writing to it at the same time.
00035   For this to work, the file has to be opened in append mode!
00036   Note that when one uses SEQ_READ_APPEND, one MUST write using
00037   my_b_append !  This is needed because we need to lock the mutex
00038   every time we access the write buffer.
00039 
00040 TODO:
00041   When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
00042   each time the write buffer gets full and it's written to disk, we will
00043   always do a disk read to read a part of the buffer from disk to the
00044   read buffer.
00045   This should be fixed so that when we do a my_b_flush_io_cache() and
00046   we have been reading the write buffer, we should transfer the rest of the
00047   write buffer to the read buffer before we start to reuse it.
00048 */
00049 
00050 #include <config.h>
00051 
00052 #include <drizzled/internal/my_sys.h>
00053 #include <drizzled/internal/m_string.h>
00054 #include <drizzled/drizzled.h>
00055 #ifdef HAVE_AIOWAIT
00056 #include <drizzled/error.h>
00057 #include <drizzled/internal/aio_result.h>
00058 static void my_aiowait(my_aio_result *result);
00059 #endif
00060 #include <drizzled/internal/iocache.h>
00061 #include <errno.h>
00062 #include <drizzled/util/test.h>
00063 #include <stdlib.h>
00064 #include <algorithm>
00065 
00066 using namespace std;
00067 
00068 namespace drizzled
00069 {
00070 namespace internal
00071 {
00072 
00073 static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count);
00074 static int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count);
00075 
00080 inline
00081 static void lock_append_buffer(st_io_cache *, int )
00082 {
00083 }
00084 
00089 inline
00090 static void unlock_append_buffer(st_io_cache *, int )
00091 {
00092 }
00093 
00098 inline
00099 static size_t io_round_up(size_t x)
00100 {
00101   return ((x+IO_SIZE-1) & ~(IO_SIZE-1));
00102 }
00103 
00108 inline
00109 static size_t io_round_dn(size_t x)
00110 {
00111   return (x & ~(IO_SIZE-1));
00112 }
00113 
00114 
00125 void st_io_cache::setup_io_cache()
00126 {
00127   /* Ensure that my_b_tell() and my_b_bytes_in_cache works */
00128   if (type == WRITE_CACHE)
00129   {
00130     current_pos= &write_pos;
00131     current_end= &write_end;
00132   }
00133   else
00134   {
00135     current_pos= &read_pos;
00136     current_end= &read_end;
00137   }
00138 }
00139 
00140 
00141 void st_io_cache::init_functions()
00142 {
00143   switch (type) {
00144   case READ_NET:
00145     /*
00146       Must be initialized by the caller. The problem is that
00147       _my_b_net_read has to be defined in sql directory because of
00148       the dependency on THD, and therefore cannot be visible to
00149       programs that link against mysys but know nothing about THD, such
00150       as myisamchk
00151     */
00152     break;
00153   default:
00154     read_function = _my_b_read;
00155     write_function = _my_b_write;
00156   }
00157 
00158   setup_io_cache();
00159 }
00160 
/*
  Set up this st_io_cache for buffered I/O on the descriptor file_arg.

  Parameters:
    file_arg       Descriptor to cache. The current-position probe below is
                   skipped when it is negative.
    cachesize      Requested buffer size; 0 selects
                   my_default_record_cache_size.
    type_arg       Cache mode. No buffer is allocated here for READ_NET and
                   WRITE_NET.
    seek_offset    File offset at which the cache starts.
    use_async_io   Raises the minimum buffer size; only selects the async
                   read function when HAVE_AIOWAIT is defined.
    cache_myflags  Flags. MY_DONT_CHECK_FILESIZE is consumed here, and
                   MY_NABP / MY_FNABP are masked out before being stored.

  Returns 0 on success, 1 when no cache size was requested and the default
  size is 0, and 2 when the buffer could not be accounted for in
  global_read_buffer or could not be allocated.
*/
00178 int st_io_cache::init_io_cache(int file_arg, size_t cachesize,
00179                                enum cache_type type_arg, my_off_t seek_offset,
00180                                bool use_async_io, myf cache_myflags)
00181 {
00182   size_t min_cache;
00183   off_t pos;
00184   my_off_t end_of_file_local= ~(my_off_t) 0;
00185 
00186   file= file_arg;
00187   type= TYPE_NOT_SET;     /* Don't set it until mutex are created */
00188   pos_in_file= seek_offset;
00189   pre_close = pre_read = post_read = 0;
00190   arg = 0;
00191   alloced_buffer = 0;
00192   buffer=0;
00193   seek_not_done= 0;
00194 
  /* Find out whether the descriptor is already positioned at seek_offset */
00195   if (file >= 0)
00196   {
00197     pos= lseek(file, 0, SEEK_CUR);
00198     if ((pos == MY_FILEPOS_ERROR) && (errno == ESPIPE))
00199     {
00200       /*
00201          This kind of object doesn't support seek() or tell(). Don't set a
00202          flag that will make us again try to seek() later and fail.
00203       */
00204       seek_not_done= 0;
00205       /*
00206         Additionally, if we're supposed to start somewhere other than the
00207         the beginning of whatever this file is, then somebody made a bad
00208         assumption.
00209       */
00210       assert(seek_offset == 0);
00211     }
00212     else
00213       seek_not_done= test(seek_offset != (my_off_t)pos);
00214   }
00215 
00216   if (!cachesize && !(cachesize= my_default_record_cache_size))
00217     return 1;       /* No cache requested */
  /* Async reads split the buffer in two, so twice the minimum is required */
00218   min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
00219   if (type_arg == READ_CACHE)
00220   {           /* Assume file isn't growing */
00221     if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
00222     {
00223       /* Calculate end of file to avoid allocating oversized buffers */
00224       end_of_file_local=lseek(file,0L,SEEK_END);
00225       /* Need to reset seek_not_done now that we just did a seek. */
00226       seek_not_done= end_of_file_local == seek_offset ? 0 : 1;
00227       if (end_of_file_local < seek_offset)
00228   end_of_file_local=seek_offset;
00229       /* Trim cache size if the file is very small */
00230       if ((my_off_t) cachesize > end_of_file_local-seek_offset+IO_SIZE*2-1)
00231       {
00232   cachesize= (size_t) (end_of_file_local-seek_offset)+IO_SIZE*2-1;
00233   use_async_io=0;       /* No need to use async */
00234       }
00235     }
00236   }
00237   cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
00238   if (type_arg != READ_NET && type_arg != WRITE_NET)
00239   {
00240     /* Retry allocating memory in smaller blocks until we get one */
    /* Round up to a multiple of min_cache; the mask presumes a power of two */
00241     cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
00242     for (;;)
00243     {
00244       size_t buffer_block;
00245       if (cachesize < min_cache)
00246   cachesize = min_cache;
00247       buffer_block= cachesize;
      /* Read caches are accounted against global_read_buffer before malloc */
00248       if ((type_arg == READ_CACHE) and (not global_read_buffer.add(buffer_block)))
00249       {
00250         my_error(ER_OUT_OF_GLOBAL_READMEMORY, MYF(ME_ERROR+ME_WAITTANG));
00251         return 2;
00252       }
00253 
00254       if ((buffer=
00255      (unsigned char*) malloc(buffer_block)) != 0)
00256       {
00257   write_buffer=buffer;
00258   alloced_buffer= true;
00259   break;          /* Enough memory found */
00260       }
      /* malloc failed: undo the accounting, then give up or retry smaller */
00261       if (cachesize == min_cache)
00262       {
00263         if (type_arg == READ_CACHE)
00264           global_read_buffer.sub(buffer_block);
00265   return 2;       /* Can't alloc cache */
00266       }
00267       /* Try with less memory */
00268       if (type_arg == READ_CACHE)
00269         global_read_buffer.sub(buffer_block);
00270       cachesize= (cachesize*3/4 & ~(min_cache-1));
00271     }
00272   }
00273 
00274   read_length=buffer_length=cachesize;
00275   myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
00276   request_pos= read_pos= write_pos = buffer;
00277 
  /* For a write cache, shorten the buffer by seek_offset's offset within an IO_SIZE block */
00278   if (type_arg == WRITE_CACHE)
00279     write_end=
00280       buffer+buffer_length- (seek_offset & (IO_SIZE-1));
00281   else
00282     read_end=buffer;    /* Nothing in cache */
00283 
00284   /* End_of_file may be changed by user later */
00285   end_of_file= end_of_file_local;
00286   error= 0;
00287   type= type_arg;
00288   init_functions();
00289 #ifdef HAVE_AIOWAIT
00290   if (use_async_io && ! my_disable_async_io)
00291   {
00292     read_length/=2;
00293     read_function=_my_b_async_read;
00294   }
00295   inited= aio_result.pending= 0;
00296 #endif
00297   return 0;
00298 }           /* init_io_cache */
00299 
00300   /* Wait until current request is ready */
00301 
00302 #ifdef HAVE_AIOWAIT
/*
  Block until the asynchronous request tracked by `result` is no longer
  pending. Does nothing when result->pending is already 0.

  aiowait() is called with a null timeout. Every request it reports has its
  pending flag cleared, and the loop stops once the reported request is
  `result` itself. If aiowait() fails with anything other than EINTR,
  result->pending is cleared and waiting stops.
*/
00303 static void my_aiowait(my_aio_result *result)
00304 {
00305   if (result->pending)
00306   {
00307     struct aio_result_t *tmp;
00308     for (;;)
00309     {
      /* NOTE(review): the returned pointer is compared to -1 through an int cast */
00310       if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
00311       {
00312   if (errno == EINTR)
00313     continue;
00314   result->pending=0;      /* Assume everythings is ok */
00315   break;
00316       }
      /* presumably aio_result_t is the first member of my_aio_result -- TODO confirm */
00317       ((my_aio_result*) tmp)->pending=0;
00318       if ((my_aio_result*) tmp == result)
00319   break;
00320     }
00321   }
00322 }
00323 #endif
00324 
/*
  Switch an already initialised cache to a (possibly different) type and
  position, reusing its buffer.

  Parameters:
    type_arg      New cache type; READ_NET and WRITE_NET are not allowed,
                  neither as the new nor as the current type.
    seek_offset   File offset the cache should continue at.
    use_async_io  Only consulted when HAVE_AIOWAIT is defined.
    clear_cache   When false and seek_offset lies between pos_in_file and
                  my_b_tell(this), the buffered data is kept and nothing is
                  flushed. When true, the buffer is reset without flushing.

  Returns 0 on success and 1 when my_b_flush_io_cache() fails.
*/
00335 bool st_io_cache::reinit_io_cache(enum cache_type type_arg,
00336                                   my_off_t seek_offset,
00337                                   bool use_async_io,
00338                                   bool clear_cache)
00339 {
00340   /* One can't do reinit with the following types */
00341   assert(type_arg != READ_NET && type != READ_NET &&
00342         type_arg != WRITE_NET && type != WRITE_NET);
00343 
00344   /* If the whole file is in memory, avoid flushing to disk */
00345   if (! clear_cache &&
00346       seek_offset >= pos_in_file &&
00347       seek_offset <= my_b_tell(this))
00348   {
00349     /* Reuse current buffer without flushing it to disk */
00350     unsigned char *pos;
00351     if (type == WRITE_CACHE && type_arg == READ_CACHE)
00352     {
      /* What has been written so far becomes the readable data */
00353       read_end=write_pos;
00354       end_of_file=my_b_tell(this);
00355       /*
00356         Trigger a new seek only if we have a valid
00357         file handle.
00358       */
00359       seek_not_done= (file != -1);
00360     }
00361     else if (type_arg == WRITE_CACHE)
00362     {
00363       if (type == READ_CACHE)
00364       {
00365   write_end=write_buffer+buffer_length;
00366   seek_not_done=1;
00367       }
00368       end_of_file = ~(my_off_t) 0;
00369     }
    /* Translate the file offset into a pointer inside the buffer */
00370     pos=request_pos+(seek_offset-pos_in_file);
00371     if (type_arg == WRITE_CACHE)
00372       write_pos=pos;
00373     else
00374       read_pos= pos;
00375 #ifdef HAVE_AIOWAIT
00376     my_aiowait(&aio_result);    /* Wait for outstanding req */
00377 #endif
00378   }
00379   else
00380   {
00381     /*
00382       If we change from WRITE_CACHE to READ_CACHE, assume that everything
00383       after the current positions should be ignored
00384     */
00385     if (type == WRITE_CACHE && type_arg == READ_CACHE)
00386       end_of_file=my_b_tell(this);
00387     /* flush cache if we want to reuse it */
00388     if (!clear_cache && my_b_flush_io_cache(this, 1))
00389       return 1;
00390     pos_in_file=seek_offset;
00391     /* Better to do always do a seek */
00392     seek_not_done=1;
00393     request_pos=read_pos=write_pos=buffer;
00394     if (type_arg == READ_CACHE)
00395     {
00396       read_end=buffer;    /* Nothing in cache */
00397     }
00398     else
00399     {
      /* Shorten the buffer by seek_offset's offset within an IO_SIZE block */
00400       write_end=(buffer + buffer_length -
00401            (seek_offset & (IO_SIZE-1)));
00402       end_of_file= ~(my_off_t) 0;
00403     }
00404   }
00405   type= type_arg;
00406   error=0;
00407   init_functions();
00408 
00409 #ifdef HAVE_AIOWAIT
  /* Async reads are only enabled when the remaining data exceeds the buffer */
00410   if (use_async_io && ! my_disable_async_io &&
00411       ((uint32_t) buffer_length <
00412        (uint32_t) (end_of_file - seek_offset)))
00413   {
00414     read_length=buffer_length/2;
00415     read_function=_my_b_async_read;
00416   }
00417   inited= 0;
00418 #else
00419   (void)use_async_io;
00420 #endif
00421   return 0;
00422 } /* reinit_io_cache */
00423 
/*
  Read Count bytes into Buffer when the cache does not hold enough data.

  Whatever is left in the cache is copied first. If the remaining request
  is large enough, whole IO_SIZE-multiples are then read straight into
  Buffer. Finally the cache buffer is refilled from the file and the rest
  of the request is served from it.

  Returns 0 when all Count bytes were delivered. Returns 1 otherwise, with
  info->error set to -1 on a seek/read failure, or to the number of bytes
  that were obtained when the data ran out early.
*/
00444 static int _my_b_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
00445 {
00446   size_t length_local,diff_length,left_length, max_length;
00447   my_off_t pos_in_file_local;
00448 
  /* Hand out the bytes still sitting in the cache */
00449   if ((left_length= (size_t) (info->read_end-info->read_pos)))
00450   {
00451     assert(Count >= left_length); /* User is not using my_b_read() */
00452     memcpy(Buffer,info->read_pos, left_length);
00453     Buffer+=left_length;
00454     Count-=left_length;
00455   }
00456 
00457   /* pos_in_file always point on where info->buffer was read */
00458   pos_in_file_local=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
00459 
00460   /*
00461     Whenever a function which operates on st_io_cache flushes/writes
00462     some part of the st_io_cache to disk it will set the property
00463     "seek_not_done" to indicate this to other functions operating
00464     on the st_io_cache.
00465   */
00466   if (info->seek_not_done)
00467   {
00468     if ((lseek(info->file,pos_in_file_local,SEEK_SET) != MY_FILEPOS_ERROR))
00469     {
00470       /* No error, reset seek_not_done flag. */
00471       info->seek_not_done= 0;
00472     }
00473     else
00474     {
00475       /*
00476         If the seek failed and the error number is ESPIPE, it is because
00477         info->file is a pipe or socket or FIFO.  We never should have tried
00478         to seek on that.  See Bugs#25807 and #22828 for more info.
00479       */
00480       assert(errno != ESPIPE);
00481       info->error= -1;
00482       return(1);
00483     }
00484   }
00485 
  /* diff_length is the file position's offset within its IO_SIZE block */
00486   diff_length= (size_t) (pos_in_file_local & (IO_SIZE-1));
00487   if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
00488   {         /* Fill first intern buffer */
00489     size_t read_length;
00490     if (info->end_of_file <= pos_in_file_local)
00491     {         /* End of file */
00492       info->error= (int) left_length;
00493       return(1);
00494     }
    /* Read directly into the caller's buffer, ending on an IO_SIZE boundary */
00495     length_local=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
00496     if ((read_length= my_read(info->file,Buffer, length_local, info->myflags)) != length_local)
00497     {
00498       info->error= (read_length == (size_t) -1 ? -1 :
00499         (int) (read_length+left_length));
00500       return(1);
00501     }
00502     Count-= length_local;
00503     Buffer+= length_local;
00504     pos_in_file_local+= length_local;
00505     left_length+= length_local;
00506     diff_length=0;
00507   }
00508 
  /* Refill the cache, but never read past end_of_file unless this is a READ_FIFO */
00509   max_length= info->read_length-diff_length;
00510   if (info->type != READ_FIFO &&
00511       max_length > (info->end_of_file - pos_in_file_local))
00512     max_length= (size_t) (info->end_of_file - pos_in_file_local);
00513   if (!max_length)
00514   {
00515     if (Count)
00516     {
00517       info->error= static_cast<int>(left_length); /* We only got this many char */
00518       return(1);
00519     }
00520      length_local=0;        /* Didn't read any chars */
00521   }
00522   else if (( length_local= my_read(info->file,info->buffer, max_length,
00523                             info->myflags)) < Count ||
00524       length_local == (size_t) -1)
00525   {
    /* Short read or failure: pass on what was read and leave the cache empty */
00526     if ( length_local != (size_t) -1)
00527       memcpy(Buffer, info->buffer,  length_local);
00528     info->pos_in_file= pos_in_file_local;
00529     info->error=  length_local == (size_t) -1 ? -1 : (int) ( length_local+left_length);
00530     info->read_pos=info->read_end=info->buffer;
00531     return(1);
00532   }
00533   info->read_pos=info->buffer+Count;
00534   info->read_end=info->buffer+ length_local;
00535   info->pos_in_file=pos_in_file_local;
00536   memcpy(Buffer, info->buffer, Count);
00537   return(0);
00538 }
00539 
00540 
00541 #ifdef HAVE_AIOWAIT
00542 
/*
  Read function used when asynchronous I/O is enabled (only compiled when
  HAVE_AIOWAIT is defined).

  The cache buffer is used as two halves of info->read_length bytes each.
  The call first copies what is left in the current half, then waits for a
  previously issued aioread() into the other half (if info->inited is set)
  and serves data from it. Anything still missing is read synchronously.
  Before returning successfully it issues the next aioread(); if that call
  fails, the cache falls back to _my_b_read.

  Returns 0 when the request was served. Returns non-zero otherwise, with
  info->error set to -1 or to a byte count.
*/
00555 int _my_b_async_read(st_io_cache *info, unsigned char *Buffer, size_t Count)
00556 {
00557   size_t length_local,read_length,diff_length,left_length,use_length,org_Count;
00558   size_t max_length;
00559   my_off_t next_pos_in_file;
00560   unsigned char *read_buffer;
00561 
  /* Hand out the bytes still sitting in the current half */
00562   memcpy(Buffer,info->read_pos,
00563    (left_length= (size_t) (info->read_end-info->read_pos)));
00564   Buffer+=left_length;
00565   org_Count=Count;
00566   Count-=left_length;
00567 
00568   if (info->inited)
00569   {           /* wait for read block */
00570     info->inited=0;       /* No more block to read */
00571     my_aiowait(&info->aio_result);    /* Wait for outstanding req */
00572     if (info->aio_result.result.aio_errno)
00573     {
00574       if (info->myflags & MY_WME)
00575   my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
00576      my_filename(info->file),
00577      info->aio_result.result.aio_errno);
00578       errno=info->aio_result.result.aio_errno;
00579       info->error= -1;
00580       return(1);
00581     }
00582     if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
00583   read_length == (size_t) -1)
00584     {
00585       errno=0;        /* For testing */
00586       info->error= (read_length == (size_t) -1 ? -1 :
00587         (int) (read_length+left_length));
00588       return(1);
00589     }
00590     info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
00591 
    /* Switch to the half that the asynchronous read filled */
00592     if (info->request_pos != info->buffer)
00593       info->request_pos=info->buffer;
00594     else
00595       info->request_pos=info->buffer+info->read_length;
00596     info->read_pos=info->request_pos;
00597     next_pos_in_file=info->aio_read_pos+read_length;
00598 
00599   /* Check if pos_in_file is changed
00600      (_ni_read_cache may have skipped some bytes) */
00601 
00602     if (info->aio_read_pos < info->pos_in_file)
00603     {           /* Fix if skipped bytes */
00604       if (info->aio_read_pos + read_length < info->pos_in_file)
00605       {
00606   read_length=0;        /* Skip block */
00607   next_pos_in_file=info->pos_in_file;
00608       }
00609       else
00610       {
00611   my_off_t offset= (info->pos_in_file - info->aio_read_pos);
00612   info->pos_in_file=info->aio_read_pos; /* Whe are here */
00613   info->read_pos=info->request_pos+offset;
00614   read_length-=offset;      /* Bytes left from read_pos */
00615       }
00616     }
00617   /* Copy found bytes to buffer */
00618     length_local=min(Count,read_length);
00619     memcpy(Buffer,info->read_pos,(size_t) length_local);
00620     Buffer+=length_local;
00621     Count-=length_local;
00622     left_length+=length_local;
    /* NOTE(review): rc_pos is not used anywhere else in the visible code -- confirm the member exists */
00623     info->read_end=info->rc_pos+read_length;
00624     info->read_pos+=length_local;
00625   }
00626   else
00627     next_pos_in_file=(info->pos_in_file+ (size_t)
00628           (info->read_end - info->request_pos));
00629 
00630   /* If reading large blocks, or first read or read with skip */
00631   if (Count)
00632   {
00633     if (next_pos_in_file == info->end_of_file)
00634     {
      /* NOTE(review): read_length has not been assigned here when info->inited was 0 */
00635       info->error=(int) (read_length+left_length);
00636       return 1;
00637     }
00638 
00639     if (lseek(info->file,next_pos_in_file,SEEK_SET) == MY_FILEPOS_ERROR)
00640     {
00641       info->error= -1;
00642       return (1);
00643     }
00644 
00645     read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
00646     if (Count < read_length)
00647     {         /* Small block, read to cache */
00648       if ((read_length=my_read(info->file,info->request_pos,
00649              read_length, info->myflags)) == (size_t) -1)
00650         return info->error= -1;
00651       use_length=min(Count,read_length);
00652       memcpy(Buffer,info->request_pos,(size_t) use_length);
00653       info->read_pos=info->request_pos+Count;
00654       info->read_end=info->request_pos+read_length;
00655       info->pos_in_file=next_pos_in_file; /* Start of block in cache */
00656       next_pos_in_file+=read_length;
00657 
00658       if (Count != use_length)
00659       {         /* Didn't find hole block */
00660   if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
00661     my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
00662        my_filename(info->file),errno);
00663   info->error=(int) (read_length+left_length);
00664   return 1;
00665       }
00666     }
00667     else
00668     {           /* Big block, don't cache it */
00669       if ((read_length= my_read(info->file,Buffer, Count,info->myflags))
00670     != Count)
00671       {
00672   info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
00673   return 1;
00674       }
00675       info->read_pos=info->read_end=info->request_pos;
00676       info->pos_in_file=(next_pos_in_file+=Count);
00677     }
00678   }
00679 
00680   /* Read next block with asynchronous io */
00681   diff_length=(next_pos_in_file & (IO_SIZE-1));
00682   max_length= info->read_length - diff_length;
00683   if (max_length > info->end_of_file - next_pos_in_file)
00684     max_length= (size_t) (info->end_of_file - next_pos_in_file);
00685 
  /* The asynchronous read targets the half that is not currently in use */
00686   if (info->request_pos != info->buffer)
00687     read_buffer=info->buffer;
00688   else
00689     read_buffer=info->buffer+info->read_length;
00690   info->aio_read_pos=next_pos_in_file;
00691   if (max_length)
00692   {
00693     info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
00694     if (aioread(info->file,read_buffer, max_length,
00695     (my_off_t) next_pos_in_file,SEEK_SET,
00696     &info->aio_result.result))
00697     {           /* Skip async io */
      /* NOTE(review): the self-assignment below has no effect */
00698       errno=errno;
00699       if (info->request_pos != info->buffer)
00700       {
00701         memmove(info->buffer, info->request_pos,
00702                 (size_t) (info->read_end - info->read_pos));
00703   info->request_pos=info->buffer;
00704   info->read_pos-=info->read_length;
00705   info->read_end-=info->read_length;
00706       }
00707       info->read_length=info->buffer_length;  /* Use hole buffer */
00708       info->read_function=_my_b_read;   /* Use normal IO_READ next */
00709     }
00710     else
00711       info->inited=info->aio_result.pending=1;
00712   }
00713   return 0;         /* Block read, async in use */
00714 } /* _my_b_async_read */
00715 #endif
00716 
00717 
00722 int _my_b_get(st_io_cache *info)
00723 {
00724   unsigned char buff;
00725   IO_CACHE_CALLBACK pre_read,post_read;
00726   if ((pre_read = info->pre_read))
00727     (*pre_read)(info);
00728   if ((*(info)->read_function)(info,&buff,1))
00729     return my_b_EOF;
00730   if ((post_read = info->post_read))
00731     (*post_read)(info);
00732   return (int) (unsigned char) buff;
00733 }
00734 
00743 int _my_b_write(st_io_cache *info, const unsigned char *Buffer, size_t Count)
00744 {
00745   size_t rest_length,length_local;
00746 
00747   if (info->pos_in_file+info->buffer_length > info->end_of_file)
00748   {
00749     errno=EFBIG;
00750     return info->error = -1;
00751   }
00752 
00753   rest_length= (size_t) (info->write_end - info->write_pos);
00754   memcpy(info->write_pos,Buffer,(size_t) rest_length);
00755   Buffer+=rest_length;
00756   Count-=rest_length;
00757   info->write_pos+=rest_length;
00758 
00759   if (my_b_flush_io_cache(info,1))
00760     return 1;
00761   if (Count >= IO_SIZE)
00762   {         /* Fill first intern buffer */
00763     length_local=Count & (size_t) ~(IO_SIZE-1);
00764     if (info->seek_not_done)
00765     {
00766       /*
00767         Whenever a function which operates on st_io_cache flushes/writes
00768         some part of the st_io_cache to disk it will set the property
00769         "seek_not_done" to indicate this to other functions operating
00770         on the st_io_cache.
00771       */
00772       if (lseek(info->file,info->pos_in_file,SEEK_SET))
00773       {
00774         info->error= -1;
00775         return (1);
00776       }
00777       info->seek_not_done=0;
00778     }
00779     if (my_write(info->file, Buffer, length_local, info->myflags | MY_NABP))
00780       return info->error= -1;
00781 
00782     Count-=length_local;
00783     Buffer+=length_local;
00784     info->pos_in_file+=length_local;
00785   }
00786   memcpy(info->write_pos,Buffer,(size_t) Count);
00787   info->write_pos+=Count;
00788   return 0;
00789 }
00790 
00797 int my_block_write(st_io_cache *info, const unsigned char *Buffer, size_t Count,
00798        my_off_t pos)
00799 {
00800   size_t length_local;
00801   int error=0;
00802 
00803   if (pos < info->pos_in_file)
00804   {
00805     /* Of no overlap, write everything without buffering */
00806     if (pos + Count <= info->pos_in_file)
00807       return (pwrite(info->file, Buffer, Count, pos) == 0);
00808     /* Write the part of the block that is before buffer */
00809     length_local= (uint32_t) (info->pos_in_file - pos);
00810     if (pwrite(info->file, Buffer, length_local, pos) == 0)
00811       info->error= error= -1;
00812     Buffer+=length_local;
00813     pos+=  length_local;
00814     Count-= length_local;
00815   }
00816 
00817   /* Check if we want to write inside the used part of the buffer.*/
00818   length_local= (size_t) (info->write_end - info->buffer);
00819   if (pos < info->pos_in_file + length_local)
00820   {
00821     size_t offset= (size_t) (pos - info->pos_in_file);
00822     length_local-=offset;
00823     if (length_local > Count)
00824       length_local=Count;
00825     memcpy(info->buffer+offset, Buffer, length_local);
00826     Buffer+=length_local;
00827     Count-= length_local;
00828     /* Fix length_local of buffer if the new data was larger */
00829     if (info->buffer+length_local > info->write_pos)
00830       info->write_pos=info->buffer+length_local;
00831     if (!Count)
00832       return (error);
00833   }
00834   /* Write at the end of the current buffer; This is the normal case */
00835   if (_my_b_write(info, Buffer, Count))
00836     error= -1;
00837   return error;
00838 }
00839 
00844 int my_b_flush_io_cache(st_io_cache *info, int need_append_buffer_lock)
00845 {
00846   size_t length_local;
00847   bool append_cache= false;
00848   my_off_t pos_in_file_local;
00849 
00850   if (info->type == WRITE_CACHE || append_cache)
00851   {
00852     if (info->file == -1)
00853     {
00854       if (info->real_open_cached_file())
00855   return((info->error= -1));
00856     }
00857     lock_append_buffer(info, need_append_buffer_lock);
00858 
00859     if ((length_local=(size_t) (info->write_pos - info->write_buffer)))
00860     {
00861       pos_in_file_local=info->pos_in_file;
00862       /*
00863   If we have append cache, we always open the file with
00864   O_APPEND which moves the pos to EOF automatically on every write
00865       */
00866       if (!append_cache && info->seek_not_done)
00867       {         /* File touched, do seek */
00868   if (lseek(info->file,pos_in_file_local,SEEK_SET) == MY_FILEPOS_ERROR)
00869   {
00870     unlock_append_buffer(info, need_append_buffer_lock);
00871     return((info->error= -1));
00872   }
00873   if (!append_cache)
00874     info->seek_not_done=0;
00875       }
00876       if (!append_cache)
00877   info->pos_in_file+=length_local;
00878       info->write_end= (info->write_buffer+info->buffer_length-
00879       ((pos_in_file_local+length_local) & (IO_SIZE-1)));
00880 
00881       if (my_write(info->file,info->write_buffer,length_local,
00882        info->myflags | MY_NABP))
00883   info->error= -1;
00884       else
00885   info->error= 0;
00886       if (!append_cache)
00887       {
00888         set_if_bigger(info->end_of_file,(pos_in_file_local+length_local));
00889       }
00890       else
00891       {
00892   info->end_of_file+=(info->write_pos-info->append_read_pos);
00893   my_off_t tell_ret= lseek(info->file, 0, SEEK_CUR);
00894   assert(info->end_of_file == tell_ret);
00895       }
00896 
00897       info->append_read_pos=info->write_pos=info->write_buffer;
00898       unlock_append_buffer(info, need_append_buffer_lock);
00899       return(info->error);
00900     }
00901   }
00902 #ifdef HAVE_AIOWAIT
00903   else if (info->type != READ_NET)
00904   {
00905     my_aiowait(&info->aio_result);    /* Wait for outstanding req */
00906     info->inited=0;
00907   }
00908 #endif
00909   unlock_append_buffer(info, need_append_buffer_lock);
00910   return(0);
00911 }
00912 
00927 int st_io_cache::end_io_cache()
00928 {
00929   int _error=0;
00930 
00931   if (pre_close)
00932   {
00933     (*pre_close)(this);
00934     pre_close= 0;
00935   }
00936   if (alloced_buffer)
00937   {
00938     if (type == READ_CACHE)
00939       global_read_buffer.sub(buffer_length);
00940     alloced_buffer=0;
00941     if (file != -1)     /* File doesn't exist */
00942       _error= my_b_flush_io_cache(this, 1);
00943     free((unsigned char*) buffer);
00944     buffer=read_pos=(unsigned char*) 0;
00945   }
00946 
00947   return _error;
00948 } /* end_io_cache */
00949 
00950 } /* namespace internal */
00951 } /* namespace drizzled */