1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
///
@safe unittest {
	// Per-connection handler: loops for as long as the socket reports `connected`.
	void handleConn(scope WebSocket sock)
	{
		// simple echo server
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		// Route GET /ws to the request handler returned by handleWebSockets.
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 
42 import core.time;
43 import std.algorithm: equal, splitter;
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.exception;
48 import std.bitmanip;
49 import std.digest.sha;
50 import std.string;
51 import std.functional;
52 import std.uuid;
53 import std.base64;
54 import std.digest.sha;
55 import std.uni: asLowerCase;
56 import vibe.crypto.cryptorand;
57 
58 @safe:
59 
60 
/// Callback type that receives an established `WebSocket`; the callback must be `nothrow`.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
62 
63 
/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	@safe pure nothrow:

	/// Creates the exception from a message, a source location and an
	/// optional chained throwable; all arguments are forwarded to `Exception`.
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		super(msg, file, line, next);
	}

	/// Same as above, but with the chained throwable given before the
	/// source location.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, next, file, line);
	}
}
81 
/** Establishes a WebSocket connection at the specified endpoint.

	Sends an HTTP `GET` request carrying the WebSocket upgrade headers and
	validates the server's answer before switching protocols.

	Params:
		url = Endpoint to connect to. The schemas "wss" and "https" select a
			TLS ("https") request, every other schema is sent as plain "http".
		request_modifier = Called with the outgoing request after the
			handshake headers have been set, so that it can add or override
			headers.
		del = (callback overload only) Invoked with the established socket;
			the socket is closed after the callback returns.
		settings = HTTP client settings used for the upgrade request.

	Returns:
		The connected `WebSocket` (overload without a callback).

	Throws:
		An `Exception` if the server does not answer with
		`HTTPStatus.switchingProtocols` or if the `Sec-WebSocket-Accept`
		header is missing or does not match the expected value.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	url.schema = (url.schema == "wss" || url.schema == "https") ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
		request_modifier(req);
	}, settings);

	// The response object keeps the underlying client connection in use. If
	// the handshake validation below fails, nobody else would ever release
	// it, so drop the connection before the exception propagates.
	scope (failure) res.disconnect();

	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, rng, res);
}
111 
/// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// "wss" and "https" are requested over TLS, everything else as plain HTTP
	immutable secure = url.schema == "wss" || url.schema == "https";
	url.schema = secure ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto clientKey = generateChallengeKey(rng);
	auto expectedAccept = computeAcceptKey(clientKey);

	requestHTTP(url,
		(scope req) {
			// upgrade handshake headers; the modifier runs last so that it
			// sees (and may override) them
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = clientKey;
			request_modifier(req);
		},
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

			auto accept = "sec-websocket-accept" in res.headers;
			enforce(accept !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*accept == expectedAccept, "Response has wrong accept key");

			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, rng, res);
				del(ws);
				// make sure the socket is closed once the callback is done
				if (ws.connected) ws.close();
			});
		},
		settings
	);
}
148 
/// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// No request customization needed: forward with an empty request modifier.
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Callback flavour: forward with an empty request modifier.
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `del` are logged as warnings instead of being propagated.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// The @system callback is forwarded through a @trusted lambda.
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `del` are logged as warnings instead of being propagated.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
180 
181 
/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	The request is validated first: it needs a `Connection` header that
	contains the "upgrade" token, an `Upgrade: websocket` header, a
	`Sec-WebSocket-Version` of "13" and a `Sec-WebSocket-Key` header. If any
	of these checks fails, a "400 Bad Request" response describing the
	problem is written and $(D on_handshake) is not called. When the version
	check is the one that failed, the response additionally carries a
	`Sec-WebSocket-Version: 13` header, as required by RFC 6455 section 4.2.2.

	Params:
		on_handshake = Called with the established socket. The socket is
			closed after the delegate returns.
		req = The HTTP request that asks for the protocol upgrade.
		res = The response used to answer the handshake.
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto pUpgrade = "Upgrade" in req.headers;
	auto pConnection = "Connection" in req.headers;
	auto pKey = "Sec-WebSocket-Key" in req.headers;
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	auto pVersion = "Sec-WebSocket-Version" in req.headers;

	auto isUpgrade = false;

	// "Connection" may hold a comma separated list of tokens
	if( pConnection ) {
		auto connectionTypes = splitter(*pConnection, ",");
		foreach( t ; connectionTypes ) {
			if( t.strip().asLowerCase().equal("upgrade") ) {
				isUpgrade = true;
				break;
			}
		}
	}

	string req_error;
	bool versionMismatch = false;
	if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (!pVersion || *pVersion != "13") {
		req_error = "Only version 13 of the WebSocket protocol is supported.";
		versionMismatch = true;
	}
	else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header.";

	if (req_error.length) {
		logDebug("Browser sent invalid WebSocket request: %s", req_error);
		res.statusCode = HTTPStatus.badRequest;
		// tell the client which protocol version it may retry with
		if (versionMismatch) res.headers["Sec-WebSocket-Version"] = "13";
		res.writeBody(req_error);
		return;
	}

	auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = accept;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream conn = res.switchProtocol("websocket");

	WebSocket socket = new WebSocket(conn, req, res);
	try {
		on_handshake(socket);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	socket.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// The @system callback is forwarded through a @trusted lambda.
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
252 
253 
/**
	Returns a HTTP request handler that establishes web socket connections.

	Params:
		on_handshake = Callback that is invoked with each established socket.
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	// Convert the function pointer to a delegate and reuse the delegate overload.
	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// Request handler: validates the upgrade request, answers the handshake
	// and runs `on_handshake` on the upgraded connection.
	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto pUpgrade = "Upgrade" in req.headers;
		auto pConnection = "Connection" in req.headers;
		auto pKey = "Sec-WebSocket-Key" in req.headers;
		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
		auto pVersion = "Sec-WebSocket-Version" in req.headers;

		auto isUpgrade = false;

		// "Connection" may hold a comma separated list of tokens
		if( pConnection ) {
			auto connectionTypes = splitter(*pConnection, ",");
			foreach( t ; connectionTypes ) {
				if( t.strip().asLowerCase().equal("upgrade") ) {
					isUpgrade = true;
					break;
				}
			}
		}

		immutable versionSupported = pVersion && *pVersion == "13";

		if( !(isUpgrade &&
			  pUpgrade && icmp(*pUpgrade, "websocket") == 0 &&
			  pKey &&
			  versionSupported) )
		{
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			// RFC 6455 section 4.2.2: when the requested protocol version
			// is not understood, the error response has to announce the
			// supported version(s).
			if (!versionSupported) res.headers["Sec-WebSocket-Version"] = "13";
			res.writeVoidBody();
			return;
		}

		auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = accept;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto socket = new WebSocket(conn, req, res);
			try on_handshake(socket);
			catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			socket.close();
		});
	}
	return &callback;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	// The @system callback is forwarded through a @trusted delegate.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	// The @system function is forwarded through a @trusted delegate.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// Adapts a throwing function to a nothrow delegate: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// Adapts a throwing callback to a nothrow one: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// Adapts a throwing function to a nothrow delegate: exceptions thrown by
	// `on_handshake` are logged as warnings instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
351 
/**
 * Provides the reason that a websocket connection has closed.
 *
 * Further documentation for the WebSocket and its codes can be found at:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,                       /// No close code
	normalClosure = 1000,           /// "Normal Closure"
	goingAway = 1001,               /// "Going Away"
	protocolError = 1002,           /// "Protocol Error"
	unsupportedData = 1003,         /// "Unsupported Data"
	noStatusReceived = 1005,        /// "No Status Recvd"
	abnormalClosure = 1006,         /// "Abnormal Closure"
	invalidFramePayloadData = 1007, /// "Invalid Frame Payload Data"
	policyViolation = 1008,         /// "Policy Violation"
	messageTooBig = 1009,           /// "Message Too Big"
	internalError = 1011,           /// "Internal Error"
	serviceRestart = 1012,          /// "Service Restart"
	tryAgainLater = 1013,           /// "Try Again Later"
	badGateway = 1014,              /// "Bad Gateway"
	tlsHandshake = 1015             /// "TLS Handshake"
}
394 
/**
	Returns a short, human readable description for a close code.

	Codes in the range 1000-1999 are mapped to their individual description
	(or "RESERVED" for codes without one). All other codes are described by
	the thousand-block they fall into.

	Params:
		reason = The close code to describe. Any numeric value is accepted,
			including values that are not members of `WebSocketCloseReason`.

	Returns:
		A string literal describing the code; negative codes and codes
		of 5000 and above yield "UNDEFINED - Nasal Demons".
*/
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	// Integer division truncates towards zero, which would put the codes
	// -999 .. -1 into category 0, so negative codes are handled up front.
	if (reason < 0)
		return "UNDEFINED - Nasal Demons";

	// the thousands digit selects the category
	switch (reason / 1000)
	{
		case 0:
			return "Reserved and Unused";
		case 1:
			switch(reason)
			{
				case 1000:
					return "Normal Closure";
				case 1001:
					return "Going Away";
				case 1002:
					return "Protocol Error";
				case 1003:
					return "Unsupported Data";
				case 1004:
					return "RESERVED";
				case 1005:
					return "No Status Recvd";
				case 1006:
					return "Abnormal Closure";
				case 1007:
					return "Invalid Frame Payload Data";
				case 1008:
					return "Policy Violation";
				case 1009:
					return "Message Too Big";
				case 1010:
					return "Missing Extension";
				case 1011:
					return "Internal Error";
				case 1012:
					return "Service Restart";
				case 1013:
					return "Try Again Later";
				case 1014:
					return "Bad Gateway";
				case 1015:
					return "TLS Handshake";
				default:
					return "RESERVED";
			}
		case 2:
			return "Reserved for extensions";
		case 3:
			return "Available for frameworks and libraries";
		case 4:
			return "Available for applications";
		default:
			return "UNDEFINED - Nasal Demons";
	}
}
452 
unittest
{
	// Shorthand: description for an arbitrary numeric close code.
	static string describe(int code) @safe
	{
		return closeReasonString(cast(WebSocketCloseReason)code);
	}

	// category boundaries and a few named codes
	assert(describe(0) == "Reserved and Unused");
	assert(describe(1) == "Reserved and Unused");
	assert(closeReasonString(WebSocketCloseReason.normalClosure) == "Normal Closure");
	assert(closeReasonString(WebSocketCloseReason.abnormalClosure) == "Abnormal Closure");
	assert(describe(1020) == "RESERVED");
	assert(describe(2000) == "Reserved for extensions");
	assert(describe(3000) == "Available for frameworks and libraries");
	assert(describe(4000) == "Available for applications");
	assert(describe(5000) == "UNDEFINED - Nasal Demons");
	assert(describe(-1) == "UNDEFINED - Nasal Demons");

	// check the other spec cases: within 1000 .. 1016 only 1004 and
	// everything above 1015 is expected to be reported as reserved
	foreach (code; 1000 .. 1017)
	{
		immutable text = describe(code);
		immutable expectReserved = code == 1004 || code > 1015;

		if (expectReserved)
			assert(text == "RESERVED", "(incorrect) code %d = %s".format(code, text));
		else
			assert(text != "RESERVED", "(incorrect) code %d = %s".format(code, text));
	}
}
483 
484 
/**
 * Represents a single _WebSocket connection.
 *
 * Instances are created by the `connectWebSocket`/`connectWebSocketEx`
 * functions (client side) and by `handleWebSocket`/`handleWebSockets`
 * (server side). Each socket runs a reader task that receives frames from
 * the underlying connection and hands complete messages over to `receive`.
 *
 * ---
 * int main (string[] args)
 * {
 *   auto taskHandle = runTask(() => connectToWS());
 *   return runApplication(&args);
 * }
 *
 * void connectToWS ()
 * {
 *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
 *   auto ws = connectWebSocket(ws_url);
 *   logInfo("WebSocket connected");
 *
 *   while (ws.waitForData())
 *   {
 *     auto txt = ws.receiveText;
 *     logInfo("Received: %s", txt);
 *   }
 *   logFatal("Connection lost!");
 * }
 * ---
 */
