1 /**
2 	Low level mongodb protocol.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.connection;
9 
10 public import vibe.data.bson;
11 
12 import vibe.core.core : vibeVersionString;
13 import vibe.core.log;
14 import vibe.core.net;
15 import vibe.db.mongo.settings;
16 import vibe.db.mongo.flags;
17 import vibe.inet.webform;
18 import vibe.stream.tls;
19 
20 import std.algorithm : map, splitter;
21 import std.array;
22 import std.conv;
23 import std.digest.md;
24 import std.exception;
25 import std.range;
26 import std.string;
27 import std.typecons;
28 
29 
// Plain data record filled by MongoConnection.getLastError from the reply
// document. It is constructed positionally there, so the field order matters.
private struct _MongoErrorDescription
{
	string message;   // taken from the reply's "err" field, "" if absent
	int code;         // taken from the reply's "code" field, -1 if absent
	int connectionId; // taken from the reply's "connectionId" field, -1 if absent
	int n;            // taken from the reply's "n" field (required)
	double ok;        // taken from the reply's "ok" field (required)
}

/**
 * D POD representation of Mongo error object.
 *
 * For successful queries "code" is negative.
 * Can be used also to check how many documents were updated upon
 * a successful query via "n" field.
 */
alias MongoErrorDescription = immutable(_MongoErrorDescription);
47 
/**
 * Base type of every exception declared by the vibe.d Mongo driver.
 *
 * The constructor simply forwards its arguments to `Exception`.
 */
class MongoException : Exception
{
	/// Creates the exception with the given message, source location and optional cause.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
60 
/**
 * Raised for problems the driver cannot handle itself.
 *
 * Typical causes are a protocol mismatch or unexpected behavior of the
 * mongo service.
 */
class MongoDriverException : MongoException
{
	/// Creates the exception with the given message, source location and optional cause.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
75 
/**
 * Carries an error reported by the database for a collection operation.
 *
 * This does not indicate a defect in the vibe.d driver. It is most commonly
 * thrown when MongoConnection runs in checked mode and getLastError() returned
 * an error for the preceding operation.
 */
class MongoDBException : MongoException
{
	/// The error record returned by the server; its fields are also reachable
	/// directly on the exception through `alias this`.
	MongoErrorDescription description;
	alias description this;

