1 
2 /** Web based, bi-directional, concurrent RPC implementation.
3 
4 	This module implements a generic RPC mechanism that allows transparently
5 	calling remote functions over an HTTP based network connection. The current
6 	implementation is based on a WebSocket based protocol, serializing method
7 	arguments and return types as BSON.
8 
9 	The RPC API is defined using interfaces, very similar to the system in
10 	`vibe.web.rest`. It supports methods with or without a return value, normal,
11 	`ref` and `out` parameters, exceptions, properties returning interfaces,
12 	and properties returning `vibe.web.rest.Collection!I`.
13 
14 	Authorization and authentication is supported via the `vibe.web.auth`
15 	framework. When using it, the `authenticate` method should be defined as
	`@noRoute T authenticate(ref const WebRPCPeerInfo)`, where `T` is the type
17 	passed to the `@requiresAuth` UDA.
18 
19 	Any remote function calls can execute concurrently, so that the connection
20 	never gets blocked by an unfinished function call.
21 
22 	Note that this system will establish a bi-directional communication
23 	facility, allowing both, the client and the server, to initiate calls. This
24 	effectively forms a peer-to-peer connection instead of a client-server
25 	connection. The advantage of using HTTP as the basis is that this makes it
26 	easy to establish P2P connections where one of the peers is behind a
27 	firewall or NAT layer, but the other peer can be reached through a public
28 	port or through a (reverse) proxy server.
29 
30 
31 	Defining_a_simple_RPC_interface:
32 
33 	The API for the interface is defined as a normal D interface:
34 
35 	---
36 	interface ExampleAPI {
37 		void performSomeAction();
38 		int getSomeInformation();
39 	}
40 	---
41 
42 	An implementation of this interface is required on both, the server and the
43 	client side:
44 
45 	---
46 	class ExampleAPIImplementation : ExampleAPI {
47 		void performSomeAction() { ... }
48 		int getSomeInformation() { return ...; }
49 	}
50 	---
51 
52 	With this defined, this is the basic code needed to set up the server side:
53 
54 	---
55 	void handleIncomingPeer(WebRPCPeer!ExampleAPI peer)
56 	@safe nothrow {
57 		// this gets executed for any client that connects to the server
58 		try {
59 			peer.performSomeAction();
60 		} catch (Exception e) {
61 			logException(e, "Failed to perform peer action");
62 		}
63 	}
64 
65 	auto r = new URLRouter;
	r.registerWebRPC!ExampleAPI("/rpc", new ExampleAPIImplementation, &handleIncomingPeer);
67 	// could register other routes here, such as for a web or REST interface
68 
69 	auto l = listenHTTP("127.0.0.1:1234", r);
70 	---
71 
72 	A client can now connect to the server and access the API as well:
73 
74 	---
	auto peer = connectWebRPC!ExampleAPI(URL("http://127.0.0.1:1234/rpc"),
76 		new ExampleAPIImplementation);
77 
78 	peer.performSomeAction();
79 	---
80 
81 
82 	Copyright: © 2024 Sönke Ludwig
83 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
84 	Authors: Sönke Ludwig
85 */
86 module vibe.web.rpc;
87 
88 import vibe.core.log;
89 import vibe.core.core : Task, runTask, yield;
90 import vibe.core.path : InetPath;
91 import vibe.core.stream : isInputStream;
92 import vibe.data.bson;
93 import vibe.inet.url : URL;
94 import vibe.http.router;
95 import vibe.http.server;
96 import vibe.http.websockets;
97 import vibe.stream.tls : TLSCertificateInformation;
98 import vibe.web.internal.rest.common : RestInterface, SubInterfaceType;
99 import vibe.web.auth;
100 import vibe.web.common;
101 import vibe.web.rest : Collection;
102 
103 import std.meta;
104 import std.traits;
105 
106 
/** Callback type that receives the `WebRPCPeer` handle of a connection.

	`registerWebRPC` takes a delegate of this type as its `peer_callback`
	argument. The delegate is required to be `@safe` and `nothrow`.
*/
alias WebRPCPeerCallback(I) = void delegate(WebRPCPeer!I peer) @safe nothrow;
108 
109 
/** Installs a WebRPC endpoint on a router.

	A GET route is registered for `path`. Each request to it is forwarded to
	the WebRPC connection handler, which attempts to establish a WebSocket
	connection with the client. Calls made by the client are dispatched to
	`implementation`, while `peer_callback` receives a `WebRPCPeer` so that the
	server can make calls in the opposite direction.

	Params:
		router = The `URLRouter` on which to register the endpoint
		path = Path of the registered endpoint
		implementation = The API implementation to invoke for incoming method
			calls
		peer_callback = Callback invoked for each incoming connection
*/
void registerWebRPC(I)(URLRouter router, string path, I implementation,
	WebRPCPeerCallback!I peer_callback)
	if (is(I == interface))
{
	router.get(path, (scope HTTPServerRequest req, scope HTTPServerResponse res) {
		handleWebRPC!I(implementation, peer_callback, req, res);
	});
}
130 
131 
/** Opens a WebRPC connection to a remote endpoint.

	A WebSocket connection to `url` is established and a background task is
	started that reads incoming messages. Method calls received from the
	remote side are forwarded to `implementation`.

	Params:
		url = URL of the endpoint to connect to
		implementation = The API implementation to invoke for incoming method
			calls

	Returns:
		A `WebRPCPeer` instance is returned, which exposes the API interface `I`
		for making outgoing method calls.
*/
WebRPCPeer!I connectWebRPC(I)(URL url, I implementation)
	if (is(I == interface))
{
	WebRPCPeerInfo peer_info;

	// the request callback records address and certificate of the remote side
	auto socket = connectWebSocketEx(url, (scope req) {
		peer_info.address = req.remoteAddress;
		peer_info.certificate = req.peerCertificate;
	});

	auto handler = new WebSocketHandler!I(socket, implementation, peer_info);

	// incoming messages are processed concurrently in their own task
	runTask(&handler.runReadLoop);

	auto root = new WebRPCPeerImpl!(I, I, "")(handler);
	return WebRPCPeer!I(root);
}
160 
161 
/** Provides information about a peer.

	Instances are filled in when a connection is established (see
	`connectWebRPC`) and can be queried through `WebRPCPeer.peerInformation`.
*/
struct WebRPCPeerInfo {
	/// (Remote) address of the peer
	NetworkAddress address;

