1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
12 @safe unittest {
13 	void handleConn(scope WebSocket sock)
14 	{
15 		// simple echo server
16 		while (sock.connected) {
17 			auto msg = sock.receiveText();
18 			sock.send(msg);
19 		}
20 	}
21 
22 	void startServer()
23 	{
24 		import vibe.http.router;
25 		auto router = new URLRouter;
26 		router.get("/ws", handleWebSockets(&handleConn));
27 
28 		// Start HTTP server using listenHTTP()...
29 	}
30 }
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 
42 import core.time;
43 import std.algorithm: equal, splitter;
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.exception;
48 import std.bitmanip;
49 import std.digest.sha;
50 import std.string;
51 import std.functional;
52 import std.uuid;
53 import std.base64;
54 import std.digest.sha;
55 import std.uni: asLowerCase;
56 import vibe.crypto.cryptorand;
57 
58 @safe:
59 
60 
61 alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
62 
63 
64 /// Exception thrown by $(D vibe.http.websockets).
65 class WebSocketException: Exception
66 {
67 	@safe pure nothrow:
68 
69 	///
70 	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
71 	{
72 		super(msg, file, line, next);
73 	}
74 
75 	///
76 	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
77 	{
78 		super(msg, next, file, line);
79 	}
80 }
81 
82 /** Establishes a WebSocket connection at the specified endpoint.
83 */
84 WebSocket connectWebSocketEx(URL url,
85 	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
86 	const(HTTPClientSettings) settings = defaultSettings)
87 @safe {
88 	const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
89 	url.schema = use_tls ? "https" : "http";
90 
91 	auto rng = secureRNG();
92 	auto challengeKey = generateChallengeKey(rng);
93 	auto answerKey = computeAcceptKey(challengeKey);
94 	auto res = requestHTTP(url, (scope req){
95 		req.method = HTTPMethod.GET;
96 		req.headers["Upgrade"] = "websocket";
97 		req.headers["Connection"] = "Upgrade";
98 		req.headers["Sec-WebSocket-Version"] = "13";
99 		req.headers["Sec-WebSocket-Key"] = challengeKey;
100 		request_modifier(req);
101 	}, settings);
102 
103 	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
104 
105 	auto key = "sec-websocket-accept" in res.headers;
106 	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
107 	enforce(*key == answerKey, "Response has wrong accept key");
108 	auto conn = res.switchProtocol("websocket");
109 	return new WebSocket(conn, rng, res);
110 }
111 
112 /// ditto
113 void connectWebSocketEx(URL url,
114 	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
115 	scope WebSocketHandshakeDelegate del,
116 	const(HTTPClientSettings) settings = defaultSettings)
117 @safe {
118 	const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
119 	url.schema = use_tls ? "https" : "http";
120 
121 	/*scope*/auto rng = secureRNG();
122 	auto challengeKey = generateChallengeKey(rng);
123 	auto answerKey = computeAcceptKey(challengeKey);
124 
125 	requestHTTP(url,
126 		(scope req) {
127 			req.method = HTTPMethod.GET;
128 			req.headers["Upgrade"] = "websocket";
129 			req.headers["Connection"] = "Upgrade";
130 			req.headers["Sec-WebSocket-Version"] = "13";
131 			req.headers["Sec-WebSocket-Key"] = challengeKey;
132 			request_modifier(req);
133 		},
134 		(scope res) {
135 			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
136 			auto key = "sec-websocket-accept" in res.headers;
137 			enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
138 			enforce(*key == answerKey, "Response has wrong accept key");
139 			res.switchProtocol("websocket", (scope conn) @trusted {
140 				scope ws = new WebSocket(conn, rng, res);
141 				del(ws);
142 				if (ws.connected) ws.close();
143 			});
144 		},
145 		settings
146 	);
147 }
148 
149 /// ditto
150 WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
151 @safe {
152 	return connectWebSocketEx(url, (scope req) {}, settings);
153 }
154 /// ditto
155 void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
156 @safe {
157 	connectWebSocketEx(url, (scope req) {}, del, settings);
158 }
159 /// ditto
160 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
161 @system {
162 	connectWebSocket(url, (scope ws) nothrow {
163 		try del(ws);
164 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
165 	}, settings);
166 }
167 /// Scheduled for deprecation - use a `@safe` callback instead.
168 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
169 @system {
170 	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
171 }
172 /// Scheduled for deprecation - use a `nothrow` callback instead.
173 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
174 @safe {
175 	connectWebSocket(url, (scope ws) nothrow {
176 		try del(ws);
177 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
178 	}, settings);
179 }
180 
181 
182 /**
183 	Establishes a web socket conection and passes it to the $(D on_handshake) delegate.
184 */
185 void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
186 @safe {
187 	auto pUpgrade = "Upgrade" in req.headers;
188 	auto pConnection = "Connection" in req.headers;
189 	auto pKey = "Sec-WebSocket-Key" in req.headers;
190 	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
191 	auto pVersion = "Sec-WebSocket-Version" in req.headers;
192 
193 	auto isUpgrade = false;
194 
195 	if( pConnection ) {
196 		auto connectionTypes = splitter(*pConnection, ",");
197 		foreach( t ; connectionTypes ) {
198 			if( t.strip().asLowerCase().equal("upgrade") ) {
199 				isUpgrade = true;
200 				break;
201 			}
202 		}
203 	}
204 
205 	string req_error;
206 	if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
207 	else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
208 	else if (!pVersion || *pVersion != "13") req_error = "Only version 13 of the WebSocket protocol is supported.";
209 	else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header.";
210 
211 	if (req_error.length) {
212 		logDebug("Browser sent invalid WebSocket request: %s", req_error);
213 		res.statusCode = HTTPStatus.badRequest;
214 		res.writeBody(req_error);
215 		return;
216 	}
217 
218 	auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
219 	res.headers["Sec-WebSocket-Accept"] = accept;
220 	res.headers["Connection"] = "Upgrade";
221 	ConnectionStream conn = res.switchProtocol("websocket");
222 
223 	// NOTE: silencing scope warning here - WebSocket references the scoped
224 	//       req/res objects throughout its lifetime, which has a narrower scope
225 	scope socket = () @trusted { return new WebSocket(conn, req, res); } ();
226 	try {
227 		on_handshake(socket);
228 	} catch (Exception e) {
229 		logDiagnostic("WebSocket handler failed: %s", e.msg);
230 	}
231 	socket.close();
232 }
233 /// Scheduled for deprecation - use a `@safe` callback instead.
234 void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
235 @system {
236 	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
237 }
238 /// Scheduled for deprecation - use a `nothrow` callback instead.
239 void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
240 {
241 	handleWebSocket((scope ws) nothrow {
242 		try on_handshake(ws);
243 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
244 	}, req, res);
245 }
246 /// ditto
247 void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
248 @system {
249 	handleWebSocket((scope ws) nothrow {
250 		try on_handshake(ws);
251 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
252 	}, req, res);
253 }
254 
255 
256 /**
257 	Returns a HTTP request handler that establishes web socket conections.
258 */
259 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
260 @safe {
261 	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
262 }
263 /// ditto
264 HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
265 @safe {
266 	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
267 	@safe {
268 		auto pUpgrade = "Upgrade" in req.headers;
269 		auto pConnection = "Connection" in req.headers;
270 		auto pKey = "Sec-WebSocket-Key" in req.headers;
271 		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
272 		auto pVersion = "Sec-WebSocket-Version" in req.headers;
273 
274 		auto isUpgrade = false;
275 
276 		if( pConnection ) {
277 			auto connectionTypes = splitter(*pConnection, ",");
278 			foreach( t ; connectionTypes ) {
279 				if( t.strip().asLowerCase().equal("upgrade") ) {
280 					isUpgrade = true;
281 					break;
282 				}
283 			}
284 		}
285 		if( !(isUpgrade &&
286 			  pUpgrade && icmp(*pUpgrade, "websocket") == 0 &&
287 			  pKey &&
288 			  pVersion && *pVersion == "13") )
289 		{
290 			logDebug("Browser sent invalid WebSocket request.");
291 			res.statusCode = HTTPStatus.badRequest;
292 			res.writeVoidBody();
293 			return;
294 		}
295 
296 		auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
297 		res.headers["Sec-WebSocket-Accept"] = accept;
298 		res.headers["Connection"] = "Upgrade";
299 		res.switchProtocol("websocket", (scope conn) {
300 			// TODO: put back 'scope' once it is actually enforced by DMD
301 			/*scope*/ auto socket = new WebSocket(conn, req, res);
302 			try on_handshake(socket);
303 			catch (Exception e) {
304 				logDiagnostic("WebSocket handler failed: %s", e.msg);
305 			}
306 			socket.close();
307 		});
308 	}
309 	return &callback;
310 }
311 /// Scheduled for deprecation - use a `@safe` callback instead.
312 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
313 @system {
314 	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
315 }
316 /// Scheduled for deprecation - use a `@safe` callback instead.
317 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
318 @system {
319 	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
320 }
321 /// Scheduled for deprecation - use a `nothrow` callback instead.
322 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
323 {
324 	return handleWebSockets(delegate (scope ws) nothrow {
325 		try on_handshake(ws);
326 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
327 	});
328 }
329 /// ditto
330 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
331 {
332 	return handleWebSockets(delegate (scope ws) nothrow {
333 		try on_handshake(ws);
334 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
335 	});
336 }
337 /// ditto
338 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
339 @system {
340 	return handleWebSockets(delegate (scope ws) nothrow {
341 		try on_handshake(ws);
342 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
343 	});
344 }
345 /// ditto
346 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
347 @system {
348 	return handleWebSockets(delegate (scope ws) nothrow {
349 		try on_handshake(ws);
350 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
351 	});
352 }
353 
354 /**
355  * Provides the reason that a websocket connection has closed.
356  *
357  * Further documentation for the WebSocket and it's codes can be found from:
358  * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
359  *
360  * ---
361  *
362  * void echoSocket(scope WebSocket sock)
363  * {
364  *   import std.datetime : seconds;
365  *
366  *   while(sock.waitForData(3.seconds))
367  *   {
368  *     string msg = sock.receiveText;
369  *     logInfo("Got a message: %s", msg);
370  *     sock.send(msg);
371  *   }
372  *
373  *   if(sock.connected)
374  *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
375  * }
376  * ---
377  */
378 enum WebSocketCloseReason : short
379 {
380 	none = 0,
381 	normalClosure = 1000,
382 	goingAway = 1001,
383 	protocolError = 1002,
384 	unsupportedData = 1003,
385 	noStatusReceived = 1005,
386 	abnormalClosure = 1006,
387 	invalidFramePayloadData = 1007,
388 	policyViolation = 1008,
389 	messageTooBig = 1009,
390 	internalError = 1011,
391 	serviceRestart = 1012,
392 	tryAgainLater = 1013,
393 	badGateway = 1014,
394 	tlsHandshake = 1015
395 }
396 
397 string closeReasonString(WebSocketCloseReason reason) @nogc @safe
398 {
399 	import std.math : floor;
400 
401 	//round down to the nearest thousand to get category
402 	switch(cast(short)(cast(float)reason / 1000f).floor)
403 	{
404 		case 0:
405 			return "Reserved and Unused";
406 		case 1:
407 			switch(reason)
408 			{
409 				case 1000:
410 					return "Normal Closure";
411 				case 1001:
412 					return "Going Away";
413 				case 1002:
414 					return "Protocol Error";
415 				case 1003:
416 					return "Unsupported Data";
417 				case 1004:
418 					return "RESERVED";
419 				case 1005:
420 					return "No Status Recvd";
421 				case 1006:
422 					return "Abnormal Closure";
423 				case 1007:
424 					return "Invalid Frame Payload Data";
425 				case 1008:
426 					return "Policy Violation";
427 				case 1009:
428 					return "Message Too Big";
429 				case 1010:
430 					return "Missing Extension";
431 				case 1011:
432 					return "Internal Error";
433 				case 1012:
434 					return "Service Restart";
435 				case 1013:
436 					return "Try Again Later";
437 				case 1014:
438 					return "Bad Gateway";
439 				case 1015:
440 					return "TLS Handshake";
441 				default:
442 					return "RESERVED";
443 			}
444 		case 2:
445 			return "Reserved for extensions";
446 		case 3:
447 			return "Available for frameworks and libraries";
448 		case 4:
449 			return "Available for applications";
450 		default:
451 			return "UNDEFINED - Nasal Demons";
452 	}
453 }
454 
455 unittest
456 {
457 	assert((cast(WebSocketCloseReason)   0).closeReasonString == "Reserved and Unused");
458 	assert((cast(WebSocketCloseReason)   1).closeReasonString == "Reserved and Unused");
459 	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
460 	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
461 	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
462 	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
463 	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
464 	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
465 	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
466 	assert((cast(WebSocketCloseReason)  -1).closeReasonString == "UNDEFINED - Nasal Demons");
467 
468 	//check the other spec cases
469 	for(short i = 1000; i < 1017; i++)
470 	{
471 		if(i == 1004 || i > 1015)
472 		{
473 			assert(
474 				(cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
475 				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
476 			);
477 		}
478 		else
479 			assert(
480 				(cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
481 				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
482 			);
483 	}
484 }
485 
486 
487 /**
488  * Represents a single _WebSocket connection.
489  *
490  * ---
491  * int main (string[] args)
492  * {
493  *   auto taskHandle = runTask(() => connectToWS());
494  *   return runApplication(&args);
495  * }
496  *
497  * void connectToWS ()
498  * {
499  *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
500  *   auto ws = connectWebSocket(ws_url);
501  *   logInfo("WebSocket connected");
502  *
503  *   while (ws.waitForData())
504  *   {
505  *     auto txt = ws.receiveText;
506  *     logInfo("Received: %s", txt);
507  *   }
508  *   logFatal("Connection lost!");
509  * }
510  * ---
511  */
512 final class WebSocket {
513 @safe:
514 
515 	private {
516 		ConnectionStream m_conn;
517 		bool m_sentCloseFrame = false;
518 		IncomingWebSocketMessage m_nextMessage = null;
519 		const HTTPServerRequest m_request;
520 		HTTPServerResponse m_serverResponse;
521 		HTTPClientResponse m_clientResponse;
522 		Task m_reader;
523 		Task m_ownerTask;
524 		InterruptibleTaskMutex m_readMutex, m_writeMutex;
525 		InterruptibleTaskCondition m_readCondition;
526 		Timer m_pingTimer;
527 		uint m_lastPingIndex;
528 		bool m_pongReceived;
529 		short m_closeCode;
530 		const(char)[] m_closeReason;
531 		/// The entropy generator to use
532 		/// If not null, it means this is a server socket.
533 		RandomNumberStream m_rng;
534 	}
535 
536 scope:
537 
538 	/**
539 	 * Private constructor, called from `connectWebSocket`.
540 	 *
541 	 * Params:
542 	 *	 conn = Underlying connection string
543 	 *	 request = HTTP request used to establish the connection
544 	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
545 	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
546 	 */
547 	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
548 	{
549 		m_ownerTask = Task.getThis();
550 		m_conn = conn;
551 		m_request = request;
552 		m_clientResponse = client_res;
553 		m_serverResponse = server_res;
554 		assert(m_conn);
555 		m_rng = rng;
556 		m_writeMutex = new InterruptibleTaskMutex;
557 		m_readMutex = new InterruptibleTaskMutex;
558 		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
559 		m_readMutex.performLocked!({
560 			// NOTE: Silencing scope warning here - m_reader MUST be stopped
561 			//       before the end of the lifetime of the WebSocket object,
562 			//       which is done in the mandatory call to close().
563 			//       The same goes for m_pingTimer below.
564 			m_reader = () @trusted { return runTask(&startReader); } ();
565 			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
566 				m_pongReceived = true;
567 				m_pingTimer = () @trusted { return setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); } ();
568 			}
569 		});
570 	}
571 
572 	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
573 	{
574 		this(conn, null, null, rng, client_res);
575 	}
576 
577 	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
578 	{
579 		this(conn, request, res, null, null);
580 	}
581 
582 	/**
583 		Determines if the WebSocket connection is still alive and ready for sending.
584 
585 		Note that for determining the ready state for $(EM reading), you need
586 		to use $(D waitForData) instead, because both methods can return
587 		different values while a disconnect is in proress.
588 
589 		See_also: $(D waitForData)
590 	*/
591 	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }
592 
593 	/**
594 		Returns the close code sent by the remote end.
595 
596 		Note if the connection was never opened, is still alive, or was closed
597 		locally this value will be 0. If no close code was given by the remote
598 		end in the close frame, the value will be 1005. If the connection was
599 		not closed cleanly by the remote end, this value will be 1006.
600 	*/
601 	@property short closeCode() { return m_closeCode; }
602 
603 	/**
604 		Returns the close reason sent by the remote end.
605 
606 		Note if the connection was never opened, is still alive, or was closed
607 		locally this value will be an empty string.
608 	*/
609 	@property const(char)[] closeReason() { return m_closeReason; }
610 
611 	/**
612 		The HTTP request that established the web socket connection.
613 	*/
614 	@property const(HTTPServerRequest) request() const { return m_request; }
615 
616 	/**
617 		Checks if data is readily available for read.
618 	*/
619 	@property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; }
620 
621 	/** Waits until either a message arrives or until the connection is closed.
622 
623 		This function can be used in a read loop to cleanly determine when to stop reading.
624 	*/
625 	bool waitForData()
626 	{
627 		if (m_nextMessage) return true;
628 
629 		m_readMutex.performLocked!({
630 			while (connected && m_nextMessage is null)
631 				m_readCondition.wait();
632 		});
633 		return m_nextMessage !is null;
634 	}
635 
636 	/// ditto
637 	bool waitForData(Duration timeout)
638 	{
639 		import std.datetime;
640 
641 		if (m_nextMessage) return true;
642 
643 		immutable limit_time = Clock.currTime(UTC()) + timeout;
644 
645 		m_readMutex.performLocked!({
646 			while (connected && m_nextMessage is null && timeout > 0.seconds) {
647 				m_readCondition.wait(timeout);
648 				timeout = limit_time - Clock.currTime(UTC());
649 			}
650 		});
651 		return m_nextMessage !is null;
652 	}
653 
654 	/**
655 		Sends a text message.
656 
657 		On the JavaScript side, the text will be available as message.data (type string).
658 
659 		Throws:
660 			A `WebSocketException` is thrown if the connection gets closed
661 			before or during the transfer of the message.
662 	*/
663 	void send(scope const(char)[] data)
664 	{
665 		send(
666 			(scope message) { message.write(cast(const ubyte[])data); },
667 			FrameOpcode.text);
668 	}
669 
670 	/**
671 		Sends a binary message.
672 
673 		On the JavaScript side, the text will be available as message.data (type Blob).
674 
675 		Throws:
676 			A `WebSocketException` is thrown if the connection gets closed
677 			before or during the transfer of the message.
678 	*/
679 	void send(in ubyte[] data)
680 	{
681 		send((scope message){ message.write(data); }, FrameOpcode.binary);
682 	}
683 
684 	/**
685 		Sends a message using an output stream.
686 
687 		Throws:
688 			A `WebSocketException` is thrown if the connection gets closed
689 			before or during the transfer of the message.
690 	*/
691 	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
692 	{
693 		m_writeMutex.performLocked!({
694 			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
695 			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
696 			scope(exit) message.finalize();
697 			sender(message);
698 		});
699 	}
700 
701 	/**
702 		Actively closes the connection.
703 
704 		Params:
705 			code = Numeric code indicating a termination reason.
706 			reason = Message describing why the connection was terminated.
707 	*/
708 	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
709 	{
710 		import std.algorithm.comparison : min;
711 		if(reason !is null && reason.length == 0)
712 			reason = (cast(WebSocketCloseReason)code).closeReasonString;
713 
714 		//control frame payloads are limited to 125 bytes
715 		version(assert)
716 			assert(reason.length <= 123);
717 		else
718 			reason = reason[0 .. min($, 123)];
719 
720 		if (connected) {
721 			try {
722 				send((scope msg) {
723 					m_sentCloseFrame = true;
724 					if (code != 0) {
725 						msg.write(std.bitmanip.nativeToBigEndian(code));
726 						msg.write(cast(const ubyte[])reason);
727 					}
728 				}, FrameOpcode.close);
729 			} catch (Exception e) {
730 				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
731 			}
732 		}
733 		if (m_pingTimer) m_pingTimer.stop();
734 
735 
736 		if (Task.getThis() == m_ownerTask) {
737 			m_writeMutex.performLocked!({
738 				if (m_clientResponse) {
739 					m_clientResponse.disconnect();
740 					m_clientResponse = HTTPClientResponse.init;
741 				}
742 				if (m_serverResponse) {
743 					m_serverResponse.finalize();
744 					m_serverResponse = HTTPServerResponse.init;
745 				}
746 			});
747 
748 			m_reader.join();
749 
750 			() @trusted { destroy(m_conn); } ();
751 			m_conn = ConnectionStream.init;
752 		}
753 	}
754 
755 	/**
756 		Receives a new message and returns its contents as a newly allocated data array.
757 
758 		Params:
759 			strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise.
760 		Throws: WebSocketException if the connection is closed or
761 			if $(D strict == true) and the frame received is not the right type
762 	*/
763 	ubyte[] receiveBinary(bool strict = true)
764 	{
765 		ubyte[] ret;
766 		receive((scope message){
767 			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
768 				"Expected a binary message, got "~message.frameOpcode.to!string());
769 			ret = message.readAll();
770 		});
771 		return ret;
772 	}
773 	/// ditto
774 	string receiveText(bool strict = true)
775 	{
776 		string ret;
777 		receive((scope message){
778 			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
779 				"Expected a text message, got "~message.frameOpcode.to!string());
780 			ret = message.readAllUTF8();
781 		});
782 		return ret;
783 	}
784 
785 	/**
786 		Receives a new message using an InputStream.
787 		Throws: WebSocketException if the connection is closed.
788 	*/
789 	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
790 	{
791 		m_readMutex.performLocked!({
792 			while (!m_nextMessage) {
793 				enforce!WebSocketException(connected, "Connection closed while reading message.");
794 				m_readCondition.wait();
795 			}
796 			receiver(m_nextMessage);
797 			m_nextMessage = null;
798 			m_readCondition.notifyAll();
799 		});
800 	}
801 
802 	private void startReader()
803 	nothrow {
804 		try m_readMutex.performLocked!({}); //Wait until initialization
805 		catch (Exception e) {
806 			logException(e, "WebSocket reader task failed to wait for initialization");
807 			try m_conn.close();
808 			catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
809 			m_closeCode = WebSocketCloseReason.abnormalClosure;
810 			try m_readCondition.notifyAll();
811 			catch (Exception e) assert(false, e.msg);
812 			return;
813 		}
814 
815 		try {
816 			loop:
817 			while (!m_conn.empty) {
818 				assert(!m_nextMessage);
819 				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);
820 
821 				switch (msg.frameOpcode) {
822 					default: throw new WebSocketException("unknown frame opcode");
823 					case FrameOpcode.ping:
824 						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
825 						break;
826 					case FrameOpcode.pong:
827 						// test if pong matches previous ping
828 						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
829 							logDebugV("Received PONG that doesn't match previous ping.");
830 							break;
831 						}
832 						logDebugV("Received matching PONG.");
833 						m_pongReceived = true;
834 						break;
835 					case FrameOpcode.close:
836 						logDebug("Got closing frame (%s)", m_sentCloseFrame);
837 
838 						// If no close code was passed, we default to 1005
839 						this.m_closeCode = WebSocketCloseReason.noStatusReceived;
840 
841 						// If provided in the frame, attempt to parse the close code/reason
842 						if (msg.peek().length >= short.sizeof) {
843 							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);
844 
845 							if (msg.peek().length > short.sizeof) {
846 								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
847 							}
848 						}
849 
850 						if(!m_sentCloseFrame) close();
851 						logDebug("Terminating connection (%s)", m_sentCloseFrame);
852 						break loop;
853 					case FrameOpcode.text:
854 					case FrameOpcode.binary:
855 					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
856 						m_readMutex.performLocked!({
857 							m_nextMessage = msg;
858 							m_readCondition.notifyAll();
859 							while (m_nextMessage) m_readCondition.wait();
860 						});
861 						break;
862 				}
863 			}
864 		} catch (Exception e) {
865 			logDiagnostic("Error while reading websocket message: %s", e.msg);
866 			logDiagnostic("Closing connection.");
867 		}
868 
869 		// If no close code was passed, e.g. this was an unclean termination
870 		//  of our websocket connection, set the close code to 1006.
871 		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;
872 
873 		try m_conn.close();
874 		catch (Exception e) logException(e, "Failed to close WebSocket connection");
875 		try m_readCondition.notifyAll();
876 		catch (Exception e) assert(false, e.msg);
877 	}
878 
879 	private void sendPing()
880 	nothrow {
881 		try {
882 			if (!m_pongReceived) {
883 				logDebug("Pong skipped. Closing connection.");
884 				close();
885 				m_pingTimer.stop();
886 				return;
887 			}
888 			m_pongReceived = false;
889 			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
890 			logDebugV("Ping sent");
891 		} catch (Exception e) {
892 			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
893 		}
894 	}
895 }
896 
897 /**
898 	Represents a single outgoing _WebSocket message as an OutputStream.
899 */
900 final class OutgoingWebSocketMessage : OutputStream {
901 @safe:
902 	private {
903 		RandomNumberStream m_rng;
904 		Stream m_conn;
905 		FrameOpcode m_frameOpcode;
906 		Appender!(ubyte[]) m_buffer;
907 		bool m_finalized = false;
908 	}
909 
910 	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
911 	{
912 		assert(conn !is null);
913 		m_conn = conn;
914 		m_frameOpcode = frameOpcode;
915 		m_rng = rng;
916 	}
917 
918 	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
919 		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
920 	} else {
921 		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
922 	}
923 
924 	alias write = OutputStream.write;
925 
926 	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
927 	{
928 		assert(!m_finalized);
929 
930 		if (!m_buffer.data.length) {
931 			ubyte[Frame.maxHeaderSize] header_padding;
932 			m_buffer.put(header_padding[]);
933 		}
934 
935 		m_buffer.put(bytes);
936 		return bytes.length;
937 	}
938 
939 	void flush()
940 	{
941 		assert(!m_finalized);
942 		if (m_buffer.data.length > 0)
943 			sendFrame(false);
944 	}
945 
946 	void finalize()
947 	{
948 		if (m_finalized) return;
949 		m_finalized = true;
950 		sendFrame(true);
951 	}
952 
953 	private void sendFrame(bool fin)
954 	{
955 		if (!m_buffer.data.length)
956 			write(null, IOMode.once);
957 
958 		assert(m_buffer.data.length >= Frame.maxHeaderSize);
959 
960 		Frame frame;
961 		frame.fin = fin;
962 		frame.opcode = m_frameOpcode;
963 		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
964 		auto hsize = frame.getHeaderSize(m_rng !is null);
965 		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
966 		frame.writeHeader(msg[0 .. hsize], m_rng);
967 		m_conn.write(msg);
968 		m_conn.flush();
969 		m_buffer.clear();
970 	}
971 
972 	alias write = OutputStream.write;
973 }
974 
975 
976 /**
977 	Represents a single incoming _WebSocket message as an InputStream.
978 */
979 final class IncomingWebSocketMessage : InputStream {
980 @safe:
981 	private {
982 		RandomNumberStream m_rng;
983 		Stream m_conn;
984 		Frame m_currentFrame;
985 	}
986 
987 	private this(Stream conn, RandomNumberStream rng)
988 	{
989 		assert(conn !is null);
990 		m_conn = conn;
991 		m_rng = rng;
992 		skipFrame(); // reads the first frame
993 	}
994 
995 	@property bool empty() const { return m_currentFrame.payload.length == 0; }
996 
997 	@property ulong leastSize() const { return m_currentFrame.payload.length; }
998 
999 	@property bool dataAvailableForRead() { return true; }
1000 
1001 	/// The frame type for this nessage;
1002 	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }
1003 
1004 	const(ubyte)[] peek() { return m_currentFrame.payload; }
1005 
1006 	/**
1007 	 * Retrieve the next websocket frame of the stream and discard the current
1008 	 * one
1009 	 *
1010 	 * This function is helpful if one wish to process frames by frames,
1011 	 * or minimize memory allocation, as `peek` will only return the current
1012 	 * frame data, and read requires a pre-allocated buffer.
1013 	 *
1014 	 * Returns:
1015 	 * `false` if the current frame is the final one, `true` if a new frame
1016 	 * was read.
1017 	 */
1018 	bool skipFrame()
1019 	{
1020 		if (m_currentFrame.fin)
1021 			return false;
1022 
1023 		m_currentFrame = Frame.readFrame(m_conn);
1024 		return true;
1025 	}
1026 
1027 	size_t read(scope ubyte[] dst, IOMode mode)
1028 	{
1029 		size_t nread = 0;
1030 
1031 		while (dst.length > 0) {
1032 			enforce!WebSocketException(!empty , "cannot read from empty stream");
1033 			enforce!WebSocketException(leastSize > 0, "no data available" );
1034 
1035 			import std.algorithm : min;
1036 			auto sz = cast(size_t)min(leastSize, dst.length);
1037 			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
1038 			dst = dst[sz .. $];
1039 			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
1040 			nread += sz;
1041 
1042 			if (leastSize == 0) {
1043 				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
1044 					break;
1045 				this.skipFrame();
1046 			}
1047 		}
1048 
1049 		return nread;
1050 	}
1051 
1052 	alias read = InputStream.read;
1053 }
1054 
1055 /// Magic string defined by the RFC for challenging the server during upgrade
1056 private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1057 
1058 
1059 /**
1060  * The Opcode is 4 bits, as defined in Section 5.2
1061  *
1062  * Values are defined in section 11.8
1063  * Currently only 6 values are defined, however the opcode is defined as
1064  * taking 4 bits.
1065  */
1066 public enum FrameOpcode : ubyte {
1067 	continuation = 0x0,
1068 	text = 0x1,
1069 	binary = 0x2,
1070 	close = 0x8,
1071 	ping = 0x9,
1072 	pong = 0xA
1073 }
1074 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
1075 
1076 
1077 private struct Frame {
1078 @safe:
1079 	enum maxHeaderSize = 14;
1080 
1081 	bool fin;
1082 	FrameOpcode opcode;
1083 	ubyte[] payload;
1084 
1085     /**
1086      * Return the header length encoded with the expected amount of bits
1087      *
1088      * The WebSocket RFC define a variable-length payload length.
1089      * In short, it means that:
1090      * - If the length is <= 125, it is stored as the 7 least significant
1091      *   bits of the second header byte.  The first bit is reserved for MASK.
1092      * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
1093      *   126 is stored in the aforementioned 7 bits, and the actual length
1094      *   is stored in the next two bytes, resulting in a 4 bytes header
1095      *   ( + masking key, if any).
1096      * - If the length is > 65_536, a magic value of 127 will be used for
1097      *   the 7-bit field, and the next 8 bytes are expected to be the length,
1098      *   resulting in a 10 bytes header ( + masking key, if any).
1099      *
1100      * Those functions encapsulate all this logic and allow to just get the
1101      * length with the desired size.
1102      *
1103      * Return:
1104      * - For `ubyte`, the value to store in the 7 bits field, either the
1105      *   length or a magic value (126 or 127).
1106      * - For `ushort`, a value in the range [126; 65_536].
1107      *   If payload.length is not in this bound, an assertion will be triggered.
1108      * - For `ulong`, a value in the range [65_537; size_t.max].
1109      *   If payload.length is not in this bound, an assertion will be triggered.
1110      */
1111 	size_t getHeaderSize(bool mask)
1112 	{
1113 		size_t ret = 1;
1114 		if (payload.length < 126) ret += 1;
1115 		else if (payload.length < 65536) ret += 3;
1116 		else ret += 9;
1117 		if (mask) ret += 4;
1118 		return ret;
1119 	}
1120 
1121 	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
1122 	{
1123 		ubyte[4] buff;
1124 		ubyte firstByte = cast(ubyte)opcode;
1125 		if (fin) firstByte |= 0x80;
1126 		dst[0] = firstByte;
1127 		dst = dst[1 .. $];
1128 
1129 		auto b1 = sys_rng ? 0x80 : 0x00;
1130 
1131 		if (payload.length < 126) {
1132 			dst[0] = cast(ubyte)(b1 | payload.length);
1133 			dst = dst[1 .. $];
1134 		} else if (payload.length < 65536) {
1135 			dst[0] = cast(ubyte) (b1 | 126);
1136 			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
1137 			dst = dst[3 .. $];
1138 		} else {
1139 			dst[0] = cast(ubyte) (b1 | 127);
1140 			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
1141 			dst = dst[9 .. $];
1142 		}
1143 
1144 		if (sys_rng) {
1145 			sys_rng.read(dst[0 .. 4]);
1146 			for (size_t i = 0; i < payload.length; i++)
1147 				payload[i] ^= dst[i % 4];
1148 		}
1149 	}
1150 
1151 	static Frame readFrame(InputStream stream)
1152 	{
1153 		Frame frame;
1154 		ubyte[8] data;
1155 
1156 		stream.read(data[0 .. 2]);
1157 		frame.fin = (data[0] & 0x80) != 0;
1158 		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);
1159 
1160 		bool masked = !!(data[1] & 0b1000_0000);
1161 
1162 		//parsing length
1163 		ulong length = data[1] & 0b0111_1111;
1164 		if (length == 126) {
1165 			stream.read(data[0 .. 2]);
1166 			length = bigEndianToNative!ushort(data[0 .. 2]);
1167 		} else if (length == 127) {
1168 			stream.read(data);
1169 			length = bigEndianToNative!ulong(data);
1170 
1171 			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
1172 			// interpreted as a 64-bit unsigned integer (the most significant
1173 			// bit MUST be 0)
1174 			enforce!WebSocketException(!(length >> 63),
1175 				"Received length has a non-zero most significant bit");
1176 
1177 		}
1178 		logDebug("Read frame: %s %s %s length=%d",
1179 				 frame.opcode,
1180 				 frame.fin ? "final frame" : "continuation",
1181 				 masked ? "masked" : "not masked",
1182 				 length);
1183 
1184 		// Masking key is 32 bits / uint
1185 		if (masked)
1186 			stream.read(data[0 .. 4]);
1187 
1188 		// Read payload
1189 		// TODO: Provide a way to limit the size read, easy
1190 		// DOS for server code here (rejectedsoftware/vibe.d#1496).
1191 		enforce!WebSocketException(length <= size_t.max);
1192 		frame.payload = new ubyte[](cast(size_t)length);
1193 		stream.read(frame.payload);
1194 
1195 		//de-masking
1196 		if (masked)
1197 			foreach (size_t i; 0 .. cast(size_t)length)
1198 				frame.payload[i] = frame.payload[i] ^ data[i % 4];
1199 
1200 		return frame;
1201 	}
1202 }
1203 
1204 unittest {
1205 	import std.algorithm.searching : all;
1206 
1207 	final class DummyRNG : RandomNumberStream {
1208 	@safe:
1209 		@property bool empty() { return false; }
1210 		@property ulong leastSize() { return ulong.max; }
1211 		@property bool dataAvailableForRead() { return true; }
1212 		const(ubyte)[] peek() { return null; }
1213 		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
1214 		alias read = RandomNumberStream.read;
1215 	}
1216 
1217 	ubyte[14] hdrbuf;
1218 	auto rng = new DummyRNG;
1219 
1220 	Frame f;
1221 	f.payload = new ubyte[125];
1222 
1223 	assert(f.getHeaderSize(false) == 2);
1224 	hdrbuf[] = 0;
1225 	f.writeHeader(hdrbuf[0 .. 2], null);
1226 	assert(hdrbuf[0 .. 2] == [0, 125]);
1227 
1228 	assert(f.getHeaderSize(true) == 6);
1229 	hdrbuf[] = 0;
1230 	f.writeHeader(hdrbuf[0 .. 6], rng);
1231 	assert(hdrbuf[0 .. 2] == [0, 128|125]);
1232 	assert(hdrbuf[2 .. 6].all!(b => b == 13));
1233 
1234 	f.payload = new ubyte[126];
1235 	assert(f.getHeaderSize(false) == 4);
1236 	hdrbuf[] = 0;
1237 	f.writeHeader(hdrbuf[0 .. 4], null);
1238 	assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]);
1239 
1240 	assert(f.getHeaderSize(true) == 8);
1241 	hdrbuf[] = 0;
1242 	f.writeHeader(hdrbuf[0 .. 8], rng);
1243 	assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]);
1244 	assert(hdrbuf[4 .. 8].all!(b => b == 13));
1245 
1246 	f.payload = new ubyte[65535];
1247 	assert(f.getHeaderSize(false) == 4);
1248 	hdrbuf[] = 0;
1249 	f.writeHeader(hdrbuf[0 .. 4], null);
1250 	assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]);
1251 
1252 	assert(f.getHeaderSize(true) == 8);
1253 	hdrbuf[] = 0;
1254 	f.writeHeader(hdrbuf[0 .. 8], rng);
1255 	assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]);
1256 	assert(hdrbuf[4 .. 8].all!(b => b == 13));
1257 
1258 	f.payload = new ubyte[65536];
1259 	assert(f.getHeaderSize(false) == 10);
1260 	hdrbuf[] = 0;
1261 	f.writeHeader(hdrbuf[0 .. 10], null);
1262 	assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
1263 
1264 	assert(f.getHeaderSize(true) == 14);
1265 	hdrbuf[] = 0;
1266 	f.writeHeader(hdrbuf[0 .. 14], rng);
1267 	assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]);
1268 	assert(hdrbuf[10 .. 14].all!(b => b == 13));
1269 }
1270 
1271 /**
1272  * Generate a challenge key for the protocol upgrade phase.
1273  */
1274 private string generateChallengeKey(RandomNumberStream rng)
1275 {
1276 	ubyte[16] buffer;
1277 	rng.read(buffer);
1278 	return Base64.encode(buffer);
1279 }
1280 
1281 private string computeAcceptKey(string challengekey)
1282 {
1283 	immutable(ubyte)[] b = challengekey.representation;
1284 	immutable(ubyte)[] a = s_webSocketGuid.representation;
1285 	SHA1 hash;
1286 	hash.start();
1287 	hash.put(b);
1288 	hash.put(a);
1289 	auto result = Base64.encode(hash.finish());
1290 	return to!(string)(result);
1291 }