/**
    Implements WebSocket support and fallbacks for older browsers.

    Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
    Copyright: © 2012-2014 Sönke Ludwig
    License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
    Authors: Jan Krüger
*/
module vibe.http.websockets;

///
@safe unittest {
    void handleConn(scope WebSocket sock)
    {
        // Simple echo server. The read loop is guarded by `waitForData`
        // rather than `connected`: `connected` only describes the ready
        // state for sending, and `receiveText` throws a
        // `WebSocketException` when the connection closes while it waits.
        while (sock.waitForData()) {
            auto msg = sock.receiveText();
            sock.send(msg);
        }
    }

    void startServer()
    {
        import vibe.http.router;
        auto router = new URLRouter;
        router.get("/ws", handleWebSockets(&handleConn));

        // Start HTTP server using listenHTTP()...
    }
}

import vibe.core.core;
import vibe.core.log;
import vibe.core.net;
import vibe.core.sync;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;
import vibe.utils.array;

import core.time;
import std.algorithm: equal, splitter;
import std.array;
import std.base64;
import std.conv;
import std.exception;
import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;
import std.uuid;
import std.base64;
import std.digest.sha;
import std.uni: asLowerCase;
import vibe.crypto.cryptorand;

@safe:


/// Signature of the callback that receives an established WebSocket.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;


/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
    @safe pure nothrow:

    /// Creates the exception from a message, with an optional chained `next`.
    this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        super(msg, file, line, next);
    }

    /// Creates the exception from a message and the `Throwable` that caused it.
    this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, next, file, line);
    }
}

/** Establishes a WebSocket connection at the specified endpoint.
83 */ 84 WebSocket connectWebSocketEx(URL url, 85 scope void delegate(scope HTTPClientRequest) @safe request_modifier, 86 const(HTTPClientSettings) settings = defaultSettings) 87 @safe { 88 const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false; 89 url.schema = use_tls ? "https" : "http"; 90 91 auto rng = secureRNG(); 92 auto challengeKey = generateChallengeKey(rng); 93 auto answerKey = computeAcceptKey(challengeKey); 94 auto res = requestHTTP(url, (scope req){ 95 req.method = HTTPMethod.GET; 96 req.headers["Upgrade"] = "websocket"; 97 req.headers["Connection"] = "Upgrade"; 98 req.headers["Sec-WebSocket-Version"] = "13"; 99 req.headers["Sec-WebSocket-Key"] = challengeKey; 100 request_modifier(req); 101 }, settings); 102 103 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 104 105 auto key = "sec-websocket-accept" in res.headers; 106 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 107 enforce(*key == answerKey, "Response has wrong accept key"); 108 auto conn = res.switchProtocol("websocket"); 109 return new WebSocket(conn, rng, res); 110 } 111 112 /// ditto 113 void connectWebSocketEx(URL url, 114 scope void delegate(scope HTTPClientRequest) @safe request_modifier, 115 scope WebSocketHandshakeDelegate del, 116 const(HTTPClientSettings) settings = defaultSettings) 117 @safe { 118 const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false; 119 url.schema = use_tls ? 
"https" : "http"; 120 121 /*scope*/auto rng = secureRNG(); 122 auto challengeKey = generateChallengeKey(rng); 123 auto answerKey = computeAcceptKey(challengeKey); 124 125 requestHTTP(url, 126 (scope req) { 127 req.method = HTTPMethod.GET; 128 req.headers["Upgrade"] = "websocket"; 129 req.headers["Connection"] = "Upgrade"; 130 req.headers["Sec-WebSocket-Version"] = "13"; 131 req.headers["Sec-WebSocket-Key"] = challengeKey; 132 request_modifier(req); 133 }, 134 (scope res) { 135 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 136 auto key = "sec-websocket-accept" in res.headers; 137 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 138 enforce(*key == answerKey, "Response has wrong accept key"); 139 res.switchProtocol("websocket", (scope conn) @trusted { 140 scope ws = new WebSocket(conn, rng, res); 141 del(ws); 142 if (ws.connected) ws.close(); 143 }); 144 }, 145 settings 146 ); 147 } 148 149 /// ditto 150 WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings) 151 @safe { 152 return connectWebSocketEx(url, (scope req) {}, settings); 153 } 154 /// ditto 155 void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings) 156 @safe { 157 connectWebSocketEx(url, (scope req) {}, del, settings); 158 } 159 /// ditto 160 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings) 161 @system { 162 connectWebSocket(url, (scope ws) nothrow { 163 try del(ws); 164 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 165 }, settings); 166 } 167 /// Scheduled for deprecation - use a `@safe` callback instead. 
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
    // The @trusted shim lets the @system callback stand in for the @safe
    // handshake delegate.
    connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
    connectWebSocket(url, (scope ws) nothrow {
        try {
            del(ws);
        } catch (Exception ex) {
            // The handshake delegate must not throw, so only log the failure.
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    }, settings);
}