	/// Information about the peer's TLS certificate, if any
	TLSCertificateInformation certificate;
}
171 
172 
/** Reference counted handle used to call into a peer's API.

	The struct forwards to its `implementation` property by means of
	`alias this`, so that it can be used like an instance of `I`. Calling any
	of those methods results in a call to the remote peer.

	Copies share the same connection. Each copy holds a reference on the
	underlying connection handler and gives it up on destruction; the handler
	closes the WebRPC connection once its last reference is released.
*/
struct WebRPCPeer(I) {
	private {
		WebRPCPeerImpl!(I, I, "") m_impl;
	}

@safe:

	// Takes over a reference that the caller already holds on the handler.
	private this(WebRPCPeerImpl!(I, I, "") impl)
	{
		m_impl = impl;
	}

	// Copying acquires an additional reference on the shared handler.
	this(this)
	{
		if (m_impl is null) return;
		if (auto handler = m_impl.m_handler)
			handler.addRef();
	}

	// Destruction gives up the reference held by this instance.
	~this()
	{
		if (m_impl is null) return;
		if (auto handler = m_impl.m_handler)
			handler.releaseRef();
	}

	/** Returns the information recorded about the remote peer.
	*/
	@property ref const(WebRPCPeerInfo) peerInformation() const { return m_impl.m_handler.m_peerInfo; }

	/** Returns the object implementing the remote peer's API interface.

		There is usually no need to call this explicitly, because the
		`alias this` declaration exposes all methods of `I` on `WebRPCPeer`
		itself.
	*/
	@property inout(I) implementation() inout { return m_impl; }

