00001
00002
00003
00004
00005
00006
00007 #ifndef __WVSTREAM_H
00008 #define __WVSTREAM_H
00009
00010 #include "iwvstream.h"
00011 #include "wvtimeutils.h"
00012 #include "wvhashtable.h"
00013 #include "wvstreamsdebugger.h"
00014 #include <errno.h>
00015 #include <limits.h>
00016
00017 #ifdef _WIN32
00018 #include <time.h>
00019 #include <winsock2.h>
00020 #include <ws2tcpip.h>
00021 #include "wvwin32-sanitize.h"
00022 #else
00023 #include <unistd.h>
00024 #include <sys/time.h>
00025 #endif
00026
00027
00028
00029 typedef WvCallback<void, WvStream&, void*> WvStreamCallback;
00030
00038 class WvStream: public IWvStream
00039 {
00040 IMPLEMENT_IOBJECT(WvStream);
00041 public:
00046 WvStream *read_requires_writable;
00047
00052 WvStream *write_requires_readable;
00053
00055 bool uses_continue_select;
00056
00058 size_t personal_stack_size;
00059
00064 bool alarm_was_ticking;
00065
00067 bool stop_read, stop_write, closed;
00068
00070 WvStream();
00071 virtual ~WvStream();
00072
00080 virtual void close();
00081
00083 virtual void seterr(int _errnum);
00084 void seterr(WvStringParm specialerr)
00085 { WvErrorBase::seterr(specialerr); }
00086 void seterr(WVSTRING_FORMAT_DECL)
00087 { seterr(WvString(WVSTRING_FORMAT_CALL)); }
00088
00090 virtual bool isok() const;
00091
00093 virtual size_t read(void *buf, size_t count);
00094
00104 virtual size_t read(WvBuf &outbuf, size_t count);
00105
00111 virtual void unread(WvBuf &outbuf, size_t count);
00112
00119 virtual size_t write(const void *buf, size_t count);
00120
00128 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
00129
00139 void outbuf_limit(size_t size)
00140 { max_outbuf_size = size; }
00141
00142 virtual void noread();
00143 virtual void nowrite();
00144 virtual void maybe_autoclose();
00145
00146 virtual bool isreadable();
00147 virtual bool iswritable();
00148
00156 virtual size_t uread(void *buf, size_t count)
00157 { return 0; }
00158
00166 virtual size_t uwrite(const void *buf, size_t count)
00167 { return count; }
00168
00185 char *getline(time_t wait_msec = 0,
00186 char separator = '\n', int readahead = 1024)
00187 {
00188 return blocking_getline(wait_msec, separator, readahead);
00189 }
00190
00192 char *getline(int wait_msec,
00193 char separator = '\n', int readahead = 1024)
00194 {
00195 return getline(time_t(wait_msec), separator, readahead);
00196 }
00197
00199 char *getline(double wait_msec,
00200 char separator = '\n', int readahead = 1024)
00201 {
00202 return getline(time_t(wait_msec), separator, readahead);
00203 }
00204
00205 private:
00210 char *getline(char, int i = 0);
00211 char *getline(bool, int i = 0);
00212 public:
00213
00225 char *blocking_getline(time_t wait_msec, int separator = '\n',
00226 int readahead = 1024);
00227
00232 char *continue_getline(time_t wait_msec, int separator = '\n',
00233 int readahead = 1024);
00234
00242 void queuemin(size_t count)
00243 { queue_min = count; }
00244
00249 void drain();
00250
00256 void delay_output(bool is_delayed)
00257 {
00258 outbuf_delayed_flush = is_delayed;
00259 want_to_flush = !is_delayed;
00260 }
00261
00268 void auto_flush(bool is_automatic)
00269 { is_auto_flush = is_automatic; }
00270
00277 virtual bool flush(time_t msec_timeout);
00278
00279 virtual bool should_flush();
00280
00287 void flush_then_close(int msec_timeout);
00288
00310 virtual bool pre_select(SelectInfo &si);
00311
00316 bool pre_select(SelectInfo &si, const SelectRequest &r)
00317 {
00318 SelectRequest oldwant = si.wants;
00319 si.wants = r;
00320 bool val = pre_select(si);
00321 si.wants = oldwant;
00322 return val;
00323 }
00324
00329 bool xpre_select(SelectInfo &si, const SelectRequest &r)
00330 { return pre_select(si, r); }
00331
00344 virtual bool post_select(SelectInfo &si);
00345
00350 bool xpost_select(SelectInfo &si, const SelectRequest &r)
00351 { return post_select(si, r); }
00352
00357 bool post_select(SelectInfo &si, const SelectRequest &r)
00358 {
00359 SelectRequest oldwant = si.wants;
00360 si.wants = r;
00361 bool val = post_select(si);
00362 si.wants = oldwant;
00363 return val;
00364 }
00365
00387 bool select(time_t msec_timeout)
00388 { return _select(msec_timeout, false, false, false, true); }
00389
00402 void runonce(time_t msec_timeout = -1)
00403 { if (select(msec_timeout)) callback(); }
00404
00426 bool select(time_t msec_timeout,
00427 bool readable, bool writable, bool isex = false)
00428 { return _select(msec_timeout, readable, writable, isex, false); }
00429
00435 IWvStream::SelectRequest get_select_request();
00436
00445 void force_select(bool readable, bool writable, bool isexception = false);
00446
00451 void undo_force_select(bool readable, bool writable,
00452 bool isexception = false);
00453
00471 bool continue_select(time_t msec_timeout);
00472
00478 void terminate_continue_select();
00479
00484 virtual const WvAddr *src() const;
00485
00490 void setcallback(WvStreamCallback _callfunc, void *_userdata);
00491
00493 IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
00494
00496 IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
00497
00500 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
00501
00503 IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
00504
00510 void autoforward(WvStream &s);
00511
00513 void noautoforward();
00514 static void autoforward_callback(WvStream &s, void *userdata);
00515
00519 void *_callwrap(void *);
00520
00524 void _callback();
00525
00530 virtual void callback();
00531
00536 void alarm(time_t msec_timeout);
00537
00543 time_t alarm_remaining();
00544
00549 size_t write(WvStringParm s)
00550 { return write(s.cstr(), s.len()); }
00551 size_t print(WvStringParm s)
00552 { return write(s); }
00553 size_t operator() (WvStringParm s)
00554 { return write(s); }
00555
00557 size_t print(WVSTRING_FORMAT_DECL)
00558 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00559 size_t operator() (WVSTRING_FORMAT_DECL)
00560 { return write(WvString(WVSTRING_FORMAT_CALL)); }
00561
00562 protected:
00563
00564
00565
00566
00567
00568 bool _build_selectinfo(SelectInfo &si, time_t msec_timeout,
00569 bool readable, bool writable, bool isexcept,
00570 bool forceable);
00571
00572
00573
00574
00575 int _do_select(SelectInfo &si);
00576
00577
00578
00579 bool _process_selectinfo(SelectInfo &si, bool forceable);
00580
00581
00582
00583
00584
00585 bool flush_outbuf(time_t msec_timeout);
00586
00587
00588
00589 virtual bool flush_internal(time_t msec_timeout);
00590
00591
00592
00593
00594 virtual int getrfd() const;
00595 virtual int getwfd() const;
00596
00597 private:
00599 bool _select(time_t msec_timeout,
00600 bool readable, bool writable, bool isexcept,
00601 bool forceable);
00602
00603 void legacy_callback(IWvStream& s);
00604
00605 protected:
00606
00607
00608 friend class WvHTTPClientProxyStream;
00609
00610 WvDynBuf inbuf, outbuf;
00611
00612 WvStreamCallback callfunc;
00613 void *userdata;
00614 WvCallback<void*,void*> call_ctx;
00615
00616 IWvStreamCallback readcb, writecb, exceptcb, closecb;
00617
00618 size_t max_outbuf_size;
00619 bool outbuf_delayed_flush;
00620 bool is_auto_flush;
00621
00622
00623 bool want_to_flush;
00624
00625
00626 bool is_flushing;
00627
00628 size_t queue_min;
00629 time_t autoclose_time;
00630 WvTime alarm_time;
00631 WvTime last_alarm_check;
00632
00643 virtual void execute()
00644 { }
00645
00646
00647 static WvStream *globalstream;
00648
00649
00650
00651
00652 #ifdef __WVSTREAM_UNIT_TEST
00653 public:
00654 size_t outbuf_used()
00655 { return outbuf.used(); }
00656 size_t inbuf_used()
00657 { return inbuf.used(); }
00658 void inbuf_putstr(WvStringParm t)
00659 { inbuf.putstr(t); }
00660 #endif
00661
00662 private:
00664 WvStream(const WvStream &s);
00665 WvStream& operator= (const WvStream &s);
00666
00667 private:
00668 WvString my_wsname;
00669 public:
00670 const char *wsname() const
00671 { return my_wsname; }
00672 void set_wsname(WvStringParm wsname)
00673 { my_wsname = wsname; }
00674 void set_wsname(WVSTRING_FORMAT_DECL)
00675 { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
00676
00677 public:
00678 const char *wstype() const { return "WvStream"; }
00679
00680 private:
00681 WSID my_wsid;
00682 public:
00683 WSID wsid() const { return my_wsid; }
00684
00685 private:
00686 static WvMap<WSID, WvStream *> *wsid_map;
00687 static WSID next_wsid_to_try;
00688 public:
00689 static IWvStream *find_by_wsid(WSID wsid);
00690
00691 private:
00692 static void add_debugger_commands();
00693 protected:
00694 static void debugger_streams_display_header(WvStringParm cmd,
00695 WvStreamsDebugger::ResultCallback result_cb);
00696 static void debugger_streams_display_one_stream(WvStream *s,
00697 WvStringParm cmd,
00698 WvStreamsDebugger::ResultCallback result_cb);
00699 static void debugger_streams_maybe_display_one_stream(WvStream *s,
00700 WvStringParm cmd,
00701 const WvStringList &args,
00702 WvStreamsDebugger::ResultCallback result_cb);
00703 private:
00704 static WvString debugger_streams_run_cb(WvStringParm cmd,
00705 WvStringList &args,
00706 WvStreamsDebugger::ResultCallback result_cb, void *);
00707 static WvString debugger_close_run_cb(WvStringParm cmd,
00708 WvStringList &args,
00709 WvStreamsDebugger::ResultCallback result_cb, void *);
00710
00711 };
00712
00719 extern WvStream *wvcon;
00720 extern WvStream *wvin;
00721 extern WvStream *wvout;
00722 extern WvStream *wverr;
00723
00724 #endif // __WVSTREAM_H