1 /** 2 Low level mongodb protocol. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.connection; 9 10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic 11 // debug = VibeVerboseMongo; 12 13 public import vibe.data.bson; 14 15 import vibe.core.core : vibeVersionString; 16 import vibe.core.log; 17 import vibe.core.net; 18 import vibe.data.bson; 19 import vibe.db.mongo.flags; 20 import vibe.db.mongo.settings; 21 import vibe.inet.webform; 22 import vibe.stream.tls; 23 24 import std.algorithm : findSplit, map, splitter; 25 import std.array; 26 import std.conv; 27 import std.digest.md; 28 import std.exception; 29 import std.range; 30 import std.string; 31 import std.traits : hasIndirections; 32 import std.typecons; 33 34 import core.time; 35 36 private struct _MongoErrorDescription 37 { 38 string message; 39 int code; 40 int connectionId; 41 int n; 42 double ok; 43 } 44 45 /** 46 * D POD representation of Mongo error object. 47 * 48 * For successful queries "code" is negative. 49 * Can be used also to check how many documents where updated upon 50 * a successful query via "n" field. 51 */ 52 alias MongoErrorDescription = immutable(_MongoErrorDescription); 53 54 /** 55 * Root class for vibe.d Mongo driver exception hierarchy. 56 */ 57 class MongoException : Exception 58 { 59 @safe: 60 61 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 62 { 63 super(message, file, line, next); 64 } 65 } 66 67 /** 68 * Generic class for all exception related to unhandled driver problems. 69 * 70 * I.e.: protocol mismatch or unexpected mongo service behavior. 
71 */ 72 class MongoDriverException : MongoException 73 { 74 @safe: 75 76 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 77 { 78 super(message, file, line, next); 79 } 80 } 81 82 /** 83 * Wrapper class for all inner mongo collection manipulation errors. 84 * 85 * It does not indicate problem with vibe.d driver itself. Most frequently this 86 * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting. 87 */ 88 class MongoDBException : MongoException 89 { 90 @safe: 91 92 MongoErrorDescription description; 93 alias description this; 94 95 this(MongoErrorDescription description, string file = __FILE__, 96 size_t line = __LINE__, Throwable next = null) 97 { 98 super(description.message, file, line, next); 99 this.description = description; 100 } 101 } 102 103 /** 104 * Generic class for all exceptions related to authentication problems. 105 * 106 * I.e.: unsupported mechanisms or wrong credentials. 107 */ 108 class MongoAuthException : MongoException 109 { 110 @safe: 111 112 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 113 { 114 super(message, file, line, next); 115 } 116 } 117 118 /** 119 [internal] Provides low-level mongodb protocol access. 120 121 It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code. 122 Note that a MongoConnection may only be used from one fiber/thread at a time. 
123 */ 124 final class MongoConnection { 125 @safe: 126 127 import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */; 128 import vibe.internal.interfaceproxy; 129 import vibe.core.stream : InputStream, Stream; 130 131 private { 132 MongoClientSettings m_settings; 133 TCPConnection m_conn; 134 InterfaceProxy!Stream m_stream; 135 ulong m_bytesRead; 136 int m_msgid = 1; 137 StreamOutputRange!(InterfaceProxy!Stream) m_outRange; 138 ServerDescription m_description; 139 /// Flag to prevent recursive connections when server closes connection while connecting 140 bool m_allowReconnect; 141 bool m_isAuthenticating; 142 bool m_supportsOpMsg; 143 } 144 145 enum ushort defaultPort = MongoClientSettings.defaultPort; 146 147 /// Simplified constructor overload, with no m_settings 148 this(string server, ushort port = defaultPort) 149 { 150 m_settings = new MongoClientSettings(); 151 m_settings.hosts ~= MongoHost(server, port); 152 } 153 154 this(MongoClientSettings cfg) 155 { 156 m_settings = cfg; 157 158 // Now let's check for features that are not yet supported. 159 if(m_settings.hosts.length > 1) 160 logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", 161 m_settings.hosts[0].name, m_settings.hosts[0].port); 162 } 163 164 void connect() 165 { 166 bool isTLS; 167 168 /* 169 * TODO: Connect to one of the specified hosts taking into consideration 170 * options such as connect timeouts and so on. 
171 */ 172 try { 173 import core.time : Duration, msecs; 174 175 auto connectTimeout = m_settings.connectTimeoutMS.msecs; 176 if (m_settings.connectTimeoutMS == 0) 177 connectTimeout = Duration.max; 178 179 m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout); 180 m_conn.tcpNoDelay = true; 181 if (m_settings.socketTimeout != Duration.zero) 182 m_conn.readTimeout = m_settings.socketTimeout; 183 if (m_settings.ssl) { 184 auto ctx = createTLSContext(TLSContextKind.client); 185 if (!m_settings.sslverifycertificate) { 186 ctx.peerValidationMode = TLSPeerValidationMode.none; 187 } 188 if (m_settings.sslPEMKeyFile) { 189 ctx.useCertificateChainFile(m_settings.sslPEMKeyFile); 190 ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile); 191 } 192 if (m_settings.sslCAFile) { 193 ctx.useTrustedCertificateFile(m_settings.sslCAFile); 194 } 195 196 m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name); 197 isTLS = true; 198 } 199 else { 200 m_stream = m_conn; 201 } 202 m_outRange = streamOutputRange(m_stream); 203 } 204 catch (Exception e) { 205 throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e); 206 } 207 208 scope (failure) disconnect(); 209 210 m_allowReconnect = false; 211 scope (exit) 212 m_allowReconnect = true; 213 214 Bson handshake = Bson.emptyObject; 215 static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!"); 216 // TODO: must use legacy handshake if m_settings.loadBalanced is true 217 // and also once we allow configuring a server API version in the driver 218 // (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst) 219 m_supportsOpMsg = false; 220 bool legacyHandshake = false; 221 if (legacyHandshake) 222 { 223 handshake["isMaster"] = Bson(1); 224 handshake["helloOk"] = Bson(1); 225 } 226 else 227 { 228 
handshake["hello"] = Bson(1); 229 m_supportsOpMsg = true; 230 } 231 232 import os = std.system; 233 import compiler = std.compiler; 234 string platform = compiler.name ~ " " 235 ~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string; 236 // TODO: add support for os.version 237 238 handshake["client"] = Bson([ 239 "driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]), 240 "os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]), 241 "platform": Bson(platform) 242 ]); 243 244 if (m_settings.appName.length) { 245 enforce!MongoAuthException(m_settings.appName.length <= 128, 246 "The application name may not be larger than 128 bytes"); 247 handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]); 248 } 249 250 auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake); 251 m_description = deserializeBson!ServerDescription(reply); 252 253 if (m_description.satisfiesVersion(WireVersion.v36)) 254 m_supportsOpMsg = true; 255 256 m_bytesRead = 0; 257 auto authMechanism = m_settings.authMechanism; 258 if (authMechanism == MongoAuthMechanism.none) 259 { 260 if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26)) 261 { 262 authMechanism = MongoAuthMechanism.mongoDBX509; 263 } 264 else if (m_settings.digest.length) 265 { 266 // SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication 267 if (m_description.satisfiesVersion(WireVersion.v30)) 268 authMechanism = MongoAuthMechanism.scramSHA1; 269 else 270 authMechanism = MongoAuthMechanism.mongoDBCR; 271 } 272 } 273 274 if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40)) 275 throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported"); 276 277 if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30)) 278 throw new MongoAuthException("Trying to 
force SCRAM-SHA-1 authentication on a <3.0 server not supported"); 279 280 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26)) 281 throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported"); 282 283 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS) 284 throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!"); 285 286 m_isAuthenticating = true; 287 scope (exit) 288 m_isAuthenticating = false; 289 final switch (authMechanism) 290 { 291 case MongoAuthMechanism.none: 292 break; 293 case MongoAuthMechanism.mongoDBX509: 294 certAuthenticate(); 295 break; 296 case MongoAuthMechanism.scramSHA1: 297 scramAuthenticate(); 298 break; 299 case MongoAuthMechanism.mongoDBCR: 300 authenticate(); 301 break; 302 } 303 } 304 305 void disconnect() 306 { 307 if (m_conn) { 308 if (m_stream && m_conn.connected) { 309 m_outRange.flush(); 310 311 m_stream.finalize(); 312 m_stream = InterfaceProxy!Stream.init; 313 } 314 315 m_conn.close(); 316 m_conn = TCPConnection.init; 317 } 318 319 m_outRange.drop(); 320 } 321 322 @property bool connected() const { return m_conn && m_conn.connected; } 323 324 @property const(ServerDescription) description() const { return m_description; } 325 326 deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update) 327 { 328 scope(failure) disconnect(); 329 send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update); 330 if (m_settings.safe) checkForError(collection_name); 331 } 332 333 deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents) 334 { 335 scope(failure) disconnect(); 336 foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate()); 337 send(OpCode.Insert, -1, cast(int)flags, collection_name, documents); 338 if 
(m_settings.safe) checkForError(collection_name); 339 } 340 341 deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`") 342 void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 343 { 344 scope(failure) disconnect(); 345 flags |= m_settings.defQueryFlags; 346 int id; 347 if (returnFieldSelector.isNull) 348 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query); 349 else 350 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector); 351 recvReply!T(id, on_msg, on_doc); 352 } 353 354 /** 355 Runs the given Bson command (Bson object with the first entry in the map 356 being the command name) on the given database. 357 358 Using `runCommand` checks that the command completed successfully by 359 checking that `result["ok"].get!double == 1.0`. Throws the 360 `CommandFailException` on failure. 361 362 Using `runCommandUnchecked` will return the result as-is. Developers may 363 check the `result["ok"]` value themselves. (It's a double that needs to 364 be compared with 1.0 by default) 365 366 Throws: 367 - `CommandFailException` (template argument) only in the 368 `runCommand` overload, when the command response is not ok. 369 - `MongoDriverException` when internal protocol errors occur. 
370 */ 371 Bson runCommand(T, CommandFailException = MongoDriverException)( 372 string database, 373 Bson command, 374 string errorInfo = __FUNCTION__, 375 string errorFile = __FILE__, 376 size_t errorLine = __LINE__ 377 ) 378 in(database.length, "runCommand requires a database argument") 379 { 380 return runCommandImpl!(T, CommandFailException)( 381 database, command, true, errorInfo, errorFile, errorLine); 382 } 383 384 Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)( 385 string database, 386 Bson command, 387 string errorInfo = __FUNCTION__, 388 string errorFile = __FILE__, 389 size_t errorLine = __LINE__ 390 ) 391 in(database.length, "runCommand requires a database argument") 392 { 393 return runCommandImpl!(T, CommandFailException)( 394 database, command, false, errorInfo, errorFile, errorLine); 395 } 396 397 private Bson runCommandImpl(T, CommandFailException)( 398 string database, 399 Bson command, 400 bool testOk = true, 401 string errorInfo = __FUNCTION__, 402 string errorFile = __FILE__, 403 size_t errorLine = __LINE__ 404 ) 405 in(database.length, "runCommand requires a database argument") 406 { 407 import std.array; 408 409 string formatErrorInfo(string msg) @safe 410 { 411 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 412 } 413 414 Bson ret; 415 416 if (m_supportsOpMsg) 417 { 418 debug (VibeVerboseMongo) 419 logDiagnostic("runCommand: [db=%s] %s", database, command); 420 421 command["$db"] = Bson(database); 422 423 auto id = sendMsg(-1, 0, command); 424 Appender!(Bson[])[string] docs; 425 recvMsg!true(id, (flags, root) @safe { 426 ret = root; 427 }, (scope ident, size) @safe { 428 docs[ident] = appender!(Bson[]); 429 }, (scope ident, push) @safe { 430 docs[ident].put(push); 431 }); 432 433 foreach (ident, app; docs) 434 ret[ident] = Bson(app.data); 435 } 436 else 437 { 438 debug (VibeVerboseMongo) 439 logDiagnostic("runCommand(legacy): [db=%s] %s", database, command); 440 auto id = 
send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null)); 441 recvReply!T(id, 442 (cursor, flags, first_doc, num_docs) { 443 logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs); 444 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed")); 445 enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response")); 446 }, 447 (idx, ref doc) { 448 ret = doc; 449 }); 450 } 451 452 if (testOk && ret["ok"].get!double != 1.0) 453 throw new CommandFailException(formatErrorInfo("command failed: " 454 ~ ret["errmsg"].opt!string("(no message)"))); 455 456 static if (is(T == Bson)) return ret; 457 else { 458 T doc = deserializeBson!T(bson); 459 return doc; 460 } 461 } 462 463 template getMore(T) 464 { 465 deprecated("use the modern overload instead") 466 void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 467 { 468 scope(failure) disconnect(); 469 auto parts = collection_name.findSplit("."); 470 auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id); 471 recvReply!T(id, on_msg, on_doc); 472 } 473 474 /** 475 * Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore 476 * command and OP_MSG. (if supported) 477 * 478 * Falls back to compatibility for older MongoDB versions, but those are not 479 * officially supported anymore. 480 * 481 * Upgrade_notes: 482 * - error checking is now done inside this function 483 * - document index is no longer sent, instead the callback is called sequentially 484 * 485 * Throws: $(LREF MongoDriverException) in case the command fails. 
486 */ 487 void getMore(long cursor_id, string database, string collection_name, long nret, 488 scope GetMoreHeaderDelegate on_header, 489 scope GetMoreDocumentDelegate!T on_doc, 490 Duration timeout = Duration.max, 491 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 492 { 493 Bson command = Bson.emptyObject; 494 command["getMore"] = Bson(cursor_id); 495 command["$db"] = Bson(database); 496 command["collection"] = Bson(collection_name); 497 command["batchSize"] = Bson(nret); 498 if (timeout != Duration.max && timeout.total!"msecs" < int.max) 499 command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs"); 500 501 string formatErrorInfo(string msg) @safe 502 { 503 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 504 } 505 506 scope (failure) disconnect(); 507 508 if (m_supportsOpMsg) 509 { 510 enum needsDup = hasIndirections!T || is(T == Bson); 511 512 debug (VibeVerboseMongo) 513 logDiagnostic("getMore: [db=%s] %s", database, command); 514 515 auto id = sendMsg(-1, 0, command); 516 recvMsg!needsDup(id, (flags, scope root) @safe { 517 if (root["ok"].get!double != 1.0) 518 throw new MongoDriverException(formatErrorInfo("getMore failed: " 519 ~ root["errmsg"].opt!string("(no message)"))); 520 521 auto cursor = root["cursor"]; 522 auto batch = cursor["nextBatch"].get!(Bson[]); 523 on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length); 524 525 foreach (ref push; batch) 526 { 527 T doc = deserializeBson!T(push); 528 on_doc(doc); 529 } 530 }, (scope ident, size) @safe {}, (scope ident, scope push) @safe { 531 throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in getMore response")); 532 }); 533 } 534 else 535 { 536 debug (VibeVerboseMongo) 537 logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret); 538 539 int brokenId = 0; 540 int nextId = 0; 541 int num_docs; 542 // array to store out-of-order 
items, to push them into the callback properly 543 T[] compatibilitySort; 544 string full_name = database ~ '.' ~ collection_name; 545 auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id); 546 recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs) 547 { 548 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), 549 formatErrorInfo("Invalid cursor handle.")); 550 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), 551 formatErrorInfo("Query failed. Does the database exist?")); 552 553 on_header(cursor, full_name, num_docs); 554 }, (size_t idx, ref T doc) { 555 if (cast(int)idx == nextId) { 556 on_doc(doc); 557 nextId++; 558 brokenId = nextId; 559 } else { 560 enforce!MongoDriverException(idx >= brokenId, 561 formatErrorInfo("Got legacy document with same id after having already processed it!")); 562 enforce!MongoDriverException(idx < num_docs, 563 formatErrorInfo("Received more documents than the database reported to us")); 564 565 size_t arrayIndex = cast(int)idx - brokenId; 566 if (!compatibilitySort.length) 567 compatibilitySort.length = num_docs - brokenId; 568 compatibilitySort[arrayIndex] = doc; 569 } 570 }); 571 572 foreach (doc; compatibilitySort) 573 on_doc(doc); 574 } 575 } 576 } 577 578 /// Forwards the `find` command passed in to the database, handles the 579 /// callbacks like with getMore. This exists for easier integration with 580 /// MongoCursor!T. 
581 package void startFind(T)(Bson command, 582 scope GetMoreHeaderDelegate on_header, 583 scope GetMoreDocumentDelegate!T on_doc, 584 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 585 { 586 string formatErrorInfo(string msg) @safe 587 { 588 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 589 } 590 591 scope (failure) disconnect(); 592 593 enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries")); 594 595 enum needsDup = hasIndirections!T || is(T == Bson); 596 597 debug (VibeVerboseMongo) 598 logDiagnostic("startFind: %s", command); 599 600 auto id = sendMsg(-1, 0, command); 601 recvMsg!needsDup(id, (flags, scope root) @safe { 602 if (root["ok"].get!double != 1.0) 603 throw new MongoDriverException(formatErrorInfo("find failed: " 604 ~ root["errmsg"].opt!string("(no message)"))); 605 606 auto cursor = root["cursor"]; 607 auto batch = cursor["firstBatch"].get!(Bson[]); 608 on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length); 609 610 foreach (ref push; batch) 611 { 612 T doc = deserializeBson!T(push); 613 on_doc(doc); 614 } 615 }, (scope ident, size) @safe {}, (scope ident, scope push) @safe { 616 throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in find response")); 617 }); 618 } 619 620 deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector) 621 { 622 scope(failure) disconnect(); 623 send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector); 624 if (m_settings.safe) checkForError(collection_name); 625 } 626 627 deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well") 628 void killCursors(scope long[] cursors) 629 { 630 scope(failure) disconnect(); 631 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 632 } 633 634 void 
killCursors(string collection, scope long[] cursors) 635 { 636 scope(failure) disconnect(); 637 // TODO: could add special case to runCommand to not return anything 638 if (m_supportsOpMsg) 639 { 640 Bson command = Bson.emptyObject; 641 auto parts = collection.findSplit("."); 642 command["killCursors"] = Bson(parts[2]); 643 command["cursors"] = cursors.serializeToBson; 644 runCommand!Bson(parts[0], command); 645 } 646 else 647 { 648 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 649 } 650 } 651 652 MongoErrorDescription getLastError(string db) 653 { 654 // Though higher level abstraction level by concept, this function 655 // is implemented here to allow to check errors upon every request 656 // on connection level. 657 658 Bson command_and_options = Bson.emptyObject; 659 command_and_options["getLastError"] = Bson(1.0); 660 661 if(m_settings.w != m_settings.w.init) 662 command_and_options["w"] = m_settings.w; // Already a Bson struct 663 if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init) 664 command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS); 665 if(m_settings.journal) 666 command_and_options["j"] = Bson(true); 667 if(m_settings.fsync) 668 command_and_options["fsync"] = Bson(true); 669 670 _MongoErrorDescription ret; 671 672 auto error = runCommandUnchecked!Bson(db, command_and_options); 673 674 try { 675 ret = MongoErrorDescription( 676 error["errmsg"].opt!string(error["err"].opt!string("")), 677 error["code"].opt!int(-1), 678 error["connectionId"].opt!int(-1), 679 error["n"].opt!int(-1), 680 error["ok"].get!double() 681 ); 682 } catch (Exception e) { 683 throw new MongoDriverException(e.msg); 684 } 685 686 return ret; 687 } 688 689 /** Queries the server for all databases. 690 691 Returns: 692 An input range of $(D MongoDBInfo) values. 693 */ 694 auto listDatabases() 695 { 696 string cn = m_settings.database == string.init ? 
"admin" : m_settings.database; 697 698 auto cmd = Bson(["listDatabases":Bson(1)]); 699 700 static MongoDBInfo toInfo(const(Bson) db_doc) { 701 return MongoDBInfo( 702 db_doc["name"].get!string, 703 // double on MongoDB < 5.0, long afterwards 704 db_doc["sizeOnDisk"].to!double, 705 db_doc["empty"].get!bool 706 ); 707 } 708 709 auto result = runCommand!Bson(cn, cmd)["databases"]; 710 711 return result.byValue.map!toInfo; 712 } 713 714 private int recvMsg(bool dupBson = true)(int reqid, 715 scope MsgReplyDelegate!dupBson on_sec0, 716 scope MsgSection1StartDelegate on_sec1_start, 717 scope MsgSection1Delegate!dupBson on_sec1_doc) 718 { 719 import std.traits; 720 721 auto bytes_read = m_bytesRead; 722 int msglen = recvInt(); 723 int resid = recvInt(); 724 int respto = recvInt(); 725 int opcode = recvInt(); 726 727 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 728 enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! 
(must be OP_MSG)"); 729 730 uint flagBits = recvUInt(); 731 const bool hasCRC = (flagBits & (1 << 16)) != 0; 732 733 int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof); 734 if (hasCRC) 735 sectionLength -= uint.sizeof; // CRC present 736 737 bool gotSec0; 738 while (m_bytesRead - bytes_read < sectionLength) { 739 // TODO: directly deserialize from the wire 740 static if (!dupBson) { 741 ubyte[256] buf = void; 742 ubyte[] bufsl = buf; 743 } 744 745 ubyte payloadType = recvUByte(); 746 switch (payloadType) { 747 case 0: 748 gotSec0 = true; 749 scope Bson data; 750 static if (dupBson) 751 data = recvBsonDup(); 752 else 753 data = (() @trusted => recvBson(bufsl))(); 754 755 debug (VibeVerboseMongo) 756 logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data); 757 on_sec0(flagBits, data); 758 break; 759 case 1: 760 if (!gotSec0) 761 throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d"); 762 763 auto section_bytes_read = m_bytesRead; 764 int size = recvInt(); 765 auto identifier = recvCString(); 766 on_sec1_start(identifier, size); 767 while (m_bytesRead - section_bytes_read < size) { 768 scope Bson data; 769 static if (dupBson) 770 data = recvBsonDup(); 771 else 772 data = (() @trusted => recvBson(bufsl))(); 773 774 debug (VibeVerboseMongo) 775 logDiagnostic("recvData: sec1[%s]: %s", identifier, data); 776 777 on_sec1_doc(identifier, data); 778 } 779 break; 780 default: 781 throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string); 782 } 783 } 784 785 if (hasCRC) 786 { 787 uint crc = recvUInt(); 788 // TODO: validate CRC 789 logDiagnostic("recvData: crc=%s (discarded)", crc); 790 } 791 792 assert(bytes_read + msglen == m_bytesRead, 793 format!"Packet size mismatch! 
Expected %s bytes, but read %s."( 794 msglen, m_bytesRead - bytes_read)); 795 796 return resid; 797 } 798 799 private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 800 { 801 auto bytes_read = m_bytesRead; 802 int msglen = recvInt(); 803 int resid = recvInt(); 804 int respto = recvInt(); 805 int opcode = recvInt(); 806 807 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 808 enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!"); 809 810 auto flags = cast(ReplyFlags)recvInt(); 811 long cursor = recvLong(); 812 int start = recvInt(); 813 int numret = recvInt(); 814 815 scope (exit) { 816 if (m_bytesRead - bytes_read < msglen) { 817 logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read); 818 ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 819 recv(dst); 820 } else if (m_bytesRead - bytes_read > msglen) { 821 logWarn("MongoDB reply was shorter than expected. Dropping connection."); 822 disconnect(); 823 throw new MongoDriverException("MongoDB reply was too short for data."); 824 } 825 } 826 827 on_msg(cursor, flags, start, numret); 828 static if (hasIndirections!T || is(T == Bson)) 829 auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 830 foreach (i; 0 .. 
cast(size_t)numret) { 831 // TODO: directly deserialize from the wire 832 static if (!hasIndirections!T && !is(T == Bson)) { 833 ubyte[256] buf = void; 834 ubyte[] bufsl = buf; 835 auto bson = () @trusted { return recvBson(bufsl); } (); 836 } else { 837 auto bson = () @trusted { return recvBson(buf); } (); 838 } 839 840 // logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson); 841 842 static if (is(T == Bson)) on_doc(i, bson); 843 else { 844 T doc = deserializeBson!T(bson); 845 on_doc(i, doc); 846 } 847 } 848 849 return resid; 850 } 851 852 private int send(ARGS...)(OpCode code, int response_to, scope ARGS args) 853 { 854 if( !connected() ) { 855 if (m_allowReconnect) connect(); 856 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 857 else throw new MongoDriverException("Connection got closed while connecting"); 858 } 859 int id = nextMessageId(); 860 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 861 sendValue!int(16 + sendLength(args)); 862 sendValue!int(id); 863 sendValue!int(response_to); 864 sendValue!int(cast(int)code); 865 foreach (a; args) sendValue(a); 866 m_outRange.flush(); 867 // logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args)); 868 return id; 869 } 870 871 private int sendMsg(int response_to, uint flagBits, Bson document) 872 { 873 if( !connected() ) { 874 if (m_allowReconnect) connect(); 875 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 876 else throw new MongoDriverException("Connection got closed while connecting"); 877 } 878 int id = nextMessageId(); 879 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 880 sendValue!int(21 + sendLength(document)); 881 sendValue!int(id); 882 sendValue!int(response_to); 883 
sendValue!int(cast(int)OpCode.Msg); 884 sendValue!uint(flagBits); 885 const bool hasCRC = (flagBits & (1 << 16)) != 0; 886 assert(!hasCRC, "sending with CRC bits not yet implemented"); 887 sendValue!ubyte(0); 888 sendValue(document); 889 m_outRange.flush(); 890 return id; 891 } 892 893 private void sendValue(T)(scope T value) 894 { 895 import std.traits; 896 static if (is(T == ubyte)) m_outRange.put(value); 897 else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value)); 898 else static if (is(T == long)) sendBytes(toBsonData(value)); 899 else static if (is(T == Bson)) sendBytes(value.data); 900 else static if (is(T == string)) { 901 sendBytes(cast(const(ubyte)[])value); 902 sendBytes(cast(const(ubyte)[])"\0"); 903 } else static if (isArray!T) { 904 foreach (v; value) 905 sendValue(v); 906 } else static assert(false, "Unexpected type: "~T.stringof); 907 } 908 909 private void sendBytes(in ubyte[] data){ m_outRange.put(data); } 910 911 private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); } 912 private alias recvUByte = recvInteger!ubyte; 913 private alias recvInt = recvInteger!int; 914 private alias recvUInt = recvInteger!uint; 915 private alias recvLong = recvInteger!long; 916 private Bson recvBson(ref ubyte[] buf) 917 @system { 918 int len = recvInt(); 919 ubyte[] dst; 920 if (len > buf.length) dst = new ubyte[len]; 921 else { 922 dst = buf[0 .. len]; 923 buf = buf[len .. $]; 924 } 925 dst[0 .. 4] = toBsonData(len)[]; 926 recv(dst[4 .. $]); 927 return Bson(Bson.Type.object, cast(immutable)dst); 928 } 929 private Bson recvBsonDup() 930 @trusted { 931 ubyte[4] size; 932 recv(size[]); 933 ubyte[] dst = new ubyte[fromBsonData!uint(size)]; 934 dst[0 .. 4] = size; 935 recv(dst[4 .. 
$]); 936 return Bson(Bson.Type.object, cast(immutable)dst); 937 } 938 private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; } 939 private const(char)[] recvCString() 940 { 941 auto buf = new ubyte[32]; 942 ptrdiff_t i = -1; 943 do 944 { 945 i++; 946 if (i == buf.length) buf.length *= 2; 947 recv(buf[i .. i + 1]); 948 } while (buf[i] != 0); 949 return cast(const(char)[])buf[0 .. i]; 950 } 951 952 private int nextMessageId() { return m_msgid++; } 953 954 private void checkForError(string collection_name) 955 { 956 auto coll = collection_name.split(".")[0]; 957 auto err = getLastError(coll); 958 959 enforce( 960 err.code < 0, 961 new MongoDBException(err) 962 ); 963 } 964 965 private void certAuthenticate() 966 { 967 Bson cmd = Bson.emptyObject; 968 cmd["authenticate"] = Bson(1); 969 cmd["mechanism"] = Bson("MONGODB-X509"); 970 if (m_description.satisfiesVersion(WireVersion.v34)) 971 { 972 if (m_settings.username.length) 973 cmd["user"] = Bson(m_settings.username); 974 } 975 else 976 { 977 if (!m_settings.username.length) 978 throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this"); 979 980 cmd["user"] = Bson(m_settings.username); 981 } 982 runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd); 983 } 984 985 private void authenticate() 986 { 987 scope (failure) disconnect(); 988 989 string cn = m_settings.getAuthDatabase; 990 991 auto cmd = Bson(["getnonce": Bson(1)]); 992 auto result = runCommand!(Bson, MongoAuthException)(cn, cmd); 993 string nonce = result["nonce"].get!string; 994 string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup); 995 996 cmd = Bson.emptyObject; 997 cmd["authenticate"] = Bson(1); 998 cmd["mechanism"] = Bson("MONGODB-CR"); 999 cmd["nonce"] = Bson(nonce); 1000 cmd["user"] = Bson(m_settings.username); 1001 cmd["key"] = Bson(key); 1002 runCommand!(Bson, MongoAuthException)(cn, cmd); 1003 } 1004 
/**
 * Runs a three-step "SCRAM-SHA-1" SASL conversation against the database
 * returned by `m_settings.getAuthDatabase`: one "saslStart" followed by two
 * "saslContinue" commands that reuse the server's "conversationId".
 *
 * Payloads are produced by `ScramState` from `vibe.db.mongo.sasl`.
 * `runCommand` is instantiated with `MongoAuthException`.
 */
private void scramAuthenticate()
{
	import vibe.db.mongo.sasl;

	string cn = m_settings.getAuthDatabase;

	ScramState state;
	string payload = state.createInitialRequest(m_settings.username);

	// Step 1: open the conversation.
	auto cmd = Bson.emptyObject;
	cmd["saslStart"] = Bson(1);
	cmd["mechanism"] = Bson("SCRAM-SHA-1");
	cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));

	auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
	string response = cast(string)doc["payload"].get!BsonBinData().rawData;
	Bson conversationId = doc["conversationId"];

	// Step 2: answer the server response using the configured digest.
	payload = state.update(m_settings.digest, response);
	cmd = Bson.emptyObject;
	cmd["saslContinue"] = Bson(1);
	cmd["conversationId"] = conversationId;
	cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));

	doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
	response = cast(string)doc["payload"].get!BsonBinData().rawData;

	// Step 3: send the payload produced by finalize() for the last response.
	payload = state.finalize(response);
	cmd = Bson.emptyObject;
	cmd["saslContinue"] = Bson(1);
	cmd["conversationId"] = conversationId;
	cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
	runCommand!(Bson, MongoAuthException)(cn, cmd);
}
} // closes the enclosing type whose declaration lies above this section

/// Numeric operation codes of the wire protocol messages.
private enum OpCode : int {
	Reply        = 1, // sent only by DB
	Update       = 2001,
	Insert       = 2002,
	Reserved1    = 2003,
	Query        = 2004,
	GetMore      = 2005,
	Delete       = 2006,
	KillCursors  = 2007,

	Compressed   = 2012,
	Msg          = 2013,
}

/// Callback receiving cursor id, reply flags, first document index and document count.
private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
/// Callback receiving an index and a document of type `T` by reference.
private template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; }

/// Message reply callback; the `dupBson == true` form takes a non-`scope` document.
private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe;
/// Message reply callback; the `dupBson == false` form takes a `scope` document.
private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe;
/// Callback receiving a section identifier and a size.
private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe;
/// Per-document section callback; the `dupBson == true` form takes a non-`scope` document.
private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe;
/// Per-document section callback; the `dupBson == false` form takes a `scope` document.
private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe;

/// Callback receiving an id, a namespace string and a count.
alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe;
/// Callback receiving one document of type `T` by reference.
alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe;

/// Plain record describing a database: name, size on disk and an empty flag.
struct MongoDBInfo
{
	string name;
	double sizeOnDisk;
	bool empty;
}

/**
 * Computes the summed byte length of the given arguments.
 *
 * Per-argument sizes:
 * - `string`: its length plus one (presumably for a terminating zero byte;
 *   confirm against the sending code)
 * - `int`: 4
 * - `long`: 8
 * - `Bson`: the length of its `data`
 * - other arrays: the sum of `sendLength` over the elements
 *
 * Any other single argument type is rejected at compile time. Zero arguments
 * yield 0, and two or more are split in halves and summed recursively.
 *
 * Params:
 *   args = values to measure
 *
 * Returns: total length as `int`.
 */
private int sendLength(ARGS...)(ARGS args)
{
	import std.traits;
	static if (ARGS.length == 1) {
		alias T = ARGS[0];
		static if (is(T == string)) return cast(int)args[0].length + 1;
		else static if (is(T == int)) return 4;
		else static if (is(T == long)) return 8;
		else static if (is(T == Bson)) return cast(int)args[0].data.length;
		else static if (isArray!T) {
			int ret = 0;
			foreach (el; args[0]) ret += sendLength(el);
			return ret;
		} else static assert(false, "Unexpected type: "~T.stringof);
	}
	// Chosen at compile time, so the recursive branch below is not compiled
	// into the zero-argument instantiation.
	else static if (ARGS.length == 0) return 0;
	else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]);
}

/**
 * Description of a server: address, type, wire version range and
 * replica-set related fields. All fields are marked `@optional`.
 */
struct ServerDescription
{
	/// Kinds of server distinguished by this description.
	enum ServerType
	{
		unknown,
		standalone,
		mongos,
		possiblePrimary,
		RSPrimary,
		RSSecondary,
		RSArbiter,
		RSOther,
		RSGhost
	}

@optional:
	string address;
	string error;
	float roundTripTime = 0;
	Nullable!BsonDate lastWriteDate;
	Nullable!BsonObjectID opTime;
	ServerType type = ServerType.unknown;
	WireVersion minWireVersion, maxWireVersion;
	string me;
	string[] hosts, passives, arbiters;
	string[string] tags;
	string setName;
	Nullable!int setVersion;
	Nullable!BsonObjectID electionId;
	string primary;
	string lastUpdateTime = "infinity ago";
	Nullable!int logicalSessionTimeoutMinutes;

	/// Returns true when `maxWireVersion` is at least `wireVersion`.
	bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
	{
		return maxWireVersion >= wireVersion;
	}
}

/**
 * Wire protocol version numbers.
 *
 * NOTE(review): member names presumably encode the matching server release
 * (e.g. `v34` for 3.4); confirm against the server documentation.
 */
enum WireVersion : int
{
	old = 0,
	v26 = 1,
	v26_2 = 2,
	v30 = 3,
	v32 = 4,
	v34 = 5,
	v36 = 6,
	v40 = 7,
	v42 = 8,
	v44 = 9,
	v49 = 12,
	v50 = 13,
	v51 = 14,
	v52 = 15,
	v53 = 16
}

/**
 * Builds a string of the form "<arch> <endian>".
 *
 * The architecture name is selected by `version` conditions and falls back
 * to "unknown". The endianness is the text form of `std.system.endian`.
 */
private string getHostArchitecture()
{
	import os = std.system;

	version (X86_64)
		string arch = "x86_64 ";
	else version (X86)
		string arch = "x86 ";
	else version (AArch64)
		string arch = "aarch64 ";
	else version (ARM_HardFloat)
		string arch = "armhf ";
	else version (ARM)
		string arch = "arm ";
	else version (PPC64)
		string arch = "ppc64 ";
	else version (PPC)
		string arch = "ppc ";
	else
		string arch = "unknown ";

	return arch ~ os.endian.to!string;
}

/// Result of `getHostArchitecture`, stored as an immutable module-level value.
private static immutable hostArchitecture = getHostArchitecture;