00001
00002
00003
00004
00005
00006
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include "wvhashtable.h"
00013 #include "wvstreamsdebugger.h"
00014 #include <errno.h>
00015 #include <limits.h>
00016
00017
00018
/**
 * Callback type accepted by WvStream::setcallback(): a WvCallback that
 * returns void and is handed a WvStream reference plus an opaque void*.
 * NOTE(review): the void* is presumably the userdata registered alongside
 * the callback -- confirm in the implementation.
 */
00019 typedef WvCallback<void, WvStream&, void*> WvStreamCallback;
00020
/**
 * WvStream: a stream class derived from IWvStream.
 *
 * The class owns an input and an output buffer (inbuf/outbuf) and declares
 * read/write entry points, line-reading helpers, select()/callback plumbing,
 * alarm support and debugger hooks.  Most member functions are only declared
 * in this header; only the small inline wrappers have visible bodies, and the
 * comments below describe exactly those bodies.
 */
00028 class WvStream: public IWvStream
00029 {
/* Project macro defined outside this header.
 * NOTE(review): presumably supplies the IObject boilerplate -- confirm. */
00030 IMPLEMENT_IOBJECT(WvStream);
00031 public:
/* Optional pointer to another stream.
 * NOTE(review): the name suggests read readiness is gated on that stream
 * being writable -- confirm in the select implementation. */
00036 WvStream *read_requires_writable;
00037
/* Optional pointer to another stream.
 * NOTE(review): the name suggests write readiness is gated on that stream
 * being readable -- confirm in the select implementation. */
00042 WvStream *write_requires_readable;
00043
/* Public flag; presumably records that continue_select() is in use -- TODO
 * confirm. */
00045 bool uses_continue_select;
00046
/* Public size value; presumably the stack size used for the
 * continue_select() context -- TODO confirm. */
00048 size_t personal_stack_size;
00049
/* Public flag related to the alarm facility (see alarm()); exact meaning is
 * not visible in this header. */
00054 bool alarm_was_ticking;
00055
/* Public state flags.
 * NOTE(review): presumably set by noread(), nowrite() and close()
 * respectively -- confirm. */
00057 bool stop_read, stop_write, closed;
00058
/* Default constructor and virtual destructor; defined elsewhere. */
00060 WvStream();
00061 virtual ~WvStream();
00062
/* Virtual; defined elsewhere. */
00070 virtual void close();
00071
/* Error setters.  The int overload is virtual and defined elsewhere.  The
 * WvStringParm overload forwards straight to WvErrorBase::seterr().  The
 * format overload builds a WvString from the format arguments and passes it
 * to the WvStringParm overload. */
00073 virtual void seterr(int _errnum);
00074 void seterr(WvStringParm specialerr)
00075 { WvErrorBase::seterr(specialerr); }
00076 void seterr(WVSTRING_FORMAT_DECL)
00077 { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00078
/* Virtual const status query; defined elsewhere. */
00080 virtual bool isok() const;
00081
/* Read into a raw buffer of at most count bytes; returns a size_t.
 * Virtual; defined elsewhere. */
00083 virtual size_t read(void *buf, size_t count);
00084
/* Read variant that takes a WvBuf destination.  Virtual; defined elsewhere. */
00094 virtual size_t read(WvBuf &outbuf, size_t count);
00095
/* Virtual; defined elsewhere.
 * NOTE(review): presumably pushes count bytes from the given buffer back
 * into the stream's input -- confirm. */
00101 virtual void unread(WvBuf &outbuf, size_t count);
00102
/* Write from a raw buffer; returns a size_t.  Virtual; defined elsewhere. */
00109 virtual size_t write(const void *buf, size_t count);
00110
/* Write variant that takes a WvBuf source; count defaults to INT_MAX.
 * Virtual; defined elsewhere. */
00118 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00119
/* Stores size in max_outbuf_size. */
00129 void outbuf_limit(size_t size)
00130 { max_outbuf_size = size; }
00131
/* Virtual hooks defined elsewhere.
 * NOTE(review): presumably related to the stop_read/stop_write flags and to
 * closing once both directions are shut -- confirm. */
00132 virtual void noread();
00133 virtual void nowrite();
00134 virtual void maybe_autoclose();
00135
/* Virtual readiness queries; defined elsewhere. */
00136 virtual bool isreadable();
00137 virtual bool iswritable();
00138
/* Default implementation ignores both arguments and returns 0.
 * NOTE(review): presumably the low-level read hook subclasses override --
 * confirm. */
00146 virtual size_t uread(void *buf, size_t count)
00147 { return 0; }
00148
/* Default implementation ignores the buffer and returns count unchanged,
 * i.e. it reports every byte as written without doing any work.
 * NOTE(review): presumably the low-level write hook subclasses override --
 * confirm. */
00156 virtual size_t uwrite(const void *buf, size_t count)
00157 { return count; }
00158
/* Convenience wrapper: returns blocking_getline() called with the same
 * three arguments.  Defaults: wait_msec 0, separator '\n', readahead 1024. */
00175 char *getline(time_t wait_msec = 0,
00176 char separator = '\n', int readahead = 1024)
00177 {
00178 return blocking_getline(wait_msec, separator, readahead);
00179 }
00180
/* int-timeout overload: converts wait_msec to time_t and forwards to the
 * time_t overload above. */
00182 char *getline(int wait_msec,
00183 char separator = '\n', int readahead = 1024)
00184 {
00185 return getline(time_t(wait_msec), separator, readahead);
00186 }
00187
/* double-timeout overload: converts wait_msec to time_t and forwards to
 * the time_t overload above. */
00189 char *getline(double wait_msec,
00190 char separator = '\n', int readahead = 1024)
00191 {
00192 return getline(time_t(wait_msec), separator, readahead);
00193 }
00194
00195 private:
/* Private getline overloads taking a char or bool first argument; no body is
 * visible here.
 * NOTE(review): presumably declared so that a call passing a char or bool
 * where the timeout belongs selects an inaccessible overload instead of
 * silently converting -- confirm. */
00200 char *getline(char, int i = 0);
00201 char *getline(bool, int i = 0);
00202 public:
00203
/* Line reader called by the getline() wrappers; the separator is an int
 * here.  Defaults: separator '\n', readahead 1024.  Defined elsewhere. */
00215 char *blocking_getline(time_t wait_msec, int separator = '\n',
00216 int readahead = 1024);
00217
/* Same parameter list as blocking_getline(); defined elsewhere.
 * NOTE(review): presumably the continue_select()-based variant -- confirm. */
00222 char *continue_getline(time_t wait_msec, int separator = '\n',
00223 int readahead = 1024);
00224
/* Stores count in queue_min. */
00232 void queuemin(size_t count)
00233 { queue_min = count; }
00234
/* Defined elsewhere. */
00239 void drain();
00240
/* Sets outbuf_delayed_flush to is_delayed and want_to_flush to the
 * opposite value. */
00246 void delay_output(bool is_delayed)
00247 {
00248 outbuf_delayed_flush = is_delayed;
00249 want_to_flush = !is_delayed;
00250 }
00251
/* Stores is_automatic in is_auto_flush. */
00258 void auto_flush(bool is_automatic)
00259 { is_auto_flush = is_automatic; }
00260
/* Virtual flush with a time_t timeout; returns bool.  Defined elsewhere. */
00267 virtual bool flush(time_t msec_timeout);
00268
/* Virtual; defined elsewhere. */
00269 virtual bool should_flush();
00270
/* Defined elsewhere.
 * NOTE(review): takes an int timeout, unlike the time_t used by flush() and
 * the select functions. */
00277 void flush_then_close(int msec_timeout);
00278
/* Virtual pre-select hook operating on a SelectInfo; defined elsewhere. */
00300 virtual void pre_select(SelectInfo &si);
00301
/* Runs pre_select(si) with si.wants temporarily replaced by r, then puts
 * the previous si.wants back. */
00306 void pre_select(SelectInfo &si, const SelectRequest &r)
00307 {
00308 SelectRequest oldwant = si.wants;
00309 si.wants = r;
00310 pre_select(si);
00311 si.wants = oldwant;
00312 }
00313
/* Differently named alias that forwards to pre_select(si, r). */
00318 void xpre_select(SelectInfo &si, const SelectRequest &r)
00319 { pre_select(si, r); }
00320
/* Virtual post-select hook operating on a SelectInfo; returns bool.
 * Defined elsewhere. */
00333 virtual bool post_select(SelectInfo &si);
00334
/* Differently named alias that returns post_select(si, r). */
00339 bool xpost_select(SelectInfo &si, const SelectRequest &r)
00340 { return post_select(si, r); }
00341
/* Runs post_select(si) with si.wants temporarily replaced by r, restores
 * the previous si.wants, and returns what post_select(si) returned. */
00346 bool post_select(SelectInfo &si, const SelectRequest &r)
00347 {
00348 SelectRequest oldwant = si.wants;
00349 si.wants = r;
00350 bool val = post_select(si);
00351 si.wants = oldwant;
00352 return val;
00353 }
00354
/* Calls _select() with readable, writable and isexcept all false and
 * forceable true, and returns its result. */
00376 bool select(time_t msec_timeout)
00377 { return _select(msec_timeout, false, false, false, true); }
00378
/* Calls select(msec_timeout) and, if it returns true, invokes callback().
 * The timeout defaults to -1.
 * NOTE(review): -1 presumably means "no timeout" -- confirm. */
00391 void runonce(time_t msec_timeout = -1)
00392 { if (select(msec_timeout)) callback(); }
00393
/* Calls _select() with the given readable/writable/isex flags and
 * forceable false, and returns its result.  isex defaults to false. */
00415 bool select(time_t msec_timeout,
00416 bool readable, bool writable, bool isex = false)
00417 { return _select(msec_timeout, readable, writable, isex, false); }
00418
/* Returns an IWvStream::SelectRequest; defined elsewhere. */
00424 IWvStream::SelectRequest get_select_request();
00425
/* Defined elsewhere; isexception defaults to false.
 * NOTE(review): presumably adds conditions used when select() is called in
 * its "forceable" form -- confirm. */
00434 void force_select(bool readable, bool writable, bool isexception = false);
00435
/* Counterpart of force_select() with the same parameters; defined
 * elsewhere. */
00440 void undo_force_select(bool readable, bool writable,
00441 bool isexception = false);
00442
/* Defined elsewhere. */
00460 bool continue_select(time_t msec_timeout);
00461
/* Defined elsewhere. */
00467 void terminate_continue_select();
00468
/* Virtual const accessor returning a WvAddr pointer; defined elsewhere. */
00473 virtual const WvAddr *src() const;
00474
/* Takes a WvStreamCallback and a void* userdata; defined elsewhere.
 * NOTE(review): presumably stores them in callfunc/userdata -- confirm. */
00479 void setcallback(WvStreamCallback _callfunc, void *_userdata);
00480
/* Per-event callback setters; each takes and returns an IWvStreamCallback.
 * Defined elsewhere.
 * NOTE(review): the return value is presumably the previously installed
 * callback -- confirm. */
00482 IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00483
00485 IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00486
00489 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00490
00492 IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00493
/* Defined elsewhere.
 * NOTE(review): presumably arranges for incoming data to be forwarded to
 * stream s -- confirm. */
00499 void autoforward(WvStream &s);
00500
/* noautoforward() is defined elsewhere.  autoforward_callback() is static
 * and takes (WvStream&, void*), the same arguments as WvStreamCallback. */
00502 void noautoforward();
00503 static void autoforward_callback(WvStream &s, void *userdata);
00504
/* Takes and returns a void*; defined elsewhere.
 * NOTE(review): signature matches the call_ctx member's callback type --
 * confirm the relationship. */
00508 void *_callwrap(void *);
00509
/* Non-virtual; defined elsewhere. */
00513 void _callback();
00514
/* Virtual; invoked by runonce() when select() returns true.  Defined
 * elsewhere. */
00519 virtual void callback();
00520
/* Alarm control taking a time_t timeout; defined elsewhere. */
00525 void alarm(time_t msec_timeout);
00526
/* Returns a time_t; defined elsewhere. */
00532 time_t alarm_remaining();
00533
/* String output helpers.  write(WvStringParm) passes s.cstr() and s.len()
 * to write(const void*, size_t); print() and operator() on a WvStringParm
 * forward to that overload and return its result. */
00538 size_t write(WvStringParm s)
00539 { return write(s.cstr(), s.len()); }
00540 size_t print(WvStringParm s)
00541 { return write(s); }
00542 size_t operator() (WvStringParm s)
00543 { return write(s); }
00544
/* Format-argument variants: build a WvString from the format arguments and
 * write it. */
00546 size_t print(WVSTRING_FORMAT_DECL)
00547 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00548 size_t operator() (WVSTRING_FORMAT_DECL)
00549 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00550
00551 protected:
00552
00553
00554
00555
00556
/* Protected helper taking the same timeout and flag parameters as _select()
 * plus the SelectInfo to work on; defined elsewhere. */
00557 void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00558 bool readable, bool writable, bool isexcept,
00559 bool forceable);
00560
00561
00562
00563
/* Protected helper returning int; defined elsewhere. */
00564 int _do_select(SelectInfo &si);
00565
00566
00567
/* Protected helper returning bool; defined elsewhere. */
00568 bool _process_selectinfo(SelectInfo &si, bool forceable);
00569
00570
00571
00572
00573
/* Protected, non-virtual; defined elsewhere. */
00574 bool flush_outbuf(time_t msec_timeout);
00575
00576
00577
/* Protected virtual flush hook; defined elsewhere. */
00578 virtual bool flush_internal(time_t msec_timeout);
00579
00580
00581
00582
/* Protected virtual const accessors returning int; defined elsewhere.
 * NOTE(review): presumably the read and write file descriptors -- confirm. */
00583 virtual int getrfd() const;
00584 virtual int getwfd() const;
00585
00586 private:
/* Common implementation behind both public select() overloads; defined
 * elsewhere. */
00588 bool _select(time_t msec_timeout,
00589 bool readable, bool writable, bool isexcept,
00590 bool forceable);
00591
/* Private; takes an IWvStream reference.  Defined elsewhere. */
00592 void legacy_callback(IWvStream& s);
00593
00594 protected:
00595
00596
/* Grants WvHTTPClientProxyStream access to non-public members. */
00597 friend class WvHTTPClientProxyStream;
00598
/* Input and output buffers. */
00599 WvDynBuf inbuf, outbuf;
00600
/* Callback, its void* argument, and a void*(void*) callback object.
 * NOTE(review): presumably set by setcallback() -- confirm. */
00601 WvStreamCallback callfunc;
00602 void *userdata;
00603 WvCallback<void*,void*> call_ctx;
00604
/* Per-event callbacks; see the set*callback() functions. */
00605 IWvStreamCallback readcb, writecb, exceptcb, closecb;
00606
/* max_outbuf_size is written by outbuf_limit(); outbuf_delayed_flush by
 * delay_output(); is_auto_flush by auto_flush(). */
00607 size_t max_outbuf_size;
00608 bool outbuf_delayed_flush;
00609 bool is_auto_flush;
00610
00611
/* Written by delay_output() as the inverse of outbuf_delayed_flush. */
00612 bool want_to_flush;
00613
00614
/* Flush-state flag; its writers are not visible in this header. */
00615 bool is_flushing;
00616
/* queue_min is written by queuemin().  The remaining fields are time values
 * whose writers are not visible in this header. */
00617 size_t queue_min;
00618 time_t autoclose_time;
00619 WvTime alarm_time;
00620 WvTime last_alarm_check;
00621
/* Protected virtual hook with an empty default body. */
00632 virtual void execute()
00633 { }
00634
00635
/* Static WvStream pointer shared by all instances; defined elsewhere. */
00636 static WvStream *globalstream;
00637
00638
00639
00640
/* Only compiled when __WVSTREAM_UNIT_TEST is defined: public accessors that
 * return outbuf.used() and inbuf.used(), and one that calls inbuf.putstr(). */
00641 #ifdef __WVSTREAM_UNIT_TEST
00642 public:
00643 size_t outbuf_used()
00644 { return outbuf.used(); }
00645 size_t inbuf_used()
00646 { return inbuf.used(); }
00647 void inbuf_putstr(WvStringParm t)
00648 { inbuf.putstr(t); }
00649 #endif
00650
00651 private:
/* Copy constructor and copy assignment are declared private.
 * NOTE(review): presumably left undefined to make the class non-copyable --
 * confirm. */
00653 WvStream(const WvStream &s);
00654 WvStream& operator= (const WvStream &s);
00655
00656 private:
/* Stream name, with a getter and two setters.  The format-argument setter
 * builds a WvString and passes it to the WvStringParm setter. */
00657 WvString my_wsname;
00658 public:
00659 const char *wsname() const
00660 { return my_wsname; }
00661 void set_wsname(WvStringParm wsname)
00662 { my_wsname = wsname; }
00663 void set_wsname(WVSTRING_FORMAT_DECL)
00664 { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
00665
00666 public:
/* Returns the literal "WvStream". */
00667 const char *wstype() const { return "WvStream"; }
00668
00669 private:
/* Stream id and its read-only accessor. */
00670 WSID my_wsid;
00671 public:
00672 WSID wsid() const { return my_wsid; }
00673
00674 private:
/* Static map from WSID to WvStream pointer, plus a static WSID counter;
 * both defined elsewhere. */
00675 static WvMap<WSID, WvStream *> *wsid_map;
00676 static WSID next_wsid_to_try;
00677 public:
/* Static lookup taking a WSID and returning an IWvStream pointer; defined
 * elsewhere.
 * NOTE(review): presumably consults wsid_map -- confirm. */
00678 static IWvStream *find_by_wsid(WSID wsid);
00679
00680 private:
/* Static debugger registration hook; defined elsewhere. */
00681 static void add_debugger_commands();
00682 protected:
/* Static debugger display helpers; each reports through a
 * WvStreamsDebugger::ResultCallback.  Defined elsewhere. */
00683 static void debugger_streams_display_header(WvStringParm cmd,
00684 WvStreamsDebugger::ResultCallback result_cb);
00685 static void debugger_streams_display_one_stream(WvStream *s,
00686 WvStringParm cmd,
00687 WvStreamsDebugger::ResultCallback result_cb);
00688 static void debugger_streams_maybe_display_one_stream(WvStream *s,
00689 WvStringParm cmd,
00690 const WvStringList &args,
00691 WvStreamsDebugger::ResultCallback result_cb);
00692 private:
/* Static debugger command handlers returning a WvString; defined
 * elsewhere. */
00693 static WvString debugger_streams_run_cb(WvStringParm cmd,
00694 WvStringList &args,
00695 WvStreamsDebugger::ResultCallback result_cb, void *);
00696 static WvString debugger_close_run_cb(WvStringParm cmd,
00697 WvStringList &args,
00698 WvStreamsDebugger::ResultCallback result_cb, void *);
00699
00700 };
00701
/**
 * Global WvStream pointers declared here and defined elsewhere.
 * NOTE(review): the names suggest console, standard input, standard output
 * and standard error streams respectively -- confirm at the definitions.
 */
00708 extern WvStream *wvcon;
00709 extern WvStream *wvin;
00710 extern WvStream *wvout;
00711 extern WvStream *wverr;
00712
00713 #endif // __WVSTREAM_H