Drizzled Public API Documentation

CSStream.cc

00001 /* Copyright (C) 2008 PrimeBase Technologies GmbH, Germany
00002  *
00003  * PrimeBase Media Stream for MySQL
00004  *
00005  * This program is free software; you can redistribute it and/or modify
00006  * it under the terms of the GNU General Public License as published by
00007  * the Free Software Foundation; either version 2 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
00018  *
00019  * Original author: Paul McCullagh (H&G2JCtL)
00020  * Continued development: Barry Leslie
00021  *
00022  * 2007-06-07
00023  *
00024  * CORE SYSTEM:
00025  * Basic input and output streams.
00026  *
00027  * These objects wrap the system streams, and simplify things.
00028  * I also want to standardize exceptions and implement
00029  * socket based streams.
00030  *
00031  */
00032 
00033 #include "CSConfig.h"
00034 
00035 #include <string.h>
00036 #include <inttypes.h>
00037 
00038 #include "CSMemory.h"
00039 #include "CSStream.h"
00040 #include "CSGlobal.h"
00041 
00042 /*
00043  * ---------------------------------------------------------------
00044  * STREAM UTILITIES
00045  */
00046 
00047 void CSStream::pipe(CSOutputStream *out, CSInputStream *in)
00048 {
00049   char  *buffer;
00050   size_t  size;
00051 
00052   enter_();
00053   push_(out);
00054   push_(in);
00055   
00056   buffer = (char *) cs_malloc(DEFAULT_BUFFER_SIZE);
00057   push_ptr_(buffer);
00058   
00059   for (;;) {
00060     size = in->read(buffer, DEFAULT_BUFFER_SIZE);
00061     self->interrupted();
00062     if (!size)
00063       break;
00064     out->write(buffer, size);
00065     self->interrupted();
00066   }
00067   in->close();
00068   out->close();
00069   
00070   release_(buffer);
00071   release_(in);
00072   release_(out);
00073   exit_();
00074 }
00075 
00076 /*
00077  * ---------------------------------------------------------------
00078  * INPUT STREAMS
00079  */
00080 
00081 CSStringBuffer *CSInputStream::readLine()
00082 {
00083   int       ch;
00084   CSStringBuffer  *sb = NULL;
00085 
00086   enter_();
00087   
00088   ch = read();
00089   if (ch != -1) {
00090     new_(sb, CSStringBuffer(20));
00091     push_(sb);
00092     
00093     while (ch != '\n' && ch != '\r' && ch != -1) {
00094       sb->append((char) ch);
00095       ch = read();
00096     }
00097     if (ch == '\r') {
00098       if (peek() == '\n')
00099         ch = read();
00100     }
00101 
00102     pop_(sb);
00103   }
00104 
00105   return_(sb);
00106 }
00107 
00108 /*
00109  * ---------------------------------------------------------------
00110  * OUTPUT STREAMS
00111  */
00112 
00113 void CSOutputStream::printLine(const char *cstr)
00114 {
00115   enter_();
00116   print(cstr);
00117   print(getEOL());
00118   flush();
00119   exit_();
00120 }
00121 
00122 void CSOutputStream::print(const char *cstr)
00123 {
00124   enter_();
00125   write(cstr, strlen(cstr));
00126   exit_();
00127 }
00128 
00129 void CSOutputStream::print(CSString *s)
00130 {
00131   enter_();
00132   print(s->getCString());
00133   exit_();
00134 }
00135 
00136 void CSOutputStream::print(int value)
00137 {
00138   char buffer[20];
00139 
00140   snprintf(buffer, 20, "%d", value);
00141   print(buffer);
00142 }
00143 
00144 void CSOutputStream::print(uint64_t value)
00145 {
00146   char buffer[30];
00147 
00148   snprintf(buffer, 30, "%"PRIu64"", value);
00149   print(buffer);
00150 }
00151 
00152 /*
00153  * ---------------------------------------------------------------
00154  * FILE INPUT STREAMS
00155  */
00156 
00157 CSFileInputStream::~CSFileInputStream()
00158 {
00159   if (iFile)
00160     iFile->release();
00161 }
00162 
00163 size_t CSFileInputStream::read(char *b, size_t len)
00164 {
00165   size_t size;
00166 
00167   enter_();
00168   size = iFile->read(b, iReadOffset, len, 0);
00169   iReadOffset += size;
00170   return_(size);
00171 }
00172 
00173 int CSFileInputStream::read()
00174 {
00175   size_t  size;
00176   char  ch;
00177 
00178   enter_();
00179   size = iFile->read(&ch, iReadOffset, 1, 0);
00180   iReadOffset += size;
00181   return_(size == 0 ? -1 : (int) ch);
00182 }
00183 
00184 void CSFileInputStream::reset()
00185 {
00186   iReadOffset = 0;
00187 }
00188 
00189 const char *CSFileInputStream::identify()
00190 {
00191   return iFile->myFilePath->getCString();
00192 }
00193 
00194 int CSFileInputStream::peek()
00195 {
00196   size_t  size;
00197   char  ch;
00198 
00199   enter_();
00200   size = iFile->read(&ch, iReadOffset, 1, 0);
00201   return_(size == 0 ? -1 : (int) ch);
00202 }
00203 
00204 void CSFileInputStream::close()
00205 {
00206   enter_();
00207   iFile->close();
00208   exit_();
00209 }
00210 
00211 CSFileInputStream *CSFileInputStream::newStream(CSFile *f)
00212 {
00213   CSFileInputStream *s;
00214 
00215   if (!(s = new CSFileInputStream())) {
00216     f->release();
00217     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00218   }
00219   s->iFile = f;
00220   return s;
00221 }
00222 
00223 CSFileInputStream *CSFileInputStream::newStream(CSFile *f, off64_t offset)
00224 {
00225   CSFileInputStream *s;
00226 
00227   if (!(s = new CSFileInputStream())) {
00228     f->release();
00229     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00230   }
00231   s->iFile = f;
00232   s->iReadOffset = offset;
00233   return s;
00234 }
00235 
00236 /*
00237  * ---------------------------------------------------------------
00238  * FILE OUTPUT STREAMS
00239  */
00240 
00241 CSFileOutputStream::~CSFileOutputStream()
00242 {
00243   if (iFile)
00244     iFile->release();
00245 }
00246 
00247 void CSFileOutputStream::write(const char *b, size_t len)
00248 {
00249   enter_();
00250   iFile->write(b, iWriteOffset, len);
00251   iWriteOffset += len;
00252   exit_();
00253 }
00254 
00255 const char *CSFileOutputStream::getEOL()
00256 {
00257   enter_();
00258   return_(iFile->getEOL());
00259 }
00260 
00261 void CSFileOutputStream::flush()
00262 {
00263   enter_();
00264   iFile->flush();
00265   exit_();
00266 }
00267 
00268 void CSFileOutputStream::write(char b)
00269 {
00270   enter_();
00271   iFile->write(&b, iWriteOffset, 1);
00272   iWriteOffset += 1;
00273   exit_();
00274 }
00275 
00276 void CSFileOutputStream::reset()
00277 {
00278   iWriteOffset = 0;
00279 }
00280 
00281 const char *CSFileOutputStream::identify()
00282 {
00283   return iFile->myFilePath->getCString();
00284 }
00285 
00286 void CSFileOutputStream::close()
00287 {
00288   enter_();
00289   iFile->close();
00290   exit_();
00291 }
00292 
00293 CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f)
00294 {
00295   CSFileOutputStream *s;
00296 
00297   if (!(s = new CSFileOutputStream())) {
00298     f->release();
00299     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00300   }
00301   s->iFile = f;
00302   return  s;
00303 }
00304 
00305 CSFileOutputStream *CSFileOutputStream::newStream(CSFile *f, off64_t offset)
00306 {
00307   CSFileOutputStream *s;
00308 
00309   if (!(s = new CSFileOutputStream())) {
00310     f->release();
00311     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00312   }
00313   s->iFile = f;
00314   s->iWriteOffset = offset;
00315   return  s;
00316 }
00317 
00318 /*
00319  * ---------------------------------------------------------------
00320  * SOCKET INPUT STREAMS
00321  */
00322 
00323 CSSocketInputStream::~CSSocketInputStream()
00324 {
00325   if (iSocket)
00326     iSocket->release();
00327 }
00328 
00329 void CSSocketInputStream::close()
00330 {
00331   enter_();
00332   iSocket->close();
00333   exit_();
00334 }
00335 
00336 size_t CSSocketInputStream::read(char *b, size_t len)
00337 {
00338   enter_();
00339   return_(iSocket->read(b, len));
00340 }
00341 
00342 int CSSocketInputStream::read()
00343 {
00344   enter_();
00345   return_(iSocket->read());
00346 }
00347 
00348 int CSSocketInputStream::peek()
00349 {
00350   enter_();
00351   return_(iSocket->peek());
00352 }
00353 
00354 void CSSocketInputStream::reset()
00355 {
00356   enter_();
00357   CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketInputStream::reset() not supported");
00358   exit_();
00359 }
00360 
00361 const char *CSSocketInputStream::identify()
00362 {
00363   return iSocket->identify();
00364 }
00365 
00366 CSSocketInputStream *CSSocketInputStream::newStream(CSSocket *s)
00367 {
00368   CSSocketInputStream *str;
00369 
00370   if (!(str = new CSSocketInputStream())) {
00371     s->release();
00372     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00373   }
00374   str->iSocket = s;
00375   return  str;
00376 }
00377 
00378 /*
00379  * ---------------------------------------------------------------
00380  * SOCKET OUTPUT STREAMS
00381  */
00382 
00383 CSSocketOutputStream::~CSSocketOutputStream()
00384 {
00385   if (iSocket)
00386     iSocket->release();
00387 }
00388 
00389 void CSSocketOutputStream::close()
00390 {
00391   enter_();
00392   iSocket->close();
00393   exit_();
00394 }
00395 
00396 void CSSocketOutputStream::write(const char *b, size_t len)
00397 {
00398   enter_();
00399   iSocket->write(b, len);
00400   exit_();
00401 }
00402 
00403 void CSSocketOutputStream::flush()
00404 {
00405   enter_();
00406   iSocket->flush();
00407   exit_();
00408 }
00409 
00410 void CSSocketOutputStream::write(char b)
00411 {
00412   enter_();
00413   iSocket->write(b);
00414   exit_();
00415 }
00416 
00417 void CSSocketOutputStream::reset()
00418 {
00419   enter_();
00420   CSException::throwException(CS_CONTEXT, CS_ERR_OPERATION_NOT_SUPPORTED, "CSSocketOutputStream::reset() not supported");
00421   exit_();
00422 }
00423 
00424 const char *CSSocketOutputStream::identify()
00425 {
00426   return iSocket->identify();
00427 }
00428 
00429 CSSocketOutputStream *CSSocketOutputStream::newStream(CSSocket *s)
00430 {
00431   CSSocketOutputStream *str;
00432 
00433   if (!(str = new CSSocketOutputStream())) {
00434     s->release();
00435     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00436   }
00437   str->iSocket = s;
00438   return  str;
00439 }
00440 
00441 /*
00442  * ---------------------------------------------------------------
00443  * BUFFERED INPUT STREAMS
00444  */
00445 
00446 CSBufferedInputStream::~CSBufferedInputStream()
00447 {
00448   if (iStream)
00449     iStream->release();
00450 }
00451 
00452 void CSBufferedInputStream::close()
00453 {
00454   enter_();
00455   iStream->close();
00456   exit_();
00457 }
00458 
00459 size_t CSBufferedInputStream::read(char *b, size_t len)
00460 {
00461   size_t tfer;
00462 
00463   enter_();
00464   if (iBuffPos < iBuffTotal) {
00465     tfer = iBuffTotal - iBuffPos;
00466     if (tfer > len)
00467       tfer = len;
00468     memcpy(b, iBuffer + iBuffPos, tfer);
00469     iBuffPos += tfer;
00470   }
00471   else
00472     tfer = iStream->read(b, len);
00473   return_(tfer);
00474 }
00475 
00476 int CSBufferedInputStream::read()
00477 {
00478   int ch;
00479   
00480   enter_();
00481   if (iBuffPos == iBuffTotal) {
00482     iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
00483     iBuffPos = 0;
00484   }
00485   if (iBuffPos < iBuffTotal) {
00486     ch = iBuffer[iBuffPos];
00487     iBuffPos++;
00488   }
00489   else
00490     ch = -1;
00491   return_(ch);
00492 }
00493 
00494 int CSBufferedInputStream::peek()
00495 {
00496   int ch;
00497   
00498   enter_();
00499   if (iBuffPos == iBuffTotal) {
00500     iBuffTotal = iStream->read((char *) iBuffer, CS_STREAM_BUFFER_SIZE);
00501     iBuffPos = 0;
00502   }
00503   if (iBuffPos < iBuffTotal)
00504     ch = iBuffer[iBuffPos];
00505   else
00506     ch = -1;
00507   return_(ch);
00508 }
00509 
00510 void CSBufferedInputStream::reset()
00511 {
00512   iBuffPos = iBuffTotal =0;
00513   iStream->reset();
00514 }
00515 
00516 const char *CSBufferedInputStream::identify()
00517 {
00518   return iStream->identify();
00519 }
00520 
00521 CSBufferedInputStream *CSBufferedInputStream::newStream(CSInputStream* i)
00522 {
00523   CSBufferedInputStream *s;
00524 
00525   if (!(s = new CSBufferedInputStream())) {
00526     i->release();
00527     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00528   }
00529   s->iStream = i;
00530   return  s;
00531 }
00532 
00533 /*
00534  * ---------------------------------------------------------------
00535  * BUFFERED OUTPUT STREAMS
00536  */
00537 
00538 CSBufferedOutputStream::~CSBufferedOutputStream()
00539 {
00540   if (iStream)
00541     iStream->release();
00542 }
00543 
00544 void CSBufferedOutputStream::close()
00545 {
00546   enter_();
00547   iStream->close();
00548   exit_();
00549 }
00550 
00551 void CSBufferedOutputStream::write(const char *b, size_t len)
00552 {
00553   size_t tfer;
00554 
00555   // If the length of the data being written is greater than half
00556   // the buffer size then the data is written directly through
00557   // with out buffering.
00558   enter_();
00559   if (iBuffTotal < CS_STREAM_BUFFER_SIZE/2) {
00560     tfer = CS_STREAM_BUFFER_SIZE - iBuffTotal;
00561     
00562     if (tfer > len)
00563       tfer = len;
00564     memcpy(iBuffer + iBuffTotal, b, tfer);
00565     iBuffTotal += tfer;
00566     b += tfer;
00567     len -= tfer;
00568   }
00569   if (len > 0) {
00570     flush();
00571     if (len > CS_STREAM_BUFFER_SIZE/2)
00572       iStream->write(b, len);
00573     else {
00574       memcpy(iBuffer, b, len);
00575       iBuffTotal = len;
00576     }
00577   }
00578   exit_();
00579 }
00580 
00581 void CSBufferedOutputStream::flush()
00582 {
00583   size_t len;
00584 
00585   enter_();
00586   if ((len = iBuffTotal)) {
00587     /* Discard the contents of the buffer
00588      * if flush fails, because we do
00589      * not know how much was written anyway!
00590      */
00591     iBuffTotal = 0;
00592     iStream->write((char *) iBuffer, len);
00593   }
00594   exit_();
00595 }
00596 
00597 void CSBufferedOutputStream::write(char b)
00598 {
00599   enter_();
00600   if (iBuffTotal == CS_STREAM_BUFFER_SIZE)
00601     flush();
00602   iBuffer[iBuffTotal] = b;
00603   iBuffTotal++;
00604   exit_();
00605 }
00606 
00607 void CSBufferedOutputStream::reset()
00608 {
00609   iBuffTotal = 0;
00610   iStream->reset();
00611 }
00612 
00613 const char *CSBufferedOutputStream::identify()
00614 {
00615   return iStream->identify();
00616 }
00617 
00618 CSBufferedOutputStream *CSBufferedOutputStream::newStream(CSOutputStream* i)
00619 {
00620   CSBufferedOutputStream *s;
00621 
00622   if (!(s = new CSBufferedOutputStream())) {
00623     i->release();
00624     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00625   }
00626   s->iStream = i;
00627   return s;
00628 }
00629 
00630 /*
00631  * ---------------------------------------------------------------
00632  * MEMORY INPUT STREAMS
00633  */
00634 CSMemoryInputStream *CSMemoryInputStream::newStream(const u_char* buffer, uint32_t length)
00635 {
00636   CSMemoryInputStream *s;
00637 
00638   if (!(s = new CSMemoryInputStream())) {
00639     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00640   }
00641   s->iMemory = buffer;
00642   s->iMemTotal = length;
00643   return s;
00644 }
00645 
00646 
00647 CSMemoryOutputStream *CSMemoryOutputStream::newStream(size_t init_length, size_t min_alloc)
00648 {
00649   CSMemoryOutputStream *s;
00650 
00651   if (!(s = new CSMemoryOutputStream())) {
00652     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00653   }
00654   
00655   s->iMemory = (u_char *) cs_malloc(init_length);
00656   s->iMemTotal = init_length;
00657   s->iMemSpace = init_length;
00658   s->iMemPos = s->iMemory;
00659   s->iMemMin = min_alloc;
00660   return s;
00661 }
00662 
00663 CSMemoryOutputStream::~CSMemoryOutputStream()
00664 {
00665   if (iMemory)
00666     cs_free(iMemory);
00667 }
00668 
00669 void CSMemoryOutputStream::write(const char *b, size_t len)
00670 {
00671   if (iMemSpace < len) {
00672     size_t new_size = iMemTotal + ((len < iMemMin)? iMemMin:len);
00673     
00674     cs_realloc((void**) &iMemory, new_size);
00675     iMemPos = iMemory + (iMemTotal - iMemSpace);
00676     iMemSpace += (new_size - iMemTotal);
00677     iMemTotal = new_size;   
00678   }
00679   memcpy(iMemPos, b, len);
00680   iMemPos +=len;
00681   iMemSpace -= len;
00682 }
00683 
00684 void CSMemoryOutputStream::write(const char b)
00685 {
00686   if (!iMemSpace) {
00687     cs_realloc((void**) &iMemory, iMemTotal + iMemMin);
00688     iMemPos = iMemory + iMemTotal;
00689     iMemSpace += iMemMin;
00690     iMemTotal += iMemMin;   
00691   }
00692   *iMemPos = b;
00693   iMemPos++;
00694   iMemSpace--;
00695 }
00696 
00697 void CSMemoryOutputStream::reset()
00698 {
00699   iMemPos = iMemory;
00700   iMemSpace = iMemTotal;
00701 }
00702 
00703 const char *CSMemoryOutputStream::identify()
00704 {
00705   return "memory stream";
00706 }
00707 
00708 /*
00709  * ---------------------------------------------------------------
00710  * STATIC (user) MEMORY OUTPUT STREAM
00711  */
00712 void CSStaticMemoryOutputStream::write(const char *b, size_t len)
00713 {
00714   if (iMemSpace < len) {
00715     enter_();
00716     CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
00717     exit_();
00718   }
00719   memcpy(iMemPos, b, len);
00720   iMemPos +=len;
00721   iMemSpace -= len;
00722 }
00723 
00724 void CSStaticMemoryOutputStream::write(const char b)
00725 {
00726   if (!iMemSpace) {
00727     enter_();
00728     CSException::throwException(CS_CONTEXT, CS_ERR_GENERIC_ERROR, "CSStaticMemoryOutputStream: overflow");
00729     exit_();
00730   }
00731   *iMemPos = b;
00732   iMemPos++;
00733   iMemSpace--;
00734 }
00735 
00736 /*
00737  * ---------------------------------------------------------------
00738  * CALLBACK INPUT STREAM
00739  */
00740 
00741 CSCallbackInputStream *CSCallbackInputStream::newStream(CSStreamReadCallbackFunc in_callback, void *user_data)
00742 {
00743   CSCallbackInputStream *s;
00744 
00745   if (!(s = new CSCallbackInputStream())) {
00746     CSException::throwOSError(CS_CONTEXT, ENOMEM);
00747   }
00748   
00749   s->callback = in_callback;
00750   s->cb_data = user_data;
00751   return s;
00752 }
00753