1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 RejectedSoftware e.K.
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
@safe unittest {
	void handleConn(scope WebSocket sock)
	{
		// Simple echo server.
		// The read loop is driven by waitForData() rather than `connected`:
		// `connected` only reports whether sending is still possible, while
		// waitForData() returns false once the connection is closed and no
		// message is pending, so receiveText() is not called on a closed socket.
		while (sock.waitForData()) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 
42 import core.time;
43 import std.array;
44 import std.base64;
45 import std.conv;
46 import std.exception;
47 import std.bitmanip;
48 import std.digest.sha;
49 import std.string;
50 import std.functional;
51 import std.uuid;
52 import std.base64;
53 import std.digest.sha;
54 import vibe.crypto.cryptorand;
55 
56 @safe:
57 
58 
/// Signature of the callback that receives the established `WebSocket` once the upgrade handshake has succeeded.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket);
60 
61 
62 /// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	/**
		Creates an exception with the given message.

		Params:
			msg = Human readable error description
			file = Source file that raised the error
			line = Source line that raised the error
			next = Optional chained throwable
	*/
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow {
		super(msg, file, line, next);
	}

	/**
		Creates an exception that wraps another throwable.

		Params:
			msg = Human readable error description
			next = The throwable that caused this exception
			file = Source file that raised the error
			line = Source line that raised the error
	*/
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow {
		super(msg, next, file, line);
	}
}
79 
/**
	Returns a WebSocket client object that is connected to the specified host.

	The underlying HTTP client connections are taken from a small cache of
	connection pools that is keyed on host, port, TLS usage and proxy settings.

	Params:
		url = Endpoint to connect to. The schemas "wss" and "https" select a
			TLS connection, any other schema results in a plain text
			connection. If no port is given, 443 (TLS) or 80 (plain) is used.
		settings = HTTP client settings (e.g. the proxy to use); may be `null`.

	Throws:
		An `Exception` if the server does not answer with "101 Switching
		Protocols" or if the `Sec-WebSocket-Accept` header is missing or wrong.
*/
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	import std.typecons : Tuple, tuple;

	auto host = url.host;
	auto port = url.port;
	immutable use_tls = url.schema == "wss" || url.schema == "https";

	if (port == 0)
		port = use_tls ? 443 : 80;

	static struct ConnInfo { string host; ushort port; bool useTLS; string proxyIP; ushort proxyPort; }
	static vibe.utils.array.FixedRingBuffer!(Tuple!(ConnInfo, ConnectionPool!HTTPClient), 16) s_connections;
	auto ckey = ConnInfo(host, port, use_tls, settings ? settings.proxyURL.host : null, settings ? settings.proxyURL.port : 0);

	// Look up an existing pool by the complete key. Comparing the whole key
	// (including the proxy fields) makes sure that a pool that was created for
	// a proxied connection is never reused for a request without proxy.
	ConnectionPool!HTTPClient pool;
	foreach (c; s_connections)
		if (c[0] == ckey) {
			pool = c[1];
			break;
		}

	if (!pool)
	{
		logDebug("Create HTTP client pool %s:%s %s proxy %s:%d", host, port, use_tls, (settings) ? settings.proxyURL.host : string.init, (settings) ? settings.proxyURL.port : 0);
		pool = new ConnectionPool!HTTPClient({
			auto ret = new HTTPClient;
			ret.connect(host, port, use_tls, settings);
			return ret;
		});
		// the cache has a fixed capacity - evict the oldest entry if necessary
		if (s_connections.full)
			s_connections.popFront();
		s_connections.put(tuple(ckey, pool));
	}

	// the RNG is used for the handshake nonce and is handed to the socket afterwards
	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto cl = pool.lockConnection();
	auto res = cl.request((scope req){
		req.requestURL = (url.localURI == "") ? "/" : url.localURI;
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
	});

	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	// the server must prove that it understood the handshake by echoing the derived key
	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, null, rng);
}
138 
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Map the WebSocket schema to the HTTP schema used for the upgrade request.
	// "https" is accepted as an alias of "wss" so that a secure URL is never
	// silently downgraded to a plain text connection.
	immutable use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);

	requestHTTP(url,
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = challengeKey;
		},
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto key = "sec-websocket-accept" in res.headers;
			enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*key == answerKey, "Response has wrong accept key");
			res.switchProtocol("websocket", (scope conn) @trusted {
				// the socket only lives for the duration of the callback
				scope ws = new WebSocket(conn, null, rng);
				del(ws);
			});
		},
		// forward the caller's client settings (previously they were ignored)
		settings
	);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// wrap the @system callback so that the @safe overload can be reused
	connectWebSocket(url, (scope ws) @trusted { del(ws); }, settings);
}
174 
175 
/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	The request is validated first: it must carry an "upgrade" token in its
	"Connection" header, "Upgrade: websocket", "Sec-WebSocket-Version: 13" and a
	"Sec-WebSocket-Key" header. Invalid requests are answered with
	"400 Bad Request" and a short error description as the body.

	Exceptions thrown by $(D on_handshake) are logged and the socket is closed
	after the delegate has returned.
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto upgrade_hdr = "Upgrade" in req.headers;
	auto connection_hdr = "Connection" in req.headers;
	auto key_hdr = "Sec-WebSocket-Key" in req.headers;
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	auto version_hdr = "Sec-WebSocket-Version" in req.headers;

	// "Connection" may contain a comma separated list of tokens
	bool wants_upgrade = false;
	if (connection_hdr) {
		foreach (token; split(*connection_hdr, ",")) {
			if (token.strip().toLower() != "upgrade") continue;
			wants_upgrade = true;
			break;
		}
	}

	// determine the first violated requirement, if any
	string problem;
	if (!wants_upgrade)
		problem = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (!upgrade_hdr || icmp(*upgrade_hdr, "websocket") != 0)
		problem = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (!version_hdr || *version_hdr != "13")
		problem = "Only version 13 of the WebSocket protocol is supported.";
	else if (!key_hdr)
		problem = "Missing \"Sec-WebSocket-Key\" header.";

	if (problem.length) {
		logDebug("Browser sent invalid WebSocket request: %s", problem);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(problem);
		return;
	}

	// answer the challenge: Base64(SHA-1(key ~ GUID))
	string accept_key = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = accept_key;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream stream = res.switchProtocol("websocket");

	// no RNG is passed: server side sockets do not mask their frames
	auto ws = new WebSocket(stream, req, null);
	try {
		on_handshake(ws);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	ws.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// wrap the @system callback so that the @safe overload can be reused
	handleWebSocket((scope ws) @trusted { on_handshake(ws); }, req, res);
}
230 
231 
/**
	Returns a HTTP request handler that establishes web socket connections.

	The returned handler validates the upgrade request, performs the handshake
	and then invokes $(D on_handshake) with the established socket.
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
@safe {
	// turn the function pointer into a delegate and defer to the delegate overload
	auto as_delegate = () @trusted { return toDelegate(on_handshake); } ();
	return handleWebSockets(as_delegate);
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// The actual request handler; captures `on_handshake`.
	void handler(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto upgrade_hdr = "Upgrade" in req.headers;
		auto connection_hdr = "Connection" in req.headers;
		auto key_hdr = "Sec-WebSocket-Key" in req.headers;
		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
		auto version_hdr = "Sec-WebSocket-Version" in req.headers;

		// "Connection" may contain a comma separated list of tokens
		bool wants_upgrade = false;
		if (connection_hdr) {
			foreach (token; split(*connection_hdr, ",")) {
				if (token.strip().toLower() != "upgrade") continue;
				wants_upgrade = true;
				break;
			}
		}

		// all handshake requirements have to be met at once
		immutable bool acceptable = wants_upgrade
			&& upgrade_hdr && icmp(*upgrade_hdr, "websocket") == 0
			&& key_hdr
			&& version_hdr && *version_hdr == "13";

		if (!acceptable) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// answer the challenge: Base64(SHA-1(key ~ GUID))
		string accept_key = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = accept_key;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto ws = new WebSocket(conn, req, null);
			try {
				on_handshake(ws);
			} catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			ws.close();
		});
	}

	return &handler;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// wrap the @system delegate so that the @safe overload can be reused
	return handleWebSockets(delegate (scope ws) @trusted { on_handshake(ws); });
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// wrap the @system function so that the @safe overload can be reused
	return handleWebSockets(delegate (scope ws) @trusted { on_handshake(ws); });
}
297 
298 
/**
 * Represents a single _WebSocket connection.
 *
 * Incoming frames are read by a background task that is started by the
 * constructor; data messages are handed over one at a time to `receive`,
 * `receiveText` and `receiveBinary`.
 *
 * ---
 * shared static this ()
 * {
 *   runTask(() => connectToWS());
 * }
 *
 * void connectToWS ()
 * {
 *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
 *   auto ws = connectWebSocket(ws_url);
 *   logInfo("WebSocket connected");
 *
 *   while (ws.waitForData())
 *   {
 *     auto txt = ws.receiveText;
 *     logInfo("Received: %s", txt);
 *   }
 *   logFatal("Connection lost!");
 * }
 * ---
 */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		bool m_sentCloseFrame = false;
		/// Message that was read by the reader task and not yet consumed
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		/// Background task running `startReader`
		Task m_reader;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use for masking outgoing frames.
		/// If not null, this is a client socket; server sockets pass null
		/// and send their frames unmasked.
		RandomNumberStream m_rng;
	}

	/**
	 * Private constructor, called from `connectWebSocket` and the
	 * `handleWebSocket(s)` request handlers.
	 *
	 * Starts the reader task and, if the request's server settings define a
	 * non-zero `webSocketPingInterval`, a periodic ping timer.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (null for client sockets)
	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
	 */
	private this(ConnectionStream conn, in HTTPServerRequest request,
				 RandomNumberStream rng)
	{
		m_conn = conn;
		m_request = request;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The read mutex is held during setup; startReader() acquires it
		// first thing and thus waits until initialization is done.
		m_readMutex.performLocked!({
			m_reader = runTask(&startReader);
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
			}
		});
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.
	*/
	@property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; }

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.

		Returns: `true` if a message is available for reading, `false` otherwise.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// the condition may be notified early - recompute the remaining time
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the text will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		The message is finalized (its last frame is sent) when $(D sender)
		returns. Concurrent senders are serialized using the write mutex.

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		m_writeMutex.performLocked!({
			enforceEx!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/// Compatibility overload - will be removed soon.
	deprecated("Call the overload which requires an explicit FrameOpcode.")
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
	{
		send(sender, FrameOpcode.text);
	}

	/**
		Actively closes the connection.

		If the socket is still connected, a close frame is sent. Afterwards the
		ping timer is stopped and the reader task is joined (unless this
		function is called from the reader task itself).

		Params:
			code = Numeric code indicating a termination reason. A value of 0
				results in a close frame with an empty payload.
			reason = Message describing why the connection was terminated. It
				is only transmitted if $(D code) is non-zero, because a close
				frame payload has to start with the two byte status code.
	*/
	void close(short code = 0, scope const(char)[] reason = "")
	{
		//control frame payloads are limited to 125 bytes
		assert(reason.length <= 123);

		if (connected) {
			send((scope msg) {
				m_sentCloseFrame = true;
				// RFC 6455, 5.5.1: if a close frame has a body, its first two
				// bytes must be the status code - never send a bare reason.
				if (code != 0) {
					msg.write(std.bitmanip.nativeToBigEndian(code));
					msg.write(cast(const ubyte[])reason);
				}
			}, FrameOpcode.close);
		}
		if (m_pingTimer) m_pingTimer.stop();
		if (Task.getThis() != m_reader) m_reader.join();
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforceEx!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforceEx!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.

		Blocks until the reader task has provided a message, passes it to
		$(D receiver) and then signals the reader task to continue.

		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforceEx!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			m_nextMessage = null;
			// wake up the reader task, which waits for the message to be consumed
			m_readCondition.notifyAll();
		});
	}

	/// Body of the reader task: reads frames until the connection ends and
	/// dispatches them according to their opcode.
	private void startReader()
	{
		m_readMutex.performLocked!({}); //Wait until initialization
		scope (exit) m_readCondition.notifyAll();
		try {
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// answer with a pong that echoes the ping payload
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = 1005;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						// reply with our own close frame unless we initiated the close
						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						m_conn.close();
						return;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						// hand the message over and wait until it has been consumed
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (this.m_closeCode == 0) this.m_closeCode = 1006;
		m_writeMutex.performLocked!({ m_conn.close(); });
	}

	/// Timer callback: closes the connection if the previous ping was not
	/// answered, otherwise sends a new ping carrying an incremented index.
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				m_writeMutex.performLocked!({ m_conn.close(); });
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}
649 
/**
	Represents a single outgoing _WebSocket message as an OutputStream.

	Written data is buffered. Each call to `flush` sends the buffered data as
	a non-final frame, `finalize` sends the final frame of the message.
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		/// Entropy source used for masking; null means frames are sent unmasked
		RandomNumberStream m_rng;
		Stream m_conn;
		/// Opcode to use for the next frame
		FrameOpcode m_frameOpcode;
		/// Pending payload, preceded by `Frame.maxHeaderSize` bytes reserved for the header
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/// Appends `bytes` to the buffered payload of the next frame.
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		// reserve room in front of the payload, so that the header can later
		// be written in place and header and payload form one contiguous slice
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}

		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the data buffered so far as a non-final frame.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message; further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Writes header and buffered payload to the connection and resets the buffer.
	private void sendFrame(bool fin)
	{
		// make sure the header padding exists even for an empty payload
		if (!m_buffer.data.length)
			write(null, IOMode.once);

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// the header is right-aligned within the reserved padding
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, 5.4: only the first frame of a fragmented message carries
		// the message opcode, all following fragments must use the
		// continuation opcode.
		m_frameOpcode = FrameOpcode.continuation;
	}

	alias write = OutputStream.write;
}
719 
720 
/**
	Represents a single incoming _WebSocket message as an InputStream.

	The first frame is read by the constructor; further frames are fetched on
	demand by `read` or explicitly using `skipFrame`.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		Frame m_currentFrame;
	}

	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_rng = rng;
		m_conn = conn;
		skipFrame(); // reads the first frame
	}

	/// True if the current frame has no unread payload left.
	@property bool empty() const { return leastSize == 0; }

	/// Number of unread payload bytes of the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always true - frames are read as a whole before they are exposed.
	@property bool dataAvailableForRead() { return true; }

	/// The frame type for this message;
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// Returns the unread payload of the current frame without consuming it.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wishes to process the message frame by
	 * frame, or minimize memory allocation, as `peek` will only return the
	 * current frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		if (!m_currentFrame.fin) {
			m_currentFrame = Frame.readFrame(m_conn);
			return true;
		}
		return false;
	}

	/**
	 * Copies payload data into `dst`, moving on to following frames as needed.
	 *
	 * Returns: The number of bytes that were written to `dst`.
	 * Throws: `WebSocketException` if more data is requested than is available.
	 */
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		size_t total = 0;

		while (dst.length > 0) {
			enforceEx!WebSocketException(!empty , "cannot read from empty stream");
			enforceEx!WebSocketException(leastSize > 0, "no data available" );

			// copy as much of the current frame as fits into the destination
			immutable chunk = cast(size_t)min(leastSize, dst.length);
			dst[0 .. chunk] = m_currentFrame.payload[0 .. chunk];
			m_currentFrame.payload = m_currentFrame.payload[chunk .. $];
			dst = dst[chunk .. $];
			total += chunk;

			// frame not exhausted yet - the destination is full
			if (leastSize != 0) continue;

			// non-blocking modes never wait for another frame
			if (mode == IOMode.immediate || (mode == IOMode.once && total > 0))
				break;
			this.skipFrame();
		}

		return total;
	}

	alias read = InputStream.read;
}
799 
/// GUID defined by RFC 6455 that gets appended to the client's challenge key
/// before hashing it to compute the `Sec-WebSocket-Accept` value.
private immutable string s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
802 
803 
/**
 * Frame type identifier, stored in the low 4 bits of the first header byte
 * (RFC 6455, section 5.2).
 *
 * The values are the ones registered in section 11.8. Only 6 of the 16
 * possible values are currently defined.
 */