final class WebSocket {
@safe:

	private {
		// Underlying connection; reset to `.init` by `close` when called from the owner task
		ConnectionStream m_conn;
		// Set once a close frame has been sent by this side
		bool m_sentCloseFrame = false;
		// Message handed over by the reader task, waiting to be consumed by `receive`
		IncomingWebSocketMessage m_nextMessage = null;
		// Server side only: the request that established the connection
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		// Task running `startReader`
		Task m_reader;
		// Task that constructed this socket; only this task tears down the connection in `close`
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		// Periodic ping timer, only set up for server sockets with a non-zero ping interval
		Timer m_pingTimer;
		// Counter that is sent as the payload of ping frames
		uint m_lastPingIndex;
		// Whether a matching pong has arrived since the last ping was sent
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use
		/// If null, it means this is a server socket.
		RandomNumberStream m_rng;
	}

	/**
	 * Private constructor, called from `connectWebSocket` (client side) and
	 * from the server side handshake handlers.
	 *
	 * Starts the reader task and, for server sockets with a non-zero
	 * `webSocketPingInterval`, the periodic ping timer.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (null for client sockets)
	 *	 server_res = For server sockets, the response object (finalized when the socket is closed)
	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task first locks m_readMutex itself (see startReader),
		// so it cannot proceed before this block has been left.
		m_readMutex.performLocked!({
			m_reader = runTask(&startReader);
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
			}
		});
	}

	/// Client side constructor: no server request/response, `rng` is the entropy source.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Server side constructor: no entropy source and no client response.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		This is the case if either the underlying connection reports
		available data or a received message is waiting to be consumed.
	*/
	@property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; }

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.

		Params:
			timeout = (overload with timeout) Maximum time to wait for a message.

		Returns:
			`true` if a message is ready to be received, `false` if the
			wait ended without one (connection closed or timeout elapsed).
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// recompute the remaining time after each wakeup
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the data will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		The message is created, passed to `sender` and finalized while the
		write mutex is held.

		Params:
			sender = Callback that writes the message payload.
			frameOpcode = Opcode of the message to send.

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/**
		Actively closes the connection.

		If the socket is still connected, a close frame is sent first. When
		called from the task that created the socket, the associated
		HTTP response is released, the reader task is joined and the
		connection is destroyed.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
				If empty (but not null), the standard description of `code`
				is used. At most 123 bytes are allowed.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;
		if(reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		//control frame payloads are limited to 125 bytes
		// (2 bytes are taken by the close code, leaving 123 for the reason)
		version(assert)
			assert(reason.length <= 123);
		else
			reason = reason[0 .. min($, 123)];

		if (connected) {
			try {
				send((scope msg) {
					// set inside of the callback, i.e. after send() has
					// passed its "already closed" check
					m_sentCloseFrame = true;
					if (code != 0) {
						msg.write(std.bitmanip.nativeToBigEndian(code));
						msg.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception e) {
				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();


		// only the task that created the socket tears down the connection
		if (Task.getThis() == m_ownerTask) {
			m_writeMutex.performLocked!({
				if (m_clientResponse) {
					m_clientResponse.disconnect();
					m_clientResponse = HTTPClientResponse.init;
				}
				if (m_serverResponse) {
					m_serverResponse.finalize();
					m_serverResponse = HTTPServerResponse.init;
				}
			});

			m_reader.join();

			() @trusted { destroy(m_conn); } ();
			m_conn = ConnectionStream.init;
		}
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.

		Blocks until the reader task has provided a message, passes it to
		`receiver` and then signals the reader task to continue.

		Params:
			receiver = Callback that consumes the message.

		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			// hand the slot back to the reader task
			m_nextMessage = null;
			m_readCondition.notifyAll();
		});
	}

	/**
		Body of the reader task.

		Reads messages from the connection until it is empty, a close frame
		arrives or an error occurs. Ping frames are answered with a pong,
		pong frames are matched against the last ping, and data messages are
		handed over to `receive` via `m_nextMessage`. Afterwards, the
		connection is closed and all waiters are notified.
	*/
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception e) {
			logException(e, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception e) assert(false, e.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// echo the ping payload back in a pong frame
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						// answer with our own close frame unless we initiated the close
						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						// publish the message and wait until receive() has consumed it
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception e) logException(e, "Failed to close WebSocket connection");
		// wake up anybody still blocked in waitForData()/receive()
		try m_readCondition.notifyAll();
		catch (Exception e) assert(false, e.msg);
	}

	/**
		Ping timer callback.

		Closes the connection if no matching pong has been received since
		the previous ping; otherwise sends a new ping frame that carries the
		incremented ping index as its payload.
	*/
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}
888 
/**
	Represents a single outgoing _WebSocket message as an OutputStream.

	Written data is buffered and sent as a single frame when the message is
	finalized. Calling `flush` sends the data buffered so far as a non-final
	frame; all frames following the first one are sent as continuation
	frames, as required by RFC 6455 section 5.4.
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		// Passed on to the frame header writer (null for server sockets)
		RandomNumberStream m_rng;
		Stream m_conn;
		// Opcode of the next frame to send
		FrameOpcode m_frameOpcode;
		// Frame.maxHeaderSize bytes of header space followed by the payload
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/**
		Creates a new message.

		Params:
			conn = Stream that the frames are written to, must not be null
			frameOpcode = Opcode of the message
			rng = Entropy source that is passed on to the frame header writer
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/**
		Appends payload data to the internal buffer.

		Nothing is sent until `flush` or `finalize` is called.

		Params:
			bytes = The data to append
			mode = Ignored, the data is always buffered completely

		Returns: The number of bytes buffered, always `bytes.length`
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		// Reserve room for the largest possible header in front of the
		// payload, so that the frame can be sent from a single buffer.
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}

		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the data buffered so far as a non-final frame, if there is any.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message; further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/**
		Sends the buffered payload as one frame and clears the buffer.

		Params:
			fin = Whether this is the final frame of the message
	*/
	private void sendFrame(bool fin)
	{
		// make sure that the header space exists, even for an empty payload
		if (!m_buffer.data.length)
			write(null, IOMode.once);

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// The header is written right in front of the payload, into the
		// tail of the reserved header space.
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455 section 5.4: only the first frame of a fragmented
		// message carries the message opcode, all following frames have to
		// use the continuation opcode.
		m_frameOpcode = FrameOpcode.continuation;
	}

	alias write = OutputStream.write;
}
958 
959 
960 /**
961 	Represents a single incoming _WebSocket message as an InputStream.
962 */
963 final class IncomingWebSocketMessage : InputStream {
964 @safe:
965 	private {
966 		RandomNumberStream m_rng;
967 		Stream m_conn;
968 		Frame m_currentFrame;
969 	}
970 
971 	private this(Stream conn, RandomNumberStream rng)
972 	{
973 		assert(conn !is null);
974 		m_conn = conn;
975 		m_rng = rng;
976 		skipFrame(); // reads the first frame
977 	}
978 
979 	@property bool empty() const { return m_currentFrame.payload.length == 0; }
980 
981 	@property ulong leastSize() const { return m_currentFrame.payload.length; }
982 
983 	@property bool dataAvailableForRead() { return true; }
984 
985 	/// The frame type for this nessage;
986 	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }
987 
988 	const(ubyte)[] peek() { return m_currentFrame.payload; }
989 
990 	/**
991 	 * Retrieve the next websocket frame of the stream and discard the current
992 	 * one
993 	 *
994 	 * This function is helpful if one wish to process frames by frames,
995 	 * or minimize memory allocation, as `peek` will only return the current
996 	 * frame data, and read requires a pre-allocated buffer.
997 	 *
998 	 * Returns:
999 	 * `false` if the current frame is the final one, `true` if a new frame
1000 	 * was read.
1001 	 */
1002 	bool skipFrame()
1003 	{
1004 		if (m_currentFrame.fin)
1005 			return false;
1006 
1007 		m_currentFrame = Frame.readFrame(m_conn);
1008 		return true;
1009 	}
1010 
1011 	size_t read(scope ubyte[] dst, IOMode mode)
1012 	{
1013 		size_t nread = 0;
1014 
1015 		while (dst.length > 0) {
1016 			enforce!WebSocketException(!empty , "cannot read from empty stream");
1017 			enforce!WebSocketException(leastSize > 0, "no data available" );
1018 
1019 			import std.algorithm : min;
1020 			auto sz = cast(size_t)min(leastSize, dst.length);
1021 			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
1022 			dst = dst[sz .. $];
1023 			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
1024 			nread += sz;
1025 
1026 			if (leastSize == 0) {
1027 				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
1028 					break;
1029 				this.skipFrame();
1030 			}
1031 		}
1032 
1033 		return nread;
1034 	}
1035 
1036 	alias read = InputStream.read;
1037 }
1038 
/// Magic GUID string defined by RFC 6455 for challenging the server during
/// the upgrade: `computeAcceptKey` hashes it right after the challenge key
/// to derive the accept key.
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1041 
1042 
1043 /**
1044  * The Opcode is 4 bits, as defined in Section 5.2
1045  *
1046  * Values are defined in section 11.8
1047  * Currently only 6 values are defined, however the opcode is defined as
1048  * taking 4 bits.
1049  */
1050 public enum FrameOpcode : ubyte {
1051 	continuation = 0x0,
1052 	text = 0x1,
1053 	binary = 0x2,
1054 	close = 0x8,
1055 	ping = 0x9,
1056 	pong = 0xA
1057 }
1058 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
1059 
1060 
private struct Frame {
@safe:
	/// Largest possible header: 2 bytes + 8 bytes extended length + 4 bytes masking key
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Returns the size in bytes of the header needed to send `payload`.
	 *
	 * The WebSocket RFC defines a variable-length encoding for the payload
	 * length. In short, it means that:
	 * - If the length is <= 125, it is stored in the 7 least significant
	 *   bits of the second header byte (the most significant bit is the MASK
	 *   flag), resulting in a 2 byte header.
	 * - If the length is <= 65_535 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 byte header.
	 * - If the length is > 65_535, a magic value of 127 is used for
	 *   the 7-bit field, and the next 8 bytes hold the length,
	 *   resulting in a 10 byte header.
	 *
	 * In every case, 4 more bytes are needed for the masking key, if any.
	 *
	 * Params:
	 *   mask = Whether the header will contain a masking key
	 */
	size_t getHeaderSize(bool mask)
	{
		size_t ret = 1;
		if (payload.length < 126) ret += 1;
		else if (payload.length < 65536) ret += 3;
		else ret += 9;
		if (mask) ret += 4;
		return ret;
	}