	/// Creates the exception, using `description.message` as the exception message.
	this(MongoErrorDescription description, string file = __FILE__,
			size_t line = __LINE__, Throwable next = null)
	@safe {
		super(description.message, file, line, next);
		this.description = description;
	}
}
96 
/**
 * Raised for every authentication related failure.
 *
 * Examples are unsupported mechanisms or rejected credentials.
 */
class MongoAuthException : MongoException
{
	/// Creates the exception with the given message, source location and optional cause.
	this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
111 
112 /**
113   [internal] Provides low-level mongodb protocol access.
114 
115   It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code.
116   Note that a MongoConnection may only be used from one fiber/thread at a time.
117  */
118 final class MongoConnection {
119 @safe:
120 
121 	import vibe.stream.wrapper /* : StreamOutputRange, streamOutputRange */;
122 	import vibe.internal.interfaceproxy;
123 	import vibe.core.stream : InputStream, Stream;
124 
	private {
		/// Client settings supplied to (or created by) the constructor.
		MongoClientSettings m_settings;
		/// TCP connection opened by connect(); reset to TCPConnection.init by disconnect().
		TCPConnection m_conn;
		/// Stream used for all reads/writes: a TLS stream over m_conn when
		/// m_settings.ssl is set, otherwise m_conn itself.
		InterfaceProxy!Stream m_stream;
		/// Running count of bytes consumed by recv(); recvReply() compares it with
		/// the announced message length. Reset to 0 by connect() after the handshake.
		ulong m_bytesRead;
		/// Id handed out to the next outgoing message (see nextMessageId()).
		int m_msgid = 1;
		/// Output range over m_stream; send() flushes it after every message.
		StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
		/// Server description deserialized from the handshake reply in connect().
		ServerDescription m_description;
		/// Flag to prevent recursive connections when server closes connection while connecting
		bool m_allowReconnect;
		/// True while connect() runs the authentication step; send() uses it to
		/// pick the exception type when the connection is found closed.
		bool m_isAuthenticating;
	}
137 
138 	enum ushort defaultPort = MongoClientSettings.defaultPort;
139 
140 	/// Simplified constructor overload, with no m_settings
141 	this(string server, ushort port = defaultPort)
142 	{
143 		m_settings = new MongoClientSettings();
144 		m_settings.hosts ~= MongoHost(server, port);
145 	}
146 
147 	this(MongoClientSettings cfg)
148 	{
149 		m_settings = cfg;
150 
151 		// Now let's check for features that are not yet supported.
152 		if(m_settings.hosts.length > 1)
153 			logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
154 					m_settings.hosts[0].name, m_settings.hosts[0].port);
155 	}
156 
157 	void connect()
158 	{
159 		bool isTLS;
160 
161 		/*
162 		 * TODO: Connect to one of the specified hosts taking into consideration
163 		 * options such as connect timeouts and so on.
164 		 */
165 		try {
166 			m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port);
167 			m_conn.tcpNoDelay = true;
168 			if (m_settings.ssl) {
169 				auto ctx =  createTLSContext(TLSContextKind.client);
170 				if (!m_settings.sslverifycertificate) {
171 					ctx.peerValidationMode = TLSPeerValidationMode.none;
172 				}
173 				if (m_settings.sslPEMKeyFile) {
174 					ctx.useCertificateChainFile(m_settings.sslPEMKeyFile);
175 					ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile);
176 				}
177 				if (m_settings.sslCAFile) {
178 					ctx.useTrustedCertificateFile(m_settings.sslCAFile);
179 				}
180 
181 				m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name);
182 				isTLS = true;
183 			}
184 			else {
185 				m_stream = m_conn;
186 			}
187 			m_outRange = streamOutputRange(m_stream);
188 		}
189 		catch (Exception e) {
190 			throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
191 		}
192 
193 		m_allowReconnect = false;
194 		scope (exit)
195 			m_allowReconnect = true;
196 
197 		Bson handshake = Bson.emptyObject;
198 		handshake["isMaster"] = Bson(1);
199 
200 		import os = std.system;
201 		import compiler = std.compiler;
202 		string platform = compiler.name ~ " "
203 			~ compiler.version_major.to!string ~ "." ~ compiler.version_minor.to!string;
204 		// TODO: add support for os.version
205 
206 		handshake["client"] = Bson([
207 			"driver": Bson(["name": Bson("vibe.db.mongo"), "version": Bson(vibeVersionString)]),
208 			"os": Bson(["type": Bson(os.os.to!string), "architecture": Bson(hostArchitecture)]),
209 			"platform": Bson(platform)
210 		]);
211 
212 		if (m_settings.appName.length) {
213 			enforce!MongoAuthException(m_settings.appName.length <= 128,
214 				"The application name may not be larger than 128 bytes");
215 			handshake["client"]["application"] = Bson(["name": Bson(m_settings.appName)]);
216 		}
217 
218 		query!Bson("$external.$cmd", QueryFlags.none, 0, -1, handshake, Bson(null),
219 			(cursor, flags, first_doc, num_docs) {
220 				enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure) && num_docs == 1,
221 					"Authentication handshake failed.");
222 			},
223 			(idx, ref doc) {
224 				enforce!MongoAuthException(doc["ok"].get!double == 1.0, "Authentication failed.");
225 				m_description = deserializeBson!ServerDescription(doc);
226 			});
227 
228 		m_bytesRead = 0;
229 		auto authMechanism = m_settings.authMechanism;
230 		if (authMechanism == MongoAuthMechanism.none)
231 		{
232 			if (m_settings.sslPEMKeyFile != null && m_description.satisfiesVersion(WireVersion.v26))
233 			{
234 				authMechanism = MongoAuthMechanism.mongoDBX509;
235 			}
236 			else if (m_settings.digest.length)
237 			{
238 				// SCRAM-SHA-1 default since 3.0, otherwise use legacy authentication
239 				if (m_description.satisfiesVersion(WireVersion.v30))
240 					authMechanism = MongoAuthMechanism.scramSHA1;
241 				else
242 					authMechanism = MongoAuthMechanism.mongoDBCR;
243 			}
244 		}
245 
246 		if (authMechanism == MongoAuthMechanism.mongoDBCR && m_description.satisfiesVersion(WireVersion.v40))
247 			throw new MongoAuthException("Trying to force MONGODB-CR authentication on a >=4.0 server not supported");
248 
249 		if (authMechanism == MongoAuthMechanism.scramSHA1 && !m_description.satisfiesVersion(WireVersion.v30))
250 			throw new MongoAuthException("Trying to force SCRAM-SHA-1 authentication on a <3.0 server not supported");
251 
252 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !m_description.satisfiesVersion(WireVersion.v26))
253 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication on a <2.6 server not supported");
254 
255 		if (authMechanism == MongoAuthMechanism.mongoDBX509 && !isTLS)
256 			throw new MongoAuthException("Trying to force MONGODB-X509 authentication, but didn't use ssl!");
257 
258 		m_isAuthenticating = true;
259 		scope (exit)
260 			m_isAuthenticating = false;
261 		final switch (authMechanism)
262 		{
263 		case MongoAuthMechanism.none:
264 			break;
265 		case MongoAuthMechanism.mongoDBX509:
266 			certAuthenticate();
267 			break;
268 		case MongoAuthMechanism.scramSHA1:
269 			scramAuthenticate();
270 			break;
271 		case MongoAuthMechanism.mongoDBCR:
272 			authenticate();
273 			break;
274 		}
275 	}
276 
277 	void disconnect()
278 	{
279 		if (m_conn) {
280 			if (m_stream && m_conn.connected) {
281 				m_outRange.flush();
282 
283 				m_stream.finalize();
284 				m_stream = InterfaceProxy!Stream.init;
285 			}
286 
287 			m_conn.close();
288 			m_conn = TCPConnection.init;
289 		}
290 
291 		m_outRange.drop();
292 	}
293 
294 	@property bool connected() const { return m_conn && m_conn.connected; }
295 
296 	@property const(ServerDescription) description() const { return m_description; }
297 
298 	void update(string collection_name, UpdateFlags flags, Bson selector, Bson update)
299 	{
300 		scope(failure) disconnect();
301 		send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update);
302 		if (m_settings.safe) checkForError(collection_name);
303 	}
304 
305 	void insert(string collection_name, InsertFlags flags, Bson[] documents)
306 	{
307 		scope(failure) disconnect();
308 		foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate());
309 		send(OpCode.Insert, -1, cast(int)flags, collection_name, documents);
310 		if (m_settings.safe) checkForError(collection_name);
311 	}
312 
313 	void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
314 	{
315 		scope(failure) disconnect();
316 		flags |= m_settings.defQueryFlags;
317 		int id;
318 		if (returnFieldSelector.isNull)
319 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query);
320 		else
321 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector);
322 		recvReply!T(id, on_msg, on_doc);
323 	}
324 
325 	void getMore(T)(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
326 	{
327 		scope(failure) disconnect();
328 		auto id = send(OpCode.GetMore, -1, cast(int)0, collection_name, nret, cursor_id);
329 		recvReply!T(id, on_msg, on_doc);
330 	}
331 
332 	void delete_(string collection_name, DeleteFlags flags, Bson selector)
333 	{
334 		scope(failure) disconnect();
335 		send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector);
336 		if (m_settings.safe) checkForError(collection_name);
337 	}
338 
339 	void killCursors(long[] cursors)
340 	{
341 		scope(failure) disconnect();
342 		send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
343 	}
344 
345 	MongoErrorDescription getLastError(string db)
346 	{
347 		// Though higher level abstraction level by concept, this function
348 		// is implemented here to allow to check errors upon every request
349 		// on connection level.
350 
351 		Bson command_and_options = Bson.emptyObject;
352 		command_and_options["getLastError"] = Bson(1.0);
353 
354 		if(m_settings.w != m_settings.w.init)
355 			command_and_options["w"] = m_settings.w; // Already a Bson struct
356 		if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init)
357 			command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS);
358 		if(m_settings.journal)
359 			command_and_options["j"] = Bson(true);
360 		if(m_settings.fsync)
361 			command_and_options["fsync"] = Bson(true);
362 
363 		_MongoErrorDescription ret;
364 
365 		query!Bson(db ~ ".$cmd", QueryFlags.NoCursorTimeout | m_settings.defQueryFlags,
366 			0, -1, command_and_options, Bson(null),
367 			(cursor, flags, first_doc, num_docs) {
368 				logTrace("getLastEror(%s) flags: %s, cursor: %s, documents: %s", db, flags, cursor, num_docs);
369 				enforce(!(flags & ReplyFlags.QueryFailure),
370 					new MongoDriverException(format("MongoDB error: getLastError(%s) call failed.", db))
371 				);
372 				enforce(
373 					num_docs == 1,
374 					new MongoDriverException(format("getLastError(%s) returned %s documents instead of one.", db, num_docs))
375 				);
376 			},
377 			(idx, ref error) {
378 				try {
379 					ret = MongoErrorDescription(
380 						error["err"].opt!string(""),
381 						error["code"].opt!int(-1),
382 						error["connectionId"].opt!int(-1),
383 						error["n"].get!int(),
384 						error["ok"].get!double()
385 					);
386 				} catch (Exception e) {
387 					throw new MongoDriverException(e.msg);
388 				}
389 			}
390 		);
391 
392 		return ret;
393 	}
394 
395 	/** Queries the server for all databases.
396 
397 		Returns:
398 			An input range of $(D MongoDBInfo) values.
399 	*/
400 	auto listDatabases()
401 	{
402 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
403 
404 		auto cmd = Bson(["listDatabases":Bson(1)]);
405 
406 		void on_msg(long cursor, ReplyFlags flags, int first_doc, int num_docs) {
407 			if ((flags & ReplyFlags.QueryFailure))
408 				throw new MongoDriverException("Calling listDatabases failed.");
409 		}
410 
411 		static MongoDBInfo toInfo(const(Bson) db_doc) {
412 			return MongoDBInfo(
413 				db_doc["name"].get!string,
414 				db_doc["sizeOnDisk"].get!double,
415 				db_doc["empty"].get!bool
416 			);
417 		}
418 
419 		Bson result;
420 		void on_doc(size_t idx, ref Bson doc) {
421 			if (doc["ok"].get!double != 1.0)
422 				throw new MongoAuthException("listDatabases failed.");
423 
424 			result = doc["databases"];
425 		}
426 
427 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), &on_msg, &on_doc);
428 
429 		return result.byValue.map!toInfo;
430 	}
431 
	/**
		Reads one reply message from the stream and dispatches its contents.

		The header is read first and checked against `reqid` and
		`OpCode.Reply`. Then `on_msg` is called once with the cursor id, the
		reply flags, the start index and the document count, and `on_doc` is
		called for each of the announced documents (as `Bson`, or
		deserialized to `T`).

		Returns:
			The id found in the second header field of the reply (`resid`).

		Throws:
			`Exception` (via `enforce`) if the reply does not answer `reqid` or
			is not a Reply message; `MongoDriverException` if more bytes were
			consumed than the message announced.
	*/
	private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
	{
		import std.traits;

		// Remember the read position so that consumed bytes can be compared
		// with the announced message length below.
		auto bytes_read = m_bytesRead;
		int msglen = recvInt();
		int resid = recvInt();
		int respto = recvInt();
		int opcode = recvInt();

		enforce(respto == reqid, "Reply is not for the expected message on a sequential connection!");
		enforce(opcode == OpCode.Reply, "Got a non-'Reply' reply!");

		auto flags = cast(ReplyFlags)recvInt();
		long cursor = recvLong();
		int start = recvInt();
		int numret = recvInt();

		// Runs on every exit path, including exceptions thrown by the delegates.
		scope (exit) {
			if (m_bytesRead - bytes_read < msglen) {
				// Unread bytes remain: read and discard them so that the
				// stream is positioned at the start of the next message.
				logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read);
				ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
				recv(dst);
			} else if (m_bytesRead - bytes_read > msglen) {
				// More was consumed than announced; the stream position can
				// no longer be trusted, so the connection is dropped.
				logWarn("MongoDB reply was shorter than expected. Dropping connection.");
				disconnect();
				throw new MongoDriverException("MongoDB reply was too short for data.");
			}
		}

		on_msg(cursor, flags, start, numret);
		// For Bson results and types with indirections, one heap buffer sized
		// to the rest of the message is allocated; recvBson() carves each
		// document out of it.
		static if (hasIndirections!T || is(T == Bson))
			auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
		foreach (i; 0 .. cast(size_t)numret) {
			// TODO: directly deserialize from the wire
			static if (!hasIndirections!T && !is(T == Bson)) {
				// Types without indirections use a small per-document stack
				// buffer; recvBson() allocates if the document is larger.
				ubyte[256] buf = void;
				ubyte[] bufsl = buf;
				auto bson = () @trusted { return recvBson(bufsl); } ();
			} else {
				auto bson = () @trusted { return recvBson(buf); } ();
			}

			// logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson);

			static if (is(T == Bson)) on_doc(i, bson);
			else {
				T doc = deserializeBson!T(bson);
				on_doc(i, doc);
			}
		}

		return resid;
	}