	///
	alias implementation this;
}
220 
221 
/** Proxy class that implements `I` by forwarding all calls to the remote peer.

	Params:
		I = The (sub) interface implemented by this proxy
		RootI = The root interface of the connection; determines the type of
			the `WebSocketHandler` used to send calls
		method_prefix = Prefix prepended to each method name to form the
			qualified name sent to the peer (empty for the root interface,
			`"name."` segments for nested sub interfaces)
*/
final class WebRPCPeerImpl(I, RootI, string method_prefix) : I
	if (is(I == interface) && is(RootI == interface))
{
	private alias Info = RestInterface!(I, false);

	// mixes in `static import` statements for all modules reported by
	// `getRequiredImports!I` (see `generateModuleImports`)
	mixin(generateModuleImports!I());

	// proxy type for the sub interface returned by `method`, extending the
	// method prefix by the name of `method`
	private alias SubPeerImpl(alias method) = WebRPCPeerImpl!(SubInterfaceType!method, RootI, method_prefix ~ __traits(identifier, method) ~ ".");

	private {
		// connection handler shared by the whole proxy tree
		WebSocketHandler!RootI m_handler;
		// one proxy instance per sub interface function of `I`
		staticMap!(SubPeerImpl, Info.SubInterfaceFunctions) m_subInterfaces;
	}

@safe:

	// Stores the handler and eagerly creates the proxies for all sub
	// interfaces, passing the same handler on to them.
	private this(WebSocketHandler!RootI handler)
	{
		m_handler = handler;
		foreach (i, SI; Info.SubInterfaceTypes)
			m_subInterfaces[i] = new WebRPCPeerImpl!(SI, RootI, method_prefix ~ __traits(identifier, Info.SubInterfaceFunctions[i]) ~ ".")(handler);
	}

	// generates the implementations of all methods of `I`
	// (see `generateWebRPCPeerMethods`)
	mixin(generateWebRPCPeerMethods!I());

	/* Performs a remote call of `method` and waits for its result.

		All parameters except `out` parameters and parameters of type
		`AuthInfo!I` are serialized to a BSON object, using the parameter
		names as keys. The call is sent under the name
		`method_prefix ~ <identifier of method>`.

		If the method has `ref`/`out` parameters, the response is read as an
		object that contains each of those parameters under its name and the
		return value (if any) under the key "return". Otherwise the response
		is deserialized as the return value directly.

		Throws: An `Exception` for methods that return an input stream (not
			supported), in addition to anything thrown by `sendCall`,
			`waitForResponse` or the BSON (de)serialization.
	*/
	private ReturnType!method performCall(alias method, PARAMS...)(auto ref PARAMS params)
	{
		alias outparams = refOutParameterIndices!method;
		alias paramnames = ParameterIdentifierTuple!method;

		Bson args = Bson.emptyObject;
		foreach (i, pname; ParameterIdentifierTuple!method)
			static if (!is(ParameterTypeTuple!method[i] == AuthInfo!I) && !(ParameterStorageClassTuple!method[i] & ParameterStorageClass.out_))
				args[pname] = serializeToBson(params[i]);
		auto seq = m_handler.sendCall(method_prefix ~ __traits(identifier, method), args);
		auto ret = m_handler.waitForResponse(seq);
		static if (outparams.length > 0) {
			// write the values reported by the peer back to the ref/out parameters
			foreach (pi; outparams)
				params[pi] = ret[paramnames[pi]].deserializeBson!(PARAMS[pi]);
			static if (!is(ReturnType!method == void))
				return ret["return"].deserializeBson!(ReturnType!method);
		} else static if (isInputStream!(ReturnType!method)) {
			throw new Exception("Stream type results are not yet supported");
		} else static if (!is(ReturnType!method == void)) {
			return ret.deserializeBson!(ReturnType!method);
		}
	}
}
270 
271 
// Interfaces and implementations that are only used by the unit test below.
version (unittest) {
	// plain sub interface, exposed through `TestI.sub`
	private interface TestSubI {
	@safe:
		int test();
	}

	// collection item interface, exposed through `TestI.items` as
	// `Collection!TestCollI`
	private interface TestCollI {
	@safe:
		struct CollectionIndices {
			int index;
		}

		@property int count();
		int get(int index);
	}

	// sub interface using the `vibe.web.auth` framework
	@requiresAuth!TestAuthInfo
	private interface TestAuthI {
	@safe:
		@noAuth void login();
		@noAuth int testUnauthenticated();
		@auth(Role.authenticatedPeer) int testAuthenticated();
		@noRoute TestAuthInfo authenticate(ref const WebRPCPeerInfo peer);
	}

	// authentication information type passed to `@requiresAuth`
	struct TestAuthInfo {
		bool authenticated;

		// queried for the `Role.authenticatedPeer` role used above
		bool isAuthenticatedPeer() @safe nothrow { return authenticated; }
	}

	// root interface covering sub interfaces, collections, authentication,
	// ref/out parameters and exceptions
	private interface TestI {
	@safe:
		@property TestSubI sub();
		@property Collection!TestCollI items();
		@property TestAuthI auth();
		int add(int a, int b);
		void add2(int a, int b, out int c);
		int addmul(ref int a, int b, int c);
		void except();
	}

	private class TestSubC : TestSubI {
		int test() { return 42; }
	}

	private class TestCollC : TestCollI {
		@property int count() { return 4; }
		int get(int index) { return index * 2; }
	}

	private class TestAuthC : TestAuthI {
		// set by `login` and reported by `authenticate`
		private bool m_authenticated;

		void login() { m_authenticated = true; }
		@noAuth int testUnauthenticated() { return 1; }
		@auth(Role.authenticatedPeer) int testAuthenticated() { return 2; }

		@noRoute
		TestAuthInfo authenticate(ref const WebRPCPeerInfo peer)
		{
			return TestAuthInfo(m_authenticated);
		}
	}

	private class TestC : TestI {
		TestSubC m_sub;
		TestCollC m_items;
		TestAuthC m_auth;
		this() {
			m_sub = new TestSubC;
			m_items = new TestCollC;
			m_auth = new TestAuthC;
		}
		@property TestSubC sub() { return m_sub; }
		@property Collection!TestCollI items() { return Collection!TestCollI(m_items); }
		@property TestAuthI auth() { return m_auth; }
		int add(int a, int b) { return a + b; }
		void add2(int a, int b, out int c) { c = a + b; }
		int addmul(ref int a, int b, int c) { a += b; return a * c; }
		void except() { throw new Exception("Error!"); }
	}
}
355 
// End-to-end test: registers a WebRPC endpoint on a local HTTP listener,
// connects to it and exercises calls in both directions.
unittest {
	import core.time : seconds;
	import std.exception : assertThrown;
	import vibe.core.core : setTimer;

	// fail the test instead of hanging if something blocks for a second
	auto tm = setTimer(1.seconds, { assert(false, "Test timeout"); });
	scope (exit) tm.stop();

	auto r = new URLRouter;
	bool got_client = false;
	registerWebRPC!TestI(r, "/rpc", new TestC, (WebRPCPeer!TestI peer) @safe nothrow {
		// test the reverse direction (server calls client)
		try assert(peer.add(2, 3) == 5);
		catch (Exception e) assert(false, e.msg);
		got_client = true;
	});

	// listen on port 0 and connect to the port that actually got bound
	auto l = listenHTTP("127.0.0.1:0", r);
	auto url = URL("http", "127.0.0.1", l.bindAddresses[0].port, InetPath("/rpc"));
	auto cli = connectWebRPC!TestI(url, new TestC);

	// simple method call with return value
	assert(cli.add(3, 4) == 7);

	// sub interface method call
	assert(cli.sub.test() == 42);

	{ // call with out parameter
		int c;
		cli.add2(2, 3, c);
		assert(c == 5);
	}

	{ // call with ref parameter
		int a;
		a = 4;
		assert(cli.addmul(a, 2, 3) == 18);
		assert(a == 6);
	}

	try { // call with exception
		cli.except();
		assert(false);
	} catch (Exception e) {
		assert(e.msg == "Error!");
	}

	// Collection!I syntax
	assert(cli.items.count == 4);
	foreach (i; 0 .. 4)
		assert(cli.items[i].get() == i * 2);

	// "auth" framework tests
	assert(cli.auth.testUnauthenticated() == 1);
	assertThrown(cli.auth.testAuthenticated());
	cli.auth.login();
	assert(cli.auth.testAuthenticated() == 2);

	// make sure the reverse direction got established and tested
	while (!got_client) yield();
}
417 
418 
/* Handles a single HTTP request to a WebRPC endpoint.

	The request is passed to `handleWebSocket` together with a nested
	connection handler. For each established WebSocket the handler creates a
	`WebSocketHandler`, starts a task that invokes `peer_callback` with a
	`WebRPCPeer` for the reverse direction and then processes incoming
	messages until the read loop ends.

	Params:
		implementation = The API implementation to invoke for incoming calls
		peer_callback = Callback invoked with the peer handle of the connection
		req = The HTTP request to handle
		res = The HTTP response belonging to `req`
*/
private void handleWebRPC(I)(I implementation, WebRPCPeerCallback!I peer_callback,
	scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	void handleSocket(scope WebSocket ws)
	nothrow {
		import std.exception : assumeWontThrow;

		// NOTE(review): this local is never read and shadows the outer `req`
		// parameter - presumably to keep the nested function from capturing
		// the outer `scope` request; confirm before removing
		scope const(HTTPServerRequest) req;
		auto info = const(WebRPCPeerInfo)(
			ws.request.assumeWontThrow.clientAddress,
			ws.request.assumeWontThrow.clientCertificate);
		auto h = new WebSocketHandler!I(ws, implementation, info);
		h.addRef(); // WebRPCPeer expects to receive an already owned handler

		// start reverse communication asynchronously
		auto t = runTask((WebRPCPeerCallback!I cb, WebSocketHandler!I h) {
			cb(WebRPCPeer!I(new WebRPCPeerImpl!(I, I, "")(h)));
		}, peer_callback, h);

		// handle incoming messages
		h.runReadLoop();
		// give up the reference held by this function and wait for the
		// callback task to finish before closing the socket
		h.releaseRef();
		t.joinUninterruptible();

		try ws.close();
		catch (Exception e) logException(e, "Failed to close WebSocket after handling WebRPC connection");
		// `ws` is a `scope` parameter, so the handler must not keep referring
		// to it after this function returns
		h.m_socket = WebSocket.init;
	}

	handleWebSocket(&handleSocket, req, res);
}
450 
451 
/* Generates the source code for the method implementations of
	`WebRPCPeerImpl!I`.

	The returned string is meant to be mixed into `WebRPCPeerImpl` and relies
	on the names `Info`, `m_subInterfaces`, `performCall` and `I` being
	available at the mixin site. It consists of three groups of
	`CloneFunction` mixins:

	$(UL
		$(LI sub interface functions, which return the matching entry of
			`m_subInterfaces` (wrapped in a `Collection` where the original
			function returns one))
		$(LI route functions, which forward their arguments to `performCall`)
		$(LI functions annotated with `@noRoute`, which are implemented as
			`assert(false)` stubs)
	)
*/
private string generateWebRPCPeerMethods(I)()
{
	import std.array : join;
	import std.string : format;
	import vibe.web.common : NoRouteAttribute;

	alias Info = RestInterface!(I, false);

	string ret = q{
		import vibe.internal.meta.codegen : CloneFunction;
	};

	// generate sub interface methods
	foreach (i, SI; Info.SubInterfaceTypes) {
		alias F = Info.SubInterfaceFunctions[i];
		alias RT = ReturnType!F;
		alias ParamNames = ParameterIdentifierTuple!F;
		// the parameter names are appended after the interface argument of
		// the `Collection` constructor, hence the leading comma
		static if (ParamNames.length == 0) enum pnames = "";
		else enum pnames = ", " ~ [ParamNames].join(", ");
		static if (isInstanceOf!(Collection, RT)) {
			ret ~= q{
					mixin CloneFunction!(Info.SubInterfaceFunctions[%1$s], q{
						return Collection!(%2$s)(m_subInterfaces[%1$s]%3$s);
					});
				}.format(i, fullyQualifiedName!SI, pnames);
		} else {
			ret ~= q{
					mixin CloneFunction!(Info.SubInterfaceFunctions[%1$s], q{
						return m_subInterfaces[%1$s];
					});
				}.format(i);
		}
	}

	// generate route methods
	foreach (i, F; Info.RouteFunctions) {
		alias ParamNames = ParameterIdentifierTuple!F;
		static if (ParamNames.length == 0) enum pnames = "";
		else enum pnames = [ParamNames].join(", ");

		// NOTE(review): the first format argument (`fullyQualifiedName!F`) is
		// not referenced by the format string, which only uses %2$s and %3$s
		ret ~= q{
				mixin CloneFunction!(Info.RouteFunctions[%2$s], q{
					return performCall!(Info.RouteFunctions[%2$s])(%3$s);
				});
			}.format(fullyQualifiedName!F, i, pnames);
	}

	// generate stubs for non-route functions
	static foreach (m; __traits(allMembers, I))
		foreach (i, fun; MemberFunctionsTuple!(I, m))
			static if (hasUDA!(fun, NoRouteAttribute))
				ret ~= q{
					mixin CloneFunction!(MemberFunctionsTuple!(I, "%s")[%s], q{
						assert(false);
					});
				}.format(m, i);

	return ret;
}
511 
512 
/* Builds a block of `static import` statements for interface `I`.

	One `static import <module>;` line is produced for each module returned
	by `getRequiredImports!I`; the lines are separated by newlines. The
	function may only be evaluated at compile time.
*/
private string generateModuleImports(I)()
{
	import vibe.internal.meta.codegen : getRequiredImports;
	import std.algorithm : map;
	import std.array : join;

	// running this outside of CTFE is a programming error
	if (!__ctfe)
		assert (false);

	return getRequiredImports!I()
		.map!(mod => "static import " ~ mod ~ ";")
		.join("\n");
}
525 
526 
/* Implements the WebRPC protocol on top of a single WebSocket connection.

	The handler sends call, response and error response packets (each one
	consisting of a `WebRPCMessageType` byte followed by a BSON document),
	dispatches incoming calls to the API implementation and hands incoming
	responses to the tasks waiting for them.

	The handler is reference counted, starting with a count of one. The socket
	gets closed when the last reference is released.
*/
private final class WebSocketHandler(I) {
	import vibe.core.sync : LocalManualEvent, TaskMutex, createManualEvent;

