1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
@safe unittest {
	// Connection handler: receives the established socket for one client.
	void handleConn(scope WebSocket sock)
	{
		// simple echo server
		// Loop for as long as the socket reports itself as connected.
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	// Shows how the handler is attached to a route.
	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		// handleWebSockets() turns the connection handler into an HTTP request handler.
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 
42 import core.time;
43 import std.algorithm: equal, splitter;
44 import std.array;
45 import std.base64;
46 import std.conv;
47 import std.exception;
48 import std.bitmanip;
49 import std.digest.sha;
50 import std.string;
51 import std.functional;
52 import std.uuid;
53 import std.base64;
54 import std.digest.sha;
55 import std.uni: asLowerCase;
56 import vibe.crypto.cryptorand;
57 
58 @safe:
59 
60 
/// Callback type that receives an established `WebSocket`; accepted by
/// `connectWebSocketEx`, `connectWebSocket`, `handleWebSocket` and `handleWebSockets`.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
62 
63 
64 /// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	/**
		Creates an exception carrying the given message and source location.

		Params:
			msg = Description of the error
			file = Source file recorded as origin (defaults to the call site)
			line = Source line recorded as origin (defaults to the call site)
			next = Optional chained throwable
	*/
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow {
		super(msg, file, line, next);
	}

	/// Variant that takes the chained throwable before the source location.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow {
		this(msg, file, line, next);
	}
}
81 
/** Establishes a WebSocket connection at the specified endpoint.

	The handshake is performed as an HTTP request: a `wss` or `https` URL
	schema is rewritten to `https`, every other schema to `http`.

	Params:
		url = Address of the WebSocket endpoint
		request_modifier = Callback invoked with the handshake request after
			the WebSocket upgrade headers have been set
		settings = HTTP client settings used for the handshake request
		del = (callback overload) Delegate that receives the established socket

	Returns:
		The overload without a callback returns the connected `WebSocket`.

	Throws:
		An `Exception` if the server does not answer with the
		"switching protocols" status, or if the `Sec-WebSocket-Accept`
		response header is missing or does not match the expected key.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
		// Run the user hook last so that it sees (and may override) the upgrade headers.
		request_modifier(req);
	}, settings);

	// From here on we own the response object. If the handshake is rejected
	// below, release the underlying connection instead of leaving the
	// unprocessed response (and its HTTP client) dangling.
	scope (failure) res.disconnect();

	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	// The response is handed to the socket, which disconnects it when closed.
	return new WebSocket(conn, rng, res);
}
111 
112 /// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// The handshake is a regular HTTP(S) request, so map the schema accordingly.
	if (url.schema == "wss" || url.schema == "https") url.schema = "https";
	else url.schema = "http";

	/*scope*/auto entropy = secureRNG();
	auto clientKey = generateChallengeKey(entropy);
	auto expectedAccept = computeAcceptKey(clientKey);

	requestHTTP(url,
		// Requester: fill in the upgrade headers, then let the caller adjust the request.
		(scope handshake_req) {
			handshake_req.method = HTTPMethod.GET;
			handshake_req.headers["Upgrade"] = "websocket";
			handshake_req.headers["Connection"] = "Upgrade";
			handshake_req.headers["Sec-WebSocket-Version"] = "13";
			handshake_req.headers["Sec-WebSocket-Key"] = clientKey;
			request_modifier(handshake_req);
		},
		// Responder: validate the server's answer before switching protocols.
		(scope handshake_res) {
			enforce(handshake_res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto acceptHeader = "sec-websocket-accept" in handshake_res.headers;
			enforce(acceptHeader !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*acceptHeader == expectedAccept, "Response has wrong accept key");
			handshake_res.switchProtocol("websocket", (scope conn) @trusted {
				scope socket = new WebSocket(conn, entropy, handshake_res);
				del(socket);
				// Close on behalf of the callback if it left the socket open.
				if (socket.connected) socket.close();
			});
		},
		settings
	);
}
148 
149 /// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Forwards to connectWebSocketEx with a request modifier that leaves the
	// handshake request untouched.
	return connectWebSocketEx(url, (scope req) {}, settings);
}
154 /// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Callback variant: forwards to connectWebSocketEx with a request modifier
	// that leaves the handshake request untouched.
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
159 /// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// Adapt the throwing callback to a nothrow one: exceptions raised by `del`
	// are logged as warnings and do not propagate to the caller.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