486 
487 	private int send(ARGS...)(OpCode code, int response_to, ARGS args)
488 	{
489 		if( !connected() ) {
490 			if (m_allowReconnect) connect();
491 			else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
492 			else throw new MongoDriverException("Connection got closed while connecting");
493 		}
494 		int id = nextMessageId();
495 		// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
496 		sendValue!int(16 + sendLength(args));
497 		sendValue!int(id);
498 		sendValue!int(response_to);
499 		sendValue!int(cast(int)code);
500 		foreach (a; args) sendValue(a);
501 		m_outRange.flush();
502 		// logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args));
503 		return id;
504 	}
505 
506 	private void sendValue(T)(T value)
507 	{
508 		import std.traits;
509 		static if (is(T == int)) sendBytes(toBsonData(value));
510 		else static if (is(T == long)) sendBytes(toBsonData(value));
511 		else static if (is(T == Bson)) sendBytes(value.data);
512 		else static if (is(T == string)) {
513 			sendBytes(cast(const(ubyte)[])value);
514 			sendBytes(cast(const(ubyte)[])"\0");
515 		} else static if (isArray!T) {
516 			foreach (v; value)
517 				sendValue(v);
518 		} else static assert(false, "Unexpected type: "~T.stringof);
519 	}
520 
521 	private void sendBytes(in ubyte[] data){ m_outRange.put(data); }
522 
523 	private int recvInt() { ubyte[int.sizeof] ret; recv(ret); return fromBsonData!int(ret); }
524 	private long recvLong() { ubyte[long.sizeof] ret; recv(ret); return fromBsonData!long(ret); }
525 	private Bson recvBson(ref ubyte[] buf)
526 	@system {
527 		int len = recvInt();
528 		ubyte[] dst;
529 		if (len > buf.length) dst = new ubyte[len];
530 		else {
531 			dst = buf[0 .. len];
532 			buf = buf[len .. $];
533 		}
534 		dst[0 .. 4] = toBsonData(len)[];
535 		recv(dst[4 .. $]);
536 		return Bson(Bson.Type.Object, cast(immutable)dst);
537 	}
538 	private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; }
539 
540 	private int nextMessageId() { return m_msgid++; }
541 
542 	private void checkForError(string collection_name)
543 	{
544 		auto coll = collection_name.split(".")[0];
545 		auto err = getLastError(coll);
546 
547 		enforce(
548 			err.code < 0,
549 			new MongoDBException(err)
550 		);
551 	}
552 
553 	private void certAuthenticate()
554 	{
555 		Bson cmd = Bson.emptyObject;
556 		cmd["authenticate"] = Bson(1);
557 		cmd["mechanism"] = Bson("MONGODB-X509");
558 		if (m_description.satisfiesVersion(WireVersion.v34))
559 		{
560 			if (m_settings.username.length)
561 				cmd["user"] = Bson(m_settings.username);
562 		}
563 		else
564 		{
565 			if (!m_settings.username.length)
566 				throw new MongoAuthException("No username provided but connected to MongoDB server <=3.2 not supporting this");
567 
568 			cmd["user"] = Bson(m_settings.username);
569 		}
570 		query!Bson("$external.$cmd", QueryFlags.None, 0, -1, cmd, Bson(null),
571 			(cursor, flags, first_doc, num_docs) {
572 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
573 					throw new MongoDriverException("Calling authenticate failed.");
574 			},
575 			(idx, ref doc) {
576 				if (doc["ok"].get!double != 1.0)
577 					throw new MongoAuthException("Authentication failed.");
578 			}
579 		);
580 	}
581 
582 	private void authenticate()
583 	{
584 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
585 
586 		string nonce, key;
587 
588 		auto cmd = Bson(["getnonce":Bson(1)]);
589 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
590 			(cursor, flags, first_doc, num_docs) {
591 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
592 					throw new MongoDriverException("Calling getNonce failed.");
593 			},
594 			(idx, ref doc) {
595 				if (doc["ok"].get!double != 1.0)
596 					throw new MongoDriverException("getNonce failed.");
597 				nonce = doc["nonce"].get!string;
598 				key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup);
599 			}
600 		);
601 
602 		cmd = Bson.emptyObject;
603 		cmd["authenticate"] = Bson(1);
604 		cmd["mechanism"] = Bson("MONGODB-CR");
605 		cmd["nonce"] = Bson(nonce);
606 		cmd["user"] = Bson(m_settings.username);
607 		cmd["key"] = Bson(key);
608 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
609 			(cursor, flags, first_doc, num_docs) {
610 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
611 					throw new MongoDriverException("Calling authenticate failed.");
612 			},
613 			(idx, ref doc) {
614 				if (doc["ok"].get!double != 1.0)
615 					throw new MongoAuthException("Authentication failed.");
616 			}
617 		);
618 	}
619 
620 	private void scramAuthenticate()
621 	{
622 		import vibe.db.mongo.sasl;
623 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
624 
625 		ScramState state;
626 		string payload = state.createInitialRequest(m_settings.username);
627 
628 		auto cmd = Bson.emptyObject;
629 		cmd["saslStart"] = Bson(1);
630 		cmd["mechanism"] = Bson("SCRAM-SHA-1");
631 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
632 		string response;
633 		Bson conversationId;
634 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
635 			(cursor, flags, first_doc, num_docs) {
636 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
637 					throw new MongoDriverException("SASL start failed.");
638 			},
639 			(idx, ref doc) {
640 				if (doc["ok"].get!double != 1.0)
641 					throw new MongoAuthException("Authentication failed.");
642 				response = cast(string)doc["payload"].get!BsonBinData().rawData;
643 				conversationId = doc["conversationId"];
644 			});
645 		payload = state.update(m_settings.digest, response);
646 		cmd = Bson.emptyObject;
647 		cmd["saslContinue"] = Bson(1);
648 		cmd["conversationId"] = conversationId;
649 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
650 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
651 			(cursor, flags, first_doc, num_docs) {
652 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
653 					throw new MongoDriverException("SASL continue failed.");
654 			},
655 			(idx, ref doc) {
656 				if (doc["ok"].get!double != 1.0)
657 					throw new MongoAuthException("Authentication failed.");
658 				response = cast(string)doc["payload"].get!BsonBinData().rawData;
659 			});
660 
661 		payload = state.finalize(response);
662 		cmd = Bson.emptyObject;
663 		cmd["saslContinue"] = Bson(1);
664 		cmd["conversationId"] = conversationId;
665 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
666 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
667 			(cursor, flags, first_doc, num_docs) {
668 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
669 					throw new MongoDriverException("SASL finish failed.");
670 			},
671 			(idx, ref doc) {
672 				if (doc["ok"].get!double != 1.0)
673 					throw new MongoAuthException("Authentication failed.");
674 			});
675 	}
676 }
677 
// Operation codes written into (and expected in) the fourth int of a message
// header. Within this module, Update, Insert, Query, GetMore, Delete and
// KillCursors are passed to MongoConnection.send, and Reply is what
// recvReply requires for incoming messages.
private enum OpCode : int {
	Reply        = 1, // sent only by DB
	Msg          = 1000,
	Update       = 2001,
	Insert       = 2002,
	Reserved1    = 2003,
	Query        = 2004,
	GetMore      = 2005,
	Delete       = 2006,
	KillCursors  = 2007
}
689 
/// Callback invoked once per reply with the values read from the reply
/// header: cursor id, reply flags, start index and number of documents.
alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
/// Callback invoked once per returned document with its zero-based index
/// and the document itself (raw `Bson` or deserialized to `T`).
template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; }
692 
/// One entry of the range returned by MongoConnection.listDatabases.
/// It is constructed positionally there, so the field order matters.
struct MongoDBInfo
{
	string name;       /// value of the entry's "name" field
	double sizeOnDisk; /// value of the entry's "sizeOnDisk" field
	bool empty;        /// value of the entry's "empty" field
}
699 
/**
	Computes the number of bytes `MongoConnection.sendValue` writes for `args`.

	Sizes per type: `string` is its length plus one for the terminating zero,
	`int` is 4, `long` is 8, `Bson` is the length of its data, and an array is
	the sum of its elements. Several arguments are summed up; no arguments
	yield 0. Unsupported types are rejected at compile time.
*/
private int sendLength(ARGS...)(ARGS args)
{
	import std.traits;
	// The argument count is known at compile time, so select the branch
	// with `static if` instead of compiling a dead runtime branch.
	static if (ARGS.length == 0) return 0;
	else static if (ARGS.length == 1) {
		alias T = ARGS[0];
		static if (is(T == string)) return cast(int)args[0].length + 1;
		else static if (is(T == int)) return 4;
		else static if (is(T == long)) return 8;
		else static if (is(T == Bson)) return cast(int)args[0].data.length;
		else static if (isArray!T) {
			int ret = 0;
			foreach (el; args[0]) ret += sendLength(el);
			return ret;
		} else static assert(false, "Unexpected type: "~T.stringof);
	}
	// Split the argument list in halves and sum up both parts.
	else return sendLength(args[0 .. $/2]) + sendLength(args[$/2 .. $]);
}
718 
/**
	Description of the connected server.

	MongoConnection.connect fills an instance by deserializing the reply to
	the `isMaster` handshake. Within this module only `maxWireVersion` is
	read (through `satisfiesVersion`).
*/
struct ServerDescription
{
	/// Kind of server. NOTE(review): nothing in this module assigns a value
	/// other than the default `unknown` - confirm where it is set.
	enum ServerType
	{
		unknown,
		standalone,
		mongos,
		possiblePrimary,
		RSPrimary,
		RSSecondary,
		RSArbiter,
		RSOther,
		RSGhost
	}

// Every field below is marked @optional; presumably this lets
// deserialization accept replies that lack some of them - confirm against
// the vibe.data.serialization documentation.
@optional:
	string address;
	string error;
	float roundTripTime = 0;
	Nullable!BsonDate lastWriteDate;
	Nullable!BsonObjectID opTime;
	ServerType type = ServerType.unknown;
	WireVersion minWireVersion, maxWireVersion;
	string me;
	string[] hosts, passives, arbiters;
	string[string] tags;
	string setName;
	Nullable!int setVersion;
	Nullable!BsonObjectID electionId;
	string primary;
	string lastUpdateTime = "infinity ago";
	Nullable!int logicalSessionTimeoutMinutes;

