wvstream.cc

00001 /*
00002  * Worldvisions Weaver Software:
00003  *   Copyright (C) 1997-2002 Net Integration Technologies, Inc.
00004  * 
00005  * Unified support for streams, that is, sequences of bytes that may or
00006  * may not be ready for read/write at any given time.
00007  * 
00008  * We provide typical read and write routines, as well as a select() function
00009  * for each stream.
00010  */
00011 #include <time.h>
00012 #include <sys/types.h>
00013 #include <assert.h>
00014 #define __WVSTREAM_UNIT_TEST 1
00015 #include "wvstream.h"
00016 #include "wvtimeutils.h"
00017 #include "wvcont.h"
00018 #include "wvstreamsdebugger.h"
00019 #include "wvstrutils.h"
00020 
00021 #ifdef _WIN32
00022 #define ENOBUFS WSAENOBUFS
00023 #undef errno
00024 #define errno GetLastError()
00025 #ifdef __GNUC__
00026 #include <sys/socket.h>
00027 #endif
00028 #include "streams.h"
00029 #else
00030 #include <errno.h>
00031 #endif
00032 
// enable this to add some read/write trace messages (this can be VERY
// verbose)
//
// TRACE(fmt, args...) is called throughout this file with printf-style
// arguments.  With the "#if 0" below left as it is, the non-MSVC definition
// expands to nothing, so the calls cost nothing at run time.
// NOTE(review): the enabled non-MSVC form expands to two statements
// (fprintf + fflush); confirm every call site is a full statement inside
// braces before switching it on.
#if 0
# ifndef _MSC_VER
#  define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr);
# else
#  define TRACE printf
# endif
#else
# ifndef _MSC_VER
#  define TRACE(x, y...)
# else
#  define TRACE
# endif
#endif
00048 
// Optional extra stream that is pre/post-selected alongside whichever stream
// is being selected on (see _build_selectinfo(), _process_selectinfo() and
// _select()).  NULL when there is none.
WvStream *WvStream::globalstream = NULL;

// Interface map for WvStream: IObject and IWvStream.
UUID_MAP_BEGIN(WvStream)
  UUID_MAP_ENTRY(IObject)
  UUID_MAP_ENTRY(IWvStream)
  UUID_MAP_END


