1 /** 2 Implements WebSocket support and fallbacks for older browsers. 3 4 Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455) 5 Copyright: © 2012-2014 RejectedSoftware e.K. 6 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 7 Authors: Jan Krüger 8 */ 9 module vibe.http.websockets; 10 11 /// 12 @safe unittest { 13 void handleConn(scope WebSocket sock) 14 { 15 // simple echo server 16 while (sock.connected) { 17 auto msg = sock.receiveText(); 18 sock.send(msg); 19 } 20 } 21 22 void startServer() 23 { 24 import vibe.http.router; 25 auto router = new URLRouter; 26 router.get("/ws", handleWebSockets(&handleConn)); 27 28 // Start HTTP server using listenHTTP()... 29 } 30 } 31 32 import vibe.core.core; 33 import vibe.core.log; 34 import vibe.core.net; 35 import vibe.core.sync; 36 import vibe.stream.operations; 37 import vibe.http.server; 38 import vibe.http.client; 39 import vibe.core.connectionpool; 40 import vibe.utils.array; 41 42 import core.time; 43 import std.array; 44 import std.base64; 45 import std.conv; 46 import std.exception; 47 import std.bitmanip; 48 import std.digest.sha; 49 import std.string; 50 import std.functional; 51 import std.uuid; 52 import std.base64; 53 import std.digest.sha; 54 import vibe.crypto.cryptorand; 55 56 @safe: 57 58 59 alias WebSocketHandshakeDelegate = void delegate(scope WebSocket); 60 61 62 /// Exception thrown by $(D vibe.http.websockets). 63 class WebSocketException: Exception 64 { 65 @safe pure nothrow: 66 67 /// 68 this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 69 { 70 super(msg, file, line, next); 71 } 72 73 /// 74 this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__) 75 { 76 super(msg, next, file, line); 77 } 78 } 79 80 /** 81 Returns a WebSocket client object that is connected to the specified host. 82 */ 83 WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings) 84 @safe { 85 import std.typecons : Tuple, tuple; 86 87 auto host = url.host; 88 auto port = url.port; 89 bool use_tls = (url.schema == "wss") ? true : false; 90 91 if (port == 0) 92 port = (use_tls) ? 443 : 80; 93 94 static struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; } 95 static vibe.utils.array.FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections; 96 auto ckey = ConnInfo(host, port, use_tls, settings ? settings.proxyURL.host : null, settings ? settings.proxyURL.port : 0); 97 98 ConnectionPool!HTTPClient pool; 99 foreach (c; s_connections) 100 if (c[0].host == host && c[0].port == port && c[0].useTLS == use_tls && (settings is null || (c[0].proxyIP == settings.proxyURL.host && c[0].proxyPort == settings.proxyURL.port))) 101 pool = c[1]; 102 103 if (!pool) 104 { 105 logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, (settings) ? settings.proxyURL.host : string.init, (settings) ? settings.proxyURL.port : 0); 106 pool = new ConnectionPool!HTTPClient({ 107 auto ret = new HTTPClient; 108 ret.connect(host, port, use_tls, settings); 109 return ret; 110 }); 111 if (s_connections.full) 112 s_connections.popFront(); 113 s_connections.put(tuple(ckey, pool)); 114 } 115 116 auto rng = secureRNG(); 117 auto challengeKey = generateChallengeKey(rng); 118 auto answerKey = computeAcceptKey(challengeKey); 119 auto cl = pool.lockConnection(); 120 auto res = cl.request((scope req){ 121 req.requestURL = (url.localURI == "") ? "/" : url.localURI; 122 req.method = HTTPMethod.GET; 123 req.headers["Upgrade"] = "websocket"; 124 req.headers["Connection"] = "Upgrade"; 125 req.headers["Sec-WebSocket-Version"] = "13"; 126 req.headers["Sec-WebSocket-Key"] = challengeKey; 127 }); 128 129 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 130 131 auto key = "sec-websocket-accept" in res.headers; 132 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 133 enforce(*key == answerKey, "Response has wrong accept key"); 134 auto conn = res.switchProtocol("websocket"); 135 auto ws = new WebSocket(conn, null, rng); 136 return ws; 137 } 138 139 /// ditto 140 void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings) 141 @safe { 142 bool use_tls = (url.schema == "wss") ? true : false; 143 url.schema = use_tls ? "https" : "http"; 144 145 /*scope*/auto rng = secureRNG(); 146 auto challengeKey = generateChallengeKey(rng); 147 auto answerKey = computeAcceptKey(challengeKey); 148 149 requestHTTP(url, 150 (scope req) { 151 req.method = HTTPMethod.GET; 152 req.headers["Upgrade"] = "websocket"; 153 req.headers["Connection"] = "Upgrade"; 154 req.headers["Sec-WebSocket-Version"] = "13"; 155 req.headers["Sec-WebSocket-Key"] = challengeKey; 156 }, 157 (scope res) { 158 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 159 auto key = "sec-websocket-accept" in res.headers; 160 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 161 enforce(*key == answerKey, "Response has wrong accept key"); 162 res.switchProtocol("websocket", (scope conn) @trusted { 163 scope ws = new WebSocket(conn, null, rng); 164 del(ws); 165 }); 166 } 167 ); 168 } 169 /// Scheduled for deprecation - use a `@safe` callback instead. 170 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings) 171 @system { 172 connectWebSocket(url, (scope ws) @trusted => del(ws), settings); 173 } 174 175 176 /** 177 Establishes a web socket conection and passes it to the $(D on_handshake) delegate. 178 */ 179 void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 180 @safe { 181 auto pUpgrade = "Upgrade" in req.headers; 182 auto pConnection = "Connection" in req.headers; 183 auto pKey = "Sec-WebSocket-Key" in req.headers; 184 //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers; 185 auto pVersion = "Sec-WebSocket-Version" in req.headers; 186 187 auto isUpgrade = false; 188 189 if( pConnection ) { 190 auto connectionTypes = split(*pConnection, ","); 191 foreach( t ; connectionTypes ) { 192 if( t.strip().toLower() == "upgrade" ) { 193 isUpgrade = true; 194 break; 195 } 196 } 197 } 198 199 string req_error; 200 if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests."; 201 else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header."; 202 else if (!pVersion || *pVersion != "13") req_error = "Only version 13 of the WebSocket protocol is supported."; 203 else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header."; 204 205 if (req_error.length) { 206 logDebug("Browser sent invalid WebSocket request: %s", req_error); 207 res.statusCode = HTTPStatus.badRequest; 208 res.writeBody(req_error); 209 return; 210 } 211 212 auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } (); 213 res.headers["Sec-WebSocket-Accept"] = accept; 214 res.headers["Connection"] = "Upgrade"; 215 ConnectionStream conn = res.switchProtocol("websocket"); 216 217 WebSocket socket = new WebSocket(conn, req, null); 218 try { 219 on_handshake(socket); 220 } catch (Exception e) { 221 logDiagnostic("WebSocket handler failed: %s", e.msg); 222 } 223 socket.close(); 224 } 225 /// Scheduled for deprecation - use a `@safe` callback instead. 226 void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 227 @system { 228 handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res); 229 } 230 231 232 /** 233 Returns a HTTP request handler that establishes web socket conections. 234 */ 235 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake) 236 @safe { 237 return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ()); 238 } 239 /// ditto 240 HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake) 241 @safe { 242 void callback(scope HTTPServerRequest req, scope HTTPServerResponse res) 243 @safe { 244 auto pUpgrade = "Upgrade" in req.headers; 245 auto pConnection = "Connection" in req.headers; 246 auto pKey = "Sec-WebSocket-Key" in req.headers; 247 //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers; 248 auto pVersion = "Sec-WebSocket-Version" in req.headers; 249 250 auto isUpgrade = false; 251 252 if( pConnection ) { 253 auto connectionTypes = split(*pConnection, ","); 254 foreach( t ; connectionTypes ) { 255 if( t.strip().toLower() == "upgrade" ) { 256 isUpgrade = true; 257 break; 258 } 259 } 260 } 261 if( !(isUpgrade && 262 pUpgrade && icmp(*pUpgrade, "websocket") == 0 && 263 pKey && 264 pVersion && *pVersion == "13") ) 265 { 266 logDebug("Browser sent invalid WebSocket request."); 267 res.statusCode = HTTPStatus.badRequest; 268 res.writeVoidBody(); 269 return; 270 } 271 272 auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } (); 273 res.headers["Sec-WebSocket-Accept"] = accept; 274 res.headers["Connection"] = "Upgrade"; 275 res.switchProtocol("websocket", (scope conn) { 276 // TODO: put back 'scope' once it is actually enforced by DMD 277 /*scope*/ auto socket = new WebSocket(conn, req, null); 278 try on_handshake(socket); 279 catch (Exception e) { 280 logDiagnostic("WebSocket handler failed: %s", e.msg); 281 } 282 socket.close(); 283 }); 284 } 285 return &callback; 286 } 287 /// Scheduled for deprecation - use a `@safe` callback instead. 288 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake) 289 @system { 290 return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws)); 291 } 292 /// Scheduled for deprecation - use a `@safe` callback instead. 293 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake) 294 @system { 295 return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws)); 296 } 297 298 299 /** 300 * Represents a single _WebSocket connection. 301 * 302 * --- 303 * shared static this () 304 * { 305 * runTask(() => connectToWS()); 306 * } 307 * 308 * void connectToWS () 309 * { 310 * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token"); 311 * auto ws = connectWebSocket(ws_url); 312 * logInfo("WebSocket connected"); 313 * 314 * while (ws.waitForData()) 315 * { 316 * auto txt = ws.receiveText; 317 * logInfo("Received: %s", txt); 318 * } 319 * logFatal("Connection lost!"); 320 * } 321 * --- 322 */ 323 final class WebSocket { 324 @safe: 325 326 private { 327 ConnectionStream m_conn; 328 bool m_sentCloseFrame = false; 329 IncomingWebSocketMessage m_nextMessage = null; 330 const HTTPServerRequest m_request; 331 Task m_reader; 332 InterruptibleTaskMutex m_readMutex, m_writeMutex; 333 InterruptibleTaskCondition m_readCondition; 334 Timer m_pingTimer; 335 uint m_lastPingIndex; 336 bool m_pongReceived; 337 short m_closeCode; 338 const(char)[] m_closeReason; 339 /// The entropy generator to use 340 /// If not null, it means this is a server socket. 341 RandomNumberStream m_rng; 342 } 343 344 /** 345 * Private constructor, called from `connectWebSocket`. 346 * 347 * Params: 348 * conn = Underlying connection string 349 * request = HTTP request used to establish the connection 350 * rng = Source of entropy to use. If null, assume we're a server socket 351 */ 352 private this(ConnectionStream conn, in HTTPServerRequest request, 353 RandomNumberStream rng) 354 { 355 m_conn = conn; 356 m_request = request; 357 assert(m_conn); 358 m_rng = rng; 359 m_writeMutex = new InterruptibleTaskMutex; 360 m_readMutex = new InterruptibleTaskMutex; 361 m_readCondition = new InterruptibleTaskCondition(m_readMutex); 362 m_readMutex.performLocked!({ 363 m_reader = runTask(&startReader); 364 if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) { 365 m_pongReceived = true; 366 m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); 367 } 368 }); 369 } 370 371 /** 372 Determines if the WebSocket connection is still alive and ready for sending. 373 374 Note that for determining the ready state for $(EM reading), you need 375 to use $(D waitForData) instead, because both methods can return 376 different values while a disconnect is in proress. 377 378 See_also: $(D waitForData) 379 */ 380 @property bool connected() { return m_conn.connected && !m_sentCloseFrame; } 381 382 /** 383 Returns the close code sent by the remote end. 384 385 Note if the connection was never opened, is still alive, or was closed 386 locally this value will be 0. If no close code was given by the remote 387 end in the close frame, the value will be 1005. If the connection was 388 not closed cleanly by the remote end, this value will be 1006. 389 */ 390 @property short closeCode() { return m_closeCode; } 391 392 /** 393 Returns the close reason sent by the remote end. 394 395 Note if the connection was never opened, is still alive, or was closed 396 locally this value will be an empty string. 397 */ 398 @property const(char)[] closeReason() { return m_closeReason; } 399 400 /** 401 The HTTP request that established the web socket connection. 402 */ 403 @property const(HTTPServerRequest) request() const { return m_request; } 404 405 /** 406 Checks if data is readily available for read. 407 */ 408 @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; } 409 410 /** Waits until either a message arrives or until the connection is closed. 411 412 This function can be used in a read loop to cleanly determine when to stop reading. 413 */ 414 bool waitForData() 415 { 416 if (m_nextMessage) return true; 417 418 m_readMutex.performLocked!({ 419 while (connected && m_nextMessage is null) 420 m_readCondition.wait(); 421 }); 422 return m_nextMessage !is null; 423 } 424 425 /// ditto 426 bool waitForData(Duration timeout) 427 { 428 import std.datetime; 429 430 if (m_nextMessage) return true; 431 432 immutable limit_time = Clock.currTime(UTC()) + timeout; 433 434 m_readMutex.performLocked!({ 435 while (connected && m_nextMessage is null && timeout > 0.seconds) { 436 m_readCondition.wait(timeout); 437 timeout = limit_time - Clock.currTime(UTC()); 438 } 439 }); 440 return m_nextMessage !is null; 441 } 442 443 /** 444 Sends a text message. 445 446 On the JavaScript side, the text will be available as message.data (type string). 447 448 Throws: 449 A `WebSocketException` is thrown if the connection gets closed 450 before or during the transfer of the message. 451 */ 452 void send(scope const(char)[] data) 453 { 454 send( 455 (scope message) { message.write(cast(const ubyte[])data); }, 456 FrameOpcode.text); 457 } 458 459 /** 460 Sends a binary message. 461 462 On the JavaScript side, the text will be available as message.data (type Blob). 463 464 Throws: 465 A `WebSocketException` is thrown if the connection gets closed 466 before or during the transfer of the message. 467 */ 468 void send(in ubyte[] data) 469 { 470 send((scope message){ message.write(data); }, FrameOpcode.binary); 471 } 472 473 /** 474 Sends a message using an output stream. 475 476 Throws: 477 A `WebSocketException` is thrown if the connection gets closed 478 before or during the transfer of the message. 479 */ 480 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) 481 { 482 m_writeMutex.performLocked!({ 483 enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); 484 /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng); 485 scope(exit) message.finalize(); 486 sender(message); 487 }); 488 } 489 490 /// Compatibility overload - will be removed soon. 491 deprecated("Call the overload which requires an explicit FrameOpcode.") 492 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender) 493 { 494 send(sender, FrameOpcode.text); 495 } 496 497 /** 498 Actively closes the connection. 499 500 Params: 501 code = Numeric code indicating a termination reason. 502 reason = Message describing why the connection was terminated. 503 */ 504 void close(short code = 0, scope const(char)[] reason = "") 505 { 506 //control frame payloads are limited to 125 bytes 507 assert(reason.length <= 123); 508 509 if (connected) { 510 send((scope msg) { 511 m_sentCloseFrame = true; 512 if (code != 0) 513 msg.write(std.bitmanip.nativeToBigEndian(code)); 514 msg.write(cast(const ubyte[])reason); 515 }, FrameOpcode.close); 516 } 517 if (m_pingTimer) m_pingTimer.stop(); 518 if (Task.getThis() != m_reader) m_reader.join(); 519 } 520 521 /** 522 Receives a new message and returns its contents as a newly allocated data array. 523 524 Params: 525 strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise. 526 Throws: WebSocketException if the connection is closed or 527 if $(D strict == true) and the frame received is not the right type 528 */ 529 ubyte[] receiveBinary(bool strict = true) 530 { 531 ubyte[] ret; 532 receive((scope message){ 533 enforceEx!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary, 534 "Expected a binary message, got "~message.frameOpcode.to!string()); 535 ret = message.readAll(); 536 }); 537 return ret; 538 } 539 /// ditto 540 string receiveText(bool strict = true) 541 { 542 string ret; 543 receive((scope message){ 544 enforceEx!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text, 545 "Expected a text message, got "~message.frameOpcode.to!string()); 546 ret = message.readAllUTF8(); 547 }); 548 return ret; 549 } 550 551 /** 552 Receives a new message using an InputStream. 553 Throws: WebSocketException if the connection is closed. 554 */ 555 void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver) 556 { 557 m_readMutex.performLocked!({ 558 while (!m_nextMessage) { 559 enforceEx!WebSocketException(connected, "Connection closed while reading message."); 560 m_readCondition.wait(); 561 } 562 receiver(m_nextMessage); 563 m_nextMessage = null; 564 m_readCondition.notifyAll(); 565 }); 566 } 567 568 private void startReader() 569 { 570 m_readMutex.performLocked!({}); //Wait until initialization 571 scope (exit) m_readCondition.notifyAll(); 572 try { 573 while (!m_conn.empty) { 574 assert(!m_nextMessage); 575 /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng); 576 577 switch (msg.frameOpcode) { 578 default: throw new WebSocketException("unknown frame opcode"); 579 case FrameOpcode.ping: 580 send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong); 581 break; 582 case FrameOpcode.pong: 583 // test if pong matches previous ping 584 if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) { 585 logDebugV("Received PONG that doesn't match previous ping."); 586 break; 587 } 588 logDebugV("Received matching PONG."); 589 m_pongReceived = true; 590 break; 591 case FrameOpcode.close: 592 logDebug("Got closing frame (%s)", m_sentCloseFrame); 593 594 // If no close code was passed, we default to 1005 595 this.m_closeCode = 1005; 596 597 // If provided in the frame, attempt to parse the close code/reason 598 if (msg.peek().length >= short.sizeof) { 599 this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]); 600 601 if (msg.peek().length > short.sizeof) { 602 this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$]; 603 } 604 } 605 606 if(!m_sentCloseFrame) close(); 607 logDebug("Terminating connection (%s)", m_sentCloseFrame); 608 m_conn.close(); 609 return; 610 case FrameOpcode.text: 611 case FrameOpcode.binary: 612 case FrameOpcode.continuation: // FIXME: add proper support for continuation frames! 613 m_readMutex.performLocked!({ 614 m_nextMessage = msg; 615 m_readCondition.notifyAll(); 616 while (m_nextMessage) m_readCondition.wait(); 617 }); 618 break; 619 } 620 } 621 } catch (Exception e) { 622 logDiagnostic("Error while reading websocket message: %s", e.msg); 623 logDiagnostic("Closing connection."); 624 } 625 626 // If no close code was passed, e.g. this was an unclean termination 627 // of our websocket connection, set the close code to 1006. 628 if (this.m_closeCode == 0) this.m_closeCode = 1006; 629 m_writeMutex.performLocked!({ m_conn.close(); }); 630 } 631 632 private void sendPing() 633 nothrow { 634 try { 635 if (!m_pongReceived) { 636 logDebug("Pong skipped. Closing connection."); 637 m_writeMutex.performLocked!({ m_conn.close(); }); 638 m_pingTimer.stop(); 639 return; 640 } 641 m_pongReceived = false; 642 send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping); 643 logDebugV("Ping sent"); 644 } catch (Exception e) { 645 logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg); 646 } 647 } 648 } 649 650 /** 651 Represents a single outgoing _WebSocket message as an OutputStream. 652 */ 653 final class OutgoingWebSocketMessage : OutputStream { 654 @safe: 655 private { 656 RandomNumberStream m_rng; 657 Stream m_conn; 658 FrameOpcode m_frameOpcode; 659 Appender!(ubyte[]) m_buffer; 660 bool m_finalized = false; 661 } 662 663 private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng) 664 { 665 assert(conn !is null); 666 m_conn = conn; 667 m_frameOpcode = frameOpcode; 668 m_rng = rng; 669 } 670 671 size_t write(in ubyte[] bytes, IOMode mode) 672 { 673 assert(!m_finalized); 674 675 if (!m_buffer.data.length) { 676 ubyte[Frame.maxHeaderSize] header_padding; 677 m_buffer.put(header_padding[]); 678 } 679 680 m_buffer.put(bytes); 681 return bytes.length; 682 } 683 684 void flush() 685 { 686 assert(!m_finalized); 687 if (m_buffer.data.length > 0) 688 sendFrame(false); 689 } 690 691 void finalize() 692 { 693 if (m_finalized) return; 694 m_finalized = true; 695 sendFrame(true); 696 } 697 698 private void sendFrame(bool fin) 699 { 700 if (!m_buffer.data.length) 701 write(null, IOMode.once); 702 703 assert(m_buffer.data.length >= Frame.maxHeaderSize); 704 705 Frame frame; 706 frame.fin = fin; 707 frame.opcode = m_frameOpcode; 708 frame.payload = m_buffer.data[Frame.maxHeaderSize .. $]; 709 auto hsize = frame.getHeaderSize(m_rng !is null); 710 auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $]; 711 frame.writeHeader(msg[0 .. hsize], m_rng); 712 m_conn.write(msg); 713 m_conn.flush(); 714 m_buffer.clear(); 715 } 716 717 alias write = OutputStream.write; 718 } 719 720 721 /** 722 Represents a single incoming _WebSocket message as an InputStream. 723 */ 724 final class IncomingWebSocketMessage : InputStream { 725 @safe: 726 private { 727 RandomNumberStream m_rng; 728 Stream m_conn; 729 Frame m_currentFrame; 730 } 731 732 private this(Stream conn, RandomNumberStream rng) 733 { 734 assert(conn !is null); 735 m_conn = conn; 736 m_rng = rng; 737 skipFrame(); // reads the first frame 738 } 739 740 @property bool empty() const { return m_currentFrame.payload.length == 0; } 741 742 @property ulong leastSize() const { return m_currentFrame.payload.length; } 743 744 @property bool dataAvailableForRead() { return true; } 745 746 /// The frame type for this nessage; 747 @property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; } 748 749 const(ubyte)[] peek() { return m_currentFrame.payload; } 750 751 /** 752 * Retrieve the next websocket frame of the stream and discard the current 753 * one 754 * 755 * This function is helpful if one wish to process frames by frames, 756 * or minimize memory allocation, as `peek` will only return the current 757 * frame data, and read requires a pre-allocated buffer. 758 * 759 * Returns: 760 * `false` if the current frame is the final one, `true` if a new frame 761 * was read. 762 */ 763 bool skipFrame() 764 { 765 if (m_currentFrame.fin) 766 return false; 767 768 m_currentFrame = Frame.readFrame(m_conn); 769 return true; 770 } 771 772 size_t read(scope ubyte[] dst, IOMode mode) 773 { 774 size_t nread = 0; 775 776 while (dst.length > 0) { 777 enforceEx!WebSocketException(!empty , "cannot read from empty stream"); 778 enforceEx!WebSocketException(leastSize > 0, "no data available" ); 779 780 import std.algorithm : min; 781 auto sz = cast(size_t)min(leastSize, dst.length); 782 dst[0 .. sz] = m_currentFrame.payload[0 .. sz]; 783 dst = dst[sz .. $]; 784 m_currentFrame.payload = m_currentFrame.payload[sz .. $]; 785 nread += sz; 786 787 if (leastSize == 0) { 788 if (mode == IOMode.immediate || mode == IOMode.once && nread > 0) 789 break; 790 this.skipFrame(); 791 } 792 } 793 794 return nread; 795 } 796 797 alias read = InputStream.read; 798 } 799 800 /// Magic string defined by the RFC for challenging the server during upgrade 801 private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 802 803 804 /** 805 * The Opcode is 4 bits, as defined in Section 5.2 806 * 807 * Values are defined in section 11.8 808 * Currently only 6 values are defined, however the opcode is defined as 809 * taking 4 bits. 810 */ 811 private enum FrameOpcode : ubyte { 812 continuation = 0x0, 813 text = 0x1, 814 binary = 0x2, 815 close = 0x8, 816 ping = 0x9, 817 pong = 0xA 818 } 819 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits"); 820 821 822 private struct Frame { 823 @safe: 824 enum maxHeaderSize = 14; 825 826 bool fin; 827 FrameOpcode opcode; 828 ubyte[] payload; 829 830 /** 831 * Return the header length encoded with the expected amount of bits 832 * 833 * The WebSocket RFC define a variable-length payload length. 834 * In short, it means that: 835 * - If the length is <= 125, it is stored as the 7 least significant 836 * bits of the second header byte. The first bit is reserved for MASK. 837 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of 838 * 126 is stored in the aforementioned 7 bits, and the actual length 839 * is stored in the next two bytes, resulting in a 4 bytes header 840 * ( + masking key, if any). 841 * - If the length is > 65_536, a magic value of 127 will be used for 842 * the 7-bit field, and the next 8 bytes are expected to be the length, 843 * resulting in a 10 bytes header ( + masking key, if any). 844 * 845 * Those functions encapsulate all this logic and allow to just get the 846 * length with the desired size. 847 * 848 * Return: 849 * - For `ubyte`, the value to store in the 7 bits field, either the 850 * length or a magic value (126 or 127). 851 * - For `ushort`, a value in the range [126; 65_536]. 852 * If payload.length is not in this bound, an assertion will be triggered. 853 * - For `ulong`, a value in the range [65_537; size_t.max]. 854 * If payload.length is not in this bound, an assertion will be triggered. 855 */ 856 size_t getHeaderSize(bool mask) 857 { 858 size_t ret = 1; 859 if (payload.length < 126) ret += 1; 860 else if (payload.length < 65536) ret += 3; 861 else ret += 9; 862 if (mask) ret += 4; 863 return ret; 864 } 865 866 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 867 { 868 ubyte[4] buff; 869 ubyte firstByte = cast(ubyte)opcode; 870 if (fin) firstByte |= 0x80; 871 dst[0] = firstByte; 872 dst = dst[1 .. $]; 873 874 auto b1 = sys_rng ? 0x80 : 0x00; 875 876 if (payload.length < 126) { 877 dst[0] = cast(ubyte)(b1 | payload.length); 878 dst = dst[1 .. $]; 879 } else if (payload.length < 65536) { 880 dst[0] = cast(ubyte) (b1 | 126); 881 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 882 dst = dst[3 .. $]; 883 } else { 884 dst[0] = cast(ubyte) (b1 | 127); 885 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 886 dst = dst[9 .. $]; 887 } 888 889 if (sys_rng) { 890 sys_rng.read(dst[0 .. 4]); 891 for (size_t i = 0; i < payload.length; i++) 892 payload[i] ^= dst[i % 4]; 893 } 894 } 895 896 static Frame readFrame(InputStream stream) 897 { 898 Frame frame; 899 ubyte[8] data; 900 901 stream.read(data[0 .. 2]); 902 frame.fin = (data[0] & 0x80) != 0; 903 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 904 905 bool masked = !!(data[1] & 0b1000_0000); 906 907 //parsing length 908 ulong length = data[1] & 0b0111_1111; 909 if (length == 126) { 910 stream.read(data[0 .. 2]); 911 length = bigEndianToNative!ushort(data[0 .. 2]); 912 } else if (length == 127) { 913 stream.read(data); 914 length = bigEndianToNative!ulong(data); 915 916 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 917 // interpreted as a 64-bit unsigned integer (the most significant 918 // bit MUST be 0) 919 enforceEx!WebSocketException(!(length >> 63), 920 "Received length has a non-zero most significant bit"); 921 922 } 923 logDebug("Read frame: %s %s %s length=%d", 924 frame.opcode, 925 frame.fin ? "final frame" : "continuation", 926 masked ? "masked" : "not masked", 927 length); 928 929 // Masking key is 32 bits / uint 930 if (masked) 931 stream.read(data[0 .. 4]); 932 933 // Read payload 934 // TODO: Provide a way to limit the size read, easy 935 // DOS for server code here (rejectedsoftware/vibe.d#1496). 936 enforceEx!WebSocketException(length <= size_t.max); 937 frame.payload = new ubyte[](cast(size_t)length); 938 stream.read(frame.payload); 939 940 //de-masking 941 if (masked) 942 foreach (size_t i; 0 .. cast(size_t)length) 943 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 944 945 return frame; 946 } 947 } 948 949 unittest { 950 import std.algorithm.searching : all; 951 952 final class DummyRNG : RandomNumberStream { 953 @safe: 954 @property bool empty() { return false; } 955 @property ulong leastSize() { return ulong.max; } 956 @property bool dataAvailableForRead() { return true; } 957 const(ubyte)[] peek() { return null; } 958 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 959 alias read = RandomNumberStream.read; 960 } 961 962 ubyte[14] hdrbuf; 963 auto rng = new DummyRNG; 964 965 Frame f; 966 f.payload = new ubyte[125]; 967 968 assert(f.getHeaderSize(false) == 2); 969 hdrbuf[] = 0; 970 f.writeHeader(hdrbuf[0 .. 2], null); 971 assert(hdrbuf[0 .. 2] == [0, 125]); 972 973 assert(f.getHeaderSize(true) == 6); 974 hdrbuf[] = 0; 975 f.writeHeader(hdrbuf[0 .. 6], rng); 976 assert(hdrbuf[0 .. 2] == [0, 128|125]); 977 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 978 979 f.payload = new ubyte[126]; 980 assert(f.getHeaderSize(false) == 4); 981 hdrbuf[] = 0; 982 f.writeHeader(hdrbuf[0 .. 4], null); 983 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 984 985 assert(f.getHeaderSize(true) == 8); 986 hdrbuf[] = 0; 987 f.writeHeader(hdrbuf[0 .. 8], rng); 988 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 989 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 990 991 f.payload = new ubyte[65535]; 992 assert(f.getHeaderSize(false) == 4); 993 hdrbuf[] = 0; 994 f.writeHeader(hdrbuf[0 .. 4], null); 995 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 996 997 assert(f.getHeaderSize(true) == 8); 998 hdrbuf[] = 0; 999 f.writeHeader(hdrbuf[0 .. 8], rng); 1000 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1001 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1002 1003 f.payload = new ubyte[65536]; 1004 assert(f.getHeaderSize(false) == 10); 1005 hdrbuf[] = 0; 1006 f.writeHeader(hdrbuf[0 .. 10], null); 1007 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1008 1009 assert(f.getHeaderSize(true) == 14); 1010 hdrbuf[] = 0; 1011 f.writeHeader(hdrbuf[0 .. 14], rng); 1012 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1013 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1014 } 1015 1016 /** 1017 * Generate a challenge key for the protocol upgrade phase. 1018 */ 1019 private string generateChallengeKey(scope RandomNumberStream rng) 1020 { 1021 ubyte[16] buffer; 1022 rng.read(buffer); 1023 return Base64.encode(buffer); 1024 } 1025 1026 private string computeAcceptKey(string challengekey) 1027 { 1028 immutable(ubyte)[] b = challengekey.representation; 1029 immutable(ubyte)[] a = s_webSocketGuid.representation; 1030 SHA1 hash; 1031 hash.start(); 1032 hash.put(b); 1033 hash.put(a); 1034 auto result = Base64.encode(hash.finish()); 1035 return to!(string)(result); 1036 }