/**
	Implements WebSocket support and fallbacks for older browsers.

	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Jan Krüger
*/
module vibe.http.websockets;

///
@safe unittest {
	void handleConn(scope WebSocket sock)
	{
		// simple echo server
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}

import vibe.core.core;
import vibe.core.log;
import vibe.core.net;
import vibe.core.sync;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;
import vibe.utils.array;

import core.time;
import std.algorithm: equal, splitter;
import std.array;
import std.base64;
import std.conv;
import std.exception;
import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;
import std.uuid;
import std.base64;
import std.digest.sha;
import std.uni: asLowerCase;
import vibe.crypto.cryptorand;

@safe:


/// Signature of the callback that receives an established WebSocket connection.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;


/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	@safe pure nothrow:

	///
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		super(msg, file, line, next);
	}

	///
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, next, file, line);
	}
}

/// Sets the method and the headers of a WebSocket upgrade request.
private void prepareUpgradeRequest(scope HTTPClientRequest req, string challenge_key)
@safe {
	req.method = HTTPMethod.GET;
	req.headers["Upgrade"] = "websocket";
	req.headers["Connection"] = "Upgrade";
	req.headers["Sec-WebSocket-Version"] = "13";
	req.headers["Sec-WebSocket-Key"] = challenge_key;
}

/// Throws unless `res` switches protocols and carries the expected accept key.
private void enforceUpgradeAccepted(scope HTTPClientResponse res, scope const(char)[] answer_key)
@safe {
	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answer_key, "Response has wrong accept key");
}

/** Establishes a WebSocket connection at the specified endpoint.

	The schemas `wss` and `https` select a TLS connection; every other schema
	is requested as plain `http`.

	Params:
		url = Address of the WebSocket endpoint
		request_modifier = Called after the upgrade headers have been set, so
			that it can add to or override the outgoing request
		del = Receives the established socket; if the socket is still
			connected when `del` returns, it gets closed
		settings = HTTP client settings used for the upgrade request

	Returns:
		The overloads without `del` return the connected socket.

	Throws:
		An `Exception` is thrown if the server does not answer with
		"switching protocols" or if the `Sec-WebSocket-Accept` header is
		missing or does not match the challenge.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		prepareUpgradeRequest(req, challengeKey);
		request_modifier(req);
	}, settings);

	enforceUpgradeAccepted(res, answerKey);
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, rng, res);
}

/// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);

	requestHTTP(url,
		(scope req) {
			prepareUpgradeRequest(req, challengeKey);
			request_modifier(req);
		},
		(scope res) {
			enforceUpgradeAccepted(res, answerKey);
			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, rng, res);
				del(ws);
				// the handler may already have closed the socket itself
				if (ws.connected) ws.close();
			});
		},
		settings
	);
}

/// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// adapt the throwing callback to the nothrow delegate type by logging failures
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// adapt the throwing callback to the nothrow delegate type by logging failures
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}


/** Checks the headers of an incoming WebSocket upgrade request.

	The checks are performed in this order: a `Connection` header containing
	the token "upgrade" (case insensitive), an `Upgrade: websocket` header
	(case insensitive), `Sec-WebSocket-Version: 13` and the presence of a
	`Sec-WebSocket-Key` header.

	Returns:
		`null` if all checks pass, otherwise a description of the first
		check that failed.
*/
private string checkUpgradeRequest(scope HTTPServerRequest req)
@safe {
	bool is_upgrade = false;
	if (auto pconnection = "Connection" in req.headers) {
		// "Connection" may carry a comma separated list of tokens
		foreach (token; splitter(*pconnection, ",")) {
			if (token.strip().asLowerCase().equal("upgrade")) {
				is_upgrade = true;
				break;
			}
		}
	}
	if (!is_upgrade)
		return "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";

	auto pupgrade = "Upgrade" in req.headers;
	if (!pupgrade || icmp(*pupgrade, "websocket") != 0)
		return "WebSocket endpoint requires \"Upgrade: websocket\" header.";

	auto pversion = "Sec-WebSocket-Version" in req.headers;
	if (!pversion || *pversion != "13")
		return "Only version 13 of the WebSocket protocol is supported.";

	if (!("Sec-WebSocket-Key" in req.headers))
		return "Missing \"Sec-WebSocket-Key\" header.";

	return null;
}