	/// Returns true if the server's `maxWireVersion` is at least `wireVersion`.
	bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
	{
		return maxWireVersion >= wireVersion;
	}
}
757 
/**
	Wire protocol versions, numbered consecutively starting at 0.

	The values are ordered so that they can be compared with `>=`, as done by
	`ServerDescription.satisfiesVersion`. The member names follow the server
	release they stand for (e.g. `v30` is treated as "3.0" by the error
	messages in `MongoConnection.connect`).
*/
enum WireVersion : int
{
	old   = 0,
	v26   = 1,
	v26_2 = 2,
	v30   = 3,
	v32   = 4,
	v34   = 5,
	v36   = 6,
	v40   = 7,
	v42   = 8
}
770 
/**
	Builds a string describing the architecture this code was compiled for:
	a CPU name selected by the compiler's version identifiers ("unknown" if
	none matches), a space, and the name of `std.system.endian`.
*/
private string getHostArchitecture()
{
	import std.system : endian;

	version (X86_64)
		enum cpu = "x86_64";
	else version (X86)
		enum cpu = "x86";
	else version (AArch64)
		enum cpu = "aarch64";
	else version (ARM_HardFloat)
		enum cpu = "armhf";
	else version (ARM)
		enum cpu = "arm";
	else version (PPC64)
		enum cpu = "ppc64";
	else version (PPC)
		enum cpu = "ppc";
	else
		enum cpu = "unknown";

	return cpu ~ " " ~ endian.to!string;
}

/// Architecture string, evaluated once at compile time.
private static immutable string hostArchitecture = getHostArchitecture();