/**
	Implements WebSocket support and fallbacks for older browsers.

	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Jan Krüger
*/
module vibe.http.websockets;

/// Minimal echo endpoint: every received text message is sent straight back.
@safe unittest {
	void handleConn(scope WebSocket sock)
	{
		// echo for as long as the connection is usable for sending
		while (sock.connected)
			sock.send(sock.receiveText());
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}

import vibe.core.core;
import vibe.core.log;
import vibe.core.net;
import vibe.core.sync;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;
import vibe.utils.array;

import core.time;
import std.algorithm: equal, splitter;
import std.array;
import std.base64;
import std.conv;
import std.exception;
import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;
import std.uuid;
import std.base64;
import std.digest.sha;
import std.uni: asLowerCase;
import vibe.crypto.cryptorand;

@safe:


/// Callback type that is handed the established socket by the connect/handle functions of this module.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;


/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	@safe pure nothrow:

	/// Creates the exception from a message and an optional chained `next` throwable.
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		super(msg, file, line, next);
	}

	/// Same as above, with the chained throwable passed right after the message.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, next, file, line);
	}
}

/** Establishes a WebSocket connection at the specified endpoint.
The URL schema selects the transport: `wss` and `https` connect via TLS,
	every other schema is treated as plain `http`.

	Params:
		url = Address of the WebSocket endpoint
		request_modifier = Invoked with the prepared upgrade request right
			before it is sent, e.g. to add custom headers
		del = Receives the connected socket; the socket is closed as soon as
			the callback returns
		settings = HTTP client settings used for the upgrade request

	Returns:
		The overloads without a `del` callback return the connected socket,
		which the caller has to `close()` when done.

	Throws:
		A `WebSocketException` is thrown if the server rejects the upgrade
		request or answers with a missing or wrong `Sec-WebSocket-Accept`
		header.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
		// called last, so the caller may override any of the headers above
		request_modifier(req);
	}, settings);

	// The response object keeps the HTTP client connection locked. On success
	// the WebSocket takes it over (and releases it in close()); if anything
	// below fails, it has to be released here or the connection leaks.
	scope (failure) res.disconnect();

	enforce!WebSocketException(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce!WebSocketException(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce!WebSocketException(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, rng, res);
}

/// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);

	requestHTTP(url,
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = challengeKey;
			// called last, so the caller may override any of the headers above
			request_modifier(req);
		},
		(scope res) {
			enforce!WebSocketException(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto key = "sec-websocket-accept" in res.headers;
			enforce!WebSocketException(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce!WebSocketException(*key == answerKey, "Response has wrong accept key");
			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, rng, res);
				del(ws);
				// Always close, even if the peer already disconnected:
				// close() only sends a close frame while still connected,
				// but it is also what joins the reader task and releases
				// the response before the scoped socket goes away.
				ws.close();
			});
		},
		settings
	);
}

/// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// adapt the throwing callback to the nothrow handshake delegate
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	connectWebSocket(url, (scope socket) @trusted => del(socket), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// exceptions escaping the callback are logged, not propagated
	connectWebSocket(url, (scope socket) nothrow {
		try del(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	}, settings);
}


/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	The request is validated first; an invalid upgrade request is answered
	with "400 Bad Request" and a body naming the problem, and
	$(D on_handshake) is not called. The socket is closed after
	$(D on_handshake) returns.

	Params:
		on_handshake = Callback receiving the established socket
		req = The incoming HTTP upgrade request
		res = The response object of that request
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto upgrade_hdr = "Upgrade" in req.headers;
	auto connection_hdr = "Connection" in req.headers;
	auto key_hdr = "Sec-WebSocket-Key" in req.headers;
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	auto version_hdr = "Sec-WebSocket-Version" in req.headers;

	// "Connection" is a comma separated token list; look for an "upgrade" token
	bool wants_upgrade = false;
	if (connection_hdr) {
		foreach (token; splitter(*connection_hdr, ",")) {
			if (token.strip().asLowerCase().equal("upgrade")) {
				wants_upgrade = true;
				break;
			}
		}
	}

	// the first failing check determines the error message
	string req_error;
	if (!wants_upgrade)
		req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (!upgrade_hdr || icmp(*upgrade_hdr, "websocket") != 0)
		req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (!version_hdr || *version_hdr != "13")
		req_error = "Only version 13 of the WebSocket protocol is supported.";
	else if (!key_hdr)
		req_error = "Missing \"Sec-WebSocket-Key\" header.";

	if (req_error.length) {
		logDebug("Browser sent invalid WebSocket request: %s", req_error);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(req_error);
		return;
	}

	// accept key = Base64(SHA-1(client key ~ protocol GUID))
	auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = accept;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream conn = res.switchProtocol("websocket");

	// NOTE: silencing scope warning here - WebSocket references the scoped
	//       req/res objects throughout its lifetime, which has a narrower scope
	scope socket = () @trusted { return new WebSocket(conn, req, res); } ();
	try {
		on_handshake(socket);
	} catch (Exception ex) {
		logDiagnostic("WebSocket handler failed: %s", ex.msg);
	}
	socket.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	handleWebSocket((scope socket) @trusted => on_handshake(socket), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// exceptions escaping the callback are logged, not propagated
	handleWebSocket((scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// exceptions escaping the callback are logged, not propagated
	handleWebSocket((scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	}, req, res);
}


/**
	Returns an HTTP request handler that establishes web socket connections.
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	auto as_delegate = () @trusted { return toDelegate(on_handshake); } ();
	return handleWebSockets(as_delegate);
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// Request handler: validates the upgrade request, answers invalid ones
	// with an empty "400 Bad Request", otherwise switches protocols and runs
	// on_handshake with the new socket.
	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto upgrade_hdr = "Upgrade" in req.headers;
		auto connection_hdr = "Connection" in req.headers;
		auto key_hdr = "Sec-WebSocket-Key" in req.headers;
		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
		auto version_hdr = "Sec-WebSocket-Version" in req.headers;

		// "Connection" is a comma separated token list; look for an "upgrade" token
		bool wants_upgrade = false;
		if (connection_hdr) {
			foreach (token; splitter(*connection_hdr, ",")) {
				if (token.strip().asLowerCase().equal("upgrade")) {
					wants_upgrade = true;
					break;
				}
			}
		}

		const invalid = !wants_upgrade
			|| !upgrade_hdr || icmp(*upgrade_hdr, "websocket") != 0
			|| !key_hdr
			|| !version_hdr || *version_hdr != "13";
		if (invalid) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// accept key = Base64(SHA-1(client key ~ protocol GUID))
		auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = accept;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto socket = new WebSocket(conn, req, res);
			try on_handshake(socket);
			catch (Exception ex) {
				logDiagnostic("WebSocket handler failed: %s", ex.msg);
			}
			socket.close();
		});
	}
	return &callback;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope socket) @trusted => on_handshake(socket));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	return handleWebSockets(delegate (scope socket) @trusted => on_handshake(socket));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// exceptions escaping the callback are logged, not propagated
	return handleWebSockets(delegate (scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// exceptions escaping the callback are logged, not propagated
	return handleWebSockets(delegate (scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// exceptions escaping the callback are logged, not propagated
	return handleWebSockets(delegate (scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// exceptions escaping the callback are logged, not propagated
	return handleWebSockets(delegate (scope socket) nothrow {
		try on_handshake(socket);
		catch (Exception ex) logWarn("WebSocket handler failed: %s", ex.msg);
	});
}

/**
 * Provides the reason that a websocket connection has closed.
*
 * Further documentation for the WebSocket close event and its codes can be found at:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	noStatusReceived = 1005,
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}

/**
 * Returns a short human readable description of a close code.
 *
 * Codes in the 1000 range are described individually ("RESERVED" for the
 * ones without a description); all other codes are described by the
 * thousand block they fall into. Negative values and values of 5000 and
 * above yield "UNDEFINED - Nasal Demons".
 */
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	const int code = reason;

	// negative values belong to no category
	if (code < 0) return "UNDEFINED - Nasal Demons";

	// the thousands digit selects the category
	switch (code / 1000)
	{
		case 0:
			return "Reserved and Unused";
		case 1:
			break; // described individually below
		case 2:
			return "Reserved for extensions";
		case 3:
			return "Available for frameworks and libraries";
		case 4:
			return "Available for applications";
		default:
			return "UNDEFINED - Nasal Demons";
	}

	switch (code)
	{
		case 1000: return "Normal Closure";
		case 1001: return "Going Away";
		case 1002: return "Protocol Error";
		case 1003: return "Unsupported Data";
		case 1004: return "RESERVED";
		case 1005: return "No Status Recvd";
		case 1006: return "Abnormal Closure";
		case 1007: return "Invalid Frame Payload Data";
		case 1008: return "Policy Violation";
		case 1009: return "Message Too Big";
		case 1010: return "Missing Extension";
		case 1011: return "Internal Error";
		case 1012: return "Service Restart";
		case 1013: return "Try Again Later";
		case 1014: return "Bad Gateway";
		case 1015: return "TLS Handshake";
		default:   return "RESERVED";
	}
}