167 /// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// The @trusted lambda wraps the @system callback so that it can be passed
	// on to another connectWebSocket overload.
	// NOTE(review): presumably resolves to the WebSocketHandshakeDelegate overload - confirm.
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
172 /// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Adapt the throwing callback to a nothrow one: exceptions raised by `del`
	// are logged as warnings and do not propagate to the caller.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
180 
181 
/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	The request headers are validated first. An invalid upgrade request is
	answered with a "bad request" status and a plain text error message, and
	$(D on_handshake) is not invoked. Otherwise the connection is switched to
	the "websocket" protocol, the delegate is called with the new socket and
	the socket is closed once the delegate has returned.

	Params:
		on_handshake = Called with the established socket
		req = The HTTP request that asks for the protocol upgrade
		res = The matching HTTP response
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto upgradeHeader = "Upgrade" in req.headers;
	auto connectionHeader = "Connection" in req.headers;
	auto keyHeader = "Sec-WebSocket-Key" in req.headers;
	auto versionHeader = "Sec-WebSocket-Version" in req.headers;

	// "Connection" is a comma separated list; look for a case insensitive "upgrade" entry.
	bool wantsUpgrade = false;
	if (connectionHeader !is null) {
		foreach (token; splitter(*connectionHeader, ",")) {
			if (!equal(asLowerCase(strip(token)), "upgrade")) continue;
			wantsUpgrade = true;
			break;
		}
	}

	// Only the first failing check is reported.
	string failure;
	if (!wantsUpgrade)
		failure = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (upgradeHeader is null || icmp(*upgradeHeader, "websocket") != 0)
		failure = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (versionHeader is null || *versionHeader != "13")
		failure = "Only version 13 of the WebSocket protocol is supported.";
	else if (keyHeader is null)
		failure = "Missing \"Sec-WebSocket-Key\" header.";

	if (failure.length > 0) {
		logDebug("Browser sent invalid WebSocket request: %s", failure);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(failure);
		return;
	}

	// Accept value: Base64 of the SHA-1 digest of the client key followed by the protocol GUID.
	auto acceptValue = () @trusted {
		return cast(string)Base64.encode(sha1Of(*keyHeader ~ s_webSocketGuid));
	} ();
	res.headers["Sec-WebSocket-Accept"] = acceptValue;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream stream = res.switchProtocol("websocket");

	WebSocket ws = new WebSocket(stream, req, res);
	try {
		on_handshake(ws);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	ws.close();
}
231 /// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// The @trusted lambda wraps the @system callback so that it can be passed
	// on to another handleWebSocket overload.
	// NOTE(review): presumably resolves to the WebSocketHandshakeDelegate overload - confirm.
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
236 /// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// Adapt the throwing callback to a nothrow one: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
244 /// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// Adapt the throwing callback to a nothrow one: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
252 
253 
/**
	Returns a HTTP request handler that establishes web socket connections.

	Params:
		on_handshake = Callback invoked with each established socket
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	// Convert the function pointer into a delegate and reuse the delegate based overload.
	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
}
261 /// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// Per-request handler; it captures `on_handshake` and is returned as a delegate.
	void serveRequest(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto upgradeHeader = "Upgrade" in req.headers;
		auto connectionHeader = "Connection" in req.headers;
		auto keyHeader = "Sec-WebSocket-Key" in req.headers;
		auto versionHeader = "Sec-WebSocket-Version" in req.headers;

		// "Connection" is a comma separated list; look for a case insensitive "upgrade" entry.
		bool wantsUpgrade = false;
		if (connectionHeader !is null) {
			foreach (token; splitter(*connectionHeader, ",")) {
				if (!equal(asLowerCase(strip(token)), "upgrade")) continue;
				wantsUpgrade = true;
				break;
			}
		}

		// Reject anything that is not a complete version 13 upgrade request.
		const invalid = !wantsUpgrade
			|| upgradeHeader is null || icmp(*upgradeHeader, "websocket") != 0
			|| keyHeader is null
			|| versionHeader is null || *versionHeader != "13";
		if (invalid) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// Accept value: Base64 of the SHA-1 digest of the client key followed by the protocol GUID.
		auto acceptValue = () @trusted {
			return cast(string)Base64.encode(sha1Of(*keyHeader ~ s_webSocketGuid));
		} ();
		res.headers["Sec-WebSocket-Accept"] = acceptValue;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto ws = new WebSocket(conn, req, res);
			try {
				on_handshake(ws);
			} catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			// The socket is always closed once the handler has returned.
			ws.close();
		});
	}
	return &serveRequest;
}
309 /// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	// Wrap the @system callback in a @trusted delegate and forward it to
	// another handleWebSockets overload.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
314 /// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	// Wrap the @system function pointer in a @trusted delegate and forward it
	// to another handleWebSockets overload.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
319 /// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// Adapt the throwing callback to a nothrow one: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
327 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// Adapt the throwing function to a nothrow delegate: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
335 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// Adapt the throwing callback to a nothrow one: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
343 /// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// Adapt the throwing function to a nothrow delegate: exceptions raised by
	// `on_handshake` are logged as warnings and do not propagate.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
351 
/**
 * Provides the reason that a websocket connection has closed.
 *
 * Further documentation for the WebSocket and its codes can be found at:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * Codes 1004 and 1010 have no member here, but `closeReasonString` still
 * maps them to "RESERVED" and "Missing Extension" respectively.
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	// No close code; `WebSocket.close` sends a close frame without payload for this value.
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	// Set by the reader when a received close frame carries no code.
	noStatusReceived = 1005,
	// Set by the reader when the connection ends without any close code.
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}
394 
/**
	Returns a short English description for a WebSocket close code.

	Codes in the 1000-1999 range are described individually (unassigned
	ones as "RESERVED"); all other codes are described by the range they
	fall into. Negative codes and codes of 5000 and above are reported as
	undefined.
*/
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	// Work on a plain integer so that range checks and case labels need no casts.
	const int code = reason;

	// Everything outside of 0 .. 4999 belongs to no defined category.
	if (code < 0 || code >= 5000) return "UNDEFINED - Nasal Demons";
	if (code < 1000) return "Reserved and Unused";
	if (code >= 4000) return "Available for applications";
	if (code >= 3000) return "Available for frameworks and libraries";
	if (code >= 2000) return "Reserved for extensions";

	// Remaining range: 1000 .. 1999
	switch (code)
	{
		case 1000: return "Normal Closure";
		case 1001: return "Going Away";
		case 1002: return "Protocol Error";
		case 1003: return "Unsupported Data";
		case 1004: return "RESERVED";
		case 1005: return "No Status Recvd";
		case 1006: return "Abnormal Closure";
		case 1007: return "Invalid Frame Payload Data";
		case 1008: return "Policy Violation";
		case 1009: return "Message Too Big";
		case 1010: return "Missing Extension";
		case 1011: return "Internal Error";
		case 1012: return "Service Restart";
		case 1013: return "Try Again Later";
		case 1014: return "Bad Gateway";
		case 1015: return "TLS Handshake";
		default: return "RESERVED";
	}
}
452 
unittest
{
	// Description for an arbitrary numeric code, including ones without an enum member.
	static string describe(int code)
	{
		return closeReasonString(cast(WebSocketCloseReason)code);
	}

	// range based categories and a few named members
	assert(describe(0) == "Reserved and Unused");
	assert(describe(1) == "Reserved and Unused");
	assert(closeReasonString(WebSocketCloseReason.normalClosure) == "Normal Closure");
	assert(closeReasonString(WebSocketCloseReason.abnormalClosure) == "Abnormal Closure");
	assert(describe(1020) == "RESERVED");
	assert(describe(2000) == "Reserved for extensions");
	assert(describe(3000) == "Available for frameworks and libraries");
	assert(describe(4000) == "Available for applications");
	assert(describe(5000) == "UNDEFINED - Nasal Demons");
	assert(describe(-1) == "UNDEFINED - Nasal Demons");

	// check the other spec cases: only 1004 and codes above 1015 are reserved
	foreach (code; 1000 .. 1017)
	{
		const text = describe(code);
		const expectReserved = code == 1004 || code > 1015;
		assert(
			(text == "RESERVED") == expectReserved,
			"(incorrect) code %d = %s".format(code, text)
		);
	}
}
483 
484 
485 /**
486  * Represents a single _WebSocket connection.
487  *
488  * ---
489  * shared static this ()
490  * {
491  *   runTask(() => connectToWS());
492  * }
493  *
494  * void connectToWS ()
495  * {
496  *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
497  *   auto ws = connectWebSocket(ws_url);
498  *   logInfo("WebSocket connected");
499  *
500  *   while (ws.waitForData())
501  *   {
502  *     auto txt = ws.receiveText;
503  *     logInfo("Received: %s", txt);
504  *   }
505  *   logFatal("Connection lost!");
506  * }
507  * ---
508  */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		bool m_sentCloseFrame = false;
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		Task m_reader;
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use
		/// If null, it means this is a server socket.
		RandomNumberStream m_rng;
	}

	/**
	 * Private constructor, called from `connectWebSocket` and `handleWebSocket(s)`.
	 *
	 * Starts the reader task and, for server sockets whose server settings
	 * specify a non-zero `webSocketPingInterval`, the periodic ping timer.
	 *
	 * Params:
	 *   conn = Underlying connection stream
	 *   request = HTTP request used to establish the connection (null for client sockets)
	 *   server_res = For server sockets, the response object (finalized when the socket is closed)
	 *   rng = Source of entropy to use.  If null, assume we're a server socket
	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task first acquires m_readMutex (see startReader), so holding
		// the mutex here keeps it from processing frames before setup is complete.
		m_readMutex.performLocked!({
			m_reader = runTask(&startReader);
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
			}
		});
	}

	/// Client socket constructor: no server request/response, `rng` is set.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Server socket constructor: no entropy source and no client response.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		After `close` has released the underlying connection, only an already
		received but not yet consumed message counts as available.
	*/
	@property bool dataAvailableForRead()
	{
		// m_conn is reset by close(), so guard the access like `connected` does.
		return (m_conn && m_conn.dataAvailableForRead) || m_nextMessage !is null;
	}

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.

		Params:
			timeout = (timed overload) Maximum total time to wait

		Returns:
			`true` if a message is available for reading, `false` otherwise.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// Recompute the remaining time after every wake-up.
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the text will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		Params:
			sender = Callback that writes the message payload
			frameOpcode = Opcode used for the message

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		// The write mutex serializes whole messages sent from different tasks.
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/// Compatibility overload - will be removed soon.
	deprecated("Call the overload which requires an explicit FrameOpcode.")
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
	{
		send(sender, FrameOpcode.text);
	}

	/**
		Actively closes the connection.

		If the socket is still connected, a close frame is sent first. When
		called from the task that created the socket, the associated HTTP
		response is released, the reader task is joined and the underlying
		connection is destroyed.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
				An empty, non-null reason is replaced by the description of `code`.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;
		if(reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		//control frame payloads are limited to 125 bytes
		//(2 of which are taken by the close code)
		version(assert)
			assert(reason.length <= 123);
		else
			reason = reason[0 .. min($, 123)];

		if (connected) {
			try {
				send((scope msg) {
					m_sentCloseFrame = true;
					if (code != 0) {
						msg.write(std.bitmanip.nativeToBigEndian(code));
						msg.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception e) {
				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();


		// Only the task that created the socket tears down the connection.
		if (Task.getThis() == m_ownerTask) {
			m_writeMutex.performLocked!({
				if (m_clientResponse) {
					m_clientResponse.disconnect();
					m_clientResponse = HTTPClientResponse.init;
				}
				if (m_serverResponse) {
					m_serverResponse.finalize();
					m_serverResponse = HTTPServerResponse.init;
				}
			});

			m_reader.join();

			() @trusted { destroy(m_conn); } ();
			m_conn = ConnectionStream.init;
		}
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.

		Blocks until the reader task has made a message available, passes it
		to `receiver` and then lets the reader task continue.

		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			m_nextMessage = null;
			// Wake up the reader task, which waits for the message to be consumed.
			m_readCondition.notifyAll();
		});
	}

	/**
		Body of the reader task.

		Reads frames until the connection is empty, a close frame arrives or
		an error occurs. Ping frames are answered with a pong, pong frames are
		matched against the last ping, and data frames are handed over to
		`receive`/`waitForData` via `m_nextMessage`. On exit the connection is
		closed and all waiters are notified.
	*/
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception e) {
			logException(e, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception e) assert(false, e.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// Echo the ping payload back in a pong frame.
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						// Answer with our own close frame unless we initiated the close.
						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						// Publish the message and wait until a receiver has consumed it.
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception e) logException(e, "Failed to close WebSocket connection");
		try m_readCondition.notifyAll();
		catch (Exception e) assert(false, e.msg);
	}

	/**
		Ping timer callback.

		Closes the connection if the previous ping has not been answered by a
		matching pong; otherwise sends a new ping carrying the incremented
		ping index as little endian payload.
	*/
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}
894 
895 /**
896 	Represents a single outgoing _WebSocket message as an OutputStream.
897 */
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		// Opcode of the next frame to send; switches to `continuation` after the first frame.
		FrameOpcode m_frameOpcode;
		// Pending payload, preceded by Frame.maxHeaderSize bytes reserved for the frame header.
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/**
		Creates a message that is written to `conn`.

		Params:
			conn = Stream that receives the encoded frames
			frameOpcode = Opcode of the message
			rng = Entropy source passed on to the frame header writer (may be null)
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	/**
		Appends payload data to the current frame.

		The data is only buffered; it is sent by `flush` or `finalize`.

		Returns: The number of bytes accepted, which is always `bytes.length`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		reserveHeader();

		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the buffered data as a non-final frame; does nothing if nothing is buffered.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message. Further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Makes sure the buffer starts with room for the largest possible frame header.
	private void reserveHeader()
	{
		if (m_buffer.data.length) return;
		ubyte[Frame.maxHeaderSize] header_padding;
		m_buffer.put(header_padding[]);
	}

	/// Encodes the buffered payload as one frame, writes it to the connection and resets the buffer.
	private void sendFrame(bool fin)
	{
		// Reserve the header space directly instead of going through write(),
		// which asserts that the message has not been finalized yet. This
		// path is taken by finalize() for empty messages and after a flush().
		reserveHeader();

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		auto hsize = frame.getHeaderSize(m_rng !is null);
		// The header is written right in front of the payload, so header and
		// payload can be sent with a single write.
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// Only the first frame of a fragmented message carries the message
		// opcode; all following frames are continuation frames (RFC 6455, 5.4).
		m_frameOpcode = FrameOpcode.continuation;
	}

	alias write = OutputStream.write;
}
964 
965 
966 /**
967 	Represents a single incoming _WebSocket message as an InputStream.
968 */
969 final class IncomingWebSocketMessage : InputStream {
970 @safe:
971 	private {
972 		RandomNumberStream m_rng;
973 		Stream m_conn;
974 		Frame m_currentFrame;
975 	}
976 
977 	private this(Stream conn, RandomNumberStream rng)
978 	{
979 		assert(conn !is null);
980 		m_conn = conn;
981 		m_rng = rng;
982 		skipFrame(); // reads the first frame
983 	}
984 
985 	@property bool empty() const { return m_currentFrame.payload.length == 0; }
986 
987 	@property ulong leastSize() const { return m_currentFrame.payload.length; }
988 
989 	@property bool dataAvailableForRead() { return true; }
990 
991 	/// The frame type for this nessage;
992 	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }
993 
994 	const(ubyte)[] peek() { return m_currentFrame.payload; }
995 
996 	/**
997 	 * Retrieve the next websocket frame of the stream and discard the current
998 	 * one
999 	 *
1000 	 * This function is helpful if one wish to process frames by frames,
1001 	 * or minimize memory allocation, as `peek` will only return the current
1002 	 * frame data, and read requires a pre-allocated buffer.
1003 	 *
1004 	 * Returns:
1005 	 * `false` if the current frame is the final one, `true` if a new frame
1006 	 * was read.
1007 	 */
1008 	bool skipFrame()
1009 	{
1010 		if (m_currentFrame.fin)
1011 			return false;
1012 
1013 		m_currentFrame = Frame.readFrame(m_conn);
1014 		return true;
1015 	}
1016 
1017 	size_t read(scope ubyte[] dst, IOMode mode)
1018 	{
1019 		size_t nread = 0;
1020 
1021 		while (dst.length > 0) {
1022 			enforce!WebSocketException(!empty , "cannot read from empty stream");
1023 			enforce!WebSocketException(leastSize > 0, "no data available" );
1024 
1025 			import std.algorithm : min;
1026 			auto sz = cast(size_t)min(leastSize, dst.length);
1027 			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
1028 			dst = dst[sz .. $];
1029 			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
1030 			nread += sz;
1031 
1032 			if (leastSize == 0) {
1033 				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
1034 					break;
1035 				this.skipFrame();
1036 			}
1037 		}
1038 
1039 		return nread;
1040 	}
1041 
1042 	alias read = InputStream.read;
1043 }
1044 
/// Magic GUID string defined by the RFC for the upgrade handshake; it is
/// hashed together with the challenge key by `computeAcceptKey`.
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1047 
1048 
1049 /**
1050  * The Opcode is 4 bits, as defined in Section 5.2
1051  *
1052  * Values are defined in section 11.8
1053  * Currently only 6 values are defined, however the opcode is defined as
1054  * taking 4 bits.
1055  */
1056 public enum FrameOpcode : ubyte {
1057 	continuation = 0x0,
1058 	text = 0x1,
1059 	binary = 0x2,
1060 	close = 0x8,
1061 	ping = 0x9,
1062 	pong = 0xA
1063 }
1064 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
1065 
1066 
/**
 * A single WebSocket frame, together with the logic to serialize its header
 * and to read a frame from a stream.
 */