/**
    Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

    A request that is not a valid version 13 WebSocket upgrade is answered
    with `HTTPStatus.badRequest` and a body naming the problem; the delegate
    is not called in that case. The socket is closed after the delegate
    returns.

    Params:
        on_handshake = Receives the established socket
        req = The incoming upgrade request
        res = The response used to switch protocols
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
    auto connectionHeader = "Connection" in req.headers;
    auto upgradeHeader = "Upgrade" in req.headers;
    auto versionHeader = "Sec-WebSocket-Version" in req.headers;
    auto keyHeader = "Sec-WebSocket-Key" in req.headers;
    // "Sec-WebSocket-Protocol" is currently not evaluated.

    // "Connection" is a comma separated list; one entry must be "upgrade"
    // (compared case insensitively).
    bool wantsUpgrade = false;
    if (connectionHeader !is null) {
        foreach (token; splitter(*connectionHeader, ",")) {
            if (!token.strip().asLowerCase().equal("upgrade")) continue;
            wantsUpgrade = true;
            break;
        }
    }

    // Only the first failing check is reported.
    string req_error;
    if (!wantsUpgrade) {
        req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
    } else if (upgradeHeader is null || icmp(*upgradeHeader, "websocket") != 0) {
        req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
    } else if (versionHeader is null || *versionHeader != "13") {
        req_error = "Only version 13 of the WebSocket protocol is supported.";
    } else if (keyHeader is null) {
        req_error = "Missing \"Sec-WebSocket-Key\" header.";
    }

    if (req_error.length > 0) {
        logDebug("Browser sent invalid WebSocket request: %s", req_error);
        res.statusCode = HTTPStatus.badRequest;
        res.writeBody(req_error);
        return;
    }

    // Accept value: Base64 of the SHA-1 of the client key followed by the
    // protocol GUID.
    auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*keyHeader ~ s_webSocketGuid)); } ();
    res.headers["Sec-WebSocket-Accept"] = accept;
    res.headers["Connection"] = "Upgrade";
    ConnectionStream conn = res.switchProtocol("websocket");

    WebSocket socket = new WebSocket(conn, req, res);
    try {
        on_handshake(socket);
    } catch (Exception ex) {
        logDiagnostic("WebSocket handler failed: %s", ex.msg);
    }
    socket.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
    handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
    handleWebSocket((scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    }, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
    handleWebSocket((scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    }, req, res);
}


/**
    Returns a HTTP request handler that establishes web socket connections.
256 */ 257 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake) 258 @safe { 259 return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ()); 260 } 261 /// ditto 262 HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake) 263 @safe { 264 void callback(scope HTTPServerRequest req, scope HTTPServerResponse res) 265 @safe { 266 auto pUpgrade = "Upgrade" in req.headers; 267 auto pConnection = "Connection" in req.headers; 268 auto pKey = "Sec-WebSocket-Key" in req.headers; 269 //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers; 270 auto pVersion = "Sec-WebSocket-Version" in req.headers; 271 272 auto isUpgrade = false; 273 274 if( pConnection ) { 275 auto connectionTypes = splitter(*pConnection, ","); 276 foreach( t ; connectionTypes ) { 277 if( t.strip().asLowerCase().equal("upgrade") ) { 278 isUpgrade = true; 279 break; 280 } 281 } 282 } 283 if( !(isUpgrade && 284 pUpgrade && icmp(*pUpgrade, "websocket") == 0 && 285 pKey && 286 pVersion && *pVersion == "13") ) 287 { 288 logDebug("Browser sent invalid WebSocket request."); 289 res.statusCode = HTTPStatus.badRequest; 290 res.writeVoidBody(); 291 return; 292 } 293 294 auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } (); 295 res.headers["Sec-WebSocket-Accept"] = accept; 296 res.headers["Connection"] = "Upgrade"; 297 res.switchProtocol("websocket", (scope conn) { 298 // TODO: put back 'scope' once it is actually enforced by DMD 299 /*scope*/ auto socket = new WebSocket(conn, req, res); 300 try on_handshake(socket); 301 catch (Exception e) { 302 logDiagnostic("WebSocket handler failed: %s", e.msg); 303 } 304 socket.close(); 305 }); 306 } 307 return &callback; 308 } 309 /// Scheduled for deprecation - use a `@safe` callback instead. 
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
    // The @trusted shim lets the @system callback stand in for the @safe
    // handshake delegate.
    return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
    return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
    // The handshake delegate must not throw, so exceptions raised by the
    // callback are logged and dropped.
    return handleWebSockets(delegate (scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    });
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
    return handleWebSockets(delegate (scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    });
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
    return handleWebSockets(delegate (scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    });
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
    return handleWebSockets(delegate (scope ws) nothrow {
        try {
            on_handshake(ws);
        } catch (Exception ex) {
            logWarn("WebSocket handler failed: %s", ex.msg);
        }
    });
}

