1 /**
2 	Low level mongodb protocol.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.connection;
9 
10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic
11 // debug = VibeVerboseMongo;
12 
13 public import vibe.data.bson;
14 
15 import vibe.core.core : vibeVersionString;
16 import vibe.core.log;
17 import vibe.core.net;
18 import vibe.data.bson;
19 import vibe.db.mongo.flags;
20 import vibe.db.mongo.settings;
21 import vibe.inet.webform;
22 import vibe.stream.tls;
23 
24 import std.algorithm : findSplit, map, splitter;
25 import std.array;
26 import std.conv;
27 import std.digest.md;
28 import std.exception;
29 import std.range;
30 import std.string;
31 import std.traits : hasIndirections;
32 import std.typecons;
33 
34 import core.time;
35 
36 private struct _MongoErrorDescription
37 {
38 	string message;
39 	int code;
40 	int connectionId;
41 	int n;
42 	double ok;
43 }
44 
45 /**
46  * D POD representation of Mongo error object.
47  *
48  * For successful queries "code" is negative.
49  * Can be used also to check how many documents where updated upon
50  * a successful query via "n" field.
51  */
52 alias MongoErrorDescription = immutable(_MongoErrorDescription);
53 
54 /**
55  * Root class for vibe.d Mongo driver exception hierarchy.
56  */
57 class MongoException : Exception
58 {
59 @safe:
60 
61 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
62 	{
63 		super(message, file, line, next);
64 	}
65 }
66 
67 /**
68  * Generic class for all exception related to unhandled driver problems.
69  *
70  * I.e.: protocol mismatch or unexpected mongo service behavior.
71  */
72 class MongoDriverException : MongoException
73 {
74 @safe:
75 
76 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
77 	{
78 		super(message, file, line, next);
79 	}
80 }
81 
82 /**
83  * Wrapper class for all inner mongo collection manipulation errors.
84  *
85  * It does not indicate problem with vibe.d driver itself. Most frequently this
86  * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting.
87  */
88 deprecated("Check for MongoException instead - the modern write commands now throw MongoBulkWriteException on error")
89 class MongoDBException : MongoException
90 {
91 @safe:
92 
93 	MongoErrorDescription description;
94 
95 	this(MongoErrorDescription description, string file = __FILE__,
96 			size_t line = __LINE__, Throwable next = null)
97 	{
98 		super(description.message, file, line, next);
99 		this.description = description;
100 	}
101 
102 	// NOTE: .message is a @future member of Throwable
103 	deprecated("Use .msg instead.") alias message = msg;
104 	@property int code() const nothrow { return description.code; };
105 	@property int connectionId() const nothrow { return description.connectionId; };
106 	@property int n() const nothrow { return description.n; };
107 	@property double ok() const nothrow { return description.ok; };
108 }
109 
110 /**
111  * Generic class for all exceptions related to authentication problems.
112  *
113  * I.e.: unsupported mechanisms or wrong credentials.
114  */
115 class MongoAuthException : MongoException
116 {
117 @safe:
118 
119 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
120 	{
121 		super(message, file, line, next);
122 	}
123 }
124 
125 /**
126   [internal] Provides low-level mongodb protocol access.
127 
128   It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code.
129   Note that a MongoConnection may only be used from one fiber/thread at a time.
130  */
131 final class MongoConnection {
132 @safe:
133 
134 	import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */;
135 	import vibe.internal.interfaceproxy;
136 	import vibe.core.stream : InputStream, Stream;
137 
138 	private {
139 		MongoClientSettings m_settings;
140 		TCPConnection m_conn;
141 		InterfaceProxy!Stream m_stream;
142 		ulong m_bytesRead;
143 		int m_msgid = 1;
144 		StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
145 		ServerDescription m_description;
146 		/// Flag to prevent recursive connections when server closes connection while connecting
147 		bool m_allowReconnect;
148 		bool m_isAuthenticating;
149 		bool m_supportsOpMsg;
150 	}
151 
152 	enum ushort defaultPort = MongoClientSettings.defaultPort;
153 
154 	/// Simplified constructor overload, with no m_settings
155 	this(string server, ushort port = defaultPort)
156 	{
157 		m_settings = new MongoClientSettings();
158 		m_settings.hosts ~= MongoHost(server, port);
159 	}
160 
161 	this(MongoClientSettings cfg)
162 	{
163 		m_settings = cfg;
164 
165 		// Now let's check for features that are not yet supported.
166 		if(m_settings.hosts.length > 1)
167 			logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
168 					m_settings.hosts[0].name, m_settings.hosts[0].port);
169 	}
170 
171 	void connect()
172 	{
173 		bool isTLS;
174 
175 		/*
176 		 * TODO: Connect to one of the specified hosts taking into consideration
177 		 * options such as connect timeouts and so on.
178 		 */
179 		try {
180 			import core.time : Duration, msecs;
181 
182 			auto connectTimeout = m_settings.connectTimeoutMS.msecs;
183 			if (m_settings.connectTimeoutMS == 0)
184 				connectTimeout = Duration.max;
185 
186 			m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout);
187 			m_conn.tcpNoDelay = true;
188 			if (m_settings.socketTimeout != Duration.zero)
189 				m_conn.readTimeout = m_settings.socketTimeout;
190 			if (m_settings.ssl) {
191 				auto ctx =  createTLSContext(TLSContextKind.client);
192 				if (!m_settings.sslverifycertificate) {
193 					ctx.peerValidationMode = TLSPeerValidationMode.none;
194 				}
195 				if (m_settings.sslPEMKeyFile) {
196 					ctx.useCertificateChainFile(m_settings.sslPEMKeyFile);
197 					ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile);
198 				}
199 				if (m_settings.sslCAFile) {
200 					ctx.useTrustedCertificateFile(m_settings.sslCAFile);
201 				}
202 
203 				m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name);
204 				isTLS = true;
205 			}
206 			else {
207 				m_stream = m_conn;
208 			}
209 			m_outRange = streamOutputRange(m_stream);
210 		}
211 		catch (Exception e) {
212 			throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
213 		}
214 
215 		scope (failure) disconnect();
216 
217 		m_allowReconnect = false;
218 		scope (exit)
219 			m_allowReconnect = true;
220 
221 		Bson handshake = Bson.emptyObject;
222 		static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!");
223 		// TODO: must use legacy handshake if m_settings.loadBalanced is true
224 		// and also once we allow configuring a server API version in the driver
225 		// (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst)
226 		m_supportsOpMsg = false;
227 		bool legacyHandshake = false;
228 		if (legacyHandshake)
229 		{
230 			handshake["isMaster"] = Bson(1);
231 			handshake["helloOk"] = Bson(1);
232 		}
233 		else
234 		{
235 			handshake["hello"] = Bson(1);
236 			m_supportsOpMsg = true;
237 		}
238 
239 		import os = std.system;
240 		import compiler = std.compiler;
241 		string platform = compiler.name ~ " "
242 			~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string;
243 		// TODO: add support for os.version
244 
245 		handshake["client"] = Bson([
246 			"driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]),
247 			"os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]),
248 			"platform": Bson(platform)
249 		]);
250 
251 		if (m_settings.appName.length) {
252 			enforce!MongoAuthException(m_settings.appName.length <= 128,
253 				"The application name may not be larger than 128 bytes");
254 			handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]);
255 		}
256 
257 		auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake);
258 		m_description = deserializeBson!ServerDescription(reply);
259 
260 		if (m_description.satisfiesVersion(WireVersion.v36))
261 			m_supportsOpMsg = true;
262 
263 		m_bytesRead = 0;
264 		auto authMechanism = m_settings.authMechanism;
265 		if (authMechanism == MongoAuthMechanism.none)
266 		{
267 			if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26))
268 			{
269 				authMechanism = MongoAuthMechanism.mongoDBX509;
270 			}
271 			else if (m_settings.digest.length)
272 			{
273 				// SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication
274 				if (m_description.satisfiesVersion(WireVersion.v30))
275 					authMechanism = MongoAuthMechanism.scramSHA1;
276 				else
277 					authMechanism = MongoAuthMechanism.mongoDBCR;
278 			}
279 		}
280 
281 		if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40))
282 			throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported");
283 
284 		if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30))
285 			throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported");
286 
287 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26))
288 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported");
289 
290 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS)
291 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!");
292 
293 		m_isAuthenticating = true;
294 		scope (exit)
295 			m_isAuthenticating = false;
296 		final switch (authMechanism)
297 		{
298 		case MongoAuthMechanism.none:
299 			break;
300 		case MongoAuthMechanism.mongoDBX509:
301 			certAuthenticate();
302 			break;
303 		case MongoAuthMechanism.scramSHA1:
304 			scramAuthenticate();
305 			break;
306 		case MongoAuthMechanism.mongoDBCR:
307 			authenticate();
308 			break;
309 		}
310 	}
311 
312 	void disconnect()
313 	{
314 		if (m_conn) {
315 			if (m_stream && m_conn.connected) {
316 				m_outRange.flush();
317 
318 				m_stream.finalize();
319 				m_stream = InterfaceProxy!Stream.init;
320 			}
321 
322 			m_conn.close();
323 			m_conn = TCPConnection.init;
324 		}
325 
326 		m_outRange.drop();
327 	}
328 
329 	@property bool connected() const { return m_conn && m_conn.connected; }
330 
331 	@property const(ServerDescription) description() const { return m_description; }
332 
333 	deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update)
334 	{
335 		scope(failure) disconnect();
336 		send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update);
337 		if (m_settings.safe) checkForError(collection_name);
338 	}
339 
340 	deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents)
341 	{
342 		scope(failure) disconnect();
343 		foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate());
344 		send(OpCode.Insert, -1, cast(int)flags, collection_name, documents);
345 		if (m_settings.safe) checkForError(collection_name);
346 	}
347 
348 	deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`")
349 	void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
350 	{
351 		scope(failure) disconnect();
352 		flags |= m_settings.defQueryFlags;
353 		int id;
354 		if (returnFieldSelector.isNull)
355 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query);
356 		else
357 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector);
358 		recvReply!T(id, on_msg, on_doc);
359 	}
360 
361 	/**
362 		Runs the given Bson command (Bson object with the first entry in the map
363 		being the command name) on the given database.
364 
365 		Using `runCommand` checks that the command completed successfully by
366 		checking that `result["ok"].get!double == 1.0`. Throws the
367 		`CommandFailException` on failure.
368 
369 		Using `runCommandUnchecked` will return the result as-is. Developers may
370 		check the `result["ok"]` value themselves. (It's a double that needs to
371 		be compared with 1.0 by default)
372 
373 		Throws:
374 			- `CommandFailException` (template argument) only in the
375 				`runCommand` overload, when the command response is not ok.
376 			- `MongoDriverException` when internal protocol errors occur.
377 	*/
378 	Bson runCommand(T, CommandFailException = MongoDriverException)(
379 		string database,
380 		Bson command,
381 		string errorInfo = __FUNCTION__,
382 		string errorFile = __FILE__,
383 		size_t errorLine = __LINE__
384 	)
385 	in(database.length, "runCommand requires a database argument")
386 	{
387 		return runCommandImpl!(T, CommandFailException)(
388 			database, command, true, errorInfo, errorFile, errorLine);
389 	}
390 
391 	Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)(
392 		string database,
393 		Bson command,
394 		string errorInfo = __FUNCTION__,
395 		string errorFile = __FILE__,
396 		size_t errorLine = __LINE__
397 	)
398 	in(database.length, "runCommand requires a database argument")
399 	{
400 		return runCommandImpl!(T, CommandFailException)(
401 			database, command, false, errorInfo, errorFile, errorLine);
402 	}
403 
404 	private Bson runCommandImpl(T, CommandFailException)(
405 		string database,
406 		Bson command,
407 		bool testOk = true,
408 		string errorInfo = __FUNCTION__,
409 		string errorFile = __FILE__,
410 		size_t errorLine = __LINE__
411 	)
412 	in(database.length, "runCommand requires a database argument")
413 	{
414 		import std.array;
415 
416 		string formatErrorInfo(string msg) @safe
417 		{
418 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
419 		}
420 
421 		Bson ret;
422 
423 		if (m_supportsOpMsg)
424 		{
425 			debug (VibeVerboseMongo)
426 				logDiagnostic("runCommand: [db=%s] %s", database, command);
427 
428 			command["$db"] = Bson(database);
429 
430 			auto id = sendMsg(-1, 0, command);
431 			Appender!(Bson[])[string] docs;
432 			recvMsg!true(id, (flags, root) @safe {
433 				ret = root;
434 			}, (scope ident, size) @safe {
435 				docs[ident.idup] = appender!(Bson[]);
436 			}, (scope ident, push) @safe {
437 				auto pd = ident in docs;
438 				assert(!!pd, "Received data for unexpected identifier");
439 				pd.put(push);
440 			});
441 
442 			foreach (ident, app; docs)
443 				ret[ident] = Bson(app.data);
444 		}
445 		else
446 		{
447 			debug (VibeVerboseMongo)
448 				logDiagnostic("runCommand(legacy): [db=%s] %s", database, command);
449 			auto id = send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null));
450 			recvReply!T(id,
451 				(cursor, flags, first_doc, num_docs) {
452 					logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs);
453 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed"));
454 					enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response"));
455 				},
456 				(idx, ref doc) {
457 					ret = doc;
458 				});
459 		}
460 
461 		if (testOk && ret["ok"].get!double != 1.0)
462 			throw new CommandFailException(formatErrorInfo("command failed: "
463 				~ ret["errmsg"].opt!string("(no message)")));
464 
465 		static if (is(T == Bson)) return ret;
466 		else {
467 			T doc = deserializeBson!T(bson);
468 			return doc;
469 		}
470 	}
471 
472 	template getMore(T)
473 	{
474 		deprecated("use the modern overload instead")
475 		void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
476 		{
477 			scope(failure) disconnect();
478 			auto parts = collection_name.findSplit(".");
479 			auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id);
480 			recvReply!T(id, on_msg, on_doc);
481 		}
482 
483 		/**
484 		* Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore
485 		* command and OP_MSG. (if supported)
486 		*
487 		* Falls back to compatibility for older MongoDB versions, but those are not
488 		* officially supported anymore.
489 		*
490 		* Upgrade_notes:
491 		* - error checking is now done inside this function
492 		* - document index is no longer sent, instead the callback is called sequentially
493 		*
494 		* Throws: $(LREF MongoDriverException) in case the command fails.
495 		*/
496 		void getMore(long cursor_id, string database, string collection_name, long nret,
497 			scope GetMoreHeaderDelegate on_header,
498 			scope GetMoreDocumentDelegate!T on_doc,
499 			Duration timeout = Duration.max,
500 			string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
501 		{
502 			Bson command = Bson.emptyObject;
503 			command["getMore"] = Bson(cursor_id);
504 			command["$db"] = Bson(database);
505 			command["collection"] = Bson(collection_name);
506 			if (nret > 0)
507 				command["batchSize"] = Bson(nret);
508 			if (timeout != Duration.max && timeout.total!"msecs" < int.max)
509 				command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs");
510 
511 			string formatErrorInfo(string msg) @safe
512 			{
513 				return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
514 			}
515 
516 			scope (failure) disconnect();
517 
518 			if (m_supportsOpMsg)
519 			{
520 				startFind!T(command, on_header, on_doc, "nextBatch", errorInfo ~ " (getMore)", errorFile, errorLine);
521 			}
522 			else
523 			{
524 				debug (VibeVerboseMongo)
525 					logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret);
526 
527 				int brokenId = 0;
528 				int nextId = 0;
529 				int num_docs;
530 				// array to store out-of-order items, to push them into the callback properly
531 				T[] compatibilitySort;
532 				string full_name = database ~ '.' ~ collection_name;
533 				auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id);
534 				recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs)
535 				{
536 					enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound),
537 						formatErrorInfo("Invalid cursor handle."));
538 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure),
539 						formatErrorInfo("Query failed. Does the database exist?"));
540 
541 					on_header(cursor, full_name, num_docs);
542 				}, (size_t idx, ref T doc) {
543 					if (cast(int)idx == nextId) {
544 						on_doc(doc);
545 						nextId++;
546 						brokenId = nextId;
547 					} else {
548 						enforce!MongoDriverException(idx >= brokenId,
549 							formatErrorInfo("Got legacy document with same id after having already processed it!"));
550 						enforce!MongoDriverException(idx < num_docs,
551 							formatErrorInfo("Received more documents than the database reported to us"));
552 
553 						size_t arrayIndex = cast(int)idx - brokenId;
554 						if (!compatibilitySort.length)
555 							compatibilitySort.length = num_docs - brokenId;
556 						compatibilitySort[arrayIndex] = doc;
557 					}
558 				});
559 
560 				foreach (doc; compatibilitySort)
561 					on_doc(doc);
562 			}
563 		}
564 	}
565 
566 	/// Forwards the `find` command passed in to the database, handles the
567 	/// callbacks like with getMore. This exists for easier integration with
568 	/// MongoCursor!T.
569 	package void startFind(T)(Bson command,
570 		scope GetMoreHeaderDelegate on_header,
571 		scope GetMoreDocumentDelegate!T on_doc,
572 		string batchKey = "firstBatch",
573 		string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
574 	{
575 		string formatErrorInfo(string msg) @safe
576 		{
577 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
578 		}
579 
580 		scope (failure) disconnect();
581 
582 		enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries"));
583 
584 		enum needsDup = hasIndirections!T || is(T == Bson);
585 
586 		debug (VibeVerboseMongo)
587 			logDiagnostic("%s: %s", errorInfo, command);
588 
589 		auto id = sendMsg(-1, 0, command);
590 		recvMsg!needsDup(id, (flags, scope root) @safe {
591 			if (root["ok"].get!double != 1.0)
592 				throw new MongoDriverException(formatErrorInfo("error response: "
593 					~ root["errmsg"].opt!string("(no message)")));
594 
595 			auto cursor = root["cursor"];
596 			if (cursor.type == Bson.Type.null_)
597 				throw new MongoDriverException(formatErrorInfo("no cursor in response: "
598 					~ root["errmsg"].opt!string("(no error message)")));
599 			auto batch = cursor[batchKey].get!(Bson[]);
600 			on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length);
601 
602 			foreach (ref push; batch)
603 			{
604 				T doc = deserializeBson!T(push);
605 				on_doc(doc);
606 			}
607 		}, (scope ident, size) @safe {}, (scope ident, scope push) @safe {
608 			throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in response"));
609 		});
610 	}
611 
612 	deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector)
613 	{
614 		scope(failure) disconnect();
615 		send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector);
616 		if (m_settings.safe) checkForError(collection_name);
617 	}
618 
619 	deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well")
620 	void killCursors(scope long[] cursors)
621 	{
622 		scope(failure) disconnect();
623 		send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
624 	}
625 
626 	void killCursors(string collection, scope long[] cursors)
627 	{
628 		scope(failure) disconnect();
629 		// TODO: could add special case to runCommand to not return anything
630 		if (m_supportsOpMsg)
631 		{
632 			Bson command = Bson.emptyObject;
633 			auto parts = collection.findSplit(".");
634 			if (!parts[2].length)
635 				throw new MongoDriverException(
636 					"Attempted to call killCursors with non-fully-qualified collection name: '"
637 					~ collection ~ "'");
638 			command["killCursors"] = Bson(parts[2]);
639 			command["cursors"] = () @trusted { return cursors; } ().serializeToBson; // NOTE: "escaping" scope here
640 			runCommand!Bson(parts[0], command);
641 		}
642 		else
643 		{
644 			send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
645 		}
646 	}
647 
648 	MongoErrorDescription getLastError(string db)
649 	{
650 		// Though higher level abstraction level by concept, this function
651 		// is implemented here to allow to check errors upon every request
652 		// on connection level.
653 
654 		Bson command_and_options = Bson.emptyObject;
655 		command_and_options["getLastError"] = Bson(1.0);
656 
657 		if(m_settings.w != m_settings.w.init)
658 			command_and_options["w"] = m_settings.w; // Already a Bson struct
659 		if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init)
660 			command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS);
661 		if(m_settings.journal)
662 			command_and_options["j"] = Bson(true);
663 		if(m_settings.fsync)
664 			command_and_options["fsync"] = Bson(true);
665 
666 		_MongoErrorDescription ret;
667 
668 		auto error = runCommandUnchecked!Bson(db, command_and_options);
669 
670 		try {
671 			ret = MongoErrorDescription(
672 				error["errmsg"].opt!string(error["err"].opt!string("")),
673 				error["code"].opt!int(-1),
674 				error["connectionId"].opt!int(-1),
675 				error["n"].opt!int(-1),
676 				error["ok"].get!double()
677 			);
678 		} catch (Exception e) {
679 			throw new MongoDriverException(e.msg);
680 		}
681 
682 		return ret;
683 	}
684 
685 	/** Queries the server for all databases.
686 
687 		Returns:
688 			An input range of $(D MongoDBInfo) values.
689 	*/
690 	auto listDatabases()
691 	{
692 		string cn = m_settings.database == string.init ? "admin" : m_settings.database;
693 
694 		auto cmd = Bson(["listDatabases":Bson(1)]);
695 
696 		static MongoDBInfo toInfo(const(Bson) db_doc) {
697 			return MongoDBInfo(
698 				db_doc["name"].get!string,
699 				// double on MongoDB < 5.0, long afterwards
700 				db_doc["sizeOnDisk"].to!double,
701 				db_doc["empty"].get!bool
702 			);
703 		}
704 
705 		auto result = runCommand!Bson(cn, cmd)["databases"];
706 
707 		return result.byValue.map!toInfo;
708 	}
709 
710 	private int recvMsg(bool dupBson = true)(int reqid,
711 		scope MsgReplyDelegate!dupBson on_sec0,
712 		scope MsgSection1StartDelegate on_sec1_start,
713 		scope MsgSection1Delegate!dupBson on_sec1_doc)
714 	{
715 		import std.traits;
716 
717 		auto bytes_read = m_bytesRead;
718 		int msglen = recvInt();
719 		int resid = recvInt();
720 		int respto = recvInt();
721 		int opcode = recvInt();
722 
723 		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
724 		enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! (must be OP_MSG)");
725 
726 		uint flagBits = recvUInt();
727 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
728 
729 		int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof);
730 		if (hasCRC)
731 			sectionLength -= uint.sizeof; // CRC present
732 
733 		bool gotSec0;
734 		while (m_bytesRead - bytes_read < sectionLength) {
735 			// TODO: directly deserialize from the wire
736 			static if (!dupBson) {
737 				ubyte[256] buf = void;
738 				ubyte[] bufsl = buf;
739 			}
740 
741 			ubyte payloadType = recvUByte();
742 			switch (payloadType) {
743 				case 0:
744 					gotSec0 = true;
745 					static if (dupBson)
746 						auto data = recvBsonDup();
747 					else
748 						scope data = (() @trusted => recvBson(bufsl))();
749 
750 					debug (VibeVerboseMongo)
751 						logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data);
752 					on_sec0(flagBits, data);
753 					break;
754 				case 1:
755 					if (!gotSec0)
756 						throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d");
757 
758 					auto section_bytes_read = m_bytesRead;
759 					int size = recvInt();
760 					auto identifier = recvCString();
761 					on_sec1_start(identifier, size);
762 					while (m_bytesRead - section_bytes_read < size) {
763 						static if (dupBson)
764 							auto data = recvBsonDup();
765 						else
766 							scope data = (() @trusted => recvBson(bufsl))();
767 
768 						debug (VibeVerboseMongo)
769 							logDiagnostic("recvData: sec1[%s]: %s", identifier, data);
770 
771 						on_sec1_doc(identifier, data);
772 					}
773 					break;
774 				default:
775 					throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string);
776 			}
777 		}
778 
779 		if (hasCRC)
780 		{
781 			uint crc = recvUInt();
782 			// TODO: validate CRC
783 			logDiagnostic("recvData: crc=%s (discarded)", crc);
784 		}
785 
786 		assert(bytes_read + msglen == m_bytesRead,
787 			format!"Packet size mismatch! Expected %s bytes, but read %s."(
788 				msglen, m_bytesRead - bytes_read));
789 
790 		return resid;
791 	}
792 
793 	private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
794 	{
795 		auto bytes_read = m_bytesRead;
796 		int msglen = recvInt();
797 		int resid = recvInt();
798 		int respto = recvInt();
799 		int opcode = recvInt();
800 
801 		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
802 		enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!");
803 
804 		auto flags = cast(ReplyFlags)recvInt();
805 		long cursor = recvLong();
806 		int start = recvInt();
807 		int numret = recvInt();
808 
809 		scope (exit) {
810 			if (m_bytesRead - bytes_read < msglen) {
811 				logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read);
812 				ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
813 				recv(dst);
814 			} else if (m_bytesRead - bytes_read > msglen) {
815 				logWarn("MongoDB reply was shorter than expected. Dropping connection.");
816 				disconnect();
817 				throw new MongoDriverException("MongoDB reply was too short for data.");
818 			}
819 		}
820 
821 		on_msg(cursor, flags, start, numret);
822 		static if (hasIndirections!T || is(T == Bson))
823 			auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
824 		foreach (i; 0 .. cast(size_t)numret) {
825 			// TODO: directly deserialize from the wire
826 			static if (!hasIndirections!T && !is(T == Bson)) {
827 				ubyte[256] buf = void;
828 				ubyte[] bufsl = buf;
829 				auto bson = () @trusted { return recvBson(bufsl); } ();
830 			} else {
831 				auto bson = () @trusted { return recvBson(buf); } ();
832 			}
833 
834 			// logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson);
835 
836 			static if (is(T == Bson)) on_doc(i, bson);
837 			else {
838 				T doc = deserializeBson!T(bson);
839 				on_doc(i, doc);
840 			}
841 		}
842 
843 		return resid;
844 	}
845 
846 	private int send(ARGS...)(OpCode code, int response_to, scope ARGS args)
847 	{
848 		if( !connected() ) {
849 			if (m_allowReconnect) connect();
850 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
851 			else throw new MongoDriverException("Connection got closed while connecting");
852 		}
853 		int id = nextMessageId();
854 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
855 		sendValue!int(16 + sendLength(args));
856 		sendValue!int(id);
857 		sendValue!int(response_to);
858 		sendValue!int(cast(int)code);
859 		foreach (a; args) sendValue(a);
860 		m_outRange.flush();
861 		// logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args));
862 		return id;
863 	}
864 
865 	private int sendMsg(int response_to, uint flagBits, Bson document)
866 	{
867 		if( !connected() ) {
868 			if (m_allowReconnect) connect();
869 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
870 			else throw new MongoDriverException("Connection got closed while connecting");
871 		}
872 		int id = nextMessageId();
873 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
874 		sendValue!int(21 + sendLength(document));
875 		sendValue!int(id);
876 		sendValue!int(response_to);
877 		sendValue!int(cast(int)OpCode.Msg);
878 		sendValue!uint(flagBits);
879 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
880 		assert(!hasCRC, "sending with CRC bits not yet implemented");
881 		sendValue!ubyte(0);
882 		sendValue(document);
883 		m_outRange.flush();
884 		return id;
885 	}
886 
887 	private void sendValue(T)(scope T value)
888 	{
889 		import std.traits;
890 		static if (is(T == ubyte)) m_outRange.put(value);
891 		else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value));
892 		else static if (is(T == long)) sendBytes(toBsonData(value));
893 		else static if (is(T == Bson)) sendBytes(() @trusted { return value.data; } ());
894 		else static if (is(T == string)) {
895 			sendBytes(cast(const(ubyte)[])value);
896 			sendBytes(cast(const(ubyte)[])"\0");
897 		} else static if (isArray!T) {
898 			foreach (v; value)
899 				sendValue(v);
900 		} else static assert(false, "Unexpected type: "~T.stringof);
901 	}
902 
903 	private void sendBytes(scope const(ubyte)[] data){ m_outRange.put(data); }
904 
905 	private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); }
906 	private alias recvUByte = recvInteger!ubyte;
907 	private alias recvInt = recvInteger!int;
908 	private alias recvUInt = recvInteger!uint;
909 	private alias recvLong = recvInteger!long;
910 	private Bson recvBson(ref ubyte[] buf)
911 	@system {
912 		int len = recvInt();
913 		ubyte[] dst;
914 		if (len > buf.length) dst = new ubyte[len];
915 		else {
916 			dst = buf[0 .. len];
917 			buf = buf[len .. $];
918 		}
919 		dst[0 .. 4] = toBsonData(len)[];
920 		recv(dst[4 .. $]);
921 		return Bson(Bson.Type.object, cast(immutable)dst);
922 	}
923 	private Bson recvBsonDup()
924 	@trusted {
925 		ubyte[4] size;
926 		recv(size[]);
927 		ubyte[] dst = new ubyte[fromBsonData!uint(size)];
928 		dst[0 .. 4] = size;
929 		recv(dst[4 .. $]);
930 		return Bson(Bson.Type.object, cast(immutable)dst);
931 	}
932 	private void recv(scope ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; }
933 	private const(char)[] recvCString()
934 	{
935 		auto buf = new ubyte[32];
936 		ptrdiff_t i = -1;
937 		do
938 		{
939 			i++;
940 			if (i == buf.length) buf.length *= 2;
941 			recv(buf[i .. i + 1]);
942 		} while (buf[i] != 0);
943 		return cast(const(char)[])buf[0 .. i];
944 	}
945 
946 	private int nextMessageId() { return m_msgid++; }
947 
948 	deprecated private void checkForError(string collection_name)
949 	{
950 		auto coll = collection_name.split(".")[0];
951 		auto err = getLastError(coll);
952 
953 		enforce(
954 			err.code < 0,
955 			new MongoDBException(err)
956 		);
957 	}
958 
959 	private void certAuthenticate()
960 	{
961 		Bson cmd = Bson.emptyObject;
962 		cmd["authenticate"] = Bson(1);
963 		cmd["mechanism"] = Bson("MONGODB-X509");
964 		if (m_description.satisfiesVersion(WireVersion.v34))
965 		{
966 			if (m_settings.username.length)
967 				cmd["user"] = Bson(m_settings.username);
968 		}
969 		else
970 		{
971 			if (!m_settings.username.length)
972 				throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this");
973 
974 			cmd["user"] = Bson(m_settings.username);
975 		}
976 		runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd);
977 	}
978 
979 	private void authenticate()
980 	{
981 		scope (failure) disconnect();
982 
983 		string cn = m_settings.getAuthDatabase;
984 
985 		auto cmd = Bson(["getnonce": Bson(1)]);
986 		auto result = runCommand!(Bson, MongoAuthException)(cn, cmd);
987 		string nonce = result["nonce"].get!string;
988 		string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup);
989 
990 		cmd = Bson.emptyObject;
991 		cmd["authenticate"] = Bson(1);
992 		cmd["mechanism"] = Bson("MONGODB-CR");
993 		cmd["nonce"] = Bson(nonce);
994 		cmd["user"] = Bson(m_settings.username);
995 		cmd["key"] = Bson(key);
996 		runCommand!(Bson, MongoAuthException)(cn, cmd);
997 	}
998 
999 	private void scramAuthenticate()
1000 	{
1001 		import vibe.db.mongo.sasl;
1002 
1003 		string cn = m_settings.getAuthDatabase;
1004 
1005 		ScramState state;
1006 		string payload = state.createInitialRequest(m_settings.username);
1007 
1008 		auto cmd = Bson.emptyObject;
1009 		cmd["saslStart"] = Bson(1);
1010 		cmd["mechanism"] = Bson("SCRAM-SHA-1");
1011 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1012 
1013 		auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1014 		string response = cast(string)doc["payload"].get!BsonBinData().rawData;
1015 		Bson conversationId = doc["conversationId"];
1016 
1017 		payload = state.update(m_settings.digest, response);
1018 		cmd = Bson.emptyObject;
1019 		cmd["saslContinue"] = Bson(1);
1020 		cmd["conversationId"] = conversationId;
1021 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1022 
1023 		doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1024 		response = cast(string)doc["payload"].get!BsonBinData().rawData;
1025 
1026 		payload = state.finalize(response);
1027 		cmd = Bson.emptyObject;
1028 		cmd["saslContinue"] = Bson(1);
1029 		cmd["conversationId"] = conversationId;
1030 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1031 		runCommand!(Bson, MongoAuthException)(cn, cmd);
1032 	}
1033 }
1034 
1035 private enum OpCode : int {
1036 	Reply        = 1, // sent only by DB
1037 	Update       = 2001,
1038 	Insert       = 2002,
1039 	Reserved1    = 2003,
1040 	Query        = 2004,
1041 	GetMore      = 2005,
1042 	Delete       = 2006,
1043 	KillCursors  = 2007,
1044 
1045 	Compressed   = 2012,
1046 	Msg          = 2013,
1047 }
1048 
1049 private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
1050 private template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; }
1051 
1052 private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe;
1053 private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe;
1054 private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe;
1055 private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe;
1056 private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe;
1057 
1058 alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe;
1059 alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe;
1060 
1061 struct MongoDBInfo
1062 {
1063 	string name;
1064 	double sizeOnDisk;
1065 	bool empty;
1066 }
1067 
1068 private int sendLength(ARGS...)(scope ARGS args)
1069 {
1070 	import std.traits;
1071 	static if (ARGS.length == 1) {
1072 		alias T = ARGS[0];
1073 		static if (is(T == string)) return cast(int)args[0].length + 1;
1074 		else static if (is(T == int)) return 4;
1075 		else static if (is(T == long)) return 8;
1076 		else static if (is(T == Bson)) return cast(int)() @trusted { return args[0].data.length; } ();
1077 		else static if (isArray!T) {
1078 			int ret = 0;
1079 			foreach (el; args[0]) ret += sendLength(el);
1080 			return ret;
1081 		} else static assert(false, "Unexpected type: "~T.stringof);
1082 	}
1083 	else if (ARGS.length == 0) return 0;
1084 	else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]);
1085 }
1086 
1087 struct ServerDescription
1088 {
1089 	enum ServerType
1090 	{
1091 		unknown,
1092 		standalone,
1093 		mongos,
1094 		possiblePrimary,
1095 		RSPrimary,
1096 		RSSecondary,
1097 		RSArbiter,
1098 		RSOther,
1099 		RSGhost
1100 	}
1101 
1102 @optional:
1103 	string address;
1104 	string error;
1105 	float roundTripTime = 0;
1106 	Nullable!BsonDate lastWriteDate;
1107 	Nullable!BsonObjectID opTime;
1108 	ServerType type = ServerType.unknown;
1109 	int minWireVersion, maxWireVersion;
1110 	string me;
1111 	string[] hosts, passives, arbiters;
1112 	string[string] tags;
1113 	string setName;
1114 	Nullable!int setVersion;
1115 	Nullable!BsonObjectID electionId;
1116 	string primary;
1117 	string lastUpdateTime = "infinity ago";
1118 	Nullable!int logicalSessionTimeoutMinutes;
1119 
1120 	bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
1121 	{
1122 		return maxWireVersion >= wireVersion;
1123 	}
1124 }
1125 
1126 enum WireVersion : int
1127 {
1128 	old = 0,
1129 	v26 = 1,
1130 	v26_2 = 2,
1131 	v30 = 3,
1132 	v32 = 4,
1133 	v34 = 5,
1134 	v36 = 6,
1135 	v40 = 7,
1136 	v42 = 8,
1137 	v44 = 9,
1138 	v49 = 12,
1139 	v50 = 13,
1140 	v51 = 14,
1141 	v52 = 15,
1142 	v53 = 16,
1143 	v60 = 17
1144 }
1145 
1146 private string getHostArchitecture()
1147 {
1148 	import os = std.system;
1149 
1150 	version (X86_64)
1151 		string arch = "x86_64 ";
1152 	else version (X86)
1153 		string arch = "x86 ";
1154 	else version (AArch64)
1155 		string arch = "aarch64 ";
1156 	else version (ARM_HardFloat)
1157 		string arch = "armhf ";
1158 	else version (ARM)
1159 		string arch = "arm ";
1160 	else version (PPC64)
1161 		string arch = "ppc64 ";
1162 	else version (PPC)
1163 		string arch = "ppc ";
1164 	else
1165 		string arch = "unknown ";
1166 
1167 	return arch ~ os.endian.to!string;
1168 }
1169 
1170 private static immutable hostArchitecture = getHostArchitecture;