	private alias Info = RestInterface!(I, false);

	// Outcome of a call, stored until the waiting task picks it up. A
	// non-null `error` marks an error response.
	struct Res {
		Bson result;
		string error;
	}

	private {
		I m_impl;
		const WebRPCPeerInfo m_peerInfo;
		int m_refCount = 1;
		WebSocket m_socket;
		TaskMutex m_sendMutex; // serializes all writes to the socket
		ulong m_sequence; // sequence number of the next outgoing call
		Res[ulong] m_availableResponses; // received responses by sequence number
		LocalManualEvent m_responseEvent; // emitted when waiters need to re-check
	}

@safe:

	/* Creates a handler for an established WebSocket connection.

		Params:
			ws = The connected socket
			impl = API implementation that receives incoming calls
			peer_info = Information about the remote peer (copied)
	*/
	this(return WebSocket ws, I impl, ref const(WebRPCPeerInfo) peer_info)
	{
		m_impl = impl;
		m_peerInfo = peer_info;

		static if (__VERSION__ < 2106)
			() @trusted { m_socket = ws; } ();
		else m_socket = ws;
		m_sendMutex = new TaskMutex;
		m_responseEvent = createManualEvent();
	}

	/// Acquires an additional reference to the handler.
	void addRef()
	{
		m_refCount++;
	}

