00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 #include "wvstreamsdebugger.h"
00019 #include "wvstrutils.h"
00020
00021 #ifdef _WIN32
00022 #define ENOBUFS WSAENOBUFS
00023 #undef errno
00024 #define errno GetLastError()
00025 #ifdef __GNUC__
00026 #include <sys/socket.h>
00027 #endif
00028 #include "streams.h"
00029 #else
00030 #include <errno.h>
00031 #endif
00032
00033
00034
00035 #if 0
00036 # ifndef _MSC_VER
00037 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
00038 # else
00039 # define TRACE printf
00040 # endif
00041 #else
00042 # ifndef _MSC_VER
00043 # define TRACE(x, y...)
00044 # else
00045 # define TRACE
00046 # endif
00047 #endif
00048
00049 WvStream *WvStream::globalstream = NULL;
00050
00051 UUID_MAP_BEGIN(WvStream)
00052 UUID_MAP_ENTRY(IObject)
00053 UUID_MAP_ENTRY(IWvStream)
00054 UUID_MAP_END
00055
00056
00057 WvMap<WSID, WvStream *> *WvStream::wsid_map;
00058 WSID WvStream::next_wsid_to_try = 0;
00059
00060
00061 static bool is_prefix_insensitive(const char *str, const char *prefix)
00062 {
00063 size_t len = strlen(prefix);
00064 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0;
00065 }
00066
00067
00068 static const char *strstr_insensitive(const char *haystack, const char *needle)
00069 {
00070 while (*haystack != '\0')
00071 {
00072 if (is_prefix_insensitive(haystack, needle))
00073 return haystack;
00074 ++haystack;
00075 }
00076 return NULL;
00077 }
00078
00079
00080 static bool contains_insensitive(const char *haystack, const char *needle)
00081 {
00082 return strstr_insensitive(haystack, needle) != NULL;
00083 }
00084
00085
00086 static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
00087 static inline const char *Yes_No(bool val)
00088 {
00089 return val? "Yes": "No";
00090 }
00091
00092
00093 void WvStream::debugger_streams_display_header(WvStringParm cmd,
00094 WvStreamsDebugger::ResultCallback result_cb)
00095 {
00096 WvStringList result;
00097 result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
00098 "----------------Type", "-", "Name--------------------");
00099 result_cb(cmd, result);
00100 }
00101
00102
00103
00104 static WvString friendly_ms(time_t ms)
00105 {
00106 if (ms <= 0)
00107 return WvString("(%s)", ms);
00108 else if (ms < 1000)
00109 return WvString("%sms", ms);
00110 else if (ms < 60*1000)
00111 return WvString("%ss", ms/1000);
00112 else if (ms < 60*60*1000)
00113 return WvString("%sm", ms/(60*1000));
00114 else if (ms <= 24*60*60*1000)
00115 return WvString("%sh", ms/(60*60*1000));
00116 else
00117 return WvString("%sd", ms/(24*60*60*1000));
00118 }
00119
00120 void WvStream::debugger_streams_display_one_stream(WvStream *s,
00121 WvStringParm cmd,
00122 WvStreamsDebugger::ResultCallback result_cb)
00123 {
00124 WvStringList result;
00125 s->addRef();
00126 unsigned refcount = s->release();
00127 result.append(list_format,
00128 s->wsid(), " ",
00129 refcount, " ",
00130 Yes_No(s->isok()), " ",
00131 Yes_No(s->uses_continue_select), " ",
00132 friendly_ms(s->alarm_remaining()), " ",
00133 s->wstype(), " ",
00134 s->wsname());
00135 result_cb(cmd, result);
00136 }
00137
00138
00139 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
00140 WvStringParm cmd,
00141 const WvStringList &args,
00142 WvStreamsDebugger::ResultCallback result_cb)
00143 {
00144 bool show = args.isempty();
00145 WvStringList::Iter arg(args);
00146 for (arg.rewind(); arg.next(); )
00147 {
00148 WSID wsid;
00149 bool is_num = wvstring_to_num(*arg, wsid);
00150
00151 if (is_num)
00152 {
00153 if (s->wsid() == wsid)
00154 {
00155 show = true;
00156 break;
00157 }
00158 }
00159 else
00160 {
00161 if (s->wsname() && contains_insensitive(s->wsname(), *arg)
00162 || s->wstype() && contains_insensitive(s->wstype(), *arg))
00163 {
00164 show = true;
00165 break;
00166 }
00167 }
00168 }
00169 if (show)
00170 debugger_streams_display_one_stream(s, cmd, result_cb);
00171 }
00172
00173
00174 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
00175 WvStringList &args,
00176 WvStreamsDebugger::ResultCallback result_cb, void *)
00177 {
00178 debugger_streams_display_header(cmd, result_cb);
00179 if (WvStream::wsid_map)
00180 {
00181 WvMap<WSID, WvStream *>::Iter i(*WvStream::wsid_map);
00182 for (i.rewind(); i.next(); )
00183 debugger_streams_maybe_display_one_stream(i->data,
00184 cmd, args, result_cb);
00185 }
00186
00187 return WvString::null;
00188 }
00189
00190
00191 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
00192 WvStringList &args,
00193 WvStreamsDebugger::ResultCallback result_cb, void *)
00194 {
00195 if (args.isempty())
00196 return WvString("Usage: %s <WSID>", cmd);
00197 WSID wsid;
00198 WvString wsid_str = args.popstr();
00199 if (!wvstring_to_num(wsid_str, wsid))
00200 return WvString("Invalid WSID '%s'", wsid_str);
00201 IWvStream *s = WvStream::find_by_wsid(wsid);
00202 if (!s)
00203 return WvString("No such stream");
00204 s->close();
00205 return WvString::null;
00206 }
00207
00208
00209 void WvStream::add_debugger_commands()
00210 {
00211 WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
00212 WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
00213 }
00214
00215
00216 WvStream::WvStream():
00217 read_requires_writable(NULL),
00218 write_requires_readable(NULL),
00219 uses_continue_select(false),
00220 personal_stack_size(131072),
00221 alarm_was_ticking(false),
00222 stop_read(false),
00223 stop_write(false),
00224 closed(false),
00225 userdata(NULL),
00226 readcb(this, &WvStream::legacy_callback),
00227 max_outbuf_size(0),
00228 outbuf_delayed_flush(false),
00229 is_auto_flush(true),
00230 want_to_flush(true),
00231 is_flushing(false),
00232 queue_min(0),
00233 autoclose_time(0),
00234 alarm_time(wvtime_zero),
00235 last_alarm_check(wvtime_zero)
00236 {
00237 TRACE("Creating wvstream %p\n", this);
00238
00239 static bool first = true;
00240 if (first)
00241 {
00242 first = false;
00243 WvStream::add_debugger_commands();
00244 }
00245
00246
00247 if (!wsid_map)
00248 wsid_map = new WvMap<WSID, WvStream *>(256);
00249 WSID first_wsid_tried = next_wsid_to_try;
00250 do
00251 {
00252 if (!wsid_map->exists(next_wsid_to_try))
00253 break;
00254 ++next_wsid_to_try;
00255 } while (next_wsid_to_try != first_wsid_tried);
00256 my_wsid = next_wsid_to_try++;
00257 assert(!wsid_map->exists(my_wsid));
00258 wsid_map->add(my_wsid, this);
00259
00260 #ifdef _WIN32
00261 WSAData wsaData;
00262 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
00263 assert(result == 0);
00264 #endif
00265 }
00266
00267
00268
00269 IWvStream::IWvStream()
00270 {
00271 }
00272
00273
00274 IWvStream::~IWvStream()
00275 {
00276 }
00277
00278
00279 WvStream::~WvStream()
00280 {
00281 TRACE("destroying %p\n", this);
00282 close();
00283
00284
00285
00286
00287 assert(!uses_continue_select || !call_ctx);
00288
00289 call_ctx = 0;
00290
00291 assert(wsid_map);
00292 wsid_map->remove(my_wsid);
00293 if (wsid_map->isempty())
00294 {
00295 delete wsid_map;
00296 wsid_map = NULL;
00297 }
00298
00299 TRACE("done destroying %p\n", this);
00300 }
00301
00302
00303 void WvStream::close()
00304 {
00305 TRACE("flushing in wvstream...\n");
00306 flush(2000);
00307 TRACE("(flushed)\n");
00308
00309 closed = true;
00310
00311 if (!!closecb)
00312 {
00313 IWvStreamCallback cb = closecb;
00314 closecb = 0;
00315 cb(*this);
00316 }
00317
00318
00319
00320
00321 }
00322
00323
00324 void WvStream::autoforward(WvStream &s)
00325 {
00326 setcallback(autoforward_callback, &s);
00327 read_requires_writable = &s;
00328 }
00329
00330
00331 void WvStream::noautoforward()
00332 {
00333 setcallback(0, NULL);
00334 read_requires_writable = NULL;
00335 }
00336
00337
00338 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00339 {
00340 WvStream &s2 = *(WvStream *)userdata;
00341 char buf[1024];
00342 size_t len;
00343
00344 len = s.read(buf, sizeof(buf));
00345
00346 s2.write(buf, len);
00347 }
00348
00349
00350 void WvStream::_callback()
00351 {
00352 execute();
00353 if (!! callfunc)
00354 callfunc(*this, userdata);
00355 }
00356
00357
00358 void *WvStream::_callwrap(void *)
00359 {
00360 _callback();
00361 return NULL;
00362 }
00363
00364
00365 void WvStream::callback()
00366 {
00367 TRACE("(?)");
00368
00369
00370 if (alarm_remaining() == 0)
00371 {
00372 alarm_time = wvtime_zero;
00373 alarm_was_ticking = true;
00374 }
00375 else
00376 alarm_was_ticking = false;
00377
00378 assert(!uses_continue_select || personal_stack_size >= 1024);
00379
00380 #define TEST_CONTINUES_HARSHLY 0
00381 #if TEST_CONTINUES_HARSHLY
00382 #ifndef _WIN32
00383 # warning "Using WvCont for *all* streams for testing!"
00384 #endif
00385 if (1)
00386 #else
00387 if (uses_continue_select && personal_stack_size >= 1024)
00388 #endif
00389 {
00390 if (!call_ctx)
00391 {
00392 call_ctx = WvCont(WvCallback<void*,void*>
00393 (this, &WvStream::_callwrap),
00394 personal_stack_size);
00395 }
00396
00397 call_ctx(NULL);
00398 }
00399 else
00400 _callback();
00401
00402
00403
00404
00405
00406
00407 }
00408
00409
00410 bool WvStream::isok() const
00411 {
00412 return !closed && WvErrorBase::isok();
00413 }
00414
00415
00416 void WvStream::seterr(int _errnum)
00417 {
00418 if (!geterr())
00419 {
00420 WvErrorBase::seterr(_errnum);
00421 close();
00422 }
00423 }
00424
00425
00426 size_t WvStream::read(WvBuf &outbuf, size_t count)
00427 {
00428
00429 size_t free = outbuf.free();
00430 if (count > free)
00431 count = free;
00432
00433 WvDynBuf tmp;
00434 unsigned char *buf = tmp.alloc(count);
00435 size_t len = read(buf, count);
00436 tmp.unalloc(count - len);
00437 outbuf.merge(tmp);
00438 return len;
00439 }
00440
00441
00442 size_t WvStream::write(WvBuf &inbuf, size_t count)
00443 {
00444
00445 size_t avail = inbuf.used();
00446 if (count > avail)
00447 count = avail;
00448 const unsigned char *buf = inbuf.get(count);
00449 size_t len = write(buf, count);
00450 inbuf.unget(count - len);
00451 return len;
00452 }
00453
00454
00455 size_t WvStream::read(void *buf, size_t count)
00456 {
00457 assert(!count || buf);
00458
00459 size_t bufu, i;
00460 unsigned char *newbuf;
00461
00462 bufu = inbuf.used();
00463 if (bufu < queue_min)
00464 {
00465 newbuf = inbuf.alloc(queue_min - bufu);
00466 assert(newbuf);
00467 i = uread(newbuf, queue_min - bufu);
00468 inbuf.unalloc(queue_min - bufu - i);
00469
00470 bufu = inbuf.used();
00471 }
00472
00473 if (bufu < queue_min)
00474 {
00475 maybe_autoclose();
00476 return 0;
00477 }
00478
00479
00480 if (!bufu)
00481 bufu = uread(buf, count);
00482 else
00483 {
00484
00485 if (bufu > count)
00486 bufu = count;
00487
00488 memcpy(buf, inbuf.get(bufu), bufu);
00489 }
00490
00491 TRACE("read obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00492 maybe_autoclose();
00493 return bufu;
00494 }
00495
00496
00497 size_t WvStream::write(const void *buf, size_t count)
00498 {
00499 assert(!count || buf);
00500 if (!isok() || !buf || !count || stop_write) return 0;
00501
00502 size_t wrote = 0;
00503 if (!outbuf_delayed_flush && !outbuf.used())
00504 {
00505 wrote = uwrite(buf, count);
00506 count -= wrote;
00507 buf = (const unsigned char *)buf + wrote;
00508
00509 }
00510 if (max_outbuf_size != 0)
00511 {
00512 size_t canbuffer = max_outbuf_size - outbuf.used();
00513 if (count > canbuffer)
00514 count = canbuffer;
00515 }
00516 if (count != 0)
00517 {
00518 outbuf.put(buf, count);
00519 wrote += count;
00520 }
00521
00522 if (should_flush())
00523 {
00524 if (is_auto_flush)
00525 flush(0);
00526 else
00527 flush_outbuf(0);
00528 }
00529
00530 return wrote;
00531 }
00532
00533
00534 void WvStream::noread()
00535 {
00536 stop_read = true;
00537 maybe_autoclose();
00538 }
00539
00540
00541 void WvStream::nowrite()
00542 {
00543 stop_write = true;
00544 maybe_autoclose();
00545 }
00546
00547
00548 void WvStream::maybe_autoclose()
00549 {
00550 if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00551 close();
00552 }
00553
00554
00555 bool WvStream::isreadable()
00556 {
00557 return isok() && select(0, true, false, false);
00558 }
00559
00560
00561 bool WvStream::iswritable()
00562 {
00563 return !stop_write && isok() && select(0, false, true, false);
00564 }
00565
00566
00567 char *WvStream::blocking_getline(time_t wait_msec, int separator,
00568 int readahead)
00569 {
00570 assert(separator >= 0);
00571 assert(separator <= 255);
00572
00573
00574
00575 struct timeval timeout_time;
00576 if (wait_msec > 0)
00577 timeout_time = msecadd(wvtime(), wait_msec);
00578
00579 maybe_autoclose();
00580
00581
00582
00583 while (isok())
00584 {
00585
00586 queuemin(0);
00587
00588
00589 if (inbuf.strchr(separator) > 0)
00590 break;
00591 else if (!isok() || stop_read)
00592 break;
00593
00594
00595 queuemin(inbuf.used() + 1);
00596
00597
00598 if (wait_msec > 0)
00599 {
00600 wait_msec = msecdiff(timeout_time, wvtime());
00601 if (wait_msec < 0)
00602 wait_msec = 0;
00603 }
00604
00605
00606
00607 bool hasdata;
00608 if (wait_msec != 0 && uses_continue_select)
00609 hasdata = continue_select(wait_msec);
00610 else
00611 hasdata = select(wait_msec, true, false);
00612
00613 if (!isok())
00614 break;
00615
00616 if (hasdata)
00617 {
00618
00619 WvDynBuf tmp;
00620 unsigned char *buf = tmp.alloc(readahead);
00621 assert(buf);
00622 size_t len = uread(buf, readahead);
00623 tmp.unalloc(readahead - len);
00624 inbuf.put(tmp.get(len), len);
00625 hasdata = len > 0;
00626 }
00627
00628 if (!isok())
00629 break;
00630
00631 if (!hasdata && wait_msec == 0)
00632 return NULL;
00633 }
00634 if (!inbuf.used())
00635 return NULL;
00636
00637
00638 size_t i = 0;
00639 i = inbuf.strchr(separator);
00640 if (i > 0) {
00641 char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
00642 assert(eol);
00643 *eol = 0;
00644 return const_cast<char*>((const char *)inbuf.get(i));
00645 } else {
00646
00647
00648
00649 inbuf.alloc(1)[0] = 0;
00650 return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
00651 }
00652 }
00653
00654
00655 char *WvStream::continue_getline(time_t wait_msec, int separator,
00656 int readahead)
00657 {
00658 assert(false && "not implemented, come back later!");
00659 assert(uses_continue_select);
00660 return NULL;
00661 }
00662
00663
00664 void WvStream::drain()
00665 {
00666 char buf[1024];
00667 while (isreadable())
00668 read(buf, sizeof(buf));
00669 }
00670
00671
00672 bool WvStream::flush(time_t msec_timeout)
00673 {
00674 if (is_flushing) return false;
00675
00676 TRACE("%p flush starts\n", this);
00677
00678 is_flushing = true;
00679 want_to_flush = true;
00680 bool done = flush_internal(msec_timeout)
00681 && flush_outbuf(msec_timeout);
00682 is_flushing = false;
00683
00684 TRACE("flush stops (%d)\n", done);
00685 return done;
00686 }
00687
00688
00689 bool WvStream::should_flush()
00690 {
00691 return want_to_flush;
00692 }
00693
00694
00695 bool WvStream::flush_outbuf(time_t msec_timeout)
00696 {
00697 TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
00698 bool outbuf_was_used = outbuf.used();
00699
00700
00701
00702
00703 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
00704 {
00705 maybe_autoclose();
00706 return true;
00707 }
00708
00709 WvTime stoptime = msecadd(wvtime(), msec_timeout);
00710
00711
00712 while (outbuf_was_used && isok())
00713 {
00714
00715
00716
00717 size_t attempt = outbuf.optgettable();
00718 size_t real = uwrite(outbuf.get(attempt), attempt);
00719
00720
00721
00722
00723 if (isok() && real < attempt)
00724 {
00725 TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
00726 assert(outbuf.ungettable() >= attempt - real);
00727 outbuf.unget(attempt - real);
00728 }
00729
00730
00731
00732
00733
00734 if (!msec_timeout)
00735 break;
00736 if (msec_timeout >= 0
00737 && (stoptime < wvtime() || !select(msec_timeout, false, true)))
00738 break;
00739
00740 outbuf_was_used = outbuf.used();
00741 }
00742
00743
00744 if (autoclose_time && isok())
00745 {
00746 time_t now = time(NULL);
00747 TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
00748 this, now - autoclose_time, outbuf.used());
00749 if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
00750 {
00751 autoclose_time = 0;
00752 close();
00753 }
00754 }
00755
00756 TRACE("flush_outbuf: after autoclose chunk\n");
00757 if (outbuf_delayed_flush && !outbuf_was_used)
00758 want_to_flush = false;
00759
00760 TRACE("flush_outbuf: now isok=%d\n", isok());
00761
00762
00763 if (outbuf_was_used && !isok())
00764 outbuf.zap();
00765
00766 maybe_autoclose();
00767 TRACE("flush_outbuf stops\n");
00768
00769 return !outbuf_was_used;
00770 }
00771
00772
00773 bool WvStream::flush_internal(time_t msec_timeout)
00774 {
00775
00776 return true;
00777 }
00778
00779
00780 int WvStream::getrfd() const
00781 {
00782 return -1;
00783 }
00784
00785
00786 int WvStream::getwfd() const
00787 {
00788 return -1;
00789 }
00790
00791
00792 void WvStream::flush_then_close(int msec_timeout)
00793 {
00794 time_t now = time(NULL);
00795 autoclose_time = now + (msec_timeout + 999) / 1000;
00796
00797 TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
00798 this, outbuf.used(), autoclose_time - now);
00799
00800
00801
00802
00803
00804
00805 flush(0);
00806 }
00807
00808
00809 bool WvStream::pre_select(SelectInfo &si)
00810 {
00811 maybe_autoclose();
00812
00813 time_t alarmleft = alarm_remaining();
00814
00815 if (!si.inherit_request && alarmleft == 0)
00816 return true;
00817
00818 if (!si.inherit_request)
00819 {
00820 si.wants.readable |= readcb;
00821 si.wants.writable |= writecb;
00822 si.wants.isexception |= exceptcb;
00823 }
00824
00825
00826 if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00827 return true;
00828 if (alarmleft >= 0
00829 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00830 si.msec_timeout = alarmleft + 10;
00831 return false;
00832 }
00833
00834
00835 bool WvStream::post_select(SelectInfo &si)
00836 {
00837
00838
00839
00840
00841
00842
00843 if (should_flush())
00844 flush(0);
00845 if (!si.inherit_request && alarm_remaining() == 0)
00846 return true;
00847 return false;
00848 }
00849
00850
00851 bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
00852 bool readable, bool writable, bool isexcept, bool forceable)
00853 {
00854 FD_ZERO(&si.read);
00855 FD_ZERO(&si.write);
00856 FD_ZERO(&si.except);
00857
00858 if (forceable)
00859 {
00860 si.wants.readable = readcb;
00861 si.wants.writable = writecb;
00862 si.wants.isexception = exceptcb;
00863 }
00864 else
00865 {
00866 si.wants.readable = readable;
00867 si.wants.writable = writable;
00868 si.wants.isexception = isexcept;
00869 }
00870
00871 si.max_fd = -1;
00872 si.msec_timeout = msec_timeout;
00873 si.inherit_request = ! forceable;
00874 si.global_sure = false;
00875
00876 if (!isok()) return false;
00877
00878 wvstime_sync();
00879
00880 bool sure = pre_select(si);
00881 if (globalstream && forceable && (globalstream != this))
00882 {
00883 WvStream *s = globalstream;
00884 globalstream = NULL;
00885 si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
00886 globalstream = s;
00887 }
00888 if (sure || si.global_sure)
00889 si.msec_timeout = 0;
00890 return sure;
00891 }
00892
00893
00894 int WvStream::_do_select(SelectInfo &si)
00895 {
00896
00897 timeval tv;
00898 tv.tv_sec = si.msec_timeout / 1000;
00899 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
00900
00901 #ifdef _WIN32
00902
00903 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
00904 FD_SET(fakefd, &si.except);
00905 #endif
00906
00907
00908 int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
00909 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
00910
00911
00912
00913
00914
00915
00916 if (sel < 0
00917 && errno != EAGAIN && errno != EINTR
00918 && errno != EBADF
00919 && errno != ENOBUFS
00920 )
00921 {
00922 seterr(errno);
00923 }
00924 #ifdef _WIN32
00925 ::close(fakefd);
00926 #endif
00927 TRACE("select() returned %d\n", sel);
00928 return sel;
00929 }
00930
00931
00932 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00933 {
00934 if (!isok()) return false;
00935
00936 wvstime_sync();
00937
00938 bool sure = post_select(si);
00939 if (globalstream && forceable && (globalstream != this))
00940 {
00941 WvStream *s = globalstream;
00942 globalstream = NULL;
00943 si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00944 || si.global_sure;
00945 globalstream = s;
00946 }
00947 return sure;
00948 }
00949
00950
00951 bool WvStream::_select(time_t msec_timeout,
00952 bool readable, bool writable, bool isexcept, bool forceable)
00953 {
00954 assert(wsid_map && wsid_map->exists(my_wsid));
00955
00956 SelectInfo si;
00957 bool sure = _build_selectinfo(si, msec_timeout,
00958 readable, writable, isexcept, forceable);
00959
00960 if (!isok())
00961 return false;
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971 int sel = _do_select(si);
00972 if (sel >= 0)
00973 sure = _process_selectinfo(si, forceable) || sure;
00974 if (si.global_sure && globalstream && forceable && (globalstream != this))
00975 globalstream->callback();
00976 return sure;
00977 }
00978
00979
00980 IWvStream::SelectRequest WvStream::get_select_request()
00981 {
00982 return IWvStream::SelectRequest(readcb, writecb, exceptcb);
00983 }
00984
00985
00986 void WvStream::force_select(bool readable, bool writable, bool isexception)
00987 {
00988 if (readable)
00989 readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00990 if (writable)
00991 writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00992 if (isexception)
00993 exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00994 }
00995
00996
00997 void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
00998 {
00999 if (readable)
01000 readcb = 0;
01001 if (writable)
01002 writecb = 0;
01003 if (isexception)
01004 exceptcb = 0;
01005 }
01006
01007
01008 void WvStream::alarm(time_t msec_timeout)
01009 {
01010 if (msec_timeout >= 0)
01011 alarm_time = msecadd(wvstime(), msec_timeout);
01012 else
01013 alarm_time = wvtime_zero;
01014 }
01015
01016
01017 time_t WvStream::alarm_remaining()
01018 {
01019 if (alarm_time.tv_sec)
01020 {
01021 WvTime now = wvstime();
01022
01023
01024 if (now < last_alarm_check)
01025 {
01026 #if 0 // okay, I give up. Time just plain goes backwards on some systems.
01027
01028 if (msecdiff(last_alarm_check, now) > 200)
01029 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
01030 "(%ld:%ld %ld:%ld)\n",
01031 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
01032 now.tv_sec, now.tv_usec);
01033 #endif
01034 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
01035 }
01036
01037 last_alarm_check = now;
01038
01039 time_t remaining = msecdiff(alarm_time, now);
01040 if (remaining < 0)
01041 remaining = 0;
01042 return remaining;
01043 }
01044 return -1;
01045 }
01046
01047
01048 bool WvStream::continue_select(time_t msec_timeout)
01049 {
01050 assert(uses_continue_select);
01051
01052
01053
01054 assert(call_ctx);
01055
01056 if (msec_timeout >= 0)
01057 alarm(msec_timeout);
01058
01059 alarm(msec_timeout);
01060 WvCont::yield();
01061 alarm(-1);
01062
01063
01064
01065
01066
01067
01068
01069 TRACE("hello-%p\n", this);
01070 return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
01071 }
01072
01073
01074 void WvStream::terminate_continue_select()
01075 {
01076 close();
01077 call_ctx = 0;
01078 }
01079
01080
01081 const WvAddr *WvStream::src() const
01082 {
01083 return NULL;
01084 }
01085
01086
01087 void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
01088 {
01089 callfunc = _callfunc;
01090 userdata = _userdata;
01091 call_ctx = 0;
01092 }
01093
01094
01095 void WvStream::legacy_callback(IWvStream& s)
01096 {
01097 execute();
01098 if (!! callfunc)
01099 callfunc(*this, userdata);
01100 }
01101
01102
01103 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
01104 {
01105 IWvStreamCallback tmp = readcb;
01106
01107 readcb = _callback;
01108
01109 return tmp;
01110 }
01111
01112
01113 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
01114 {
01115 IWvStreamCallback tmp = writecb;
01116
01117 writecb = _callback;
01118
01119 return tmp;
01120 }
01121
01122
01123 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
01124 {
01125 IWvStreamCallback tmp = exceptcb;
01126
01127 exceptcb = _callback;
01128
01129 return tmp;
01130 }
01131
01132
01133 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
01134 {
01135 IWvStreamCallback tmp = closecb;
01136 if (isok())
01137 closecb = _callback;
01138 else
01139 {
01140
01141 closecb = 0;
01142 if (!!_callback)
01143 _callback(*this);
01144 }
01145 return tmp;
01146 }
01147
01148
01149 void WvStream::unread(WvBuf &unreadbuf, size_t count)
01150 {
01151 WvDynBuf tmp;
01152 tmp.merge(unreadbuf, count);
01153 tmp.merge(inbuf);
01154 inbuf.zap();
01155 inbuf.merge(tmp);
01156 }
01157
01158
01159 IWvStream *WvStream::find_by_wsid(WSID wsid)
01160 {
01161 WvStream **presult = wsid_map? wsid_map->find(wsid): NULL;
01162 if (presult)
01163 return *presult;
01164 else
01165 return NULL;
01166 }