00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "wvhttppool.h"
00010 #include "wvtcp.h"
00011 #include "wvsslstream.h"
00012 #include "wvbuf.h"
00013 #include "wvbase64.h"
00014 #include "strutils.h"
00015 #ifndef _WIN32
00016 #include <execinfo.h>
00017 #endif
00018
00019 #ifdef _WIN32
00020 #define ETIMEDOUT WSAETIMEDOUT
00021 #endif
00022
00023 WvHttpStream::WvHttpStream(const WvIPPortAddr &_remaddr, WvStringParm _username,
00024 bool _ssl, WvIPPortAddrTable &_pipeline_incompatible)
00025 : WvUrlStream(_remaddr, _username, WvString("HTTP %s", _remaddr)),
00026 pipeline_incompatible(_pipeline_incompatible), in_doneurl(false)
00027 {
00028 log("Opening server connection.\n");
00029 http_response = "";
00030 encoding = Unknown;
00031 bytes_remaining = 0;
00032 in_chunk_trailer = false;
00033 pipeline_test_count = 0;
00034 last_was_pipeline_test = false;
00035
00036 enable_pipelining = global_enable_pipelining
00037 && !pipeline_incompatible[target.remaddr];
00038 ssl = _ssl;
00039
00040 if (ssl)
00041 cloned = new WvSSLStream(static_cast<WvFDStream*>(cloned));
00042
00043 sent_url_request = false;
00044
00045 alarm(60000);
00046 }
00047
00048
00049 WvHttpStream::~WvHttpStream()
00050 {
00051 log(WvLog::Debug2, "Deleting.\n");
00052
00053 #ifndef _WIN32
00054 void* trace[10];
00055 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00056 char** tracedump = backtrace_symbols(trace, count);
00057 log(WvLog::Debug, "TRACE");
00058 for (int i = 0; i < count; ++i)
00059 log(WvLog::Debug, ":%s", tracedump[i]);
00060 log(WvLog::Debug, "\n");
00061 free(tracedump);
00062 #endif
00063
00064 if (geterr())
00065 log("Error was: %s\n", errstr());
00066 close();
00067 }
00068
00069
00070 void WvHttpStream::close()
00071 {
00072 log("close called\n");
00073
00074 #ifndef _WIN32
00075 void *trace[10];
00076 int count = backtrace(trace, sizeof(trace)/sizeof(trace[0]));
00077 char** tracedump = backtrace_symbols(trace, count);
00078 log(WvLog::Debug, "TRACE");
00079 for (int i = 0; i < count; ++i)
00080 log(WvLog::Debug, ":%s", tracedump[i]);
00081 log(WvLog::Debug, "\n");
00082 free(tracedump);
00083 #endif
00084
00085
00086
00087 if (enable_pipelining && max_requests > 1
00088 && (pipeline_test_count < 1
00089 || (pipeline_test_count == 1 && last_was_pipeline_test)))
00090 pipelining_is_broken(2);
00091
00092 if (isok())
00093 log("Closing.\n");
00094 WvStreamClone::close();
00095
00096 if (geterr())
00097 {
00098
00099
00100 WvUrlRequest *msgurl = curl;
00101 if (!msgurl && !urls.isempty())
00102 msgurl = urls.first();
00103 if (!msgurl && !waiting_urls.isempty())
00104 msgurl = waiting_urls.first();
00105 if (msgurl)
00106 log("URL '%s' is FAILED (%s (%s))\n", msgurl->url, geterr(),
00107 errstr());
00108 }
00109 waiting_urls.zap();
00110 if (curl)
00111 {
00112 log("curl is %s\n", curl->url);
00113 doneurl();
00114 }
00115 log("close done\n");
00116 }
00117
00118
00119 void WvHttpStream::doneurl()
00120 {
00121
00122
00123
00124
00125
00126 if (in_doneurl)
00127 return;
00128 in_doneurl = true;
00129
00130 assert(curl != NULL);
00131 WvString last_response(http_response);
00132 log("Done URL: %s\n", curl->url);
00133
00134 http_response = "";
00135 encoding = Unknown;
00136 in_chunk_trailer = false;
00137 bytes_remaining = 0;
00138
00139 last_was_pipeline_test = curl->pipeline_test;
00140 bool broken = false;
00141 if (last_was_pipeline_test)
00142 {
00143 pipeline_test_count++;
00144 if (pipeline_test_count == 1)
00145 start_pipeline_test(&curl->url);
00146 else if (pipeline_test_response != last_response)
00147 {
00148
00149
00150
00151 pipelining_is_broken(4);
00152 broken = true;
00153 }
00154 pipeline_test_response = last_response;
00155 }
00156
00157 assert(curl == urls.first());
00158 curl->done();
00159 curl = NULL;
00160 sent_url_request = false;
00161 urls.unlink_first();
00162
00163 if (broken)
00164 close();
00165
00166 request_next();
00167 in_doneurl = false;
00168 }
00169
00170
00171 static WvString encode64(WvStringParm user, WvStringParm password)
00172 {
00173 WvBase64Encoder encoder;
00174 WvString ret;
00175 encoder.flushstrstr(WvString("%s:%s", user, password), ret);
00176 return ret;
00177 }
00178
00179
00180 static WvString fixnl(WvStringParm nonl)
00181 {
00182 WvDynBuf b;
00183 const char *cptr;
00184
00185 for (cptr = nonl; cptr && *cptr; cptr++)
00186 {
00187 if (*cptr == '\r')
00188 continue;
00189 else if (*cptr == '\n')
00190 b.put("\r", 1);
00191 b.put(cptr, 1);
00192 }
00193
00194 return b.getstr();
00195 }
00196
00197
00198 WvString WvHttpStream::request_str(WvUrlRequest *url, bool keepalive)
00199 {
00200 WvString request;
00201 WvString auth(""), content = putstream_data.getstr();
00202 if(!!url->url.getuser() && !!url->url.getpassword())
00203 auth = WvString("Authorization: Basic %s\n",
00204 encode64(url->url.getuser(), url->url.getpassword()));
00205
00206 request = fixnl(WvString("%s %s HTTP/1.1\n"
00207 "Host: %s:%s\n"
00208 "Connection: %s\n"
00209 "%s"
00210 "%s"
00211 "%s%s"
00212 "\n"
00213 "%s",
00214 url->method,
00215 url->url.getfile(),
00216 url->url.gethost(), url->url.getport(),
00217 keepalive ? "keep-alive" : "close",
00218 auth,
00219 (content.len() > 0 ? WvString("Content-Length: %s\n", content.len()).cstr() : ""),
00220 trim_string(url->headers.edit()),
00221 !!url->headers ? "\n" : "",
00222 (content.len() > 0 ? content.cstr() : ""))
00223 );
00224 return request;
00225 }
00226
00227
00228 void WvHttpStream::send_request(WvUrlRequest *url)
00229 {
00230 request_count++;
00231 log("Request #%s: %s\n", request_count, url->url);
00232 write(request_str(url, url->pipeline_test
00233 || request_count < max_requests));
00234 sent_url_request = true;
00235 alarm(60000);
00236 }
00237
00238
00239 void WvHttpStream::start_pipeline_test(WvUrl *url)
00240 {
00241 WvUrl location(WvString(
00242 "%s://%s:%s/wvhttp-pipeline-check-should-not-exist/",
00243 url->getproto(), url->gethost(), url->getport()));
00244 WvUrlRequest *testurl = new WvUrlRequest(location, "HEAD", "", NULL,
00245 false, true);
00246 testurl->instream = this;
00247 send_request(testurl);
00248 urls.append(testurl, true, "sent_running_url");
00249 }
00250
00251
00252 void WvHttpStream::request_next()
00253 {
00254
00255 putstream_data.zap();
00256
00257
00258
00259 if (request_count >= max_requests || waiting_urls.isempty())
00260 return;
00261
00262
00263 if (!enable_pipelining && !urls.isempty())
00264 return;
00265
00266
00267 WvUrlRequest *url = waiting_urls.first();
00268
00269 waiting_urls.unlink_first();
00270 if (!url->putstream)
00271 {
00272 if (enable_pipelining && !request_count && max_requests > 1)
00273 start_pipeline_test(&url->url);
00274 send_request(url);
00275 }
00276 urls.append(url, false, "sent_running_url");
00277 }
00278
00279
00280 void WvHttpStream::pipelining_is_broken(int why)
00281 {
00282 if (!pipeline_incompatible[target.remaddr])
00283 {
00284 pipeline_incompatible.add(new WvIPPortAddr(target.remaddr), true);
00285 log("Pipelining is broken on this server (%s)! Disabling.\n", why);
00286 }
00287 }
00288
00289
00290 bool WvHttpStream::pre_select(SelectInfo &si)
00291 {
00292 SelectRequest oldwant = si.wants;
00293 WvUrlRequest *url;
00294
00295 if (WvUrlStream::pre_select(si))
00296 return true;
00297
00298 if (!urls.isempty())
00299 {
00300 url = urls.first();
00301 if(url && url->putstream && url->putstream->pre_select(si))
00302 return true;
00303 }
00304
00305 si.wants = oldwant;
00306 return false;
00307 }
00308
00309
00310 bool WvHttpStream::post_select(SelectInfo &si)
00311 {
00312 SelectRequest oldwant = si.wants;
00313 WvUrlRequest *url;
00314
00315 if (WvUrlStream::post_select(si))
00316 return true;
00317
00318 if (!urls.isempty())
00319 {
00320 url = urls.first();
00321 if(url && url->putstream && url->putstream->post_select(si))
00322 return true;
00323 }
00324
00325 si.wants = oldwant;
00326 return false;
00327 }
00328
00329
00330 void WvHttpStream::execute()
00331 {
00332 char buf[1024], *line;
00333 size_t len;
00334
00335 WvStreamClone::execute();
00336
00337
00338 if (alarm_was_ticking)
00339 {
00340 log(WvLog::Debug4, "urls count: %s\n", urls.count());
00341 if (!urls.isempty())
00342 {
00343 seterr(ETIMEDOUT);
00344
00345
00346
00347
00348 if (!urls.isempty())
00349 {
00350 WvUrlRequest *url = urls.first();
00351 if (url->outstream)
00352 url->outstream->seterr(ETIMEDOUT);
00353 }
00354 }
00355 else
00356 close();
00357 return;
00358 }
00359
00360
00361
00362
00363 if (curl && !curl->outstream)
00364 {
00365 if (!(encoding == PostHeadInfinity
00366 || encoding == PostHeadChunked
00367 || encoding == PostHeadStream))
00368 {
00369
00370 pipeline_test_count++;
00371 last_was_pipeline_test = false;
00372 close();
00373 }
00374
00375 if (curl)
00376 doneurl();
00377 return;
00378 }
00379 else if (curl)
00380 curl->inuse = true;
00381
00382 if(!sent_url_request && !urls.isempty())
00383 {
00384 WvUrlRequest *url = urls.first();
00385 if(url)
00386 {
00387 if(url->putstream)
00388 {
00389 int len = 0;
00390 if(url->putstream->isok())
00391 len = url->putstream->read(putstream_data, 1024);
00392
00393 if(!url->putstream->isok() || len == 0)
00394 {
00395 url->putstream = NULL;
00396 send_request(url);
00397 }
00398 }
00399 }
00400 }
00401
00402 if (!curl)
00403 {
00404
00405 line = getline();
00406 if (line)
00407 {
00408 line = trim_string(line);
00409 log(WvLog::Debug4, "Header: '%s'\n", line);
00410 if (!http_response)
00411 {
00412 http_response = line;
00413
00414
00415
00416
00417
00418 if (last_was_pipeline_test
00419 && http_response == pipeline_test_response)
00420 {
00421 pipelining_is_broken(1);
00422 close();
00423 return;
00424 }
00425
00426
00427
00428
00429
00430
00431 if (last_was_pipeline_test && !!http_response)
00432 {
00433 const char *cptr = strchr(http_response, ' ');
00434 if (cptr && atoi(cptr+1) == 400)
00435 {
00436 pipelining_is_broken(3);
00437 close();
00438 return;
00439 }
00440 }
00441 }
00442
00443 if (urls.isempty())
00444 {
00445 log("got unsolicited data.\n");
00446 seterr("unsolicited data from server!");
00447 return;
00448 }
00449
00450 if (!strncasecmp(line, "Content-length: ", 16))
00451 {
00452 bytes_remaining = atoi(line+16);
00453 encoding = ContentLength;
00454 }
00455 else if (!strncasecmp(line, "Transfer-Encoding: ", 19)
00456 && strstr(line+19, "chunked"))
00457 {
00458 encoding = Chunked;
00459 }
00460
00461 if (line[0])
00462 {
00463 char *p;
00464 WvBufUrlStream *outstream = urls.first()->outstream;
00465
00466 if ((p = strchr(line, ':')) != NULL)
00467 {
00468 *p = 0;
00469 p = trim_string(p+1);
00470 if (outstream) {
00471 struct WvHTTPHeader *h;
00472 h = new struct WvHTTPHeader(line, p);
00473 outstream->headers.add(h, true);
00474 }
00475 }
00476 else if (strncasecmp(line, "HTTP/", 5) == 0)
00477 {
00478 char *p = strchr(line, ' ');
00479 if (p)
00480 {
00481 *p = 0;
00482 if (outstream)
00483 {
00484 outstream->version = line+5;
00485 outstream->status = atoi(p+1);
00486 }
00487 }
00488 }
00489 }
00490 else
00491 {
00492
00493 curl = urls.first();
00494 in_chunk_trailer = false;
00495 log(WvLog::Debug4,
00496 "Starting data: %s (enc=%s)\n", bytes_remaining, encoding);
00497
00498 if (encoding == Unknown)
00499 encoding = Infinity;
00500
00501 if (curl->method == "HEAD")
00502 {
00503 log("Got all headers.\n");
00504 if (!enable_pipelining)
00505 doneurl();
00506
00507 if (encoding == Infinity)
00508 encoding = PostHeadInfinity;
00509 else if (encoding == Chunked)
00510 encoding = PostHeadChunked;
00511 else
00512 encoding = PostHeadStream;
00513 }
00514 }
00515 }
00516 }
00517 else if (encoding == PostHeadInfinity
00518 || encoding == PostHeadChunked
00519 || encoding == PostHeadStream)
00520 {
00521 WvDynBuf chkbuf;
00522 len = read(chkbuf, 5);
00523
00524
00525
00526
00527 if (len && strncmp(reinterpret_cast<const char *>(chkbuf.peek(0, 5)),
00528 "HTTP/", 5))
00529 {
00530 if (encoding == PostHeadInfinity)
00531 encoding = ChuckInfinity;
00532 else if (encoding == PostHeadChunked)
00533 encoding = ChuckChunked;
00534 else if (encoding == PostHeadStream)
00535 encoding = ChuckStream;
00536 else
00537 log(WvLog::Warning, "WvHttpStream: inconsistent state.\n");
00538 }
00539 else
00540 doneurl();
00541
00542 unread(chkbuf, len);
00543 }
00544 else if (encoding == ChuckInfinity)
00545 {
00546 len = read(buf, sizeof(buf));
00547 if (len)
00548 log(WvLog::Debug5, "Chucking %s bytes.\n", len);
00549 if (!isok())
00550 doneurl();
00551 }
00552 else if (encoding == ChuckChunked && !bytes_remaining)
00553 {
00554 encoding = Chunked;
00555 }
00556 else if (encoding == ChuckChunked || encoding == ChuckStream)
00557 {
00558 if (bytes_remaining > sizeof(buf))
00559 len = read(buf, sizeof(buf));
00560 else
00561 len = read(buf, bytes_remaining);
00562 bytes_remaining -= len;
00563 if (len)
00564 log(WvLog::Debug5,
00565 "Chucked %s bytes (%s bytes left).\n", len, bytes_remaining);
00566 if (!bytes_remaining && encoding == ContentLength)
00567 doneurl();
00568 if (bytes_remaining && !isok())
00569 seterr("connection interrupted");
00570 }
00571 else if (encoding == Chunked && !bytes_remaining)
00572 {
00573 line = getline();
00574 if (line)
00575 {
00576 line = trim_string(line);
00577
00578 if (in_chunk_trailer)
00579 {
00580
00581 log(WvLog::Debug4, "Trailer: '%s'\n", line);
00582
00583
00584 if (!line[0])
00585 doneurl();
00586 }
00587 else
00588 {
00589
00590 if (line[0])
00591 {
00592 bytes_remaining = (size_t)strtoul(line, NULL, 16);
00593 if (!bytes_remaining)
00594 in_chunk_trailer = true;
00595 log(WvLog::Debug4, "Chunk length is %s ('%s').\n",
00596 bytes_remaining, line);
00597 }
00598 }
00599 }
00600 }
00601 else if (encoding == Infinity)
00602 {
00603
00604
00605
00606 len = read(buf, sizeof(buf));
00607 if (!isok())
00608 return;
00609
00610 if (len)
00611 log(WvLog::Debug5, "Infinity: read %s bytes.\n", len);
00612 if (curl && curl->outstream)
00613 curl->outstream->write(buf, len);
00614
00615 if (!isok() && curl)
00616 doneurl();
00617 }
00618 else
00619 {
00620
00621
00622
00623 if (bytes_remaining > sizeof(buf))
00624 len = read(buf, sizeof(buf));
00625 else
00626 len = read(buf, bytes_remaining);
00627 if (!isok())
00628 return;
00629
00630 bytes_remaining -= len;
00631 if (len)
00632 log(WvLog::Debug5,
00633 "Read %s bytes (%s bytes left).\n", len, bytes_remaining);
00634 if (curl && curl->outstream)
00635 curl->outstream->write(buf, len);
00636
00637 if (!bytes_remaining && encoding == ContentLength && curl)
00638 doneurl();
00639
00640 if (bytes_remaining && !isok())
00641 seterr("connection interrupted");
00642
00643 if (!isok())
00644 doneurl();
00645 }
00646
00647 if (urls.isempty())
00648 alarm(5000);
00649 else
00650 alarm(60000);
00651 }