	/* Releases a reference to the handler.

		When the last reference is released, the socket is closed and all
		tasks waiting for a response are woken up.
	*/
	void releaseRef()
	{
		if (!--m_refCount) {
			try m_socket.close();
			catch (Exception e) {
				logException(e, "Failed to close WebSocket");
			}
			m_socket = null;
			m_responseEvent.emit();
		}
	}

	/* Sends a call packet to the peer.

		Params:
			method = Qualified name of the method to call
			arguments = BSON object containing the call arguments

		Returns: The sequence number assigned to the call, to be passed to
			`waitForResponse`.

		Throws: An `Exception` if the connection is closed.
	*/
	ulong sendCall(string method, Bson arguments)
	{
		m_sendMutex.lock();
		scope (exit) m_sendMutex.unlock();

		if (!m_socket || !m_socket.connected)
			throw new Exception("Connection closed before sending WebRPC call");

		WebRPCCallPacket pack;
		pack.sequence = m_sequence++;
		pack.method = method;
		pack.arguments = arguments;
		auto bpack = serializeToBson(pack);
		m_socket.send(WebRPCMessageType.call ~ bpack.data);
		return pack.sequence;
	}

	/* Sends the result of a successfully executed call to the peer.

		Params:
			sequence = Sequence number of the call that is being answered
			result = Serialized result of the call

		Throws: An `Exception` if the connection is closed.
	*/
	void sendResponse(ulong sequence, Bson result)
	{
		m_sendMutex.lock();
		scope (exit) m_sendMutex.unlock();

		if (!m_socket || !m_socket.connected)
			throw new Exception("Connection closed before sending WebRPC response");

		WebRPCResponsePacket res;
		res.sequence = sequence;
		res.result = result;
		auto bpack = serializeToBson(res);
		m_socket.send(WebRPCMessageType.response ~ bpack.data);
	}

