1 /** 2 Low level mongodb protocol. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.connection; 9 10 public import vibe.data.bson; 11 12 import vibe.core.core : vibeVersionString; 13 import vibe.core.log; 14 import vibe.core.net; 15 import vibe.db.mongo.settings; 16 import vibe.db.mongo.flags; 17 import vibe.inet.webform; 18 import vibe.stream.tls; 19 20 import std.algorithm : map, splitter; 21 import std.array; 22 import std.conv; 23 import std.digest.md; 24 import std.exception; 25 import std.range; 26 import std.string; 27 import std.typecons; 28 29 30 private struct _MongoErrorDescription 31 { 32 string message; 33 int code; 34 int connectionId; 35 int n; 36 double ok; 37 } 38 39 /** 40 * D POD representation of Mongo error object. 41 * 42 * For successful queries "code" is negative. 43 * Can be used also to check how many documents where updated upon 44 * a successful query via "n" field. 45 */ 46 alias MongoErrorDescription = immutable(_MongoErrorDescription); 47 48 /** 49 * Root class for vibe.d Mongo driver exception hierarchy. 50 */ 51 class MongoException : Exception 52 { 53 @safe: 54 55 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 56 { 57 super(message, file, line, next); 58 } 59 } 60 61 /** 62 * Generic class for all exception related to unhandled driver problems. 63 * 64 * I.e.: protocol mismatch or unexpected mongo service behavior. 65 */ 66 class MongoDriverException : MongoException 67 { 68 @safe: 69 70 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 71 { 72 super(message, file, line, next); 73 } 74 } 75 76 /** 77 * Wrapper class for all inner mongo collection manipulation errors. 78 * 79 * It does not indicate problem with vibe.d driver itself. Most frequently this 80 * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting. 81 */ 82 class MongoDBException : MongoException 83 { 84 @safe: 85 86 MongoErrorDescription description; 87 alias description this; 88 89 this(MongoErrorDescription description, string file = __FILE__, 90 size_t line = __LINE__, Throwable next = null) 91 { 92 super(description.message, file, line, next); 93 this.description = description; 94 } 95 } 96 97 /** 98 * Generic class for all exceptions related to authentication problems. 99 * 100 * I.e.: unsupported mechanisms or wrong credentials. 101 */ 102 class MongoAuthException : MongoException 103 { 104 @safe: 105 106 this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 107 { 108 super(message, file, line, next); 109 } 110 } 111 112 /** 113 [internal] Provides low-level mongodb protocol access. 114 115 It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code. 116 Note that a MongoConnection may only be used from one fiber/thread at a time. 117 */ 118 final class MongoConnection { 119 @safe: 120 121 import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */; 122 import vibe.internal.interfaceproxy; 123 import vibe.core.stream : InputStream, Stream; 124 125 private { 126 MongoClientSettings m_settings; 127 TCPConnection m_conn; 128 InterfaceProxy!Stream m_stream; 129 ulong m_bytesRead; 130 int m_msgid = 1; 131 StreamOutputRange!(InterfaceProxy!Stream) m_outRange; 132 ServerDescription m_description; 133 /// Flag to prevent recursive connections when server closes connection while connecting 134 bool m_allowReconnect; 135 bool m_isAuthenticating; 136 } 137 138 enum ushort defaultPort = MongoClientSettings.defaultPort; 139 140 /// Simplified constructor overload, with no m_settings 141 this(string server, ushort port = defaultPort) 142 { 143 m_settings = new MongoClientSettings(); 144 m_settings.hosts ~= MongoHost(server, port); 145 } 146 147 this(MongoClientSettings cfg) 148 { 149 m_settings = cfg; 150 151 // Now let's check for features that are not yet supported. 152 if(m_settings.hosts.length > 1) 153 logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", 154 m_settings.hosts[0].name, m_settings.hosts[0].port); 155 } 156 157 void connect() 158 { 159 bool isTLS; 160 161 /* 162 * TODO: Connect to one of the specified hosts taking into consideration 163 * options such as connect timeouts and so on. 164 */ 165 try { 166 m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port); 167 m_conn.tcpNoDelay = true; 168 if (m_settings.ssl) { 169 auto ctx = createTLSContext(TLSContextKind.client); 170 if (!m_settings.sslverifycertificate) { 171 ctx.peerValidationMode = TLSPeerValidationMode.none; 172 } 173 if (m_settings.sslPEMKeyFile) { 174 ctx.useCertificateChainFile(m_settings.sslPEMKeyFile); 175 ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile); 176 } 177 if (m_settings.sslCAFile) { 178 ctx.useTrustedCertificateFile(m_settings.sslCAFile); 179 } 180 181 m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name); 182 isTLS = true; 183 } 184 else { 185 m_stream = m_conn; 186 } 187 m_outRange = streamOutputRange(m_stream); 188 } 189 catch (Exception e) { 190 throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e); 191 } 192 193 m_allowReconnect = false; 194 scope (exit) 195 m_allowReconnect = true; 196 197 Bson handshake = Bson.emptyObject; 198 handshake["isMaster"] = Bson(1); 199 200 import os = std.system; 201 import compiler = std.compiler; 202 string platform = compiler.name ~ " " 203 ~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string; 204 // TODO: add support for os.version 205 206 handshake["client"] = Bson([ 207 "driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]), 208 "os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]), 209 "platform": Bson(platform) 210 ]); 211 212 if (m_settings.appName.length) { 213 enforce!MongoAuthException(m_settings.appName.length <= 128, 214 "The application name may not be larger than 128 bytes"); 215 handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]); 216 } 217 218 query!Bson("$external.$cmd", QueryFlags.none, 0, -1, handshake, Bson(null), 219 (cursor, flags, first_doc, num_docs) { 220 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure) && num_docs == 1, 221 "Authentication handshake failed."); 222 }, 223 (idx, ref doc) { 224 enforce!MongoAuthException(doc["ok"].get!double == 1.0, "Authentication failed."); 225 m_description = deserializeBson!ServerDescription(doc); 226 }); 227 228 m_bytesRead = 0; 229 auto authMechanism = m_settings.authMechanism; 230 if (authMechanism == MongoAuthMechanism.none) 231 { 232 if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26)) 233 { 234 authMechanism = MongoAuthMechanism.mongoDBX509; 235 } 236 else if (m_settings.digest.length) 237 { 238 // SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication 239 if (m_description.satisfiesVersion(WireVersion.v30)) 240 authMechanism = MongoAuthMechanism.scramSHA1; 241 else 242 authMechanism = MongoAuthMechanism.mongoDBCR; 243 } 244 } 245 246 if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40)) 247 throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported"); 248 249 if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30)) 250 throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported"); 251 252 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26)) 253 throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported"); 254 255 if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS) 256 throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!"); 257 258 m_isAuthenticating = true; 259 scope (exit) 260 m_isAuthenticating = false; 261 final switch (authMechanism) 262 { 263 case MongoAuthMechanism.none: 264 break; 265 case MongoAuthMechanism.mongoDBX509: 266 certAuthenticate(); 267 break; 268 case MongoAuthMechanism.scramSHA1: 269 scramAuthenticate(); 270 break; 271 case MongoAuthMechanism.mongoDBCR: 272 authenticate(); 273 break; 274 } 275 } 276 277 void disconnect() 278 { 279 if (m_conn) { 280 if (m_stream && m_conn.connected) { 281 m_outRange.flush(); 282 283 m_stream.finalize(); 284 m_stream = InterfaceProxy!Stream.init; 285 } 286 287 m_conn.close(); 288 m_conn = TCPConnection.init; 289 } 290 291 m_outRange.drop(); 292 } 293 294 @property bool connected() const { return m_conn && m_conn.connected; } 295 296 @property const(ServerDescription) description() const { return m_description; } 297 298 void update(string collection_name, UpdateFlags flags, Bson selector, Bson update) 299 { 300 scope(failure) disconnect(); 301 send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update); 302 if (m_settings.safe) checkForError(collection_name); 303 } 304 305 void insert(string collection_name, InsertFlags flags, Bson[] documents) 306 { 307 scope(failure) disconnect(); 308 foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate()); 309 send(OpCode.Insert, -1, cast(int)flags, collection_name, documents); 310 if (m_settings.safe) checkForError(collection_name); 311 } 312 313 void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 314 { 315 scope(failure) disconnect(); 316 flags |= m_settings.defQueryFlags; 317 int id; 318 if (returnFieldSelector.isNull) 319 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query); 320 else 321 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector); 322 recvReply!T(id, on_msg, on_doc); 323 } 324 325 void getMore(T)(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 326 { 327 scope(failure) disconnect(); 328 auto id = send(OpCode.GetMore, -1, cast(int)0, collection_name, nret, cursor_id); 329 recvReply!T(id, on_msg, on_doc); 330 } 331 332 void delete_(string collection_name, DeleteFlags flags, Bson selector) 333 { 334 scope(failure) disconnect(); 335 send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector); 336 if (m_settings.safe) checkForError(collection_name); 337 } 338 339 void killCursors(long[] cursors) 340 { 341 scope(failure) disconnect(); 342 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 343 } 344 345 MongoErrorDescription getLastError(string db) 346 { 347 // Though higher level abstraction level by concept, this function 348 // is implemented here to allow to check errors upon every request 349 // on connection level. 350 351 Bson command_and_options = Bson.emptyObject; 352 command_and_options["getLastError"] = Bson(1.0); 353 354 if(m_settings.w != m_settings.w.init) 355 command_and_options["w"] = m_settings.w; // Already a Bson struct 356 if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init) 357 command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS); 358 if(m_settings.journal) 359 command_and_options["j"] = Bson(true); 360 if(m_settings.fsync) 361 command_and_options["fsync"] = Bson(true); 362 363 _MongoErrorDescription ret; 364 365 query!Bson(db ~ ".$cmd", QueryFlags.NoCursorTimeout | m_settings.defQueryFlags, 366 0, -1, command_and_options, Bson(null), 367 (cursor, flags, first_doc, num_docs) { 368 logTrace("getLastEror(%s) flags: %s, cursor: %s, documents: %s", db, flags, cursor, num_docs); 369 enforce(!(flags & ReplyFlags.QueryFailure), 370 new MongoDriverException(format("MongoDB error: getLastError(%s) call failed.", db)) 371 ); 372 enforce( 373 num_docs == 1, 374 new MongoDriverException(format("getLastError(%s) returned %s documents instead of one.", db, num_docs)) 375 ); 376 }, 377 (idx, ref error) { 378 try { 379 ret = MongoErrorDescription( 380 error["err"].opt!string(""), 381 error["code"].opt!int(-1), 382 error["connectionId"].opt!int(-1), 383 error["n"].get!int(), 384 error["ok"].get!double() 385 ); 386 } catch (Exception e) { 387 throw new MongoDriverException(e.msg); 388 } 389 } 390 ); 391 392 return ret; 393 } 394 395 /** Queries the server for all databases. 396 397 Returns: 398 An input range of $(D MongoDBInfo) values. 399 */ 400 auto listDatabases() 401 { 402 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 403 404 auto cmd = Bson(["listDatabases":Bson(1)]); 405 406 void on_msg(long cursor, ReplyFlags flags, int first_doc, int num_docs) { 407 if ((flags & ReplyFlags.QueryFailure)) 408 throw new MongoDriverException("Calling listDatabases failed."); 409 } 410 411 static MongoDBInfo toInfo(const(Bson) db_doc) { 412 return MongoDBInfo( 413 db_doc["name"].get!string, 414 db_doc["sizeOnDisk"].get!double, 415 db_doc["empty"].get!bool 416 ); 417 } 418 419 Bson result; 420 void on_doc(size_t idx, ref Bson doc) { 421 if (doc["ok"].get!double != 1.0) 422 throw new MongoAuthException("listDatabases failed."); 423 424 result = doc["databases"]; 425 } 426 427 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), &on_msg, &on_doc); 428 429 return result.byValue.map!toInfo; 430 } 431 432 private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 433 { 434 import std.traits; 435 436 auto bytes_read = m_bytesRead; 437 int msglen = recvInt(); 438 int resid = recvInt(); 439 int respto = recvInt(); 440 int opcode = recvInt(); 441 442 enforce(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 443 enforce(opcode == OpCode.Reply, "Got a non-'Reply' reply!"); 444 445 auto flags = cast(ReplyFlags)recvInt(); 446 long cursor = recvLong(); 447 int start = recvInt(); 448 int numret = recvInt(); 449 450 scope (exit) { 451 if (m_bytesRead - bytes_read < msglen) { 452 logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read); 453 ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 454 recv(dst); 455 } else if (m_bytesRead - bytes_read > msglen) { 456 logWarn("MongoDB reply was shorter than expected. Dropping connection."); 457 disconnect(); 458 throw new MongoDriverException("MongoDB reply was too short for data."); 459 } 460 } 461 462 on_msg(cursor, flags, start, numret); 463 static if (hasIndirections!T || is(T == Bson)) 464 auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 465 foreach (i; 0 .. cast(size_t)numret) { 466 // TODO: directly deserialize from the wire 467 static if (!hasIndirections!T && !is(T == Bson)) { 468 ubyte[256] buf = void; 469 ubyte[] bufsl = buf; 470 auto bson = () @trusted { return recvBson(bufsl); } (); 471 } else { 472 auto bson = () @trusted { return recvBson(buf); } (); 473 } 474 475 // logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson); 476 477 static if (is(T == Bson)) on_doc(i, bson); 478 else { 479 T doc = deserializeBson!T(bson); 480 on_doc(i, doc); 481 } 482 } 483 484 return resid; 485 } 486 487 private int send(ARGS...)(OpCode code, int response_to, ARGS args) 488 { 489 if( !connected() ) { 490 if (m_allowReconnect) connect(); 491 else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating"); 492 else throw new MongoDriverException("Connection got closed while connecting"); 493 } 494 int id = nextMessageId(); 495 // sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types 496 sendValue!int(16 + sendLength(args)); 497 sendValue!int(id); 498 sendValue!int(response_to); 499 sendValue!int(cast(int)code); 500 foreach (a; args) sendValue(a); 501 m_outRange.flush(); 502 // logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args)); 503 return id; 504 } 505 506 private void sendValue(T)(T value) 507 { 508 import std.traits; 509 static if (is(T == int)) sendBytes(toBsonData(value)); 510 else static if (is(T == long)) sendBytes(toBsonData(value)); 511 else static if (is(T == Bson)) sendBytes(value.data); 512 else static if (is(T == string)) { 513 sendBytes(cast(const(ubyte)[])value); 514 sendBytes(cast(const(ubyte)[])"\0"); 515 } else static if (isArray!T) { 516 foreach (v; value) 517 sendValue(v); 518 } else static assert(false, "Unexpected type: "~T.stringof); 519 } 520 521 private void sendBytes(in ubyte[] data){ m_outRange.put(data); } 522 523 private int recvInt() { ubyte[int.sizeof] ret; recv(ret); return fromBsonData!int(ret); } 524 private long recvLong() { ubyte[long.sizeof] ret; recv(ret); return fromBsonData!long(ret); } 525 private Bson recvBson(ref ubyte[] buf) 526 @system { 527 int len = recvInt(); 528 ubyte[] dst; 529 if (len > buf.length) dst = new ubyte[len]; 530 else { 531 dst = buf[0 .. len]; 532 buf = buf[len .. $]; 533 } 534 dst[0 .. 4] = toBsonData(len)[]; 535 recv(dst[4 .. $]); 536 return Bson(Bson.Type.Object, cast(immutable)dst); 537 } 538 private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; } 539 540 private int nextMessageId() { return m_msgid++; } 541 542 private void checkForError(string collection_name) 543 { 544 auto coll = collection_name.split(".")[0]; 545 auto err = getLastError(coll); 546 547 enforce( 548 err.code < 0, 549 new MongoDBException(err) 550 ); 551 } 552 553 private void certAuthenticate() 554 { 555 Bson cmd = Bson.emptyObject; 556 cmd["authenticate"] = Bson(1); 557 cmd["mechanism"] = Bson("MONGODB-X509"); 558 if (m_description.satisfiesVersion(WireVersion.v34)) 559 { 560 if (m_settings.username.length) 561 cmd["user"] = Bson(m_settings.username); 562 } 563 else 564 { 565 if (!m_settings.username.length) 566 throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this"); 567 568 cmd["user"] = Bson(m_settings.username); 569 } 570 query!Bson("$external.$cmd", QueryFlags.None, 0, -1, cmd, Bson(null), 571 (cursor, flags, first_doc, num_docs) { 572 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 573 throw new MongoDriverException("Calling authenticate failed."); 574 }, 575 (idx, ref doc) { 576 if (doc["ok"].get!double != 1.0) 577 throw new MongoAuthException("Authentication failed."); 578 } 579 ); 580 } 581 582 private void authenticate() 583 { 584 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 585 586 string nonce, key; 587 588 auto cmd = Bson(["getnonce":Bson(1)]); 589 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 590 (cursor, flags, first_doc, num_docs) { 591 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 592 throw new MongoDriverException("Calling getNonce failed."); 593 }, 594 (idx, ref doc) { 595 if (doc["ok"].get!double != 1.0) 596 throw new MongoDriverException("getNonce failed."); 597 nonce = doc["nonce"].get!string; 598 key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup); 599 } 600 ); 601 602 cmd = Bson.emptyObject; 603 cmd["authenticate"] = Bson(1); 604 cmd["mechanism"] = Bson("MONGODB-CR"); 605 cmd["nonce"] = Bson(nonce); 606 cmd["user"] = Bson(m_settings.username); 607 cmd["key"] = Bson(key); 608 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 609 (cursor, flags, first_doc, num_docs) { 610 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 611 throw new MongoDriverException("Calling authenticate failed."); 612 }, 613 (idx, ref doc) { 614 if (doc["ok"].get!double != 1.0) 615 throw new MongoAuthException("Authentication failed."); 616 } 617 ); 618 } 619 620 private void scramAuthenticate() 621 { 622 import vibe.db.mongo.sasl; 623 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 624 625 ScramState state; 626 string payload = state.createInitialRequest(m_settings.username); 627 628 auto cmd = Bson.emptyObject; 629 cmd["saslStart"] = Bson(1); 630 cmd["mechanism"] = Bson("SCRAM-SHA-1"); 631 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 632 string response; 633 Bson conversationId; 634 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 635 (cursor, flags, first_doc, num_docs) { 636 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 637 throw new MongoDriverException("SASL start failed."); 638 }, 639 (idx, ref doc) { 640 if (doc["ok"].get!double != 1.0) 641 throw new MongoAuthException("Authentication failed."); 642 response = cast(string)doc["payload"].get!BsonBinData().rawData; 643 conversationId = doc["conversationId"]; 644 }); 645 payload = state.update(m_settings.digest, response); 646 cmd = Bson.emptyObject; 647 cmd["saslContinue"] = Bson(1); 648 cmd["conversationId"] = conversationId; 649 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 650 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 651 (cursor, flags, first_doc, num_docs) { 652 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 653 throw new MongoDriverException("SASL continue failed."); 654 }, 655 (idx, ref doc) { 656 if (doc["ok"].get!double != 1.0) 657 throw new MongoAuthException("Authentication failed."); 658 response = cast(string)doc["payload"].get!BsonBinData().rawData; 659 }); 660 661 payload = state.finalize(response); 662 cmd = Bson.emptyObject; 663 cmd["saslContinue"] = Bson(1); 664 cmd["conversationId"] = conversationId; 665 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 666 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 667 (cursor, flags, first_doc, num_docs) { 668 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 669 throw new MongoDriverException("SASL finish failed."); 670 }, 671 (idx, ref doc) { 672 if (doc["ok"].get!double != 1.0) 673 throw new MongoAuthException("Authentication failed."); 674 }); 675 } 676 } 677 678 private enum OpCode : int { 679 Reply = 1, // sent only by DB 680 Msg = 1000, 681 Update = 2001, 682 Insert = 2002, 683 Reserved1 = 2003, 684 Query = 2004, 685 GetMore = 2005, 686 Delete = 2006, 687 KillCursors = 2007 688 } 689 690 alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe; 691 template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; } 692 693 struct MongoDBInfo 694 { 695 string name; 696 double sizeOnDisk; 697 bool empty; 698 } 699 700 private int sendLength(ARGS...)(ARGS args) 701 { 702 import std.traits; 703 static if (ARGS.length == 1) { 704 alias T = ARGS[0]; 705 static if (is(T == string)) return cast(int)args[0].length + 1; 706 else static if (is(T == int)) return 4; 707 else static if (is(T == long)) return 8; 708 else static if (is(T == Bson)) return cast(int)args[0].data.length; 709 else static if (isArray!T) { 710 int ret = 0; 711 foreach (el; args[0]) ret += sendLength(el); 712 return ret; 713 } else static assert(false, "Unexpected type: "~T.stringof); 714 } 715 else if (ARGS.length == 0) return 0; 716 else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]); 717 } 718 719 struct ServerDescription 720 { 721 enum ServerType 722 { 723 unknown, 724 standalone, 725 mongos, 726 possiblePrimary, 727 RSPrimary, 728 RSSecondary, 729 RSArbiter, 730 RSOther, 731 RSGhost 732 } 733 734 @optional: 735 string address; 736 string error; 737 float roundTripTime = 0; 738 Nullable!BsonDate lastWriteDate; 739 Nullable!BsonObjectID opTime; 740 ServerType type = ServerType.unknown; 741 WireVersion minWireVersion, maxWireVersion; 742 string me; 743 string[] hosts, passives, arbiters; 744 string[string] tags; 745 string setName; 746 Nullable!int setVersion; 747 Nullable!BsonObjectID electionId; 748 string primary; 749 string lastUpdateTime = "infinity ago"; 750 Nullable!int logicalSessionTimeoutMinutes; 751 752 bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow 753 { 754 return maxWireVersion >= wireVersion; 755 } 756 } 757 758 enum WireVersion : int 759 { 760 old, 761 v26, 762 v26_2, 763 v30, 764 v32, 765 v34, 766 v36, 767 v40, 768 v42 769 } 770 771 private string getHostArchitecture() 772 { 773 import os = std.system; 774 775 version (X86_64) 776 string arch = "x86_64 "; 777 else version (X86) 778 string arch = "x86 "; 779 else version (AArch64) 780 string arch = "aarch64 "; 781 else version (ARM_HardFloat) 782 string arch = "armhf "; 783 else version (ARM) 784 string arch = "arm "; 785 else version (PPC64) 786 string arch = "ppc64 "; 787 else version (PPC) 788 string arch = "ppc "; 789 else 790 string arch = "unknown "; 791 792 return arch ~ os.endian.to!string; 793 } 794 795 private static immutable hostArchitecture = getHostArchitecture;