/**
	Implements WebSocket support and fallbacks for older browsers.

	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Jan Krüger
*/
module vibe.http.websockets;

///
@safe unittest {
	void handleConn(scope WebSocket sock)
	{
		// simple echo server
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}

import vibe.core.core;
import vibe.core.log;
import vibe.core.net;
import vibe.core.sync;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;
import vibe.utils.array;

import core.time;
import std.algorithm: equal, splitter;
import std.array;
import std.base64;
import std.conv;
import std.exception;
import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;
import std.uuid;
import std.base64;
import std.digest.sha;
import std.uni: asLowerCase;
import vibe.crypto.cryptorand;

@safe:


/// Signature of the callback that receives an established WebSocket connection.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;


/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	@safe pure nothrow:

	///
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		super(msg, file, line, next);
	}

	///
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, next, file, line);
	}
}

/** Establishes a WebSocket connection at the specified endpoint.

	The URL schemas "wss" and "https" are requested as "https", every other
	schema is requested as "http". The upgrade request carries the
	`Upgrade`, `Connection`, `Sec-WebSocket-Version` and `Sec-WebSocket-Key`
	headers; `request_modifier` is invoked afterwards and may adjust the
	request further.

	The overload returning a `WebSocket` hands the open socket to the caller.
	The overload taking `del` passes the socket to `del` and closes it once
	`del` returns, if it is still connected.

	Params:
		url = Endpoint to connect to
		request_modifier = Callback invoked with the prepared upgrade request
		del = Callback that receives the established connection
		settings = HTTP client settings used for the upgrade request

	Throws:
		An `Exception` is thrown if the server does not answer with
		"101 Switching Protocols", or if the `Sec-WebSocket-Accept` header is
		missing or does not match the challenge key.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
		request_modifier(req);
	}, settings);

	// The response object is only handed over to the WebSocket on success.
	// If the handshake is rejected below, nobody else would release it, so
	// drop the underlying connection before propagating the error.
	scope (failure) res.disconnect();

	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, rng, res);
}

/// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);

	requestHTTP(url,
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = challengeKey;
			request_modifier(req);
		},
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto key = "sec-websocket-accept" in res.headers;
			enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*key == answerKey, "Response has wrong accept key");
			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, rng, res);
				del(ws);
				// the handler may already have closed the socket itself
				if (ws.connected) ws.close();
			});
		},
		settings
	);
}

/// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// wrap the throwing handler so that it satisfies the nothrow delegate type
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// wrap the throwing handler so that it satisfies the nothrow delegate type
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}


/** Validates the headers of an incoming WebSocket upgrade request.

	The checks are performed in this order: the `Connection` header must list
	"upgrade" (case insensitive, comma separated), `Upgrade` must equal
	"websocket" (case insensitive), `Sec-WebSocket-Version` must be "13" and
	`Sec-WebSocket-Key` must be present.

	Params:
		req = The request to validate
		accept = Receives the value for the `Sec-WebSocket-Accept` response
			header if the request is valid

	Returns:
		`null` if the request is valid, otherwise a message describing the
		first failed check.
*/
private string validateWebSocketUpgrade(scope HTTPServerRequest req, out string accept)
@safe {
	auto pUpgrade = "Upgrade" in req.headers;
	auto pConnection = "Connection" in req.headers;
	auto pKey = "Sec-WebSocket-Key" in req.headers;
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	auto pVersion = "Sec-WebSocket-Version" in req.headers;

	auto isUpgrade = false;

	if (pConnection) {
		foreach (t; splitter(*pConnection, ",")) {
			if (t.strip().asLowerCase().equal("upgrade")) {
				isUpgrade = true;
				break;
			}
		}
	}

	if (!isUpgrade) return "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) return "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	if (!pVersion || *pVersion != "13") return "Only version 13 of the WebSocket protocol is supported.";
	if (!pKey) return "Missing \"Sec-WebSocket-Key\" header.";

	// accept value = Base64(SHA-1(client key ~ protocol GUID))
	accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
	return null;
}