	/* Reports a failed call to the peer.

		Params:
			sequence = Sequence number of the call that is being answered
			error_message = Message describing the failure

		Throws: An `Exception` if the connection is closed.
	*/
	void sendErrorResponse(ulong sequence, string error_message)
	{
		m_sendMutex.lock();
		scope (exit) m_sendMutex.unlock();

		if (!m_socket || !m_socket.connected)
			throw new Exception("Connection closed before sending WebRPC error response");

		WebRPCErrorResponsePacket res;
		res.sequence = sequence;
		res.message = error_message;
		auto bpack = serializeToBson(res);
		m_socket.send(WebRPCMessageType.errorResponse ~ bpack.data);
	}


	/* Blocks until the response for the given call has arrived.

		Params:
			sequence = Sequence number returned by `sendCall`

		Returns: The result sent by the peer.

		Throws: An `Exception` carrying the peer's error message if the call
			failed remotely, or an `Exception` if the connection is closed
			while waiting.
	*/
	Bson waitForResponse(ulong sequence)
	{
		auto ec = m_responseEvent.emitCount;
		while (true) {
			if (!m_socket || !m_socket.connected)
				throw new Exception("Connection closed while waiting for WebRPC response");
			if (auto pr = sequence in m_availableResponses) {
				// always remove the entry before leaving, so that error
				// responses do not pile up in the table
				auto ret = *pr;
				m_availableResponses.remove(sequence);
				if (ret.error !is null)
					throw new Exception(ret.error);
				return ret.result;
			}
			ec = m_responseEvent.wait(ec);
		}
	}

	// Closes the socket with an "internal error" close reason (best effort).
	private void terminateConnection()
	nothrow {
		if (!m_socket) return;
		try m_socket.close(WebSocketCloseReason.internalError);
		catch (Exception e2) {
			logException(e2, "Failed to close WebSocket after failure");
		}
	}

	// Checks that a received message consists of a type byte followed by
	// something that at least has the framing of a BSON document, so that
	// the message can be sliced and wrapped without out-of-bounds accesses.
	private static bool isWellFormedMessage(const(ubyte)[] msg)
	nothrow {
		import std.bitmanip : littleEndianToNative;

		// smallest BSON document: int32 size field plus terminating zero byte
		enum size_t min_doc_size = int.sizeof + 1;
		if (msg.length < 1 + min_doc_size) return false;

		ubyte[int.sizeof] size_bytes = msg[1 .. 1 + int.sizeof];
		immutable doc_size = littleEndianToNative!int(size_bytes);
		if (doc_size < cast(int)min_doc_size) return false;
		return cast(size_t)doc_size <= msg.length - 1;
	}

