wvstream.h

00001 /* -*- Mode: C++ -*-
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Provides basic streaming I/O support.
00006  */ 
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009 
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include "wvhashtable.h"
00013 #include "wvstreamsdebugger.h"
00014 #include <errno.h>
00015 #include <limits.h>
00016 
00017 #ifdef _WIN32
00018 #include <time.h>
00019 #include <winsock2.h>
00020 #include <ws2tcpip.h>
00021 #include "wvwin32-sanitize.h"
00022 #else
00023 #include <unistd.h> // not strictly necessary, but EVERYBODY uses this...
00024 #include <sys/time.h>
00025 #endif
00026 
00027 
00028 // parameters are: owning-stream, userdata
00029 typedef WvCallback<void, WvStream&, void*> WvStreamCallback;
00030 
00038 class WvStream: public IWvStream
00039 {
00040     IMPLEMENT_IOBJECT(WvStream);
00041 public:
00046     WvStream *read_requires_writable;
00047 
00052     WvStream *write_requires_readable;
00053     
00055     bool uses_continue_select;
00056 
00058     size_t personal_stack_size;
00059 
00064     bool alarm_was_ticking;
00065     
00067     bool stop_read, stop_write, closed;
00068     
00070     WvStream();
00071     virtual ~WvStream();
00072 
00080     virtual void close();
00081 
00083     virtual void seterr(int _errnum);
00084     void seterr(WvStringParm specialerr)
00085         { WvErrorBase::seterr(specialerr); }
00086     void seterr(WVSTRING_FORMAT_DECL)
00087         { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00088     
00090     virtual bool isok() const;
00091     
00093     virtual size_t read(void *buf, size_t count);
00094 
00104     virtual size_t read(WvBuf &outbuf, size_t count);
00105 
00111     virtual void unread(WvBuf &outbuf, size_t count);
00112 
00119     virtual size_t write(const void *buf, size_t count);
00120 
00128     virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00129 
00139     void outbuf_limit(size_t size)
00140         { max_outbuf_size = size; }
00141 
00142     virtual void noread();
00143     virtual void nowrite();
00144     virtual void maybe_autoclose();
00145     
00146     virtual bool isreadable();
00147     virtual bool iswritable();
00148     
00156     virtual size_t uread(void *buf, size_t count)
00157         { return 0; /* basic WvStream doesn't actually do anything! */ }
00158 
00166     virtual size_t uwrite(const void *buf, size_t count)
00167         { return count; /* basic WvStream doesn't actually do anything! */ }
00168 
00185     char *getline(time_t wait_msec = 0,
00186                   char separator = '\n', int readahead = 1024)
00187     {
00188         return blocking_getline(wait_msec, separator, readahead);
00189     }
00190 
00192     char *getline(int wait_msec,
00193                   char separator = '\n', int readahead = 1024)
00194     {
00195         return getline(time_t(wait_msec), separator, readahead);
00196     }
00197 
00199     char *getline(double wait_msec,
00200                   char separator = '\n', int readahead = 1024)
00201     {
00202         return getline(time_t(wait_msec), separator, readahead);
00203     }
00204 
00205 private:
00210     char *getline(char, int i = 0);
00211     char *getline(bool, int i = 0);
00212 public:
00213 
00225     char *blocking_getline(time_t wait_msec, int separator = '\n',
00226                            int readahead = 1024);
00227 
00232     char *continue_getline(time_t wait_msec, int separator = '\n',
00233                            int readahead = 1024);
00234 
00242     void queuemin(size_t count)
00243         { queue_min = count; }
00244 
00249     void drain();
00250     
00256     void delay_output(bool is_delayed)
00257     {
00258         outbuf_delayed_flush = is_delayed;
00259         want_to_flush = !is_delayed;
00260     }
00261 
00268     void auto_flush(bool is_automatic)
00269         { is_auto_flush = is_automatic; }
00270 
00277     virtual bool flush(time_t msec_timeout);
00278 
00279     virtual bool should_flush();
00280 
00287     void flush_then_close(int msec_timeout);
00288     
00310     virtual bool pre_select(SelectInfo &si);
00311     
00316     bool pre_select(SelectInfo &si, const SelectRequest &r)
00317     {
00318         SelectRequest oldwant = si.wants;
00319         si.wants = r;
00320         bool val = pre_select(si);
00321         si.wants = oldwant;
00322         return val;
00323     }
00324     
00329     bool xpre_select(SelectInfo &si, const SelectRequest &r)
00330         { return pre_select(si, r); }
00331     
00344     virtual bool post_select(SelectInfo &si);
00345 
00350     bool xpost_select(SelectInfo &si, const SelectRequest &r)
00351         { return post_select(si, r); }
00352     
00357     bool post_select(SelectInfo &si, const SelectRequest &r)
00358     {
00359         SelectRequest oldwant = si.wants;
00360         si.wants = r;
00361         bool val = post_select(si);
00362         si.wants = oldwant;
00363         return val;
00364     }
00365     
00387     bool select(time_t msec_timeout)
00388         { return _select(msec_timeout, false, false, false, true); }
00389     
00402     void runonce(time_t msec_timeout = -1)
00403         { if (select(msec_timeout)) callback(); }
00404     
00426     bool select(time_t msec_timeout,
00427                 bool readable, bool writable, bool isex = false)
00428         { return _select(msec_timeout, readable, writable, isex, false); }
00429 
00435     IWvStream::SelectRequest get_select_request();
00436 
00445     void force_select(bool readable, bool writable, bool isexception = false);
00446     
00451     void undo_force_select(bool readable, bool writable,
00452                            bool isexception = false);
00453     
00471     bool continue_select(time_t msec_timeout);
00472     
00478     void terminate_continue_select();
00479 
00484     virtual const WvAddr *src() const;
00485     
00490     void setcallback(WvStreamCallback _callfunc, void *_userdata);
00491         
00493     IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00494 
00496     IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00497 
00500     IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00501 
00503     IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00504 
00510     void autoforward(WvStream &s);
00511 
00513     void noautoforward();
00514     static void autoforward_callback(WvStream &s, void *userdata);
00515     
00519     void *_callwrap(void *);
00520     
00524     void _callback();
00525     
00530     virtual void callback();
00531     
00536     void alarm(time_t msec_timeout);
00537 
00543     time_t alarm_remaining();
00544 
00549     size_t write(WvStringParm s)
00550         { return write(s.cstr(), s.len()); }
00551     size_t print(WvStringParm s)
00552         { return write(s); }
00553     size_t operator() (WvStringParm s)
00554         { return write(s); }
00555 
00557     size_t print(WVSTRING_FORMAT_DECL)
00558         { return write(WvString(WVSTRING_FORMAT_CALL)); }
00559     size_t operator() (WVSTRING_FORMAT_DECL)
00560         { return write(WvString(WVSTRING_FORMAT_CALL)); }
00561 
00562 protected:
00563     // builds the SelectInfo data structure (runs pre_select)
00564     // returns true if there are callbacks to be dispatched
00565     //
00566     // all of the fields are filled in with new values
00567     // si.msec_timeout contains the time until the next alarm expires
00568     bool _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00569         bool readable, bool writable, bool isexcept,
00570         bool forceable);
00571 
00572     // runs the actual select() function over the given
00573     // SelectInfo data structure, returns the number of descriptors
00574     // in the set, and sets the error code if a problem occurs
00575     int _do_select(SelectInfo &si);
00576 
00577     // processes the SelectInfo data structure (runs post_select)
00578     // returns true if there are callbacks to be dispatched
00579     bool _process_selectinfo(SelectInfo &si, bool forceable);
00580 
00581     // tries to empty the output buffer if the stream is writable
00582     // not quite the same as flush() since it merely empties the output
00583     // buffer asynchronously whereas flush() might have other semantics
00584     // also handles autoclose (eg. after flush)
00585     bool flush_outbuf(time_t msec_timeout);
00586 
00587     // called once flush() has emptied outbuf to ensure that any other
00588     // internal stream buffers actually do get flushed before it returns
00589     virtual bool flush_internal(time_t msec_timeout);
00590     
00591     // the real implementations for these are actually in WvFDStream, which
00592     // is where they belong.  By IWvStream needs them to exist for now, so
00593     // it's a hack.  In standard WvStream they return -1.
00594     virtual int getrfd() const;
00595     virtual int getwfd() const;
00596     
00597 private:
00599     bool _select(time_t msec_timeout,
00600                  bool readable, bool writable, bool isexcept,
00601                  bool forceable);
00602 
00603     void legacy_callback(IWvStream& s);
00604 
00605 protected:
00606     // FIXME: this one is so bad, I'm not touching it. Quick hack to
00607     // make it work anyway.
00608     friend class WvHTTPClientProxyStream;
00609 
00610     WvDynBuf inbuf, outbuf;
00611 
00612     WvStreamCallback callfunc;
00613     void *userdata;
00614     WvCallback<void*,void*> call_ctx;
00615 
00616     IWvStreamCallback readcb, writecb, exceptcb, closecb;
00617 
00618     size_t max_outbuf_size;
00619     bool outbuf_delayed_flush;
00620     bool is_auto_flush;
00621 
00622     // Used to guard against excessive flushing when using delay_flush
00623     bool want_to_flush;
00624 
00625     // Used to ensure we don't flush recursively.
00626     bool is_flushing;
00627 
00628     size_t queue_min;           // minimum bytes to read()
00629     time_t autoclose_time;      // close eventually, even if output is queued
00630     WvTime alarm_time;          // select() returns true at this time
00631     WvTime last_alarm_check;    // last time we checked the alarm_remaining
00632     
00643     virtual void execute()
00644         { }
00645     
00646     // every call to select() selects on the globalstream.
00647     static WvStream *globalstream;
00648     
00649     // ridiculous hackery for now so that the wvstream unit test can poke
00650     // around in the insides of WvStream.  Eventually, inbuf will go away
00651     // from the base WvStream class, so nothing like this will be needed.
00652 #ifdef __WVSTREAM_UNIT_TEST
00653 public:
00654     size_t outbuf_used() 
00655         { return outbuf.used(); }
00656     size_t inbuf_used()
00657         { return inbuf.used(); }
00658     void inbuf_putstr(WvStringParm t)
00659         { inbuf.putstr(t); }
00660 #endif
00661     
00662 private:
00664     WvStream(const WvStream &s);
00665     WvStream& operator= (const WvStream &s);
00666 
00667 private:
00668     WvString my_wsname;
00669 public:
00670     const char *wsname() const
00671         { return my_wsname; }
00672     void set_wsname(WvStringParm wsname)
00673         { my_wsname = wsname; }
00674     void set_wsname(WVSTRING_FORMAT_DECL)
00675         { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
00676         
00677 public:
00678     const char *wstype() const { return "WvStream"; }
00679     
00680 private:
00681     WSID my_wsid;
00682 public:
00683     WSID wsid() const { return my_wsid; }
00684     
00685 private:
00686     static WvMap<WSID, WvStream *> *wsid_map;
00687     static WSID next_wsid_to_try;
00688 public:
00689     static IWvStream *find_by_wsid(WSID wsid);
00690     
00691 private:
00692     static void add_debugger_commands();
00693 protected:
00694     static void debugger_streams_display_header(WvStringParm cmd,
00695             WvStreamsDebugger::ResultCallback result_cb);
00696     static void debugger_streams_display_one_stream(WvStream *s,
00697             WvStringParm cmd,
00698             WvStreamsDebugger::ResultCallback result_cb);
00699     static void debugger_streams_maybe_display_one_stream(WvStream *s,
00700             WvStringParm cmd,
00701             const WvStringList &args,
00702             WvStreamsDebugger::ResultCallback result_cb);
00703 private:
00704     static WvString debugger_streams_run_cb(WvStringParm cmd,
00705         WvStringList &args,
00706         WvStreamsDebugger::ResultCallback result_cb, void *);
00707     static WvString debugger_close_run_cb(WvStringParm cmd,
00708         WvStringList &args,
00709         WvStreamsDebugger::ResultCallback result_cb, void *);
00710 
00711 };
00712 
00719 extern WvStream *wvcon; // tied stdin and stdout stream
00720 extern WvStream *wvin;  // stdin stream
00721 extern WvStream *wvout; // stdout stream
00722 extern WvStream *wverr; // stderr stream
00723 
00724 #endif // __WVSTREAM_H

Generated on Fri Oct 5 18:20:28 2007 for WvStreams by  doxygen 1.5.3