1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
@safe unittest {
	// Simple echo server: every received text message is sent straight back.
	// The handler is `nothrow` so that it matches `WebSocketHandshakeDelegate`
	// directly instead of one of the overloads scheduled for deprecation.
	void handleConn(scope WebSocket sock)
	nothrow {
		try {
			// waitForData() returns false once the connection is closed and
			// no message is pending, so the loop ends cleanly on disconnect.
			while (sock.waitForData()) {
				auto msg = sock.receiveText();
				sock.send(msg);
			}
		} catch (Exception e) {
			logWarn("WebSocket handler failed: %s", e.msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 
42 import core.time;
43 import std.algorithm: equal, splitter;
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.exception;
48 import std.bitmanip;
49 import std.digest.sha;
50 import std.string;
51 import std.functional;
52 import std.uuid;
53 import std.base64;
54 import std.digest.sha;
55 import std.uni: asLowerCase;
56 import vibe.crypto.cryptorand;
57 
58 @safe:
59 
60 
/// Callback type that receives the established `WebSocket` after a successful handshake; must be `nothrow`.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
62 
63 
64 /// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	/// Creates an exception with the given message, source location and optional chained throwable.
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow {
		super(msg, file, line, next);
	}

	/// Creates an exception with the given message that is chained to `next`.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow {
		super(msg, next, file, line);
	}
}
81 
82 /** Establishes a WebSocket connection at the specified endpoint.
83 */
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// The handshake is an ordinary HTTP(S) request: "wss"/"https" map to
	// "https", every other schema maps to "http".
	const secure = url.schema == "wss" || url.schema == "https";
	if (secure) url.schema = "https";
	else url.schema = "http";

	auto entropy = secureRNG();
	auto client_key = generateChallengeKey(entropy);
	auto expected_accept = computeAcceptKey(client_key);

	auto response = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = client_key;
		// Runs last, so the caller may add to or override the headers above.
		request_modifier(req);
	}, settings);

	enforce(response.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	// The server has to answer with the accept key derived from our challenge.
	auto accept_header = "sec-websocket-accept" in response.headers;
	enforce(accept_header !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*accept_header == expected_accept, "Response has wrong accept key");

	auto stream = response.switchProtocol("websocket");
	return new WebSocket(stream, entropy, response);
}
111 
112 /// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// The handshake is an ordinary HTTP(S) request: "wss"/"https" map to
	// "https", every other schema maps to "http".
	const secure = url.schema == "wss" || url.schema == "https";
	if (secure) url.schema = "https";
	else url.schema = "http";

	/*scope*/auto entropy = secureRNG();
	auto client_key = generateChallengeKey(entropy);
	auto expected_accept = computeAcceptKey(client_key);

	requestHTTP(url,
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = client_key;
			// Runs last, so the caller may add to or override the headers above.
			request_modifier(req);
		},
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

			// The server has to answer with the accept key derived from our challenge.
			auto accept_header = "sec-websocket-accept" in res.headers;
			enforce(accept_header !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*accept_header == expected_accept, "Response has wrong accept key");

			res.switchProtocol("websocket", (scope conn) @trusted {
				// The socket only lives for the duration of the callback.
				scope socket = new WebSocket(conn, entropy, res);
				del(socket);
				// NOTE(review): close() is skipped when the socket is no longer
				// connected, unlike handleWebSocket() which always closes -
				// confirm that this is intended.
				if (socket.connected) socket.close();
			});
		},
		settings
	);
}
148 
149 /// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Plain connection: the handshake request is sent without modifications.
	return connectWebSocketEx(
		url,
		(scope request) { /* nothing to modify */ },
		settings);
}
154 /// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Plain connection: the handshake request is sent without modifications
	// and the socket is handed to `del`.
	connectWebSocketEx(
		url,
		(scope request) { /* nothing to modify */ },
		del,
		settings);
}
159 /// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	connectWebSocket(url, (scope sock) nothrow {
		try {
			del(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	}, settings);
}
167 /// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// The `@system` callback is invoked through a `@trusted` adapter.
	connectWebSocket(url, (scope sock) @trusted { del(sock); }, settings);
}
172 /// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	connectWebSocket(url, (scope sock) nothrow {
		try {
			del(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	}, settings);
}
180 
181 
182 /**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.
184 */
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto upgrade_hdr = "Upgrade" in req.headers;
	auto connection_hdr = "Connection" in req.headers;
	auto key_hdr = "Sec-WebSocket-Key" in req.headers;
	auto version_hdr = "Sec-WebSocket-Version" in req.headers;

	// "Connection" is treated as a comma separated list; one entry has to be
	// "upgrade" (compared case-insensitively after stripping whitespace).
	bool upgrade_requested = false;
	if (connection_hdr !is null) {
		foreach (token; splitter(*connection_hdr, ",")) {
			if (token.strip().asLowerCase().equal("upgrade")) {
				upgrade_requested = true;
				break;
			}
		}
	}

	// Headers are validated in a fixed order; the first failing check
	// determines the message that is sent back to the client.
	string failure;
	if (!upgrade_requested)
		failure = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (upgrade_hdr is null || icmp(*upgrade_hdr, "websocket") != 0)
		failure = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (version_hdr is null || *version_hdr != "13")
		failure = "Only version 13 of the WebSocket protocol is supported.";
	else if (key_hdr is null)
		failure = "Missing \"Sec-WebSocket-Key\" header.";

	if (failure.length > 0) {
		logDebug("Browser sent invalid WebSocket request: %s", failure);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(failure);
		return;
	}

	// Accept key = Base64(SHA-1(client key ~ GUID)).
	string accept_key = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = accept_key;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream stream = res.switchProtocol("websocket");

	auto ws = new WebSocket(stream, req, res);
	try {
		on_handshake(ws);
	} catch (Exception ex) {
		logDiagnostic("WebSocket handler failed: %s", ex.msg);
	}
	// The socket is always closed once the handler has returned.
	ws.close();
}
231 /// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// The `@system` callback is invoked through a `@trusted` adapter.
	handleWebSocket((scope sock) @trusted { on_handshake(sock); }, req, res);
}
236 /// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	handleWebSocket((scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	}, req, res);
}
244 /// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	handleWebSocket((scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	}, req, res);
}
252 
253 
254 /**
	Returns an HTTP request handler that establishes web socket connections.
256 */
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	// Convert the function pointer to a delegate and forward to the
	// delegate based overload.
	auto as_delegate = () @trusted { return toDelegate(on_handshake); } ();
	return handleWebSockets(as_delegate);
}
261 /// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// Request handler that performs the handshake and then runs `on_handshake`.
	void handleRequest(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto upgrade_hdr = "Upgrade" in req.headers;
		auto connection_hdr = "Connection" in req.headers;
		auto key_hdr = "Sec-WebSocket-Key" in req.headers;
		auto version_hdr = "Sec-WebSocket-Version" in req.headers;

		// "Connection" is treated as a comma separated list; one entry has to
		// be "upgrade" (compared case-insensitively after stripping whitespace).
		bool upgrade_requested = false;
		if (connection_hdr !is null) {
			foreach (token; splitter(*connection_hdr, ",")) {
				if (token.strip().asLowerCase().equal("upgrade")) {
					upgrade_requested = true;
					break;
				}
			}
		}

		// Any missing or mismatching handshake header rejects the request.
		const invalid = !upgrade_requested
			|| upgrade_hdr is null || icmp(*upgrade_hdr, "websocket") != 0
			|| key_hdr is null
			|| version_hdr is null || *version_hdr != "13";
		if (invalid) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// Accept key = Base64(SHA-1(client key ~ GUID)).
		string accept_key = () @trusted { return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = accept_key;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto ws = new WebSocket(conn, req, res);
			try {
				on_handshake(ws);
			} catch (Exception ex) {
				logDiagnostic("WebSocket handler failed: %s", ex.msg);
			}
			// The socket is always closed once the handler has returned.
			ws.close();
		});
	}
	return &handleRequest;
}
309 /// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	// The `@system` callback is invoked through a `@trusted` adapter.
	return handleWebSockets(delegate (scope sock) @trusted { on_handshake(sock); });
}
314 /// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	// The `@system` callback is invoked through a `@trusted` adapter.
	return handleWebSockets(delegate (scope sock) @trusted { on_handshake(sock); });
}
319 /// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	return handleWebSockets(delegate (scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	});
}
327 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	return handleWebSockets(delegate (scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	});
}
335 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	return handleWebSockets(delegate (scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	});
}
343 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// Wrap the throwing callback in a `nothrow` adapter that logs failures.
	return handleWebSockets(delegate (scope sock) nothrow {
		try {
			on_handshake(sock);
		} catch (Exception ex) {
			logWarn("WebSocket handler failed: %s", ex.msg);
		}
	});
}
351 
352 /**
353  * Provides the reason that a websocket connection has closed.
354  *
 * Further documentation for the WebSocket and its codes can be found at:
356  * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
357  *
358  * ---
359  *
360  * void echoSocket(scope WebSocket sock)
361  * {
362  *   import std.datetime : seconds;
363  *
364  *   while(sock.waitForData(3.seconds))
365  *   {
366  *     string msg = sock.receiveText;
367  *     logInfo("Got a message: %s", msg);
368  *     sock.send(msg);
369  *   }
370  *
371  *   if(sock.connected)
372  *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
373  * }
374  * ---
375  */
enum WebSocketCloseReason : short
{
	/// No close code.
	none = 0,
	/// 1000
	normalClosure = 1000,
	/// 1001
	goingAway = 1001,
	/// 1002
	protocolError = 1002,
	/// 1003
	unsupportedData = 1003,
	/// 1005
	noStatusReceived = 1005,
	/// 1006
	abnormalClosure = 1006,
	/// 1007
	invalidFramePayloadData = 1007,
	/// 1008
	policyViolation = 1008,
	/// 1009
	messageTooBig = 1009,
	/// 1010 - previously missing from this enum although
	/// `closeReasonString` reports it as "Missing Extension".
	missingExtension = 1010,
	/// 1011
	internalError = 1011,
	/// 1012
	serviceRestart = 1012,
	/// 1013
	tryAgainLater = 1013,
	/// 1014
	badGateway = 1014,
	/// 1015
	tlsHandshake = 1015
}
394 
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	const short code = reason;

	// Negative values do not belong to any category.
	if (code < 0) return "UNDEFINED - Nasal Demons";

	// The number of whole thousands selects the category of the code.
	switch (code / 1000)
	{
		case 0: return "Reserved and Unused";
		case 1: break; // individual codes are resolved below
		case 2: return "Reserved for extensions";
		case 3: return "Available for frameworks and libraries";
		case 4: return "Available for applications";
		default: return "UNDEFINED - Nasal Demons";
	}

	// Codes in the range 1000 .. 1999.
	switch (code)
	{
		case 1000: return "Normal Closure";
		case 1001: return "Going Away";
		case 1002: return "Protocol Error";
		case 1003: return "Unsupported Data";
		case 1004: return "RESERVED";
		case 1005: return "No Status Recvd";
		case 1006: return "Abnormal Closure";
		case 1007: return "Invalid Frame Payload Data";
		case 1008: return "Policy Violation";
		case 1009: return "Message Too Big";
		case 1010: return "Missing Extension";
		case 1011: return "Internal Error";
		case 1012: return "Service Restart";
		case 1013: return "Try Again Later";
		case 1014: return "Bad Gateway";
		case 1015: return "TLS Handshake";
		default: return "RESERVED";
	}
}
452 
unittest
{
	// Category boundaries and out-of-range values.
	static struct Expectation { int code; string text; }
	static immutable Expectation[] categories = [
		Expectation(0, "Reserved and Unused"),
		Expectation(1, "Reserved and Unused"),
		Expectation(1020, "RESERVED"),
		Expectation(2000, "Reserved for extensions"),
		Expectation(3000, "Available for frameworks and libraries"),
		Expectation(4000, "Available for applications"),
		Expectation(5000, "UNDEFINED - Nasal Demons"),
		Expectation(-1, "UNDEFINED - Nasal Demons"),
	];
	foreach (expectation; categories)
		assert((cast(WebSocketCloseReason)expectation.code).closeReasonString == expectation.text);

	// Named enum members.
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");

	// Of the codes 1000 .. 1016 only 1004 and those above 1015 are reserved.
	foreach (code; 1000 .. 1017)
	{
		const text = closeReasonString(cast(WebSocketCloseReason)code);
		const expect_reserved = code == 1004 || code > 1015;
		assert((text == "RESERVED") == expect_reserved,
			"(incorrect) code %d = %s".format(code, text));
	}
}
483 
484 
485 /**
486  * Represents a single _WebSocket connection.
487  *
488  * ---
489  * int main (string[] args)
490  * {
491  *   auto taskHandle = runTask(() => connectToWS());
492  *   return runApplication(&args);
493  * }
494  *
495  * void connectToWS ()
496  * {
497  *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
498  *   auto ws = connectWebSocket(ws_url);
499  *   logInfo("WebSocket connected");
500  *
501  *   while (ws.waitForData())
502  *   {
503  *     auto txt = ws.receiveText;
504  *     logInfo("Received: %s", txt);
505  *   }
506  *   logFatal("Connection lost!");
507  * }
508  * ---
509  */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		bool m_sentCloseFrame = false;
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		Task m_reader;
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use
		/// If not null, it means this is a client socket (server sockets
		/// are constructed with a null generator).
		RandomNumberStream m_rng;
	}

	/**
	 * Private constructor, called by the client and server specific
	 * constructors below.
	 *
	 * Starts the reader task and, for server sockets with a non-zero
	 * `webSocketPingInterval`, the periodic ping timer.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (null for client sockets)
	 *	 server_res = For server sockets, the response object that is finalized when the socket gets closed
	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task first waits for the read mutex (see startReader),
		// so it does not start processing before this block has finished.
		m_readMutex.performLocked!({
			m_reader = runTask(&startReader);
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
			}
		});
	}

	/// Constructs a client side socket.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Constructs a server side socket.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		After `close()` has released the underlying connection, only an
		already received message counts as available.
	*/
	@property bool dataAvailableForRead()
	{
		// m_conn is reset by close(), so it must not be dereferenced blindly.
		return (m_conn && m_conn.dataAvailableForRead) || m_nextMessage !is null;
	}

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// Recompute the remaining time for the next wait.
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the text will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/// Compatibility overload - will be removed soon.
	deprecated("Call the overload which requires an explicit FrameOpcode.")
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
	{
		send(sender, FrameOpcode.text);
	}

	/**
		Actively closes the connection.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;
		// An empty (but non-null) reason is replaced by the standard text for the code.
		if(reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		//control frame payloads are limited to 125 bytes
		version(assert)
			assert(reason.length <= 123);
		else
			reason = reason[0 .. min($, 123)];

		if (connected) {
			try {
				send((scope msg) {
					m_sentCloseFrame = true;
					if (code != 0) {
						msg.write(std.bitmanip.nativeToBigEndian(code));
						msg.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception e) {
				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();


		// Only the task that created the socket releases the responses, waits
		// for the reader task and destroys the connection.
		if (Task.getThis() == m_ownerTask) {
			m_writeMutex.performLocked!({
				if (m_clientResponse) {
					m_clientResponse.disconnect();
					m_clientResponse = HTTPClientResponse.init;
				}
				if (m_serverResponse) {
					m_serverResponse.finalize();
					m_serverResponse = HTTPServerResponse.init;
				}
			});

			m_reader.join();

			() @trusted { destroy(m_conn); } ();
			m_conn = ConnectionStream.init;
		}
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.
		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			// Hand the slot back to the reader task.
			m_nextMessage = null;
			m_readCondition.notifyAll();
		});
	}

	/// Body of the reader task: reads frames until the connection ends,
	/// answers control frames and hands data messages over to `receive`.
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception e) {
			logException(e, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception e2) logException(e2, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception e2) assert(false, e2.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// Answer with a pong carrying the same payload.
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						// Publish the message and wait until receive() has consumed it.
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception e) logException(e, "Failed to close WebSocket connection");
		// Wake up anyone still blocked in waitForData()/receive().
		try m_readCondition.notifyAll();
		catch (Exception e) assert(false, e.msg);
	}

	/// Timer callback: closes the socket if the previous ping was not
	/// answered, otherwise sends the next ping.
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}
895 
896 /**
897 	Represents a single outgoing _WebSocket message as an OutputStream.
898 */
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		FrameOpcode m_frameOpcode;
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
		// Set once the first frame of the message has been sent; all
		// following frames have to use the continuation opcode.
		bool m_fragmentSent = false;
	}

	/// Creates a message that writes its frames to `conn`. `rng` is passed on
	/// to the frame header writer; a non-null value selects the masked header size.
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/// Appends `bytes` to the payload of the pending frame. `mode` is not
	/// used; all bytes are always buffered.
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		// Reserve room in front of the payload, so that the frame header can
		// later be written in place without copying the payload.
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}

		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the buffered payload as a non-final frame (no-op if nothing is buffered).
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message; further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Writes the buffered payload as a single frame and clears the buffer.
	private void sendFrame(bool fin)
	{
		// Make sure the header padding exists, even for an empty payload.
		if (!m_buffer.data.length)
			write(null, IOMode.once);

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		// RFC 6455, section 5.4: only the first frame of a fragmented message
		// carries the data opcode, all following frames are continuations.
		frame.opcode = m_fragmentSent ? FrameOpcode.continuation : m_frameOpcode;
		m_fragmentSent = true;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		auto hsize = frame.getHeaderSize(m_rng !is null);
		// The header is placed directly in front of the payload.
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();
	}

	alias write = OutputStream.write;
}
965 
966 
/**
	Represents a single incoming _WebSocket message as an InputStream.

	The first frame is read from the connection when the object is
	constructed. `empty`, `leastSize` and `peek` always refer to the frame
	that is currently buffered, not to the message as a whole.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		Frame m_currentFrame;
	}

	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_rng = rng;
		skipFrame(); // reads the first frame
	}

	/// `true` if no unread payload bytes are left in the current frame.
	@property bool empty() const { return m_currentFrame.payload.length == 0; }

	/// Number of unread payload bytes left in the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always `true`.
	@property bool dataAvailableForRead() { return true; }

	/// The opcode of the frame that is currently buffered.
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// Returns the unread payload of the current frame without consuming it.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wishes to process the message frame by
	 * frame, or to minimize memory allocation, as `peek` will only return the
	 * current frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		if (m_currentFrame.fin)
			return false;

		m_currentFrame = Frame.readFrame(m_conn);
		return true;
	}

	/**
	 * Copies payload bytes into `dst`, advancing to following frames as
	 * needed.
	 *
	 * Params:
	 *   dst = buffer to fill
	 *   mode = with `IOMode.immediate` or `IOMode.once`, reading stops as
	 *     soon as the current frame is exhausted; otherwise the next frame
	 *     is fetched and reading continues until `dst` is full
	 *
	 * Returns: the number of bytes written to `dst`
	 *
	 * Throws: `WebSocketException` if `dst` is not yet full and no payload
	 *   is left in the current frame
	 */
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		size_t nread = 0;

		while (dst.length > 0) {
			// `!empty` already guarantees `leastSize > 0`, so a single check
			// is sufficient here
			enforce!WebSocketException(!empty, "cannot read from empty stream");

			auto sz = cast(size_t)min(leastSize, dst.length);
			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
			dst = dst[sz .. $];
			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
			nread += sz;

			if (leastSize == 0) {
				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
					break;
				// current frame is used up, continue with the next one (this
				// is a no-op if the current frame was the final one)
				this.skipFrame();
			}
		}

		return nread;
	}

	alias read = InputStream.read;
}
1045 
/// Magic GUID string defined by the RFC for the upgrade handshake; `computeAcceptKey`
/// hashes it together with the challenge key to derive the accept key.
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1048 
1049 
/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
	/// Continues a fragmented message
	continuation = 0x0,
	/// Text data frame
	text = 0x1,
	/// Binary data frame
	binary = 0x2,
	/// Control frame: connection close
	close = 0x8,
	/// Control frame: ping
	ping = 0x9,
	/// Control frame: pong
	pong = 0xA
}
// 0b1111 itself still fits into 4 bits, hence `<=` rather than `<`
static assert(FrameOpcode.max <= 0b1111, "FrameOpcode is only 4 bits");
1066 
1067 
/**
 * A single WebSocket frame: FIN flag, opcode and payload, together with the
 * routines that encode and decode the frame header.
 */