	/* Receives and processes messages until the connection ends.

		Incoming calls are executed in separate tasks, so that a long running
		call does not block the connection. Incoming responses are stored and
		the tasks waiting in `waitForResponse` are notified. The connection is
		terminated if a malformed message or one of unknown type is received.
	*/
	void runReadLoop()
	nothrow {
		// No further responses can arrive once the loop has ended. Wake up
		// all tasks blocked in `waitForResponse`, so that they can detect the
		// closed connection instead of waiting forever.
		scope (exit) m_responseEvent.emit();

		try {
			while (m_socket && m_socket.waitForData) {
				if (!m_socket) break;
				auto msg = m_socket.receiveBinary();
				// the data comes from the remote peer and must not be trusted
				if (!isWellFormedMessage(msg)) {
					logWarn("Malformed message received (%s bytes) - terminating WebRPC connection", msg.length);
					m_socket.close();
					return;
				}
				auto brep = Bson(Bson.Type.object, msg[1 .. $].idup);
				switch (msg[0]) {
					default:
						logWarn("Unknown message type received (%s) - terminating WebRPC connection", msg[0]);
						m_socket.close();
						return;
					case WebRPCMessageType.call:
						// keep the handler alive for the duration of the call
						addRef();
						runTask((WebSocketHandler handler, Bson brep) nothrow {
							scope (exit) handler.releaseRef();
							WebRPCCallPacket cmsg;
							try cmsg = deserializeBson!WebRPCCallPacket(brep);
							catch (Exception e) {
								logException(e, "Invalid call packet");
								handler.terminateConnection();
								return;
							}
							Bson res;
							try res = handler.invokeMethod(cmsg.method, cmsg.arguments);
							catch (Exception e) {
								// exceptions thrown by the method get reported to the caller
								logDiagnostic("WebRPC method %s has thrown: %s", cmsg.method, e.msg);
								try handler.sendErrorResponse(cmsg.sequence, e.msg);
								catch (Exception e2) {
									logException(e2, "Failed to send error response");
									handler.terminateConnection();
								}
								return;
							}
							try handler.sendResponse(cmsg.sequence, res);
							catch (Exception e) {
								logException(e, "Failed to send response");
								handler.terminateConnection();
							}
						}, this, brep);
						break;
					case WebRPCMessageType.response:
						auto rmsg = deserializeBson!WebRPCResponsePacket(brep);
						m_availableResponses[rmsg.sequence] = Res(rmsg.result, null);
						m_responseEvent.emit();
						break;
					case WebRPCMessageType.errorResponse:
						auto rmsg = deserializeBson!WebRPCErrorResponsePacket(brep);
						m_availableResponses[rmsg.sequence] = Res(Bson.init, rmsg.message);
						m_responseEvent.emit();
						break;
				}
			}
		} catch (Exception e) {
			logException(e, "WebRPC read failed");
			terminateConnection();
		}
	}

	// Dispatches a call by its qualified method name to the matching
	// instantiation of `invokeMethodF`. Throws for unknown method names.
	private Bson invokeMethod(string name, Bson arguments)
	{
		switch (name) {
			default: throw new Exception("Unknown method called: " ~ name);
			static foreach (FI; recursiveInterfaceFunctions!(I, "")) {
				case FI.expand[2]: return invokeMethodF!(FI.expand)(arguments);
			}
		}
	}

	/* Invokes `method` on the (sub) interface implementation addressed by
		`qualified_name` and serializes the outcome.

		If the method has `ref`/`out` parameters, the result is an object that
		contains each of those parameters under its name and the return value
		(if any, and if not a stream) under the key "return". Otherwise the
		result is the serialized return value.
	*/
	private Bson invokeMethodF(SI, alias method, string qualified_name)(Bson arguments)
	{
		alias outparams = refOutParameterIndices!method;
		alias paramnames = ParameterIdentifierTuple!method;

		SI impl = resolveImpl!qualified_name(m_impl, arguments);

		ParameterTypeTuple!method args;
		resolveArguments!method(impl, arguments, args);

		alias RT = typeof(__traits(getMember, impl, __traits(identifier, method))(args));
		static if (!is(RT == void)) {
			auto ret = __traits(getMember, impl, __traits(identifier, method))(args);
		} else {
			__traits(getMember, impl, __traits(identifier, method))(args);
		}
		Bson bret;
		static if (outparams.length > 0) {
			bret = Bson.emptyObject;
			foreach (pi; outparams)
				bret[paramnames[pi]] = serializeToBson(args[pi]);
			static if (is(typeof(ret)) && !isInputStream!(typeof(ret)))
				bret["return"] = serializeToBson(ret);
		} else static if (is(typeof(ret)) && !isInputStream!(typeof(ret))) {
			bret = serializeToBson(ret);
		}
		return bret;
	}

