1 /** 2 Low level mongodb protocol. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.connection; 9 10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic 11 // debug = VibeVerboseMongo; 12 13 public import vibe.data.bson; 14 15 import vibe.core.core : vibeVersionString; 16 import vibe.core.log; 17 import vibe.core.net; 18 import vibe.data.bson; 19 import vibe.db.mongo.flags; 20 import vibe.db.mongo.settings; 21 import vibe.inet.webform; 22 import vibe.stream.tls; 23 24 import std.algorithm : findSplit, map, splitter; 25 import std.array; 26 import std.conv; 27 import std.digest.md; 28 import std.exception; 29 import std.range; 30 import std.string; 31 import std.traits : hasIndirections; 32 import std.typecons; 33 34 import core.time; 35 36 private struct _MongoErrorDescription 37 { 38 string message; 39 int code; 40 int connectionId; 41 int n; 42 double ok; 43 } 44 45 /** 46 * D POD representation of Mongo error object. 47 * 48 * For successful queries "code" is negative. 49 * Can be used also to check how many documents where updated upon 50 * a successful query via "n" field. 51 */ 52 alias MongoErrorDescription = immutable(_MongoErrorDescription); 53 54 /** 55 * Root class for vibe.d Mongo driver exception hierarchy. 56 */ 57 class MongoException : Exception 58 { 59 @safe: 60 61 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 62 { 63 super(message, file, line, next); 64 } 65 } 66 67 /** 68 * Generic class for all exception related to unhandled driver problems. 69 * 70 * I.e.: protocol mismatch or unexpected mongo service behavior. 71 */ 72 class MongoDriverException : MongoException 73 { 74 @safe: 75 76 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 77 { 78 super(message, file, line, next); 79 } 80 } 81 82 /** 83 * Wrapper class for all inner mongo collection manipulation errors. 84 * 85 * It does not indicate problem with vibe.d driver itself. Most frequently this 86 * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting. 87 */ 88 deprecated("Check for MongoException instead - the modern write commands now throw MongoBulkWriteException on error") 89 class MongoDBException : MongoException 90 { 91 @safe: 92 93 MongoErrorDescription description; 94 95 this(MongoErrorDescription description, string file = __FILE__, 96 size_t line = __LINE__, Throwable next = null) 97 { 98 super(description.message, file, line, next); 99 this.description = description; 100 } 101 102 // NOTE: .message is a @future member of Throwable 103 deprecated("Use .msg instead.") alias message = msg; 104 @property int code() const nothrow { return description.code; }; 105 @property int connectionId() const nothrow { return description.connectionId; }; 106 @property int n() const nothrow { return description.n; }; 107 @property double ok() const nothrow { return description.ok; }; 108 } 109 110 /** 111 * Generic class for all exceptions related to authentication problems. 112 * 113 * I.e.: unsupported mechanisms or wrong credentials. 114 */ 115 class MongoAuthException : MongoException 116 { 117 @safe: 118 119 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 120 { 121 super(message, file, line, next); 122 } 123 } 124 125 /** 126 [internal] Provides low-level mongodb protocol access. 127 128 It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code. 129 Note that a MongoConnection may only be used from one fiber/thread at a time. 130 */ 131 final class MongoConnection { 132 @safe: 133 134 import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */; 135 import vibe.internal.interfaceproxy; 136 import vibe.core.stream : InputStream, Stream; 137 138 private { 139 MongoClientSettings m_settings; 140 TCPConnection m_conn; 141 InterfaceProxy!Stream m_stream; 142 ulong m_bytesRead; 143 int m_msgid = 1; 144 StreamOutputRange!(InterfaceProxy!Stream) m_outRange; 145 ServerDescription m_description; 146 /// Flag to prevent recursive connections when server closes connection while connecting 147 bool m_allowReconnect; 148 bool m_isAuthenticating; 149 bool m_supportsOpMsg; 150 } 151 152 enum ushort defaultPort = MongoClientSettings.defaultPort; 153 154 /// Simplified constructor overload, with no m_settings 155 this(string server, ushort port = defaultPort) 156 { 157 m_settings = new MongoClientSettings(); 158 m_settings.hosts ~= MongoHost(server, port); 159 } 160 161 this(MongoClientSettings cfg) 162 { 163 m_settings = cfg; 164 165 // Now let's check for features that are not yet supported. 166 if(m_settings.hosts.length > 1) 167 logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", 168 m_settings.hosts[0].name, m_settings.hosts[0].port); 169 } 170 171 void connect() 172 { 173 bool isTLS; 174 175 /* 176 * TODO: Connect to one of the specified hosts taking into consideration 177 * options such as connect timeouts and so on. 178 */ 179 try { 180 import core.time : Duration, msecs; 181 182 auto connectTimeout = m_settings.connectTimeoutMS.msecs; 183 if (m_settings.connectTimeoutMS == 0) 184 connectTimeout = Duration.max; 185 186 m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout); 187 m_conn.tcpNoDelay = true; 188 if (m_settings.socketTimeout != Duration.zero) 189 m_conn.readTimeout = m_settings.socketTimeout; 190 if (m_settings.ssl) { 191 auto ctx = createTLSContext(TLSContextKind.client); 192 if (!m_settings.sslverifycertificate) { 193 ctx.peerValidationMode = TLSPeerValidationMode.none; 194 } 195 if (m_settings.sslPEMKeyFile) { 196 ctx.useCertificateChainFile(m_settings.sslPEMKeyFile); 197 ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile); 198 } 199 if (m_settings.sslCAFile) { 200 ctx.useTrustedCertificateFile(m_settings.sslCAFile); 201 } 202 203 m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name); 204 isTLS = true; 205 } 206 else { 207 m_stream = m_conn; 208 } 209 m_outRange = streamOutputRange(m_stream); 210 } 211 catch (Exception e) { 212 throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e); 213 } 214 215 scope (failure) disconnect(); 216 217 m_allowReconnect = false; 218 scope (exit) 219 m_allowReconnect = true; 220 221 Bson handshake = Bson.emptyObject; 222 static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!"); 223 // TODO: must use legacy handshake if m_settings.loadBalanced is true 224 // and also once we allow configuring a server API version in the driver 225 // (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst) 226 m_supportsOpMsg = false; 227 bool legacyHandshake = false; 228 if (legacyHandshake) 229 { 230 handshake["isMaster"] = Bson(1); 231 handshake["helloOk"] = Bson(1); 232 } 233 else 234 { 235 handshake["hello"] = Bson(1); 236 m_supportsOpMsg = true; 237 } 238 239 import os = std.system; 240 import compiler = std.compiler; 241 string platform = compiler.name ~ " " 242 ~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string; 243 // TODO: add support for os.version 244 245 handshake["client"] = Bson([ 246 "driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]), 247 "os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]), 248 "platform": Bson(platform) 249 ]); 250 251 if (m_settings.appName.length) { 252 enforce!MongoAuthException(m_settings.appName.length <= 128, 253 "The application name may not be larger than 128 bytes"); 254 handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]); 255 } 256 257 auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake); 258 m_description = deserializeBson!ServerDescription(reply); 259 260 if (m_description.satisfiesVersion(WireVersion.v36)) 261 m_supportsOpMsg = true; 262 263 m_bytesRead = 0; 264 auto authMechanism = m_settings.authMechanism; 265 if (authMechanism == MongoAuthMechanism.none) 266 { 267 if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26)) 268 { 269 authMechanism = MongoAuthMechanism.mongoDBX509; 270 } 271 else if (m_settings.digest.length) 272 { 273 // SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication 274 if (m_description.satisfiesVersion(WireVersion.v30)) 275 authMechanism = MongoAuthMechanism.scramSHA1; 276 else 277 authMechanism = MongoAuthMechanism.mongoDBCR; 278 } 279 } 280 281 if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40)) 282 throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported"); 283 284 if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30)) 285 throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported"); 286 287 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26)) 288 throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported"); 289 290 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS) 291 throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!"); 292 293 m_isAuthenticating = true; 294 scope (exit) 295 m_isAuthenticating = false; 296 final switch (authMechanism) 297 { 298 case MongoAuthMechanism.none: 299 break; 300 case MongoAuthMechanism.mongoDBX509: 301 certAuthenticate(); 302 break; 303 case MongoAuthMechanism.scramSHA1: 304 scramAuthenticate(); 305 break; 306 case MongoAuthMechanism.mongoDBCR: 307 authenticate(); 308 break; 309 } 310 } 311 312 void disconnect() 313 { 314 if (m_conn) { 315 if (m_stream && m_conn.connected) { 316 m_outRange.flush(); 317 318 m_stream.finalize(); 319 m_stream = InterfaceProxy!Stream.init; 320 } 321 322 m_conn.close(); 323 m_conn = TCPConnection.init; 324 } 325 326 m_outRange.drop(); 327 } 328 329 @property bool connected() const { return m_conn && m_conn.connected; } 330 331 @property const(ServerDescription) description() const { return m_description; } 332 333 deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update) 334 { 335 scope(failure) disconnect(); 336 send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update); 337 if (m_settings.safe) checkForError(collection_name); 338 } 339 340 deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents) 341 { 342 scope(failure) disconnect(); 343 foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate()); 344 send(OpCode.Insert, -1, cast(int)flags, collection_name, documents); 345 if (m_settings.safe) checkForError(collection_name); 346 } 347 348 deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`") 349 void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 350 { 351 scope(failure) disconnect(); 352 flags |= m_settings.defQueryFlags; 353 int id; 354 if (returnFieldSelector.isNull) 355 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query); 356 else 357 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector); 358 recvReply!T(id, on_msg, on_doc); 359 } 360 361 /** 362 Runs the given Bson command (Bson object with the first entry in the map 363 being the command name) on the given database. 364 365 Using `runCommand` checks that the command completed successfully by 366 checking that `result["ok"].get!double == 1.0`. Throws the 367 `CommandFailException` on failure. 368 369 Using `runCommandUnchecked` will return the result as-is. Developers may 370 check the `result["ok"]` value themselves. (It's a double that needs to 371 be compared with 1.0 by default) 372 373 Throws: 374 - `CommandFailException` (template argument) only in the 375 `runCommand` overload, when the command response is not ok. 376 - `MongoDriverException` when internal protocol errors occur. 377 */ 378 Bson runCommand(T, CommandFailException = MongoDriverException)( 379 string database, 380 Bson command, 381 string errorInfo = __FUNCTION__, 382 string errorFile = __FILE__, 383 size_t errorLine = __LINE__ 384 ) 385 in(database.length, "runCommand requires a database argument") 386 { 387 return runCommandImpl!(T, CommandFailException)( 388 database, command, true, errorInfo, errorFile, errorLine); 389 } 390 391 Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)( 392 string database, 393 Bson command, 394 string errorInfo = __FUNCTION__, 395 string errorFile = __FILE__, 396 size_t errorLine = __LINE__ 397 ) 398 in(database.length, "runCommand requires a database argument") 399 { 400 return runCommandImpl!(T, CommandFailException)( 401 database, command, false, errorInfo, errorFile, errorLine); 402 } 403 404 private Bson runCommandImpl(T, CommandFailException)( 405 string database, 406 Bson command, 407 bool testOk = true, 408 string errorInfo = __FUNCTION__, 409 string errorFile = __FILE__, 410 size_t errorLine = __LINE__ 411 ) 412 in(database.length, "runCommand requires a database argument") 413 { 414 import std.array; 415 416 string formatErrorInfo(string msg) @safe 417 { 418 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 419 } 420 421 Bson ret; 422 423 if (m_supportsOpMsg) 424 { 425 debug (VibeVerboseMongo) 426 logDiagnostic("runCommand: [db=%s] %s", database, command); 427 428 command["$db"] = Bson(database); 429 430 auto id = sendMsg(-1, 0, command); 431 Appender!(Bson[])[string] docs; 432 recvMsg!true(id, (flags, root) @safe { 433 ret = root; 434 }, (scope ident, size) @safe { 435 docs[ident.idup] = appender!(Bson[]); 436 }, (scope ident, push) @safe { 437 auto pd = ident in docs; 438 assert(!!pd, "Received data for unexpected identifier"); 439 pd.put(push); 440 }); 441 442 foreach (ident, app; docs) 443 ret[ident] = Bson(app.data); 444 } 445 else 446 { 447 debug (VibeVerboseMongo) 448 logDiagnostic("runCommand(legacy): [db=%s] %s", database, command); 449 auto id = send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null)); 450 recvReply!T(id, 451 (cursor, flags, first_doc, num_docs) { 452 logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs); 453 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed")); 454 enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response")); 455 }, 456 (idx, ref doc) { 457 ret = doc; 458 }); 459 } 460 461 if (testOk && ret["ok"].get!double != 1.0) 462 throw new CommandFailException(formatErrorInfo("command failed: " 463 ~ ret["errmsg"].opt!string("(no message)"))); 464 465 static if (is(T == Bson)) return ret; 466 else { 467 T doc = deserializeBson!T(bson); 468 return doc; 469 } 470 } 471 472 template getMore(T) 473 { 474 deprecated("use the modern overload instead") 475 void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 476 { 477 scope(failure) disconnect(); 478 auto parts = collection_name.findSplit("."); 479 auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id); 480 recvReply!T(id, on_msg, on_doc); 481 } 482 483 /** 484 * Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore 485 * command and OP_MSG. (if supported) 486 * 487 * Falls back to compatibility for older MongoDB versions, but those are not 488 * officially supported anymore. 489 * 490 * Upgrade_notes: 491 * - error checking is now done inside this function 492 * - document index is no longer sent, instead the callback is called sequentially 493 * 494 * Throws: $(LREF MongoDriverException) in case the command fails. 495 */ 496 void getMore(long cursor_id, string database, string collection_name, long nret, 497 scope GetMoreHeaderDelegate on_header, 498 scope GetMoreDocumentDelegate!T on_doc, 499 Duration timeout = Duration.max, 500 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 501 { 502 Bson command = Bson.emptyObject; 503 command["getMore"] = Bson(cursor_id); 504 command["$db"] = Bson(database); 505 command["collection"] = Bson(collection_name); 506 if (nret > 0) 507 command["batchSize"] = Bson(nret); 508 if (timeout != Duration.max && timeout.total!"msecs" < int.max) 509 command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs"); 510 511 string formatErrorInfo(string msg) @safe 512 { 513 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 514 } 515 516 scope (failure) disconnect(); 517 518 if (m_supportsOpMsg) 519 { 520 startFind!T(command, on_header, on_doc, "nextBatch", errorInfo ~ " (getMore)", errorFile, errorLine); 521 } 522 else 523 { 524 debug (VibeVerboseMongo) 525 logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret); 526 527 int brokenId = 0; 528 int nextId = 0; 529 int num_docs; 530 // array to store out-of-order items, to push them into the callback properly 531 T[] compatibilitySort; 532 string full_name = database ~ '.' ~ collection_name; 533 auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id); 534 recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs) 535 { 536 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), 537 formatErrorInfo("Invalid cursor handle.")); 538 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), 539 formatErrorInfo("Query failed. Does the database exist?")); 540 541 on_header(cursor, full_name, num_docs); 542 }, (size_t idx, ref T doc) { 543 if (cast(int)idx == nextId) { 544 on_doc(doc); 545 nextId++; 546 brokenId = nextId; 547 } else { 548 enforce!MongoDriverException(idx >= brokenId, 549 formatErrorInfo("Got legacy document with same id after having already processed it!")); 550 enforce!MongoDriverException(idx < num_docs, 551 formatErrorInfo("Received more documents than the database reported to us")); 552 553 size_t arrayIndex = cast(int)idx - brokenId; 554 if (!compatibilitySort.length) 555 compatibilitySort.length = num_docs - brokenId; 556 compatibilitySort[arrayIndex] = doc; 557 } 558 }); 559 560 foreach (doc; compatibilitySort) 561 on_doc(doc); 562 } 563 } 564 } 565 566 /// Forwards the `find` command passed in to the database, handles the 567 /// callbacks like with getMore. This exists for easier integration with 568 /// MongoCursor!T. 569 package void startFind(T)(Bson command, 570 scope GetMoreHeaderDelegate on_header, 571 scope GetMoreDocumentDelegate!T on_doc, 572 string batchKey = "firstBatch", 573 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 574 { 575 string formatErrorInfo(string msg) @safe 576 { 577 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 578 } 579 580 scope (failure) disconnect(); 581 582 enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries")); 583 584 enum needsDup = hasIndirections!T || is(T == Bson); 585 586 debug (VibeVerboseMongo) 587 logDiagnostic("%s: %s", errorInfo, command); 588 589 auto id = sendMsg(-1, 0, command); 590 recvMsg!needsDup(id, (flags, scope root) @safe { 591 if (root["ok"].get!double != 1.0) 592 throw new MongoDriverException(formatErrorInfo("error response: " 593 ~ root["errmsg"].opt!string("(no message)"))); 594 595 auto cursor = root["cursor"]; 596 if (cursor.type == Bson.Type.null_) 597 throw new MongoDriverException(formatErrorInfo("no cursor in response: " 598 ~ root["errmsg"].opt!string("(no error message)"))); 599 auto batch = cursor[batchKey].get!(Bson[]); 600 on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length); 601 602 foreach (ref push; batch) 603 { 604 T doc = deserializeBson!T(push); 605 on_doc(doc); 606 } 607 }, (scope ident, size) @safe {}, (scope ident, scope push) @safe { 608 throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in response")); 609 }); 610 } 611 612 deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector) 613 { 614 scope(failure) disconnect(); 615 send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector); 616 if (m_settings.safe) checkForError(collection_name); 617 } 618 619 deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well") 620 void killCursors(scope long[] cursors) 621 { 622 scope(failure) disconnect(); 623 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 624 } 625 626 void killCursors(string collection, scope long[] cursors) 627 { 628 scope(failure) disconnect(); 629 // TODO: could add special case to runCommand to not return anything 630 if (m_supportsOpMsg) 631 { 632 Bson command = Bson.emptyObject; 633 auto parts = collection.findSplit("."); 634 if (!parts[2].length) 635 throw new MongoDriverException( 636 "Attempted to call killCursors with non-fully-qualified collection name: '" 637 ~ collection ~ "'"); 638 command["killCursors"] = Bson(parts[2]); 639 command["cursors"] = () @trusted { return cursors; } ().serializeToBson; // NOTE: "escaping" scope here 640 runCommand!Bson(parts[0], command); 641 } 642 else 643 { 644 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 645 } 646 } 647 648 MongoErrorDescription getLastError(string db) 649 { 650 // Though higher level abstraction level by concept, this function 651 // is implemented here to allow to check errors upon every request 652 // on connection level. 653 654 Bson command_and_options = Bson.emptyObject; 655 command_and_options["getLastError"] = Bson(1.0); 656 657 if(m_settings.w != m_settings.w.init) 658 command_and_options["w"] = m_settings.w; // Already a Bson struct 659 if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init) 660 command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS); 661 if(m_settings.journal) 662 command_and_options["j"] = Bson(true); 663 if(m_settings.fsync) 664 command_and_options["fsync"] = Bson(true); 665 666 _MongoErrorDescription ret; 667 668 auto error = runCommandUnchecked!Bson(db, command_and_options); 669 670 try { 671 ret = MongoErrorDescription( 672 error["errmsg"].opt!string(error["err"].opt!string("")), 673 error["code"].opt!int(-1), 674 error["connectionId"].opt!int(-1), 675 error["n"].opt!int(-1), 676 error["ok"].get!double() 677 ); 678 } catch (Exception e) { 679 throw new MongoDriverException(e.msg); 680 } 681 682 return ret; 683 } 684 685 /** Queries the server for all databases. 686 687 Returns: 688 An input range of $(D MongoDBInfo) values. 689 */ 690 auto listDatabases() 691 { 692 string cn = m_settings.database == string.init ? "admin" : m_settings.database; 693 694 auto cmd = Bson(["listDatabases":Bson(1)]); 695 696 static MongoDBInfo toInfo(const(Bson) db_doc) { 697 return MongoDBInfo( 698 db_doc["name"].get!string, 699 // double on MongoDB < 5.0, long afterwards 700 db_doc["sizeOnDisk"].to!double, 701 db_doc["empty"].get!bool 702 ); 703 } 704 705 auto result = runCommand!Bson(cn, cmd)["databases"]; 706 707 return result.byValue.map!toInfo; 708 } 709 710 private int recvMsg(bool dupBson = true)(int reqid, 711 scope MsgReplyDelegate!dupBson on_sec0, 712 scope MsgSection1StartDelegate on_sec1_start, 713 scope MsgSection1Delegate!dupBson on_sec1_doc) 714 { 715 import std.traits; 716 717 auto bytes_read = m_bytesRead; 718 int msglen = recvInt(); 719 int resid = recvInt(); 720 int respto = recvInt(); 721 int opcode = recvInt(); 722 723 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 724 enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! (must be OP_MSG)"); 725 726 uint flagBits = recvUInt(); 727 const bool hasCRC = (flagBits & (1 << 16)) != 0; 728 729 int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof); 730 if (hasCRC) 731 sectionLength -= uint.sizeof; // CRC present 732 733 bool gotSec0; 734 while (m_bytesRead - bytes_read < sectionLength) { 735 // TODO: directly deserialize from the wire 736 static if (!dupBson) { 737 ubyte[256] buf = void; 738 ubyte[] bufsl = buf; 739 } 740 741 ubyte payloadType = recvUByte(); 742 switch (payloadType) { 743 case 0: 744 gotSec0 = true; 745 static if (dupBson) 746 auto data = recvBsonDup(); 747 else 748 scope data = (() @trusted => recvBson(bufsl))(); 749 750 debug (VibeVerboseMongo) 751 logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data); 752 on_sec0(flagBits, data); 753 break; 754 case 1: 755 if (!gotSec0) 756 throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d"); 757 758 auto section_bytes_read = m_bytesRead; 759 int size = recvInt(); 760 auto identifier = recvCString(); 761 on_sec1_start(identifier, size); 762 while (m_bytesRead - section_bytes_read < size) { 763 static if (dupBson) 764 auto data = recvBsonDup(); 765 else 766 scope data = (() @trusted => recvBson(bufsl))(); 767 768 debug (VibeVerboseMongo) 769 logDiagnostic("recvData: sec1[%s]: %s", identifier, data); 770 771 on_sec1_doc(identifier, data); 772 } 773 break; 774 default: 775 throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string); 776 } 777 } 778 779 if (hasCRC) 780 { 781 uint crc = recvUInt(); 782 // TODO: validate CRC 783 logDiagnostic("recvData: crc=%s (discarded)", crc); 784 } 785 786 assert(bytes_read + msglen == m_bytesRead, 787 format!"Packet size mismatch! Expected %s bytes, but read %s."( 788 msglen, m_bytesRead - bytes_read)); 789 790 return resid; 791 } 792 793 private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 794 { 795 auto bytes_read = m_bytesRead; 796 int msglen = recvInt(); 797 int resid = recvInt(); 798 int respto = recvInt(); 799 int opcode = recvInt(); 800 801 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 802 enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!"); 803 804 auto flags = cast(ReplyFlags)recvInt(); 805 long cursor = recvLong(); 806 int start = recvInt(); 807 int numret = recvInt(); 808 809 scope (exit) { 810 if (m_bytesRead - bytes_read < msglen) { 811 logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read); 812 ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 813 recv(dst); 814 } else if (m_bytesRead - bytes_read > msglen) { 815 logWarn("MongoDB reply was shorter than expected. Dropping connection."); 816 disconnect(); 817 throw new MongoDriverException("MongoDB reply was too short for data."); 818 } 819 } 820 821 on_msg(cursor, flags, start, numret); 822 static if (hasIndirections!T || is(T == Bson)) 823 auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 824 foreach (i; 0 .. cast(size_t)numret) { 825 // TODO: directly deserialize from the wire 826 static if (!hasIndirections!T && !is(T == Bson)) { 827 ubyte[256] buf = void; 828 ubyte[] bufsl = buf; 829 auto bson = () @trusted { return recvBson(bufsl); } (); 830 } else { 831 auto bson = () @trusted { return recvBson(buf); } (); 832 } 833 834 // logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson); 835 836 static if (is(T == Bson)) on_doc(i, bson); 837 else { 838 T doc = deserializeBson!T(bson); 839 on_doc(i, doc); 840 } 841 } 842 843 return resid; 844 } 845 846 private int send(ARGS...)(OpCode code, int response_to, scope ARGS args) 847 { 848 if( !connected() ) { 849 if (m_allowReconnect) connect(); 850 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 851 else throw new MongoDriverException("Connection got closed while connecting"); 852 } 853 int id = nextMessageId(); 854 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 855 sendValue!int(16 + sendLength(args)); 856 sendValue!int(id); 857 sendValue!int(response_to); 858 sendValue!int(cast(int)code); 859 foreach (a; args) sendValue(a); 860 m_outRange.flush(); 861 // logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args)); 862 return id; 863 } 864 865 private int sendMsg(int response_to, uint flagBits, Bson document) 866 { 867 if( !connected() ) { 868 if (m_allowReconnect) connect(); 869 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 870 else throw new MongoDriverException("Connection got closed while connecting"); 871 } 872 int id = nextMessageId(); 873 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 874 sendValue!int(21 + sendLength(document)); 875 sendValue!int(id); 876 sendValue!int(response_to); 877 sendValue!int(cast(int)OpCode.Msg); 878 sendValue!uint(flagBits); 879 const bool hasCRC = (flagBits & (1 << 16)) != 0; 880 assert(!hasCRC, "sending with CRC bits not yet implemented"); 881 sendValue!ubyte(0); 882 sendValue(document); 883 m_outRange.flush(); 884 return id; 885 } 886 887 private void sendValue(T)(scope T value) 888 { 889 import std.traits; 890 static if (is(T == ubyte)) m_outRange.put(value); 891 else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value)); 892 else static if (is(T == long)) sendBytes(toBsonData(value)); 893 else static if (is(T == Bson)) sendBytes(() @trusted { return value.data; } ()); 894 else static if (is(T == string)) { 895 sendBytes(cast(const(ubyte)[])value); 896 sendBytes(cast(const(ubyte)[])"\0"); 897 } else static if (isArray!T) { 898 foreach (v; value) 899 sendValue(v); 900 } else static assert(false, "Unexpected type: "~T.stringof); 901 } 902 903 private void sendBytes(scope const(ubyte)[] data){ m_outRange.put(data); } 904 905 private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); } 906 private alias recvUByte = recvInteger!ubyte; 907 private alias recvInt = recvInteger!int; 908 private alias recvUInt = recvInteger!uint; 909 private alias recvLong = recvInteger!long; 910 private Bson recvBson(ref ubyte[] buf) 911 @system { 912 int len = recvInt(); 913 ubyte[] dst; 914 if (len > buf.length) dst = new ubyte[len]; 915 else { 916 dst = buf[0 .. len]; 917 buf = buf[len .. $]; 918 } 919 dst[0 .. 4] = toBsonData(len)[]; 920 recv(dst[4 .. $]); 921 return Bson(Bson.Type.object, cast(immutable)dst); 922 } 923 private Bson recvBsonDup() 924 @trusted { 925 ubyte[4] size; 926 recv(size[]); 927 ubyte[] dst = new ubyte[fromBsonData!uint(size)]; 928 dst[0 .. 4] = size; 929 recv(dst[4 .. $]); 930 return Bson(Bson.Type.object, cast(immutable)dst); 931 } 932 private void recv(scope ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; } 933 private const(char)[] recvCString() 934 { 935 auto buf = new ubyte[32]; 936 ptrdiff_t i = -1; 937 do 938 { 939 i++; 940 if (i == buf.length) buf.length *= 2; 941 recv(buf[i .. i + 1]); 942 } while (buf[i] != 0); 943 return cast(const(char)[])buf[0 .. i]; 944 } 945 946 private int nextMessageId() { return m_msgid++; } 947 948 deprecated private void checkForError(string collection_name) 949 { 950 auto coll = collection_name.split(".")[0]; 951 auto err = getLastError(coll); 952 953 enforce( 954 err.code < 0, 955 new MongoDBException(err) 956 ); 957 } 958 959 private void certAuthenticate() 960 { 961 Bson cmd = Bson.emptyObject; 962 cmd["authenticate"] = Bson(1); 963 cmd["mechanism"] = Bson("MONGODB-X509"); 964 if (m_description.satisfiesVersion(WireVersion.v34)) 965 { 966 if (m_settings.username.length) 967 cmd["user"] = Bson(m_settings.username); 968 } 969 else 970 { 971 if (!m_settings.username.length) 972 throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this"); 973 974 cmd["user"] = Bson(m_settings.username); 975 } 976 runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd); 977 } 978 979 private void authenticate() 980 { 981 scope (failure) disconnect(); 982 983 string cn = m_settings.getAuthDatabase; 984 985 auto cmd = Bson(["getnonce": Bson(1)]); 986 auto result = runCommand!(Bson, MongoAuthException)(cn, cmd); 987 string nonce = result["nonce"].get!string; 988 string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup); 989 990 cmd = Bson.emptyObject; 991 cmd["authenticate"] = Bson(1); 992 cmd["mechanism"] = Bson("MONGODB-CR"); 993 cmd["nonce"] = Bson(nonce); 994 cmd["user"] = Bson(m_settings.username); 995 cmd["key"] = Bson(key); 996 runCommand!(Bson, MongoAuthException)(cn, cmd); 997 } 998 999 private void scramAuthenticate() 1000 { 1001 import vibe.db.mongo.sasl; 1002 1003 string cn = m_settings.getAuthDatabase; 1004 1005 ScramState state; 1006 string payload = state.createInitialRequest(m_settings.username); 1007 1008 auto cmd = Bson.emptyObject; 1009 cmd["saslStart"] = Bson(1); 1010 cmd["mechanism"] = Bson("SCRAM-SHA-1"); 1011 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1012 1013 auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd); 1014 string response = cast(string)doc["payload"].get!BsonBinData().rawData; 1015 Bson conversationId = doc["conversationId"]; 1016 1017 payload = state.update(m_settings.digest, response); 1018 cmd = Bson.emptyObject; 1019 cmd["saslContinue"] = Bson(1); 1020 cmd["conversationId"] = conversationId; 1021 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1022 1023 doc = runCommand!(Bson, MongoAuthException)(cn, cmd); 1024 response = cast(string)doc["payload"].get!BsonBinData().rawData; 1025 1026 payload = state.finalize(response); 1027 cmd = Bson.emptyObject; 1028 cmd["saslContinue"] = Bson(1); 1029 cmd["conversationId"] = conversationId; 1030 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1031 runCommand!(Bson, MongoAuthException)(cn, cmd); 1032 } 1033 } 1034 1035 private enum OpCode : int { 1036 Reply = 1, // sent only by DB 1037 Update = 2001, 1038 Insert = 2002, 1039 Reserved1 = 2003, 1040 Query = 2004, 1041 GetMore = 2005, 1042 Delete = 2006, 1043 KillCursors = 2007, 1044 1045 Compressed = 2012, 1046 Msg = 2013, 1047 } 1048 1049 private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe; 1050 private template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; } 1051 1052 private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe; 1053 private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe; 1054 private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe; 1055 private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe; 1056 private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe; 1057 1058 alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe; 1059 alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe; 1060 1061 struct MongoDBInfo 1062 { 1063 string name; 1064 double sizeOnDisk; 1065 bool empty; 1066 } 1067 1068 private int sendLength(ARGS...)(scope ARGS args) 1069 { 1070 import std.traits; 1071 static if (ARGS.length == 1) { 1072 alias T = ARGS[0]; 1073 static if (is(T == string)) return cast(int)args[0].length + 1; 1074 else static if (is(T == int)) return 4; 1075 else static if (is(T == long)) return 8; 1076 else static if (is(T == Bson)) return cast(int)() @trusted { return args[0].data.length; } (); 1077 else static if (isArray!T) { 1078 int ret = 0; 1079 foreach (el; args[0]) ret += sendLength(el); 1080 return ret; 1081 } else static assert(false, "Unexpected type: "~T.stringof); 1082 } 1083 else if (ARGS.length == 0) return 0; 1084 else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]); 1085 } 1086 1087 struct ServerDescription 1088 { 1089 enum ServerType 1090 { 1091 unknown, 1092 standalone, 1093 mongos, 1094 possiblePrimary, 1095 RSPrimary, 1096 RSSecondary, 1097 RSArbiter, 1098 RSOther, 1099 RSGhost 1100 } 1101 1102 @optional: 1103 string address; 1104 string error; 1105 float roundTripTime = 0; 1106 Nullable!BsonDate lastWriteDate; 1107 Nullable!BsonObjectID opTime; 1108 ServerType type = ServerType.unknown; 1109 int minWireVersion, maxWireVersion; 1110 string me; 1111 string[] hosts, passives, arbiters; 1112 string[string] tags; 1113 string setName; 1114 Nullable!int setVersion; 1115 Nullable!BsonObjectID electionId; 1116 string primary; 1117 string lastUpdateTime = "infinity ago"; 1118 Nullable!int logicalSessionTimeoutMinutes; 1119 1120 bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow 1121 { 1122 return maxWireVersion >= wireVersion; 1123 } 1124 } 1125 1126 enum WireVersion : int 1127 { 1128 old = 0, 1129 v26 = 1, 1130 v26_2 = 2, 1131 v30 = 3, 1132 v32 = 4, 1133 v34 = 5, 1134 v36 = 6, 1135 v40 = 7, 1136 v42 = 8, 1137 v44 = 9, 1138 v49 = 12, 1139 v50 = 13, 1140 v51 = 14, 1141 v52 = 15, 1142 v53 = 16, 1143 v60 = 17 1144 } 1145 1146 private string getHostArchitecture() 1147 { 1148 import os = std.system; 1149 1150 version (X86_64) 1151 string arch = "x86_64 "; 1152 else version (X86) 1153 string arch = "x86 "; 1154 else version (AArch64) 1155 string arch = "aarch64 "; 1156 else version (ARM_HardFloat) 1157 string arch = "armhf "; 1158 else version (ARM) 1159 string arch = "arm "; 1160 else version (PPC64) 1161 string arch = "ppc64 "; 1162 else version (PPC) 1163 string arch = "ppc "; 1164 else 1165 string arch = "unknown "; 1166 1167 return arch ~ os.endian.to!string; 1168 } 1169 1170 private static immutable hostArchitecture = getHostArchitecture;