private enum FrameOpcode : ubyte {
	/// Follow-up fragment of a fragmented message
	continuation = 0x00,
	/// Text data frame
	text         = 0x01,
	/// Binary data frame
	binary       = 0x02,
	/// Connection close control frame
	close        = 0x08,
	/// Ping control frame
	ping         = 0x09,
	/// Pong control frame
	pong         = 0x0A
}
static assert(FrameOpcode.max < 0xF, "FrameOpcode is only 4 bits");
820 
821 
/// A single WebSocket frame together with the logic to encode and decode its header.
private struct Frame {
@safe:
	/// Largest possible header: 2 fixed bytes + 8 length bytes + 4 masking key bytes
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Computes the size of the header for the current payload.
	 *
	 * The WebSocket RFC defines a variable-length payload length field:
	 * - If the length is < 126, it is stored in the 7 least significant
	 *   bits of the second header byte (the first bit is the MASK flag),
	 *   resulting in a 2 byte header.
	 * - If the length is < 65_536 (so it fits in 2 bytes), the magic value
	 *   126 is stored in those 7 bits and the actual length follows in the
	 *   next two bytes, resulting in a 4 byte header.
	 * - Otherwise the magic value 127 is stored in the 7 bit field and the
	 *   next 8 bytes hold the length, resulting in a 10 byte header.
	 *
	 * Params:
	 *   mask = Whether a masking key (4 additional bytes) will be written
	 *
	 * Returns: The header size in bytes.
	 */
	size_t getHeaderSize(bool mask)
	{
		size_t length_bytes;
		if (payload.length < 126) length_bytes = 1;
		else if (payload.length < 65536) length_bytes = 3;
		else length_bytes = 9;

		return 1 + length_bytes + (mask ? 4 : 0);
	}