/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	An invalid upgrade request is answered with status "400 Bad Request" and
	a body describing the problem; `on_handshake` is not called in that case.
	The socket is closed once `on_handshake` returns.

	Params:
		on_handshake = Callback that receives the established socket
		req = The HTTP request that asks for the protocol upgrade
		res = The matching HTTP response object
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	const req_error = checkUpgradeRequest(req);
	if (req_error.length) {
		logDebug("Browser sent invalid WebSocket request: %s", req_error);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(req_error);
		return;
	}

	// presence of the key has been verified by checkUpgradeRequest
	auto pKey = "Sec-WebSocket-Key" in req.headers;
	auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = accept;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream conn = res.switchProtocol("websocket");

	WebSocket socket = new WebSocket(conn, req, res);
	try {
		on_handshake(socket);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	socket.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}


/**
	Returns a HTTP request handler that establishes web socket connections.

	An invalid upgrade request is answered with status "400 Bad Request" and
	an empty body; the reason is written to the debug log. The socket is
	closed once `on_handshake` returns.

	Params:
		on_handshake = Callback that receives each established socket
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
		const req_error = checkUpgradeRequest(req);
		if (req_error.length) {
			logDebug("Browser sent invalid WebSocket request: %s", req_error);
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// presence of the key has been verified by checkUpgradeRequest
		auto pKey = "Sec-WebSocket-Key" in req.headers;
		auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = accept;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto socket = new WebSocket(conn, req, res);
			try on_handshake(socket);
			catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			socket.close();
		});
	}
	return &callback;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// adapt the throwing callback to the nothrow delegate type by logging failures
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}

/**
 * Provides the reason that a websocket connection has closed.
 *
 * Further documentation for the WebSocket and its codes can be found from:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	noStatusReceived = 1005,
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}

/**
 * Returns a short English description of a close code.
 *
 * Codes 1000 to 1015 map to their individual names, all other codes map to
 * the description of the thousands range they fall into. Negative codes and
 * codes from 5000 upwards yield "UNDEFINED - Nasal Demons".
 *
 * Params:
 *   reason = The close code to describe; values that are not members of
 *     the enumeration are accepted
 */
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	// Integer division truncates towards zero, so negative codes would land in
	// category 0; give them a category of their own that hits `default`.
	const category = reason < 0 ? -1 : reason / 1000;

	switch(category)
	{
		case 0:
			return "Reserved and Unused";
		case 1:
			switch(reason)
			{
				case 1000:
					return "Normal Closure";
				case 1001:
					return "Going Away";
				case 1002:
					return "Protocol Error";
				case 1003:
					return "Unsupported Data";
				case 1004:
					return "RESERVED";
				case 1005:
					return "No Status Recvd";
				case 1006:
					return "Abnormal Closure";
				case 1007:
					return "Invalid Frame Payload Data";
				case 1008:
					return "Policy Violation";
				case 1009:
					return "Message Too Big";
				case 1010:
					return "Missing Extension";
				case 1011:
					return "Internal Error";
				case 1012:
					return "Service Restart";
				case 1013:
					return "Try Again Later";
				case 1014:
					return "Bad Gateway";
				case 1015:
					return "TLS Handshake";
				default:
					return "RESERVED";
			}
		case 2:
			return "Reserved for extensions";
		case 3:
			return "Available for frameworks and libraries";
		case 4:
			return "Available for applications";
		default:
			return "UNDEFINED - Nasal Demons";
	}
}

unittest
{
	assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused");
	assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused");
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
	assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons");

	//check the other spec cases
	for(short i = 1000; i < 1017; i++)
	{
		if(i == 1004 || i > 1015)
		{
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
		}
		else
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
	}
}