/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	Invalid upgrade requests are answered with "400 Bad Request" and a body
	describing the problem; `on_handshake` is not called in that case. The
	socket is closed after `on_handshake` has returned.

	Params:
		on_handshake = Callback that receives the established connection
		req = The HTTP request that asks for the protocol upgrade
		res = The HTTP response used to perform the protocol switch
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	string accept;
	const req_error = validateWebSocketUpgrade(req, accept);

	if (req_error.length) {
		logDebug("Browser sent invalid WebSocket request: %s", req_error);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(req_error);
		return;
	}

	res.headers["Sec-WebSocket-Accept"] = accept;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream conn = res.switchProtocol("websocket");

	WebSocket socket = new WebSocket(conn, req, res);
	// on_handshake is nothrow, so there is no Exception to catch here
	on_handshake(socket);
	socket.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}


/**
	Returns a HTTP request handler that establishes web socket connections.

	Invalid upgrade requests are answered with "400 Bad Request" and an empty
	body. For valid requests the connection is switched to the "websocket"
	protocol, `on_handshake` is called with the new socket and the socket is
	closed once `on_handshake` has returned.

	Params:
		on_handshake = Callback that receives each established connection
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		string accept;
		if (validateWebSocketUpgrade(req, accept).length) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		res.headers["Sec-WebSocket-Accept"] = accept;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto socket = new WebSocket(conn, req, res);
			// on_handshake is nothrow, so there is no Exception to catch here
			on_handshake(socket);
			socket.close();
		});
	}
	return &callback;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// wrap the throwing handler so that it satisfies the nothrow delegate type
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}

/**
 * Provides the reason that a websocket connection has closed.
 *
 * Further documentation for the WebSocket and its codes can be found from:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	noStatusReceived = 1005,
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}

/** Returns a human readable description for a close code.

	Codes in the range 1000-1999 map to the name of the individual code
	("RESERVED" for codes without a name). All other codes map to the
	description of the thousands range they fall into; negative codes and
	codes of 5000 and above yield "UNDEFINED - Nasal Demons".

	Params:
		reason = The close code to describe; may be any value cast to
			`WebSocketCloseReason`, not just a named member

	Returns:
		A string literal describing the code or its range.
*/
string closeReasonString(WebSocketCloseReason reason) @nogc @safe pure nothrow
{
	const int code = reason;

	// Integer division truncates towards zero, which would put small negative
	// codes into the 0..999 range - so the ranges are compared explicitly.
	if (code < 0 || code >= 5000) return "UNDEFINED - Nasal Demons";
	if (code < 1000) return "Reserved and Unused";
	if (code >= 4000) return "Available for applications";
	if (code >= 3000) return "Available for frameworks and libraries";
	if (code >= 2000) return "Reserved for extensions";

	switch (code)
	{
		case 1000:
			return "Normal Closure";
		case 1001:
			return "Going Away";
		case 1002:
			return "Protocol Error";
		case 1003:
			return "Unsupported Data";
		case 1004:
			return "RESERVED";
		case 1005:
			return "No Status Recvd";
		case 1006:
			return "Abnormal Closure";
		case 1007:
			return "Invalid Frame Payload Data";
		case 1008:
			return "Policy Violation";
		case 1009:
			return "Message Too Big";
		case 1010:
			return "Missing Extension";
		case 1011:
			return "Internal Error";
		case 1012:
			return "Service Restart";
		case 1013:
			return "Try Again Later";
		case 1014:
			return "Bad Gateway";
		case 1015:
			return "TLS Handshake";
		default:
			return "RESERVED";
	}
}

unittest
{
	assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused");
	assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused");
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
	assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons");

	//check the other spec cases
	for(short i = 1000; i < 1017; i++)
	{
		if(i == 1004 || i > 1015)
		{
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
		}
		else
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
	}
}