// Registry of live streams keyed by WSID.  Allocated by the WvStream
// constructor when it does not exist yet, and deleted by the destructor once
// it becomes empty.
WvMap<WSID, WvStream *> *WvStream::wsid_map;
// First candidate WSID the next constructed stream will try.
WSID WvStream::next_wsid_to_try = 0;
00059 
00060 
// Returns true if 'str' begins with 'prefix', ignoring case.  An empty
// prefix matches every string.
//
// strncasecmp() stops comparing at the first NUL, so a 'str' that is shorter
// than 'prefix' can never compare equal; there is no need to measure 'str'
// first.  (Measuring it made each call O(strlen(str)), which turned the
// position-by-position scan in strstr_insensitive() quadratic.)
static bool is_prefix_insensitive(const char *str, const char *prefix)
{
    return strncasecmp(str, prefix, strlen(prefix)) == 0;
}
00066 
00067 
00068 static const char *strstr_insensitive(const char *haystack, const char *needle)
00069 {
00070     while (*haystack != '\0')
00071     {
00072         if (is_prefix_insensitive(haystack, needle))
00073             return haystack;
00074         ++haystack;
00075     }
00076     return NULL;
00077 }
00078 
00079 
00080 static bool contains_insensitive(const char *haystack, const char *needle)
00081 {
00082     return strstr_insensitive(haystack, needle) != NULL;
00083 }
00084 
00085 
// Row layout shared by the header and per-stream lines of the "streams"
// debugger command: six padded columns (widths 6, 2, 3, 3, 6, 20), each
// followed by a separator string, and then the unpadded stream name.
static const char *list_format = "%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
// Maps a flag to the text shown in the debugger listing.
static inline const char *Yes_No(bool val)
{
    if (val)
        return "Yes";
    return "No";
}
00091 
00092 
00093 void WvStream::debugger_streams_display_header(WvStringParm cmd,
00094         WvStreamsDebugger::ResultCallback result_cb)
00095 {
00096     WvStringList result;
00097     result.append(list_format, "--WSID", "-", "RC", "-", "-Ok", "-", "-Cs", "-", "-AlRem", "-",
00098             "----------------Type", "-", "Name--------------------");
00099     result_cb(cmd, result);
00100 }
00101 
00102 
00103 // Set to fit in 6 chars
00104 static WvString friendly_ms(time_t ms)
00105 {
00106     if (ms <= 0)
00107         return WvString("(%s)", ms);
00108     else if (ms < 1000)
00109         return WvString("%sms", ms);
00110     else if (ms < 60*1000)
00111         return WvString("%ss", ms/1000);
00112     else if (ms < 60*60*1000)
00113         return WvString("%sm", ms/(60*1000));
00114     else if (ms <= 24*60*60*1000)
00115         return WvString("%sh", ms/(60*60*1000));
00116     else
00117         return WvString("%sd", ms/(24*60*60*1000));
00118 }
00119 
00120 void WvStream::debugger_streams_display_one_stream(WvStream *s,
00121         WvStringParm cmd,
00122         WvStreamsDebugger::ResultCallback result_cb)
00123 {
00124     WvStringList result;
00125     s->addRef();
00126     unsigned refcount = s->release();
00127     result.append(list_format,
00128             s->wsid(), " ",
00129             refcount, " ",
00130             Yes_No(s->isok()), " ",
00131             Yes_No(s->uses_continue_select), " ",
00132             friendly_ms(s->alarm_remaining()), " ",
00133             s->wstype(), " ",
00134             s->wsname());
00135     result_cb(cmd, result);
00136 }
00137 
00138 
00139 void WvStream::debugger_streams_maybe_display_one_stream(WvStream *s,
00140         WvStringParm cmd,
00141         const WvStringList &args,
00142         WvStreamsDebugger::ResultCallback result_cb)
00143 {
00144     bool show = args.isempty();
00145     WvStringList::Iter arg(args);
00146     for (arg.rewind(); arg.next(); )
00147     {
00148         WSID wsid;
00149         bool is_num = wvstring_to_num(*arg, wsid);
00150         
00151         if (is_num)
00152         {
00153             if (s->wsid() == wsid)
00154             {
00155                 show = true;
00156                 break;
00157             }
00158         }
00159         else
00160         {
00161             if (s->wsname() && contains_insensitive(s->wsname(), *arg)
00162                     || s->wstype() && contains_insensitive(s->wstype(), *arg))
00163             {
00164                 show = true;
00165                 break;
00166             }
00167         }
00168     }
00169     if (show)
00170         debugger_streams_display_one_stream(s, cmd, result_cb);
00171 }
00172 
00173 
00174 WvString WvStream::debugger_streams_run_cb(WvStringParm cmd,
00175         WvStringList &args,
00176         WvStreamsDebugger::ResultCallback result_cb, void *)
00177 {
00178     debugger_streams_display_header(cmd, result_cb);
00179     if (WvStream::wsid_map)
00180     {
00181         WvMap<WSID, WvStream *>::Iter i(*WvStream::wsid_map);
00182         for (i.rewind(); i.next(); )
00183             debugger_streams_maybe_display_one_stream(i->data,
00184                     cmd, args, result_cb);
00185     }
00186     
00187     return WvString::null;
00188 }
00189 
00190 
00191 WvString WvStream::debugger_close_run_cb(WvStringParm cmd,
00192         WvStringList &args,
00193         WvStreamsDebugger::ResultCallback result_cb, void *)
00194 {
00195     if (args.isempty())
00196         return WvString("Usage: %s <WSID>", cmd);
00197     WSID wsid;
00198     WvString wsid_str = args.popstr();
00199     if (!wvstring_to_num(wsid_str, wsid))
00200         return WvString("Invalid WSID '%s'", wsid_str);
00201     IWvStream *s = WvStream::find_by_wsid(wsid);
00202     if (!s)
00203         return WvString("No such stream");
00204     s->close();
00205     return WvString::null;
00206 }
00207 
00208 
// Registers the "streams" (list streams) and "close" (close a stream by WSID)
// commands with WvStreamsDebugger.  Called once, from the first WvStream
// constructor to run.
void WvStream::add_debugger_commands()
{
    WvStreamsDebugger::add_command("streams", 0, debugger_streams_run_cb, 0);
    WvStreamsDebugger::add_command("close", 0, debugger_close_run_cb, 0);
}
00214 
00215 
// Constructs a stream in its default state: not closed, reading and writing
// allowed, auto-flush on, no alarm set, and readcb pointing at
// legacy_callback.  Also registers the debugger commands the first time any
// WvStream is constructed, and assigns this stream a WSID that no stream
// currently in wsid_map is using.
WvStream::WvStream():
    read_requires_writable(NULL),
    write_requires_readable(NULL),
    uses_continue_select(false),
    personal_stack_size(131072),
    alarm_was_ticking(false),
    stop_read(false),
    stop_write(false),
    closed(false),
    userdata(NULL),
    readcb(this, &WvStream::legacy_callback),
    max_outbuf_size(0),
    outbuf_delayed_flush(false),
    is_auto_flush(true),
    want_to_flush(true),
    is_flushing(false),
    queue_min(0),
    autoclose_time(0),
    alarm_time(wvtime_zero),
    last_alarm_check(wvtime_zero)
{
    TRACE("Creating wvstream %p\n", this);
    
    // one-time registration of the debugger commands
    static bool first = true;
    if (first)
    {
        first = false;
        WvStream::add_debugger_commands();
    }
    
    // Choose a wsid;
    if (!wsid_map)
        wsid_map = new WvMap<WSID, WvStream *>(256);
    // Probe upward from next_wsid_to_try until an unused id turns up, giving
    // up once the counter has come back around to where it started.
    WSID first_wsid_tried = next_wsid_to_try;
    do
    {
        if (!wsid_map->exists(next_wsid_to_try))
            break;
        ++next_wsid_to_try;
    } while (next_wsid_to_try != first_wsid_tried);
    my_wsid = next_wsid_to_try++;
    // fails only if the search above found every id in use
    assert(!wsid_map->exists(my_wsid));
    wsid_map->add(my_wsid, this);
    
#ifdef _WIN32
    WSAData wsaData;
    int result = WSAStartup(MAKEWORD(2,0), &wsaData); 
    assert(result == 0);
#endif
}
00266 
00267 
// FIXME: interfaces (IWvStream) shouldn't have implementations!
// Empty constructor for the IWvStream interface.
IWvStream::IWvStream()
{
}
00272 
00273 
// Empty destructor for the IWvStream interface.
IWvStream::~IWvStream()
{
}
00277 
00278 
// Closes the stream, drops any continuation context, and removes this
// stream's WSID from the registry.  The registry itself is deleted when the
// last stream goes away.
WvStream::~WvStream()
{
    TRACE("destroying %p\n", this);
    close();
    
    // if this assertion fails, then uses_continue_select is true, but you
    // didn't call terminate_continue_select() or close() before destroying
    // your object.  Shame on you!
    assert(!uses_continue_select || !call_ctx);
    
    call_ctx = 0; // finish running the suspended callback, if any

    // the constructor registered us, so the map must still exist
    assert(wsid_map);
    wsid_map->remove(my_wsid);
    if (wsid_map->isempty())
    {
        delete wsid_map;
        wsid_map = NULL;
    }
    
    TRACE("done destroying %p\n", this);
}
00301 
00302 
00303 void WvStream::close()
00304 {
00305     TRACE("flushing in wvstream...\n");
00306     flush(2000); // fixme: should not hardcode this stuff
00307     TRACE("(flushed)\n");
00308 
00309     closed = true;
00310     
00311     if (!!closecb)
00312     {
00313         IWvStreamCallback cb = closecb;
00314         closecb = 0; // ensure callback is only called once
00315         cb(*this);
00316     }
00317     
00318     // I would like to delete call_ctx here, but then if someone calls
00319     // close() from *inside* a continuable callback, we explode.  Oops!
00320     //call_ctx = 0; // destroy the context, if necessary
00321 }
00322 
00323 
00324 void WvStream::autoforward(WvStream &s)
00325 {
00326     setcallback(autoforward_callback, &s);
00327     read_requires_writable = &s;
00328 }
00329 
00330 
// Undoes autoforward(): clears the callback and its userdata, and resets
// read_requires_writable.
void WvStream::noautoforward()
{
    setcallback(0, NULL);
    read_requires_writable = NULL;
}
00336 
00337 
00338 void WvStream::autoforward_callback(WvStream &s, void *userdata)
00339 {
00340     WvStream &s2 = *(WvStream *)userdata;
00341     char buf[1024];
00342     size_t len;
00343     
00344     len = s.read(buf, sizeof(buf));
00345     // fprintf(stderr, "autoforward read %d bytes\n", (int)len);
00346     s2.write(buf, len);
00347 }
00348 
00349 
00350 void WvStream::_callback()
00351 {
00352     execute();
00353     if (!! callfunc)
00354         callfunc(*this, userdata);
00355 }
00356 
00357 
// Adapter giving _callback() the void*(void*) signature that callback()
// wraps in a WvCont.  The argument is ignored and NULL is always returned.
void *WvStream::_callwrap(void *)
{
    _callback();
    return NULL;
}
00363 
00364 
// Runs this stream's callback.  Records in alarm_was_ticking whether the
// alarm had expired (clearing the alarm if so), then invokes _callback():
// inside a WvCont when uses_continue_select is set, directly otherwise.
void WvStream::callback()
{
    TRACE("(?)");
    
    // if the alarm has gone off and we're calling callback... good!
    if (alarm_remaining() == 0)
    {
        alarm_time = wvtime_zero;
        alarm_was_ticking = true;
    }
    else
        alarm_was_ticking = false;
    
    // a continuable callback needs a stack of at least 1024 bytes
    assert(!uses_continue_select || personal_stack_size >= 1024);

// Set TEST_CONTINUES_HARSHLY to 1 to route every stream through WvCont,
// whether or not it uses continue_select.
#define TEST_CONTINUES_HARSHLY 0
#if TEST_CONTINUES_HARSHLY
#ifndef _WIN32
# warning "Using WvCont for *all* streams for testing!"
#endif
    if (1)
#else
    if (uses_continue_select && personal_stack_size >= 1024)
#endif
    {
        if (!call_ctx) // no context exists yet!
        {
            call_ctx = WvCont(WvCallback<void*,void*>
                              (this, &WvStream::_callwrap),
                              personal_stack_size);
        }
        
        // start the callback, or resume it where it last yielded
        call_ctx(NULL);
    }
    else
        _callback();

    // if this assertion fails, a derived class's virtual execute() function
    // didn't call its parent's execute() function, and we didn't make it
    // all the way back up to WvStream::execute().  This doesn't always
    // matter right now, but it could lead to obscure bugs later, so we'll
    // enforce it.
    // NOTE(review): no assertion actually follows the comment above; either
    // the check was removed or it was never added.
}
00408 
00409 
00410 bool WvStream::isok() const
00411 {
00412     return !closed && WvErrorBase::isok();
00413 }
00414 
00415 
00416 void WvStream::seterr(int _errnum)
00417 {
00418     if (!geterr()) // no pre-existing error
00419     {
00420         WvErrorBase::seterr(_errnum);
00421         close();
00422     }
00423 }
00424 
00425 
00426 size_t WvStream::read(WvBuf &outbuf, size_t count)
00427 {
00428     // for now, just wrap the older read function
00429     size_t free = outbuf.free();
00430     if (count > free)
00431         count = free;
00432 
00433     WvDynBuf tmp;
00434     unsigned char *buf = tmp.alloc(count);
00435     size_t len = read(buf, count);
00436     tmp.unalloc(count - len);
00437     outbuf.merge(tmp);
00438     return len;
00439 }
00440 
00441 
00442 size_t WvStream::write(WvBuf &inbuf, size_t count)
00443 {
00444     // for now, just wrap the older write function
00445     size_t avail = inbuf.used();
00446     if (count > avail)
00447         count = avail;
00448     const unsigned char *buf = inbuf.get(count);
00449     size_t len = write(buf, count);
00450     inbuf.unget(count - len);
00451     return len;
00452 }
00453 
00454 
00455 size_t WvStream::read(void *buf, size_t count)
00456 {
00457     assert(!count || buf);
00458     
00459     size_t bufu, i;
00460     unsigned char *newbuf;
00461 
00462     bufu = inbuf.used();
00463     if (bufu < queue_min)
00464     {
00465         newbuf = inbuf.alloc(queue_min - bufu);
00466         assert(newbuf);
00467         i = uread(newbuf, queue_min - bufu);
00468         inbuf.unalloc(queue_min - bufu - i);
00469         
00470         bufu = inbuf.used();
00471     }
00472     
00473     if (bufu < queue_min)
00474     {
00475         maybe_autoclose();
00476         return 0;
00477     }
00478         
00479     // if buffer is empty, do a hard read
00480     if (!bufu)
00481         bufu = uread(buf, count);
00482     else
00483     {
00484         // otherwise just read from the buffer
00485         if (bufu > count)
00486             bufu = count;
00487     
00488         memcpy(buf, inbuf.get(bufu), bufu);
00489     }
00490     
00491     TRACE("read  obj 0x%08x, bytes %d/%d\n", (unsigned int)this, bufu, count);
00492     maybe_autoclose();
00493     return bufu;
00494 }
00495 
00496 
00497 size_t WvStream::write(const void *buf, size_t count)
00498 {
00499     assert(!count || buf);
00500     if (!isok() || !buf || !count || stop_write) return 0;
00501     
00502     size_t wrote = 0;
00503     if (!outbuf_delayed_flush && !outbuf.used())
00504     {
00505         wrote = uwrite(buf, count);
00506         count -= wrote;
00507         buf = (const unsigned char *)buf + wrote;
00508         // if (!count) return wrote; // short circuit if no buffering needed
00509     }
00510     if (max_outbuf_size != 0)
00511     {
00512         size_t canbuffer = max_outbuf_size - outbuf.used();
00513         if (count > canbuffer)
00514             count = canbuffer; // can't write the whole amount
00515     }
00516     if (count != 0)
00517     {
00518         outbuf.put(buf, count);
00519         wrote += count;
00520     }
00521 
00522     if (should_flush())
00523     {
00524         if (is_auto_flush)
00525             flush(0);
00526         else 
00527             flush_outbuf(0);
00528     }
00529 
00530     return wrote;
00531 }
00532 
00533 
// Marks the read side as stopped, then closes the stream if the conditions
// checked by maybe_autoclose() are now met.
void WvStream::noread()
{
    stop_read = true;
    maybe_autoclose();
}
00539 
00540 
// Marks the write side as stopped, then closes the stream if the conditions
// checked by maybe_autoclose() are now met.
void WvStream::nowrite()
{
    stop_write = true;
    maybe_autoclose();
}
00546 
00547 
00548 void WvStream::maybe_autoclose()
00549 {
00550     if (stop_read && stop_write && !outbuf.used() && !inbuf.used() && !closed)
00551         close();
00552 }
00553 
00554 
00555 bool WvStream::isreadable()
00556 {
00557     return isok() && select(0, true, false, false);
00558 }
00559 
00560 
00561 bool WvStream::iswritable()
00562 {
00563     return !stop_write && isok() && select(0, false, true, false);
00564 }
00565 
00566 
// Reads one line, waiting up to wait_msec milliseconds for it to arrive.
//
//   wait_msec  how long to wait: 0 means do not wait; a negative value is
//              passed through to select()/continue_select() unchanged
//   separator  the byte that ends a line; must be in 0..255
//   readahead  how many bytes to request from uread() per attempt
//
// Returns a pointer into the input buffer at the start of the line, with the
// separator overwritten by a NUL.  If the loop ends without a separator being
// found (stream no longer ok, or reading stopped) but data is buffered, all
// of it is returned, NUL-terminated.  Returns NULL on timeout or when the
// loop ends with nothing buffered.
char *WvStream::blocking_getline(time_t wait_msec, int separator,
                                 int readahead)
{
    assert(separator >= 0);
    assert(separator <= 255);
    
    //assert(uses_continue_select || wait_msec == 0);

    // only initialised, and only consulted, when wait_msec > 0
    struct timeval timeout_time;
    if (wait_msec > 0)
        timeout_time = msecadd(wvtime(), wait_msec);
    
    maybe_autoclose();

    // if we get here, we either want to wait a bit or there is data
    // available.
    while (isok())
    {
        // fprintf(stderr, "(inbuf used = %d)\n", inbuf.used()); fflush(stderr);
        queuemin(0);
    
        // if there is a newline already, we have enough data.
        if (inbuf.strchr(separator) > 0)
            break;
        else if (!isok() || stop_read)    // uh oh, stream is in trouble.
            break;

        // make select not return true until more data is available
        queuemin(inbuf.used() + 1);

        // compute remaining timeout
        if (wait_msec > 0)
        {
            wait_msec = msecdiff(timeout_time, wvtime());
            if (wait_msec < 0)
                wait_msec = 0;
        }
        
        // FIXME: this is blocking_getline.  It shouldn't
        // call continue_select()!
        bool hasdata;
        if (wait_msec != 0 && uses_continue_select)
            hasdata = continue_select(wait_msec);
        else
            hasdata = select(wait_msec, true, false);
        
        if (!isok())
            break;

        if (hasdata)
        {
            // read a few bytes
            WvDynBuf tmp;
            unsigned char *buf = tmp.alloc(readahead);
            assert(buf);
            size_t len = uread(buf, readahead);
            tmp.unalloc(readahead - len);
            inbuf.put(tmp.get(len), len);
            hasdata = len > 0; // enough?
        }

        if (!isok())
            break;
        
        if (!hasdata && wait_msec == 0)
            return NULL; // handle timeout
    }
    if (!inbuf.used())
        return NULL;

    // return the appropriate data
    size_t i = 0;
    i = inbuf.strchr(separator);
    if (i > 0) {
        // overwrite the separator, the last of the i bytes, with a NUL
        char *eol = (char *)inbuf.mutablepeek(i - 1, 1);
        assert(eol);
        *eol = 0;
        return const_cast<char*>((const char *)inbuf.get(i));
    } else {
        // handle "EOF without newline" condition
        // FIXME: it's very silly that buffers can't return editable
        // char* arrays.
        inbuf.alloc(1)[0] = 0; // null-terminate it
        return const_cast<char *>((const char *)inbuf.get(inbuf.used()));
    }
}
00653 
00654 
// Not implemented: asserts unconditionally, and returns NULL in builds where
// assertions are compiled out.
char *WvStream::continue_getline(time_t wait_msec, int separator,
                                 int readahead)
{
    assert(false && "not implemented, come back later!");
    assert(uses_continue_select);
    return NULL;
}
00662 
00663 
00664 void WvStream::drain()
00665 {
00666     char buf[1024];
00667     while (isreadable())
00668         read(buf, sizeof(buf));
00669 }
00670 
00671 
00672 bool WvStream::flush(time_t msec_timeout)
00673 {
00674     if (is_flushing) return false;
00675     
00676     TRACE("%p flush starts\n", this);
00677 
00678     is_flushing = true;
00679     want_to_flush = true;
00680     bool done = flush_internal(msec_timeout) // any other internal buffers
00681         && flush_outbuf(msec_timeout);  // our own outbuf
00682     is_flushing = false;
00683 
00684     TRACE("flush stops (%d)\n", done);
00685     return done;
00686 }
00687 
00688 
// Reports whether a flush is currently wanted (the want_to_flush flag).
bool WvStream::should_flush()
{
    return want_to_flush;
}
00693 
00694 
// Tries to push the contents of outbuf out through uwrite().
//
//   msec_timeout  0 means make one attempt and never call select(); any other
//                 value means keep retrying while select() reports the stream
//                 writable.  For non-negative values the loop also stops once
//                 the deadline has passed.
//
// Also handles the autoclose set up by flush_then_close(), and empties outbuf
// if the stream has stopped being ok.  Returns true if outbuf was empty the
// last time it was sampled.
bool WvStream::flush_outbuf(time_t msec_timeout)
{
    TRACE("%p flush_outbuf starts (isok=%d)\n", this, isok());
    bool outbuf_was_used = outbuf.used();
    
    // do-nothing shortcut for speed
    // FIXME: definitely makes a "measurable" difference...
    //   but is it worth the risk?
    if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
    {
        maybe_autoclose();
        return true;
    }
    
    WvTime stoptime = msecadd(wvtime(), msec_timeout);
    
    // flush outbuf
    while (outbuf_was_used && isok())
    {
//      fprintf(stderr, "%p: fd:%d/%d, used:%d\n", 
//              this, getrfd(), getwfd(), outbuf.used());
        
        size_t attempt = outbuf.optgettable();
        size_t real = uwrite(outbuf.get(attempt), attempt);
        
        // WARNING: uwrite() may have messed up our outbuf!
        // This probably only happens if uwrite() closed the stream because
        // of an error, so we'll check isok().
        if (isok() && real < attempt)
        {
            // put back the part uwrite() did not accept
            TRACE("flush_outbuf: unget %d-%d\n", attempt, real);
            assert(outbuf.ungettable() >= attempt - real);
            outbuf.unget(attempt - real);
        }
        
        // since post_select() can call us, and select() calls post_select(),
        // we need to be careful not to call select() if we don't need to!
        // post_select() will only call us with msec_timeout==0, and we don't
        // need to do select() in that case anyway.
        if (!msec_timeout)
            break;
        if (msec_timeout >= 0 
          && (stoptime < wvtime() || !select(msec_timeout, false, true)))
            break;
        
        outbuf_was_used = outbuf.used();
    }

    // handle autoclose
    if (autoclose_time && isok())
    {
        time_t now = time(NULL);
        TRACE("Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n", 
              this, now - autoclose_time, outbuf.used());
        // close once everything has been flushed or the deadline has passed
        if ((flush_internal(0) && !outbuf.used()) || now > autoclose_time)
        {
            autoclose_time = 0; // avoid infinite recursion!
            close();
        }
    }

    TRACE("flush_outbuf: after autoclose chunk\n");
    if (outbuf_delayed_flush && !outbuf_was_used)
        want_to_flush = false;
    
    TRACE("flush_outbuf: now isok=%d\n", isok());

    // if we can't flush the outbuf, at least empty it!
    if (outbuf_was_used && !isok())
        outbuf.zap();

    maybe_autoclose();
    TRACE("flush_outbuf stops\n");
    
    return !outbuf_was_used;
}
00771 
00772 
// Flushes buffers other than outbuf.  The base stream has none, so this
// ignores the timeout and always reports success.
bool WvStream::flush_internal(time_t msec_timeout)
{
    // once outbuf emptied, that's it for most streams
    return true;
}
00778 
00779 
// Read-side file descriptor.  The base stream always returns -1.
int WvStream::getrfd() const
{
    return -1;
}
00784 
00785 
// Write-side file descriptor.  The base stream always returns -1.
int WvStream::getwfd() const
{
    return -1;
}
00790 
00791 
00792 void WvStream::flush_then_close(int msec_timeout)
00793 {
00794     time_t now = time(NULL);
00795     autoclose_time = now + (msec_timeout + 999) / 1000;
00796     
00797     TRACE("Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n", 
00798             this, outbuf.used(), autoclose_time - now);
00799 
00800     // as a fast track, we _could_ close here: but that's not a good idea,
00801     // since flush_then_close() deals with obscure situations, and we don't
00802     // want the caller to use it incorrectly.  So we make things _always_
00803     // break when the caller forgets to call select() later.
00804     
00805     flush(0);
00806 }
00807 
00808 
00809 bool WvStream::pre_select(SelectInfo &si)
00810 {
00811     maybe_autoclose();
00812     
00813     time_t alarmleft = alarm_remaining();
00814     
00815     if (!si.inherit_request && alarmleft == 0)
00816         return true; // alarm has rung
00817 
00818     if (!si.inherit_request)
00819     {
00820         si.wants.readable |= readcb;
00821         si.wants.writable |= writecb;
00822         si.wants.isexception |= exceptcb;
00823     }
00824     
00825     // handle read-ahead buffering
00826     if (si.wants.readable && inbuf.used() && inbuf.used() >= queue_min)
00827         return true; // already ready
00828     if (alarmleft >= 0
00829       && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
00830         si.msec_timeout = alarmleft + 10;
00831     return false;
00832 }
00833 
00834 
00835 bool WvStream::post_select(SelectInfo &si)
00836 {
00837     // FIXME: need sane buffer flush support for non FD-based streams
00838     // FIXME: need read_requires_writable and write_requires_readable
00839     //        support for non FD-based streams
00840     
00841     // note: flush(nonzero) might call select(), but flush(0) never does,
00842     // so this is safe.
00843     if (should_flush())
00844         flush(0);
00845     if (!si.inherit_request && alarm_remaining() == 0)
00846         return true; // alarm ticked
00847     return false;
00848 }
00849 
00850 
// Initialises 'si' for one select() cycle and runs pre_select() on this
// stream, plus on globalstream when 'forceable' is set and this stream is not
// itself the globalstream.
//
// With 'forceable' set, the wanted conditions come from which of readcb,
// writecb and exceptcb are set; otherwise they come from the
// readable/writable/isexcept arguments.
//
// Returns this stream's pre_select() result, or false if the stream is not
// ok.  If this stream or globalstream is already known to be ready, the
// timeout in 'si' is forced to zero.
bool WvStream::_build_selectinfo(SelectInfo &si, time_t msec_timeout,
    bool readable, bool writable, bool isexcept, bool forceable)
{
    FD_ZERO(&si.read);
    FD_ZERO(&si.write);
    FD_ZERO(&si.except);
    
    if (forceable)
    {
        si.wants.readable = readcb;
        si.wants.writable = writecb;
        si.wants.isexception = exceptcb;
    }
    else
    {
        si.wants.readable = readable;
        si.wants.writable = writable;
        si.wants.isexception = isexcept;
    }
    
    si.max_fd = -1;
    si.msec_timeout = msec_timeout;
    si.inherit_request = ! forceable;
    si.global_sure = false;

    if (!isok()) return false;

    wvstime_sync();

    bool sure = pre_select(si);
    if (globalstream && forceable && (globalstream != this))
    {
        WvStream *s = globalstream;
        globalstream = NULL; // prevent recursion
        si.global_sure = s->xpre_select(si, SelectRequest(false, false, false));
        globalstream = s;
    }
    // something is ready already, so select() must not block
    if (sure || si.global_sure)
        si.msec_timeout = 0;
    return sure;
}
00892 
00893 
// Performs the actual ::select() call described by 'si' and returns its
// result.  A negative si.msec_timeout means block with no timeout.  Errors
// other than EAGAIN, EINTR, EBADF and ENOBUFS are recorded with seterr().
int WvStream::_do_select(SelectInfo &si)
{
    // prepare timeout
    // (tv is only handed to select() when msec_timeout is non-negative)
    timeval tv;
    tv.tv_sec = si.msec_timeout / 1000;
    tv.tv_usec = (si.msec_timeout % 1000) * 1000;
    
#ifdef _WIN32
    // selecting on an empty set of sockets doesn't cause a delay in win32.
    // NOTE(review): the result of socket() is not checked before it is
    // handed to FD_SET -- confirm this cannot fail in practice.
    SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
    FD_SET(fakefd, &si.except);
#endif    
    
    // block
    int sel = ::select(si.max_fd+1, &si.read, &si.write, &si.except,
        si.msec_timeout >= 0 ? &tv : (timeval*)NULL);

    // handle errors.
    //   EAGAIN and EINTR don't matter because they're totally normal.
    //   ENOBUFS is hopefully transient.
    //   EBADF is kind of gross and might imply that something is wrong,
    //      but it happens sometimes...
    if (sel < 0 
      && errno != EAGAIN && errno != EINTR 
      && errno != EBADF
      && errno != ENOBUFS
      )
    {
        seterr(errno);
    }
#ifdef _WIN32
    // NOTE(review): Winsock sockets are normally released with
    // closesocket(); verify that ::close() handles a SOCKET correctly here.
    ::close(fakefd);
#endif
    TRACE("select() returned %d\n", sel);
    return sel;
}
00930 
00931 
00932 bool WvStream::_process_selectinfo(SelectInfo &si, bool forceable)
00933 {
00934     if (!isok()) return false;
00935 
00936     wvstime_sync();
00937         
00938     bool sure = post_select(si);
00939     if (globalstream && forceable && (globalstream != this))
00940     {
00941         WvStream *s = globalstream;
00942         globalstream = NULL; // prevent recursion
00943         si.global_sure = s->xpost_select(si, SelectRequest(false, false, false))
00944                                          || si.global_sure;
00945         globalstream = s;
00946     }
00947     return sure;
00948 }
00949 
00950 
00951 bool WvStream::_select(time_t msec_timeout,
00952     bool readable, bool writable, bool isexcept, bool forceable)
00953 {
00954     assert(wsid_map && wsid_map->exists(my_wsid)); // Detect use of deleted stream
00955         
00956     SelectInfo si;
00957     bool sure = _build_selectinfo(si, msec_timeout,
00958                                   readable, writable, isexcept, forceable);
00959     
00960     if (!isok())
00961         return false;
00962     
00963     // the eternal question: if 'sure' is true already, do we need to do the
00964     // rest of this stuff?  If we do, it might increase fairness a bit, but
00965     // it encourages select()ing when we know something fishy has happened -
00966     // when a stream is !isok() in a list, for example, pre_select() returns
00967     // true.  If that's the case, our SelectInfo structure might not be
00968     // quite right (eg. it might be selecting on invalid fds).  That doesn't
00969     // sound *too* bad, so let's go for the fairness.
00970 
00971     int sel = _do_select(si);
00972     if (sel >= 0)
00973         sure = _process_selectinfo(si, forceable) || sure; // note the order
00974     if (si.global_sure && globalstream && forceable && (globalstream != this))
00975         globalstream->callback();
00976     return sure;
00977 }
00978 
00979 
// Builds a SelectRequest from the read, write and exception callbacks
// currently held by this stream.
IWvStream::SelectRequest WvStream::get_select_request()
{
    return IWvStream::SelectRequest(readcb, writecb, exceptcb);
}
00984 
00985 
00986 void WvStream::force_select(bool readable, bool writable, bool isexception)
00987 {
00988     if (readable)
00989         readcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00990     if (writable)
00991         writecb = IWvStreamCallback(this, &WvStream::legacy_callback);
00992     if (isexception)
00993         exceptcb = IWvStreamCallback(this, &WvStream::legacy_callback);
00994 }
00995 
00996 
// For each condition whose flag is true, clears the matching callback
// (readcb, writecb, exceptcb).  Conditions whose flag is false are left
// untouched.
void WvStream::undo_force_select(bool readable, bool writable, bool isexception)
{
    if (readable)
        readcb = 0;
    if (writable)
        writecb = 0;
    if (isexception)
        exceptcb = 0;
}
01006 
01007 
01008 void WvStream::alarm(time_t msec_timeout)
01009 {
01010     if (msec_timeout >= 0)
01011         alarm_time = msecadd(wvstime(), msec_timeout);
01012     else
01013         alarm_time = wvtime_zero;
01014 }
01015 
01016 
01017 time_t WvStream::alarm_remaining()
01018 {
01019     if (alarm_time.tv_sec)
01020     {
01021         WvTime now = wvstime();
01022 
01023         // Time is going backward!
01024         if (now < last_alarm_check)
01025         {
01026 #if 0 // okay, I give up.  Time just plain goes backwards on some systems.
01027             // warn only if it's a "big" difference (sigh...)
01028             if (msecdiff(last_alarm_check, now) > 200)
01029                 fprintf(stderr, " ************* TIME WENT BACKWARDS! "
01030                         "(%ld:%ld %ld:%ld)\n",
01031                         last_alarm_check.tv_sec, last_alarm_check.tv_usec,
01032                         now.tv_sec, now.tv_usec);
01033 #endif
01034             alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
01035         }
01036 
01037         last_alarm_check = now;
01038 
01039         time_t remaining = msecdiff(alarm_time, now);
01040         if (remaining < 0)
01041             remaining = 0;
01042         return remaining;
01043     }
01044     return -1;
01045 }
01046 
01047 
01048 bool WvStream::continue_select(time_t msec_timeout)
01049 {
01050     assert(uses_continue_select);
01051     
01052     // if this assertion triggers, you probably tried to do continue_select()
01053     // while inside terminate_continue_select().
01054     assert(call_ctx);
01055     
01056     if (msec_timeout >= 0)
01057         alarm(msec_timeout);
01058 
01059     alarm(msec_timeout);
01060     WvCont::yield();
01061     alarm(-1); // cancel the still-pending alarm, or it might go off later!
01062     
01063     // when we get here, someone has jumped back into our task.
01064     // We have to select(0) here because it's possible that the alarm was 
01065     // ticking _and_ data was available.  This is aggravated especially if
01066     // msec_delay was zero.  Note that running select() here isn't
01067     // inefficient, because if the alarm was expired then pre_select()
01068     // returned true anyway and short-circuited the previous select().
01069     TRACE("hello-%p\n", this);
01070     return !alarm_was_ticking || select(0, readcb, writecb, exceptcb);
01071 }
01072 
01073 
// Closes the stream and drops the continuation context.
void WvStream::terminate_continue_select()
{
    close();
    call_ctx = 0; // destroy the context, if necessary
}
01079 
01080 
// Source address of the stream.  The base stream always returns NULL.
const WvAddr *WvStream::src() const
{
    return NULL;
}
01085 
01086 
// Installs the user callback and the userdata pointer that will be passed to
// it, and discards any continuation context so the new callback starts fresh.
void WvStream::setcallback(WvStreamCallback _callfunc, void *_userdata)
{ 
    callfunc = _callfunc;
    userdata = _userdata;
    call_ctx = 0; // delete any in-progress WvCont
}
01093 
01094 
01095 void WvStream::legacy_callback(IWvStream& s)
01096 {
01097     execute();
01098     if (!! callfunc)
01099         callfunc(*this, userdata);
01100 }
01101 
01102 
01103 IWvStreamCallback WvStream::setreadcallback(IWvStreamCallback _callback)
01104 {
01105     IWvStreamCallback tmp = readcb;
01106 
01107     readcb = _callback;
01108 
01109     return tmp;
01110 }
01111 
01112 
01113 IWvStreamCallback WvStream::setwritecallback(IWvStreamCallback _callback)
01114 {
01115     IWvStreamCallback tmp = writecb;
01116 
01117     writecb = _callback;
01118 
01119     return tmp;
01120 }
01121 
01122 
01123 IWvStreamCallback WvStream::setexceptcallback(IWvStreamCallback _callback)
01124 {
01125     IWvStreamCallback tmp = exceptcb;
01126 
01127     exceptcb = _callback;
01128 
01129     return tmp;
01130 }
01131 
01132 
01133 IWvStreamCallback WvStream::setclosecallback(IWvStreamCallback _callback)
01134 {
01135     IWvStreamCallback tmp = closecb;
01136     if (isok())
01137         closecb = _callback;
01138     else
01139     {
01140         // already closed?  notify immediately!
01141         closecb = 0;
01142         if (!!_callback)
01143             _callback(*this);
01144     }
01145     return tmp;
01146 }
01147 
01148 
01149 void WvStream::unread(WvBuf &unreadbuf, size_t count)
01150 {
01151     WvDynBuf tmp;
01152     tmp.merge(unreadbuf, count);
01153     tmp.merge(inbuf);
01154     inbuf.zap();
01155     inbuf.merge(tmp);
01156 }
01157 
01158 
01159 IWvStream *WvStream::find_by_wsid(WSID wsid)
01160 {
01161     WvStream **presult = wsid_map? wsid_map->find(wsid): NULL;
01162     if (presult)
01163         return *presult;
01164     else
01165         return NULL;   
01166 }

Generated on Fri Oct 5 18:20:28 2007 for WvStreams by  doxygen 1.5.3