private struct Frame {
@safe:
	/// Largest header size `getHeaderSize` can return:
	/// 1 + 9 length-related bytes + 4 masking key bytes.
	enum maxHeaderSize = 14;

	/// Whether the FIN bit is set, i.e. this is the final frame of a message.
	bool fin;
	/// Frame type, stored in the low 4 bits of the first header byte.
	FrameOpcode opcode;
	/// Payload bytes. `writeHeader` masks them in place when given a RNG.
	ubyte[] payload;

	/**
	 * Returns the size in bytes of the header needed for the current payload.
	 *
	 * The WebSocket RFC defines a variable-length encoding of the payload
	 * length:
	 * - If the length is <= 125, it is stored in the 7 least significant
	 *   bits of the second header byte (the most significant bit is the
	 *   MASK flag), resulting in a 2 bytes header.
	 * - If the length is <= 65_535 (so it fits in 2 bytes), the magic value
	 *   126 is stored in those 7 bits and the actual length is stored in the
	 *   next two bytes, resulting in a 4 bytes header.
	 * - If the length is > 65_535, the magic value 127 is stored in those
	 *   7 bits and the actual length is stored in the next 8 bytes,
	 *   resulting in a 10 bytes header.
	 *
	 * Params:
	 *   mask = Whether the frame carries a masking key, which adds 4 bytes
	 *
	 * Returns: 2, 4 or 10, plus 4 if `mask` is set.
	 */
	size_t getHeaderSize(bool mask)
	{
		size_t ret = 1;
		if (payload.length < 126) ret += 1;
		else if (payload.length < 65536) ret += 3;
		else ret += 9;
		if (mask) ret += 4;
		return ret;
	}

