1 /** 2 Low level mongodb protocol. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.connection; 9 10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic 11 // debug = VibeVerboseMongo; 12 13 public import vibe.data.bson; 14 15 import vibe.core.core : vibeVersionString; 16 import vibe.core.log; 17 import vibe.core.net; 18 import vibe.data.bson; 19 import vibe.db.mongo.flags; 20 import vibe.db.mongo.settings; 21 import vibe.inet.webform; 22 import vibe.stream.tls; 23 24 import std.algorithm : findSplit, map, splitter; 25 import std.array; 26 import std.conv; 27 import std.digest.md; 28 import std.exception; 29 import std.range; 30 import std.string; 31 import std.traits : hasIndirections; 32 import std.typecons; 33 34 import core.time; 35 36 private struct _MongoErrorDescription 37 { 38 string message; 39 int code; 40 int connectionId; 41 int n; 42 double ok; 43 } 44 45 /** 46 * D POD representation of Mongo error object. 47 * 48 * For successful queries "code" is negative. 49 * Can be used also to check how many documents where updated upon 50 * a successful query via "n" field. 51 */ 52 alias MongoErrorDescription = immutable(_MongoErrorDescription); 53 54 /** 55 * Root class for vibe.d Mongo driver exception hierarchy. 56 */ 57 class MongoException : Exception 58 { 59 @safe: 60 61 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 62 { 63 super(message, file, line, next); 64 } 65 } 66 67 /** 68 * Generic class for all exception related to unhandled driver problems. 69 * 70 * I.e.: protocol mismatch or unexpected mongo service behavior. 71 */ 72 class MongoDriverException : MongoException 73 { 74 @safe: 75 76 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 77 { 78 super(message, file, line, next); 79 } 80 } 81 82 /** 83 * Wrapper class for all inner mongo collection manipulation errors. 84 * 85 * It does not indicate problem with vibe.d driver itself. Most frequently this 86 * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting. 87 */ 88 class MongoDBException : MongoException 89 { 90 @safe: 91 92 MongoErrorDescription description; 93 94 this(MongoErrorDescription description, string file = __FILE__, 95 size_t line = __LINE__, Throwable next = null) 96 { 97 super(description.message, file, line, next); 98 this.description = description; 99 } 100 101 // NOTE: .message is a @future member of Throwable 102 deprecated("Use .msg instead.") alias message = msg; 103 @property int code() const nothrow { return description.code; }; 104 @property int connectionId() const nothrow { return description.connectionId; }; 105 @property int n() const nothrow { return description.n; }; 106 @property double ok() const nothrow { return description.ok; }; 107 } 108 109 /** 110 * Generic class for all exceptions related to authentication problems. 111 * 112 * I.e.: unsupported mechanisms or wrong credentials. 113 */ 114 class MongoAuthException : MongoException 115 { 116 @safe: 117 118 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 119 { 120 super(message, file, line, next); 121 } 122 } 123 124 /** 125 [internal] Provides low-level mongodb protocol access. 126 127 It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code. 128 Note that a MongoConnection may only be used from one fiber/thread at a time. 129 */ 130 final class MongoConnection { 131 @safe: 132 133 import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */; 134 import vibe.internal.interfaceproxy; 135 import vibe.core.stream : InputStream, Stream; 136 137 private { 138 MongoClientSettings m_settings; 139 TCPConnection m_conn; 140 InterfaceProxy!Stream m_stream; 141 ulong m_bytesRead; 142 int m_msgid = 1; 143 StreamOutputRange!(InterfaceProxy!Stream) m_outRange; 144 ServerDescription m_description; 145 /// Flag to prevent recursive connections when server closes connection while connecting 146 bool m_allowReconnect; 147 bool m_isAuthenticating; 148 bool m_supportsOpMsg; 149 } 150 151 enum ushort defaultPort = MongoClientSettings.defaultPort; 152 153 /// Simplified constructor overload, with no m_settings 154 this(string server, ushort port = defaultPort) 155 { 156 m_settings = new MongoClientSettings(); 157 m_settings.hosts ~= MongoHost(server, port); 158 } 159 160 this(MongoClientSettings cfg) 161 { 162 m_settings = cfg; 163 164 // Now let's check for features that are not yet supported. 165 if(m_settings.hosts.length > 1) 166 logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", 167 m_settings.hosts[0].name, m_settings.hosts[0].port); 168 } 169 170 void connect() 171 { 172 bool isTLS; 173 174 /* 175 * TODO: Connect to one of the specified hosts taking into consideration 176 * options such as connect timeouts and so on. 177 */ 178 try { 179 import core.time : Duration, msecs; 180 181 auto connectTimeout = m_settings.connectTimeoutMS.msecs; 182 if (m_settings.connectTimeoutMS == 0) 183 connectTimeout = Duration.max; 184 185 m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout); 186 m_conn.tcpNoDelay = true; 187 if (m_settings.socketTimeout != Duration.zero) 188 m_conn.readTimeout = m_settings.socketTimeout; 189 if (m_settings.ssl) { 190 auto ctx = createTLSContext(TLSContextKind.client); 191 if (!m_settings.sslverifycertificate) { 192 ctx.peerValidationMode = TLSPeerValidationMode.none; 193 } 194 if (m_settings.sslPEMKeyFile) { 195 ctx.useCertificateChainFile(m_settings.sslPEMKeyFile); 196 ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile); 197 } 198 if (m_settings.sslCAFile) { 199 ctx.useTrustedCertificateFile(m_settings.sslCAFile); 200 } 201 202 m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name); 203 isTLS = true; 204 } 205 else { 206 m_stream = m_conn; 207 } 208 m_outRange = streamOutputRange(m_stream); 209 } 210 catch (Exception e) { 211 throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e); 212 } 213 214 scope (failure) disconnect(); 215 216 m_allowReconnect = false; 217 scope (exit) 218 m_allowReconnect = true; 219 220 Bson handshake = Bson.emptyObject; 221 static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!"); 222 // TODO: must use legacy handshake if m_settings.loadBalanced is true 223 // and also once we allow configuring a server API version in the driver 224 // (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst) 225 m_supportsOpMsg = false; 226 bool legacyHandshake = false; 227 if (legacyHandshake) 228 { 229 handshake["isMaster"] = Bson(1); 230 handshake["helloOk"] = Bson(1); 231 } 232 else 233 { 234 handshake["hello"] = Bson(1); 235 m_supportsOpMsg = true; 236 } 237 238 import os = std.system; 239 import compiler = std.compiler; 240 string platform = compiler.name ~ " " 241 ~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string; 242 // TODO: add support for os.version 243 244 handshake["client"] = Bson([ 245 "driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]), 246 "os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]), 247 "platform": Bson(platform) 248 ]); 249 250 if (m_settings.appName.length) { 251 enforce!MongoAuthException(m_settings.appName.length <= 128, 252 "The application name may not be larger than 128 bytes"); 253 handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]); 254 } 255 256 auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake); 257 m_description = deserializeBson!ServerDescription(reply); 258 259 if (m_description.satisfiesVersion(WireVersion.v36)) 260 m_supportsOpMsg = true; 261 262 m_bytesRead = 0; 263 auto authMechanism = m_settings.authMechanism; 264 if (authMechanism == MongoAuthMechanism.none) 265 { 266 if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26)) 267 { 268 authMechanism = MongoAuthMechanism.mongoDBX509; 269 } 270 else if (m_settings.digest.length) 271 { 272 // SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication 273 if (m_description.satisfiesVersion(WireVersion.v30)) 274 authMechanism = MongoAuthMechanism.scramSHA1; 275 else 276 authMechanism = MongoAuthMechanism.mongoDBCR; 277 } 278 } 279 280 if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40)) 281 throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported"); 282 283 if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30)) 284 throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported"); 285 286 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26)) 287 throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported"); 288 289 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS) 290 throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!"); 291 292 m_isAuthenticating = true; 293 scope (exit) 294 m_isAuthenticating = false; 295 final switch (authMechanism) 296 { 297 case MongoAuthMechanism.none: 298 break; 299 case MongoAuthMechanism.mongoDBX509: 300 certAuthenticate(); 301 break; 302 case MongoAuthMechanism.scramSHA1: 303 scramAuthenticate(); 304 break; 305 case MongoAuthMechanism.mongoDBCR: 306 authenticate(); 307 break; 308 } 309 } 310 311 void disconnect() 312 { 313 if (m_conn) { 314 if (m_stream && m_conn.connected) { 315 m_outRange.flush(); 316 317 m_stream.finalize(); 318 m_stream = InterfaceProxy!Stream.init; 319 } 320 321 m_conn.close(); 322 m_conn = TCPConnection.init; 323 } 324 325 m_outRange.drop(); 326 } 327 328 @property bool connected() const { return m_conn && m_conn.connected; } 329 330 @property const(ServerDescription) description() const { return m_description; } 331 332 deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update) 333 { 334 scope(failure) disconnect(); 335 send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update); 336 if (m_settings.safe) checkForError(collection_name); 337 } 338 339 deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents) 340 { 341 scope(failure) disconnect(); 342 foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate()); 343 send(OpCode.Insert, -1, cast(int)flags, collection_name, documents); 344 if (m_settings.safe) checkForError(collection_name); 345 } 346 347 deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`") 348 void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 349 { 350 scope(failure) disconnect(); 351 flags |= m_settings.defQueryFlags; 352 int id; 353 if (returnFieldSelector.isNull) 354 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query); 355 else 356 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector); 357 recvReply!T(id, on_msg, on_doc); 358 } 359 360 /** 361 Runs the given Bson command (Bson object with the first entry in the map 362 being the command name) on the given database. 363 364 Using `runCommand` checks that the command completed successfully by 365 checking that `result["ok"].get!double == 1.0`. Throws the 366 `CommandFailException` on failure. 367 368 Using `runCommandUnchecked` will return the result as-is. Developers may 369 check the `result["ok"]` value themselves. (It's a double that needs to 370 be compared with 1.0 by default) 371 372 Throws: 373 - `CommandFailException` (template argument) only in the 374 `runCommand` overload, when the command response is not ok. 375 - `MongoDriverException` when internal protocol errors occur. 376 */ 377 Bson runCommand(T, CommandFailException = MongoDriverException)( 378 string database, 379 Bson command, 380 string errorInfo = __FUNCTION__, 381 string errorFile = __FILE__, 382 size_t errorLine = __LINE__ 383 ) 384 in(database.length, "runCommand requires a database argument") 385 { 386 return runCommandImpl!(T, CommandFailException)( 387 database, command, true, errorInfo, errorFile, errorLine); 388 } 389 390 Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)( 391 string database, 392 Bson command, 393 string errorInfo = __FUNCTION__, 394 string errorFile = __FILE__, 395 size_t errorLine = __LINE__ 396 ) 397 in(database.length, "runCommand requires a database argument") 398 { 399 return runCommandImpl!(T, CommandFailException)( 400 database, command, false, errorInfo, errorFile, errorLine); 401 } 402 403 private Bson runCommandImpl(T, CommandFailException)( 404 string database, 405 Bson command, 406 bool testOk = true, 407 string errorInfo = __FUNCTION__, 408 string errorFile = __FILE__, 409 size_t errorLine = __LINE__ 410 ) 411 in(database.length, "runCommand requires a database argument") 412 { 413 import std.array; 414 415 string formatErrorInfo(string msg) @safe 416 { 417 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 418 } 419 420 Bson ret; 421 422 if (m_supportsOpMsg) 423 { 424 debug (VibeVerboseMongo) 425 logDiagnostic("runCommand: [db=%s] %s", database, command); 426 427 command["$db"] = Bson(database); 428 429 auto id = sendMsg(-1, 0, command); 430 Appender!(Bson[])[string] docs; 431 recvMsg!true(id, (flags, root) @safe { 432 ret = root; 433 }, (scope ident, size) @safe { 434 docs[ident.idup] = appender!(Bson[]); 435 }, (scope ident, push) @safe { 436 auto pd = ident in docs; 437 assert(!!pd, "Received data for unexpected identifier"); 438 pd.put(push); 439 }); 440 441 foreach (ident, app; docs) 442 ret[ident] = Bson(app.data); 443 } 444 else 445 { 446 debug (VibeVerboseMongo) 447 logDiagnostic("runCommand(legacy): [db=%s] %s", database, command); 448 auto id = send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null)); 449 recvReply!T(id, 450 (cursor, flags, first_doc, num_docs) { 451 logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs); 452 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed")); 453 enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response")); 454 }, 455 (idx, ref doc) { 456 ret = doc; 457 }); 458 } 459 460 if (testOk && ret["ok"].get!double != 1.0) 461 throw new CommandFailException(formatErrorInfo("command failed: " 462 ~ ret["errmsg"].opt!string("(no message)"))); 463 464 static if (is(T == Bson)) return ret; 465 else { 466 T doc = deserializeBson!T(bson); 467 return doc; 468 } 469 } 470 471 template getMore(T) 472 { 473 deprecated("use the modern overload instead") 474 void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 475 { 476 scope(failure) disconnect(); 477 auto parts = collection_name.findSplit("."); 478 auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id); 479 recvReply!T(id, on_msg, on_doc); 480 } 481 482 /** 483 * Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore 484 * command and OP_MSG. (if supported) 485 * 486 * Falls back to compatibility for older MongoDB versions, but those are not 487 * officially supported anymore. 488 * 489 * Upgrade_notes: 490 * - error checking is now done inside this function 491 * - document index is no longer sent, instead the callback is called sequentially 492 * 493 * Throws: $(LREF MongoDriverException) in case the command fails. 494 */ 495 void getMore(long cursor_id, string database, string collection_name, long nret, 496 scope GetMoreHeaderDelegate on_header, 497 scope GetMoreDocumentDelegate!T on_doc, 498 Duration timeout = Duration.max, 499 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 500 { 501 Bson command = Bson.emptyObject; 502 command["getMore"] = Bson(cursor_id); 503 command["$db"] = Bson(database); 504 command["collection"] = Bson(collection_name); 505 if (nret > 0) 506 command["batchSize"] = Bson(nret); 507 if (timeout != Duration.max && timeout.total!"msecs" < int.max) 508 command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs"); 509 510 string formatErrorInfo(string msg) @safe 511 { 512 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 513 } 514 515 scope (failure) disconnect(); 516 517 if (m_supportsOpMsg) 518 { 519 enum needsDup = hasIndirections!T || is(T == Bson); 520 521 debug (VibeVerboseMongo) 522 logDiagnostic("getMore: [db=%s] %s", database, command); 523 524 auto id = sendMsg(-1, 0, command); 525 recvMsg!needsDup(id, (flags, scope root) @safe { 526 if (root["ok"].get!double != 1.0) 527 throw new MongoDriverException(formatErrorInfo("getMore failed: " 528 ~ root["errmsg"].opt!string("(no message)"))); 529 530 auto cursor = root["cursor"]; 531 auto batch = cursor["nextBatch"].get!(Bson[]); 532 on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length); 533 534 foreach (ref push; batch) 535 { 536 T doc = deserializeBson!T(push); 537 on_doc(doc); 538 } 539 }, (scope ident, size) @safe {}, (scope ident, scope push) @safe { 540 throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in getMore response")); 541 }); 542 } 543 else 544 { 545 debug (VibeVerboseMongo) 546 logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret); 547 548 int brokenId = 0; 549 int nextId = 0; 550 int num_docs; 551 // array to store out-of-order items, to push them into the callback properly 552 T[] compatibilitySort; 553 string full_name = database ~ '.' ~ collection_name; 554 auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id); 555 recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs) 556 { 557 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), 558 formatErrorInfo("Invalid cursor handle.")); 559 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), 560 formatErrorInfo("Query failed. Does the database exist?")); 561 562 on_header(cursor, full_name, num_docs); 563 }, (size_t idx, ref T doc) { 564 if (cast(int)idx == nextId) { 565 on_doc(doc); 566 nextId++; 567 brokenId = nextId; 568 } else { 569 enforce!MongoDriverException(idx >= brokenId, 570 formatErrorInfo("Got legacy document with same id after having already processed it!")); 571 enforce!MongoDriverException(idx < num_docs, 572 formatErrorInfo("Received more documents than the database reported to us")); 573 574 size_t arrayIndex = cast(int)idx - brokenId; 575 if (!compatibilitySort.length) 576 compatibilitySort.length = num_docs - brokenId; 577 compatibilitySort[arrayIndex] = doc; 578 } 579 }); 580 581 foreach (doc; compatibilitySort) 582 on_doc(doc); 583 } 584 } 585 } 586 587 /// Forwards the `find` command passed in to the database, handles the 588 /// callbacks like with getMore. This exists for easier integration with 589 /// MongoCursor!T. 590 package void startFind(T)(Bson command, 591 scope GetMoreHeaderDelegate on_header, 592 scope GetMoreDocumentDelegate!T on_doc, 593 string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__) 594 { 595 string formatErrorInfo(string msg) @safe 596 { 597 return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")"); 598 } 599 600 scope (failure) disconnect(); 601 602 enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries")); 603 604 enum needsDup = hasIndirections!T || is(T == Bson); 605 606 debug (VibeVerboseMongo) 607 logDiagnostic("startFind: %s", command); 608 609 auto id = sendMsg(-1, 0, command); 610 recvMsg!needsDup(id, (flags, scope root) @safe { 611 if (root["ok"].get!double != 1.0) 612 throw new MongoDriverException(formatErrorInfo("find failed: " 613 ~ root["errmsg"].opt!string("(no message)"))); 614 615 auto cursor = root["cursor"]; 616 auto batch = cursor["firstBatch"].get!(Bson[]); 617 on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length); 618 619 foreach (ref push; batch) 620 { 621 T doc = deserializeBson!T(push); 622 on_doc(doc); 623 } 624 }, (scope ident, size) @safe {}, (scope ident, scope push) @safe { 625 throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in find response")); 626 }); 627 } 628 629 deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector) 630 { 631 scope(failure) disconnect(); 632 send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector); 633 if (m_settings.safe) checkForError(collection_name); 634 } 635 636 deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well") 637 void killCursors(scope long[] cursors) 638 { 639 scope(failure) disconnect(); 640 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 641 } 642 643 void killCursors(string collection, scope long[] cursors) 644 { 645 scope(failure) disconnect(); 646 // TODO: could add special case to runCommand to not return anything 647 if (m_supportsOpMsg) 648 { 649 Bson command = Bson.emptyObject; 650 auto parts = collection.findSplit("."); 651 command["killCursors"] = Bson(parts[2]); 652 command["cursors"] = () @trusted { return cursors; } ().serializeToBson; // NOTE: "escaping" scope here 653 runCommand!Bson(parts[0], command); 654 } 655 else 656 { 657 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 658 } 659 } 660 661 MongoErrorDescription getLastError(string db) 662 { 663 // Though higher level abstraction level by concept, this function 664 // is implemented here to allow to check errors upon every request 665 // on connection level. 666 667 Bson command_and_options = Bson.emptyObject; 668 command_and_options["getLastError"] = Bson(1.0); 669 670 if(m_settings.w != m_settings.w.init) 671 command_and_options["w"] = m_settings.w; // Already a Bson struct 672 if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init) 673 command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS); 674 if(m_settings.journal) 675 command_and_options["j"] = Bson(true); 676 if(m_settings.fsync) 677 command_and_options["fsync"] = Bson(true); 678 679 _MongoErrorDescription ret; 680 681 auto error = runCommandUnchecked!Bson(db, command_and_options); 682 683 try { 684 ret = MongoErrorDescription( 685 error["errmsg"].opt!string(error["err"].opt!string("")), 686 error["code"].opt!int(-1), 687 error["connectionId"].opt!int(-1), 688 error["n"].opt!int(-1), 689 error["ok"].get!double() 690 ); 691 } catch (Exception e) { 692 throw new MongoDriverException(e.msg); 693 } 694 695 return ret; 696 } 697 698 /** Queries the server for all databases. 699 700 Returns: 701 An input range of $(D MongoDBInfo) values. 702 */ 703 auto listDatabases() 704 { 705 string cn = m_settings.database == string.init ? "admin" : m_settings.database; 706 707 auto cmd = Bson(["listDatabases":Bson(1)]); 708 709 static MongoDBInfo toInfo(const(Bson) db_doc) { 710 return MongoDBInfo( 711 db_doc["name"].get!string, 712 // double on MongoDB < 5.0, long afterwards 713 db_doc["sizeOnDisk"].to!double, 714 db_doc["empty"].get!bool 715 ); 716 } 717 718 auto result = runCommand!Bson(cn, cmd)["databases"]; 719 720 return result.byValue.map!toInfo; 721 } 722 723 private int recvMsg(bool dupBson = true)(int reqid, 724 scope MsgReplyDelegate!dupBson on_sec0, 725 scope MsgSection1StartDelegate on_sec1_start, 726 scope MsgSection1Delegate!dupBson on_sec1_doc) 727 { 728 import std.traits; 729 730 auto bytes_read = m_bytesRead; 731 int msglen = recvInt(); 732 int resid = recvInt(); 733 int respto = recvInt(); 734 int opcode = recvInt(); 735 736 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 737 enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! (must be OP_MSG)"); 738 739 uint flagBits = recvUInt(); 740 const bool hasCRC = (flagBits & (1 << 16)) != 0; 741 742 int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof); 743 if (hasCRC) 744 sectionLength -= uint.sizeof; // CRC present 745 746 bool gotSec0; 747 while (m_bytesRead - bytes_read < sectionLength) { 748 // TODO: directly deserialize from the wire 749 static if (!dupBson) { 750 ubyte[256] buf = void; 751 ubyte[] bufsl = buf; 752 } 753 754 ubyte payloadType = recvUByte(); 755 switch (payloadType) { 756 case 0: 757 gotSec0 = true; 758 static if (dupBson) 759 auto data = recvBsonDup(); 760 else 761 scope data = (() @trusted => recvBson(bufsl))(); 762 763 debug (VibeVerboseMongo) 764 logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data); 765 on_sec0(flagBits, data); 766 break; 767 case 1: 768 if (!gotSec0) 769 throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d"); 770 771 auto section_bytes_read = m_bytesRead; 772 int size = recvInt(); 773 auto identifier = recvCString(); 774 on_sec1_start(identifier, size); 775 while (m_bytesRead - section_bytes_read < size) { 776 static if (dupBson) 777 auto data = recvBsonDup(); 778 else 779 scope data = (() @trusted => recvBson(bufsl))(); 780 781 debug (VibeVerboseMongo) 782 logDiagnostic("recvData: sec1[%s]: %s", identifier, data); 783 784 on_sec1_doc(identifier, data); 785 } 786 break; 787 default: 788 throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string); 789 } 790 } 791 792 if (hasCRC) 793 { 794 uint crc = recvUInt(); 795 // TODO: validate CRC 796 logDiagnostic("recvData: crc=%s (discarded)", crc); 797 } 798 799 assert(bytes_read + msglen == m_bytesRead, 800 format!"Packet size mismatch! Expected %s bytes, but read %s."( 801 msglen, m_bytesRead - bytes_read)); 802 803 return resid; 804 } 805 806 private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 807 { 808 auto bytes_read = m_bytesRead; 809 int msglen = recvInt(); 810 int resid = recvInt(); 811 int respto = recvInt(); 812 int opcode = recvInt(); 813 814 enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 815 enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!"); 816 817 auto flags = cast(ReplyFlags)recvInt(); 818 long cursor = recvLong(); 819 int start = recvInt(); 820 int numret = recvInt(); 821 822 scope (exit) { 823 if (m_bytesRead - bytes_read < msglen) { 824 logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read); 825 ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 826 recv(dst); 827 } else if (m_bytesRead - bytes_read > msglen) { 828 logWarn("MongoDB reply was shorter than expected. Dropping connection."); 829 disconnect(); 830 throw new MongoDriverException("MongoDB reply was too short for data."); 831 } 832 } 833 834 on_msg(cursor, flags, start, numret); 835 static if (hasIndirections!T || is(T == Bson)) 836 auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 837 foreach (i; 0 .. cast(size_t)numret) { 838 // TODO: directly deserialize from the wire 839 static if (!hasIndirections!T && !is(T == Bson)) { 840 ubyte[256] buf = void; 841 ubyte[] bufsl = buf; 842 auto bson = () @trusted { return recvBson(bufsl); } (); 843 } else { 844 auto bson = () @trusted { return recvBson(buf); } (); 845 } 846 847 // logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson); 848 849 static if (is(T == Bson)) on_doc(i, bson); 850 else { 851 T doc = deserializeBson!T(bson); 852 on_doc(i, doc); 853 } 854 } 855 856 return resid; 857 } 858 859 private int send(ARGS...)(OpCode code, int response_to, scope ARGS args) 860 { 861 if( !connected() ) { 862 if (m_allowReconnect) connect(); 863 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 864 else throw new MongoDriverException("Connection got closed while connecting"); 865 } 866 int id = nextMessageId(); 867 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 868 sendValue!int(16 + sendLength(args)); 869 sendValue!int(id); 870 sendValue!int(response_to); 871 sendValue!int(cast(int)code); 872 foreach (a; args) sendValue(a); 873 m_outRange.flush(); 874 // logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args)); 875 return id; 876 } 877 878 private int sendMsg(int response_to, uint flagBits, Bson document) 879 { 880 if( !connected() ) { 881 if (m_allowReconnect) connect(); 882 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 883 else throw new MongoDriverException("Connection got closed while connecting"); 884 } 885 int id = nextMessageId(); 886 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 887 sendValue!int(21 + sendLength(document)); 888 sendValue!int(id); 889 sendValue!int(response_to); 890 sendValue!int(cast(int)OpCode.Msg); 891 sendValue!uint(flagBits); 892 const bool hasCRC = (flagBits & (1 << 16)) != 0; 893 assert(!hasCRC, "sending with CRC bits not yet implemented"); 894 sendValue!ubyte(0); 895 sendValue(document); 896 m_outRange.flush(); 897 return id; 898 } 899 900 private void sendValue(T)(scope T value) 901 { 902 import std.traits; 903 static if (is(T == ubyte)) m_outRange.put(value); 904 else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value)); 905 else static if (is(T == long)) sendBytes(toBsonData(value)); 906 else static if (is(T == Bson)) sendBytes(() @trusted { return value.data; } ()); 907 else static if (is(T == string)) { 908 sendBytes(cast(const(ubyte)[])value); 909 sendBytes(cast(const(ubyte)[])"\0"); 910 } else static if (isArray!T) { 911 foreach (v; value) 912 sendValue(v); 913 } else static assert(false, "Unexpected type: "~T.stringof); 914 } 915 916 private void sendBytes(scope const(ubyte)[] data){ m_outRange.put(data); } 917 918 private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); } 919 private alias recvUByte = recvInteger!ubyte; 920 private alias recvInt = recvInteger!int; 921 private alias recvUInt = recvInteger!uint; 922 private alias recvLong = recvInteger!long; 923 private Bson recvBson(ref ubyte[] buf) 924 @system { 925 int len = recvInt(); 926 ubyte[] dst; 927 if (len > buf.length) dst = new ubyte[len]; 928 else { 929 dst = buf[0 .. len]; 930 buf = buf[len .. $]; 931 } 932 dst[0 .. 4] = toBsonData(len)[]; 933 recv(dst[4 .. $]); 934 return Bson(Bson.Type.object, cast(immutable)dst); 935 } 936 private Bson recvBsonDup() 937 @trusted { 938 ubyte[4] size; 939 recv(size[]); 940 ubyte[] dst = new ubyte[fromBsonData!uint(size)]; 941 dst[0 .. 4] = size; 942 recv(dst[4 .. $]); 943 return Bson(Bson.Type.object, cast(immutable)dst); 944 } 945 private void recv(scope ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; } 946 private const(char)[] recvCString() 947 { 948 auto buf = new ubyte[32]; 949 ptrdiff_t i = -1; 950 do 951 { 952 i++; 953 if (i == buf.length) buf.length *= 2; 954 recv(buf[i .. i + 1]); 955 } while (buf[i] != 0); 956 return cast(const(char)[])buf[0 .. i]; 957 } 958 959 private int nextMessageId() { return m_msgid++; } 960 961 private void checkForError(string collection_name) 962 { 963 auto coll = collection_name.split(".")[0]; 964 auto err = getLastError(coll); 965 966 enforce( 967 err.code < 0, 968 new MongoDBException(err) 969 ); 970 } 971 972 private void certAuthenticate() 973 { 974 Bson cmd = Bson.emptyObject; 975 cmd["authenticate"] = Bson(1); 976 cmd["mechanism"] = Bson("MONGODB-X509"); 977 if (m_description.satisfiesVersion(WireVersion.v34)) 978 { 979 if (m_settings.username.length) 980 cmd["user"] = Bson(m_settings.username); 981 } 982 else 983 { 984 if (!m_settings.username.length) 985 throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this"); 986 987 cmd["user"] = Bson(m_settings.username); 988 } 989 runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd); 990 } 991 992 private void authenticate() 993 { 994 scope (failure) disconnect(); 995 996 string cn = m_settings.getAuthDatabase; 997 998 auto cmd = Bson(["getnonce": Bson(1)]); 999 auto result = runCommand!(Bson, MongoAuthException)(cn, cmd); 1000 string nonce = result["nonce"].get!string; 1001 string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup); 1002 1003 cmd = Bson.emptyObject; 1004 cmd["authenticate"] = Bson(1); 1005 cmd["mechanism"] = Bson("MONGODB-CR"); 1006 cmd["nonce"] = Bson(nonce); 1007 cmd["user"] = Bson(m_settings.username); 1008 cmd["key"] = Bson(key); 1009 runCommand!(Bson, MongoAuthException)(cn, cmd); 1010 } 1011 1012 private void scramAuthenticate() 1013 { 1014 import vibe.db.mongo.sasl; 1015 1016 string cn = m_settings.getAuthDatabase; 1017 1018 ScramState state; 1019 string payload = state.createInitialRequest(m_settings.username); 1020 1021 auto cmd = Bson.emptyObject; 1022 cmd["saslStart"] = Bson(1); 1023 cmd["mechanism"] = Bson("SCRAM-SHA-1"); 1024 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1025 1026 auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd); 1027 string response = cast(string)doc["payload"].get!BsonBinData().rawData; 1028 Bson conversationId = doc["conversationId"]; 1029 1030 payload = state.update(m_settings.digest, response); 1031 cmd = Bson.emptyObject; 1032 cmd["saslContinue"] = Bson(1); 1033 cmd["conversationId"] = conversationId; 1034 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1035 1036 doc = runCommand!(Bson, MongoAuthException)(cn, cmd); 1037 response = cast(string)doc["payload"].get!BsonBinData().rawData; 1038 1039 payload = state.finalize(response); 1040 cmd = Bson.emptyObject; 1041 cmd["saslContinue"] = Bson(1); 1042 cmd["conversationId"] = conversationId; 1043 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 1044 runCommand!(Bson, MongoAuthException)(cn, cmd); 1045 } 1046 } 1047 1048 private enum OpCode : int { 1049 Reply = 1, // sent only by DB 1050 Update = 2001, 1051 Insert = 2002, 1052 Reserved1 = 2003, 1053 Query = 2004, 1054 GetMore = 2005, 1055 Delete = 2006, 1056 KillCursors = 2007, 1057 1058 Compressed = 2012, 1059 Msg = 2013, 1060 } 1061 1062 private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe; 1063 private template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; } 1064 1065 private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe; 1066 private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe; 1067 private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe; 1068 private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe; 1069 private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe; 1070 1071 alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe; 1072 alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe; 1073 1074 struct MongoDBInfo 1075 { 1076 string name; 1077 double sizeOnDisk; 1078 bool empty; 1079 } 1080 1081 private int sendLength(ARGS...)(scope ARGS args) 1082 { 1083 import std.traits; 1084 static if (ARGS.length == 1) { 1085 alias T = ARGS[0]; 1086 static if (is(T == string)) return cast(int)args[0].length + 1; 1087 else static if (is(T == int)) return 4; 1088 else static if (is(T == long)) return 8; 1089 else static if (is(T == Bson)) return cast(int)() @trusted { return args[0].data.length; } (); 1090 else static if (isArray!T) { 1091 int ret = 0; 1092 foreach (el; args[0]) ret += sendLength(el); 1093 return ret; 1094 } else static assert(false, "Unexpected type: "~T.stringof); 1095 } 1096 else if (ARGS.length == 0) return 0; 1097 else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]); 1098 } 1099 1100 struct ServerDescription 1101 { 1102 enum ServerType 1103 { 1104 unknown, 1105 standalone, 1106 mongos, 1107 possiblePrimary, 1108 RSPrimary, 1109 RSSecondary, 1110 RSArbiter, 1111 RSOther, 1112 RSGhost 1113 } 1114 1115 @optional: 1116 string address; 1117 string error; 1118 float roundTripTime = 0; 1119 Nullable!BsonDate lastWriteDate; 1120 Nullable!BsonObjectID opTime; 1121 ServerType type = ServerType.unknown; 1122 WireVersion minWireVersion, maxWireVersion; 1123 string me; 1124 string[] hosts, passives, arbiters; 1125 string[string] tags; 1126 string setName; 1127 Nullable!int setVersion; 1128 Nullable!BsonObjectID electionId; 1129 string primary; 1130 string lastUpdateTime = "infinity ago"; 1131 Nullable!int logicalSessionTimeoutMinutes; 1132 1133 bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow 1134 { 1135 return maxWireVersion >= wireVersion; 1136 } 1137 } 1138 1139 enum WireVersion : int 1140 { 1141 old = 0, 1142 v26 = 1, 1143 v26_2 = 2, 1144 v30 = 3, 1145 v32 = 4, 1146 v34 = 5, 1147 v36 = 6, 1148 v40 = 7, 1149 v42 = 8, 1150 v44 = 9, 1151 v49 = 12, 1152 v50 = 13, 1153 v51 = 14, 1154 v52 = 15, 1155 v53 = 16 1156 } 1157 1158 private string getHostArchitecture() 1159 { 1160 import os = std.system; 1161 1162 version (X86_64) 1163 string arch = "x86_64 "; 1164 else version (X86) 1165 string arch = "x86 "; 1166 else version (AArch64) 1167 string arch = "aarch64 "; 1168 else version (ARM_HardFloat) 1169 string arch = "armhf "; 1170 else version (ARM) 1171 string arch = "arm "; 1172 else version (PPC64) 1173 string arch = "ppc64 "; 1174 else version (PPC) 1175 string arch = "ppc "; 1176 else 1177 string arch = "unknown "; 1178 1179 return arch ~ os.endian.to!string; 1180 } 1181 1182 private static immutable hostArchitecture = getHostArchitecture;