private struct Frame {
@safe:
	/// Largest header size that `getHeaderSize` can return (1 + 9 + 4 bytes).
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Computes the number of bytes needed for the header of this frame.
	 *
	 * The WebSocket RFC defines a variable-length payload length field:
	 * - If the length is <= 125, it is stored in the 7 least significant
	 *   bits of the second header byte (the first bit is reserved for MASK),
	 *   resulting in a 2 byte header.
	 * - If the length is <= 65_535 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 byte header.
	 * - If the length is > 65_535, a magic value of 127 is used for the
	 *   7-bit field, and the next 8 bytes hold the length, resulting in a
	 *   10 byte header.
	 *
	 * Params:
	 *   mask = whether a masking key (4 additional bytes) will be written
	 *
	 * Returns:
	 *   The header size in bytes for the current `payload.length`.
	 */
	size_t getHeaderSize(bool mask)
	{
		size_t ret = 1;
		if (payload.length < 126) ret += 1;
		else if (payload.length < 65536) ret += 3;
		else ret += 9;
		if (mask) ret += 4;
		return ret;
	}

	/**
	 * Encodes the frame header into `dst`.
	 *
	 * Params:
	 *   dst = destination buffer, must hold at least
	 *     `getHeaderSize(sys_rng !is null)` bytes
	 *   sys_rng = if non-null, the MASK bit is set, a 4 byte masking key is
	 *     read from this stream into the header and `payload` is XOR-ed
	 *     with that key in place; if `null`, the frame is left unmasked
	 */
	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
	{
		ubyte firstByte = cast(ubyte)opcode;
		if (fin) firstByte |= 0x80;
		dst[0] = firstByte;
		dst = dst[1 .. $];

		// MASK bit, shares a byte with the 7-bit length field
		auto b1 = sys_rng ? 0x80 : 0x00;

		if (payload.length < 126) {
			dst[0] = cast(ubyte)(b1 | payload.length);
			dst = dst[1 .. $];
		} else if (payload.length < 65536) {
			dst[0] = cast(ubyte) (b1 | 126);
			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
			dst = dst[3 .. $];
		} else {
			dst[0] = cast(ubyte) (b1 | 127);
			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
			dst = dst[9 .. $];
		}

		if (sys_rng) {
			// the masking key forms the last four bytes of the header
			sys_rng.read(dst[0 .. 4]);
			foreach (i, ref b; payload)
				b ^= dst[i % 4];
		}
	}