	/**
	 * Serializes the frame header into `dst`.
	 *
	 * If `sys_rng` is not `null`, the MASK bit is set, 4 bytes read from
	 * `sys_rng` are stored as the masking key at the end of the header, and
	 * `payload` is XOR-masked in place with that key.
	 *
	 * Params:
	 *   dst = Destination buffer, must hold at least
	 *     `getHeaderSize(sys_rng !is null)` bytes
	 *   sys_rng = Source of the masking key, or `null` for an unmasked frame
	 */
	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
	{
		// First byte: FIN flag in the top bit, opcode in the low bits.
		ubyte firstByte = cast(ubyte)opcode;
		if (fin) firstByte |= 0x80;
		dst[0] = firstByte;
		dst = dst[1 .. $];

		// MASK flag, combined with the 7 bit length field below.
		auto b1 = sys_rng ? 0x80 : 0x00;

		if (payload.length < 126) {
			dst[0] = cast(ubyte)(b1 | payload.length);
			dst = dst[1 .. $];
		} else if (payload.length < 65536) {
			dst[0] = cast(ubyte) (b1 | 126);
			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
			dst = dst[3 .. $];
		} else {
			dst[0] = cast(ubyte) (b1 | 127);
			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
			dst = dst[9 .. $];
		}

		if (sys_rng) {
			// The masking key is generated directly into the header and then
			// applied to the payload.
			sys_rng.read(dst[0 .. 4]);
			for (size_t i = 0; i < payload.length; i++)
				payload[i] ^= dst[i % 4];
		}
	}