/**
 * Provides the reason that a websocket connection has closed.
354 * 355 * Further documentation for the WebSocket and it's codes can be found from: 356 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent 357 * 358 * --- 359 * 360 * void echoSocket(scope WebSocket sock) 361 * { 362 * import std.datetime : seconds; 363 * 364 * while(sock.waitForData(3.seconds)) 365 * { 366 * string msg = sock.receiveText; 367 * logInfo("Got a message: %s", msg); 368 * sock.send(msg); 369 * } 370 * 371 * if(sock.connected) 372 * sock.close(WebSocketCloseReason.policyViolation, "timeout"); 373 * } 374 * --- 375 */ 376 enum WebSocketCloseReason : short 377 { 378 none = 0, 379 normalClosure = 1000, 380 goingAway = 1001, 381 protocolError = 1002, 382 unsupportedData = 1003, 383 noStatusReceived = 1005, 384 abnormalClosure = 1006, 385 invalidFramePayloadData = 1007, 386 policyViolation = 1008, 387 messageTooBig = 1009, 388 internalError = 1011, 389 serviceRestart = 1012, 390 tryAgainLater = 1013, 391 badGateway = 1014, 392 tlsHandshake = 1015 393 } 394 395 string closeReasonString(WebSocketCloseReason reason) @nogc @safe 396 { 397 import std.math : floor; 398 399 //round down to the nearest thousand to get category 400 switch(cast(short)(cast(float)reason / 1000f).floor) 401 { 402 case 0: 403 return "Reserved and Unused"; 404 case 1: 405 switch(reason) 406 { 407 case 1000: 408 return "Normal Closure"; 409 case 1001: 410 return "Going Away"; 411 case 1002: 412 return "Protocol Error"; 413 case 1003: 414 return "Unsupported Data"; 415 case 1004: 416 return "RESERVED"; 417 case 1005: 418 return "No Status Recvd"; 419 case 1006: 420 return "Abnormal Closure"; 421 case 1007: 422 return "Invalid Frame Payload Data"; 423 case 1008: 424 return "Policy Violation"; 425 case 1009: 426 return "Message Too Big"; 427 case 1010: 428 return "Missing Extension"; 429 case 1011: 430 return "Internal Error"; 431 case 1012: 432 return "Service Restart"; 433 case 1013: 434 return "Try Again Later"; 435 case 1014: 436 return "Bad Gateway"; 437 case 1015: 438 
return "TLS Handshake"; 439 default: 440 return "RESERVED"; 441 } 442 case 2: 443 return "Reserved for extensions"; 444 case 3: 445 return "Available for frameworks and libraries"; 446 case 4: 447 return "Available for applications"; 448 default: 449 return "UNDEFINED - Nasal Demons"; 450 } 451 } 452 453 unittest 454 { 455 assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused"); 456 assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused"); 457 assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure"); 458 assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure"); 459 assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED"); 460 assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions"); 461 assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries"); 462 assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications"); 463 assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons"); 464 assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons"); 465 466 //check the other spec cases 467 for(short i = 1000; i < 1017; i++) 468 { 469 if(i == 1004 || i > 1015) 470 { 471 assert( 472 (cast(WebSocketCloseReason)i).closeReasonString == "RESERVED", 473 "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i)) 474 ); 475 } 476 else 477 assert( 478 (cast(WebSocketCloseReason)i).closeReasonString != "RESERVED", 479 "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i)) 480 ); 481 } 482 } 483 484 485 /** 486 * Represents a single _WebSocket connection. 
487 * 488 * --- 489 * int main (string[] args) 490 * { 491 * auto taskHandle = runTask(() => connectToWS()); 492 * return runApplication(&args); 493 * } 494 * 495 * void connectToWS () 496 * { 497 * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token"); 498 * auto ws = connectWebSocket(ws_url); 499 * logInfo("WebSocket connected"); 500 * 501 * while (ws.waitForData()) 502 * { 503 * auto txt = ws.receiveText; 504 * logInfo("Received: %s", txt); 505 * } 506 * logFatal("Connection lost!"); 507 * } 508 * --- 509 */ 510 final class WebSocket { 511 @safe: 512 513 private { 514 ConnectionStream m_conn; 515 bool m_sentCloseFrame = false; 516 IncomingWebSocketMessage m_nextMessage = null; 517 const HTTPServerRequest m_request; 518 HTTPServerResponse m_serverResponse; 519 HTTPClientResponse m_clientResponse; 520 Task m_reader; 521 Task m_ownerTask; 522 InterruptibleTaskMutex m_readMutex, m_writeMutex; 523 InterruptibleTaskCondition m_readCondition; 524 Timer m_pingTimer; 525 uint m_lastPingIndex; 526 bool m_pongReceived; 527 short m_closeCode; 528 const(char)[] m_closeReason; 529 /// The entropy generator to use 530 /// If not null, it means this is a server socket. 531 RandomNumberStream m_rng; 532 } 533 534 /** 535 * Private constructor, called from `connectWebSocket`. 536 * 537 * Params: 538 * conn = Underlying connection string 539 * request = HTTP request used to establish the connection 540 * rng = Source of entropy to use. 
If null, assume we're a server socket 541 * client_res = For client sockets, the response object (keeps the http client locked until the socket is done) 542 */ 543 private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res) 544 { 545 m_ownerTask = Task.getThis(); 546 m_conn = conn; 547 m_request = request; 548 m_clientResponse = client_res; 549 m_serverResponse = server_res; 550 assert(m_conn); 551 m_rng = rng; 552 m_writeMutex = new InterruptibleTaskMutex; 553 m_readMutex = new InterruptibleTaskMutex; 554 m_readCondition = new InterruptibleTaskCondition(m_readMutex); 555 m_readMutex.performLocked!({ 556 m_reader = runTask(&startReader); 557 if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) { 558 m_pongReceived = true; 559 m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); 560 } 561 }); 562 } 563 564 private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res) 565 { 566 this(conn, null, null, rng, client_res); 567 } 568 569 private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res) 570 { 571 this(conn, request, res, null, null); 572 } 573 574 /** 575 Determines if the WebSocket connection is still alive and ready for sending. 576 577 Note that for determining the ready state for $(EM reading), you need 578 to use $(D waitForData) instead, because both methods can return 579 different values while a disconnect is in proress. 580 581 See_also: $(D waitForData) 582 */ 583 @property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; } 584 585 /** 586 Returns the close code sent by the remote end. 587 588 Note if the connection was never opened, is still alive, or was closed 589 locally this value will be 0. If no close code was given by the remote 590 end in the close frame, the value will be 1005. 
If the connection was 591 not closed cleanly by the remote end, this value will be 1006. 592 */ 593 @property short closeCode() { return m_closeCode; } 594 595 /** 596 Returns the close reason sent by the remote end. 597 598 Note if the connection was never opened, is still alive, or was closed 599 locally this value will be an empty string. 600 */ 601 @property const(char)[] closeReason() { return m_closeReason; } 602 603 /** 604 The HTTP request that established the web socket connection. 605 */ 606 @property const(HTTPServerRequest) request() const { return m_request; } 607 608 /** 609 Checks if data is readily available for read. 610 */ 611 @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; } 612 613 /** Waits until either a message arrives or until the connection is closed. 614 615 This function can be used in a read loop to cleanly determine when to stop reading. 616 */ 617 bool waitForData() 618 { 619 if (m_nextMessage) return true; 620 621 m_readMutex.performLocked!({ 622 while (connected && m_nextMessage is null) 623 m_readCondition.wait(); 624 }); 625 return m_nextMessage !is null; 626 } 627 628 /// ditto 629 bool waitForData(Duration timeout) 630 { 631 import std.datetime; 632 633 if (m_nextMessage) return true; 634 635 immutable limit_time = Clock.currTime(UTC()) + timeout; 636 637 m_readMutex.performLocked!({ 638 while (connected && m_nextMessage is null && timeout > 0.seconds) { 639 m_readCondition.wait(timeout); 640 timeout = limit_time - Clock.currTime(UTC()); 641 } 642 }); 643 return m_nextMessage !is null; 644 } 645 646 /** 647 Sends a text message. 648 649 On the JavaScript side, the text will be available as message.data (type string). 650 651 Throws: 652 A `WebSocketException` is thrown if the connection gets closed 653 before or during the transfer of the message. 
654 */ 655 void send(scope const(char)[] data) 656 { 657 send( 658 (scope message) { message.write(cast(const ubyte[])data); }, 659 FrameOpcode.text); 660 } 661 662 /** 663 Sends a binary message. 664 665 On the JavaScript side, the text will be available as message.data (type Blob). 666 667 Throws: 668 A `WebSocketException` is thrown if the connection gets closed 669 before or during the transfer of the message. 670 */ 671 void send(in ubyte[] data) 672 { 673 send((scope message){ message.write(data); }, FrameOpcode.binary); 674 } 675 676 /** 677 Sends a message using an output stream. 678 679 Throws: 680 A `WebSocketException` is thrown if the connection gets closed 681 before or during the transfer of the message. 682 */ 683 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) 684 { 685 m_writeMutex.performLocked!({ 686 enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); 687 /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng); 688 scope(exit) message.finalize(); 689 sender(message); 690 }); 691 } 692 693 /// Compatibility overload - will be removed soon. 694 deprecated("Call the overload which requires an explicit FrameOpcode.") 695 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender) 696 { 697 send(sender, FrameOpcode.text); 698 } 699 700 /** 701 Actively closes the connection. 702 703 Params: 704 code = Numeric code indicating a termination reason. 705 reason = Message describing why the connection was terminated. 706 */ 707 void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "") 708 { 709 import std.algorithm.comparison : min; 710 if(reason !is null && reason.length == 0) 711 reason = (cast(WebSocketCloseReason)code).closeReasonString; 712 713 //control frame payloads are limited to 125 bytes 714 version(assert) 715 assert(reason.length <= 123); 716 else 717 reason = reason[0 .. 
min($, 123)]; 718 719 if (connected) { 720 try { 721 send((scope msg) { 722 m_sentCloseFrame = true; 723 if (code != 0) { 724 msg.write(std.bitmanip.nativeToBigEndian(code)); 725 msg.write(cast(const ubyte[])reason); 726 } 727 }, FrameOpcode.close); 728 } catch (Exception e) { 729 logDiagnostic("Failed to send active web socket close frame: %s", e.msg); 730 } 731 } 732 if (m_pingTimer) m_pingTimer.stop(); 733 734 735 if (Task.getThis() == m_ownerTask) { 736 m_writeMutex.performLocked!({ 737 if (m_clientResponse) { 738 m_clientResponse.disconnect(); 739 m_clientResponse = HTTPClientResponse.init; 740 } 741 if (m_serverResponse) { 742 m_serverResponse.finalize(); 743 m_serverResponse = HTTPServerResponse.init; 744 } 745 }); 746 747 m_reader.join(); 748 749 () @trusted { destroy(m_conn); } (); 750 m_conn = ConnectionStream.init; 751 } 752 } 753 754 /** 755 Receives a new message and returns its contents as a newly allocated data array. 756 757 Params: 758 strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise. 759 Throws: WebSocketException if the connection is closed or 760 if $(D strict == true) and the frame received is not the right type 761 */ 762 ubyte[] receiveBinary(bool strict = true) 763 { 764 ubyte[] ret; 765 receive((scope message){ 766 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary, 767 "Expected a binary message, got "~message.frameOpcode.to!string()); 768 ret = message.readAll(); 769 }); 770 return ret; 771 } 772 /// ditto 773 string receiveText(bool strict = true) 774 { 775 string ret; 776 receive((scope message){ 777 enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text, 778 "Expected a text message, got "~message.frameOpcode.to!string()); 779 ret = message.readAllUTF8(); 780 }); 781 return ret; 782 } 783 784 /** 785 Receives a new message using an InputStream. 786 Throws: WebSocketException if the connection is closed. 
787 */ 788 void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver) 789 { 790 m_readMutex.performLocked!({ 791 while (!m_nextMessage) { 792 enforce!WebSocketException(connected, "Connection closed while reading message."); 793 m_readCondition.wait(); 794 } 795 receiver(m_nextMessage); 796 m_nextMessage = null; 797 m_readCondition.notifyAll(); 798 }); 799 } 800 801 private void startReader() 802 nothrow { 803 try m_readMutex.performLocked!({}); //Wait until initialization 804 catch (Exception e) { 805 logException(e, "WebSocket reader task failed to wait for initialization"); 806 try m_conn.close(); 807 catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure"); 808 m_closeCode = WebSocketCloseReason.abnormalClosure; 809 try m_readCondition.notifyAll(); 810 catch (Exception e) assert(false, e.msg); 811 return; 812 } 813 814 try { 815 loop: 816 while (!m_conn.empty) { 817 assert(!m_nextMessage); 818 /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng); 819 820 switch (msg.frameOpcode) { 821 default: throw new WebSocketException("unknown frame opcode"); 822 case FrameOpcode.ping: 823 send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong); 824 break; 825 case FrameOpcode.pong: 826 // test if pong matches previous ping 827 if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) { 828 logDebugV("Received PONG that doesn't match previous ping."); 829 break; 830 } 831 logDebugV("Received matching PONG."); 832 m_pongReceived = true; 833 break; 834 case FrameOpcode.close: 835 logDebug("Got closing frame (%s)", m_sentCloseFrame); 836 837 // If no close code was passed, we default to 1005 838 this.m_closeCode = WebSocketCloseReason.noStatusReceived; 839 840 // If provided in the frame, attempt to parse the close code/reason 841 if (msg.peek().length >= short.sizeof) { 842 this.m_closeCode = 
bigEndianToNative!short(msg.peek()[0..short.sizeof]); 843 844 if (msg.peek().length > short.sizeof) { 845 this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$]; 846 } 847 } 848 849 if(!m_sentCloseFrame) close(); 850 logDebug("Terminating connection (%s)", m_sentCloseFrame); 851 break loop; 852 case FrameOpcode.text: 853 case FrameOpcode.binary: 854 case FrameOpcode.continuation: // FIXME: add proper support for continuation frames! 855 m_readMutex.performLocked!({ 856 m_nextMessage = msg; 857 m_readCondition.notifyAll(); 858 while (m_nextMessage) m_readCondition.wait(); 859 }); 860 break; 861 } 862 } 863 } catch (Exception e) { 864 logDiagnostic("Error while reading websocket message: %s", e.msg); 865 logDiagnostic("Closing connection."); 866 } 867 868 // If no close code was passed, e.g. this was an unclean termination 869 // of our websocket connection, set the close code to 1006. 870 if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure; 871 872 try m_conn.close(); 873 catch (Exception e) logException(e, "Failed to close WebSocket connection"); 874 try m_readCondition.notifyAll(); 875 catch (Exception e) assert(false, e.msg); 876 } 877 878 private void sendPing() 879 nothrow { 880 try { 881 if (!m_pongReceived) { 882 logDebug("Pong skipped. Closing connection."); 883 close(); 884 m_pingTimer.stop(); 885 return; 886 } 887 m_pongReceived = false; 888 send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping); 889 logDebugV("Ping sent"); 890 } catch (Exception e) { 891 logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg); 892 } 893 } 894 } 895 896 /** 897 Represents a single outgoing _WebSocket message as an OutputStream. 
898 */ 899 final class OutgoingWebSocketMessage : OutputStream { 900 @safe: 901 private { 902 RandomNumberStream m_rng; 903 Stream m_conn; 904 FrameOpcode m_frameOpcode; 905 Appender!(ubyte[]) m_buffer; 906 bool m_finalized = false; 907 } 908 909 private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng) 910 { 911 assert(conn !is null); 912 m_conn = conn; 913 m_frameOpcode = frameOpcode; 914 m_rng = rng; 915 } 916 917 size_t write(in ubyte[] bytes, IOMode mode) 918 { 919 assert(!m_finalized); 920 921 if (!m_buffer.data.length) { 922 ubyte[Frame.maxHeaderSize] header_padding; 923 m_buffer.put(header_padding[]); 924 } 925 926 m_buffer.put(bytes); 927 return bytes.length; 928 } 929 930 void flush() 931 { 932 assert(!m_finalized); 933 if (m_buffer.data.length > 0) 934 sendFrame(false); 935 } 936 937 void finalize() 938 { 939 if (m_finalized) return; 940 m_finalized = true; 941 sendFrame(true); 942 } 943 944 private void sendFrame(bool fin) 945 { 946 if (!m_buffer.data.length) 947 write(null, IOMode.once); 948 949 assert(m_buffer.data.length >= Frame.maxHeaderSize); 950 951 Frame frame; 952 frame.fin = fin; 953 frame.opcode = m_frameOpcode; 954 frame.payload = m_buffer.data[Frame.maxHeaderSize .. $]; 955 auto hsize = frame.getHeaderSize(m_rng !is null); 956 auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $]; 957 frame.writeHeader(msg[0 .. hsize], m_rng); 958 m_conn.write(msg); 959 m_conn.flush(); 960 m_buffer.clear(); 961 } 962 963 alias write = OutputStream.write; 964 } 965 966 967 /** 968 Represents a single incoming _WebSocket message as an InputStream. 
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
    private {
        RandomNumberStream m_rng;
        Stream m_conn;
        Frame m_currentFrame;
    }

    private this(Stream conn, RandomNumberStream rng)
    {
        assert(conn !is null);
        m_conn = conn;
        m_rng = rng;
        // A default initialized frame is not final, so this reads the
        // first frame of the message.
        skipFrame();
    }

    /// True if the current frame has no unread payload left.
    @property bool empty() const { return m_currentFrame.payload.length == 0; }

    /// Number of unread payload bytes in the current frame.
    @property ulong leastSize() const { return m_currentFrame.payload.length; }

    /// Always reports `true`.
    @property bool dataAvailableForRead() { return true; }

    /// The frame type of the current frame of this message.
    @property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

    /// Returns the unread payload of the current frame without consuming it.
    const(ubyte)[] peek() { return m_currentFrame.payload; }

    /**
     * Retrieve the next websocket frame of the stream and discard the current
     * one
     *
     * This function is helpful if one wishes to process the message frame by
     * frame, or to minimize memory allocation, as `peek` will only return the
     * current frame data, and read requires a pre-allocated buffer.
     *
     * Returns:
     * `false` if the current frame is the final one, `true` if a new frame
     * was read.
     */
    bool skipFrame()
    {
        if (!m_currentFrame.fin) {
            m_currentFrame = Frame.readFrame(m_conn);
            return true;
        }
        return false;
    }

    /**
     * Copies payload bytes into `dst`, moving on to the next frame whenever
     * the current one is used up.
     *
     * With `IOMode.immediate` and `IOMode.once` reading stops at the end of
     * the current frame instead of fetching the next one.
     *
     * Returns: The number of bytes stored in `dst`.
     * Throws: `WebSocketException` if no payload is left while `dst` is not
     *   yet filled.
     */
    size_t read(scope ubyte[] dst, IOMode mode)
    {
        import std.algorithm : min;

        size_t total = 0;

        while (dst.length > 0) {
            enforce!WebSocketException(!empty       , "cannot read from empty stream");
            enforce!WebSocketException(leastSize > 0, "no data available"            );

            const chunk = cast(size_t)min(leastSize, dst.length);
            dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
            m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
            dst = dst[chunk .. $];
            total += chunk;

            // Keep draining the current frame while it has payload left.
            if (leastSize > 0) continue;

            if (mode == IOMode.immediate || (mode == IOMode.once && total > 0))
                break;
            this.skipFrame();
        }

        return total;
    }

    alias read = InputStream.read;
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
    continuation = 0x0,
    text = 0x1,
    binary = 0x2,
    close = 0x8,
    ping = 0x9,
    pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