	/**
	 * Writes the frame header into `dst`.
	 *
	 * If `sys_rng` is not null, a masking key is read from it, stored at
	 * the end of the header and the payload is masked $(B in place).
	 *
	 * Params:
	 *   dst = Destination buffer; has to hold at least `getHeaderSize` bytes
	 *   sys_rng = Source of the masking key, or null to write an unmasked frame
	 */
	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
	{
		immutable len = payload.length;
		immutable int mask_flag = sys_rng ? 0x80 : 0x00;

		// first byte: FIN flag and opcode
		dst[0] = cast(ubyte)((fin ? 0x80 : 0x00) | opcode);
		dst = dst[1 .. $];

		// MASK flag and (possibly extended) payload length
		size_t length_bytes;
		if (len < 126) {
			dst[0] = cast(ubyte)(mask_flag | len);
			length_bytes = 1;
		} else if (len < 65536) {
			dst[0] = cast(ubyte)(mask_flag | 126);
			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)len);
			length_bytes = 3;
		} else {
			dst[0] = cast(ubyte)(mask_flag | 127);
			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)len);
			length_bytes = 9;
		}
		dst = dst[length_bytes .. $];

		// masking key, applied to the payload right away
		if (sys_rng) {
			auto key = dst[0 .. 4];
			sys_rng.read(key);
			foreach (i, ref b; payload)
				b ^= key[i % 4];
		}
	}

	/**
	 * Reads a complete frame (header and payload) from `stream`.
	 *
	 * A masked payload is unmasked before the frame is returned.
	 *
	 * Throws: `WebSocketException` if the 64-bit length has its most
	 *   significant bit set or does not fit into `size_t`.
	 */
	static Frame readFrame(InputStream stream)
	{
		Frame result;
		ubyte[8] hdr;

		// the two fixed header bytes
		stream.read(hdr[0 .. 2]);
		result.fin = (hdr[0] & 0x80) != 0;
		result.opcode = cast(FrameOpcode)(hdr[0] & 0x0F);

		immutable bool has_mask = (hdr[1] & 0b1000_0000) != 0;

		// payload length, possibly followed by an extended length field
		ulong payload_len = hdr[1] & 0b0111_1111;
		if (payload_len == 126) {
			stream.read(hdr[0 .. 2]);
			payload_len = bigEndianToNative!ushort(hdr[0 .. 2]);
		} else if (payload_len == 127) {
			stream.read(hdr);
			payload_len = bigEndianToNative!ulong(hdr);

			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
			// interpreted as a 64-bit unsigned integer (the most significant
			// bit MUST be 0)
			enforceEx!WebSocketException(!(payload_len >> 63),
				"Received length has a non-zero most significant bit");
		}

		logDebug("Read frame: %s %s %s length=%d",
				 result.opcode,
				 result.fin ? "final frame" : "continuation",
				 has_mask ? "masked" : "not masked",
				 payload_len);

		// Masking key is 32 bits / uint; it is kept in the first 4 bytes of `hdr`
		if (has_mask)
			stream.read(hdr[0 .. 4]);

		// Read payload
		// TODO: Provide a way to limit the size read, easy
		// DOS for server code here (rejectedsoftware/vibe.d#1496).
		enforceEx!WebSocketException(payload_len <= size_t.max);
		result.payload = new ubyte[](cast(size_t)payload_len);
		stream.read(result.payload);

		// de-masking
		if (has_mask) {
			foreach (i, ref b; result.payload)
				b ^= hdr[i % 4];
		}

		return result;
	}
}
948 
// Verifies header size computation and header encoding at the payload length
// boundaries (125/126 and 65535/65536), both with and without masking.
unittest {
	import std.algorithm.searching : all;

	// "random" source that fills every buffer with the constant 13
	final class DummyRNG : RandomNumberStream {
	@safe:
		@property bool empty() { return false; }
		@property ulong leastSize() { return ulong.max; }
		@property bool dataAvailableForRead() { return true; }
		const(ubyte)[] peek() { return null; }
		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
		alias read = RandomNumberStream.read;
	}

	auto rng = new DummyRNG;
	ubyte[14] hdrbuf;
	Frame f;

	// Checks the unmasked header against `plain_header` and then the masked
	// header, which differs only by the MASK bit and the trailing 4 key bytes.
	void check(size_t payload_len, size_t plain_size, const(ubyte)[] plain_header)
	{
		f.payload = new ubyte[payload_len];

		assert(f.getHeaderSize(false) == plain_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. plain_size], null);
		assert(hdrbuf[0 .. plain_size] == plain_header);

		immutable masked_size = plain_size + 4;
		assert(f.getHeaderSize(true) == masked_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. masked_size], rng);
		assert(hdrbuf[0] == plain_header[0]);
		assert(hdrbuf[1] == (128 | plain_header[1]));
		assert(hdrbuf[2 .. plain_size] == plain_header[2 .. $]);
		assert(hdrbuf[plain_size .. masked_size].all!(b => b == 13));
	}

	check(125, 2, [0, 125]);
	check(126, 4, [0, 126, 0, 126]);
	check(65535, 4, [0, 126, 255, 255]);
	check(65536, 10, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
}
1015 
/**
 * Generate a challenge key for the protocol upgrade phase.
 *
 * Params:
 *   rng = Entropy source from which the 16 byte nonce is read
 *
 * Returns: The Base64 encoded nonce.
 */
private string generateChallengeKey(scope RandomNumberStream rng)
{
	ubyte[16] nonce;
	rng.read(nonce);
	return Base64.encode(nonce);
}
1025 
/**
 * Computes the accept key that is expected as the answer to a challenge key.
 *
 * Params:
 *   challengekey = The key that is sent in the `Sec-WebSocket-Key` header
 *
 * Returns: The Base64 encoded SHA-1 digest of the challenge key followed
 *   by the WebSocket GUID.
 */
private string computeAcceptKey(string challengekey)
{
	SHA1 sha;
	sha.start();
	sha.put(challengekey.representation);
	sha.put(s_webSocketGuid.representation);
	return Base64.encode(sha.finish()).idup;
}