	/**
	 * Reads and decodes one frame from `stream`.
	 *
	 * Only the FIN bit and the low four (opcode) bits of the first byte are
	 * evaluated; the opcode value is not validated. A masked payload is
	 * unmasked before it is returned.
	 *
	 * Throws:
	 *   `WebSocketException` if a 64-bit length has its most significant bit
	 *   set or if the length exceeds `size_t.max`.
	 */
	static Frame readFrame(InputStream stream)
	{
		Frame frame;
		ubyte[8] data;

		stream.read(data[0 .. 2]);
		frame.fin = (data[0] & 0x80) != 0;
		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

		bool masked = !!(data[1] & 0b1000_0000);

		//parsing length
		ulong length = data[1] & 0b0111_1111;
		if (length == 126) {
			stream.read(data[0 .. 2]);
			length = bigEndianToNative!ushort(data[0 .. 2]);
		} else if (length == 127) {
			stream.read(data);
			length = bigEndianToNative!ulong(data);

			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
			// interpreted as a 64-bit unsigned integer (the most significant
			// bit MUST be 0)
			enforce!WebSocketException(!(length >> 63),
				"Received length has a non-zero most significant bit");

		}
		logDebug("Read frame: %s %s %s length=%d",
				 frame.opcode,
				 frame.fin ? "final frame" : "continuation",
				 masked ? "masked" : "not masked",
				 length);

		// Masking key is 32 bits / uint
		if (masked)
			stream.read(data[0 .. 4]);

		// Read payload
		// TODO: Provide a way to limit the size read, easy
		// DOS for server code here (rejectedsoftware/vibe.d#1496).
		enforce!WebSocketException(length <= size_t.max);
		frame.payload = new ubyte[](cast(size_t)length);
		stream.read(frame.payload);

		//de-masking
		if (masked)
			foreach (i, ref b; frame.payload)
				b ^= data[i % 4];

		return frame;
	}
}
1194 
// Checks Frame.getHeaderSize and Frame.writeHeader for payload sizes right at
// the boundaries between the three length encodings, with and without mask.
unittest {
	import std.algorithm.searching : all;

	// deterministic stand-in for a random source: every byte read is 13
	final class DummyRNG : RandomNumberStream {
	@safe:
		@property bool empty() { return false; }
		@property ulong leastSize() { return ulong.max; }
		@property bool dataAvailableForRead() { return true; }
		const(ubyte)[] peek() { return null; }
		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
		alias read = RandomNumberStream.read;
	}

	ubyte[14] hdrbuf;
	auto rng = new DummyRNG;

	// Verifies the header of a frame carrying `payload_length` zero bytes.
	// `plain_header` is the expected unmasked header; the masked header must
	// be identical except for the MASK bit, followed by the 4 byte key.
	void verify(size_t payload_length, const(ubyte)[] plain_header)
	{
		immutable plain_size = plain_header.length;
		immutable masked_size = plain_size + 4;

		Frame f;
		f.payload = new ubyte[](payload_length);

		// unmasked
		assert(f.getHeaderSize(false) == plain_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. plain_size], null);
		assert(hdrbuf[0 .. plain_size] == plain_header);

		// masked
		auto masked_header = plain_header.dup;
		masked_header[1] |= 128;
		assert(f.getHeaderSize(true) == masked_size);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. masked_size], rng);
		assert(hdrbuf[0 .. plain_size] == masked_header);
		assert(hdrbuf[plain_size .. masked_size].all!(b => b == 13));
	}

	verify(125, [0, 125]);
	verify(126, [0, 126, 0, 126]);
	verify(65535, [0, 126, 255, 255]);
	verify(65536, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
}
1261 
/**
 * Generate a challenge key for the protocol upgrade phase.
 *
 * Params:
 *   rng = stream from which 16 bytes are read
 *
 * Returns: the Base64 encoding of the bytes read.
 */
private string generateChallengeKey(scope RandomNumberStream rng)
{
	ubyte[16] nonce;
	rng.read(nonce[]);
	return Base64.encode(nonce);
}
1271 
/**
 * Derives the accept key that belongs to a challenge key.
 *
 * Params:
 *   challengekey = the challenge key, used exactly as passed in
 *
 * Returns: the Base64 encoded SHA-1 digest of `challengekey` immediately
 *   followed by `s_webSocketGuid`.
 */
private string computeAcceptKey(string challengekey)
{
	SHA1 sha;
	sha.start();
	sha.put(challengekey.representation);
	sha.put(s_webSocketGuid.representation);
	auto digest = sha.finish();
	return to!(string)(Base64.encode(digest));
}