/**
 * Represents a single _WebSocket connection.
487 * 488 * --- 489 * int main (string[] args) 490 * { 491 * auto taskHandle = runTask(() => connectToWS()); 492 * return runApplication(&args); 493 * } 494 * 495 * void connectToWS () 496 * { 497 * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token"); 498 * auto ws = connectWebSocket(ws_url); 499 * logInfo("WebSocket connected"); 500 * 501 * while (ws.waitForData()) 502 * { 503 * auto txt = ws.receiveText; 504 * logInfo("Received: %s", txt); 505 * } 506 * logFatal("Connection lost!"); 507 * } 508 * --- 509 */ 510 final class WebSocket { 511 @safe: 512 513 private { 514 ConnectionStream m_conn; 515 bool m_sentCloseFrame = false; 516 IncomingWebSocketMessage m_nextMessage = null; 517 const HTTPServerRequest m_request; 518 HTTPServerResponse m_serverResponse; 519 HTTPClientResponse m_clientResponse; 520 Task m_reader; 521 Task m_ownerTask; 522 InterruptibleTaskMutex m_readMutex, m_writeMutex; 523 InterruptibleTaskCondition m_readCondition; 524 Timer m_pingTimer; 525 uint m_lastPingIndex; 526 bool m_pongReceived; 527 short m_closeCode; 528 const(char)[] m_closeReason; 529 /// The entropy generator to use 530 /// If not null, it means this is a server socket. 531 RandomNumberStream m_rng; 532 } 533 534 /** 535 * Private constructor, called from `connectWebSocket`. 536 * 537 * Params: 538 * conn = Underlying connection string 539 * request = HTTP request used to establish the connection 540 * rng = Source of entropy to use. 
If null, assume we're a server socket 541 * client_res = For client sockets, the response object (keeps the http client locked until the socket is done) 542 */ 543 private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res) 544 { 545 m_ownerTask = Task.getThis(); 546 m_conn = conn; 547 m_request = request; 548 m_clientResponse = client_res; 549 m_serverResponse = server_res; 550 assert(m_conn); 551 m_rng = rng; 552 m_writeMutex = new InterruptibleTaskMutex; 553 m_readMutex = new InterruptibleTaskMutex; 554 m_readCondition = new InterruptibleTaskCondition(m_readMutex); 555 m_readMutex.performLocked!({ 556 m_reader = runTask(&startReader); 557 if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) { 558 m_pongReceived = true; 559 m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); 560 } 561 }); 562 } 563 564 private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res) 565 { 566 this(conn, null, null, rng, client_res); 567 } 568 569 private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res) 570 { 571 this(conn, request, res, null, null); 572 } 573 574 /** 575 Determines if the WebSocket connection is still alive and ready for sending. 576 577 Note that for determining the ready state for $(EM reading), you need 578 to use $(D waitForData) instead, because both methods can return 579 different values while a disconnect is in proress. 580 581 See_also: $(D waitForData) 582 */ 583 @property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; } 584 585 /** 586 Returns the close code sent by the remote end. 587 588 Note if the connection was never opened, is still alive, or was closed 589 locally this value will be 0. If no close code was given by the remote 590 end in the close frame, the value will be 1005. 
If the connection was 591 not closed cleanly by the remote end, this value will be 1006. 592 */ 593 @property short closeCode() { return m_closeCode; } 594 595 /** 596 Returns the close reason sent by the remote end. 597 598 Note if the connection was never opened, is still alive, or was closed 599 locally this value will be an empty string. 600 */ 601 @property const(char)[] closeReason() { return m_closeReason; } 602 603 /** 604 The HTTP request that established the web socket connection. 605 */ 606 @property const(HTTPServerRequest) request() const { return m_request; } 607 608 /** 609 Checks if data is readily available for read. 610 */ 611 @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; } 612 613 /** Waits until either a message arrives or until the connection is closed. 614 615 This function can be used in a read loop to cleanly determine when to stop reading. 616 */ 617 bool waitForData() 618 { 619 if (m_nextMessage) return true; 620 621 m_readMutex.performLocked!({ 622 while (connected && m_nextMessage is null) 623 m_readCondition.wait(); 624 }); 625 return m_nextMessage !is null; 626 } 627 628 /// ditto 629 bool waitForData(Duration timeout) 630 { 631 import std.datetime; 632 633 if (m_nextMessage) return true; 634 635 immutable limit_time = Clock.currTime(UTC()) + timeout; 636 637 m_readMutex.performLocked!({ 638 while (connected && m_nextMessage is null && timeout > 0.seconds) { 639 m_readCondition.wait(timeout); 640 timeout = limit_time - Clock.currTime(UTC()); 641 } 642 }); 643 return m_nextMessage !is null; 644 } 645 646 /** 647 Sends a text message. 648 649 On the JavaScript side, the text will be available as message.data (type string). 650 651 Throws: 652 A `WebSocketException` is thrown if the connection gets closed 653 before or during the transfer of the message. 
654 */ 655 void send(scope const(char)[] data) 656 { 657 send( 658 (scope message) { message.write(cast(const ubyte[])data); }, 659 FrameOpcode.text); 660 } 661 662 /** 663 Sends a binary message. 664 665 On the JavaScript side, the text will be available as message.data (type Blob). 666 667 Throws: 668 A `WebSocketException` is thrown if the connection gets closed 669 before or during the transfer of the message. 670 */ 671 void send(in ubyte[] data) 672 { 673 send((scope message){ message.write(data); }, FrameOpcode.binary); 674 } 675 676 /** 677 Sends a message using an output stream. 678 679 Throws: 680 A `WebSocketException` is thrown if the connection gets closed 681 before or during the transfer of the message. 682 */ 683 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) 684 { 685 m_writeMutex.performLocked!({ 686 enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); 687 /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng); 688 scope(exit) message.finalize(); 689 sender(message); 690 }); 691 } 692 693 /** 694 Actively closes the connection. 695 696 Params: 697 code = Numeric code indicating a termination reason. 698 reason = Message describing why the connection was terminated. 699 */ 700 void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "") 701 { 702 import std.algorithm.comparison : min; 703 if(reason !is null && reason.length == 0) 704 reason = (cast(WebSocketCloseReason)code).closeReasonString; 705 706 //control frame payloads are limited to 125 bytes 707 version(assert) 708 assert(reason.length <= 123); 709 else 710 reason = reason[0 .. 
min($, 123)]; 711 712 if (connected) { 713 try { 714 send((scope msg) { 715 m_sentCloseFrame = true; 716 if (code != 0) { 717 msg.write(std.bitmanip.nativeToBigEndian(code)); 718 msg.write(cast(const ubyte[])reason); 719 } 720 }, FrameOpcode.close); 721 } catch (Exception e) { 722 logDiagnostic("Failed to send active web socket close frame: %s", e.msg); 723 } 724 } 725 if (m_pingTimer) m_pingTimer.stop(); 726 727 728 if (Task.getThis() == m_ownerTask) { 729 m_writeMutex.performLocked!({ 730 if (m_clientResponse) { 731 m_clientResponse.disconnect(); 732 m_clientResponse = HTTPClientResponse.init; 733 } 734 if (m_serverResponse) { 735 m_serverResponse.finalize(); 736 m_serverResponse = HTTPServerResponse.init; 737 } 738 }); 739 740 m_reader.join(); 741 742 () @trusted { destroy(m_conn); } (); 743 m_conn = ConnectionStream.init; 744 } 745 } 746 747 /** 748 Receives a new message and returns its contents as a newly allocated data array. 749 750 Params: 751 strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise. 752 Throws: WebSocketException if the connection is closed or 753 if $(D strict == true) and the frame received is not the right type 754 */ 755 ubyte[] receiveBinary(bool strict = true) 756 { 757 ubyte[] ret; 758 receive((scope message){ 759 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary, 760 "Expected a binary message, got "~message.frameOpcode.to!string()); 761 ret = message.readAll(); 762 }); 763 return ret; 764 } 765 /// ditto 766 string receiveText(bool strict = true) 767 { 768 string ret; 769 receive((scope message){ 770 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text, 771 "Expected a text message, got "~message.frameOpcode.to!string()); 772 ret = message.readAllUTF8(); 773 }); 774 return ret; 775 } 776 777 /** 778 Receives a new message using an InputStream. 779 Throws: WebSocketException if the connection is closed. 
780 */ 781 void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver) 782 { 783 m_readMutex.performLocked!({ 784 while (!m_nextMessage) { 785 enforce!WebSocketException(connected, "Connection closed while reading message."); 786 m_readCondition.wait(); 787 } 788 receiver(m_nextMessage); 789 m_nextMessage = null; 790 m_readCondition.notifyAll(); 791 }); 792 } 793 794 private void startReader() 795 nothrow { 796 try m_readMutex.performLocked!({}); //Wait until initialization 797 catch (Exception e) { 798 logException(e, "WebSocket reader task failed to wait for initialization"); 799 try m_conn.close(); 800 catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure"); 801 m_closeCode = WebSocketCloseReason.abnormalClosure; 802 try m_readCondition.notifyAll(); 803 catch (Exception e) assert(false, e.msg); 804 return; 805 } 806 807 try { 808 loop: 809 while (!m_conn.empty) { 810 assert(!m_nextMessage); 811 /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng); 812 813 switch (msg.frameOpcode) { 814 default: throw new WebSocketException("unknown frame opcode"); 815 case FrameOpcode.ping: 816 send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong); 817 break; 818 case FrameOpcode.pong: 819 // test if pong matches previous ping 820 if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) { 821 logDebugV("Received PONG that doesn't match previous ping."); 822 break; 823 } 824 logDebugV("Received matching PONG."); 825 m_pongReceived = true; 826 break; 827 case FrameOpcode.close: 828 logDebug("Got closing frame (%s)", m_sentCloseFrame); 829 830 // If no close code was passed, we default to 1005 831 this.m_closeCode = WebSocketCloseReason.noStatusReceived; 832 833 // If provided in the frame, attempt to parse the close code/reason 834 if (msg.peek().length >= short.sizeof) { 835 this.m_closeCode = 
bigEndianToNative!short(msg.peek()[0..short.sizeof]); 836 837 if (msg.peek().length > short.sizeof) { 838 this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$]; 839 } 840 } 841 842 if(!m_sentCloseFrame) close(); 843 logDebug("Terminating connection (%s)", m_sentCloseFrame); 844 break loop; 845 case FrameOpcode.text: 846 case FrameOpcode.binary: 847 case FrameOpcode.continuation: // FIXME: add proper support for continuation frames! 848 m_readMutex.performLocked!({ 849 m_nextMessage = msg; 850 m_readCondition.notifyAll(); 851 while (m_nextMessage) m_readCondition.wait(); 852 }); 853 break; 854 } 855 } 856 } catch (Exception e) { 857 logDiagnostic("Error while reading websocket message: %s", e.msg); 858 logDiagnostic("Closing connection."); 859 } 860 861 // If no close code was passed, e.g. this was an unclean termination 862 // of our websocket connection, set the close code to 1006. 863 if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure; 864 865 try m_conn.close(); 866 catch (Exception e) logException(e, "Failed to close WebSocket connection"); 867 try m_readCondition.notifyAll(); 868 catch (Exception e) assert(false, e.msg); 869 } 870 871 private void sendPing() 872 nothrow { 873 try { 874 if (!m_pongReceived) { 875 logDebug("Pong skipped. Closing connection."); 876 close(); 877 m_pingTimer.stop(); 878 return; 879 } 880 m_pongReceived = false; 881 send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping); 882 logDebugV("Ping sent"); 883 } catch (Exception e) { 884 logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg); 885 } 886 } 887 } 888 889 /** 890 Represents a single outgoing _WebSocket message as an OutputStream. 
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		FrameOpcode m_frameOpcode;
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/**
		Params:
			conn = Stream that the frames are written to, must not be null
			frameOpcode = Opcode of the first frame of the message
			rng = Passed on to the frame header writer; may be null
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/** Appends `bytes` to the frame that is currently being buffered.

		Nothing is written to the connection before `flush` or `finalize`
		gets called.

		Returns: Always `bytes.length`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		reserveHeaderSpace();
		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the buffered data as a non-final frame, if there is any.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/** Sends the final frame of the message.

		A final frame is sent even if no data is buffered. Calling this
		method more than once has no further effect.
	*/
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Starts an empty buffer with room for the largest possible frame header.
	private void reserveHeaderSpace()
	{
		if (m_buffer.data.length) return;

		ubyte[Frame.maxHeaderSize] header_padding;
		m_buffer.put(header_padding[]);
	}

	/// Writes the buffered payload as a single frame and empties the buffer.
	private void sendFrame(bool fin)
	{
		// Must not go through write() here: finalize() has already set
		// m_finalized, so write() would trip its assertion on an empty buffer.
		reserveHeaderSpace();

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// the header is written right in front of the payload, into the
		// tail of the reserved padding
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, section 5.4: only the first frame of a fragmented message
		// carries the message opcode, all following frames are continuations
		m_frameOpcode = FrameOpcode.continuation;
	}

	alias write = OutputStream.write;
}


