1 /**
2 	Low level mongodb protocol.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.connection;
9 
10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic
11 // debug = VibeVerboseMongo;
12 
13 public import vibe.data.bson;
14 
15 import vibe.core.core : vibeVersionString;
16 import vibe.core.log;
17 import vibe.core.net;
18 import vibe.data.bson;
19 import vibe.db.mongo.flags;
20 import vibe.db.mongo.settings;
21 import vibe.inet.webform;
22 import vibe.stream.tls;
23 
24 import std.algorithm : findSplit, map, splitter;
25 import std.array;
26 import std.conv;
27 import std.digest.md;
28 import std.exception;
29 import std.range;
30 import std.string;
31 import std.traits : hasIndirections;
32 import std.typecons;
33 
34 import core.time;
35 
/// Plain-data record holding the fields of a MongoDB error reply.
/// The per-field notes describe how `MongoConnection.getLastError` fills it.
private struct _MongoErrorDescription
{
	/// Error text: the reply's "errmsg", else its "err", else an empty string.
	string message;
	/// The reply's "code" field; -1 when the reply has none.
	int code;
	/// The reply's "connectionId" field; -1 when the reply has none.
	int connectionId;
	/// The reply's "n" field; -1 when the reply has none.
	int n;
	/// The reply's "ok" field.
	double ok;
}
44 
/**
 * D POD representation of a Mongo error object.
 *
 * For successful queries "code" is negative.
 * It can also be used to check how many documents were updated by
 * a successful query via the "n" field.
 */
alias MongoErrorDescription = immutable(_MongoErrorDescription);
53 
/**
 * Base type of every exception thrown by the vibe.d Mongo driver.
 */
class MongoException : Exception
{
	/// Forwards all arguments unchanged to `Exception`.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe
	{
		super(message, file, line, next);
	}
}
66 
/**
 * Raised for problems the driver cannot handle by itself.
 *
 * Examples: a protocol mismatch or unexpected behavior of the mongo service.
 */
class MongoDriverException : MongoException
{
	/// Forwards all arguments unchanged to `MongoException`.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe
	{
		super(message, file, line, next);
	}
}
81 
/**
 * Carries an error reported by the database for a collection operation.
 *
 * It does not indicate a problem with the vibe.d driver itself. Most frequently
 * it is thrown when MongoConnection is in checked mode and getLastError() has
 * something interesting to report.
 */
class MongoDBException : MongoException
{
@safe:

	/// The error details this exception was created from.
	MongoErrorDescription description;

	/// Uses `description.message` as the exception message and keeps `description`.
	this(MongoErrorDescription description, string file = __FILE__,
			size_t line = __LINE__, Throwable next = null)
	{
		super(description.message, file, line, next);
		this.description = description;
	}

	// NOTE: .message is a @future member of Throwable
	deprecated("Use .msg instead.") alias message = msg;

	/// Shortcut for `description.code`.
	@property int code() const nothrow
	{
		return description.code;
	}

	/// Shortcut for `description.connectionId`.
	@property int connectionId() const nothrow
	{
		return description.connectionId;
	}

	/// Shortcut for `description.n`.
	@property int n() const nothrow
	{
		return description.n;
	}

	/// Shortcut for `description.ok`.
	@property double ok() const nothrow
	{
		return description.ok;
	}
}
108 
/**
 * Raised for authentication related problems.
 *
 * Examples: unsupported mechanisms or wrong credentials.
 */
