00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015 #ifdef HAVE_EXECINFO_H
00016 #include <execinfo.h>
00017 #endif
00018
00019 #ifdef _WIN32
00020 #define ETIMEDOUT WSAETIMEDOUT
00021 #endif
00022
00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00024 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00025 : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00026 pipeline_incompatible(_pipeline_incompatible),
00027 in_doneurl(false)
00028 {
00029 log("Opening server connection.\n");
00030 http_response = "";
00031 encoding = Unknown;
00032 bytes_remaining = 0;
00033 in_chunk_trailer = false;
00034 pipeline_test_count = 0;
00035 last_was_pipeline_test = false;
00036
00037 enable_pipelining = global_enable_pipelining
00038 && !pipeline_incompatible[target.remaddr];
00039 ssl = _ssl;
00040
00041 if (ssl)
00042 cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00043
00044 sent_url_request = false;
00045
00046 alarm(60000);
00047 }
00048
00049
00050 WvHttpStream::~WvHttpStream()
00051 {
00052 log(WvLog::Debug2, "Deleting.\n");
00053
00054 #if 0
00055 #ifdef HAVE_EXECINFO_H
00056 void* trace[10];
00057 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00058 char** tracedump = backtrace_symbols(trace, count);
00059 log(WvLog::Debug, "TRACE");
00060 for (int i = 0; i < count; ++i)
00061 log(WvLog::Debug, ":%s", tracedump[i]);
00062 log(WvLog::Debug, "\n");
00063 free(tracedump);
00064 #endif
00065 #endif
00066
00067 if (geterr())
00068 log("Error was: %s\n", errstr());
00069 close();
00070 }
00071
00072
00073 void WvHttpStream::close()
00074 {
00075 log("close called\n");
00076
00077 #if 0
00078 #ifdef HAVE_EXECINFO_H
00079 void *trace[10];
00080 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00081 char** tracedump = backtrace_symbols(trace, count);
00082 log(WvLog::Debug, "TRACE");
00083 for (int i = 0; i < count; ++i)
00084 log(WvLog::Debug, ":%s", tracedump[i]);
00085 log(WvLog::Debug, "\n");
00086 free(tracedump);
00087 #endif
00088 #endif
00089
00090
00091
00092 if (enable_pipelining && max_requests > 1
00093 && (pipeline_test_count < 1
00094 || (pipeline_test_count == 1 && last_was_pipeline_test)))
00095 pipelining_is_broken(2);
00096
00097 if (isok())
00098 log("Closing.\n");
00099 WvStreamClone::close();
00100
00101 if (geterr())
00102 {
00103
00104
00105 WvUrlRequest *msgurl = curl;
00106 if (!msgurl && !urls.isempty())
00107 msgurl = urls.first();
00108 if (!msgurl && !waiting_urls.isempty())
00109 msgurl = waiting_urls.first();
00110 if (msgurl)
00111 log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
00112 errstr());
00113 }
00114 waiting_urls.zap();
00115 if (curl)
00116 {
00117 log("curl is %s\n", curl->url);
00118 doneurl();
00119 }
00120 log("close done\n");
00121 }
00122
00123
00124 void WvHttpStream::doneurl()
00125 {
00126
00127
00128
00129
00130
00131 if (in_doneurl)
00132 return;
00133 in_doneurl = true;
00134
00135 assert(curl != NULL);
00136 WvString last_response(http_response);
00137 log("Done URL: %s\n", curl->url);
00138
00139 http_response = "";
00140 encoding = Unknown;
00141 in_chunk_trailer = false;
00142 bytes_remaining = 0;
00143
00144 last_was_pipeline_test = curl->pipeline_test;
00145 bool broken = false;
00146 if (last_was_pipeline_test)
00147 {
00148 pipeline_test_count++;
00149 if (pipeline_test_count == 1)
00150 start_pipeline_test(&curl->url);
00151 else if (pipeline_test_response != last_response)
00152 {
00153
00154
00155
00156 pipelining_is_broken(4);
00157 broken = true;
00158 }
00159 pipeline_test_response = last_response;
00160 }
00161
00162 assert(curl == urls.first());
00163 curl->done();
00164 curl = NULL;
00165 sent_url_request = false;
00166 urls.unlink_first();
00167
00168 if (broken)
00169 close();
00170
00171 request_next();
00172 in_doneurl = false;
00173 }
00174
00175
00176 static WvString encode64(WvStringParm user, WvStringParm password)
00177 {
00178 WvBase64Encoder encoder;
00179 WvString ret;
00180 encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00181 return ret;
00182 }
00183
00184
00185 static WvString fixnl(WvStringParm nonl)
00186 {
00187 WvDynBuf b;
00188 const char *cptr;
00189
00190 for (cptr = nonl; cptr && *cptr; cptr++)
00191 {
00192 if (*cptr == '\r')
00193 continue;
00194 else if (*cptr == '\n')
00195 b.put("\r", 1);
00196 b.put(cptr, 1);
00197 }
00198
00199 return b.getstr();
00200 }
00201
00202
00203 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00204 {
00205 WvString request;
00206 WvString auth(""), content = putstream_data.getstr();
00207 if(!!url->url.getuser() && !!url->url.getpassword())
00208 auth = WvString("Authorization: Basic %s\n",
00209 encode64(url->url.getuser(), url->url.getpassword()));
00210
00211 request = fixnl(WvString("%s %s HTTP/1.1\n"
00212 "Host: %s:%s\n"
00213 "Connection: %s\n"
00214 "%s"
00215 "%s"
00216 "%s%s"
00217 "\n"
00218 "%s",
00219 url->method,
00220 url->url.getfile(),
00221 url->url.gethost(), url->url.getport(),
00222 keepalive ? "keep-alive" : "close",
00223 auth,
00224 (content.len() > 0 ? WvString("Content-Length: %s\n", content.len()).cstr() : ""),
00225 trim_string(url->headers.edit()),
00226 !!url->headers ? "\n" : "",
00227 (content.len() > 0 ? content.cstr() : ""))
00228 );
00229 return request;
00230 }
00231
00232
00233 void WvHttpStream::send_request(WvUrlRequest *url)
00234 {
00235 request_count++;
00236 log("Request #%s: %s\n", request_count, url->url);
00237 write(request_str(url, url->pipeline_test
00238 || request_count < max_requests));
00239 sent_url_request = true;
00240 alarm(60000);
00241 }
00242
00243
00244 void WvHttpStream::start_pipeline_test(WvUrl *url)
00245 {
00246 WvUrl location(WvString(
00247 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00248 url->getproto(), url->gethost(), url->getport()));
00249 WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00250 false, true);
00251 testurl->instream = this;
00252 send_request(testurl);
00253 urls.append(testurl, true, "sent_running_url");
00254 }
00255
00256
00257 void WvHttpStream::request_next()
00258 {
00259
00260 putstream_data.zap();
00261
00262
00263
00264 if (request_count >= max_requests || waiting_urls.isempty())
00265 return;
00266
00267
00268 if (!enable_pipelining && !urls.isempty())
00269 return;
00270
00271
00272 WvUrlRequest *url = waiting_urls.first();
00273
00274 waiting_urls.unlink_first();
00275 if (!url->putstream)
00276 {
00277 if (enable_pipelining && !request_count && max_requests > 1)
00278 start_pipeline_test(&url->url);
00279 send_request(url);
00280 }
00281 urls.append(url, false, "sent_running_url");
00282 }
00283
00284
00285 void WvHttpStream::pipelining_is_broken(int why)
00286 {
00287 if (!pipeline_incompatible[target.remaddr])
00288 {
00289 pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00290 log("Pipelining is broken on this server (%s)! Disabling.\n", why);
00291 }
00292 }
00293
00294
00295 void WvHttpStream::pre_select(SelectInfo &si)
00296 {
00297 SelectRequest oldwant = si.wants;
00298 WvUrlRequest *url;
00299
00300 WvUrlStream::pre_select(si);
00301
00302 if (!urls.isempty())
00303 {
00304 url = urls.first();
00305 if(url && url->putstream)
00306 url->putstream->pre_select(si);
00307 }
00308
00309 si.wants = oldwant;
00310 }
00311
00312
00313 bool WvHttpStream::post_select(SelectInfo &si)
00314 {
00315 SelectRequest oldwant = si.wants;
00316 WvUrlRequest *url;
00317
00318 if (WvUrlStream::post_select(si))
00319 return true;
00320
00321 if (!urls.isempty())
00322 {
00323 url = urls.first();
00324 if(url && url->putstream && url->putstream->post_select(si))
00325 return true;
00326 }
00327
00328 si.wants = oldwant;
00329 return false;
00330 }
00331
00332
00333 void WvHttpStream::execute()
00334 {
00335 char buf[1024], *line;
00336 size_t len;
00337
00338 WvStreamClone::execute();
00339
00340
00341 if (alarm_was_ticking)
00342 {
00343 log(WvLog::Debug4, "urls count: %s\n", urls.count());
00344 if (!urls.isempty())
00345 {
00346 seterr(ETIMEDOUT);
00347
00348
00349
00350
00351 if (!urls.isempty())
00352 {
00353 WvUrlRequest *url = urls.first();
00354 if (url->outstream)
00355 url->outstream->seterr(ETIMEDOUT);
00356 }
00357 }
00358 else
00359 close();
00360 return;
00361 }
00362
00363
00364
00365
00366 if (curl && !curl->outstream)
00367 {
00368 if (!(encoding == PostHeadInfinity
00369 || encoding == PostHeadChunked
00370 || encoding == PostHeadStream))
00371 {
00372
00373 pipeline_test_count++;
00374 last_was_pipeline_test = false;
00375 close();
00376 }
00377
00378 if (curl)
00379 doneurl();
00380 return;
00381 }
00382 else if (curl)
00383 curl->inuse = true;
00384
00385 if(!sent_url_request && !urls.isempty())
00386 {
00387 WvUrlRequest *url = urls.first();
00388 if(url)
00389 {
00390 if(url->putstream)
00391 {
00392 int len = 0;
00393 if(url->putstream->isok())
00394 len = url->putstream->read(putstream_data, 1024);
00395
00396 if(!url->putstream->isok() || len == 0)
00397 {
00398 url->putstream = NULL;
00399 send_request(url);
00400 }
00401 }
00402 }
00403 }
00404
00405 if (!curl)
00406 {
00407
00408 line = getline();
00409 if (line)
00410 {
00411 line = trim_string(line);
00412 log(WvLog::Debug4, "Header: '%s'\n", line);
00413 if (!http_response)
00414 {
00415 http_response = line;
00416
00417
00418
00419
00420
00421 if (last_was_pipeline_test
00422 && http_response == pipeline_test_response)
00423 {
00424 pipelining_is_broken(1);
00425 close();
00426 return;
00427 }
00428
00429
00430
00431
00432
00433
00434 if (last_was_pipeline_test && !!http_response)
00435 {
00436 const char *cptr = strchr(http_response, ' ');
00437 if (cptr && atoi(cptr+1) == 400)
00438 {
00439 pipelining_is_broken(3);
00440 close();
00441 return;
00442 }
00443 }
00444 }
00445
00446 if (urls.isempty())
00447 {
00448 log("got unsolicited data.\n");
00449 seterr("unsolicited data from server!");
00450 return;
00451 }
00452
00453 if (!strncasecmp(line, "Content-length: ", 16))
00454 {
00455 bytes_remaining = atoi(line+16);
00456 encoding = ContentLength;
00457 }
00458 else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00459 && strstr(line+19, "chunked"))
00460 {
00461 encoding = Chunked;
00462 }
00463
00464 if (line[0])
00465 {
00466 char *p;
00467 WvBufUrlStream *outstream = urls.first()->outstream;
00468
00469 if ((p = strchr(line, ':')) != NULL)
00470 {
00471 *p = 0;
00472 p = trim_string(p+1);
00473 if (outstream) {
00474 struct WvHTTPHeader *h;
00475 h = new struct WvHTTPHeader(line, p);
00476 outstream->headers.add(h, true);
00477 }
00478 }
00479 else if (strncasecmp(line, "HTTP/", 5) == 0)
00480 {
00481 char *p = strchr(line, ' ');
00482 if (p)
00483 {
00484 *p = 0;
00485 if (outstream)
00486 {
00487 outstream->version = line+5;
00488 outstream->status = atoi(p+1);
00489 }
00490 }
00491 }
00492 }
00493 else
00494 {
00495
00496 curl = urls.first();
00497 in_chunk_trailer = false;
00498 log(WvLog::Debug4,
00499 "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00500
00501 if (encoding == Unknown)
00502 encoding = Infinity;
00503
00504 if (curl->method == "HEAD")
00505 {
00506 log("Got all headers.\n");
00507 if (!enable_pipelining)
00508 doneurl();
00509
00510 if (encoding == Infinity)
00511 encoding = PostHeadInfinity;
00512 else if (encoding == Chunked)
00513 encoding = PostHeadChunked;
00514 else
00515 encoding = PostHeadStream;
00516 }
00517 }
00518 }
00519 }
00520 else if (encoding == PostHeadInfinity
00521 || encoding == PostHeadChunked
00522 || encoding == PostHeadStream)
00523 {
00524 WvDynBuf chkbuf;
00525 len = read(chkbuf, 5);
00526
00527
00528
00529
00530 if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
00531 "HTTP/", 5))
00532 {
00533 if (encoding == PostHeadInfinity)
00534 encoding = ChuckInfinity;
00535 else if (encoding == PostHeadChunked)
00536 encoding = ChuckChunked;
00537 else if (encoding == PostHeadStream)
00538 encoding = ChuckStream;
00539 else
00540 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
00541 }
00542 else
00543 doneurl();
00544
00545 unread(chkbuf, len);
00546 }
00547 else if (encoding == ChuckInfinity)
00548 {
00549 len = read(buf, sizeof(buf));
00550 if (len)
00551 log(WvLog::Debug5, "Chucking %s bytes.\n", len);
00552 if (!isok())
00553 doneurl();
00554 }
00555 else if (encoding == ChuckChunked && !bytes_remaining)
00556 {
00557 encoding = Chunked;
00558 }
00559 else if (encoding == ChuckChunked || encoding == ChuckStream)
00560 {
00561 if (bytes_remaining > sizeof(buf))
00562 len = read(buf, sizeof(buf));
00563 else
00564 len = read(buf, bytes_remaining);
00565 bytes_remaining -= len;
00566 if (len)
00567 log(WvLog::Debug5,
00568 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
00569 if (!bytes_remaining && encoding == ContentLength)
00570 doneurl();
00571 if (bytes_remaining && !isok())
00572 seterr("connection interrupted");
00573 }
00574 else if (encoding == Chunked && !bytes_remaining)
00575 {
00576 line = getline();
00577 if (line)
00578 {
00579 line = trim_string(line);
00580
00581 if (in_chunk_trailer)
00582 {
00583
00584 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00585
00586
00587 if (!line[0])
00588 doneurl();
00589 }
00590 else
00591 {
00592
00593 if (line[0])
00594 {
00595 bytes_remaining = (size_t)strtoul(line, NULL, 16);
00596 if (!bytes_remaining)
00597 in_chunk_trailer = true;
00598 log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00599 bytes_remaining, line);
00600 }
00601 }
00602 }
00603 }
00604 else if (encoding == Infinity)
00605 {
00606
00607
00608
00609 len = read(buf, sizeof(buf));
00610 if (!isok())
00611 return;
00612
00613 if (len)
00614 log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00615 if (curl && curl->outstream)
00616 curl->outstream->write(buf, len);
00617
00618 if (!isok() && curl)
00619 doneurl();
00620 }
00621 else
00622 {
00623
00624
00625
00626 if (bytes_remaining > sizeof(buf))
00627 len = read(buf, sizeof(buf));
00628 else
00629 len = read(buf, bytes_remaining);
00630 if (!isok())
00631 return;
00632
00633 bytes_remaining -= len;
00634 if (len)
00635 log(WvLog::Debug5,
00636 "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00637 if (curl && curl->outstream)
00638 curl->outstream->write(buf, len);
00639
00640 if (!bytes_remaining && encoding == ContentLength && curl)
00641 doneurl();
00642
00643 if (bytes_remaining && !isok())
00644 seterr("connection interrupted");
00645
00646 if (!isok())
00647 doneurl();
00648 }
00649
00650 if (urls.isempty())
00651 alarm(5000);
00652 else
00653 alarm(60000);
00654 }