unittest
{
	assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused");
	assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused");
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
	assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons");

	//check the other spec cases
	foreach (i; 1000 .. 1017)
	{
		const text = closeReasonString(cast(WebSocketCloseReason)i);
		const expect_reserved = i == 1004 || i > 1015;
		assert(
			(text == "RESERVED") == expect_reserved,
			"(incorrect) code %d = %s".format(i, text)
		);
	}
}


/**
 * Represents a single _WebSocket connection.
*
 * ---
 * int main (string[] args)
 * {
 *   auto taskHandle = runTask(() => connectToWS());
 *   return runApplication(&args);
 * }
 *
 * void connectToWS ()
 * {
 *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
 *   auto ws = connectWebSocket(ws_url);
 *   logInfo("WebSocket connected");
 *
 *   while (ws.waitForData())
 *   {
 *     auto txt = ws.receiveText;
 *     logInfo("Received: %s", txt);
 *   }
 *   logFatal("Connection lost!");
 * }
 * ---
 */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		/// Set once a close frame has been sent by this side.
		bool m_sentCloseFrame = false;
		/// Message handed over by the reader task, waiting to be consumed by `receive`.
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		/// Task running `startReader`.
		Task m_reader;
		/// Task that created the socket; only this task performs the final cleanup in `close`.
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use
		/// If not null, it means this is a client socket (a server socket has none).
		RandomNumberStream m_rng;
	}

