1 /**
2 	Low level mongodb protocol.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.connection;
9 
10 // /// prints ALL modern OP_MSG queries and legacy runCommand invocations to logDiagnostic
11 // debug = VibeVerboseMongo;
12 
13 public import vibe.data.bson;
14 
15 import vibe.core.core : vibeVersionString;
16 import vibe.core.log;
17 import vibe.core.net;
18 import vibe.data.bson;
19 import vibe.db.mongo.flags;
20 import vibe.db.mongo.settings;
21 import vibe.inet.webform;
22 import vibe.stream.tls;
23 
24 import std.algorithm : findSplit, map, splitter;
25 import std.array;
26 import std.conv;
27 import std.digest.md;
28 import std.exception;
29 import std.range;
30 import std.string;
31 import std.traits : hasIndirections;
32 import std.typecons;
33 
34 import core.time;
35 
36 private struct _MongoErrorDescription
37 {
38 	string message;
39 	int code;
40 	int connectionId;
41 	int n;
42 	double ok;
43 }
44 
45 /**
46  * D POD representation of Mongo error object.
47  *
48  * For successful queries "code" is negative.
49  * Can be used also to check how many documents where updated upon
50  * a successful query via "n" field.
51  */
52 alias MongoErrorDescription = immutable(_MongoErrorDescription);
53 
54 /**
55  * Root class for vibe.d Mongo driver exception hierarchy.
56  */
57 class MongoException : Exception
58 {
59 @safe:
60 
61 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
62 	{
63 		super(message, file, line, next);
64 	}
65 }
66 
67 /**
68  * Generic class for all exception related to unhandled driver problems.
69  *
70  * I.e.: protocol mismatch or unexpected mongo service behavior.
71  */
72 class MongoDriverException : MongoException
73 {
74 @safe:
75 
76 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
77 	{
78 		super(message, file, line, next);
79 	}
80 }
81 
82 /**
83  * Wrapper class for all inner mongo collection manipulation errors.
84  *
85  * It does not indicate problem with vibe.d driver itself. Most frequently this
86  * one is thrown when MongoConnection is in checked mode and getLastError() has something interesting.
87  */
88 class MongoDBException : MongoException
89 {
90 @safe:
91 
92 	MongoErrorDescription description;
93 	alias description this;
94 
95 	this(MongoErrorDescription description, string file = __FILE__,
96 			size_t line = __LINE__, Throwable next = null)
97 	{
98 		super(description.message, file, line, next);
99 		this.description = description;
100 	}
101 }
102 
103 /**
104  * Generic class for all exceptions related to authentication problems.
105  *
106  * I.e.: unsupported mechanisms or wrong credentials.
107  */
108 class MongoAuthException : MongoException
109 {
110 @safe:
111 
112 	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
113 	{
114 		super(message, file, line, next);
115 	}
116 }
117 
118 /**
119   [internal] Provides low-level mongodb protocol access.
120 
121   It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code.
122   Note that a MongoConnection may only be used from one fiber/thread at a time.
123  */
124 final class MongoConnection {
125 @safe:
126 
127 	import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */;
128 	import vibe.internal.interfaceproxy;
129 	import vibe.core.stream : InputStream, Stream;
130 
131 	private {
132 		MongoClientSettings m_settings;
133 		TCPConnection m_conn;
134 		InterfaceProxy!Stream m_stream;
135 		ulong m_bytesRead;
136 		int m_msgid = 1;
137 		StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
138 		ServerDescription m_description;
139 		/// Flag to prevent recursive connections when server closes connection while connecting
140 		bool m_allowReconnect;
141 		bool m_isAuthenticating;
142 		bool m_supportsOpMsg;
143 	}
144 
145 	enum ushort defaultPort = MongoClientSettings.defaultPort;
146 
147 	/// Simplified constructor overload, with no m_settings
148 	this(string server, ushort port = defaultPort)
149 	{
150 		m_settings = new MongoClientSettings();
151 		m_settings.hosts ~= MongoHost(server, port);
152 	}
153 
154 	this(MongoClientSettings cfg)
155 	{
156 		m_settings = cfg;
157 
158 		// Now let's check for features that are not yet supported.
159 		if(m_settings.hosts.length > 1)
160 			logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
161 					m_settings.hosts[0].name, m_settings.hosts[0].port);
162 	}
163 
164 	void connect()
165 	{
166 		bool isTLS;
167 
168 		/*
169 		 * TODO: Connect to one of the specified hosts taking into consideration
170 		 * options such as connect timeouts and so on.
171 		 */
172 		try {
173 			import core.time : Duration, msecs;
174 
175 			auto connectTimeout = m_settings.connectTimeoutMS.msecs;
176 			if (m_settings.connectTimeoutMS == 0)
177 				connectTimeout = Duration.max;
178 
179 			m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port, null, 0, connectTimeout);
180 			m_conn.tcpNoDelay = true;
181 			if (m_settings.socketTimeout != Duration.zero)
182 				m_conn.readTimeout = m_settings.socketTimeout;
183 			if (m_settings.ssl) {
184 				auto ctx =  createTLSContext(TLSContextKind.client);
185 				if (!m_settings.sslverifycertificate) {
186 					ctx.peerValidationMode = TLSPeerValidationMode.none;
187 				}
188 				if (m_settings.sslPEMKeyFile) {
189 					ctx.useCertificateChainFile(m_settings.sslPEMKeyFile);
190 					ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile);
191 				}
192 				if (m_settings.sslCAFile) {
193 					ctx.useTrustedCertificateFile(m_settings.sslCAFile);
194 				}
195 
196 				m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name);
197 				isTLS = true;
198 			}
199 			else {
200 				m_stream = m_conn;
201 			}
202 			m_outRange = streamOutputRange(m_stream);
203 		}
204 		catch (Exception e) {
205 			throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
206 		}
207 
208 		scope (failure) disconnect();
209 
210 		m_allowReconnect = false;
211 		scope (exit)
212 			m_allowReconnect = true;
213 
214 		Bson handshake = Bson.emptyObject;
215 		static assert(!is(typeof(m_settings.loadBalanced)), "loadBalanced was added to the API, set legacy if it's true here!");
216 		// TODO: must use legacy handshake if m_settings.loadBalanced is true
217 		// and also once we allow configuring a server API version in the driver
218 		// (https://github.com/mongodb/specifications/blob/master/source/versioned-api/versioned-api.rst)
219 		m_supportsOpMsg = false;
220 		bool legacyHandshake = false;
221 		if (legacyHandshake)
222 		{
223 			handshake["isMaster"] = Bson(1);
224 			handshake["helloOk"] = Bson(1);
225 		}
226 		else
227 		{
228 			handshake["hello"] = Bson(1);
229 			m_supportsOpMsg = true;
230 		}
231 
232 		import os = std.system;
233 		import compiler = std.compiler;
234 		string platform = compiler.name ~ " "
235 			~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string;
236 		// TODO: add support for os.version
237 
238 		handshake["client"] = Bson([
239 			"driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]),
240 			"os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]),
241 			"platform": Bson(platform)
242 		]);
243 
244 		if (m_settings.appName.length) {
245 			enforce!MongoAuthException(m_settings.appName.length <= 128,
246 				"The application name may not be larger than 128 bytes");
247 			handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]);
248 		}
249 
250 		auto reply = runCommand!(Bson, MongoAuthException)("admin", handshake);
251 		m_description = deserializeBson!ServerDescription(reply);
252 
253 		if (m_description.satisfiesVersion(WireVersion.v36))
254 			m_supportsOpMsg = true;
255 
256 		m_bytesRead = 0;
257 		auto authMechanism = m_settings.authMechanism;
258 		if (authMechanism == MongoAuthMechanism.none)
259 		{
260 			if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26))
261 			{
262 				authMechanism = MongoAuthMechanism.mongoDBX509;
263 			}
264 			else if (m_settings.digest.length)
265 			{
266 				// SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication
267 				if (m_description.satisfiesVersion(WireVersion.v30))
268 					authMechanism = MongoAuthMechanism.scramSHA1;
269 				else
270 					authMechanism = MongoAuthMechanism.mongoDBCR;
271 			}
272 		}
273 
274 		if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40))
275 			throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported");
276 
277 		if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30))
278 			throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported");
279 
280 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26))
281 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported");
282 
283 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS)
284 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!");
285 
286 		m_isAuthenticating = true;
287 		scope (exit)
288 			m_isAuthenticating = false;
289 		final switch (authMechanism)
290 		{
291 		case MongoAuthMechanism.none:
292 			break;
293 		case MongoAuthMechanism.mongoDBX509:
294 			certAuthenticate();
295 			break;
296 		case MongoAuthMechanism.scramSHA1:
297 			scramAuthenticate();
298 			break;
299 		case MongoAuthMechanism.mongoDBCR:
300 			authenticate();
301 			break;
302 		}
303 	}
304 
305 	void disconnect()
306 	{
307 		if (m_conn) {
308 			if (m_stream && m_conn.connected) {
309 				m_outRange.flush();
310 
311 				m_stream.finalize();
312 				m_stream = InterfaceProxy!Stream.init;
313 			}
314 
315 			m_conn.close();
316 			m_conn = TCPConnection.init;
317 		}
318 
319 		m_outRange.drop();
320 	}
321 
322 	@property bool connected() const { return m_conn && m_conn.connected; }
323 
324 	@property const(ServerDescription) description() const { return m_description; }
325 
326 	deprecated("Non-functional since MongoDB 5.1") void update(string collection_name, UpdateFlags flags, Bson selector, Bson update)
327 	{
328 		scope(failure) disconnect();
329 		send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update);
330 		if (m_settings.safe) checkForError(collection_name);
331 	}
332 
333 	deprecated("Non-functional since MongoDB 5.1") void insert(string collection_name, InsertFlags flags, Bson[] documents)
334 	{
335 		scope(failure) disconnect();
336 		foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate());
337 		send(OpCode.Insert, -1, cast(int)flags, collection_name, documents);
338 		if (m_settings.safe) checkForError(collection_name);
339 	}
340 
341 	deprecated("Non-functional since MongoDB 5.1: use `find` to query collections instead - instead of `$cmd` use `runCommand` to send commands - use listIndexes and listCollections instead of `<database>.system.indexes` and `<database>.system.namsepsaces`")
342 	void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
343 	{
344 		scope(failure) disconnect();
345 		flags |= m_settings.defQueryFlags;
346 		int id;
347 		if (returnFieldSelector.isNull)
348 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query);
349 		else
350 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector);
351 		recvReply!T(id, on_msg, on_doc);
352 	}
353 
354 	/**
355 		Runs the given Bson command (Bson object with the first entry in the map
356 		being the command name) on the given database.
357 
358 		Using `runCommand` checks that the command completed successfully by
359 		checking that `result["ok"].get!double == 1.0`. Throws the
360 		`CommandFailException` on failure.
361 
362 		Using `runCommandUnchecked` will return the result as-is. Developers may
363 		check the `result["ok"]` value themselves. (It's a double that needs to
364 		be compared with 1.0 by default)
365 
366 		Throws:
367 			- `CommandFailException` (template argument) only in the
368 				`runCommand` overload, when the command response is not ok.
369 			- `MongoDriverException` when internal protocol errors occur.
370 	*/
371 	Bson runCommand(T, CommandFailException = MongoDriverException)(
372 		string database,
373 		Bson command,
374 		string errorInfo = __FUNCTION__,
375 		string errorFile = __FILE__,
376 		size_t errorLine = __LINE__
377 	)
378 	in(database.length, "runCommand requires a database argument")
379 	{
380 		return runCommandImpl!(T, CommandFailException)(
381 			database, command, true, errorInfo, errorFile, errorLine);
382 	}
383 
384 	Bson runCommandUnchecked(T, CommandFailException = MongoDriverException)(
385 		string database,
386 		Bson command,
387 		string errorInfo = __FUNCTION__,
388 		string errorFile = __FILE__,
389 		size_t errorLine = __LINE__
390 	)
391 	in(database.length, "runCommand requires a database argument")
392 	{
393 		return runCommandImpl!(T, CommandFailException)(
394 			database, command, false, errorInfo, errorFile, errorLine);
395 	}
396 
397 	private Bson runCommandImpl(T, CommandFailException)(
398 		string database,
399 		Bson command,
400 		bool testOk = true,
401 		string errorInfo = __FUNCTION__,
402 		string errorFile = __FILE__,
403 		size_t errorLine = __LINE__
404 	)
405 	in(database.length, "runCommand requires a database argument")
406 	{
407 		import std.array;
408 
409 		string formatErrorInfo(string msg) @safe
410 		{
411 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
412 		}
413 
414 		Bson ret;
415 
416 		if (m_supportsOpMsg)
417 		{
418 			debug (VibeVerboseMongo)
419 				logDiagnostic("runCommand: [db=%s] %s", database, command);
420 
421 			command["$db"] = Bson(database);
422 
423 			auto id = sendMsg(-1, 0, command);
424 			Appender!(Bson[])[string] docs;
425 			recvMsg!true(id, (flags, root) @safe {
426 				ret = root;
427 			}, (scope ident, size) @safe {
428 				docs[ident] = appender!(Bson[]);
429 			}, (scope ident, push) @safe {
430 				docs[ident].put(push);
431 			});
432 
433 			foreach (ident, app; docs)
434 				ret[ident] = Bson(app.data);
435 		}
436 		else
437 		{
438 			debug (VibeVerboseMongo)
439 				logDiagnostic("runCommand(legacy): [db=%s] %s", database, command);
440 			auto id = send(OpCode.Query, -1, 0, database ~ ".$cmd", 0, -1, command, Bson(null));
441 			recvReply!T(id,
442 				(cursor, flags, first_doc, num_docs) {
443 					logTrace("runCommand(%s) flags: %s, cursor: %s, documents: %s", database, flags, cursor, num_docs);
444 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), formatErrorInfo("command query failed"));
445 					enforce!MongoDriverException(num_docs == 1, formatErrorInfo("received more than one document in command response"));
446 				},
447 				(idx, ref doc) {
448 					ret = doc;
449 				});
450 		}
451 
452 		if (testOk && ret["ok"].get!double != 1.0)
453 			throw new CommandFailException(formatErrorInfo("command failed: "
454 				~ ret["errmsg"].opt!string("(no message)")));
455 
456 		static if (is(T == Bson)) return ret;
457 		else {
458 			T doc = deserializeBson!T(bson);
459 			return doc;
460 		}
461 	}
462 
463 	template getMore(T)
464 	{
465 		deprecated("use the modern overload instead")
466 		void getMore(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
467 		{
468 			scope(failure) disconnect();
469 			auto parts = collection_name.findSplit(".");
470 			auto id = send(OpCode.GetMore, -1, cast(int)0, parts[0], parts[2], nret, cursor_id);
471 			recvReply!T(id, on_msg, on_doc);
472 		}
473 
474 		/**
475 		* Modern (MongoDB 3.2+ compatible) getMore implementation using the getMore
476 		* command and OP_MSG. (if supported)
477 		*
478 		* Falls back to compatibility for older MongoDB versions, but those are not
479 		* officially supported anymore.
480 		*
481 		* Upgrade_notes:
482 		* - error checking is now done inside this function
483 		* - document index is no longer sent, instead the callback is called sequentially
484 		*
485 		* Throws: $(LREF MongoDriverException) in case the command fails.
486 		*/
487 		void getMore(long cursor_id, string database, string collection_name, long nret,
488 			scope GetMoreHeaderDelegate on_header,
489 			scope GetMoreDocumentDelegate!T on_doc,
490 			Duration timeout = Duration.max,
491 			string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
492 		{
493 			Bson command = Bson.emptyObject;
494 			command["getMore"] = Bson(cursor_id);
495 			command["$db"] = Bson(database);
496 			command["collection"] = Bson(collection_name);
497 			command["batchSize"] = Bson(nret);
498 			if (timeout != Duration.max && timeout.total!"msecs" < int.max)
499 				command["maxTimeMS"] = Bson(cast(int)timeout.total!"msecs");
500 
501 			string formatErrorInfo(string msg) @safe
502 			{
503 				return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
504 			}
505 
506 			scope (failure) disconnect();
507 
508 			if (m_supportsOpMsg)
509 			{
510 				enum needsDup = hasIndirections!T || is(T == Bson);
511 
512 				debug (VibeVerboseMongo)
513 					logDiagnostic("getMore: [db=%s] %s", database, command);
514 
515 				auto id = sendMsg(-1, 0, command);
516 				recvMsg!needsDup(id, (flags, scope root) @safe {
517 					if (root["ok"].get!double != 1.0)
518 						throw new MongoDriverException(formatErrorInfo("getMore failed: "
519 							~ root["errmsg"].opt!string("(no message)")));
520 
521 					auto cursor = root["cursor"];
522 					auto batch = cursor["nextBatch"].get!(Bson[]);
523 					on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length);
524 
525 					foreach (ref push; batch)
526 					{
527 						T doc = deserializeBson!T(push);
528 						on_doc(doc);
529 					}
530 				}, (scope ident, size) @safe {}, (scope ident, scope push) @safe {
531 					throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in getMore response"));
532 				});
533 			}
534 			else
535 			{
536 				debug (VibeVerboseMongo)
537 					logDiagnostic("getMore(legacy): [db=%s] collection=%s, cursor=%s, nret=%s", database, collection_name, cursor_id, nret);
538 
539 				int brokenId = 0;
540 				int nextId = 0;
541 				int num_docs;
542 				// array to store out-of-order items, to push them into the callback properly
543 				T[] compatibilitySort;
544 				string full_name = database ~ '.' ~ collection_name;
545 				auto id = send(OpCode.GetMore, -1, cast(int)0, full_name, nret, cursor_id);
546 				recvReply!T(id, (long cursor, ReplyFlags flags, int first_doc, int num_docs)
547 				{
548 					enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound),
549 						formatErrorInfo("Invalid cursor handle."));
550 					enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure),
551 						formatErrorInfo("Query failed. Does the database exist?"));
552 
553 					on_header(cursor, full_name, num_docs);
554 				}, (size_t idx, ref T doc) {
555 					if (cast(int)idx == nextId) {
556 						on_doc(doc);
557 						nextId++;
558 						brokenId = nextId;
559 					} else {
560 						enforce!MongoDriverException(idx >= brokenId,
561 							formatErrorInfo("Got legacy document with same id after having already processed it!"));
562 						enforce!MongoDriverException(idx < num_docs,
563 							formatErrorInfo("Received more documents than the database reported to us"));
564 
565 						size_t arrayIndex = cast(int)idx - brokenId;
566 						if (!compatibilitySort.length)
567 							compatibilitySort.length = num_docs - brokenId;
568 						compatibilitySort[arrayIndex] = doc;
569 					}
570 				});
571 
572 				foreach (doc; compatibilitySort)
573 					on_doc(doc);
574 			}
575 		}
576 	}
577 
578 	/// Forwards the `find` command passed in to the database, handles the
579 	/// callbacks like with getMore. This exists for easier integration with
580 	/// MongoCursor!T.
581 	package void startFind(T)(Bson command,
582 		scope GetMoreHeaderDelegate on_header,
583 		scope GetMoreDocumentDelegate!T on_doc,
584 		string errorInfo = __FUNCTION__, string errorFile = __FILE__, size_t errorLine = __LINE__)
585 	{
586 		string formatErrorInfo(string msg) @safe
587 		{
588 			return text(msg, " in ", errorInfo, " (", errorFile, ":", errorLine, ")");
589 		}
590 
591 		scope (failure) disconnect();
592 
593 		enforce!MongoDriverException(m_supportsOpMsg, formatErrorInfo("Database does not support required OP_MSG for new style queries"));
594 
595 		enum needsDup = hasIndirections!T || is(T == Bson);
596 
597 		debug (VibeVerboseMongo)
598 			logDiagnostic("startFind: %s", command);
599 
600 		auto id = sendMsg(-1, 0, command);
601 		recvMsg!needsDup(id, (flags, scope root) @safe {
602 			if (root["ok"].get!double != 1.0)
603 				throw new MongoDriverException(formatErrorInfo("find failed: "
604 					~ root["errmsg"].opt!string("(no message)")));
605 
606 			auto cursor = root["cursor"];
607 			auto batch = cursor["firstBatch"].get!(Bson[]);
608 			on_header(cursor["id"].get!long, cursor["ns"].get!string, batch.length);
609 
610 			foreach (ref push; batch)
611 			{
612 				T doc = deserializeBson!T(push);
613 				on_doc(doc);
614 			}
615 		}, (scope ident, size) @safe {}, (scope ident, scope push) @safe {
616 			throw new MongoDriverException(formatErrorInfo("unexpected section type 1 in find response"));
617 		});
618 	}
619 
620 	deprecated("Non-functional since MongoDB 5.1") void delete_(string collection_name, DeleteFlags flags, Bson selector)
621 	{
622 		scope(failure) disconnect();
623 		send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector);
624 		if (m_settings.safe) checkForError(collection_name);
625 	}
626 
627 	deprecated("Non-functional since MongoDB 5.1, use the overload taking the collection as well")
628 	void killCursors(scope long[] cursors)
629 	{
630 		scope(failure) disconnect();
631 		send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
632 	}
633 
634 	void killCursors(string collection, scope long[] cursors)
635 	{
636 		scope(failure) disconnect();
637 		// TODO: could add special case to runCommand to not return anything
638 		if (m_supportsOpMsg)
639 		{
640 			Bson command = Bson.emptyObject;
641 			auto parts = collection.findSplit(".");
642 			command["killCursors"] = Bson(parts[2]);
643 			command["cursors"] = cursors.serializeToBson;
644 			runCommand!Bson(parts[0], command);
645 		}
646 		else
647 		{
648 			send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
649 		}
650 	}
651 
652 	MongoErrorDescription getLastError(string db)
653 	{
654 		// Though higher level abstraction level by concept, this function
655 		// is implemented here to allow to check errors upon every request
656 		// on connection level.
657 
658 		Bson command_and_options = Bson.emptyObject;
659 		command_and_options["getLastError"] = Bson(1.0);
660 
661 		if(m_settings.w != m_settings.w.init)
662 			command_and_options["w"] = m_settings.w; // Already a Bson struct
663 		if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init)
664 			command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS);
665 		if(m_settings.journal)
666 			command_and_options["j"] = Bson(true);
667 		if(m_settings.fsync)
668 			command_and_options["fsync"] = Bson(true);
669 
670 		_MongoErrorDescription ret;
671 
672 		auto error = runCommandUnchecked!Bson(db, command_and_options);
673 
674 		try {
675 			ret = MongoErrorDescription(
676 				error["errmsg"].opt!string(error["err"].opt!string("")),
677 				error["code"].opt!int(-1),
678 				error["connectionId"].opt!int(-1),
679 				error["n"].opt!int(-1),
680 				error["ok"].get!double()
681 			);
682 		} catch (Exception e) {
683 			throw new MongoDriverException(e.msg);
684 		}
685 
686 		return ret;
687 	}
688 
689 	/** Queries the server for all databases.
690 
691 		Returns:
692 			An input range of $(D MongoDBInfo) values.
693 	*/
694 	auto listDatabases()
695 	{
696 		string cn = m_settings.database == string.init ? "admin" : m_settings.database;
697 
698 		auto cmd = Bson(["listDatabases":Bson(1)]);
699 
700 		static MongoDBInfo toInfo(const(Bson) db_doc) {
701 			return MongoDBInfo(
702 				db_doc["name"].get!string,
703 				// double on MongoDB < 5.0, long afterwards
704 				db_doc["sizeOnDisk"].to!double,
705 				db_doc["empty"].get!bool
706 			);
707 		}
708 
709 		auto result = runCommand!Bson(cn, cmd)["databases"];
710 
711 		return result.byValue.map!toInfo;
712 	}
713 
714 	private int recvMsg(bool dupBson = true)(int reqid,
715 		scope MsgReplyDelegate!dupBson on_sec0,
716 		scope MsgSection1StartDelegate on_sec1_start,
717 		scope MsgSection1Delegate!dupBson on_sec1_doc)
718 	{
719 		import std.traits;
720 
721 		auto bytes_read = m_bytesRead;
722 		int msglen = recvInt();
723 		int resid = recvInt();
724 		int respto = recvInt();
725 		int opcode = recvInt();
726 
727 		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
728 		enforce!MongoDriverException(opcode == OpCode.Msg, "Got wrong reply type! (must be OP_MSG)");
729 
730 		uint flagBits = recvUInt();
731 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
732 
733 		int sectionLength = cast(int)(msglen - 4 * int.sizeof - flagBits.sizeof);
734 		if (hasCRC)
735 			sectionLength -= uint.sizeof; // CRC present
736 
737 		bool gotSec0;
738 		while (m_bytesRead - bytes_read < sectionLength) {
739 			// TODO: directly deserialize from the wire
740 			static if (!dupBson) {
741 				ubyte[256] buf = void;
742 				ubyte[] bufsl = buf;
743 			}
744 
745 			ubyte payloadType = recvUByte();
746 			switch (payloadType) {
747 				case 0:
748 					gotSec0 = true;
749 					scope Bson data;
750 					static if (dupBson)
751 						data = recvBsonDup();
752 					else
753 						data = (() @trusted => recvBson(bufsl))();
754 
755 					debug (VibeVerboseMongo)
756 						logDiagnostic("recvData: sec0[flags=%x]: %s", flagBits, data);
757 					on_sec0(flagBits, data);
758 					break;
759 				case 1:
760 					if (!gotSec0)
761 						throw new MongoDriverException("Got OP_MSG section 1 before section 0, which is not supported by vibe.d");
762 
763 					auto section_bytes_read = m_bytesRead;
764 					int size = recvInt();
765 					auto identifier = recvCString();
766 					on_sec1_start(identifier, size);
767 					while (m_bytesRead - section_bytes_read < size) {
768 						scope Bson data;
769 						static if (dupBson)
770 							data = recvBsonDup();
771 						else
772 							data = (() @trusted => recvBson(bufsl))();
773 
774 						debug (VibeVerboseMongo)
775 							logDiagnostic("recvData: sec1[%s]: %s", identifier, data);
776 
777 						on_sec1_doc(identifier, data);
778 					}
779 					break;
780 				default:
781 					throw new MongoDriverException("Received unexpected payload section type " ~ payloadType.to!string);
782 			}
783 		}
784 
785 		if (hasCRC)
786 		{
787 			uint crc = recvUInt();
788 			// TODO: validate CRC
789 			logDiagnostic("recvData: crc=%s (discarded)", crc);
790 		}
791 
792 		assert(bytes_read + msglen == m_bytesRead,
793 			format!"Packet size mismatch! Expected %s bytes, but read %s."(
794 				msglen, m_bytesRead - bytes_read));
795 
796 		return resid;
797 	}
798 
799 	private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
800 	{
801 		auto bytes_read = m_bytesRead;
802 		int msglen = recvInt();
803 		int resid = recvInt();
804 		int respto = recvInt();
805 		int opcode = recvInt();
806 
807 		enforce!MongoDriverException(respto == reqid, "Reply is not for the expected message on a sequential connection!");
808 		enforce!MongoDriverException(opcode == OpCode.Reply, "Got a non-'Reply' reply!");
809 
810 		auto flags = cast(ReplyFlags)recvInt();
811 		long cursor = recvLong();
812 		int start = recvInt();
813 		int numret = recvInt();
814 
815 		scope (exit) {
816 			if (m_bytesRead - bytes_read < msglen) {
817 				logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read);
818 				ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
819 				recv(dst);
820 			} else if (m_bytesRead - bytes_read > msglen) {
821 				logWarn("MongoDB reply was shorter than expected. Dropping connection.");
822 				disconnect();
823 				throw new MongoDriverException("MongoDB reply was too short for data.");
824 			}
825 		}
826 
827 		on_msg(cursor, flags, start, numret);
828 		static if (hasIndirections!T || is(T == Bson))
829 			auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
830 		foreach (i; 0 .. cast(size_t)numret) {
831 			// TODO: directly deserialize from the wire
832 			static if (!hasIndirections!T && !is(T == Bson)) {
833 				ubyte[256] buf = void;
834 				ubyte[] bufsl = buf;
835 				auto bson = () @trusted { return recvBson(bufsl); } ();
836 			} else {
837 				auto bson = () @trusted { return recvBson(buf); } ();
838 			}
839 
840 			// logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson);
841 
842 			static if (is(T == Bson)) on_doc(i, bson);
843 			else {
844 				T doc = deserializeBson!T(bson);
845 				on_doc(i, doc);
846 			}
847 		}
848 
849 		return resid;
850 	}
851 
852 	private int send(ARGS...)(OpCode code, int response_to, scope ARGS args)
853 	{
854 		if( !connected() ) {
855 			if (m_allowReconnect) connect();
856 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
857 			else throw new MongoDriverException("Connection got closed while connecting");
858 		}
859 		int id = nextMessageId();
860 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
861 		sendValue!int(16 + sendLength(args));
862 		sendValue!int(id);
863 		sendValue!int(response_to);
864 		sendValue!int(cast(int)code);
865 		foreach (a; args) sendValue(a);
866 		m_outRange.flush();
867 		// logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args));
868 		return id;
869 	}
870 
871 	private int sendMsg(int response_to, uint flagBits, Bson document)
872 	{
873 		if( !connected() ) {
874 			if (m_allowReconnect) connect();
875 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
876 			else throw new MongoDriverException("Connection got closed while connecting");
877 		}
878 		int id = nextMessageId();
879 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
880 		sendValue!int(21 + sendLength(document));
881 		sendValue!int(id);
882 		sendValue!int(response_to);
883 		sendValue!int(cast(int)OpCode.Msg);
884 		sendValue!uint(flagBits);
885 		const bool hasCRC = (flagBits & (1 << 16)) != 0;
886 		assert(!hasCRC, "sending with CRC bits not yet implemented");
887 		sendValue!ubyte(0);
888 		sendValue(document);
889 		m_outRange.flush();
890 		return id;
891 	}
892 
893 	private void sendValue(T)(scope T value)
894 	{
895 		import std.traits;
896 		static if (is(T == ubyte)) m_outRange.put(value);
897 		else static if (is(T == int) || is(T == uint)) sendBytes(toBsonData(value));
898 		else static if (is(T == long)) sendBytes(toBsonData(value));
899 		else static if (is(T == Bson)) sendBytes(value.data);
900 		else static if (is(T == string)) {
901 			sendBytes(cast(const(ubyte)[])value);
902 			sendBytes(cast(const(ubyte)[])"\0");
903 		} else static if (isArray!T) {
904 			foreach (v; value)
905 				sendValue(v);
906 		} else static assert(false, "Unexpected type: "~T.stringof);
907 	}
908 
909 	private void sendBytes(in ubyte[] data){ m_outRange.put(data); }
910 
911 	private T recvInteger(T)() { ubyte[T.sizeof] ret; recv(ret); return fromBsonData!T(ret); }
912 	private alias recvUByte = recvInteger!ubyte;
913 	private alias recvInt = recvInteger!int;
914 	private alias recvUInt = recvInteger!uint;
915 	private alias recvLong = recvInteger!long;
916 	private Bson recvBson(ref ubyte[] buf)
917 	@system {
918 		int len = recvInt();
919 		ubyte[] dst;
920 		if (len > buf.length) dst = new ubyte[len];
921 		else {
922 			dst = buf[0 .. len];
923 			buf = buf[len .. $];
924 		}
925 		dst[0 .. 4] = toBsonData(len)[];
926 		recv(dst[4 .. $]);
927 		return Bson(Bson.Type.object, cast(immutable)dst);
928 	}
929 	private Bson recvBsonDup()
930 	@trusted {
931 		ubyte[4] size;
932 		recv(size[]);
933 		ubyte[] dst = new ubyte[fromBsonData!uint(size)];
934 		dst[0 .. 4] = size;
935 		recv(dst[4 .. $]);
936 		return Bson(Bson.Type.object, cast(immutable)dst);
937 	}
938 	private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; }
939 	private const(char)[] recvCString()
940 	{
941 		auto buf = new ubyte[32];
942 		ptrdiff_t i = -1;
943 		do
944 		{
945 			i++;
946 			if (i == buf.length) buf.length *= 2;
947 			recv(buf[i .. i + 1]);
948 		} while (buf[i] != 0);
949 		return cast(const(char)[])buf[0 .. i];
950 	}
951 
952 	private int nextMessageId() { return m_msgid++; }
953 
954 	private void checkForError(string collection_name)
955 	{
956 		auto coll = collection_name.split(".")[0];
957 		auto err = getLastError(coll);
958 
959 		enforce(
960 			err.code < 0,
961 			new MongoDBException(err)
962 		);
963 	}
964 
965 	private void certAuthenticate()
966 	{
967 		Bson cmd = Bson.emptyObject;
968 		cmd["authenticate"] = Bson(1);
969 		cmd["mechanism"] = Bson("MONGODB-X509");
970 		if (m_description.satisfiesVersion(WireVersion.v34))
971 		{
972 			if (m_settings.username.length)
973 				cmd["user"] = Bson(m_settings.username);
974 		}
975 		else
976 		{
977 			if (!m_settings.username.length)
978 				throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this");
979 
980 			cmd["user"] = Bson(m_settings.username);
981 		}
982 		runCommand!(Bson, MongoAuthException)(m_settings.getAuthDatabase, cmd);
983 	}
984 
985 	private void authenticate()
986 	{
987 		scope (failure) disconnect();
988 	
989 		string cn = m_settings.getAuthDatabase;
990 
991 		auto cmd = Bson(["getnonce": Bson(1)]);
992 		auto result = runCommand!(Bson, MongoAuthException)(cn, cmd);
993 		string nonce = result["nonce"].get!string;
994 		string key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup);
995 
996 		cmd = Bson.emptyObject;
997 		cmd["authenticate"] = Bson(1);
998 		cmd["mechanism"] = Bson("MONGODB-CR");
999 		cmd["nonce"] = Bson(nonce);
1000 		cmd["user"] = Bson(m_settings.username);
1001 		cmd["key"] = Bson(key);
1002 		runCommand!(Bson, MongoAuthException)(cn, cmd);
1003 	}
1004 
1005 	private void scramAuthenticate()
1006 	{
1007 		import vibe.db.mongo.sasl;
1008 
1009 		string cn = m_settings.getAuthDatabase;
1010 
1011 		ScramState state;
1012 		string payload = state.createInitialRequest(m_settings.username);
1013 
1014 		auto cmd = Bson.emptyObject;
1015 		cmd["saslStart"] = Bson(1);
1016 		cmd["mechanism"] = Bson("SCRAM-SHA-1");
1017 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1018 
1019 		auto doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1020 		string response = cast(string)doc["payload"].get!BsonBinData().rawData;
1021 		Bson conversationId = doc["conversationId"];
1022 
1023 		payload = state.update(m_settings.digest, response);
1024 		cmd = Bson.emptyObject;
1025 		cmd["saslContinue"] = Bson(1);
1026 		cmd["conversationId"] = conversationId;
1027 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1028 
1029 		doc = runCommand!(Bson, MongoAuthException)(cn, cmd);
1030 		response = cast(string)doc["payload"].get!BsonBinData().rawData;
1031 
1032 		payload = state.finalize(response);
1033 		cmd = Bson.emptyObject;
1034 		cmd["saslContinue"] = Bson(1);
1035 		cmd["conversationId"] = conversationId;
1036 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
1037 		runCommand!(Bson, MongoAuthException)(cn, cmd);
1038 	}
1039 }
1040 
1041 private enum OpCode : int {
1042 	Reply        = 1, // sent only by DB
1043 	Update       = 2001,
1044 	Insert       = 2002,
1045 	Reserved1    = 2003,
1046 	Query        = 2004,
1047 	GetMore      = 2005,
1048 	Delete       = 2006,
1049 	KillCursors  = 2007,
1050 
1051 	Compressed   = 2012,
1052 	Msg          = 2013,
1053 }
1054 
1055 private alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
1056 private template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; }
1057 
1058 private alias MsgReplyDelegate(bool dupBson : true) = void delegate(uint flags, Bson document) @safe;
1059 private alias MsgReplyDelegate(bool dupBson : false) = void delegate(uint flags, scope Bson document) @safe;
1060 private alias MsgSection1StartDelegate = void delegate(scope const(char)[] identifier, int size) @safe;
1061 private alias MsgSection1Delegate(bool dupBson : true) = void delegate(scope const(char)[] identifier, Bson document) @safe;
1062 private alias MsgSection1Delegate(bool dupBson : false) = void delegate(scope const(char)[] identifier, scope Bson document) @safe;
1063 
1064 alias GetMoreHeaderDelegate = void delegate(long id, string ns, size_t count) @safe;
1065 alias GetMoreDocumentDelegate(T) = void delegate(ref T document) @safe;
1066 
1067 struct MongoDBInfo
1068 {
1069 	string name;
1070 	double sizeOnDisk;
1071 	bool empty;
1072 }
1073 
1074 private int sendLength(ARGS...)(ARGS args)
1075 {
1076 	import std.traits;
1077 	static if (ARGS.length == 1) {
1078 		alias T = ARGS[0];
1079 		static if (is(T == string)) return cast(int)args[0].length + 1;
1080 		else static if (is(T == int)) return 4;
1081 		else static if (is(T == long)) return 8;
1082 		else static if (is(T == Bson)) return cast(int)args[0].data.length;
1083 		else static if (isArray!T) {
1084 			int ret = 0;
1085 			foreach (el; args[0]) ret += sendLength(el);
1086 			return ret;
1087 		} else static assert(false, "Unexpected type: "~T.stringof);
1088 	}
1089 	else if (ARGS.length == 0) return 0;
1090 	else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]);
1091 }
1092 
1093 struct ServerDescription
1094 {
1095 	enum ServerType
1096 	{
1097 		unknown,
1098 		standalone,
1099 		mongos,
1100 		possiblePrimary,
1101 		RSPrimary,
1102 		RSSecondary,
1103 		RSArbiter,
1104 		RSOther,
1105 		RSGhost
1106 	}
1107 
1108 @optional:
1109 	string address;
1110 	string error;
1111 	float roundTripTime = 0;
1112 	Nullable!BsonDate lastWriteDate;
1113 	Nullable!BsonObjectID opTime;
1114 	ServerType type = ServerType.unknown;
1115 	WireVersion minWireVersion, maxWireVersion;
1116 	string me;
1117 	string[] hosts, passives, arbiters;
1118 	string[string] tags;
1119 	string setName;
1120 	Nullable!int setVersion;
1121 	Nullable!BsonObjectID electionId;
1122 	string primary;
1123 	string lastUpdateTime = "infinity ago";
1124 	Nullable!int logicalSessionTimeoutMinutes;
1125 
1126 	bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
1127 	{
1128 		return maxWireVersion >= wireVersion;
1129 	}
1130 }
1131 
1132 enum WireVersion : int
1133 {
1134 	old = 0,
1135 	v26 = 1,
1136 	v26_2 = 2,
1137 	v30 = 3,
1138 	v32 = 4,
1139 	v34 = 5,
1140 	v36 = 6,
1141 	v40 = 7,
1142 	v42 = 8,
1143 	v44 = 9,
1144 	v49 = 12,
1145 	v50 = 13,
1146 	v51 = 14,
1147 	v52 = 15,
1148 	v53 = 16
1149 }
1150 
1151 private string getHostArchitecture()
1152 {
1153 	import os = std.system;
1154 
1155 	version (X86_64)
1156 		string arch = "x86_64 ";
1157 	else version (X86)
1158 		string arch = "x86 ";
1159 	else version (AArch64)
1160 		string arch = "aarch64 ";
1161 	else version (ARM_HardFloat)
1162 		string arch = "armhf ";
1163 	else version (ARM)
1164 		string arch = "arm ";
1165 	else version (PPC64)
1166 		string arch = "ppc64 ";
1167 	else version (PPC)
1168 		string arch = "ppc ";
1169 	else
1170 		string arch = "unknown ";
1171 
1172 	return arch ~ os.endian.to!string;
1173 }
1174 
1175 private static immutable hostArchitecture = getHostArchitecture;