class MongoAuthException : MongoException
{
	/// Forwards all arguments unchanged to `MongoException`.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null) @safe
	{
		super(message, file, line, next);
	}
}
123 
124 /**
125   [internal] Provides low-level mongodb protocol access.
126 
127   It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code.
128   Note that a MongoConnection may only be used from one fiber/thread at a time.
129  */
130 final class MongoConnection {
131 @safe:
132 
133 	import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */;
134 	import vibe.internal.interfaceproxy;
135 	import vibe.core.stream : InputStream, Stream;
136 
	private {
		/// Client settings; assigned by both constructors.
		MongoClientSettings m_settings;
		/// Raw TCP connection opened by connect() and closed by disconnect().
		TCPConnection m_conn;
		/// Stream used for all I/O: either m_conn itself or a TLS stream wrapping it.
		InterfaceProxy!Stream m_stream;
		/// Running count of received bytes; recvMsg/recvReply use it to measure how
		/// much of a message was consumed. (Presumably advanced by recv(), which is
		/// not part of this excerpt.)
		ulong m_bytesRead;
		/// Message id state, starting at 1. (Presumably consumed by nextMessageId(),
		/// which is not part of this excerpt.)
		int m_msgid = 1;
		/// Output range over m_stream; send/sendMsg flush it after each message.
		StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
		/// Server description deserialized from the handshake reply in connect().
		ServerDescription m_description;
		/// Flag to prevent recursive connections when server closes connection while connecting
		bool m_allowReconnect;
		/// True while connect() runs its authentication step; selects the exception
		/// type thrown by send/sendMsg when the connection is gone.
		bool m_isAuthenticating;
		/// Whether commands are sent as OP_MSG (true) or as legacy OP_QUERY (false).
		bool m_supportsOpMsg;
	}
150 
	/// Port used by the simplified constructor when none is given; mirrors `MongoClientSettings.defaultPort`.
	enum ushort defaultPort = MongoClientSettings.defaultPort;
152 
153 	/// Simplified constructor overload, with no m_settings
154 	this(string server, ushort port = defaultPort)
155 	{
156 		m_settings = new MongoClientSettings();
157 		m_settings.hosts ~= MongoHost(server, port);
158 	}
159 
160 	this(MongoClientSettings cfg)
161 	{
162 		m_settings = cfg;
163 
164 		// Now let's check for features that are not yet supported.
165 		if(m_settings.hosts.length > 1)
166 			logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
167 					m_settings.hosts[0].name, m_settings.hosts[0].port);
168 	}
169 
170 	void connect()
171 	{
172 		bool isTLS;
173 
174 		/*
175 		 * TODO: Connect to one of the specified hosts taking into consideration
176 		 * options such as connect timeouts and so on.
177 		 */
178 		try {
179 			import core.time : Duration, msecs;
180 
181 			auto connectTimeout = m_settings.connectTimeoutMS.msecs;
182 			if (m_settings.connectTimeoutMS == 0)
183 				connectTimeout = Duration.max;
184 
185 			m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout);
186 			m_conn.tcpNoDelay = true;
187 			if (m_settings.socketTimeout != Duration.zero)
188 				m_conn.readTimeout = m_settings.socketTimeout;
189 			if (m_settings.ssl) {
190 				auto ctx =  createTLSContext(TLSContextKind.client);
191 				if (!m_settings.sslverifycertificate) {
192 					ctx.peerValidationMode = TLSPeerValidationMode.none;
193 				}
194 				if (m_settings.sslPEMKeyFile) {
195 					ctx.useCertificateChainFile(m_settings.sslPEMKeyFile);
196 					ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile);
197 				}
198 				if (m_settings.sslCAFile) {
199 					ctx.useTrustedCertificateFile(m_settings.sslCAFile);
200 				}
201 
202 				m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name);
203 				isTLS = true;
204 			}
205 			else {
206 				m_stream = m_conn;
207 			}
208 			m_outRange = streamOutputRange(m_stream);
209 		}
210 		catch (Exception e) {
211 			throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
212 		}
213 
214 		scope (failure) disconnect();
215 
216 		m_allowReconnect = false;
217 		scope (exit)
218 			m_allowReconnect = true;
219 
220 		Bson handshake = Bson.emptyObject;
221 		static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!");
222 		// TODO: must use legacy handshake if m_settings.loadBalanced is true
223 		// and also once we allow configuring a server API version in the driver
224 		// (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst)
225 		m_supportsOpMsg = false;
226 		bool legacyHandshake = false;
227 		if (legacyHandshake)
228 		{
229 			handshake["isMaster"] = Bson(1);
230 			handshake["helloOk"] = Bson(1);
231 		}
232 		else
233 		{
234 			handshake["hello"] = Bson(1);
235 			m_supportsOpMsg = true;
236 		}
237 
238 		import os = std.system;
239 		import compiler = std.compiler;
240 		string platform = compiler.name ~ " "
241 			~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string;
242 		// TODO: add support for os.version
243 
244 		handshake["client"] = Bson([
245 			"driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]),
246 			"os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]),
247 			"platform": Bson(platform)
248 		]);
249 
250 		if (m_settings.appName.length) {
251 			enforce!MongoAuthException(m_settings.appName.length <= 128,
252 				"The application name may not be larger than 128 bytes");
253 			handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]);
254 		}
255 
256 		auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake);
257 		m_description = deserializeBson!ServerDescription(reply);
258 
259 		if (m_description.satisfiesVersion(WireVersion.v36))
260 			m_supportsOpMsg = true;
261 
262 		m_bytesRead = 0;
263 		auto authMechanism = m_settings.authMechanism;
264 		if (authMechanism == MongoAuthMechanism.none)
265 		{
266 			if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26))
267 			{
268 				authMechanism = MongoAuthMechanism.mongoDBX509;
269 			}
270 			else if (m_settings.digest.length)
271 			{
272 				// SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication
273 				if (m_description.satisfiesVersion(WireVersion.v30))
274 					authMechanism = MongoAuthMechanism.scramSHA1;
275 				else
276 					authMechanism = MongoAuthMechanism.mongoDBCR;
277 			}
278 		}
279 
280 		if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40))
281 			throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported");
282 
283 		if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30))
284 			throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported");
285 
286 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26))
287 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported");
288 
289 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS)
290 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!");
291 
292 		m_isAuthenticating = true;
293 		scope (exit)
294 			m_isAuthenticating = false;
295 		final switch (authMechanism)
296 		{
297 		case MongoAuthMechanism.none:
298 			break;
299 		case MongoAuthMechanism.mongoDBX509:
300 			certAuthenticate();
301 			break;
302 		case MongoAuthMechanism.scramSHA1:
303 			scramAuthenticate();
304 			break;
305 		case MongoAuthMechanism.mongoDBCR:
306 			authenticate();
307 			break;
308 		}
309 	}
310 
311 	void disconnect()
312 	{
313 		if (m_conn) {
314 			if (m_stream && m_conn.connected) {
315 				m_outRange.flush();
316 
317 				m_stream.finalize();
318 				m_stream = InterfaceProxy!Stream.init;
319 			}
320 
321 			m_conn.close();
322 			m_conn = TCPConnection.init;
323 		}
324 
325 		m_outRange.drop();
326 	}
327 
328 	@property bool connected() const { return m_conn && m_conn.connected; }
329 
330 	@property const(ServerDescription) description() const { return m_description; }
331 
332 	deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update)
333 	{
334 		scope(failure) disconnect();
335 		send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update);
336 		if (m_settings.safe) checkForError(collection_name);
337 	}
338 
339 	deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents)
340 	{
341 		scope(failure) disconnect();
342 		foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate());
343 		send(OpCode.Insert, -1, cast(int)flags, collection_name, documents);
344 		if (m_settings.safe) checkForError(collection_name);
345 	}
346 
347 	deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`")
348 	void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
349 	{
350 		scope(failure) disconnect();
351 		flags |= m_settings.defQueryFlags;
352 		int id;
353 		if (returnFieldSelector.isNull)
354 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query);
355 		else
356 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector);
357 		recvReply!T(id, on_msg, on_doc);
358 	}
359 
360 	/**
361 		Runs the given Bson command (Bson object with the first entry in the map
362 		being the command name) on the given database.
363 
364 		Using `runCommand` checks that the command completed successfully by
365 		checking that `result["ok"].get!double == 1.0`. Throws the
366 		`CommandFailException` on failure.
367 
368 		Using `runCommandUnchecked` will return the result as-is. Developers may
369 		check the `result["ok"]` value themselves. (It's a double that needs to
370 		be compared with 1.0 by default)
371 
372 		Throws:
373 			- `CommandFailException` (template argument) only in the
374 				`runCommand` overload, when the command response is not ok.
375 			- `MongoDriverException` when internal protocol errors occur.
376 	*/
377 	Bson runCommand(T, CommandFailException = MongoDriverException)(
378 		string database,
379 		Bson command,
380 		string errorInfo = __FUNCTION__,
381 		string errorFile = __FILE__,
382 		size_t errorLine = __LINE__
383 	)
384 	in(database.length, "runCommand requires a database argument")
385 	{
386 		return runCommandImpl!(T, CommandFailException)(
387 			database, command, true, errorInfo, errorFile, errorLine);
388 	}
389 
390 	Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)(
391 		string database,
392 		Bson command,
393 		string errorInfo = __FUNCTION__,
394 		string errorFile = __FILE__,
395 		size_t errorLine = __LINE__
396 	)
397 	in(database.length, "runCommand requires a database argument")
398 	{
399 		return runCommandImpl!(T, CommandFailException)(
400 			database, command, false, errorInfo, errorFile, errorLine);
401 	}
402 
403 	private Bson runCommandImpl(T, CommandFailException)(
404 		string database,
405 		Bson command,
406 		bool testOk = true,
407 		string errorInfo = __FUNCTION__,
408 		string errorFile = __FILE__,
409 		size_t errorLine = __LINE__
410 	)
411 	in(database.length, "runCommand requires a database argument")
412 	{
413 		import std.array;
414 
415 		string formatErrorInfo(string msg) @safe
416 		{
417 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
418 		}
419 
420 		Bson ret;
421 
422 		if (m_supportsOpMsg)
423 		{
424 			debug (VibeVerboseMongo)
425 				logDiagnostic("runCommand: [db=%s] %s", database, command);
426 
427 			command["$db"] = Bson(database);
428 
429 			auto id = sendMsg(-1, 0, command);
430 			Appender!(Bson[])[string] docs;
431 			recvMsg!true(id, (flags, root) @safe {
432 				ret = root;
433 			}, (scope ident, size) @safe {
434 				docs[ident.idup] = appender!(Bson[]);
435 			}, (scope ident, push) @safe {
436 				auto pd = ident in docs;
437 				assert(!!pd, "Received data for unexpected identifier");
438 				pd.put(push);
439 			});
440 
441 			foreach (ident, app; docs)
442 				ret[ident] = Bson(app.data);
443 		}
444 		else
445 		{
446 			debug (VibeVerboseMongo)
447 				logDiagnostic("runCommand(legacy): [db=%s] %s", database, command);
448 			auto id = send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null));
449 			recvReply!T(id,
450 				(cursor, flags, first_doc, num_docs) {
451 					logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs);
452 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed"));
453 					enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response"));
454 				},
455 				(idx, ref doc) {
456 					ret = doc;
457 				});
458 		}
459 
460 		if (testOk && ret["ok"].get!double != 1.0)
461 			throw new CommandFailException(formatErrorInfo("command failed: "
462 				~ ret["errmsg"].opt!string("(no message)")));
463 
464 		static if (is(T == Bson)) return ret;
465 		else {
466 			T doc = deserializeBson!T(bson);
467 			return doc;
468 		}
469 	}
470 
471 	template getMore(T)
472 	{
473 		deprecated("use the modern overload instead")
474 		void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
475 		{
476 			scope(failure) disconnect();
477 			auto parts = collection_name.findSplit(".");
478 			auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id);
479 			recvReply!T(id, on_msg, on_doc);
480 		}
481 
482 		/**
483 		* Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore
484 		* command and OP_MSG. (if supported)
485 		*
486 		* Falls back to compatibility for older MongoDB versions, but those are not
487 		* officially supported anymore.
488 		*
489 		* Upgrade_notes:
490 		* - error checking is now done inside this function
491 		* - document index is no longer sent, instead the callback is called sequentially
492 		*
493 		* Throws: $(LREF MongoDriverException) in case the command fails.
494 		*/
495 		void getMore(long cursor_id, string database, string collection_name, long nret,
496 			scope GetMoreHeaderDelegate on_header,
497 			scope GetMoreDocumentDelegate!T on_doc,
498 			Duration timeout = Duration.max,
499 			string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
500 		{
501 			Bson command = Bson.emptyObject;
502 			command["getMore"] = Bson(cursor_id);
503 			command["$db"] = Bson(database);
504 			command["collection"] = Bson(collection_name);
505 			if (nret > 0)
506 				command["batchSize"] = Bson(nret);
507 			if (timeout != Duration.max && timeout.total!"msecs" < int.max)
508 				command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs");
509 
510 			string formatErrorInfo(string msg) @safe
511 			{
512 				return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
513 			}
514 
515 			scope (failure) disconnect();
516 
517 			if (m_supportsOpMsg)
518 			{
519 				enum needsDup = hasIndirections!T || is(T == Bson);
520 
521 				debug (VibeVerboseMongo)
522 					logDiagnostic("getMore: [db=%s] %s", database, command);
523 
524 				auto id = sendMsg(-1, 0, command);
525 				recvMsg!needsDup(id, (flags, scope root) @safe {
526 					if (root["ok"].get!double != 1.0)
527 						throw new MongoDriverException(formatErrorInfo("getMore failed: "
528 							~ root["errmsg"].opt!string("(no message)")));
529 
530 					auto cursor = root["cursor"];
531 					auto batch = cursor["nextBatch"].get!(Bson[]);
532 					on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length);
533 
534 					foreach (ref push; batch)
535 					{
536 						T doc = deserializeBson!T(push);
537 						on_doc(doc);
538 					}
539 				}, (scope ident, size) @safe {}, (scope ident, scope push) @safe {
540 					throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in getMore response"));
541 				});
542 			}
543 			else
544 			{
545 				debug (VibeVerboseMongo)
546 					logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret);
547 
548 				int brokenId = 0;
549 				int nextId = 0;
550 				int num_docs;
551 				// array to store out-of-order items, to push them into the callback properly
552 				T[] compatibilitySort;
553 				string full_name = database ~ '.' ~ collection_name;
554 				auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id);
555 				recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs)
556 				{
557 					enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound),
558 						formatErrorInfo("Invalid cursor handle."));
559 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure),
560 						formatErrorInfo("Query failed. Does the database exist?"));
561 
562 					on_header(cursor, full_name, num_docs);
563 				}, (size_t idx, ref T doc) {
564 					if (cast(int)idx == nextId) {
565 						on_doc(doc);
566 						nextId++;
567 						brokenId = nextId;
568 					} else {
569 						enforce!MongoDriverException(idx >= brokenId,
570 							formatErrorInfo("Got legacy document with same id after having already processed it!"));
571 						enforce!MongoDriverException(idx < num_docs,
572 							formatErrorInfo("Received more documents than the database reported to us"));
573 
574 						size_t arrayIndex = cast(int)idx - brokenId;
575 						if (!compatibilitySort.length)
576 							compatibilitySort.length = num_docs - brokenId;
577 						compatibilitySort[arrayIndex] = doc;
578 					}
579 				});
580 
581 				foreach (doc; compatibilitySort)
582 					on_doc(doc);
583 			}
584 		}
585 	}
586 
587 	/// Forwards the `find` command passed in to the database, handles the
588 	/// callbacks like with getMore. This exists for easier integration with
589 	/// MongoCursor!T.
590 	package void startFind(T)(Bson command,
591 		scope GetMoreHeaderDelegate on_header,
592 		scope GetMoreDocumentDelegate!T on_doc,
593 		string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
594 	{
595 		string formatErrorInfo(string msg) @safe
596 		{
597 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
598 		}
599 
600 		scope (failure) disconnect();
601 
602 		enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries"));
603 
604 		enum needsDup = hasIndirections!T || is(T == Bson);
605 
606 		debug (VibeVerboseMongo)
607 			logDiagnostic("startFind: %s", command);
608 
609 		auto id = sendMsg(-1, 0, command);
610 		recvMsg!needsDup(id, (flags, scope root) @safe {
611 			if (root["ok"].get!double != 1.0)
612 				throw new MongoDriverException(formatErrorInfo("find failed: "
613 					~ root["errmsg"].opt!string("(no message)")));
614 
615 			auto cursor = root["cursor"];
616 			auto batch = cursor["firstBatch"].get!(Bson[]);
617 			on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length);
618 
619 			foreach (ref push; batch)
620 			{
621 				T doc = deserializeBson!T(push);
622 				on_doc(doc);
623 			}
624 		}, (scope ident, size) @safe {}, (scope ident, scope push) @safe {
625 			throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in find response"));
626 		});
627 	}
628 
629 	deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector)
630 	{
631 		scope(failure) disconnect();
632 		send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector);
633 		if (m_settings.safe) checkForError(collection_name);
634 	}
635 
636 	deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well")
637 	void killCursors(scope long[] cursors)
638 	{
639 		scope(failure) disconnect();
640 		send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
641 	}
642 
643 	void killCursors(string collection, scope long[] cursors)
644 	{
645 		scope(failure) disconnect();
646 		// TODO: could add special case to runCommand to not return anything
647 		if (m_supportsOpMsg)
648 		{
649 			Bson command = Bson.emptyObject;
650 			auto parts = collection.findSplit(".");
651 			command["killCursors"] = Bson(parts[2]);
652 			command["cursors"] = () @trusted { return cursors; } ().serializeToBson; // NOTE: "escaping" scope here
653 			runCommand!Bson(parts[0], command);
654 		}
655 		else
656 		{
657 			send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
658 		}
659 	}
660 
661 	MongoErrorDescription getLastError(string db)
662 	{
663 		// Though higher level abstraction level by concept, this function
664 		// is implemented here to allow to check errors upon every request
665 		// on connection level.
666 
667 		Bson command_and_options = Bson.emptyObject;
668 		command_and_options["getLastError"] = Bson(1.0);
669 
670 		if(m_settings.w != m_settings.w.init)
671 			command_and_options["w"] = m_settings.w; // Already a Bson struct
672 		if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init)
673 			command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS);
674 		if(m_settings.journal)
675 			command_and_options["j"] = Bson(true);
676 		if(m_settings.fsync)
677 			command_and_options["fsync"] = Bson(true);
678 
679 		_MongoErrorDescription ret;
680 
681 		auto error = runCommandUnchecked!Bson(db, command_and_options);
682 
683 		try {
684 			ret = MongoErrorDescription(
685 				error["errmsg"].opt!string(error["err"].opt!string("")),
686 				error["code"].opt!int(-1),
687 				error["connectionId"].opt!int(-1),
688 				error["n"].opt!int(-1),
689 				error["ok"].get!double()
690 			);
691 		} catch (Exception e) {
692 			throw new MongoDriverException(e.msg);
693 		}
694 
695 		return ret;
696 	}
697 
	/** Queries the server for all databases.

		The `listDatabases` command is run on the database configured in the
		settings, or on "admin" when none is configured.

		Returns:
			An input range of $(D MongoDBInfo) values.
	*/
	auto listDatabases()
	{
		// Fall back to "admin" when the settings do not name a database.
		string cn = m_settings.database == string.init ? "admin" : m_settings.database;

		auto cmd = Bson(["listDatabases":Bson(1)]);

		// Converts one entry of the reply's "databases" array.
		static MongoDBInfo toInfo(const(Bson) db_doc) {
			return MongoDBInfo(
				db_doc["name"].get!string,
				// double on MongoDB < 5.0, long afterwards
				db_doc["sizeOnDisk"].to!double,
				db_doc["empty"].get!bool
			);
		}

		// runCommand throws if the reply is not ok.
		auto result = runCommand!Bson(cn, cmd)["databases"];

		return result.byValue.map!toInfo;
	}
722 
723 	private int recvMsg(bool dupBson = true)(int reqid,
724 		scope MsgReplyDelegate!dupBson on_sec0,
725 		scope MsgSection1StartDelegate on_sec1_start,
726 		scope MsgSection1Delegate!dupBson on_sec1_doc)
727 	{
728 		import std.traits;
729 
730 		auto bytes_read = m_bytesRead;
731 		int msglen = recvInt();
732 		int resid = recvInt();
733 		int respto = recvInt();
734 		int opcode = recvInt();
735 
736 		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
737 		enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! (must be OP_MSG)");
738 
739 		uint flagBits = recvUInt();
740 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
741 
742 		int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof);
743 		if (hasCRC)
744 			sectionLength -= uint.sizeof; // CRC present
745 
746 		bool gotSec0;
747 		while (m_bytesRead - bytes_read < sectionLength) {
748 			// TODO: directly deserialize from the wire
749 			static if (!dupBson) {
750 				ubyte[256] buf = void;
751 				ubyte[] bufsl = buf;
752 			}
753 
754 			ubyte payloadType = recvUByte();
755 			switch (payloadType) {
756 				case 0:
757 					gotSec0 = true;
758 					static if (dupBson)
759 						auto data = recvBsonDup();
760 					else
761 						scope data = (() @trusted => recvBson(bufsl))();
762 
763 					debug (VibeVerboseMongo)
764 						logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data);
765 					on_sec0(flagBits, data);
766 					break;
767 				case 1:
768 					if (!gotSec0)
769 						throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d");
770 
771 					auto section_bytes_read = m_bytesRead;
772 					int size = recvInt();
773 					auto identifier = recvCString();
774 					on_sec1_start(identifier, size);
775 					while (m_bytesRead - section_bytes_read < size) {
776 						static if (dupBson)
777 							auto data = recvBsonDup();
778 						else
779 							scope data = (() @trusted => recvBson(bufsl))();
780 
781 						debug (VibeVerboseMongo)
782 							logDiagnostic("recvData: sec1[%s]: %s", identifier, data);
783 
784 						on_sec1_doc(identifier, data);
785 					}
786 					break;
787 				default:
788 					throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string);
789 			}
790 		}
791 
792 		if (hasCRC)
793 		{
794 			uint crc = recvUInt();
795 			// TODO: validate CRC
796 			logDiagnostic("recvData: crc=%s (discarded)", crc);
797 		}
798 
799 		assert(bytes_read + msglen == m_bytesRead,
800 			format!"Packet size mismatch! Expected %s bytes, but read %s."(
801 				msglen, m_bytesRead - bytes_read));
802 
803 		return resid;
804 	}
805 
	/**
		Receives one legacy OP_REPLY message.

		`on_msg` is called once with cursor id, flags, start index and number
		of returned documents; afterwards `on_doc` is called once per document
		with its index. For `T == Bson` the raw document is passed on, for any
		other type it is converted with `deserializeBson!T` first.

		Returns: The id of the received message.

		Throws: `MongoDriverException` when the reply does not belong to
			`reqid`, is not an OP_REPLY, or when more bytes were consumed than
			the message length announced.
	*/
	private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
	{
		// Remember the byte counter at message start to track consumption.
		auto bytes_read = m_bytesRead;
		int msglen = recvInt();
		int resid = recvInt();
		int respto = recvInt();
		int opcode = recvInt();

		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
		enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!");

		auto flags = cast(ReplyFlags)recvInt();
		long cursor = recvLong();
		int start = recvInt();
		int numret = recvInt();

		// Runs on every exit path, so the stream position ends up at the end of
		// this message even when a callback throws.
		scope (exit) {
			if (m_bytesRead - bytes_read < msglen) {
				// Fewer bytes consumed than announced: read and discard the rest.
				logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read);
				ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
				recv(dst);
			} else if (m_bytesRead - bytes_read > msglen) {
				// More bytes consumed than announced: the stream position cannot
				// be trusted anymore.
				logWarn("MongoDB reply was shorter than expected. Dropping connection.");
				disconnect();
				throw new MongoDriverException("MongoDB reply was too short for data.");
			}
		}

		on_msg(cursor, flags, start, numret);
		// Types that may keep references into the document data get one heap
		// buffer sized for the rest of the message; recvBson hands out slices of it.
		static if (hasIndirections!T || is(T == Bson))
			auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
		foreach (i; 0 .. cast(size_t)numret) {
			// TODO: directly deserialize from the wire
			static if (!hasIndirections!T && !is(T == Bson)) {
				// No references can escape, so a per-document stack buffer is
				// used (recvBson allocates when the document does not fit).
				ubyte[256] buf = void;
				ubyte[] bufsl = buf;
				auto bson = () @trusted { return recvBson(bufsl); } ();
			} else {
				auto bson = () @trusted { return recvBson(buf); } ();
			}

			// logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson);

			static if (is(T == Bson)) on_doc(i, bson);
			else {
				T doc = deserializeBson!T(bson);
				on_doc(i, doc);
			}
		}

		return resid;
	}
858 
859 	private int send(ARGS...)(OpCode code, int response_to, scope ARGS args)
860 	{
861 		if( !connected() ) {
862 			if (m_allowReconnect) connect();
863 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
864 			else throw new MongoDriverException("Connection got closed while connecting");
865 		}
866 		int id = nextMessageId();
867 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
868 		sendValue!int(16 + sendLength(args));
869 		sendValue!int(id);
870 		sendValue!int(response_to);
871 		sendValue!int(cast(int)code);
872 		foreach (a; args) sendValue(a);
873 		m_outRange.flush();
874 		// logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args));
875 		return id;
876 	}
877 
878 	private int sendMsg(int response_to, uint flagBits, Bson document)
879 	{
880 		if( !connected() ) {
881 			if (m_allowReconnect) connect();
882 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
883 			else throw new MongoDriverException("Connection got closed while connecting");
884 		}
885 		int id = nextMessageId();
886 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
887 		sendValue!int(21 + sendLength(document));
888 		sendValue!int(id);
889 		sendValue!int(response_to);
890 		sendValue!int(cast(int)OpCode.Msg);
891 		sendValue!uint(flagBits);
892 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
893 		assert(!hasCRC, "sending with CRC bits not yet implemented");
894 		sendValue!ubyte(0);
895 		sendValue(document);
896 		m_outRange.flush();
897 		return id;
898 	}
899 
900 	private void sendValue(T)(scope T value)
901 	{
902 		import std.traits;
903 		static if (is(T == ubyte)) m_outRange.put(value);
904 		else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value));
905 		else static if (is(T == long)) sendBytes(toBsonData(value));
906 		else static if (is(T == Bson)) sendBytes(() @trusted { return value.data; } ());
907 		else static if (is(T == string)) {
908 			sendBytes(cast(const(ubyte)[])value);
909 			sendBytes(cast(const(ubyte)[])"\0");
910 		} else static if (isArray!T) {
911 			foreach (v; value)
912 				sendValue(v);
913 		} else static assert(false, "Unexpected type: "~T.stringof);
914 	}
915 
916 	private void sendBytes(scope const(ubyte)[] data){ m_outRange.put(data); }
917 
918 	private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); }
919 	private alias recvUByte = recvInteger!ubyte;
920 	private alias recvInt = recvInteger!int;
921 	private alias recvUInt = recvInteger!uint;
922 	private alias recvLong = recvInteger!long;
923 	private Bson recvBson(ref ubyte[] buf)
924 	@system {
925 		int len = recvInt();
926 		ubyte[] dst;
927 		if (len > buf.length) dst = new ubyte[len];
928 		else {
929 			dst = buf[0 .. len];
930 			buf = buf[len .. $];
931 		}
932 		dst[0 .. 4] = toBsonData(len)[];
933 		recv(dst[4 .. $]);
934 		return Bson(Bson.Type.object, cast(immutable)dst);
935 	}
936 	private Bson recvBsonDup()
937 	@trusted {
938 		ubyte[4] size;
939 		recv(size[]);
940 		ubyte[] dst = new ubyte[fromBsonData!uint(size)];
941 		dst[0 .. 4] = size;
942 		recv(dst[4 .. $]);
943 		return Bson(Bson.Type.object, cast(immutable)dst);
944 	}
945 	private void recv(scope ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; }
946 	private const(char)[] recvCString()
947 	{
948 		auto buf = new ubyte[32];
949 		ptrdiff_t i = -1;
950 		do
951 		{
952 			i++;
953 			if (i == buf.length) buf.length *= 2;
954 			recv(buf[i .. i + 1]);
955 		} while (buf[i] != 0);
956 		return cast(const(char)[])buf[0 .. i];
957 	}
958 
959 	private int nextMessageId() { return m_msgid++; }
960 
961 	private void checkForError(string collection_name)
962 	{
963 		auto coll = collection_name.split(".")[0];
964 		auto err = getLastError(coll);
965 
966 		enforce(
967 			err.code < 0,
968 			new MongoDBException(err)
969 		);
970 	}
971 
972 	private void certAuthenticate()
973 	{
974 		Bson cmd = Bson.emptyObject;
975 		cmd["authenticate"] = Bson(1);
976 		cmd["mechanism"] = Bson("MONGODB-X509");
977 		if (m_description.satisfiesVersion(WireVersion.v34))
978 		{
979 			if (m_settings.username.length)
980 				cmd["user"] = Bson(m_settings.username);
981 		}
982 		else
983 		{
984 			if (!m_settings.username.length)
985 				throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this");
986 
987 			cmd["user"] = Bson(m_settings.username);
988 		}
989 		runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd);
990 	}
991 
992 	private void authenticate()
993 	{
994 		scope (failure) disconnect();
995 	
996 		string cn = m_settings.getAuthDatabase;
997 
998 		auto cmd = Bson(["getnonce": Bson(1)]);
999 		auto result = runCommand!(Bson, MongoAuthException)(cn, cmd);
1000 		string nonce = result["nonce"].get!string;
1001 		string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup);
1002 
1003 		cmd = Bson.emptyObject;
1004 		cmd["authenticate"] = Bson(1);
1005 		cmd["mechanism"] = Bson("MONGODB-CR");
1006 		cmd["nonce"] = Bson(nonce);
1007 		cmd["user"] = Bson(m_settings.username);
1008 		cmd["key"] = Bson(key);
1009 		runCommand!(Bson, MongoAuthException)(cn, cmd);
1010 	}
1011 
1012 	private void scramAuthenticate()
1013 	{
1014 		import vibe.db.mongo.sasl;
1015 
1016 		string cn = m_settings.getAuthDatabase;
1017 
1018 		ScramState state;
1019 		string payload = state.createInitialRequest(m_settings.username);
1020 
1021 		auto cmd = Bson.emptyObject;
1022 		cmd["saslStart"] = Bson(1);
1023 		cmd["mechanism"] = Bson("SCRAM-SHA-1");
1024 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1025 
1026 		auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1027 		string response = cast(string)doc["payload"].get!BsonBinData().rawData;
1028 		Bson conversationId = doc["conversationId"];
1029 
1030 		payload = state.update(m_settings.digest, response);
1031 		cmd = Bson.emptyObject;
1032 		cmd["saslContinue"] = Bson(1);
1033 		cmd["conversationId"] = conversationId;
1034 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1035 
1036 		doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1037 		response = cast(string)doc["payload"].get!BsonBinData().rawData;
1038 
1039 		payload = state.finalize(response);
1040 		cmd = Bson.emptyObject;
1041 		cmd["saslContinue"] = Bson(1);
1042 		cmd["conversationId"] = conversationId;
1043 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1044 		runCommand!(Bson, MongoAuthException)(cn, cmd);
1045 	}
1046 }
1047 
/// Operation codes written into the `opCode` field of a wire protocol message header.
private enum OpCode : int {
	/// Reply message; sent only by the database.
	Reply = 1,
	Update = 2001,
	Insert = 2002,
	Reserved1 = 2003,
	Query = 2004,
	GetMore = 2005,
	Delete = 2006,
	KillCursors = 2007,

	Compressed = 2012,
	/// Opcode used by `sendMsg`.
	Msg = 2013,
}
1061 
/// Callback receiving the cursor id, flags, first document index and document count of a reply.
private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
/// Callback receiving the index of a document together with the document itself.
private alias DocDelegate(T) = void delegate(size_t idx, ref T doc) @safe;

/// Callback receiving the flags and a document; with `dupBson == false` the
/// document is passed as `scope`.
private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe;
/// ditto
private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe;
/// Callback receiving an identifier and a size.
private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe;
/// Callback receiving an identifier and a document; with `dupBson == false`
/// the document is passed as `scope`.
private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe;
/// ditto
private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe;

/// Callback receiving an id, a namespace string and a count.
alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe;
/// Callback receiving a single document by reference.
alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe;
1073 
/// Plain record describing a single database.
struct MongoDBInfo
{
	/// Name of the database.
	string name;

	/// Size on disk (presumably in bytes - TODO confirm against the producer of this struct).
	double sizeOnDisk;

	/// Whether the database is empty.
	bool empty;
}
1080 
/**
	Computes the number of bytes `sendValue` writes for the given values.

	Supported types are `ubyte` (1), `int`/`uint` (4), `long` (8), `string`
	(length plus terminating zero byte), `Bson` (length of its data) and
	arrays of supported types (sum over the elements). Any other type is
	rejected at compile time.

	Params:
		args = the values whose serialized sizes are summed up

	Returns: The total size in bytes; `0` if no arguments are given.
*/
private int sendLength(ARGS...)(scope ARGS args)
{
	import std.traits;
	// The argument count is a compile time property, so select on it with
	// `static if` instead of emitting a runtime branch.
	static if (ARGS.length == 0) return 0;
	else static if (ARGS.length == 1) {
		alias T = ARGS[0];
		static if (is(T == string)) return cast(int)args[0].length + 1;
		else static if (is(T == ubyte)) return 1;
		else static if (is(T == int) || is(T == uint)) return 4;
		else static if (is(T == long)) return 8;
		else static if (is(T == Bson)) return cast(int)() @trusted { return args[0].data.length; } ();
		else static if (isArray!T) {
			int ret = 0;
			foreach (el; args[0]) ret += sendLength(el);
			return ret;
		} else static assert(false, "Unexpected type: "~T.stringof);
	}
	// split the argument list in halves to keep the recursion depth low
	else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]);
}
1099 
/**
	Description of a single server.

	All fields are marked `@optional`.
*/
struct ServerDescription
{
	/// Kind of server.
	enum ServerType
	{
		unknown,
		standalone,
		mongos,
		possiblePrimary,
		RSPrimary,
		RSSecondary,
		RSArbiter,
		RSOther,
		RSGhost
	}

@optional:
	string address;
	string error;
	float roundTripTime = 0;
	Nullable!BsonDate lastWriteDate;
	Nullable!BsonObjectID opTime;
	ServerType type = ServerType.unknown;
	/// Lowest wire version of the server.
	WireVersion minWireVersion;
	/// Highest wire version of the server; used by `satisfiesVersion`.
	WireVersion maxWireVersion;
	string me;
	string[] hosts;
	string[] passives;
	string[] arbiters;
	string[string] tags;
	string setName;
	Nullable!int setVersion;
	Nullable!BsonObjectID electionId;
	string primary;
	string lastUpdateTime = "infinity ago";
	Nullable!int logicalSessionTimeoutMinutes;

	/// Checks whether `maxWireVersion` is at least `wireVersion`.
	bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
	{
		return wireVersion <= maxWireVersion;
	}
}
1138 
/**
	Wire protocol versions.

	Larger values are newer, so members can be compared with the relational
	operators (see `ServerDescription.satisfiesVersion`). The member names
	appear to encode the MongoDB release (e.g. `v34` for 3.4).
*/
enum WireVersion : int
{
	old   = 0,
	v26   = 1,
	v26_2 = 2,
	v30   = 3,
	v32   = 4,
	v34   = 5,
	v36   = 6,
	v40   = 7,
	v42   = 8,
	v44   = 9,
	// values 10 and 11 are not assigned a name here
	v49   = 12,
	v50   = 13,
	v51   = 14,
	v52   = 15,
	v53   = 16
}
1157 
/**
	Builds a string describing the architecture this module is compiled for.

	Returns: The architecture name selected through `version` conditions
		("unknown" if none matches), a space, and the name of the
		`std.system.endian` value.
*/
private string getHostArchitecture()
{
	import os = std.system;

	string arch;

	version (X86_64) arch = "x86_64 ";
	else version (X86) arch = "x86 ";
	else version (AArch64) arch = "aarch64 ";
	// checked before the plain ARM version identifier
	else version (ARM_HardFloat) arch = "armhf ";
	else version (ARM) arch = "arm ";
	else version (PPC64) arch = "ppc64 ";
	else version (PPC) arch = "ppc ";
	else arch = "unknown ";

	return arch ~ to!string(os.endian);
}
1181 
/// Result of `getHostArchitecture`, evaluated once at compile time.
private static immutable hostArchitecture = getHostArchitecture();