scope:

	/**
	 * Private constructor, called from the `connectWebSocket` and
	 * `handleWebSocket` function families.
	 *
	 * Starts the reader task and, for server sockets with a non-zero
	 * `webSocketPingInterval` setting, the periodic ping timer.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (server sockets only, otherwise null)
	 *	 server_res = For server sockets, the response object (finalized in `close`)
	 *	 rng = Source of entropy to use.
If null, assume we're a server socket
	 *	 client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task first waits for the read mutex (see startReader),
		// so holding it here lets the initialization below complete before
		// the reader starts processing frames.
		m_readMutex.performLocked!({
			// NOTE: Silencing scope warning here - m_reader MUST be stopped
			//       before the end of the lifetime of the WebSocket object,
			//       which is done in the mandatory call to close().
			//       The same goes for m_pingTimer below.
			m_reader = () @trusted { return runTask(&startReader); } ();
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = () @trusted { return setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); } ();
			}
		});
	}

	/// Client side constructor - no server request/response involved.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Server side constructor - no entropy source and no client response.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		After the owning task has called `close`, the underlying connection
		is gone and only an already buffered message counts.
	*/
	@property bool dataAvailableForRead()
	{
		// m_conn is reset by close(), so it must be checked before use
		return (m_conn && m_conn.dataAvailableForRead) || m_nextMessage !is null;
	}

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.
Params:
			timeout = Maximum time to wait for a message, measured on the
				monotonic clock

		Returns:
			`true` if a message is ready to be received, `false` if the
			connection was closed or the timeout elapsed first.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		if (m_nextMessage) return true;

		// Use the monotonic clock for the deadline: the wall clock may be
		// adjusted while waiting, which would shorten or prolong the timeout.
		immutable deadline = MonoTime.currTime + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > Duration.zero) {
				m_readCondition.wait(timeout);
				// the condition may be signalled early - wait for the remainder
				timeout = deadline - MonoTime.currTime;
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the data will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
Params:
			sender = Callback that writes the message contents to the passed stream
			frameOpcode = Frame type used for the message
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		// the write mutex serializes whole messages
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto outgoing = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			// the message is completed even if the sender callback throws
			scope(exit) outgoing.finalize();
			sender(outgoing);
		});
	}

	/**
		Actively closes the connection.

		Sends a close frame if the connection is still usable and stops the
		ping timer. When called from the task that created the socket, the
		HTTP response objects are released, the reader task is joined and the
		connection is destroyed as well.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
				An empty (non-null) string is replaced by the standard
				description of `code`.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;

		// control frame payloads are limited to 125 bytes, two of which are
		// taken by the close code
		enum max_reason_length = 123;

		if (reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		version(assert)
			assert(reason.length <= max_reason_length);
		else
			reason = reason[0 .. min($, max_reason_length)];

		if (connected) {
			try {
				send((scope frame) {
					m_sentCloseFrame = true;
					// a code of 0 results in a close frame without payload
					if (code != 0) {
						frame.write(std.bitmanip.nativeToBigEndian(code));
						frame.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception ex) {
				logDiagnostic("Failed to send active web socket close frame: %s", ex.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();

		// the remaining cleanup is reserved for the owner task
		if (Task.getThis() != m_ownerTask) return;

		m_writeMutex.performLocked!({
			if (m_clientResponse) {
				m_clientResponse.disconnect();
				m_clientResponse = HTTPClientResponse.init;
			}
			if (m_serverResponse) {
				m_serverResponse.finalize();
				m_serverResponse = HTTPServerResponse.init;
			}
		});

		m_reader.join();

		() @trusted { destroy(m_conn); } ();
		m_conn = ConnectionStream.init;
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.
Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] payload;
		receive((scope incoming){
			if (strict)
				enforce!WebSocketException(incoming.frameOpcode == FrameOpcode.binary,
					"Expected a binary message, got "~incoming.frameOpcode.to!string());
			payload = incoming.readAll();
		});
		return payload;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string text;
		receive((scope incoming){
			if (strict)
				enforce!WebSocketException(incoming.frameOpcode == FrameOpcode.text,
					"Expected a text message, got "~incoming.frameOpcode.to!string());
			text = incoming.readAllUTF8();
		});
		return text;
	}

	/**
		Receives a new message using an InputStream.
		Throws: WebSocketException if the connection is closed.
Params:
			receiver = Callback that reads the message from the passed stream
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			// wait for the reader task to hand over a message
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			// hand the slot back, so the reader task can continue
			m_nextMessage = null;
			m_readCondition.notifyAll();
		});
	}

	/// Body of the reader task: reads frames until the connection ends,
	/// answers control frames and hands data messages over to `receive`.
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception init_ex) {
			logException(init_ex, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception close_ex) logException(close_ex, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception notify_ex) assert(false, notify_ex.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default:
						throw new WebSocketException("unknown frame opcode");

					case FrameOpcode.ping:
						// answer with a pong carrying the same payload
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;

					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;

					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof)
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
						}

						// answer with our own close frame unless we initiated the close
						if (!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;

					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						m_readMutex.performLocked!({
							// publish the message and wait until receive() consumed it
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception read_ex) {
			logDiagnostic("Error while reading websocket message: %s", read_ex.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception close_ex) logException(close_ex, "Failed to close WebSocket connection");
		// wake up anyone still waiting for data
		try m_readCondition.notifyAll();
		catch (Exception notify_ex) assert(false, notify_ex.msg);
	}

	/// Ping timer callback: closes the connection if the previous ping was
	/// never answered, otherwise sends the next ping.
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			// the payload is the running ping index, which the pong has to echo
			send((scope ping_msg) { ping_msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception ex) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", ex.msg);
		}
	}
}

/**
	Represents a single outgoing _WebSocket message as an OutputStream.
Written data is buffered; `flush` sends the buffered data as a
	non-final frame and `finalize` sends the final frame of the message.
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		/// Entropy source for the masking key; null if frames are sent unmasked.
		RandomNumberStream m_rng;
		Stream m_conn;
		/// Opcode of the next frame to send.
		FrameOpcode m_frameOpcode;
		/// Space reserved for the frame header, followed by the buffered payload.
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = OutputStream.write;

	/// Appends `bytes` to the buffered payload of the current frame.
	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		reserveHeaderSpace();
		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the data buffered so far as a non-final frame.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message; further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Makes sure the buffer starts with room for the largest possible header.
	private void reserveHeaderSpace()
	{
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}
	}

	/// Writes the buffered payload as one frame, with the FIN bit set to `fin`.
	private void sendFrame(bool fin)
	{
		// NOTE: this must not go through write() - when finalizing a message
		//       with an empty buffer, m_finalized is already set and the
		//       assertion in doWrite() would fail.
		reserveHeaderSpace();

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// the header is written right in front of the payload, into the
		// tail of the reserved space
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, section 5.4: only the first frame of a fragmented message
		// carries the message opcode, all following ones are continuations
		if (!fin) m_frameOpcode = FrameOpcode.continuation;
	}
}


/**
	Represents a single incoming _WebSocket message as an InputStream.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		Frame m_currentFrame;
	}

	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_rng = rng;
		skipFrame(); // reads the first frame
	}

	/// True if the current frame has no unread payload left.
	@property bool empty() const { return m_currentFrame.payload.length == 0; }

	/// Number of unread payload bytes of the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always true.
	@property bool dataAvailableForRead() { return true; }

	/// The frame type for this message.
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// Returns the unread payload of the current frame without consuming it.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wishes to process a message frame by
	 * frame, or to minimize memory allocation, as `peek` will only return the
	 * current frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		if (m_currentFrame.fin)
			return false;

		m_currentFrame = Frame.readFrame(m_conn);
		return true;
	}

	/// Copies payload into `dst`, moving on to following frames as needed,
	/// and returns the number of bytes copied.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		size_t total = 0;

		while (dst.length > 0) {
			enforce!WebSocketException(!empty , "cannot read from empty stream");
			enforce!WebSocketException(leastSize > 0, "no data available" );

			const chunk = cast(size_t)min(leastSize, dst.length);
			dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
			dst = dst[chunk .. $];
			m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
			total += chunk;

			// the current frame still has data, so dst is full now
			if (leastSize > 0) continue;

			// frame drained: stop here in the non-blocking modes
			const stop = mode == IOMode.immediate || (mode == IOMode.once && total > 0);
			if (stop) break;
			this.skipFrame();
		}

		return total;
	}

	alias read = InputStream.read;
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
	continuation = 0x0,
	text = 0x1,
	binary = 0x2,
	close = 0x8,
	ping = 0x9,
	pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
@safe:
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Return the header length encoded with the expected amount of bits
	 *
	 * The WebSocket RFC define a variable-length payload length.
	 * In short, it means that:
	 * - If the length is <= 125, it is stored as the 7 least significant
	 *   bits of the second header byte.  The first bit is reserved for MASK.
	 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 bytes header
	 *   ( + masking key, if any).
	 * - If the length is > 65_536, a magic value of 127 will be used for
	 *   the 7-bit field, and the next 8 bytes are expected to be the length,
	 *   resulting in a 10 bytes header ( + masking key, if any).
	 *
	 * Those functions encapsulate all this logic and allow to just get the
	 * length with the desired size.
1102 * 1103 * Return: 1104 * - For `ubyte`, the value to store in the 7 bits field, either the 1105 * length or a magic value (126 or 127). 1106 * - For `ushort`, a value in the range [126; 65_536]. 1107 * If payload.length is not in this bound, an assertion will be triggered. 1108 * - For `ulong`, a value in the range [65_537; size_t.max]. 1109 * If payload.length is not in this bound, an assertion will be triggered. 1110 */ 1111 size_t getHeaderSize(bool mask) 1112 { 1113 size_t ret = 1; 1114 if (payload.length < 126) ret += 1; 1115 else if (payload.length < 65536) ret += 3; 1116 else ret += 9; 1117 if (mask) ret += 4; 1118 return ret; 1119 } 1120 1121 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1122 { 1123 ubyte[4] buff; 1124 ubyte firstByte = cast(ubyte)opcode; 1125 if (fin) firstByte |= 0x80; 1126 dst[0] = firstByte; 1127 dst = dst[1 .. $]; 1128 1129 auto b1 = sys_rng ? 0x80 : 0x00; 1130 1131 if (payload.length < 126) { 1132 dst[0] = cast(ubyte)(b1 | payload.length); 1133 dst = dst[1 .. $]; 1134 } else if (payload.length < 65536) { 1135 dst[0] = cast(ubyte) (b1 | 126); 1136 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1137 dst = dst[3 .. $]; 1138 } else { 1139 dst[0] = cast(ubyte) (b1 | 127); 1140 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1141 dst = dst[9 .. $]; 1142 } 1143 1144 if (sys_rng) { 1145 sys_rng.read(dst[0 .. 4]); 1146 for (size_t i = 0; i < payload.length; i++) 1147 payload[i] ^= dst[i % 4]; 1148 } 1149 } 1150 1151 static Frame readFrame(InputStream stream) 1152 { 1153 Frame frame; 1154 ubyte[8] data; 1155 1156 stream.read(data[0 .. 2]); 1157 frame.fin = (data[0] & 0x80) != 0; 1158 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1159 1160 bool masked = !!(data[1] & 0b1000_0000); 1161 1162 //parsing length 1163 ulong length = data[1] & 0b0111_1111; 1164 if (length == 126) { 1165 stream.read(data[0 .. 2]); 1166 length = bigEndianToNative!ushort(data[0 .. 
2]); 1167 } else if (length == 127) { 1168 stream.read(data); 1169 length = bigEndianToNative!ulong(data); 1170 1171 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1172 // interpreted as a 64-bit unsigned integer (the most significant 1173 // bit MUST be 0) 1174 enforce!WebSocketException(!(length >> 63), 1175 "Received length has a non-zero most significant bit"); 1176 1177 } 1178 logDebug("Read frame: %s %s %s length=%d", 1179 frame.opcode, 1180 frame.fin ? "final frame" : "continuation", 1181 masked ? "masked" : "not masked", 1182 length); 1183 1184 // Masking key is 32 bits / uint 1185 if (masked) 1186 stream.read(data[0 .. 4]); 1187 1188 // Read payload 1189 // TODO: Provide a way to limit the size read, easy 1190 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1191 enforce!WebSocketException(length <= size_t.max); 1192 frame.payload = new ubyte[](cast(size_t)length); 1193 stream.read(frame.payload); 1194 1195 //de-masking 1196 if (masked) 1197 foreach (size_t i; 0 .. cast(size_t)length) 1198 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1199 1200 return frame; 1201 } 1202 } 1203 1204 unittest { 1205 import std.algorithm.searching : all; 1206 1207 final class DummyRNG : RandomNumberStream { 1208 @safe: 1209 @property bool empty() { return false; } 1210 @property ulong leastSize() { return ulong.max; } 1211 @property bool dataAvailableForRead() { return true; } 1212 const(ubyte)[] peek() { return null; } 1213 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1214 alias read = RandomNumberStream.read; 1215 } 1216 1217 ubyte[14] hdrbuf; 1218 auto rng = new DummyRNG; 1219 1220 Frame f; 1221 f.payload = new ubyte[125]; 1222 1223 assert(f.getHeaderSize(false) == 2); 1224 hdrbuf[] = 0; 1225 f.writeHeader(hdrbuf[0 .. 2], null); 1226 assert(hdrbuf[0 .. 2] == [0, 125]); 1227 1228 assert(f.getHeaderSize(true) == 6); 1229 hdrbuf[] = 0; 1230 f.writeHeader(hdrbuf[0 .. 
6], rng); 1231 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1232 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1233 1234 f.payload = new ubyte[126]; 1235 assert(f.getHeaderSize(false) == 4); 1236 hdrbuf[] = 0; 1237 f.writeHeader(hdrbuf[0 .. 4], null); 1238 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1239 1240 assert(f.getHeaderSize(true) == 8); 1241 hdrbuf[] = 0; 1242 f.writeHeader(hdrbuf[0 .. 8], rng); 1243 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1244 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1245 1246 f.payload = new ubyte[65535]; 1247 assert(f.getHeaderSize(false) == 4); 1248 hdrbuf[] = 0; 1249 f.writeHeader(hdrbuf[0 .. 4], null); 1250 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1251 1252 assert(f.getHeaderSize(true) == 8); 1253 hdrbuf[] = 0; 1254 f.writeHeader(hdrbuf[0 .. 8], rng); 1255 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1256 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1257 1258 f.payload = new ubyte[65536]; 1259 assert(f.getHeaderSize(false) == 10); 1260 hdrbuf[] = 0; 1261 f.writeHeader(hdrbuf[0 .. 10], null); 1262 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1263 1264 assert(f.getHeaderSize(true) == 14); 1265 hdrbuf[] = 0; 1266 f.writeHeader(hdrbuf[0 .. 14], rng); 1267 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1268 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1269 } 1270 1271 /** 1272 * Generate a challenge key for the protocol upgrade phase. 1273 */ 1274 private string generateChallengeKey(RandomNumberStream rng) 1275 { 1276 ubyte[16] buffer; 1277 rng.read(buffer); 1278 return Base64.encode(buffer); 1279 } 1280 1281 private string computeAcceptKey(string challengekey) 1282 { 1283 immutable(ubyte)[] b = challengekey.representation; 1284 immutable(ubyte)[] a = s_webSocketGuid.representation; 1285 SHA1 hash; 1286 hash.start(); 1287 hash.put(b); 1288 hash.put(a); 1289 auto result = Base64.encode(hash.finish()); 1290 return to!(string)(result); 1291 }