	// Walks along the dot separated segments of `qualified_name`, calling the
	// sub interface function named by each leading segment, and returns the
	// implementation that the final method name refers to.
	private auto resolveImpl(string qualified_name, RI)(RI base, Bson arguments)
		if (is(RI == interface))
	{
		import std.string : indexOf;
		enum idx = qualified_name.indexOf('.');
		static if (idx < 0) return base;
		else {
			enum mname = qualified_name[0 .. idx];
			alias method = __traits(getMember, base, mname);

			ParameterTypeTuple!method args;
			resolveArguments!method(base, arguments, args);

			static if (isInstanceOf!(Collection, ReturnType!(__traits(getMember, base, mname))))
				return resolveImpl!(qualified_name[idx+1 .. $])(__traits(getMember, base, mname)(args).m_interface, arguments);
			else
				return resolveImpl!(qualified_name[idx+1 .. $])(__traits(getMember, base, mname)(args), arguments);
		}
	}

	// Fills `args` for a call of `method`: parameters of type `AuthInfo!SI`
	// receive the authentication result, all other non-`out` parameters are
	// deserialized from `arguments` by name. For authenticated methods,
	// authentication is performed first and authorization is checked last.
	private void resolveArguments(alias method, SI)(SI impl, Bson arguments, out typeof(ParameterTypeTuple!method.init) args)
	{
		alias paramnames = ParameterIdentifierTuple!method;

		static if (isAuthenticated!(SI, method)) {
			typeof(handleAuthentication!method(impl, m_peerInfo)) auth_info;

			auth_info = handleAuthentication!method(impl, m_peerInfo);
		}

		foreach (i, name; paramnames) {
			static if (is(typeof(args[i]) == AuthInfo!SI))
				args[i] = auth_info;
			else static if (!(ParameterStorageClassTuple!method[i] & ParameterStorageClass.out_))
				args[i] = deserializeBson!(typeof(args[i]))(arguments[name]);
		}

		static if (isAuthenticated!(SI, method))
			handleAuthorization!(SI, method, args)(auth_info);
	}
}
792 
793 
// Type tag that is sent as the first byte of each WebSocket message,
// followed by the BSON serialized packet of the matching type.
private enum WebRPCMessageType : ubyte {
	call = 1, // followed by a `WebRPCCallPacket`
	response = 2, // followed by a `WebRPCResponsePacket`
	errorResponse = 3 // followed by a `WebRPCErrorResponsePacket`
}
799 
// Packet requesting the execution of a method.
private struct WebRPCCallPacket {
	ulong sequence; // number assigned by the sender, echoed in the response
	string method; // qualified method name ("sub.method" for sub interfaces)
	Bson arguments; // object containing the arguments by parameter name
}
805 
// Packet carrying the result of a successfully executed call.
private struct WebRPCResponsePacket {
	ulong sequence; // sequence number of the call being answered
	Bson result; // serialized result of the call
}
810 
// Packet reporting that a call has failed with an exception.
private struct WebRPCErrorResponsePacket {
	ulong sequence; // sequence number of the call being answered
	string message; // message of the exception thrown by the method
}
815 
816 
// Evaluates to the sequence of indices (in ascending order) of all
// parameters of `fun` that are declared as `ref` or `out`.
private template refOutParameterIndices(alias fun)
{
	alias pcls = ParameterStorageClassTuple!fun;
	// recursively collects the matching indices, starting at index `i`
	template impl(size_t i) {
		static if (i < pcls.length) {
			static if (pcls[i] & (ParameterStorageClass.out_|ParameterStorageClass.ref_))
				alias impl = AliasSeq!(i, impl!(i+1));
			else alias impl = impl!(i+1);
		} else alias impl = AliasSeq!();
	}
	alias refOutParameterIndices = impl!0;
}
829 
// Evaluates to a sequence with one `Group!(interface, method, qualified_name)`
// entry for each route function of `I` and, recursively, of all of its sub
// interfaces. The qualified name of a function is formed by prepending
// `method_prefix`; for sub interfaces the prefix gets extended by the name of
// the sub interface function followed by a dot.
private template recursiveInterfaceFunctions(I, string method_prefix)
{
	import vibe.internal.meta.typetuple : Group;

	alias Info = RestInterface!(I, false);

	// entry for a route function that is defined directly in `I`
	alias MethodEntry(alias method) = Group!(I, method, method_prefix ~ __traits(identifier, method));

	// entries of the sub interface that is returned by `method`
	alias SubInterfaceEntry(alias method) = recursiveInterfaceFunctions!(SubInterfaceType!method, method_prefix ~ __traits(identifier, method) ~ ".");

	alias recursiveInterfaceFunctions = AliasSeq!(
		staticMap!(MethodEntry, Info.RouteFunctions),
		staticMap!(SubInterfaceEntry, Info.SubInterfaceFunctions)
	);
}