	/**
	 * Writes the frame header to `dst` and masks the payload, if requested.
	 *
	 * Params:
	 *   dst = Destination buffer, has to provide at least as many bytes as
	 *     `getHeaderSize` returns for the same masking choice
	 *   sys_rng = If non-null, the MASK bit is set, a 4 byte masking key is
	 *     read from this stream into the header and `payload` is XOR-masked
	 *     in place using that key. If `null`, the frame is left unmasked.
	 */
	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
	{
		// first byte: FIN flag plus opcode
		ubyte firstByte = cast(ubyte)opcode;
		if (fin) firstByte |= 0x80;
		dst[0] = firstByte;
		dst = dst[1 .. $];

		// MASK flag, shares a byte with the 7-bit length field
		auto b1 = sys_rng ? 0x80 : 0x00;

		if (payload.length < 126) {
			dst[0] = cast(ubyte)(b1 | payload.length);
			dst = dst[1 .. $];
		} else if (payload.length < 65536) {
			dst[0] = cast(ubyte) (b1 | 126);
			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
			dst = dst[3 .. $];
		} else {
			dst[0] = cast(ubyte) (b1 | 127);
			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
			dst = dst[9 .. $];
		}

		if (sys_rng) {
			// the masking key directly follows the length field
			sys_rng.read(dst[0 .. 4]);
			foreach (i, ref b; payload)
				b ^= dst[i % 4];
		}
	}