/**
	Represents a single incoming _WebSocket message as an InputStream.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		Frame m_currentFrame;
	}

	/// Binds the message to `conn` and reads its first frame.
	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_rng = rng;
		skipFrame(); // reads the first frame
	}

	/// `true` if the current frame has no unread payload left.
	@property bool empty() const { return m_currentFrame.payload.length == 0; }

	/// Number of unread payload bytes of the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always `true`.
	@property bool dataAvailableForRead() { return true; }

	/// The frame type for this message;
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// The unread payload of the current frame.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wish to process frames by frames,
	 * or minimize memory allocation, as `peek` will only return the current
	 * frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		const more_frames = !m_currentFrame.fin;
		if (more_frames)
			m_currentFrame = Frame.readFrame(m_conn);
		return more_frames;
	}

	/** Copies payload data to `dst`, moving on to following frames as needed.

		Returns: The number of bytes written to `dst`.
		Throws: WebSocketException if `dst` is not yet full and no payload is left.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		size_t total = 0;

		while (dst.length > 0) {
			enforce!WebSocketException(!empty     , "cannot read from empty stream");
			enforce!WebSocketException(leastSize > 0, "no data available"             );

			const chunk = cast(size_t)min(leastSize, dst.length);
			dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
			dst = dst[chunk .. $];
			m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
			total += chunk;

			// more data left in this frame means that dst has been filled
			if (leastSize > 0) continue;

			const stop_here = mode == IOMode.immediate || (mode == IOMode.once && total > 0);
			if (stop_here) break;
			this.skipFrame();
		}

		return total;
	}

	alias read = InputStream.read;
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
	continuation = 0x0,
	text = 0x1,
	binary = 0x2,
	close = 0x8,
	ping = 0x9,
	pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
@safe:
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Return the header length encoded with the expected amount of bits
	 *
	 * The WebSocket RFC define a variable-length payload length.
	 * In short, it means that:
	 * - If the length is <= 125, it is stored as the 7 least significant
	 *   bits of the second header byte. The first bit is reserved for MASK.
	 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 bytes header
	 *   ( + masking key, if any).
	 * - If the length is > 65_536, a magic value of 127 will be used for
	 *   the 7-bit field, and the next 8 bytes are expected to be the length,
	 *   resulting in a 10 bytes header ( + masking key, if any).
	 *
	 * Those functions encapsulate all this logic and allow to just get the
	 * length with the desired size.
1086 * 1087 * Return: 1088 * - For `ubyte`, the value to store in the 7 bits field, either the 1089 * length or a magic value (126 or 127). 1090 * - For `ushort`, a value in the range [126; 65_536]. 1091 * If payload.length is not in this bound, an assertion will be triggered. 1092 * - For `ulong`, a value in the range [65_537; size_t.max]. 1093 * If payload.length is not in this bound, an assertion will be triggered. 1094 */ 1095 size_t getHeaderSize(bool mask) 1096 { 1097 size_t ret = 1; 1098 if (payload.length < 126) ret += 1; 1099 else if (payload.length < 65536) ret += 3; 1100 else ret += 9; 1101 if (mask) ret += 4; 1102 return ret; 1103 } 1104 1105 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1106 { 1107 ubyte[4] buff; 1108 ubyte firstByte = cast(ubyte)opcode; 1109 if (fin) firstByte |= 0x80; 1110 dst[0] = firstByte; 1111 dst = dst[1 .. $]; 1112 1113 auto b1 = sys_rng ? 0x80 : 0x00; 1114 1115 if (payload.length < 126) { 1116 dst[0] = cast(ubyte)(b1 | payload.length); 1117 dst = dst[1 .. $]; 1118 } else if (payload.length < 65536) { 1119 dst[0] = cast(ubyte) (b1 | 126); 1120 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1121 dst = dst[3 .. $]; 1122 } else { 1123 dst[0] = cast(ubyte) (b1 | 127); 1124 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1125 dst = dst[9 .. $]; 1126 } 1127 1128 if (sys_rng) { 1129 sys_rng.read(dst[0 .. 4]); 1130 for (size_t i = 0; i < payload.length; i++) 1131 payload[i] ^= dst[i % 4]; 1132 } 1133 } 1134 1135 static Frame readFrame(InputStream stream) 1136 { 1137 Frame frame; 1138 ubyte[8] data; 1139 1140 stream.read(data[0 .. 2]); 1141 frame.fin = (data[0] & 0x80) != 0; 1142 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1143 1144 bool masked = !!(data[1] & 0b1000_0000); 1145 1146 //parsing length 1147 ulong length = data[1] & 0b0111_1111; 1148 if (length == 126) { 1149 stream.read(data[0 .. 2]); 1150 length = bigEndianToNative!ushort(data[0 .. 
2]); 1151 } else if (length == 127) { 1152 stream.read(data); 1153 length = bigEndianToNative!ulong(data); 1154 1155 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1156 // interpreted as a 64-bit unsigned integer (the most significant 1157 // bit MUST be 0) 1158 enforce!WebSocketException(!(length >> 63), 1159 "Received length has a non-zero most significant bit"); 1160 1161 } 1162 logDebug("Read frame: %s %s %s length=%d", 1163 frame.opcode, 1164 frame.fin ? "final frame" : "continuation", 1165 masked ? "masked" : "not masked", 1166 length); 1167 1168 // Masking key is 32 bits / uint 1169 if (masked) 1170 stream.read(data[0 .. 4]); 1171 1172 // Read payload 1173 // TODO: Provide a way to limit the size read, easy 1174 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1175 enforce!WebSocketException(length <= size_t.max); 1176 frame.payload = new ubyte[](cast(size_t)length); 1177 stream.read(frame.payload); 1178 1179 //de-masking 1180 if (masked) 1181 foreach (size_t i; 0 .. cast(size_t)length) 1182 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1183 1184 return frame; 1185 } 1186 } 1187 1188 unittest { 1189 import std.algorithm.searching : all; 1190 1191 final class DummyRNG : RandomNumberStream { 1192 @safe: 1193 @property bool empty() { return false; } 1194 @property ulong leastSize() { return ulong.max; } 1195 @property bool dataAvailableForRead() { return true; } 1196 const(ubyte)[] peek() { return null; } 1197 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1198 alias read = RandomNumberStream.read; 1199 } 1200 1201 ubyte[14] hdrbuf; 1202 auto rng = new DummyRNG; 1203 1204 Frame f; 1205 f.payload = new ubyte[125]; 1206 1207 assert(f.getHeaderSize(false) == 2); 1208 hdrbuf[] = 0; 1209 f.writeHeader(hdrbuf[0 .. 2], null); 1210 assert(hdrbuf[0 .. 2] == [0, 125]); 1211 1212 assert(f.getHeaderSize(true) == 6); 1213 hdrbuf[] = 0; 1214 f.writeHeader(hdrbuf[0 .. 
6], rng); 1215 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1216 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1217 1218 f.payload = new ubyte[126]; 1219 assert(f.getHeaderSize(false) == 4); 1220 hdrbuf[] = 0; 1221 f.writeHeader(hdrbuf[0 .. 4], null); 1222 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1223 1224 assert(f.getHeaderSize(true) == 8); 1225 hdrbuf[] = 0; 1226 f.writeHeader(hdrbuf[0 .. 8], rng); 1227 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1228 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1229 1230 f.payload = new ubyte[65535]; 1231 assert(f.getHeaderSize(false) == 4); 1232 hdrbuf[] = 0; 1233 f.writeHeader(hdrbuf[0 .. 4], null); 1234 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1235 1236 assert(f.getHeaderSize(true) == 8); 1237 hdrbuf[] = 0; 1238 f.writeHeader(hdrbuf[0 .. 8], rng); 1239 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1240 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1241 1242 f.payload = new ubyte[65536]; 1243 assert(f.getHeaderSize(false) == 10); 1244 hdrbuf[] = 0; 1245 f.writeHeader(hdrbuf[0 .. 10], null); 1246 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1247 1248 assert(f.getHeaderSize(true) == 14); 1249 hdrbuf[] = 0; 1250 f.writeHeader(hdrbuf[0 .. 14], rng); 1251 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1252 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1253 } 1254 1255 /** 1256 * Generate a challenge key for the protocol upgrade phase. 1257 */ 1258 private string generateChallengeKey(scope RandomNumberStream rng) 1259 { 1260 ubyte[16] buffer; 1261 rng.read(buffer); 1262 return Base64.encode(buffer); 1263 } 1264 1265 private string computeAcceptKey(string challengekey) 1266 { 1267 immutable(ubyte)[] b = challengekey.representation; 1268 immutable(ubyte)[] a = s_webSocketGuid.representation; 1269 SHA1 hash; 1270 hash.start(); 1271 hash.put(b); 1272 hash.put(a); 1273 auto result = Base64.encode(hash.finish()); 1274 return to!(string)(result); 1275 }