1 /** 2 Low level mongodb protocol. 3 4 Copyright: © 2012-2016 RejectedSoftware e.K. 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.connection; 9 10 public import vibe.data.bson; 11 12 import vibe.core.log; 13 import vibe.core.net; 14 import vibe.db.mongo.settings; 15 import vibe.db.mongo.flags; 16 import vibe.inet.webform; 17 import vibe.stream.tls; 18 19 import std.algorithm : map, splitter; 20 import std.array; 21 import std.range; 22 import std.conv; 23 import std.exception; 24 import std.string; 25 import std.digest.md; 26 27 28 private struct _MongoErrorDescription 29 { 30 string message; 31 int code; 32 int connectionId; 33 int n; 34 double ok; 35 } 36 37 /** 38 * D POD representation of Mongo error object. 39 * 40 * For successful queries "code" is negative. 41 * Can be used also to check how many documents where updated upon 42 * a successful query via "n" field. 43 */ 44 alias MongoErrorDescription = immutable(_MongoErrorDescription); 45 46 /** 47 * Root class for vibe.d Mongo driver exception hierarchy. 48 */ 49 class MongoException : Exception 50 { 51 @safe: 52 53 this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null) 54 { 55 super(message, file, line, next); 56 } 57 } 58 59 /** 60 * Generic class for all exception related to unhandled driver problems. 61 * 62 * I.e.: protocol mismatch or unexpected mongo service behavior. 63 */ 64 class MongoDriverException : MongoException 65 { 66 @safe: 67 68 this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null) 69 { 70 super(message, file, line, next); 71 } 72 } 73 74 /** 75 * Wrapper class for all inner mongo collection manipulation errors. 76 * 77 * It does not indicate problem with vibe.d driver itself. Most frequently this 78 * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting. 79 */ 80 class MongoDBException : MongoException 81 { 82 @safe: 83 84 MongoErrorDescription description; 85 alias description this; 86 87 this(MongoErrorDescription description, string file = __FILE__, 88 int line = __LINE__, Throwable next = null) 89 { 90 super(description.message, file, line, next); 91 this.description = description; 92 } 93 } 94 95 /** 96 * Generic class for all exceptions related to authentication problems. 97 * 98 * I.e.: unsupported mechanisms or wrong credentials. 99 */ 100 class MongoAuthException : MongoException 101 { 102 @safe: 103 104 this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null) 105 { 106 super(message, file, line, next); 107 } 108 } 109 110 /** 111 [internal] Provides low-level mongodb protocol access. 112 113 It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code. 114 Note that a MongoConnection may only be used from one fiber/thread at a time. 115 */ 116 final class MongoConnection { 117 @safe: 118 119 import vibe.stream.wrapper : StreamOutputRange, streamOutputRange; 120 import vibe.internal.interfaceproxy; 121 import vibe.core.stream : InputStream, Stream; 122 123 private { 124 MongoClientSettings m_settings; 125 TCPConnection m_conn; 126 InterfaceProxy!Stream m_stream; 127 ulong m_bytesRead; 128 int m_msgid = 1; 129 StreamOutputRange!(InterfaceProxy!Stream) m_outRange; 130 } 131 132 enum ushort defaultPort = MongoClientSettings.defaultPort; 133 134 /// Simplified constructor overload, with no m_settings 135 this(string server, ushort port = defaultPort) 136 { 137 m_settings = new MongoClientSettings(); 138 m_settings.hosts ~= MongoHost(server, port); 139 } 140 141 this(MongoClientSettings cfg) 142 { 143 m_settings = cfg; 144 145 // Now let's check for features that are not yet supported. 146 if(m_settings.hosts.length > 1) 147 logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s", 148 m_settings.hosts[0].name, m_settings.hosts[0].port); 149 } 150 151 void connect() 152 { 153 /* 154 * TODO: Connect to one of the specified hosts taking into consideration 155 * options such as connect timeouts and so on. 156 */ 157 try { 158 m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port); 159 m_conn.tcpNoDelay = true; 160 if (m_settings.ssl) { 161 auto ctx = createTLSContext(TLSContextKind.client); 162 if (!m_settings.sslverifycertificate) { 163 ctx.peerValidationMode = TLSPeerValidationMode.none; 164 } 165 if (m_settings.sslPEMKeyFile) { 166 ctx.useCertificateChainFile(m_settings.sslPEMKeyFile); 167 ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile); 168 } 169 if (m_settings.sslCAFile) { 170 ctx.useTrustedCertificateFile(m_settings.sslCAFile); 171 } 172 173 m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name); 174 } 175 else { 176 m_stream = m_conn; 177 } 178 m_outRange = streamOutputRange(m_stream); 179 } 180 catch (Exception e) { 181 throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e); 182 } 183 184 m_bytesRead = 0; 185 if(m_settings.digest != string.init) 186 { 187 if (m_settings.authMechanism == MongoAuthMechanism.none) 188 authenticate(); 189 else { 190 /** 191 SCRAM-SHA-1 was released in March 2015 and on a properly 192 configured Mongo instance Authentication.none is disabled. 193 However, as a fallback to avoid breakage with old setups, 194 no authentication is tried in case of an error. 195 */ 196 try 197 scramAuthenticate(); 198 catch (MongoAuthException e) 199 authenticate(); 200 } 201 202 } 203 else if (m_settings.sslPEMKeyFile != null && m_settings.username != null) 204 { 205 certAuthenticate(); 206 } 207 } 208 209 void disconnect() 210 { 211 if (m_conn) { 212 if (m_stream && m_conn.connected) { 213 m_outRange.flush(); 214 215 m_stream.finalize(); 216 m_stream = InterfaceProxy!Stream.init; 217 } 218 219 m_conn.close(); 220 m_conn = TCPConnection.init; 221 } 222 223 m_outRange.drop(); 224 } 225 226 @property bool connected() const { return m_conn && m_conn.connected; } 227 228 229 void update(string collection_name, UpdateFlags flags, Bson selector, Bson update) 230 { 231 scope(failure) disconnect(); 232 send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update); 233 if (m_settings.safe) checkForError(collection_name); 234 } 235 236 void insert(string collection_name, InsertFlags flags, Bson[] documents) 237 { 238 scope(failure) disconnect(); 239 foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate()); 240 send(OpCode.Insert, -1, cast(int)flags, collection_name, documents); 241 if (m_settings.safe) checkForError(collection_name); 242 } 243 244 void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 245 { 246 scope(failure) disconnect(); 247 flags |= m_settings.defQueryFlags; 248 int id; 249 if (returnFieldSelector.isNull) 250 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query); 251 else 252 id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector); 253 recvReply!T(id, on_msg, on_doc); 254 } 255 256 void getMore(T)(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 257 { 258 scope(failure) disconnect(); 259 auto id = send(OpCode.GetMore, -1, cast(int)0, collection_name, nret, cursor_id); 260 recvReply!T(id, on_msg, on_doc); 261 } 262 263 void delete_(string collection_name, DeleteFlags flags, Bson selector) 264 { 265 scope(failure) disconnect(); 266 send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector); 267 if (m_settings.safe) checkForError(collection_name); 268 } 269 270 void killCursors(long[] cursors) 271 { 272 scope(failure) disconnect(); 273 send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors); 274 } 275 276 MongoErrorDescription getLastError(string db) 277 { 278 // Though higher level abstraction level by concept, this function 279 // is implemented here to allow to check errors upon every request 280 // on conncetion level. 281 282 Bson command_and_options = Bson.emptyObject; 283 command_and_options["getLastError"] = Bson(1.0); 284 285 if(m_settings.w != m_settings.w.init) 286 command_and_options["w"] = m_settings.w; // Already a Bson struct 287 if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init) 288 command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS); 289 if(m_settings.journal) 290 command_and_options["j"] = Bson(true); 291 if(m_settings.fsync) 292 command_and_options["fsync"] = Bson(true); 293 294 _MongoErrorDescription ret; 295 296 query!Bson(db ~ ".$cmd", QueryFlags.NoCursorTimeout | m_settings.defQueryFlags, 297 0, -1, command_and_options, Bson(null), 298 (cursor, flags, first_doc, num_docs) { 299 logTrace("getLastEror(%s) flags: %s, cursor: %s, documents: %s", db, flags, cursor, num_docs); 300 enforce(!(flags & ReplyFlags.QueryFailure), 301 new MongoDriverException(format("MongoDB error: getLastError(%s) call failed.", db)) 302 ); 303 enforce( 304 num_docs == 1, 305 new MongoDriverException(format("getLastError(%s) returned %s documents instead of one.", db, num_docs)) 306 ); 307 }, 308 (idx, ref error) { 309 try { 310 ret = MongoErrorDescription( 311 error["err"].opt!string(""), 312 error["code"].opt!int(-1), 313 error["connectionId"].opt!int(-1), 314 error["n"].get!int(), 315 error["ok"].get!double() 316 ); 317 } catch (Exception e) { 318 throw new MongoDriverException(e.msg); 319 } 320 } 321 ); 322 323 return ret; 324 } 325 326 /** Queries the server for all databases. 327 328 Returns: 329 An input range of $(D MongoDBInfo) values. 330 */ 331 auto listDatabases() 332 { 333 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 334 335 auto cmd = Bson(["listDatabases":Bson(1)]); 336 337 void on_msg(long cursor, ReplyFlags flags, int first_doc, int num_docs) { 338 if ((flags & ReplyFlags.QueryFailure)) 339 throw new MongoDriverException("Calling listDatabases failed."); 340 } 341 342 static MongoDBInfo toInfo(const(Bson) db_doc) { 343 return MongoDBInfo( 344 db_doc["name"].get!string, 345 db_doc["sizeOnDisk"].get!double, 346 db_doc["empty"].get!bool 347 ); 348 } 349 350 Bson result; 351 void on_doc(size_t idx, ref Bson doc) { 352 if (doc["ok"].get!double != 1.0) 353 throw new MongoAuthException("listDatabases failed."); 354 355 result = doc["databases"]; 356 } 357 358 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), &on_msg, &on_doc); 359 360 return result.byValue.map!toInfo; 361 } 362 363 private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc) 364 { 365 import std.traits; 366 367 auto bytes_read = m_bytesRead; 368 int msglen = recvInt(); 369 int resid = recvInt(); 370 int respto = recvInt(); 371 int opcode = recvInt(); 372 373 enforce(respto == reqid, "Reply is not for the expected message on a sequential connection!"); 374 enforce(opcode == OpCode.Reply, "Got a non-'Reply' reply!"); 375 376 auto flags = cast(ReplyFlags)recvInt(); 377 long cursor = recvLong(); 378 int start = recvInt(); 379 int numret = recvInt(); 380 381 scope (exit) { 382 if (m_bytesRead - bytes_read < msglen) { 383 logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read); 384 ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 385 recv(dst); 386 } else if (m_bytesRead - bytes_read > msglen) { 387 logWarn("MongoDB reply was shorter than expected. Dropping connection."); 388 disconnect(); 389 throw new MongoDriverException("MongoDB reply was too short for data."); 390 } 391 } 392 393 on_msg(cursor, flags, start, numret); 394 static if (hasIndirections!T || is(T == Bson)) 395 auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)]; 396 foreach (i; 0 .. cast(size_t)numret) { 397 // TODO: directly deserialize from the wire 398 static if (!hasIndirections!T && !is(T == Bson)) { 399 ubyte[256] buf = void; 400 ubyte[] bufsl = buf; 401 auto bson = () @trusted { return recvBson(bufsl); } (); 402 } else { 403 auto bson = () @trusted { return recvBson(buf); } (); 404 } 405 406 static if (is(T == Bson)) on_doc(i, bson); 407 else { 408 T doc = deserializeBson!T(bson); 409 on_doc(i, doc); 410 } 411 } 412 413 return resid; 414 } 415 416 private int send(ARGS...)(OpCode code, int response_to, ARGS args) 417 { 418 if( !connected() ) connect(); 419 int id = nextMessageId(); 420 sendValue(16 + sendLength(args)); 421 sendValue(id); 422 sendValue(response_to); 423 sendValue(cast(int)code); 424 foreach (a; args) sendValue(a); 425 m_outRange.flush(); 426 return id; 427 } 428 429 private void sendValue(T)(T value) 430 { 431 import std.traits; 432 static if (is(T == int)) sendBytes(toBsonData(value)); 433 else static if (is(T == long)) sendBytes(toBsonData(value)); 434 else static if (is(T == Bson)) sendBytes(value.data); 435 else static if (is(T == string)) { 436 sendBytes(cast(const(ubyte)[])value); 437 sendBytes(cast(const(ubyte)[])"\0"); 438 } else static if (isArray!T) { 439 foreach (v; value) 440 sendValue(v); 441 } else static assert(false, "Unexpected type: "~T.stringof); 442 } 443 444 private void sendBytes(in ubyte[] data){ m_outRange.put(data); } 445 446 private int recvInt() { ubyte[int.sizeof] ret; recv(ret); return fromBsonData!int(ret); } 447 private long recvLong() { ubyte[long.sizeof] ret; recv(ret); return fromBsonData!long(ret); } 448 private Bson recvBson(ref ubyte[] buf) 449 @system { 450 int len = recvInt(); 451 ubyte[] dst; 452 if (len > buf.length) dst = new ubyte[len]; 453 else { 454 dst = buf[0 .. len]; 455 buf = buf[len .. $]; 456 } 457 dst[0 .. 4] = toBsonData(len)[]; 458 recv(dst[4 .. $]); 459 return Bson(Bson.Type.Object, cast(immutable)dst); 460 } 461 private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; } 462 463 private int nextMessageId() { return m_msgid++; } 464 465 private void checkForError(string collection_name) 466 { 467 auto coll = collection_name.split(".")[0]; 468 auto err = getLastError(coll); 469 470 enforce( 471 err.code < 0, 472 new MongoDBException(err) 473 ); 474 } 475 476 private void certAuthenticate() 477 { 478 Bson cmd = Bson.emptyObject; 479 cmd["authenticate"] = Bson(1); 480 cmd["mechanism"] = Bson("MONGODB-X509"); 481 cmd["user"] = Bson(m_settings.username); 482 query!Bson("$external.$cmd", QueryFlags.None, 0, -1, cmd, Bson(null), 483 (cursor, flags, first_doc, num_docs) { 484 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 485 throw new MongoDriverException("Calling authenticate failed."); 486 }, 487 (idx, ref doc) { 488 if (doc["ok"].get!double != 1.0) 489 throw new MongoAuthException("Authentication failed."); 490 } 491 ); 492 } 493 494 private void authenticate() 495 { 496 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 497 498 string nonce, key; 499 500 auto cmd = Bson(["getnonce":Bson(1)]); 501 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 502 (cursor, flags, first_doc, num_docs) { 503 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 504 throw new MongoDriverException("Calling getNonce failed."); 505 }, 506 (idx, ref doc) { 507 if (doc["ok"].get!double != 1.0) 508 throw new MongoDriverException("getNonce failed."); 509 nonce = doc["nonce"].get!string; 510 key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup); 511 } 512 ); 513 514 cmd = Bson.emptyObject; 515 cmd["authenticate"] = Bson(1); 516 cmd["mechanism"] = Bson("MONGODB-CR"); 517 cmd["nonce"] = Bson(nonce); 518 cmd["user"] = Bson(m_settings.username); 519 cmd["key"] = Bson(key); 520 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 521 (cursor, flags, first_doc, num_docs) { 522 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 523 throw new MongoDriverException("Calling authenticate failed."); 524 }, 525 (idx, ref doc) { 526 if (doc["ok"].get!double != 1.0) 527 throw new MongoAuthException("Authentication failed."); 528 } 529 ); 530 } 531 532 private void scramAuthenticate() 533 { 534 import vibe.db.mongo.sasl; 535 string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd"; 536 537 ScramState state; 538 string payload = state.createInitialRequest(m_settings.username); 539 540 auto cmd = Bson.emptyObject; 541 cmd["saslStart"] = Bson(1); 542 cmd["mechanism"] = Bson("SCRAM-SHA-1"); 543 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 544 string response; 545 Bson conversationId; 546 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 547 (cursor, flags, first_doc, num_docs) { 548 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 549 throw new MongoDriverException("SASL start failed."); 550 }, 551 (idx, ref doc) { 552 if (doc["ok"].get!double != 1.0) 553 throw new MongoAuthException("Authentication failed."); 554 response = cast(string)doc["payload"].get!BsonBinData().rawData; 555 conversationId = doc["conversationId"]; 556 }); 557 payload = state.update(m_settings.digest, response); 558 cmd = Bson.emptyObject; 559 cmd["saslContinue"] = Bson(1); 560 cmd["conversationId"] = conversationId; 561 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 562 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 563 (cursor, flags, first_doc, num_docs) { 564 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 565 throw new MongoDriverException("SASL continue failed."); 566 }, 567 (idx, ref doc) { 568 if (doc["ok"].get!double != 1.0) 569 throw new MongoAuthException("Authentication failed."); 570 response = cast(string)doc["payload"].get!BsonBinData().rawData; 571 }); 572 573 payload = state.finalize(response); 574 cmd = Bson.emptyObject; 575 cmd["saslContinue"] = Bson(1); 576 cmd["conversationId"] = conversationId; 577 cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation)); 578 query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), 579 (cursor, flags, first_doc, num_docs) { 580 if ((flags & ReplyFlags.QueryFailure) || num_docs != 1) 581 throw new MongoDriverException("SASL finish failed."); 582 }, 583 (idx, ref doc) { 584 if (doc["ok"].get!double != 1.0) 585 throw new MongoAuthException("Authentication failed."); 586 }); 587 } 588 } 589 590 private enum OpCode : int { 591 Reply = 1, // sent only by DB 592 Msg = 1000, 593 Update = 2001, 594 Insert = 2002, 595 Reserved1 = 2003, 596 Query = 2004, 597 GetMore = 2005, 598 Delete = 2006, 599 KillCursors = 2007 600 } 601 602 alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe; 603 template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; } 604 605 struct MongoDBInfo 606 { 607 string name; 608 double sizeOnDisk; 609 bool empty; 610 } 611 612 private int sendLength(ARGS...)(ARGS args) 613 { 614 import std.traits; 615 static if (ARGS.length == 1) { 616 alias T = ARGS[0]; 617 static if (is(T == string)) return cast(int)args[0].length + 1; 618 else static if (is(T == int)) return 4; 619 else static if (is(T == long)) return 8; 620 else static if (is(T == Bson)) return cast(int)args[0].data.length; 621 else static if (isArray!T) { 622 int ret = 0; 623 foreach (el; args[0]) ret += sendLength(el); 624 return ret; 625 } else static assert(false, "Unexpected type: "~T.stringof); 626 } 627 else if (ARGS.length == 0) return 0; 628 else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]); 629 }