/**
 * Represents a single _WebSocket connection.
487 * 488 * --- 489 * shared static this () 490 * { 491 * runTask(() => connectToWS()); 492 * } 493 * 494 * void connectToWS () 495 * { 496 * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token"); 497 * auto ws = connectWebSocket(ws_url); 498 * logInfo("WebSocket connected"); 499 * 500 * while (ws.waitForData()) 501 * { 502 * auto txt = ws.receiveText; 503 * logInfo("Received: %s", txt); 504 * } 505 * logFatal("Connection lost!"); 506 * } 507 * --- 508 */ 509 final class WebSocket { 510 @safe: 511 512 private { 513 ConnectionStream m_conn; 514 bool m_sentCloseFrame = false; 515 IncomingWebSocketMessage m_nextMessage = null; 516 const HTTPServerRequest m_request; 517 HTTPServerResponse m_serverResponse; 518 HTTPClientResponse m_clientResponse; 519 Task m_reader; 520 Task m_ownerTask; 521 InterruptibleTaskMutex m_readMutex, m_writeMutex; 522 InterruptibleTaskCondition m_readCondition; 523 Timer m_pingTimer; 524 uint m_lastPingIndex; 525 bool m_pongReceived; 526 short m_closeCode; 527 const(char)[] m_closeReason; 528 /// The entropy generator to use 529 /// If not null, it means this is a server socket. 530 RandomNumberStream m_rng; 531 } 532 533 /** 534 * Private constructor, called from `connectWebSocket`. 535 * 536 * Params: 537 * conn = Underlying connection string 538 * request = HTTP request used to establish the connection 539 * rng = Source of entropy to use. 
If null, assume we're a server socket 540 * client_res = For client sockets, the response object (keeps the http client locked until the socket is done) 541 */ 542 private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res) 543 { 544 m_ownerTask = Task.getThis(); 545 m_conn = conn; 546 m_request = request; 547 m_clientResponse = client_res; 548 m_serverResponse = server_res; 549 assert(m_conn); 550 m_rng = rng; 551 m_writeMutex = new InterruptibleTaskMutex; 552 m_readMutex = new InterruptibleTaskMutex; 553 m_readCondition = new InterruptibleTaskCondition(m_readMutex); 554 m_readMutex.performLocked!({ 555 m_reader = runTask(&startReader); 556 if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) { 557 m_pongReceived = true; 558 m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); 559 } 560 }); 561 } 562 563 private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res) 564 { 565 this(conn, null, null, rng, client_res); 566 } 567 568 private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res) 569 { 570 this(conn, request, res, null, null); 571 } 572 573 /** 574 Determines if the WebSocket connection is still alive and ready for sending. 575 576 Note that for determining the ready state for $(EM reading), you need 577 to use $(D waitForData) instead, because both methods can return 578 different values while a disconnect is in proress. 579 580 See_also: $(D waitForData) 581 */ 582 @property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; } 583 584 /** 585 Returns the close code sent by the remote end. 586 587 Note if the connection was never opened, is still alive, or was closed 588 locally this value will be 0. If no close code was given by the remote 589 end in the close frame, the value will be 1005. 
If the connection was 590 not closed cleanly by the remote end, this value will be 1006. 591 */ 592 @property short closeCode() { return m_closeCode; } 593 594 /** 595 Returns the close reason sent by the remote end. 596 597 Note if the connection was never opened, is still alive, or was closed 598 locally this value will be an empty string. 599 */ 600 @property const(char)[] closeReason() { return m_closeReason; } 601 602 /** 603 The HTTP request that established the web socket connection. 604 */ 605 @property const(HTTPServerRequest) request() const { return m_request; } 606 607 /** 608 Checks if data is readily available for read. 609 */ 610 @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; } 611 612 /** Waits until either a message arrives or until the connection is closed. 613 614 This function can be used in a read loop to cleanly determine when to stop reading. 615 */ 616 bool waitForData() 617 { 618 if (m_nextMessage) return true; 619 620 m_readMutex.performLocked!({ 621 while (connected && m_nextMessage is null) 622 m_readCondition.wait(); 623 }); 624 return m_nextMessage !is null; 625 } 626 627 /// ditto 628 bool waitForData(Duration timeout) 629 { 630 import std.datetime; 631 632 if (m_nextMessage) return true; 633 634 immutable limit_time = Clock.currTime(UTC()) + timeout; 635 636 m_readMutex.performLocked!({ 637 while (connected && m_nextMessage is null && timeout > 0.seconds) { 638 m_readCondition.wait(timeout); 639 timeout = limit_time - Clock.currTime(UTC()); 640 } 641 }); 642 return m_nextMessage !is null; 643 } 644 645 /** 646 Sends a text message. 647 648 On the JavaScript side, the text will be available as message.data (type string). 649 650 Throws: 651 A `WebSocketException` is thrown if the connection gets closed 652 before or during the transfer of the message. 
653 */ 654 void send(scope const(char)[] data) 655 { 656 send( 657 (scope message) { message.write(cast(const ubyte[])data); }, 658 FrameOpcode.text); 659 } 660 661 /** 662 Sends a binary message. 663 664 On the JavaScript side, the text will be available as message.data (type Blob). 665 666 Throws: 667 A `WebSocketException` is thrown if the connection gets closed 668 before or during the transfer of the message. 669 */ 670 void send(in ubyte[] data) 671 { 672 send((scope message){ message.write(data); }, FrameOpcode.binary); 673 } 674 675 /** 676 Sends a message using an output stream. 677 678 Throws: 679 A `WebSocketException` is thrown if the connection gets closed 680 before or during the transfer of the message. 681 */ 682 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) 683 { 684 m_writeMutex.performLocked!({ 685 enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); 686 /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng); 687 scope(exit) message.finalize(); 688 sender(message); 689 }); 690 } 691 692 /// Compatibility overload - will be removed soon. 693 deprecated("Call the overload which requires an explicit FrameOpcode.") 694 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender) 695 { 696 send(sender, FrameOpcode.text); 697 } 698 699 /** 700 Actively closes the connection. 701 702 Params: 703 code = Numeric code indicating a termination reason. 704 reason = Message describing why the connection was terminated. 705 */ 706 void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "") 707 { 708 import std.algorithm.comparison : min; 709 if(reason !is null && reason.length == 0) 710 reason = (cast(WebSocketCloseReason)code).closeReasonString; 711 712 //control frame payloads are limited to 125 bytes 713 version(assert) 714 assert(reason.length <= 123); 715 else 716 reason = reason[0 .. 
min($, 123)]; 717 718 if (connected) { 719 try { 720 send((scope msg) { 721 m_sentCloseFrame = true; 722 if (code != 0) { 723 msg.write(std.bitmanip.nativeToBigEndian(code)); 724 msg.write(cast(const ubyte[])reason); 725 } 726 }, FrameOpcode.close); 727 } catch (Exception e) { 728 logDiagnostic("Failed to send active web socket close frame: %s", e.msg); 729 } 730 } 731 if (m_pingTimer) m_pingTimer.stop(); 732 733 734 if (Task.getThis() == m_ownerTask) { 735 m_writeMutex.performLocked!({ 736 if (m_clientResponse) { 737 m_clientResponse.disconnect(); 738 m_clientResponse = HTTPClientResponse.init; 739 } 740 if (m_serverResponse) { 741 m_serverResponse.finalize(); 742 m_serverResponse = HTTPServerResponse.init; 743 } 744 }); 745 746 m_reader.join(); 747 748 () @trusted { destroy(m_conn); } (); 749 m_conn = ConnectionStream.init; 750 } 751 } 752 753 /** 754 Receives a new message and returns its contents as a newly allocated data array. 755 756 Params: 757 strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise. 758 Throws: WebSocketException if the connection is closed or 759 if $(D strict == true) and the frame received is not the right type 760 */ 761 ubyte[] receiveBinary(bool strict = true) 762 { 763 ubyte[] ret; 764 receive((scope message){ 765 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary, 766 "Expected a binary message, got "~message.frameOpcode.to!string()); 767 ret = message.readAll(); 768 }); 769 return ret; 770 } 771 /// ditto 772 string receiveText(bool strict = true) 773 { 774 string ret; 775 receive((scope message){ 776 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text, 777 "Expected a text message, got "~message.frameOpcode.to!string()); 778 ret = message.readAllUTF8(); 779 }); 780 return ret; 781 } 782 783 /** 784 Receives a new message using an InputStream. 785 Throws: WebSocketException if the connection is closed. 
786 */ 787 void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver) 788 { 789 m_readMutex.performLocked!({ 790 while (!m_nextMessage) { 791 enforce!WebSocketException(connected, "Connection closed while reading message."); 792 m_readCondition.wait(); 793 } 794 receiver(m_nextMessage); 795 m_nextMessage = null; 796 m_readCondition.notifyAll(); 797 }); 798 } 799 800 private void startReader() 801 nothrow { 802 try m_readMutex.performLocked!({}); //Wait until initialization 803 catch (Exception e) { 804 logException(e, "WebSocket reader task failed to wait for initialization"); 805 try m_conn.close(); 806 catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure"); 807 m_closeCode = WebSocketCloseReason.abnormalClosure; 808 try m_readCondition.notifyAll(); 809 catch (Exception e) assert(false, e.msg); 810 return; 811 } 812 813 try { 814 loop: 815 while (!m_conn.empty) { 816 assert(!m_nextMessage); 817 /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng); 818 819 switch (msg.frameOpcode) { 820 default: throw new WebSocketException("unknown frame opcode"); 821 case FrameOpcode.ping: 822 send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong); 823 break; 824 case FrameOpcode.pong: 825 // test if pong matches previous ping 826 if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) { 827 logDebugV("Received PONG that doesn't match previous ping."); 828 break; 829 } 830 logDebugV("Received matching PONG."); 831 m_pongReceived = true; 832 break; 833 case FrameOpcode.close: 834 logDebug("Got closing frame (%s)", m_sentCloseFrame); 835 836 // If no close code was passed, we default to 1005 837 this.m_closeCode = WebSocketCloseReason.noStatusReceived; 838 839 // If provided in the frame, attempt to parse the close code/reason 840 if (msg.peek().length >= short.sizeof) { 841 this.m_closeCode = 
bigEndianToNative!short(msg.peek()[0..short.sizeof]); 842 843 if (msg.peek().length > short.sizeof) { 844 this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$]; 845 } 846 } 847 848 if(!m_sentCloseFrame) close(); 849 logDebug("Terminating connection (%s)", m_sentCloseFrame); 850 break loop; 851 case FrameOpcode.text: 852 case FrameOpcode.binary: 853 case FrameOpcode.continuation: // FIXME: add proper support for continuation frames! 854 m_readMutex.performLocked!({ 855 m_nextMessage = msg; 856 m_readCondition.notifyAll(); 857 while (m_nextMessage) m_readCondition.wait(); 858 }); 859 break; 860 } 861 } 862 } catch (Exception e) { 863 logDiagnostic("Error while reading websocket message: %s", e.msg); 864 logDiagnostic("Closing connection."); 865 } 866 867 // If no close code was passed, e.g. this was an unclean termination 868 // of our websocket connection, set the close code to 1006. 869 if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure; 870 871 try m_conn.close(); 872 catch (Exception e) logException(e, "Failed to close WebSocket connection"); 873 try m_readCondition.notifyAll(); 874 catch (Exception e) assert(false, e.msg); 875 } 876 877 private void sendPing() 878 nothrow { 879 try { 880 if (!m_pongReceived) { 881 logDebug("Pong skipped. Closing connection."); 882 close(); 883 m_pingTimer.stop(); 884 return; 885 } 886 m_pongReceived = false; 887 send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping); 888 logDebugV("Ping sent"); 889 } catch (Exception e) { 890 logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg); 891 } 892 } 893 } 894 895 /** 896 Represents a single outgoing _WebSocket message as an OutputStream. 
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	// Written data is collected in m_buffer behind Frame.maxHeaderSize bytes of
	// padding, so that the frame header can later be written directly in front
	// of the payload and both can be sent with a single write.
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		// opcode of the next frame; switches to `continuation` after the first frame
		FrameOpcode m_frameOpcode;
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/** Creates a message that is sent over `conn`.

		Params:
			conn = Stream to send the frames on
			frameOpcode = Opcode of the message
			rng = Passed to `Frame.writeHeader` for masking; may be null
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/** Appends `bytes` to the payload of the current frame.

		Nothing is sent until `flush` or `finalize` is called.

		Returns: The number of bytes buffered, which is always `bytes.length`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		reserveHeaderSpace();
		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the data buffered so far as a non-final frame.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/** Sends the remaining data as the final frame of the message.

		Calling this method more than once has no effect.
	*/
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Puts the header padding into the buffer if the buffer is empty.
	private void reserveHeaderSpace()
	{
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}
	}

	/// Sends the buffered payload as a single frame and resets the buffer.
	private void sendFrame(bool fin)
	{
		// This is also reached from finalize() with an empty buffer (message
		// without payload, or finalize directly after flush). Going through
		// write() here would trip its assert(!m_finalized).
		reserveHeaderSpace();

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		auto hsize = frame.getHeaderSize(m_rng !is null);
		// the header is right-aligned within the padding, directly before the payload
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, section 5.4: only the first frame of a fragmented message
		// carries the message opcode, all following frames use opcode 0.
		m_frameOpcode = FrameOpcode.continuation;
	}

	alias write = OutputStream.write;
}