@safe:
    enum maxHeaderSize = 14;

    bool fin;
    FrameOpcode opcode;
    ubyte[] payload;

    /**
     * Return the header length encoded with the expected amount of bits
     *
     * The WebSocket RFC define a variable-length payload length.
     * In short, it means that:
     * - If the length is <= 125, it is stored as the 7 least significant
     *   bits of the second header byte.  The first bit is reserved for MASK.
     * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
     *   126 is stored in the aforementioned 7 bits, and the actual length
     *   is stored in the next two bytes, resulting in a 4 bytes header
     *   ( + masking key, if any).
     * - If the length is > 65_536, a magic value of 127 will be used for
     *   the 7-bit field, and the next 8 bytes are expected to be the length,
     *   resulting in a 10 bytes header ( + masking key, if any).
     *
     * Those functions encapsulate all this logic and allow to just get the
     * length with the desired size.
1093 * 1094 * Return: 1095 * - For `ubyte`, the value to store in the 7 bits field, either the 1096 * length or a magic value (126 or 127). 1097 * - For `ushort`, a value in the range [126; 65_536]. 1098 * If payload.length is not in this bound, an assertion will be triggered. 1099 * - For `ulong`, a value in the range [65_537; size_t.max]. 1100 * If payload.length is not in this bound, an assertion will be triggered. 1101 */ 1102 size_t getHeaderSize(bool mask) 1103 { 1104 size_t ret = 1; 1105 if (payload.length < 126) ret += 1; 1106 else if (payload.length < 65536) ret += 3; 1107 else ret += 9; 1108 if (mask) ret += 4; 1109 return ret; 1110 } 1111 1112 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1113 { 1114 ubyte[4] buff; 1115 ubyte firstByte = cast(ubyte)opcode; 1116 if (fin) firstByte |= 0x80; 1117 dst[0] = firstByte; 1118 dst = dst[1 .. $]; 1119 1120 auto b1 = sys_rng ? 0x80 : 0x00; 1121 1122 if (payload.length < 126) { 1123 dst[0] = cast(ubyte)(b1 | payload.length); 1124 dst = dst[1 .. $]; 1125 } else if (payload.length < 65536) { 1126 dst[0] = cast(ubyte) (b1 | 126); 1127 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1128 dst = dst[3 .. $]; 1129 } else { 1130 dst[0] = cast(ubyte) (b1 | 127); 1131 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1132 dst = dst[9 .. $]; 1133 } 1134 1135 if (sys_rng) { 1136 sys_rng.read(dst[0 .. 4]); 1137 for (size_t i = 0; i < payload.length; i++) 1138 payload[i] ^= dst[i % 4]; 1139 } 1140 } 1141 1142 static Frame readFrame(InputStream stream) 1143 { 1144 Frame frame; 1145 ubyte[8] data; 1146 1147 stream.read(data[0 .. 2]); 1148 frame.fin = (data[0] & 0x80) != 0; 1149 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1150 1151 bool masked = !!(data[1] & 0b1000_0000); 1152 1153 //parsing length 1154 ulong length = data[1] & 0b0111_1111; 1155 if (length == 126) { 1156 stream.read(data[0 .. 2]); 1157 length = bigEndianToNative!ushort(data[0 .. 
2]); 1158 } else if (length == 127) { 1159 stream.read(data); 1160 length = bigEndianToNative!ulong(data); 1161 1162 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1163 // interpreted as a 64-bit unsigned integer (the most significant 1164 // bit MUST be 0) 1165 enforce!WebSocketException(!(length >> 63), 1166 "Received length has a non-zero most significant bit"); 1167 1168 } 1169 logDebug("Read frame: %s %s %s length=%d", 1170 frame.opcode, 1171 frame.fin ? "final frame" : "continuation", 1172 masked ? "masked" : "not masked", 1173 length); 1174 1175 // Masking key is 32 bits / uint 1176 if (masked) 1177 stream.read(data[0 .. 4]); 1178 1179 // Read payload 1180 // TODO: Provide a way to limit the size read, easy 1181 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1182 enforce!WebSocketException(length <= size_t.max); 1183 frame.payload = new ubyte[](cast(size_t)length); 1184 stream.read(frame.payload); 1185 1186 //de-masking 1187 if (masked) 1188 foreach (size_t i; 0 .. cast(size_t)length) 1189 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1190 1191 return frame; 1192 } 1193 } 1194 1195 unittest { 1196 import std.algorithm.searching : all; 1197 1198 final class DummyRNG : RandomNumberStream { 1199 @safe: 1200 @property bool empty() { return false; } 1201 @property ulong leastSize() { return ulong.max; } 1202 @property bool dataAvailableForRead() { return true; } 1203 const(ubyte)[] peek() { return null; } 1204 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1205 alias read = RandomNumberStream.read; 1206 } 1207 1208 ubyte[14] hdrbuf; 1209 auto rng = new DummyRNG; 1210 1211 Frame f; 1212 f.payload = new ubyte[125]; 1213 1214 assert(f.getHeaderSize(false) == 2); 1215 hdrbuf[] = 0; 1216 f.writeHeader(hdrbuf[0 .. 2], null); 1217 assert(hdrbuf[0 .. 2] == [0, 125]); 1218 1219 assert(f.getHeaderSize(true) == 6); 1220 hdrbuf[] = 0; 1221 f.writeHeader(hdrbuf[0 .. 
6], rng); 1222 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1223 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1224 1225 f.payload = new ubyte[126]; 1226 assert(f.getHeaderSize(false) == 4); 1227 hdrbuf[] = 0; 1228 f.writeHeader(hdrbuf[0 .. 4], null); 1229 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1230 1231 assert(f.getHeaderSize(true) == 8); 1232 hdrbuf[] = 0; 1233 f.writeHeader(hdrbuf[0 .. 8], rng); 1234 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1235 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1236 1237 f.payload = new ubyte[65535]; 1238 assert(f.getHeaderSize(false) == 4); 1239 hdrbuf[] = 0; 1240 f.writeHeader(hdrbuf[0 .. 4], null); 1241 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1242 1243 assert(f.getHeaderSize(true) == 8); 1244 hdrbuf[] = 0; 1245 f.writeHeader(hdrbuf[0 .. 8], rng); 1246 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1247 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1248 1249 f.payload = new ubyte[65536]; 1250 assert(f.getHeaderSize(false) == 10); 1251 hdrbuf[] = 0; 1252 f.writeHeader(hdrbuf[0 .. 10], null); 1253 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1254 1255 assert(f.getHeaderSize(true) == 14); 1256 hdrbuf[] = 0; 1257 f.writeHeader(hdrbuf[0 .. 14], rng); 1258 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1259 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1260 } 1261 1262 /** 1263 * Generate a challenge key for the protocol upgrade phase. 1264 */ 1265 private string generateChallengeKey(scope RandomNumberStream rng) 1266 { 1267 ubyte[16] buffer; 1268 rng.read(buffer); 1269 return Base64.encode(buffer); 1270 } 1271 1272 private string computeAcceptKey(string challengekey) 1273 { 1274 immutable(ubyte)[] b = challengekey.representation; 1275 immutable(ubyte)[] a = s_webSocketGuid.representation; 1276 SHA1 hash; 1277 hash.start(); 1278 hash.put(b); 1279 hash.put(a); 1280 auto result = Base64.encode(hash.finish()); 1281 return to!(string)(result); 1282 }