1 /** 2 A simple HTTP/1.1 client implementation. 3 4 Copyright: © 2012-2014 RejectedSoftware e.K. 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig, Jan Krüger 7 */ 8 module vibe.http.client; 9 10 public import vibe.core.net; 11 public import vibe.http.common; 12 public import vibe.inet.url; 13 14 import vibe.core.connectionpool; 15 import vibe.core.core; 16 import vibe.core.log; 17 import vibe.data.json; 18 import vibe.inet.message; 19 import vibe.inet.url; 20 import vibe.stream.counting; 21 import vibe.stream.tls; 22 import vibe.stream.operations; 23 import vibe.stream.wrapper : createConnectionProxyStream; 24 import vibe.stream.zlib; 25 import vibe.utils.array; 26 import vibe.internal.allocator; 27 import vibe.internal.freelistref; 28 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy; 29 30 import core.exception : AssertError; 31 import std.algorithm : splitter; 32 import std.array; 33 import std.conv; 34 import std.encoding : sanitize; 35 import std.exception; 36 import std.format; 37 import std.string; 38 import std.typecons; 39 import std.datetime; 40 import std.socket : AddressFamily; 41 42 version(Posix) 43 { 44 version(VibeLibeventDriver) 45 { 46 version = UnixSocket; 47 } 48 } 49 50 51 /**************************************************************************************************/ 52 /* Public functions */ 53 /**************************************************************************************************/ 54 @safe: 55 56 /** 57 Performs a synchronous HTTP request on the specified URL. 58 59 The requester parameter allows to customize the request and to specify the request body for 60 non-GET requests before it is sent. A response object is then returned or passed to the 61 responder callback synchronously. 62 63 This function is a low-level HTTP client facility. It will not perform automatic redirect, 64 caching or similar tasks. For a high-level download facility (similar to cURL), see the 65 `vibe.inet.urltransfer` module. 66 67 Note that it is highly recommended to use one of the overloads that take a responder callback, 68 as they can avoid some memory allocations and are safe against accidentally leaving stale 69 response objects (objects whose response body wasn't fully read). For the returning overloads 70 of the function it is recommended to put a `scope(exit)` right after the call in which 71 `HTTPClientResponse.dropBody` is called to avoid this. 72 73 See_also: `vibe.inet.urltransfer.download` 74 */ 75 HTTPClientResponse requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings) 76 { 77 return requestHTTP(URL.parse(url), requester, settings); 78 } 79 /// ditto 80 HTTPClientResponse requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester = null, const(HTTPClientSettings) settings = defaultSettings) 81 { 82 version(UnixSocket) { 83 enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix."); 84 } else { 85 enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s)."); 86 } 87 enforce(url.host.length > 0, "URL must contain a host name."); 88 bool use_tls; 89 90 if (settings.proxyURL.schema !is null) 91 use_tls = settings.proxyURL.schema == "https"; 92 else 93 { 94 version(UnixSocket) 95 use_tls = url.schema == "https" || url.schema == "https+unix"; 96 else 97 use_tls = url.schema == "https"; 98 } 99 100 auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings); 101 auto res = cli.request((req){ 102 if (url.localURI.length) { 103 assert(url.path.absolute, "Request URL path must be absolute."); 104 req.requestURL = url.localURI; 105 } 106 if (settings.proxyURL.schema !is null) 107 req.requestURL = url.toString(); // proxy exception to the URL representation 108 109 // Provide port number when it is not the default one (RFC2616 section 14.23) 110 if (url.port && url.port != url.defaultPort) 111 req.headers["Host"] = format("%s:%d", url.host, url.port); 112 else 113 req.headers["Host"] = url.host; 114 115 if ("authorization" !in req.headers && url.username != "") { 116 import std.base64; 117 string pwstr = url.username ~ ":" ~ url.password; 118 req.headers["Authorization"] = "Basic " ~ 119 cast(string)Base64.encode(cast(ubyte[])pwstr); 120 } 121 if (requester) requester(req); 122 }); 123 124 // make sure the connection stays locked if the body still needs to be read 125 if( res.m_client ) res.lockedConnection = cli; 126 127 logTrace("Returning HTTPClientResponse for conn %s", () @trusted { return cast(void*)res.lockedConnection.__conn; } ()); 128 return res; 129 } 130 /// ditto 131 void requestHTTP(string url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings) 132 { 133 requestHTTP(URL(url), requester, responder, settings); 134 } 135 /// ditto 136 void requestHTTP(URL url, scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse req) responder, const(HTTPClientSettings) settings = defaultSettings) 137 { 138 version(UnixSocket) { 139 enforce(url.schema == "http" || url.schema == "https" || url.schema == "http+unix" || url.schema == "https+unix", "URL schema must be http(s) or http(s)+unix."); 140 } else { 141 enforce(url.schema == "http" || url.schema == "https", "URL schema must be http(s)."); 142 } 143 enforce(url.host.length > 0, "URL must contain a host name."); 144 bool use_tls; 145 146 if (settings.proxyURL.schema !is null) 147 use_tls = settings.proxyURL.schema == "https"; 148 else 149 { 150 version(UnixSocket) 151 use_tls = url.schema == "https"; 152 else 153 use_tls = url.schema == "https" || url.schema == "https+unix"; 154 } 155 156 auto cli = connectHTTP(url.getFilteredHost, url.port, use_tls, settings); 157 cli.request((scope req) { 158 if (url.localURI.length) { 159 assert(url.path.absolute, "Request URL path must be absolute."); 160 req.requestURL = url.localURI; 161 } 162 if (settings.proxyURL.schema !is null) 163 req.requestURL = url.toString(); // proxy exception to the URL representation 164 165 // Provide port number when it is not the default one (RFC2616 section 14.23) 166 if (url.port && url.port != url.defaultPort) 167 req.headers["Host"] = format("%s:%d", url.host, url.port); 168 else 169 req.headers["Host"] = url.host; 170 171 if ("authorization" !in req.headers && url.username != "") { 172 import std.base64; 173 string pwstr = url.username ~ ":" ~ url.password; 174 req.headers["Authorization"] = "Basic " ~ 175 cast(string)Base64.encode(cast(ubyte[])pwstr); 176 } 177 if (requester) requester(req); 178 }, responder); 179 assert(!cli.m_requesting, "HTTP client still requesting after return!?"); 180 assert(!cli.m_responding, "HTTP client still responding after return!?"); 181 } 182 183 /** Posts a simple JSON request. Note that the server www.example.org does not 184 exists, so there will be no meaningful result. 185 */ 186 unittest { 187 import vibe.core.log; 188 import vibe.http.client; 189 import vibe.stream.operations; 190 191 void test() 192 { 193 requestHTTP("http://www.example.org/", 194 (scope req) { 195 req.method = HTTPMethod.POST; 196 //req.writeJsonBody(["name": "My Name"]); 197 }, 198 (scope res) { 199 logInfo("Response: %s", res.bodyReader.readAllUTF8()); 200 } 201 ); 202 } 203 } 204 205 206 /** 207 Returns a HTTPClient proxy object that is connected to the specified host. 208 209 Internally, a connection pool is used to reuse already existing connections. Note that 210 usually requestHTTP should be used for making requests instead of manually using a 211 HTTPClient to do so. 212 */ 213 auto connectHTTP(string host, ushort port = 0, bool use_tls = false, const(HTTPClientSettings) settings = null) 214 { 215 static struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; NetworkAddress bind_addr; } 216 static vibe.utils.array.FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections; 217 218 auto sttngs = settings ? settings : defaultSettings; 219 220 if( port == 0 ) port = use_tls ? 443 : 80; 221 auto ckey = ConnInfo(host, port, use_tls, sttngs.proxyURL.host, sttngs.proxyURL.port, sttngs.networkInterface); 222 223 ConnectionPool!HTTPClient pool; 224 s_connections.opApply((ref c) @safe { 225 if (c[0] == ckey) 226 pool = c[1]; 227 return 0; 228 }); 229 230 if (!pool) { 231 logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, sttngs.proxyURL.host, sttngs.proxyURL.port); 232 pool = new ConnectionPool!HTTPClient({ 233 auto ret = new HTTPClient; 234 ret.connect(host, port, use_tls, sttngs); 235 return ret; 236 }); 237 if (s_connections.full) s_connections.popFront(); 238 s_connections.put(tuple(ckey, pool)); 239 } 240 241 return pool.lockConnection(); 242 } 243 244 245 /**************************************************************************************************/ 246 /* Public types */ 247 /**************************************************************************************************/ 248 249 /** 250 Defines an HTTP/HTTPS proxy request or a connection timeout for an HTTPClient. 251 */ 252 class HTTPClientSettings { 253 URL proxyURL; 254 Duration defaultKeepAliveTimeout = 10.seconds; 255 256 /// Forces a specific network interface to use for outgoing connections. 257 NetworkAddress networkInterface = anyAddress; 258 259 /// Can be used to force looking up IPv4/IPv6 addresses for host names. 260 AddressFamily dnsAddressFamily = AddressFamily.UNSPEC; 261 } 262 263 /// 264 unittest { 265 void test() { 266 267 HTTPClientSettings settings = new HTTPClientSettings; 268 settings.proxyURL = URL.parse("http://proxyuser:proxypass@192.168.2.50:3128"); 269 settings.defaultKeepAliveTimeout = 0.seconds; // closes connection immediately after receiving the data. 270 requestHTTP("http://www.example.org", 271 (scope req){ 272 req.method = HTTPMethod.GET; 273 }, 274 (scope res){ 275 logInfo("Headers:"); 276 foreach(key, ref value; res.headers) { 277 logInfo("%s: %s", key, value); 278 } 279 logInfo("Response: %s", res.bodyReader.readAllUTF8()); 280 }, settings); 281 282 } 283 } 284 285 286 /** 287 Implementation of a HTTP 1.0/1.1 client with keep-alive support. 288 289 Note that it is usually recommended to use requestHTTP for making requests as that will use a 290 pool of HTTPClient instances to keep the number of connection establishments low while not 291 blocking requests from different tasks. 292 */ 293 final class HTTPClient { 294 @safe: 295 296 enum maxHeaderLineLength = 4096; 297 298 private { 299 Rebindable!(const(HTTPClientSettings)) m_settings; 300 string m_server; 301 ushort m_port; 302 bool m_useTLS; 303 TCPConnection m_conn; 304 InterfaceProxy!Stream m_stream; 305 TLSStream m_tlsStream; 306 TLSContext m_tls; 307 static __gshared m_userAgent = "vibe.d/"~vibeVersionString~" (HTTPClient, +http://vibed.org/)"; 308 static __gshared void function(TLSContext) ms_tlsSetup; 309 bool m_requesting = false, m_responding = false; 310 SysTime m_keepAliveLimit; 311 Duration m_keepAliveTimeout; 312 } 313 314 /** Get the current settings for the HTTP client. **/ 315 @property const(HTTPClientSettings) settings() const { 316 return m_settings; 317 } 318 319 /** 320 Sets the default user agent string for new HTTP requests. 321 */ 322 static void setUserAgentString(string str) @trusted { m_userAgent = str; } 323 324 /** 325 Sets a callback that will be called for every TLS context that is created. 326 327 Setting such a callback is useful for adjusting the validation parameters 328 of the TLS context. 329 */ 330 static void setTLSSetupCallback(void function(TLSContext) @safe func) @trusted { ms_tlsSetup = func; } 331 332 /** 333 Connects to a specific server. 334 335 This method may only be called if any previous connection has been closed. 336 */ 337 void connect(string server, ushort port = 80, bool use_tls = false, const(HTTPClientSettings) settings = defaultSettings) 338 { 339 assert(!m_conn); 340 assert(port != 0); 341 disconnect(); 342 m_conn = TCPConnection.init; 343 m_settings = settings; 344 m_keepAliveTimeout = settings.defaultKeepAliveTimeout; 345 m_keepAliveLimit = Clock.currTime(UTC()) + m_keepAliveTimeout; 346 m_server = server; 347 m_port = port; 348 m_useTLS = use_tls; 349 if (use_tls) { 350 m_tls = createTLSContext(TLSContextKind.client); 351 // this will be changed to trustedCert once a proper root CA store is available by default 352 m_tls.peerValidationMode = TLSPeerValidationMode.none; 353 () @trusted { if (ms_tlsSetup) ms_tlsSetup(m_tls); } (); 354 } 355 } 356 357 /** 358 Forcefully closes the TCP connection. 359 360 Before calling this method, be sure that no request is currently being processed. 361 */ 362 void disconnect() 363 { 364 if (m_conn) { 365 if (m_conn.connected) { 366 try m_stream.finalize(); 367 catch (Exception e) logDebug("Failed to finalize connection stream when closing HTTP client connection: %s", e.msg); 368 m_conn.close(); 369 } 370 if (m_useTLS) { 371 () @trusted { return destroy(m_stream); } (); 372 m_stream = InterfaceProxy!Stream.init; 373 } 374 () @trusted { return destroy(m_conn); } (); 375 m_conn = TCPConnection.init; 376 } 377 } 378 379 private void doProxyRequest(T, U)(ref T res, U requester, ref bool close_conn, ref bool has_body) 380 @trusted { // scope new 381 import std.conv : to; 382 import vibe.internal.utilallocator: RegionListAllocator; 383 version (VibeManualMemoryManagement) 384 scope request_allocator = new RegionListAllocator!(shared(Mallocator), false)(1024, Mallocator.instance); 385 else 386 scope request_allocator = new RegionListAllocator!(shared(GCAllocator), true)(1024, GCAllocator.instance); 387 388 res.dropBody(); 389 scope(failure) 390 res.disconnect(); 391 if (res.statusCode != 407) { 392 throw new HTTPStatusException(HTTPStatus.internalServerError, "Proxy returned Proxy-Authenticate without a 407 status code."); 393 } 394 395 // send the request again with the proxy authentication information if available 396 if (m_settings.proxyURL.username is null) { 397 throw new HTTPStatusException(HTTPStatus.proxyAuthenticationRequired, "Proxy Authentication Required."); 398 } 399 400 m_responding = false; 401 close_conn = false; 402 bool found_proxy_auth; 403 404 foreach (string proxyAuth; res.headers.getAll("Proxy-Authenticate")) 405 { 406 if (proxyAuth.length >= "Basic".length && proxyAuth[0.."Basic".length] == "Basic") 407 { 408 found_proxy_auth = true; 409 break; 410 } 411 } 412 413 if (!found_proxy_auth) 414 { 415 throw new HTTPStatusException(HTTPStatus.notAcceptable, "The Proxy Server didn't allow Basic Authentication"); 416 } 417 418 SysTime connected_time; 419 has_body = doRequestWithRetry(requester, true, close_conn, connected_time); 420 m_responding = true; 421 422 static if (is(T == HTTPClientResponse)) 423 res = new HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time); 424 else 425 res = scoped!HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time); 426 427 if (res.headers.get("Proxy-Authenticate", null) !is null){ 428 res.dropBody(); 429 throw new HTTPStatusException(HTTPStatus.ProxyAuthenticationRequired, "Proxy Authentication Failed."); 430 } 431 432 } 433 434 /** 435 Performs a HTTP request. 436 437 `requester` is called first to populate the request with headers and the desired 438 HTTP method and version. After a response has been received it is then passed 439 to the caller which can in turn read the reponse body. Any part of the body 440 that has not been processed will automatically be consumed and dropped. 441 442 Note that the `requester` callback might be invoked multiple times in the event 443 that a request has to be resent due to a connection failure. 444 445 Also note that the second form of this method (returning a `HTTPClientResponse`) is 446 not recommended to use as it may accidentially block a HTTP connection when 447 only part of the response body was read and also requires a heap allocation 448 for the response object. The callback based version on the other hand uses 449 a stack allocation and guarantees that the request has been fully processed 450 once it has returned. 451 */ 452 void request(scope void delegate(scope HTTPClientRequest req) requester, scope void delegate(scope HTTPClientResponse) responder) 453 @trusted { // scope new 454 import vibe.internal.utilallocator: RegionListAllocator; 455 version (VibeManualMemoryManagement) 456 scope request_allocator = new RegionListAllocator!(shared(Mallocator), false)(1024, Mallocator.instance); 457 else 458 scope request_allocator = new RegionListAllocator!(shared(GCAllocator), true)(1024, GCAllocator.instance); 459 460 bool close_conn; 461 SysTime connected_time; 462 bool has_body = doRequestWithRetry(requester, false, close_conn, connected_time); 463 464 m_responding = true; 465 auto res = scoped!HTTPClientResponse(this, has_body, close_conn, request_allocator, connected_time); 466 467 // proxy implementation 468 if (res.headers.get("Proxy-Authenticate", null) !is null) { 469 doProxyRequest(res, requester, close_conn, has_body); 470 } 471 472 Exception user_exception; 473 { 474 scope (failure) { 475 m_responding = false; 476 disconnect(); 477 } 478 try responder(res); 479 catch (Exception e) { 480 logDebug("Error while handling response: %s", e.toString().sanitize()); 481 user_exception = e; 482 } 483 if (m_responding) { 484 logDebug("Failed to handle the complete response of the server - disconnecting."); 485 res.disconnect(); 486 } 487 assert(!m_responding, "Still in responding state after finalizing the response!?"); 488 489 if (user_exception || res.headers.get("Connection") == "close") 490 disconnect(); 491 } 492 if (user_exception) throw user_exception; 493 } 494 495 /// ditto 496 HTTPClientResponse request(scope void delegate(HTTPClientRequest) requester) 497 { 498 bool close_conn; 499 SysTime connected_time; 500 bool has_body = doRequestWithRetry(requester, false, close_conn, connected_time); 501 m_responding = true; 502 auto res = new HTTPClientResponse(this, has_body, close_conn, () @trusted { return vibeThreadAllocator(); } (), connected_time); 503 504 // proxy implementation 505 if (res.headers.get("Proxy-Authenticate", null) !is null) { 506 doProxyRequest(res, requester, close_conn, has_body); 507 } 508 509 return res; 510 } 511 512 private bool doRequestWithRetry(scope void delegate(HTTPClientRequest req) requester, bool confirmed_proxy_auth /* basic only */, out bool close_conn, out SysTime connected_time) 513 { 514 if (m_conn && m_conn.connected && connected_time > m_keepAliveLimit){ 515 logDebug("Disconnected to avoid timeout"); 516 disconnect(); 517 } 518 519 // check if this isn't the first request on a connection 520 bool is_persistent_request = m_conn && m_conn.connected; 521 522 // retry the request if the connection gets closed prematurely and this is a persistent request 523 bool has_body; 524 foreach (i; 0 .. is_persistent_request ? 2 : 1) { 525 connected_time = Clock.currTime(UTC()); 526 527 close_conn = false; 528 has_body = doRequest(requester, close_conn, false, connected_time); 529 530 logTrace("HTTP client waiting for response"); 531 if (!m_stream.empty) break; 532 533 enforce(i != 1, "Second attempt to send HTTP request failed."); 534 } 535 return has_body; 536 } 537 538 private bool doRequest(scope void delegate(HTTPClientRequest req) requester, ref bool close_conn, bool confirmed_proxy_auth = false /* basic only */, SysTime connected_time = Clock.currTime(UTC())) 539 { 540 assert(!m_requesting, "Interleaved HTTP client requests detected!"); 541 assert(!m_responding, "Interleaved HTTP client request/response detected!"); 542 543 m_requesting = true; 544 scope(exit) m_requesting = false; 545 546 if (!m_conn || !m_conn.connected) { 547 if (m_conn) m_conn.close(); // make sure all resources are freed 548 if (m_settings.proxyURL.host !is null){ 549 550 enum AddressType { 551 IPv4, 552 IPv6, 553 Host 554 } 555 556 static AddressType getAddressType(string host){ 557 import std.regex : regex, Captures, Regex, matchFirst; 558 559 static IPv4Regex = regex(`^\s*((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))\s*$`, ``); 560 static IPv6Regex = regex(`^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$`, ``); 561 562 if (!matchFirst(host, IPv4Regex).empty) 563 { 564 return AddressType.IPv4; 565 } 566 else if (!matchFirst(host, IPv6Regex).empty) 567 { 568 return AddressType.IPv6; 569 } 570 else 571 { 572 return AddressType.Host; 573 } 574 } 575 576 import std.functional : memoize; 577 alias findAddressType = memoize!getAddressType; 578 579 bool use_dns; 580 if (() @trusted { return findAddressType(m_settings.proxyURL.host); } () == AddressType.Host) 581 { 582 use_dns = true; 583 } 584 585 NetworkAddress proxyAddr = resolveHost(m_settings.proxyURL.host, m_settings.dnsAddressFamily, use_dns); 586 proxyAddr.port = m_settings.proxyURL.port; 587 m_conn = connectTCP(proxyAddr, m_settings.networkInterface); 588 } 589 else { 590 version(UnixSocket) 591 { 592 import core.sys.posix.sys.un; 593 import core.sys.posix.sys.socket; 594 import std.regex : regex, Captures, Regex, matchFirst, ctRegex; 595 import core.stdc.string : strcpy; 596 597 NetworkAddress addr; 598 if (m_server[0] == '/') 599 { 600 addr.family = AF_UNIX; 601 sockaddr_un* s = addr.sockAddrUnix(); 602 enforce(s.sun_path.length > m_server.length, "Unix sockets cannot have that long a name."); 603 s.sun_family = AF_UNIX; 604 () @trusted { strcpy(cast(char*)s.sun_path.ptr,m_server.toStringz()); } (); 605 } else 606 { 607 addr = resolveHost(m_server, m_settings.dnsAddressFamily); 608 addr.port = m_port; 609 } 610 m_conn = connectTCP(addr, m_settings.networkInterface); 611 } else 612 { 613 auto addr = resolveHost(m_server, m_settings.dnsAddressFamily); 614 addr.port = m_port; 615 m_conn = connectTCP(addr, m_settings.networkInterface); 616 } 617 } 618 619 m_stream = m_conn; 620 if (m_useTLS) { 621 try m_tlsStream = createTLSStream(m_conn, m_tls, TLSStreamState.connecting, m_server, m_conn.remoteAddress); 622 catch (Exception e) { 623 m_conn.close(); 624 throw e; 625 } 626 m_stream = m_tlsStream; 627 } 628 } 629 630 return () @trusted { // scoped 631 auto req = scoped!HTTPClientRequest(m_stream, m_conn.localAddress); 632 if (m_useTLS) 633 req.m_peerCertificate = m_tlsStream.peerCertificate; 634 635 req.headers["User-Agent"] = m_userAgent; 636 if (m_settings.proxyURL.host !is null){ 637 req.headers["Proxy-Connection"] = "keep-alive"; 638 close_conn = false; // req.headers.get("Proxy-Connection", "keep-alive") != "keep-alive"; 639 if (confirmed_proxy_auth) 640 { 641 import std.base64; 642 ubyte[] user_pass = cast(ubyte[])(m_settings.proxyURL.username ~ ":" ~ m_settings.proxyURL.password); 643 644 req.headers["Proxy-Authorization"] = "Basic " ~ cast(string) Base64.encode(user_pass); 645 } 646 } 647 else { 648 req.headers["Connection"] = "keep-alive"; 649 close_conn = false; // req.headers.get("Connection", "keep-alive") != "keep-alive"; 650 } 651 req.headers["Accept-Encoding"] = "gzip, deflate"; 652 req.headers["Host"] = m_server; 653 requester(req); 654 req.finalize(); 655 656 return req.method != HTTPMethod.HEAD; 657 } (); 658 } 659 } 660 661 662 /** 663 Represents a HTTP client request (as sent to the server). 664 */ 665 final class HTTPClientRequest : HTTPRequest { 666 private { 667 InterfaceProxy!OutputStream m_bodyWriter; 668 FreeListRef!ChunkedOutputStream m_chunkedStream; 669 bool m_headerWritten = false; 670 FixedAppender!(string, 22) m_contentLengthBuffer; 671 NetworkAddress m_localAddress; 672 TLSCertificateInformation m_peerCertificate; 673 } 674 675 676 /// private 677 this(InterfaceProxy!Stream conn, NetworkAddress local_addr) 678 { 679 super(conn); 680 m_localAddress = local_addr; 681 } 682 683 @property NetworkAddress localAddress() const { return m_localAddress; } 684 685 @property ref inout(TLSCertificateInformation) peerCertificate() inout { return m_peerCertificate; } 686 687 /** 688 Accesses the Content-Length header of the request. 689 690 Negative values correspond to an unset Content-Length header. 691 */ 692 @property long contentLength() const { return headers.get("Content-Length", "-1").to!long(); } 693 /// ditto 694 @property void contentLength(long value) 695 { 696 if (value >= 0) headers["Content-Length"] = clengthString(value); 697 else if ("Content-Length" in headers) headers.remove("Content-Length"); 698 } 699 700 /** 701 Writes the whole request body at once using raw bytes. 702 */ 703 void writeBody(RandomAccessStream data) 704 { 705 writeBody(data, data.size - data.tell()); 706 } 707 /// ditto 708 void writeBody(InputStream data) 709 { 710 headers["Transfer-Encoding"] = "chunked"; 711 data.pipe(bodyWriter); 712 finalize(); 713 } 714 /// ditto 715 void writeBody(InputStream data, ulong length) 716 { 717 headers["Content-Length"] = clengthString(length); 718 data.pipe(bodyWriter, length); 719 finalize(); 720 } 721 /// ditto 722 void writeBody(in ubyte[] data, string content_type = null) 723 { 724 if( content_type != "" ) headers["Content-Type"] = content_type; 725 headers["Content-Length"] = clengthString(data.length); 726 bodyWriter.write(data); 727 finalize(); 728 } 729 730 /** 731 Writes the request body as JSON data. 732 */ 733 void writeJsonBody(T)(T data, bool allow_chunked = false) 734 { 735 import vibe.stream.wrapper : streamOutputRange; 736 737 headers["Content-Type"] = "application/json"; 738 739 // set an explicit content-length field if chunked encoding is not allowed 740 if (!allow_chunked) { 741 import vibe.internal.rangeutil; 742 long length = 0; 743 auto counter = () @trusted { return RangeCounter(&length); } (); 744 () @trusted { serializeToJson(counter, data); } (); 745 headers["Content-Length"] = clengthString(length); 746 } 747 748 auto rng = streamOutputRange!1024(bodyWriter); 749 () @trusted { serializeToJson(&rng, data); } (); 750 rng.flush(); 751 finalize(); 752 } 753 754 /** Writes the request body as form data. 755 */ 756 void writeFormBody(T)(T key_value_map) 757 { 758 import vibe.inet.webform : formEncode; 759 import vibe.stream.wrapper : streamOutputRange; 760 761 import vibe.internal.rangeutil; 762 long length = 0; 763 auto counter = () @trusted { return RangeCounter(&length); } (); 764 counter.formEncode(key_value_map); 765 headers["Content-Length"] = clengthString(length); 766 headers["Content-Type"] = "application/x-www-form-urlencoded"; 767 auto dst = streamOutputRange!1024(bodyWriter); 768 () @trusted { return &dst; } ().formEncode(key_value_map); 769 } 770 771 /// 772 unittest { 773 void test(HTTPClientRequest req) { 774 req.writeFormBody(["foo": "bar"]); 775 } 776 } 777 778 void writePart(MultiPart part) 779 { 780 assert(false, "TODO"); 781 } 782 783 /** 784 An output stream suitable for writing the request body. 785 786 The first retrieval will cause the request header to be written, make sure 787 that all headers are set up in advance.s 788 */ 789 @property InterfaceProxy!OutputStream bodyWriter() 790 { 791 if (m_bodyWriter) return m_bodyWriter; 792 793 assert(!m_headerWritten, "Trying to write request body after body was already written."); 794 795 if ("Content-Length" !in headers && "Transfer-Encoding" !in headers 796 && headers.get("Connection", "") != "close") 797 { 798 headers["Transfer-Encoding"] = "chunked"; 799 } 800 801 writeHeader(); 802 m_bodyWriter = m_conn; 803 804 if (headers.get("Transfer-Encoding", null) == "chunked") { 805 m_chunkedStream = createChunkedOutputStreamFL(m_bodyWriter); 806 m_bodyWriter = m_chunkedStream; 807 } 808 809 return m_bodyWriter; 810 } 811 812 private void writeHeader() 813 { 814 import vibe.stream.wrapper; 815 816 assert(!m_headerWritten, "HTTPClient tried to write headers twice."); 817 m_headerWritten = true; 818 819 auto output = streamOutputRange!1024(m_conn); 820 821 formattedWrite(() @trusted { return &output; } (), "%s %s %s\r\n", httpMethodString(method), requestURL, getHTTPVersionString(httpVersion)); 822 logTrace("--------------------"); 823 logTrace("HTTP client request:"); 824 logTrace("--------------------"); 825 logTrace("%s", this); 826 foreach (k, v; headers) { 827 () @trusted { formattedWrite(&output, "%s: %s\r\n", k, v); } (); 828 logTrace("%s: %s", k, v); 829 } 830 output.put("\r\n"); 831 logTrace("--------------------"); 832 } 833 834 private void finalize() 835 { 836 // test if already finalized 837 if (m_headerWritten && !m_bodyWriter) 838 return; 839 840 // force the request to be sent 841 if (!m_headerWritten) writeHeader(); 842 else { 843 bodyWriter.flush(); 844 if (m_chunkedStream) { 845 m_bodyWriter.finalize(); 846 m_conn.flush(); 847 } 848 m_bodyWriter = typeof(m_bodyWriter).init; 849 m_conn = typeof(m_conn).init; 850 } 851 } 852 853 private string clengthString(ulong len) 854 { 855 m_contentLengthBuffer.clear(); 856 () @trusted { formattedWrite(&m_contentLengthBuffer, "%s", len); } (); 857 return () @trusted { return m_contentLengthBuffer.data; } (); 858 } 859 } 860 861 862 /** 863 Represents a HTTP client response (as received from the server). 864 */ 865 final class HTTPClientResponse : HTTPResponse { 866 @safe: 867 868 private { 869 HTTPClient m_client; 870 LockedConnection!HTTPClient lockedConnection; 871 FreeListRef!LimitedInputStream m_limitedInputStream; 872 FreeListRef!ChunkedInputStream m_chunkedInputStream; 873 FreeListRef!ZlibInputStream m_zlibInputStream; 874 FreeListRef!EndCallbackInputStream m_endCallback; 875 InterfaceProxy!InputStream m_bodyReader; 876 bool m_closeConn; 877 int m_maxRequests; 878 } 879 880 /// Contains the keep-alive 'max' parameter, indicates how many requests a client can 881 /// make before the server closes the connection. 882 @property int maxRequests() const { 883 return m_maxRequests; 884 } 885 886 /// private 887 this(HTTPClient client, bool has_body, bool close_conn, IAllocator alloc, SysTime connected_time = Clock.currTime(UTC())) 888 { 889 m_client = client; 890 m_closeConn = close_conn; 891 892 scope(failure) finalize(true); 893 894 // read and parse status line ("HTTP/#.# #[ $]\r\n") 895 logTrace("HTTP client reading status line"); 896 string stln = () @trusted { return cast(string)client.m_stream.readLine(HTTPClient.maxHeaderLineLength, "\r\n", alloc); } (); 897 logTrace("stln: %s", stln); 898 this.httpVersion = parseHTTPVersion(stln); 899 900 enforce(stln.startsWith(" ")); 901 stln = stln[1 .. $]; 902 this.statusCode = parse!int(stln); 903 if( stln.length > 0 ){ 904 enforce(stln.startsWith(" ")); 905 stln = stln[1 .. $]; 906 this.statusPhrase = stln; 907 } 908 909 // read headers until an empty line is hit 910 parseRFC5322Header(client.m_stream, this.headers, HTTPClient.maxHeaderLineLength, alloc, false); 911 912 logTrace("---------------------"); 913 logTrace("HTTP client response:"); 914 logTrace("---------------------"); 915 logTrace("%s", this); 916 foreach (k, v; this.headers) 917 logTrace("%s: %s", k, v); 918 logTrace("---------------------"); 919 920 Duration server_timeout; 921 bool has_server_timeout; 922 if (auto pka = "Keep-Alive" in this.headers) { 923 foreach(s; splitter(*pka, ',')){ 924 auto pair = s.splitter('='); 925 auto name = pair.front.strip(); 926 pair.popFront(); 927 if (icmp(name, "timeout") == 0) { 928 has_server_timeout = true; 929 server_timeout = pair.front.to!int().seconds; 930 } else if (icmp(name, "max") == 0) { 931 m_maxRequests = pair.front.to!int(); 932 } 933 } 934 } 935 Duration elapsed = Clock.currTime(UTC()) - connected_time; 936 if (this.headers.get("Connection") == "close") { 937 // this header will trigger m_client.disconnect() in m_client.doRequest() when it goes out of scope 938 } else if (has_server_timeout && m_client.m_keepAliveTimeout > server_timeout) { 939 m_client.m_keepAliveLimit = Clock.currTime(UTC()) + server_timeout - elapsed; 940 } else if (this.httpVersion == HTTPVersion.HTTP_1_1) { 941 m_client.m_keepAliveLimit = Clock.currTime(UTC()) + m_client.m_keepAliveTimeout; 942 } 943 944 if (!has_body) finalize(); 945 } 946 947 ~this() 948 { 949 debug if (m_client) { import std.stdio; writefln("WARNING: HTTPClientResponse not fully processed before being finalized"); } 950 } 951 952 /** 953 An input stream suitable for reading the response body. 954 */ 955 @property InterfaceProxy!InputStream bodyReader() 956 { 957 if( m_bodyReader ) return m_bodyReader; 958 959 assert (m_client, "Response was already read or no response body, may not use bodyReader."); 960 961 // prepare body the reader 962 if (auto pte = "Transfer-Encoding" in this.headers) { 963 enforce(*pte == "chunked"); 964 m_chunkedInputStream = createChunkedInputStreamFL(m_client.m_stream); 965 m_bodyReader = this.m_chunkedInputStream; 966 } else if (auto pcl = "Content-Length" in this.headers) { 967 m_limitedInputStream = createLimitedInputStreamFL(m_client.m_stream, to!ulong(*pcl)); 968 m_bodyReader = m_limitedInputStream; 969 } else if (isKeepAliveResponse) { 970 m_limitedInputStream = createLimitedInputStreamFL(m_client.m_stream, 0); 971 m_bodyReader = m_limitedInputStream; 972 } else { 973 m_bodyReader = m_client.m_stream; 974 } 975 976 if( auto pce = "Content-Encoding" in this.headers ){ 977 if( *pce == "deflate" ){ 978 m_zlibInputStream = createDeflateInputStreamFL(m_bodyReader); 979 m_bodyReader = m_zlibInputStream; 980 } else if( *pce == "gzip" || *pce == "x-gzip"){ 981 m_zlibInputStream = createGzipInputStreamFL(m_bodyReader); 982 m_bodyReader = m_zlibInputStream; 983 } 984 else enforce(*pce == "identity" || *pce == "", "Unsuported content encoding: "~*pce); 985 } 986 987 // be sure to free resouces as soon as the response has been read 988 m_endCallback = createEndCallbackInputStreamFL(m_bodyReader, &this.finalize); 989 m_bodyReader = m_endCallback; 990 991 return m_bodyReader; 992 } 993 994 /** 995 Provides unsafe means to read raw data from the connection. 996 997 No transfer decoding and no content decoding is done on the data. 998 999 Not that the provided delegate must read the whole stream, 1000 as the state of the response is unknown after raw bytes have been 1001 taken. Failure to read the right amount of data will lead to 1002 protocol corruption in later requests. 1003 */ 1004 void readRawBody(scope void delegate(scope InterfaceProxy!InputStream stream) @safe del) 1005 { 1006 assert(!m_bodyReader, "May not mix use of readRawBody and bodyReader."); 1007 del(interfaceProxy!InputStream(m_client.m_stream)); 1008 finalize(); 1009 } 1010 /// ditto 1011 static if (!is(InputStream == InterfaceProxy!InputStream)) 1012 void readRawBody(scope void delegate(scope InputStream stream) @safe del) 1013 { 1014 import vibe.internal.interfaceproxy : asInterface; 1015 1016 assert(!m_bodyReader, "May not mix use of readRawBody and bodyReader."); 1017 del(m_client.m_stream.asInterface!(.InputStream)); 1018 finalize(); 1019 } 1020 1021 /** 1022 Reads the whole response body and tries to parse it as JSON. 1023 */ 1024 Json readJson(){ 1025 auto bdy = bodyReader.readAllUTF8(); 1026 return () @trusted { return parseJson(bdy); } (); 1027 } 1028 1029 /** 1030 Reads and discards the response body. 1031 */ 1032 void dropBody() 1033 { 1034 if (m_client) { 1035 if( bodyReader.empty ){ 1036 finalize(); 1037 } else { 1038 bodyReader.pipe(nullSink); 1039 assert(!lockedConnection.__conn); 1040 } 1041 } 1042 } 1043 1044 /** 1045 Forcefully terminates the connection regardless of the current state. 1046 1047 Note that this will only actually disconnect if the request has not yet 1048 been fully processed. If the whole body was already read, the 1049 connection is not owned by the current request operation anymore and 1050 cannot be accessed. Use a "Connection: close" header instead in this 1051 case to let the server close the connection. 1052 */ 1053 void disconnect() 1054 { 1055 finalize(true); 1056 } 1057 1058 /** 1059 Switches the connection to a new protocol and returns the resulting ConnectionStream. 1060 1061 The caller caller gets ownership of the ConnectionStream and is responsible 1062 for closing it. 1063 1064 Notice: 1065 When using the overload that returns a `ConnectionStream`, the caller 1066 must make sure that the stream is not used after the 1067 `HTTPClientRequest` has been destroyed. 1068 1069 Params: 1070 new_protocol = The protocol to which the connection is expected to 1071 upgrade. Should match the Upgrade header of the request. If an 1072 empty string is passed, the "Upgrade" header will be ignored and 1073 should be checked by other means. 1074 */ 1075 ConnectionStream switchProtocol(string new_protocol) 1076 { 1077 enforce(statusCode == HTTPStatus.switchingProtocols, "Server did not send a 101 - Switching Protocols response"); 1078 string *resNewProto = "Upgrade" in headers; 1079 enforce(resNewProto, "Server did not send an Upgrade header"); 1080 enforce(!new_protocol.length || !icmp(*resNewProto, new_protocol), 1081 "Expected Upgrade: " ~ new_protocol ~", received Upgrade: " ~ *resNewProto); 1082 auto stream = createConnectionProxyStream!(typeof(m_client.m_stream), typeof(m_client.m_conn))(m_client.m_stream, m_client.m_conn); 1083 m_client.m_responding = false; 1084 m_client = null; 1085 m_closeConn = true; // cannot reuse connection for further requests! 1086 return stream; 1087 } 1088 /// ditto 1089 void switchProtocol(string new_protocol, scope void delegate(ConnectionStream str) @safe del) 1090 { 1091 enforce(statusCode == HTTPStatus.switchingProtocols, "Server did not send a 101 - Switching Protocols response"); 1092 string *resNewProto = "Upgrade" in headers; 1093 enforce(resNewProto, "Server did not send an Upgrade header"); 1094 enforce(!new_protocol.length || !icmp(*resNewProto, new_protocol), 1095 "Expected Upgrade: " ~ new_protocol ~", received Upgrade: " ~ *resNewProto); 1096 scope stream = createConnectionProxyStream(m_client.m_stream, m_client.m_conn); 1097 m_client.m_responding = false; 1098 m_client = null; 1099 m_closeConn = true; 1100 del(stream); 1101 } 1102 1103 private @property isKeepAliveResponse() 1104 const { 1105 string conn; 1106 if (this.httpVersion == HTTPVersion.HTTP_1_0) conn = this.headers.get("Connection", "close"); 1107 else conn = this.headers.get("Connection", "keep-alive"); 1108 return icmp(conn, "close") != 0; 1109 } 1110 1111 private void finalize() 1112 { 1113 finalize(m_closeConn); 1114 } 1115 1116 private void finalize(bool disconnect) 1117 { 1118 // ignore duplicate and too early calls to finalize 1119 // (too early happesn for empty response bodies) 1120 if (!m_client) return; 1121 1122 auto cli = m_client; 1123 m_client = null; 1124 cli.m_responding = false; 1125 destroy(m_zlibInputStream); 1126 destroy(m_chunkedInputStream); 1127 destroy(m_limitedInputStream); 1128 if (disconnect) cli.disconnect(); 1129 destroy(lockedConnection); 1130 } 1131 } 1132 1133 /** Returns clean host string. In case of unix socket it performs urlDecode on host. */ 1134 package auto getFilteredHost(URL url) 1135 { 1136 version(UnixSocket) 1137 { 1138 import vibe.textfilter.urlencode : urlDecode; 1139 if (url.schema == "https+unix" || url.schema == "http+unix") 1140 return urlDecode(url.host); 1141 else 1142 return url.host; 1143 } else 1144 return url.host; 1145 } 1146 1147 // This object is a placeholder and should to never be modified. 1148 package @property const(HTTPClientSettings) defaultSettings() 1149 @trusted { 1150 __gshared HTTPClientSettings ret = new HTTPClientSettings; 1151 return ret; 1152 }