/**
	Represents a single incoming _WebSocket message as an InputStream.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		// frame currently being consumed; its payload shrinks as data is read
		Frame m_currentFrame;
	}

	/// Creates the message and reads its first frame from `conn`.
	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_rng = rng;
		skipFrame(); // reads the first frame
	}

	/// True if the current frame has no unread payload left.
	@property bool empty() const { return m_currentFrame.payload.length == 0; }

	/// Number of unread payload bytes in the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always true.
	@property bool dataAvailableForRead() { return true; }

	/// The frame type for this message;
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// Returns the unread payload of the current frame without consuming it.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wish to process frames by frames,
	 * or minimize memory allocation, as `peek` will only return the current
	 * frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		if (!m_currentFrame.fin) {
			m_currentFrame = Frame.readFrame(m_conn);
			return true;
		}
		return false;
	}

	/** Copies payload bytes into `dst`, moving on to following frames as needed.

		Once the current frame is exhausted, reading stops for
		`IOMode.immediate` and for `IOMode.once`; otherwise the next frame is
		fetched with `skipFrame` and reading continues until `dst` is full.

		Returns: The number of bytes copied into `dst`.

		Throws: `WebSocketException` if `dst` is not yet full and no payload
			is left.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		size_t total = 0;

		while (dst.length > 0) {
			enforce!WebSocketException(!empty , "cannot read from empty stream");
			enforce!WebSocketException(leastSize > 0, "no data available" );

			const chunk = cast(size_t)min(leastSize, dst.length);
			dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
			m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
			dst = dst[chunk .. $];
			total += chunk;

			// current frame still has data left: keep filling dst from it
			if (leastSize != 0) continue;

			const stop_here = mode == IOMode.immediate || (mode == IOMode.once && total > 0);
			if (stop_here) break;
			this.skipFrame();
		}

		return total;
	}

	alias read = InputStream.read;
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
	continuation = 0x0,
	text = 0x1,
	binary = 0x2,
	close = 0x8,
	ping = 0x9,
	pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
@safe:
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Return the header length encoded with the expected amount of bits
	 *
	 * The WebSocket RFC define a variable-length payload length.
	 * In short, it means that:
	 * - If the length is <= 125, it is stored as the 7 least significant
	 *   bits of the second header byte.  The first bit is reserved for MASK.
	 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 bytes header
	 *   ( + masking key, if any).
	 * - If the length is > 65_536, a magic value of 127 will be used for
	 *   the 7-bit field, and the next 8 bytes are expected to be the length,
	 *   resulting in a 10 bytes header ( + masking key, if any).
	 *
	 * Those functions encapsulate all this logic and allow to just get the
	 * length with the desired size.
1092 * 1093 * Return: 1094 * - For `ubyte`, the value to store in the 7 bits field, either the 1095 * length or a magic value (126 or 127). 1096 * - For `ushort`, a value in the range [126; 65_536]. 1097 * If payload.length is not in this bound, an assertion will be triggered. 1098 * - For `ulong`, a value in the range [65_537; size_t.max]. 1099 * If payload.length is not in this bound, an assertion will be triggered. 1100 */ 1101 size_t getHeaderSize(bool mask) 1102 { 1103 size_t ret = 1; 1104 if (payload.length < 126) ret += 1; 1105 else if (payload.length < 65536) ret += 3; 1106 else ret += 9; 1107 if (mask) ret += 4; 1108 return ret; 1109 } 1110 1111 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1112 { 1113 ubyte[4] buff; 1114 ubyte firstByte = cast(ubyte)opcode; 1115 if (fin) firstByte |= 0x80; 1116 dst[0] = firstByte; 1117 dst = dst[1 .. $]; 1118 1119 auto b1 = sys_rng ? 0x80 : 0x00; 1120 1121 if (payload.length < 126) { 1122 dst[0] = cast(ubyte)(b1 | payload.length); 1123 dst = dst[1 .. $]; 1124 } else if (payload.length < 65536) { 1125 dst[0] = cast(ubyte) (b1 | 126); 1126 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1127 dst = dst[3 .. $]; 1128 } else { 1129 dst[0] = cast(ubyte) (b1 | 127); 1130 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1131 dst = dst[9 .. $]; 1132 } 1133 1134 if (sys_rng) { 1135 sys_rng.read(dst[0 .. 4]); 1136 for (size_t i = 0; i < payload.length; i++) 1137 payload[i] ^= dst[i % 4]; 1138 } 1139 } 1140 1141 static Frame readFrame(InputStream stream) 1142 { 1143 Frame frame; 1144 ubyte[8] data; 1145 1146 stream.read(data[0 .. 2]); 1147 frame.fin = (data[0] & 0x80) != 0; 1148 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1149 1150 bool masked = !!(data[1] & 0b1000_0000); 1151 1152 //parsing length 1153 ulong length = data[1] & 0b0111_1111; 1154 if (length == 126) { 1155 stream.read(data[0 .. 2]); 1156 length = bigEndianToNative!ushort(data[0 .. 
2]); 1157 } else if (length == 127) { 1158 stream.read(data); 1159 length = bigEndianToNative!ulong(data); 1160 1161 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1162 // interpreted as a 64-bit unsigned integer (the most significant 1163 // bit MUST be 0) 1164 enforce!WebSocketException(!(length >> 63), 1165 "Received length has a non-zero most significant bit"); 1166 1167 } 1168 logDebug("Read frame: %s %s %s length=%d", 1169 frame.opcode, 1170 frame.fin ? "final frame" : "continuation", 1171 masked ? "masked" : "not masked", 1172 length); 1173 1174 // Masking key is 32 bits / uint 1175 if (masked) 1176 stream.read(data[0 .. 4]); 1177 1178 // Read payload 1179 // TODO: Provide a way to limit the size read, easy 1180 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1181 enforce!WebSocketException(length <= size_t.max); 1182 frame.payload = new ubyte[](cast(size_t)length); 1183 stream.read(frame.payload); 1184 1185 //de-masking 1186 if (masked) 1187 foreach (size_t i; 0 .. cast(size_t)length) 1188 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1189 1190 return frame; 1191 } 1192 } 1193 1194 unittest { 1195 import std.algorithm.searching : all; 1196 1197 final class DummyRNG : RandomNumberStream { 1198 @safe: 1199 @property bool empty() { return false; } 1200 @property ulong leastSize() { return ulong.max; } 1201 @property bool dataAvailableForRead() { return true; } 1202 const(ubyte)[] peek() { return null; } 1203 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1204 alias read = RandomNumberStream.read; 1205 } 1206 1207 ubyte[14] hdrbuf; 1208 auto rng = new DummyRNG; 1209 1210 Frame f; 1211 f.payload = new ubyte[125]; 1212 1213 assert(f.getHeaderSize(false) == 2); 1214 hdrbuf[] = 0; 1215 f.writeHeader(hdrbuf[0 .. 2], null); 1216 assert(hdrbuf[0 .. 2] == [0, 125]); 1217 1218 assert(f.getHeaderSize(true) == 6); 1219 hdrbuf[] = 0; 1220 f.writeHeader(hdrbuf[0 .. 
6], rng); 1221 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1222 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1223 1224 f.payload = new ubyte[126]; 1225 assert(f.getHeaderSize(false) == 4); 1226 hdrbuf[] = 0; 1227 f.writeHeader(hdrbuf[0 .. 4], null); 1228 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1229 1230 assert(f.getHeaderSize(true) == 8); 1231 hdrbuf[] = 0; 1232 f.writeHeader(hdrbuf[0 .. 8], rng); 1233 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1234 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1235 1236 f.payload = new ubyte[65535]; 1237 assert(f.getHeaderSize(false) == 4); 1238 hdrbuf[] = 0; 1239 f.writeHeader(hdrbuf[0 .. 4], null); 1240 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1241 1242 assert(f.getHeaderSize(true) == 8); 1243 hdrbuf[] = 0; 1244 f.writeHeader(hdrbuf[0 .. 8], rng); 1245 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1246 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1247 1248 f.payload = new ubyte[65536]; 1249 assert(f.getHeaderSize(false) == 10); 1250 hdrbuf[] = 0; 1251 f.writeHeader(hdrbuf[0 .. 10], null); 1252 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1253 1254 assert(f.getHeaderSize(true) == 14); 1255 hdrbuf[] = 0; 1256 f.writeHeader(hdrbuf[0 .. 14], rng); 1257 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1258 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1259 } 1260 1261 /** 1262 * Generate a challenge key for the protocol upgrade phase. 1263 */ 1264 private string generateChallengeKey(scope RandomNumberStream rng) 1265 { 1266 ubyte[16] buffer; 1267 rng.read(buffer); 1268 return Base64.encode(buffer); 1269 } 1270 1271 private string computeAcceptKey(string challengekey) 1272 { 1273 immutable(ubyte)[] b = challengekey.representation; 1274 immutable(ubyte)[] a = s_webSocketGuid.representation; 1275 SHA1 hash; 1276 hash.start(); 1277 hash.put(b); 1278 hash.put(a); 1279 auto result = Base64.encode(hash.finish()); 1280 return to!(string)(result); 1281 }