00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 #include "wvstreamsdebugger.h"
00019 #include "wvstrutils.h"
00020
00021 #ifdef _WIN32
00022 #define ENOBUFS WSAENOBUFS
00023 #undef errno
00024 #define errno GetLastError()
00025 #ifdef __GNUC__
00026 #include <sys/socket.h>
00027 #endif
00028 #include "streams.h"
00029 #else
00030 #include <errno.h>
00031 #endif
00032
00033
00034
00035 #if 0
00036 # ifndef _MSC_VER
00037 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
00038 # else
00039 # define TRACE printf
00040 # endif
00041 #else
00042 # ifndef _MSC_VER
00043 # define TRACE(x, y...)
00044 # else
00045 # define TRACE
00046 # endif
00047 #endif
00048
00049 WvStream *WvStream::globalstream = NULL;
00050
00051 UUID_MAP_BEGIN(WvStream)
00052 UUID_MAP_ENTRY(IObject)
00053 UUID_MAP_ENTRY(IWvStream)
00054 UUID_MAP_END
00055
00056
00057 WvMap<WSID, WvStream *> *WvStream::wsid_map;
00058 WSID WvStream::next_wsid_to_try = 0;
00059
00060
00061 static bool is_prefix_insensitive(const char *str, const char *prefix)
00062 {
00063 size_t len = strlen(prefix);
00064 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0;
00065 }
00066
00067
00068 static const char *strstr_insensitive(const char *haystack, const char *needle)
00069 {
00070 while (*haystack != '\0')
00071 {
00072 if (is_prefix_insensitive(haystack, needle))
00073 return haystack;
00074 ++haystack;
00075 }
00076 return NULL;
00077 }
00078
00079
00080 static bool contains_insensitive(const char *haystack, const char *needle)
00081 {
00082 return strstr_insensitive(haystack, needle) != NULL;
00083 }
00084
00085
00086 static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
00087 static inline const char *Yes_No(bool val)
00088 {
00089 return val? "Yes": "No";
00090 }
00091
00092
00093 void WvStream::debugger_streams_display_header(WvStringParm cmd,
00094 WvStreamsDebugger::ResultCallback result_cb)
00095 {
00096 WvStringList result;
00097 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
00098 "----------------Type", "-", "Name--------------------");
00099 result_cb(cmd, result);
00100 }
00101
00102
00103
00104 static WvString friendly_ms(time_t ms)
00105 {
00106 if (ms <= 0)
00107 return WvString("(%s)", ms);
00108 else if (ms < 1000)
00109 return WvString("%sms", ms);
00110 else if (ms < 60*1000)
00111 return WvString("%ss", ms/1000);
00112 else if (ms < 60*60*1000)
00113 return WvString("%sm", ms/(60*1000));
00114 else if (ms <= 24*60*60*1000)
00115 return WvString("%sh", ms/(60*60*1000));
00116 else
00117 return WvString("%sd", ms/(24*60*60*1000));
00118 }
00119
00120 void WvStream::debugger_streams_display_one_stream(WvStream *s,
00121 WvStringParm cmd,
00122 WvStreamsDebugger::ResultCallback result_cb)
00123 {
00124 WvStringList result;
00125 s->addRef();
00126 unsigned refcount = s->release();
00127 result.append(list_format,
00128 s->wsid(), " ",
00129 refcount, " ",
00130 Yes_No(s->isok()), " ",
00131 Yes_No(s->uses_continue_select), " ",
00132 friendly_ms(s->alarm_remaining()), " ",
00133 s->wstype(), " ",
00134 s->wsname());
00135 result_cb(cmd, result);
00136 }
00137
00138
00139 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
00140 WvStringParm cmd,
00141 const WvStringList &args,
00142 WvStreamsDebugger::ResultCallback result_cb)
00143 {
00144 bool show = args.isempty();
00145 WvStringList::Iter arg(args);
00146 for (arg.rewind(); arg.next(); )
00147 {
00148 WSID wsid;
00149 bool is_num = wvstring_to_num(*arg, wsid);
00150
00151 if (is_num)
00152 {
00153 if (s->wsid() == wsid)
00154 {
00155 show = true;
00156 break;
00157 }
00158 }
00159 else
00160 {
00161 if (s->wsname() && contains_insensitive(s->wsname(), *arg)
00162 || s->wstype() && contains_insensitive(s->wstype(), *arg))
00163 {
00164 show = true;
00165 break;
00166 }
00167 }
00168 }
00169 if (show)
00170 debugger_streams_display_one_stream(s, cmd, result_cb);
00171 }
00172
00173
00174 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
00175 WvStringList &args,
00176 WvStreamsDebugger::ResultCallback result_cb, void *)
00177 {
00178 debugger_streams_display_header(cmd, result_cb);
00179 if (WvStream::wsid_map)
00180 {
00181 WvMap<WSID, WvStream *>::Iter i(*WvStream::wsid_map);
00182 for (i.rewind(); i.next(); )
00183 debugger_streams_maybe_display_one_stream(i->data,
00184 cmd, args, result_cb);
00185 }
00186
00187 return WvString::null;
00188 }
00189
00190
00191 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
00192 WvStringList &args,
00193 WvStreamsDebugger::ResultCallback result_cb, void *)
00194 {
00195 if (args.isempty())
00196 return WvString("Usage: %s <WSID>", cmd);
00197 WSID wsid;
00198 WvString wsid_str = args.popstr();
00199 if (!wvstring_to_num(wsid_str, wsid))
00200 return WvString("Invalid WSID '%s'", wsid_str);
00201 IWvStream *s = WvStream::find_by_wsid(wsid);
00202 if (!s)
00203 return WvString("No such stream");
00204 s->close();
00205 return WvString::null;
00206 }
00207
00208
00209 void WvStream::add_debugger_commands()
00210 {
00211 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
00212 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
00213 }
00214
00215
00216 WvStream::WvStream():
00217 read_requires_writable(NULL),
00218 write_requires_readable(NULL),
00219 uses_continue_select(false),
00220 personal_stack_size(131072),
00221 alarm_was_ticking(false),
00222 stop_read(false),
00223 stop_write(false),
00224 closed(false),
00225 userdata(NULL),
00226 readcb(this, &WvStream::legacy_callback),
00227 max_outbuf_size(0),
00228 outbuf_delayed_flush(false),
00229 is_auto_flush(true),
00230 want_to_flush(true),
00231 is_flushing(false),
00232 queue_min(0),
00233 autoclose_time(0),
00234 alarm_time(wvtime_zero),
00235 last_alarm_check(wvtime_zero)
00236 {
00237 TRACE("Creating wvstream %p\n", this);
00238
00239 static bool first = true;
00240 if (first)
00241 {
00242 first = false;
00243 WvStream::add_debugger_commands();
00244 }
00245
00246
00247 if (!wsid_map)
00248 wsid_map = new WvMap<WSID, WvStream *>(256);
00249 WSID first_wsid_tried = next_wsid_to_try;
00250 do
00251 {
00252 if (!wsid_map->exists(next_wsid_to_try))
00253 break;
00254 ++next_wsid_to_try;
00255 } while (next_wsid_to_try != first_wsid_tried);
00256 my_wsid = next_wsid_to_try++;
00257 assert(!wsid_map->exists(my_wsid));
00258 wsid_map->add(my_wsid, this);
00259
00260 #ifdef _WIN32
00261 WSAData wsaData;
00262 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00263 assert(result == 0);
00264 #endif
00265 }
00266
00267
00268
00269 IWvStream::IWvStream()
00270 {
00271 }
00272
00273
00274 IWvStream::~IWvStream()
00275 {
00276 }
00277
00278
00279 WvStream::~WvStream()
00280 {
00281 TRACE("destroying %p\n", this);
00282 close();
00283
00284
00285
00286
00287 assert(!uses_continue_select || !call_ctx);
00288
00289 call_ctx = 0;
00290
00291 assert(wsid_map);
00292 wsid_map->remove(my_wsid);
00293 if (wsid_map->isempty())
00294 {
00295 delete wsid_map;
00296 wsid_map = NULL;
00297 }
00298
00299 TRACE("done destroying %p\n", this);
00300 }
00301
00302
00303 void WvStream::close()
00304 {
00305 TRACE("flushing in wvstream...\n");
00306 flush(2000);
00307 TRACE("(flushed)\n");
00308
00309 closed = true;
00310
00311 if (!!closecb)
00312 {
00313 IWvStreamCallback cb = closecb;
00314 closecb = 0;
00315 cb(*this);
00316 }
00317
00318
00319
00320
00321 }
00322
00323
00324 void WvStream::autoforward(WvStream &s)
00325 {
00326 setcallback(autoforward_callback, &s);
00327 read_requires_writable = &s;
00328 }
00329
00330
00331 void WvStream::noautoforward()
00332 {
00333 setcallback(0, NULL);
00334 read_requires_writable = NULL;
00335 }
00336
00337
00338 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00339 {
00340 WvStream &s2 = *(WvStream *)userdata;
00341 char buf[1024];
00342 size_t len;
00343
00344 len = s.read(buf, sizeof(buf));
00345
00346 s2.write(buf, len);
00347 }
00348
00349
00350 void WvStream::_callback()
00351 {
00352 execute();
00353 if (!! callfunc)
00354 callfunc(*this, userdata);
00355 }
00356
00357
00358 void *WvStream::_callwrap(void *)
00359 {
00360 _callback();
00361 return NULL;
00362 }
00363
00364
00365 void WvStream::callback()
00366 {
00367 TRACE("(?)");
00368
00369
00370 if (alarm_remaining() == 0)
00371 {
00372 alarm_time = wvtime_zero;
00373 alarm_was_ticking = true;
00374 }
00375 else
00376 alarm_was_ticking = false;
00377
00378 assert(!uses_continue_select || personal_stack_size >= 1024);
00379
00380 #define TEST_CONTINUES_HARSHLY 0
00381 #if TEST_CONTINUES_HARSHLY
00382 #ifndef _WIN32
00383 # warning "Using WvCont for *all* streams for testing!"
00384 #endif
00385 if (1)
00386 #else
00387 if (uses_continue_select && personal_stack_size >= 1024)
00388 #endif
00389 {
00390 if (!call_ctx)
00391 {
00392 call_ctx = WvCont(WvCallback<void*,void*>
00393 (this, &WvStream::_callwrap),
00394 personal_stack_size);
00395 }
00396
00397 call_ctx(NULL);
00398 }
00399 else
00400 _callback();
00401
00402
00403
00404
00405
00406
00407 }
00408
00409
00410 bool WvStream::isok() const
00411 {
00412 return !closed && WvErrorBase::isok();
00413 }
00414
00415
00416 void WvStream::seterr(int _errnum)
00417 {
00418 if (!geterr())
00419 {
00420 WvErrorBase::seterr(_errnum);
00421 close();
00422 }
00423 }
00424
00425
00426 size_t WvStream::read(WvBuf &outbuf, size_t count)
00427 {
00428
00429 size_t free = outbuf.free();
00430 if (count > free)
00431 count = free;
00432
00433 WvDynBuf tmp;
00434 unsigned char *buf = tmp.alloc(count);
00435 size_t len = read(buf, count);
00436 tmp.unalloc(count - len);
00437 outbuf.merge(tmp);
00438 return len;
00439 }
00440
00441
00442 size_t WvStream::write(WvBuf &inbuf, size_t count)
00443 {
00444
00445 size_t avail = inbuf.used();
00446 if (count > avail)
00447 count = avail;
00448 const unsigned char *buf = inbuf.get(count);
00449 size_t len = write(buf, count);
00450 inbuf.unget(count - len);
00451 return len;
00452 }
00453
00454
00455 size_t WvStream::read(void *buf, size_t count)
00456 {
00457 assert(!count || buf);
00458
00459 size_t bufu, i;
00460 unsigned char *newbuf;
00461
00462 bufu = inbuf.used();
00463 if (bufu < queue_min)
00464 {
00465 newbuf = inbuf.alloc(queue_min - bufu);
00466 assert(newbuf);
00467 i = uread(newbuf, queue_min - bufu);
00468 inbuf.unalloc(queue_min - bufu - i);
00469
00470 bufu = inbuf.used();
00471 }
00472
00473 if (bufu < queue_min)
00474 {
00475 maybe_autoclose();
00476 return 0;
00477 }
00478
00479
00480 if (!bufu)
00481 bufu = uread(buf, count);
00482 else
00483 {
00484
00485 if (bufu > count)
00486 bufu = count;
00487
00488 memcpy(buf, inbuf.get(bufu), bufu);
00489 }
00490
00491 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00492 maybe_autoclose();
00493 return bufu;
00494 }
00495
00496
00497 size_t WvStream::write(const void *buf, size_t count)
00498 {
00499 assert(!count || buf);
00500 if (!isok() || !buf || !count || stop_write) return 0;
00501
00502 size_t wrote = 0;
00503 if (!outbuf_delayed_flush && !outbuf.used())
00504 {
00505 wrote = uwrite(buf, count);
00506 count -= wrote;
00507 buf = (const unsigned char *)buf + wrote;
00508
00509 }
00510 if (max_outbuf_size != 0)
00511 {
00512 size_t canbuffer = max_outbuf_size - outbuf.used();
00513 if (count > canbuffer)
00514 count = canbuffer;
00515 }
00516 if (count != 0)
00517 {
00518 outbuf.put(buf, count);
00519 wrote += count;
00520 }
00521
00522 if (should_flush())
00523 {
00524 if (is_auto_flush)
00525 flush(0);
00526 else
00527 flush_outbuf(0);
00528 }
00529
00530 return wrote;
00531 }
00532
00533
00534 void WvStream::noread()
00535 {
00536 stop_read = true;
00537 maybe_autoclose();
00538 }
00539
00540
00541 void WvStream::nowrite()
00542 {
00543 stop_write = true;
00544 maybe_autoclose();
00545 }
00546
00547
00548 void WvStream::maybe_autoclose()
00549 {
00550 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00551 close();
00552 }
00553
00554
00555 bool WvStream::isreadable()
00556 {
00557 return isok() && select(0, true, false, false);
00558 }
00559
00560
00561 bool WvStream::iswritable()
00562 {
00563 return !stop_write && isok() && select(0, false, true, false);
00564 }
00565
00566
00567 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00568 int readahead)
00569 {
00570 assert(separator >= 0);
00571 assert(separator <= 255);
00572
00573
00574
00575 WvTime timeout_time(0);
00576 if (wait_msec > 0)
00577 timeout_time = msecadd(wvtime(), wait_msec);
00578
00579 maybe_autoclose();
00580
00581
00582
00583 while (isok())
00584 {
00585
00586 queuemin(0);
00587
00588
00589 if (inbuf.strchr(separator) > 0)
00590 break;
00591 else if (!isok() || stop_read)
00592 break;
00593
00594
00595 queuemin(inbuf.used() + 1);
00596
00597
00598 if (wait_msec > 0)
00599 {
00600 wait_msec = msecdiff(timeout_time, wvtime());
00601 if (wait_msec < 0)
00602 wait_msec = 0;
00603 }
00604
00605
00606
00607 bool hasdata;
00608 if (wait_msec != 0 && uses_continue_select)
00609 hasdata = continue_select(wait_msec);
00610 else
00611 hasdata = select(wait_msec, true, false);
00612
00613 if (!isok())
00614 break;
00615
00616 if (hasdata)
00617 {
00618
00619 WvDynBuf tmp;
00620 unsigned char *buf = tmp.alloc(readahead);
00621 assert(buf);
00622 size_t len = uread(buf, readahead);
00623 tmp.unalloc(readahead - len);
00624 inbuf.put(tmp.get(len), len);
00625 hasdata = len > 0;
00626 }
00627
00628 if (!isok())
00629 break;
00630
00631 if (!hasdata && wait_msec == 0)
00632 return NULL;
00633 }
00634 if (!inbuf.used())
00635 return NULL;
00636
00637
00638 size_t i = 0;
00639 i = inbuf.strchr(separator);
00640 if (i > 0) {
00641 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00642 assert(eol && *eol == separator);
00643 *eol = 0;
00644 return const_cast<char*>((const char *)inbuf.get(i));
00645 } else {
00646
00647
00648
00649 inbuf.alloc(1)[0] = 0;
00650 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00651 }
00652 }
00653
00654
00655 char *WvStream::continue_getline(time_t wait_msec, int separator,
00656 int readahead)
00657 {
00658 assert(false && "not implemented, come back later!");
00659 assert(uses_continue_select);
00660 return NULL;
00661 }
00662
00663
00664 void WvStream::drain()
00665 {
00666 char buf[1024];
00667 while (isreadable())
00668 read(buf, sizeof(buf));
00669 }
00670
00671
00672 bool WvStream::flush(time_t msec_timeout)
00673 {
00674 if (is_flushing) return false;
00675
00676 TRACE("%p flush starts\n", this);
00677
00678 is_flushing = true;
00679 want_to_flush = true;
00680 bool done = flush_internal(msec_timeout)
00681 && flush_outbuf(msec_timeout);
00682 is_flushing = false;
00683
00684 TRACE("flush stops (%d)\n", done);
00685 return done;
00686 }
00687
00688
00689 bool WvStream::should_flush()
00690 {
00691 return want_to_flush;
00692 }
00693
00694
00695 bool WvStream::flush_outbuf(time_t msec_timeout)
00696 {
00697 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00698 bool outbuf_was_used = outbuf.used();
00699
00700
00701
00702
00703 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00704 {
00705 maybe_autoclose();
00706 return true;
00707 }
00708
00709 WvTime stoptime = msecadd(wvtime(), msec_timeout);
00710
00711
00712 while (outbuf_was_used && isok())
00713 {
00714
00715
00716
00717 size_t attempt = outbuf.optgettable();
00718 size_t real = uwrite(outbuf.get(attempt), attempt);
00719
00720
00721
00722
00723 if (isok() && real < attempt)
00724 {
00725 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00726 assert(outbuf.ungettable() >= attempt - real);
00727 outbuf.unget(attempt - real);
00728 }
00729
00730
00731
00732
00733
00734 if (!msec_timeout)
00735 break;
00736 if (msec_timeout >= 0
00737 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00738 break;
00739
00740 outbuf_was_used = outbuf.used();
00741 }
00742
00743
00744 if (autoclose_time && isok())
00745 {
00746 time_t now = time(NULL);
00747 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00748 this, now - autoclose_time, outbuf.used());
00749 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00750 {
00751 autoclose_time = 0;
00752 close();
00753 }
00754 }
00755
00756 TRACE("flush_outbuf: after autoclose chunk\n");
00757 if (outbuf_delayed_flush && !outbuf_was_used)
00758 want_to_flush = false;
00759
00760 TRACE("flush_outbuf: now isok=%d\n", isok());
00761
00762
00763 if (outbuf_was_used && !isok())
00764 outbuf.zap();
00765
00766 maybe_autoclose();
00767 TRACE("flush_outbuf stops\n");
00768
00769 return !outbuf_was_used;
00770 }
00771
00772
00773 bool WvStream::flush_internal(time_t msec_timeout)
00774 {
00775
00776 return true;
00777 }
00778
00779
00780 int WvStream::getrfd() const
00781 {
00782 return -1;
00783 }
00784
00785
00786 int WvStream::getwfd() const
00787 {
00788 return -1;
00789 }
00790
00791
00792 void WvStream::flush_then_close(int msec_timeout)
00793 {
00794 time_t now = time(NULL);
00795 autoclose_time = now + (msec_timeout + 999) / 1000;
00796
00797 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00798 this, outbuf.used(), autoclose_time - now);
00799
00800
00801
00802
00803
00804
00805 flush(0);
00806 }
00807
00808
00809 void WvStream::pre_select(SelectInfo &si)
00810 {
00811 maybe_autoclose();
00812
00813 time_t alarmleft = alarm_remaining();
00814
00815 if (!si.inherit_request && alarmleft == 0)
00816 {
00817 si.msec_timeout = 0;
00818 return;
00819 }
00820
00821 if (!si.inherit_request)
00822 {
00823 si.wants.readable |= readcb;
00824 si.wants.writable |= writecb;
00825 si.wants.isexception |= exceptcb;
00826 }
00827
00828
00829 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00830 {
00831 si.msec_timeout = 0;
00832 return;
00833 }
00834 if (alarmleft >= 0
00835 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00836 si.msec_timeout = alarmleft + 10;
00837 }
00838
00839
00840 bool WvStream::post_select(SelectInfo &si)
00841 {
00842 if (!si.inherit_request)
00843 {
00844 si.wants.readable |= readcb;
00845 si.wants.writable |= writecb;
00846 si.wants.isexception |= exceptcb;
00847 }
00848
00849
00850
00851
00852
00853
00854
00855 if (should_flush())
00856 flush(0);
00857 if (!si.inherit_request && alarm_remaining() == 0)
00858 return true;
00859 if ((si.wants.readable || (!si.inherit_request && readcb))
00860 && inbuf.used() && inbuf.used() >= queue_min)
00861 return true;
00862 return false;
00863 }
00864
00865
00866 void WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00867 bool readable, bool writable, bool isexcept, bool forceable)
00868 {
00869 FD_ZERO(&si.read);
00870 FD_ZERO(&si.write);
00871 FD_ZERO(&si.except);
00872
00873 if (forceable)
00874 {
00875 si.wants.readable = readcb;
00876 si.wants.writable = writecb;
00877 si.wants.isexception = exceptcb;
00878 }
00879 else
00880 {
00881 si.wants.readable = readable;
00882 si.wants.writable = writable;
00883 si.wants.isexception = isexcept;
00884 }
00885
00886 si.max_fd = -1;
00887 si.msec_timeout = msec_timeout;
00888 si.inherit_request = ! forceable;
00889 si.global_sure = false;
00890
00891 if (!isok()) return;
00892
00893 wvstime_sync();
00894
00895 pre_select(si);
00896 if (globalstream && forceable && (globalstream != this))
00897 {
00898 WvStream *s = globalstream;
00899 globalstream = NULL;
00900 s->xpre_select(si, SelectRequest(false, false, false));
00901 globalstream = s;
00902 }
00903 }
00904
00905
00906 int WvStream::_do_select(SelectInfo &si)
00907 {
00908
00909 timeval tv;
00910 tv.tv_sec = si.msec_timeout / 1000;
00911 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00912
00913 #ifdef _WIN32
00914
00915 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00916 FD_SET(fakefd, &si.except);
00917 #endif
00918
00919
00920 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00921 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00922
00923
00924
00925
00926
00927
00928 if (sel < 0
00929 && errno != EAGAIN && errno != EINTR
00930 && errno != EBADF
00931 && errno != ENOBUFS
00932 )
00933 {
00934 seterr(errno);
00935 }
00936 #ifdef _WIN32
00937 ::close(fakefd);
00938 #endif
00939 TRACE("select() returned %d\n", sel);
00940 return sel;
00941 }
00942
00943
00944 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00945 {
00946 if (!isok()) return false;
00947
00948
00949
00950
00951
00952
00953 wvstime_sync_forward();
00954
00955 bool sure = post_select(si);
00956 if (globalstream && forceable && (globalstream != this))
00957 {
00958 WvStream *s = globalstream;
00959 globalstream = NULL;
00960 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00961 || si.global_sure;
00962 globalstream = s;
00963 }
00964 return sure;
00965 }
00966
00967
00968 bool WvStream::_select(time_t msec_timeout,
00969 bool readable, bool writable, bool isexcept, bool forceable)
00970 {
00971 assert(wsid_map && wsid_map->exists(my_wsid));
00972
00973 SelectInfo si;
00974 _build_selectinfo(si, msec_timeout,
00975 readable, writable, isexcept, forceable);
00976
00977 if (!isok())
00978 return false;
00979
00980 bool sure = false;
00981 int sel = _do_select(si);
00982 if (sel >= 0)
00983 sure = _process_selectinfo(si, forceable);
00984 if (si.global_sure && globalstream && forceable && (globalstream != this))
00985 globalstream->callback();
00986
00987 return sure;
00988 }
00989
00990
00991 IWvStream::SelectRequest WvStream::get_select_request()
00992 {
00993 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00994 }
00995
00996
00997 void WvStream::force_select(bool readable, bool writable, bool isexception)
00998 {
00999 if (readable)
01000 readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
01001 if (writable)
01002 writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
01003 if (isexception)
01004 exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
01005 }
01006
01007
01008 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
01009 {
01010 if (readable)
01011 readcb = 0;
01012 if (writable)
01013 writecb = 0;
01014 if (isexception)
01015 exceptcb = 0;
01016 }
01017
01018
01019 void WvStream::alarm(time_t msec_timeout)
01020 {
01021 if (msec_timeout >= 0)
01022 alarm_time = msecadd(wvstime(), msec_timeout);
01023 else
01024 alarm_time = wvtime_zero;
01025 }
01026
01027
01028 time_t WvStream::alarm_remaining()
01029 {
01030 if (alarm_time.tv_sec)
01031 {
01032 WvTime now = wvstime();
01033
01034
01035 if (now < last_alarm_check)
01036 {
01037 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
01038
01039 if (msecdiff(last_alarm_check, now) > 200)
01040 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
01041 "(%ld:%ld %ld:%ld)\n",
01042 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
01043 now.tv_sec, now.tv_usec);
01044 #endif
01045 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
01046 }
01047
01048 last_alarm_check = now;
01049
01050 time_t remaining = msecdiff(alarm_time, now);
01051 if (remaining < 0)
01052 remaining = 0;
01053 return remaining;
01054 }
01055 return -1;
01056 }
01057
01058
01059 bool WvStream::continue_select(time_t msec_timeout)
01060 {
01061 assert(uses_continue_select);
01062
01063
01064
01065 assert(call_ctx);
01066
01067 if (msec_timeout >= 0)
01068 alarm(msec_timeout);
01069
01070 alarm(msec_timeout);
01071 WvCont::yield();
01072 alarm(-1);
01073
01074
01075
01076
01077
01078
01079
01080 TRACE("hello-%p\n", this);
01081 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
01082 }
01083
01084
01085 void WvStream::terminate_continue_select()
01086 {
01087 close();
01088 call_ctx = 0;
01089 }
01090
01091
01092 const WvAddr *WvStream::src() const
01093 {
01094 return NULL;
01095 }
01096
01097
01098 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
01099 {
01100 callfunc = _callfunc;
01101 userdata = _userdata;
01102 call_ctx = 0;
01103 }
01104
01105
01106 void WvStream::legacy_callback(IWvStream& s)
01107 {
01108 execute();
01109 if (!! callfunc)
01110 callfunc(*this, userdata);
01111 }
01112
01113
01114 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
01115 {
01116 IWvStreamCallback tmp = readcb;
01117
01118 readcb = _callback;
01119
01120 return tmp;
01121 }
01122
01123
01124 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
01125 {
01126 IWvStreamCallback tmp = writecb;
01127
01128 writecb = _callback;
01129
01130 return tmp;
01131 }
01132
01133
01134 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
01135 {
01136 IWvStreamCallback tmp = exceptcb;
01137
01138 exceptcb = _callback;
01139
01140 return tmp;
01141 }
01142
01143
01144 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
01145 {
01146 IWvStreamCallback tmp = closecb;
01147 if (isok())
01148 closecb = _callback;
01149 else
01150 {
01151
01152 closecb = 0;
01153 if (!!_callback)
01154 _callback(*this);
01155 }
01156 return tmp;
01157 }
01158
01159
01160 void WvStream::unread(WvBuf &unreadbuf, size_t count)
01161 {
01162 WvDynBuf tmp;
01163 tmp.merge(unreadbuf, count);
01164 tmp.merge(inbuf);
01165 inbuf.zap();
01166 inbuf.merge(tmp);
01167 }
01168
01169
01170 IWvStream *WvStream::find_by_wsid(WSID wsid)
01171 {
01172 WvStream **presult = wsid_map? wsid_map->find(wsid): NULL;
01173 if (presult)
01174 return *presult;
01175 else
01176 return NULL;
01177 }