	/**
	 * Reads a complete frame, header and payload, from `stream`.
	 *
	 * The payload of masked frames is unmasked before returning.
	 *
	 * Throws: `WebSocketException` if a 64-bit length with its most
	 *   significant bit set is received, or if the length does not fit into
	 *   `size_t`.
	 */
	static Frame readFrame(InputStream stream)
	{
		Frame frame;
		// scratch buffer, used for the header bytes, the extended length
		// and finally for the masking key
		ubyte[8] data;

		stream.read(data[0 .. 2]);
		frame.fin = (data[0] & 0x80) != 0;
		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

		bool masked = !!(data[1] & 0b1000_0000);

		//parsing length
		ulong length = data[1] & 0b0111_1111;
		if (length == 126) {
			stream.read(data[0 .. 2]);
			length = bigEndianToNative!ushort(data[0 .. 2]);
		} else if (length == 127) {
			stream.read(data);
			length = bigEndianToNative!ulong(data);

			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
			// interpreted as a 64-bit unsigned integer (the most significant
			// bit MUST be 0)
			enforce!WebSocketException(!(length >> 63),
				"Received length has a non-zero most significant bit");

		}
		logDebug("Read frame: %s %s %s length=%d",
				 frame.opcode,
				 frame.fin ? "final frame" : "continuation",
				 masked ? "masked" : "not masked",
				 length);

		// Masking key is 32 bits / uint
		if (masked)
			stream.read(data[0 .. 4]);

		// Read payload
		// TODO: Provide a way to limit the size read, easy
		// DOS for server code here (rejectedsoftware/vibe.d#1496).
		enforce!WebSocketException(length <= size_t.max);
		frame.payload = new ubyte[](cast(size_t)length);
		stream.read(frame.payload);

		//de-masking
		if (masked)
			foreach (size_t i; 0 .. cast(size_t)length)
				frame.payload[i] = frame.payload[i] ^ data[i % 4];

		return frame;
	}
}
1187 
// Checks header size computation and header serialization of `Frame` around
// the boundaries of the three payload length encodings.
unittest {
	import std.algorithm.searching : all;

	// random number source stub that fills every requested buffer with 13
	final class DummyRNG : RandomNumberStream {
	@safe:
		@property bool empty() { return false; }
		@property ulong leastSize() { return ulong.max; }
		@property bool dataAvailableForRead() { return true; }
		const(ubyte)[] peek() { return null; }
		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
		alias read = RandomNumberStream.read;
	}

	auto rng = new DummyRNG;
	ubyte[Frame.maxHeaderSize] hdrbuf;

	// Verifies the unmasked header of a zero-filled payload of the given
	// length first, followed by the masked variant of the same frame.
	void verifyHeaders(size_t payload_length, size_t plain_size, const(ubyte)[] plain_header)
	{
		Frame f;
		f.payload = new ubyte[payload_length];

		// unmasked frame
		assert(f.getHeaderSize(false) == plain_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. plain_size], null);
		assert(hdrbuf[0 .. plain_size] == plain_header);

		// masked frame: same header with the MASK bit set, followed by the key
		immutable masked_size = plain_size + 4;
		assert(f.getHeaderSize(true) == masked_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. masked_size], rng);
		assert(hdrbuf[0] == plain_header[0]);
		assert(hdrbuf[1] == (plain_header[1] | 128));
		assert(hdrbuf[2 .. plain_size] == plain_header[2 .. $]);
		assert(hdrbuf[plain_size .. masked_size].all!(b => b == 13));
	}

	// largest length that fits into the 7-bit field
	verifyHeaders(125, 2, [0, 125]);
	// smallest and largest length using the 16-bit extended field
	verifyHeaders(126, 4, [0, 126, 0, 126]);
	verifyHeaders(65535, 4, [0, 126, 255, 255]);
	// smallest length using the 64-bit extended field
	verifyHeaders(65536, 10, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
}
1254 
1255 /**
1256  * Generate a challenge key for the protocol upgrade phase.
1257  */
1258 private string generateChallengeKey(scope RandomNumberStream rng)
1259 {
1260 	ubyte[16] buffer;
1261 	rng.read(buffer);
1262 	return Base64.encode(buffer);
1263 }
1264 
/**
 * Computes the accept key that corresponds to a challenge key.
 *
 * Params:
 *   challengekey = The challenge key, hashed exactly as given
 *
 * Returns: The Base64 encoding of the SHA-1 digest of `challengekey`
 *   immediately followed by `s_webSocketGuid`.
 */
private string computeAcceptKey(string challengekey)
{
	// hashes both strings back to back, without any separator
	auto digest = sha1Of(challengekey, s_webSocketGuid);
	return to!string(Base64.encode(digest));
}