1 2 /** Web based, bi-directional, concurrent RPC implementation. 3 4 This module implements a generic RPC mechanism that allows transparently 5 calling remote functions over an HTTP based network connection. The current 6 implementation is based on a WebSocket based protocol, serializing method 7 arguments and return types as BSON. 8 9 The RPC API is defined using interfaces, very similar to the system in 10 `vibe.web.rest`. It supports methods with or without a return value, normal, 11 `ref` and `out` parameters, exceptions, properties returning interfaces, 12 and properties returning `vibe.web.rest.Collection!I`. 13 14 Authorization and authentication is supported via the `vibe.web.auth` 15 framework. When using it, the `authenticate` method should be defined as 16 `@noRoute T authenticate(ref const WebRPCPerrInfo)`, where `T` is the type 17 passed to the `@requiresAuth` UDA. 18 19 Any remote function calls can execute concurrently, so that the connection 20 never gets blocked by an unfinished function call. 21 22 Note that this system will establish a bi-directional communication 23 facility, allowing both, the client and the server, to initiate calls. This 24 effectively forms a peer-to-peer connection instead of a client-server 25 connection. The advantage of using HTTP as the basis is that this makes it 26 easy to establish P2P connections where one of the peers is behind a 27 firewall or NAT layer, but the other peer can be reached through a public 28 port or through a (reverse) proxy server. 29 30 31 Defining_a_simple_RPC_interface: 32 33 The API for the interface is defined as a normal D interface: 34 35 --- 36 interface ExampleAPI { 37 void performSomeAction(); 38 int getSomeInformation(); 39 } 40 --- 41 42 An implementation of this interface is required on both, the server and the 43 client side: 44 45 --- 46 class ExampleAPIImplementation : ExampleAPI { 47 void performSomeAction() { ... } 48 int getSomeInformation() { return ...; } 49 } 50 --- 51 52 With this defined, this is the basic code needed to set up the server side: 53 54 --- 55 void handleIncomingPeer(WebRPCPeer!ExampleAPI peer) 56 @safe nothrow { 57 // this gets executed for any client that connects to the server 58 try { 59 peer.performSomeAction(); 60 } catch (Exception e) { 61 logException(e, "Failed to perform peer action"); 62 } 63 } 64 65 auto r = new URLRouter; 66 r.registerWebRPC!ExampleAPI(r, "/rpc", new ExampleAPIImplementation, &handlePeer); 67 // could register other routes here, such as for a web or REST interface 68 69 auto l = listenHTTP("127.0.0.1:1234", r); 70 --- 71 72 A client can now connect to the server and access the API as well: 73 74 --- 75 auto peer = connectWebRPC(URL("http://127.0.0.1:1234/rpc"), 76 new ExampleAPIImplementation); 77 78 peer.performSomeAction(); 79 --- 80 81 82 Copyright: © 2024 Sönke Ludwig 83 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 84 Authors: Sönke Ludwig 85 */ 86 module vibe.web.rpc; 87 88 import vibe.core.log; 89 import vibe.core.core : Task, runTask, yield; 90 import vibe.core.path : InetPath; 91 import vibe.core.stream : isInputStream; 92 import vibe.data.bson; 93 import vibe.inet.url : URL; 94 import vibe.http.router; 95 import vibe.http.server; 96 import vibe.http.websockets; 97 import vibe.stream.tls : TLSCertificateInformation; 98 import vibe.web.internal.rest.common : RestInterface, SubInterfaceType; 99 import vibe.web.auth; 100 import vibe.web.common; 101 import vibe.web.rest : Collection; 102 103 import std.meta; 104 import std.traits; 105 106 107 alias WebRPCPeerCallback(I) = void delegate(WebRPCPeer!I peer) @safe nothrow; 108 109 110 /** Registers a route for handling incoming WebRPC requests. 111 112 The endpoint defined by `path` will attempt to establish a WebSocket 113 connection with the client and subsequently enables bi-directional 114 communication by listening for calls made by the client, as well as invoking 115 the `peer_callback` to allow the server to make calls, too. 116 117 Params: 118 router = The `URLRouter` on which to register the endpoint 119 path = Path of the registered endpoint 120 implementation = The API implementation to invoke for incoming method 121 calls 122 peer_callback = Callback invoked for each incoming connection 123 */ 124 void registerWebRPC(I)(URLRouter router, string path, I implementation, 125 WebRPCPeerCallback!I peer_callback) 126 if (is(I == interface)) 127 { 128 router.get(path, (scope HTTPServerRequest req, scope HTTPServerResponse res) => handleWebRPC!I(implementation, peer_callback, req, res)); 129 } 130 131 132 /** Connects to a WebRPC endpoint. 133 134 This will perform a HTTP GET request to the supplied `url` and attempts 135 to establish a WebSocket connection for bi-directional communication. 136 Incoming method calls will be forwarded to `implementation`. 137 138 Params: 139 url = URL of the endpoint to connect to 140 implementation = The API implementation to invoke for incoming method 141 calls 142 143 Returns: 144 A `WebRPCPeer` instance is returned, which exposes the API interface `I` 145 for making outgoing method calls. 146 */ 147 WebRPCPeer!I connectWebRPC(I)(URL url, I implementation) 148 if (is(I == interface)) 149 { 150 WebRPCPeerInfo info; 151 auto ws = connectWebSocketEx(url, (scope req) { 152 info.address = req.remoteAddress; 153 info.certificate = req.peerCertificate; 154 }); 155 auto h = new WebSocketHandler!I(ws, implementation, info); 156 runTask(&h.runReadLoop); 157 158 return WebRPCPeer!I(new WebRPCPeerImpl!(I, I, "")(h)); 159 } 160 161 162 /** Provides information about a peer; 163 */ 164 struct WebRPCPeerInfo { 165 // (Remote) address of the peer 166 NetworkAddress address; 167 168 // Information about the peer's TLS certificate, if any 169 TLSCertificateInformation certificate; 170 } 171 172 173 /** Reference counted type used to access a peer's API. 174 175 This struct defines an `alias this` to its `implementation` property in 176 order to provide an interface implementation of `I`. Any calls on the 177 methods of this implementation will be forwarded to the remote peer. 178 179 Note that the WebRPC connection will be closed as soon as the last instance 180 of a connected `WebRPCPeer` gets destroyed. 181 */ 182 struct WebRPCPeer(I) { 183 private { 184 WebRPCPeerImpl!(I, I, "") m_impl; 185 } 186 187 @safe: 188 189 private this(WebRPCPeerImpl!(I, I, "") impl) 190 { 191 m_impl = impl; 192 } 193 194 this(this) 195 { 196 if (m_impl && m_impl.m_handler) 197 m_impl.m_handler.addRef(); 198 } 199 200 ~this() 201 { 202 if (m_impl && m_impl.m_handler) 203 m_impl.m_handler.releaseRef(); 204 } 205 206 /** Provides information about the remote peer. 207 */ 208 @property ref const(WebRPCPeerInfo) peerInformation() const { return m_impl.m_handler.m_peerInfo; } 209 210 /** Accesses the remote peer's API interface. 211 212 Note that this does not need to be called explicitly, as an `alias this` 213 will make all methods of `I` available on `WebRPCPeer` directly. 214 */ 215 @property inout(I) implementation() inout { return m_impl; } 216 217 /// 218 alias implementation this; 219 } 220 221 222 final class WebRPCPeerImpl(I, RootI, string method_prefix) : I 223 if (is(I == interface) && is(RootI == interface)) 224 { 225 private alias Info = RestInterface!(I, false); 226 227 mixin(generateModuleImports!I()); 228 229 private alias SubPeerImpl(alias method) = WebRPCPeerImpl!(SubInterfaceType!method, RootI, method_prefix ~ __traits(identifier, method) ~ "."); 230 231 private { 232 WebSocketHandler!RootI m_handler; 233 staticMap!(SubPeerImpl, Info.SubInterfaceFunctions) m_subInterfaces; 234 } 235 236 @safe: 237 238 private this(WebSocketHandler!RootI handler) 239 { 240 m_handler = handler; 241 foreach (i, SI; Info.SubInterfaceTypes) 242 m_subInterfaces[i] = new WebRPCPeerImpl!(SI, RootI, method_prefix ~ __traits(identifier, Info.SubInterfaceFunctions[i]) ~ ".")(handler); 243 } 244 245 mixin(generateWebRPCPeerMethods!I()); 246 247 private ReturnType!method performCall(alias method, PARAMS...)(auto ref PARAMS params) 248 { 249 alias outparams = refOutParameterIndices!method; 250 alias paramnames = ParameterIdentifierTuple!method; 251 252 Bson args = Bson.emptyObject; 253 foreach (i, pname; ParameterIdentifierTuple!method) 254 static if (!is(ParameterTypeTuple!method[i] == AuthInfo!I) && !(ParameterStorageClassTuple!method[i] & ParameterStorageClass.out_)) 255 args[pname] = serializeToBson(params[i]); 256 auto seq = m_handler.sendCall(method_prefix ~ __traits(identifier, method), args); 257 auto ret = m_handler.waitForResponse(seq); 258 static if (outparams.length > 0) { 259 foreach (pi; outparams) 260 params[pi] = ret[paramnames[pi]].deserializeBson!(PARAMS[pi]); 261 static if (!is(ReturnType!method == void)) 262 return ret["return"].deserializeBson!(ReturnType!method); 263 } else static if (isInputStream!(ReturnType!method)) { 264 throw new Exception("Stream type results are not yet supported"); 265 } else static if (!is(ReturnType!method == void)) { 266 return ret.deserializeBson!(ReturnType!method); 267 } 268 } 269 } 270 271 272 version (unittest) { 273 private interface TestSubI { 274 @safe: 275 int test(); 276 } 277 278 private interface TestCollI { 279 @safe: 280 struct CollectionIndices { 281 int index; 282 } 283 284 @property int count(); 285 int get(int index); 286 } 287 288 @requiresAuth!TestAuthInfo 289 private interface TestAuthI { 290 @safe: 291 @noAuth void login(); 292 @noAuth int testUnauthenticated(); 293 @auth(Role.authenticatedPeer) int testAuthenticated(); 294 @noRoute TestAuthInfo authenticate(ref const WebRPCPeerInfo peer); 295 } 296 297 struct TestAuthInfo { 298 bool authenticated; 299 300 bool isAuthenticatedPeer() @safe nothrow { return authenticated; } 301 } 302 303 private interface TestI { 304 @safe: 305 @property TestSubI sub(); 306 @property Collection!TestCollI items(); 307 @property TestAuthI auth(); 308 int add(int a, int b); 309 void add2(int a, int b, out int c); 310 int addmul(ref int a, int b, int c); 311 void except(); 312 } 313 314 private class TestSubC : TestSubI { 315 int test() { return 42; } 316 } 317 318 private class TestCollC : TestCollI { 319 @property int count() { return 4; } 320 int get(int index) { return index * 2; } 321 } 322 323 private class TestAuthC : TestAuthI { 324 private bool m_authenticated; 325 326 void login() { m_authenticated = true; } 327 @noAuth int testUnauthenticated() { return 1; } 328 @auth(Role.authenticatedPeer) int testAuthenticated() { return 2; } 329 330 @noRoute 331 TestAuthInfo authenticate(ref const WebRPCPeerInfo peer) 332 { 333 return TestAuthInfo(m_authenticated); 334 } 335 } 336 337 private class TestC : TestI { 338 TestSubC m_sub; 339 TestCollC m_items; 340 TestAuthC m_auth; 341 this() { 342 m_sub = new TestSubC; 343 m_items = new TestCollC; 344 m_auth = new TestAuthC; 345 } 346 @property TestSubC sub() { return m_sub; } 347 @property Collection!TestCollI items() { return Collection!TestCollI(m_items); } 348 @property TestAuthI auth() { return m_auth; } 349 int add(int a, int b) { return a + b; } 350 void add2(int a, int b, out int c) { c = a + b; } 351 int addmul(ref int a, int b, int c) { a += b; return a * c; } 352 void except() { throw new Exception("Error!"); } 353 } 354 } 355 356 unittest { 357 import core.time : seconds; 358 import std.exception : assertThrown; 359 import vibe.core.core : setTimer; 360 361 auto tm = setTimer(1.seconds, { assert(false, "Test timeout"); }); 362 scope (exit) tm.stop(); 363 364 auto r = new URLRouter; 365 bool got_client = false; 366 registerWebRPC!TestI(r, "/rpc", new TestC, (WebRPCPeer!TestI peer) @safe nothrow { 367 // test the reverse direction (server calls client) 368 try assert(peer.add(2, 3) == 5); 369 catch (Exception e) assert(false, e.msg); 370 got_client = true; 371 }); 372 373 auto l = listenHTTP("127.0.0.1:0", r); 374 auto url = URL("http", "127.0.0.1", l.bindAddresses[0].port, InetPath("/rpc")); 375 auto cli = connectWebRPC!TestI(url, new TestC); 376 377 // simple method call with return value 378 assert(cli.add(3, 4) == 7); 379 380 // sub interface method call 381 assert(cli.sub.test() == 42); 382 383 { // call with out parameter 384 int c; 385 cli.add2(2, 3, c); 386 assert(c == 5); 387 } 388 389 { // call with ref parameter 390 int a; 391 a = 4; 392 assert(cli.addmul(a, 2, 3) == 18); 393 assert(a == 6); 394 } 395 396 try { // call with exception 397 cli.except(); 398 assert(false); 399 } catch (Exception e) { 400 assert(e.msg == "Error!"); 401 } 402 403 // Collection!I syntax 404 assert(cli.items.count == 4); 405 foreach (i; 0 .. 4) 406 assert(cli.items[i].get() == i * 2); 407 408 // "auth" framework tests 409 assert(cli.auth.testUnauthenticated() == 1); 410 assertThrown(cli.auth.testAuthenticated()); 411 cli.auth.login(); 412 assert(cli.auth.testAuthenticated() == 2); 413 414 // make sure the reverse direction got established and tested 415 while (!got_client) yield(); 416 } 417 418 419 private void handleWebRPC(I)(I implementation, WebRPCPeerCallback!I peer_callback, 420 scope HTTPServerRequest req, scope HTTPServerResponse res) 421 { 422 void handleSocket(scope WebSocket ws) 423 nothrow { 424 import std.exception : assumeWontThrow; 425 426 scope const(HTTPServerRequest) req; 427 auto info = const(WebRPCPeerInfo)( 428 ws.request.assumeWontThrow.clientAddress, 429 ws.request.assumeWontThrow.clientCertificate); 430 auto h = new WebSocketHandler!I(ws, implementation, info); 431 h.addRef(); // WebRPCPeer expects to receive an already owned handler 432 433 // start reverse communication asynchronously 434 auto t = runTask((WebRPCPeerCallback!I cb, WebSocketHandler!I h) { 435 cb(WebRPCPeer!I(new WebRPCPeerImpl!(I, I, "")(h))); 436 }, peer_callback, h); 437 438 // handle incoming messages 439 h.runReadLoop(); 440 h.releaseRef(); 441 t.joinUninterruptible(); 442 443 try ws.close(); 444 catch (Exception e) logException(e, "Failed to close WebSocket after handling WebRPC connection"); 445 h.m_socket = WebSocket.init; 446 } 447 448 handleWebSocket(&handleSocket, req, res); 449 } 450 451 452 private string generateWebRPCPeerMethods(I)() 453 { 454 import std.array : join; 455 import std.string : format; 456 import vibe.web.common : NoRouteAttribute; 457 458 alias Info = RestInterface!(I, false); 459 460 string ret = q{ 461 import vibe.internal.meta.codegen : CloneFunction; 462 }; 463 464 // generate sub interface methods 465 foreach (i, SI; Info.SubInterfaceTypes) { 466 alias F = Info.SubInterfaceFunctions[i]; 467 alias RT = ReturnType!F; 468 alias ParamNames = ParameterIdentifierTuple!F; 469 static if (ParamNames.length == 0) enum pnames = ""; 470 else enum pnames = ", " ~ [ParamNames].join(", "); 471 static if (isInstanceOf!(Collection, RT)) { 472 ret ~= q{ 473 mixin CloneFunction!(Info.SubInterfaceFunctions[%1$s], q{ 474 return Collection!(%2$s)(m_subInterfaces[%1$s]%3$s); 475 }); 476 }.format(i, fullyQualifiedName!SI, pnames); 477 } else { 478 ret ~= q{ 479 mixin CloneFunction!(Info.SubInterfaceFunctions[%1$s], q{ 480 return m_subInterfaces[%1$s]; 481 }); 482 }.format(i); 483 } 484 } 485 486 // generate route methods 487 foreach (i, F; Info.RouteFunctions) { 488 alias ParamNames = ParameterIdentifierTuple!F; 489 static if (ParamNames.length == 0) enum pnames = ""; 490 else enum pnames = [ParamNames].join(", "); 491 492 ret ~= q{ 493 mixin CloneFunction!(Info.RouteFunctions[%2$s], q{ 494 return performCall!(Info.RouteFunctions[%2$s])(%3$s); 495 }); 496 }.format(fullyQualifiedName!F, i, pnames); 497 } 498 499 // generate stubs for non-route functions 500 static foreach (m; __traits(allMembers, I)) 501 foreach (i, fun; MemberFunctionsTuple!(I, m)) 502 static if (hasUDA!(fun, NoRouteAttribute)) 503 ret ~= q{ 504 mixin CloneFunction!(MemberFunctionsTuple!(I, "%s")[%s], q{ 505 assert(false); 506 }); 507 }.format(m, i); 508 509 return ret; 510 } 511 512 513 private string generateModuleImports(I)() 514 { 515 if (!__ctfe) 516 assert (false); 517 518 import vibe.internal.meta.codegen : getRequiredImports; 519 import std.algorithm : map; 520 import std.array : join; 521 522 auto modules = getRequiredImports!I(); 523 return join(map!(a => "static import " ~ a ~ ";")(modules), "\n"); 524 } 525 526 527 private final class WebSocketHandler(I) { 528 import vibe.core.sync : LocalManualEvent, TaskMutex, createManualEvent; 529 530 private alias Info = RestInterface!(I, false); 531 532 struct Res { 533 Bson result; 534 string error; 535 } 536 537 private { 538 I m_impl; 539 const WebRPCPeerInfo m_peerInfo; 540 int m_refCount = 1; 541 WebSocket m_socket; 542 TaskMutex m_sendMutex; 543 ulong m_sequence; 544 Res[ulong] m_availableResponses; 545 LocalManualEvent m_responseEvent; 546 } 547 548 @safe: 549 550 this(return WebSocket ws, I impl, ref const(WebRPCPeerInfo) peer_info) 551 { 552 m_impl = impl; 553 m_peerInfo = peer_info; 554 555 static if (__VERSION__ < 2106) 556 () @trusted { m_socket = ws; } (); 557 else m_socket = ws; 558 m_sendMutex = new TaskMutex; 559 m_responseEvent = createManualEvent(); 560 } 561 562 void addRef() 563 { 564 m_refCount++; 565 } 566 567 void releaseRef() 568 { 569 if (!--m_refCount) { 570 try m_socket.close(); 571 catch (Exception e) { 572 logException(e, "Failed to close WebSocket"); 573 } 574 m_socket = null; 575 m_responseEvent.emit(); 576 } 577 } 578 579 ulong sendCall(string method, Bson arguments) 580 { 581 m_sendMutex.lock(); 582 scope (exit) m_sendMutex.unlock(); 583 584 if (!m_socket || !m_socket.connected) 585 throw new Exception("Connection closed before sending WebRPC call"); 586 587 WebRPCCallPacket pack; 588 pack.sequence = m_sequence++; 589 pack.method = method; 590 pack.arguments = arguments; 591 auto bpack = serializeToBson(pack); 592 m_socket.send(WebRPCMessageType.call ~ bpack.data); 593 return pack.sequence; 594 } 595 596 void sendResponse(ulong sequence, Bson result) 597 { 598 m_sendMutex.lock(); 599 scope (exit) m_sendMutex.unlock(); 600 601 if (!m_socket || !m_socket.connected) 602 throw new Exception("Connection closed before sending WebRPC response"); 603 604 WebRPCResponsePacket res; 605 res.sequence = sequence; 606 res.result = result; 607 auto bpack = serializeToBson(res); 608 m_socket.send(WebRPCMessageType.response ~ bpack.data); 609 } 610 611 void sendErrorResponse(ulong sequence, string error_message) 612 { 613 m_sendMutex.lock(); 614 scope (exit) m_sendMutex.unlock(); 615 616 if (!m_socket || !m_socket.connected) 617 throw new Exception("Connection closed before sending WebRPC error response"); 618 619 WebRPCErrorResponsePacket res; 620 res.sequence = sequence; 621 res.message = error_message; 622 auto bpack = serializeToBson(res); 623 m_socket.send(WebRPCMessageType.errorResponse ~ bpack.data); 624 } 625 626 627 Bson waitForResponse(ulong sequence) 628 { 629 auto ec = m_responseEvent.emitCount; 630 while (true) { 631 if (!m_socket || !m_socket.connected) 632 throw new Exception("Connection closed while waiting for WebRPC response"); 633 if (auto pr = sequence in m_availableResponses) { 634 if (pr.error !is null) 635 throw new Exception(pr.error); 636 auto ret = *pr; 637 m_availableResponses.remove(sequence); 638 return ret.result; 639 } 640 ec = m_responseEvent.wait(ec); 641 } 642 } 643 644 private void terminateConnection() 645 nothrow { 646 if (!m_socket) return; 647 try m_socket.close(WebSocketCloseReason.internalError); 648 catch (Exception e2) { 649 logException(e2, "Failed to close WebSocket after failure"); 650 } 651 } 652 653 void runReadLoop() 654 nothrow { 655 try { 656 while (m_socket && m_socket.waitForData) { 657 if (!m_socket) break; 658 auto msg = m_socket.receiveBinary(); 659 auto brep = Bson(Bson.Type.object, msg[1 .. $].idup); 660 switch (msg[0]) { 661 default: 662 logWarn("Unknown message type received (%s) - terminating WebRPC connection", brep["type"].opt!int(-1)); 663 m_socket.close(); 664 return; 665 case WebRPCMessageType.call: 666 addRef(); 667 runTask((WebSocketHandler handler, Bson brep) nothrow { 668 scope (exit) handler.releaseRef(); 669 WebRPCCallPacket cmsg; 670 try cmsg = deserializeBson!WebRPCCallPacket(brep); 671 catch (Exception e) { 672 logException(e, "Invalid call packet"); 673 handler.terminateConnection(); 674 return; 675 } 676 Bson res; 677 try res = handler.invokeMethod(cmsg.method, cmsg.arguments); 678 catch (Exception e) { 679 logDiagnostic("WebRPC method %s has thrown: %s", cmsg.method, e.msg); 680 try handler.sendErrorResponse(cmsg.sequence, e.msg); 681 catch (Exception e) { 682 logException(e, "Failed to send error response"); 683 handler.terminateConnection(); 684 } 685 return; 686 } 687 try handler.sendResponse(cmsg.sequence, res); 688 catch (Exception e) { 689 logException(e, "Failed to send response"); 690 handler.terminateConnection(); 691 } 692 }, this, brep); 693 break; 694 case WebRPCMessageType.response: 695 auto rmsg = deserializeBson!WebRPCResponsePacket(brep); 696 m_availableResponses[rmsg.sequence] = Res(rmsg.result, null); 697 m_responseEvent.emit(); 698 break; 699 case WebRPCMessageType.errorResponse: 700 auto rmsg = deserializeBson!WebRPCErrorResponsePacket(brep); 701 m_availableResponses[rmsg.sequence] = Res(Bson.init, rmsg.message); 702 m_responseEvent.emit(); 703 break; 704 } 705 } 706 } catch (Exception e) { 707 logException(e, "WebRPC read failed"); 708 terminateConnection(); 709 } 710 } 711 712 private Bson invokeMethod(string name, Bson arguments) 713 { 714 switch (name) { 715 default: throw new Exception("Unknown method called: " ~ name); 716 static foreach (FI; recursiveInterfaceFunctions!(I, "")) { 717 case FI.expand[2]: return invokeMethodF!(FI.expand)(arguments); 718 } 719 } 720 } 721 722 private Bson invokeMethodF(SI, alias method, string qualified_name)(Bson arguments) 723 { 724 alias outparams = refOutParameterIndices!method; 725 alias paramnames = ParameterIdentifierTuple!method; 726 727 SI impl = resolveImpl!qualified_name(m_impl, arguments); 728 729 ParameterTypeTuple!method args; 730 resolveArguments!method(impl, arguments, args); 731 732 alias RT = typeof(__traits(getMember, impl, __traits(identifier, method))(args)); 733 static if (!is(RT == void)) { 734 auto ret = __traits(getMember, impl, __traits(identifier, method))(args); 735 } else { 736 __traits(getMember, impl, __traits(identifier, method))(args); 737 } 738 Bson bret; 739 static if (outparams.length > 0) { 740 bret = Bson.emptyObject; 741 foreach (pi; outparams) 742 bret[paramnames[pi]] = serializeToBson(args[pi]); 743 static if (is(typeof(ret)) && !isInputStream!(typeof(ret))) 744 bret["return"] = serializeToBson(ret); 745 } else static if (is(typeof(ret)) && !isInputStream!(typeof(ret))) { 746 bret = serializeToBson(ret); 747 } 748 return bret; 749 } 750 751 private auto resolveImpl(string qualified_name, RI)(RI base, Bson arguments) 752 if (is(RI == interface)) 753 { 754 import std.string : indexOf; 755 enum idx = qualified_name.indexOf('.'); 756 static if (idx < 0) return base; 757 else { 758 enum mname = qualified_name[0 .. idx]; 759 alias method = __traits(getMember, base, mname); 760 761 ParameterTypeTuple!method args; 762 resolveArguments!method(base, arguments, args); 763 764 static if (isInstanceOf!(Collection, ReturnType!(__traits(getMember, base, mname)))) 765 return resolveImpl!(qualified_name[idx+1 .. $])(__traits(getMember, base, mname)(args).m_interface, arguments); 766 else 767 return resolveImpl!(qualified_name[idx+1 .. $])(__traits(getMember, base, mname)(args), arguments); 768 } 769 } 770 771 private void resolveArguments(alias method, SI)(SI impl, Bson arguments, out typeof(ParameterTypeTuple!method.init) args) 772 { 773 alias paramnames = ParameterIdentifierTuple!method; 774 775 static if (isAuthenticated!(SI, method)) { 776 typeof(handleAuthentication!method(impl, m_peerInfo)) auth_info; 777 778 auth_info = handleAuthentication!method(impl, m_peerInfo); 779 } 780 781 foreach (i, name; paramnames) { 782 static if (is(typeof(args[i]) == AuthInfo!SI)) 783 args[i] = auth_info; 784 else static if (!(ParameterStorageClassTuple!method[i] & ParameterStorageClass.out_)) 785 args[i] = deserializeBson!(typeof(args[i]))(arguments[name]); 786 } 787 788 static if (isAuthenticated!(SI, method)) 789 handleAuthorization!(SI, method, args)(auth_info); 790 } 791 } 792 793 794 private enum WebRPCMessageType : ubyte { 795 call = 1, 796 response = 2, 797 errorResponse = 3 798 } 799 800 private struct WebRPCCallPacket { 801 ulong sequence; 802 string method; 803 Bson arguments; 804 } 805 806 private struct WebRPCResponsePacket { 807 ulong sequence; 808 Bson result; 809 } 810 811 private struct WebRPCErrorResponsePacket { 812 ulong sequence; 813 string message; 814 } 815 816 817 private template refOutParameterIndices(alias fun) 818 { 819 alias pcls = ParameterStorageClassTuple!fun; 820 template impl(size_t i) { 821 static if (i < pcls.length) { 822 static if (pcls[i] & (ParameterStorageClass.out_|ParameterStorageClass.ref_)) 823 alias impl = AliasSeq!(i, impl!(i+1)); 824 else alias impl = impl!(i+1); 825 } else alias impl = AliasSeq!(); 826 } 827 alias refOutParameterIndices = impl!0; 828 } 829 830 private template recursiveInterfaceFunctions(I, string method_prefix) 831 { 832 import vibe.internal.meta.typetuple : Group; 833 834 alias Info = RestInterface!(I, false); 835 836 alias MethodEntry(alias method) = Group!(I, method, method_prefix ~ __traits(identifier, method)); 837 838 alias SubInterfaceEntry(alias method) = recursiveInterfaceFunctions!(SubInterfaceType!method, method_prefix ~ __traits(identifier, method) ~ "."); 839 840 alias recursiveInterfaceFunctions = AliasSeq!( 841 staticMap!(MethodEntry, Info.RouteFunctions), 842 staticMap!(SubInterfaceEntry, Info.SubInterfaceFunctions) 843 ); 844 }