	/**
	 * Reads a complete frame from `stream`, unmasking the payload if the
	 * frame is masked.
	 *
	 * Params:
	 *   stream = Stream to read the frame from
	 *
	 * Returns: The frame with its `fin`, `opcode` and `payload` fields set.
	 *
	 * Throws: `WebSocketException` if a 64-bit payload length has its most
	 *   significant bit set or if the length exceeds `size_t.max`.
	 */
	static Frame readFrame(InputStream stream)
	{
		Frame frame;
		// Scratch buffer, reused for the fixed header bytes, the extended
		// length and finally the masking key.
		ubyte[8] data;

		stream.read(data[0 .. 2]);
		frame.fin = (data[0] & 0x80) != 0;
		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

		bool masked = !!(data[1] & 0b1000_0000);

		//parsing length
		ulong length = data[1] & 0b0111_1111;
		if (length == 126) {
			stream.read(data[0 .. 2]);
			length = bigEndianToNative!ushort(data[0 .. 2]);
		} else if (length == 127) {
			stream.read(data);
			length = bigEndianToNative!ulong(data);

			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
			// interpreted as a 64-bit unsigned integer (the most significant
			// bit MUST be 0)
			enforce!WebSocketException(!(length >> 63),
				"Received length has a non-zero most significant bit");

		}
		logDebug("Read frame: %s %s %s length=%d",
				 frame.opcode,
				 frame.fin ? "final frame" : "continuation",
				 masked ? "masked" : "not masked",
				 length);

		// Masking key is 32 bits / uint
		if (masked)
			stream.read(data[0 .. 4]);

		// Read payload
		// TODO: Provide a way to limit the size read, easy
		// DOS for server code here (rejectedsoftware/vibe.d#1496).
		enforce!WebSocketException(length <= size_t.max);
		frame.payload = new ubyte[](cast(size_t)length);
		stream.read(frame.payload);

		//de-masking
		if (masked)
			foreach (size_t i; 0 .. cast(size_t)length)
				frame.payload[i] = frame.payload[i] ^ data[i % 4];

		return frame;
	}
}
1193 
// Checks the header size and the serialized header bytes of `Frame` at the
// boundaries of the three payload length encodings, both without and with
// masking.
unittest {
	import std.algorithm.searching : all;

	// Random number stream that fills every requested byte with 13, making
	// the generated masking key predictable.
	final class DummyRNG : RandomNumberStream {
	@safe:
		@property bool empty() { return false; }
		@property ulong leastSize() { return ulong.max; }
		@property bool dataAvailableForRead() { return true; }
		const(ubyte)[] peek() { return null; }
		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
		alias read = RandomNumberStream.read;
	}

	auto fixed_rng = new DummyRNG;

	// Serializes the header of a frame with `len` payload bytes, first
	// unmasked and then masked. `plain` is the expected unmasked header; the
	// masked header must equal it with the MASK bit set, followed by the four
	// key bytes produced by the dummy RNG.
	void verify(size_t len, ubyte[] plain)
	{
		ubyte[Frame.maxHeaderSize] buf;
		immutable n = plain.length;

		Frame frame;
		frame.payload = new ubyte[len];

		assert(frame.getHeaderSize(false) == n);
		frame.writeHeader(buf[0 .. n], null);
		assert(buf[0 .. n] == plain);

		assert(frame.getHeaderSize(true) == n + 4);
		buf[] = 0;
		frame.writeHeader(buf[0 .. n + 4], fixed_rng);
		plain[1] |= 128;
		assert(buf[0 .. n] == plain);
		assert(buf[n .. n + 4].all!(b => b == 13));
	}

	// largest length stored directly in the 7 bit field
	verify(125, [0, 125]);
	// smallest and largest length using the 2 byte extended length
	verify(126, [0, 126, 0, 126]);
	verify(65535, [0, 126, 255, 255]);
	// smallest length using the 8 byte extended length
	verify(65536, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
}
1260 
1261 /**
1262  * Generate a challenge key for the protocol upgrade phase.
1263  */
1264 private string generateChallengeKey(scope RandomNumberStream rng)
1265 {
1266 	ubyte[16] buffer;
1267 	rng.read(buffer);
1268 	return Base64.encode(buffer);
1269 }
1270 
/**
 * Computes the accept key that corresponds to a challenge key.
 *
 * Params:
 *   challengekey = The challenge key string
 *
 * Returns: The Base64 encoded SHA-1 digest of `challengekey` followed by
 *   `s_webSocketGuid`.
 */
private string computeAcceptKey(string challengekey)
{
	SHA1 sha;
	sha.start();
	// Feed the key first and the GUID second; the digest covers their
	// concatenation.
	sha.put(challengekey.representation);
	sha.put(s_webSocketGuid.representation);
	auto digest = sha.finish();
	return Base64.encode(digest).to!string;
}