1 /**
2 	Low level mongodb protocol.
3 
4 	Copyright: © 2012-2016 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.connection;
9 
10 public import vibe.data.bson;
11 
12 import vibe.core.log;
13 import vibe.core.net;
14 import vibe.db.mongo.settings;
15 import vibe.db.mongo.flags;
16 import vibe.inet.webform;
17 import vibe.stream.tls;
18 
19 import std.algorithm : map, splitter;
20 import std.array;
21 import std.range;
22 import std.conv;
23 import std.exception;
24 import std.string;
25 import std.digest.md;
26 
27 
/// Plain value type holding the fields that getLastError() extracts from the server reply.
/// The field order matters: getLastError() constructs the value positionally.
private struct _MongoErrorDescription
{
	/// Filled from the reply's "err" field, with "" as the fallback value.
	string message;
	/// Filled from the reply's "code" field, with -1 as the fallback value.
	int code;
	/// Filled from the reply's "connectionId" field, with -1 as the fallback value.
	int connectionId;
	/// Filled from the reply's "n" field.
	int n;
	/// Filled from the reply's "ok" field.
	double ok;
}
36 
/**
 * D POD representation of the Mongo error object.
 *
 * For successful queries "code" is negative.
 * It can also be used to check how many documents were updated by
 * a successful query via the "n" field.
 */
alias MongoErrorDescription = immutable(_MongoErrorDescription);
45 
/**
 * Common base of every exception type declared by the vibe.d Mongo driver.
 *
 * Catch this type to handle all driver exceptions with a single handler.
 */
class MongoException : Exception
{
	/// Passes message, source location and the chained throwable straight on to `Exception`.
	this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
58 
/**
 * Signals a problem the driver itself could not handle.
 *
 * Typical causes are a protocol mismatch or unexpected behavior of the
 * mongo service.
 */
class MongoDriverException : MongoException
{
	/// Passes all arguments unchanged to the `MongoException` constructor.
	this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
73 
/**
 * Carries an error reported by the database for a collection operation.
 *
 * It does not indicate a problem with the vibe.d driver itself. Most frequently
 * it is thrown when MongoConnection is in checked mode and getLastError()
 * reports an error.
 */
class MongoDBException : MongoException
{
	/// The error details; `alias this` makes its fields reachable directly on the exception.
	MongoErrorDescription description;
	alias description this;

	/// Uses `description.message` as the exception message and stores the whole description.
	this(MongoErrorDescription description, string file = __FILE__,
			int line = __LINE__, Throwable next = null)
	@safe {
		super(description.message, file, line, next);
		this.description = description;
	}
}
94 
/**
 * Signals that authenticating against the server did not succeed.
 *
 * Typical causes are an unsupported mechanism or wrong credentials.
 */
class MongoAuthException : MongoException
{
	/// Passes all arguments unchanged to the `MongoException` constructor.
	this(string message, string file = __FILE__, int line = __LINE__, Throwable next = null)
	@safe {
		super(message, file, line, next);
	}
}
109 
110 /**
111   [internal] Provides low-level mongodb protocol access.
112 
113   It is not intended for direct usage. Please use vibe.db.mongo.db and vibe.db.mongo.collection modules for your code.
114   Note that a MongoConnection may only be used from one fiber/thread at a time.
115  */
116 final class MongoConnection {
117 @safe:
118 
119 	import vibe.stream.wrapper : StreamOutputRange, streamOutputRange;
120 	import vibe.internal.interfaceproxy;
121 	import vibe.core.stream : InputStream, Stream;
122 
	private {
		// Client configuration: host list, TLS options, credentials and write-concern options.
		MongoClientSettings m_settings;
		// TCP connection obtained from connectTCP() in connect().
		TCPConnection m_conn;
		// Stream all traffic goes through: either m_conn itself or a TLS stream created on top of it.
		InterfaceProxy!Stream m_stream;
		// Running count of bytes consumed by recv(); set back to 0 when connect() opens a connection.
		ulong m_bytesRead;
		// Id that nextMessageId() hands out next (post-incremented on every call).
		int m_msgid = 1;
		// Output range over m_stream that requests are written to; send() flushes it after each message.
		StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
	}
131 
	/// Port used by the simplified constructor when none is given; mirrors MongoClientSettings.defaultPort.
	enum ushort defaultPort = MongoClientSettings.defaultPort;
133 
134 	/// Simplified constructor overload, with no m_settings
135 	this(string server, ushort port = defaultPort)
136 	{
137 		m_settings = new MongoClientSettings();
138 		m_settings.hosts ~= MongoHost(server, port);
139 	}
140 
141 	this(MongoClientSettings cfg)
142 	{
143 		m_settings = cfg;
144 
145 		// Now let's check for features that are not yet supported.
146 		if(m_settings.hosts.length > 1)
147 			logWarn("Multiple mongodb hosts are not yet supported. Using first one: %s:%s",
148 					m_settings.hosts[0].name, m_settings.hosts[0].port);
149 	}
150 
151 	void connect()
152 	{
153 		/*
154 		 * TODO: Connect to one of the specified hosts taking into consideration
155 		 * options such as connect timeouts and so on.
156 		 */
157 		try {
158 			m_conn = connectTCP(m_settings.hosts[0].name, m_settings.hosts[0].port);
159 			m_conn.tcpNoDelay = true;
160 			if (m_settings.ssl) {
161 				auto ctx =  createTLSContext(TLSContextKind.client);
162 				if (!m_settings.sslverifycertificate) {
163 					ctx.peerValidationMode = TLSPeerValidationMode.none;
164 				}
165 				if (m_settings.sslPEMKeyFile) {
166 					ctx.useCertificateChainFile(m_settings.sslPEMKeyFile);
167 					ctx.usePrivateKeyFile(m_settings.sslPEMKeyFile);
168 				}
169 				if (m_settings.sslCAFile) {
170 					ctx.useTrustedCertificateFile(m_settings.sslCAFile);
171 				}
172 
173 				m_stream = createTLSStream(m_conn, ctx, m_settings.hosts[0].name);
174 			}
175 			else {
176 				m_stream = m_conn;
177 			}
178 			m_outRange = streamOutputRange(m_stream);
179 		}
180 		catch (Exception e) {
181 			throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
182 		}
183 
184 		m_bytesRead = 0;
185 		if(m_settings.digest != string.init)
186 		{
187 			if (m_settings.authMechanism == MongoAuthMechanism.none)
188 				authenticate();
189 			else {
190 				/**
191 				SCRAM-SHA-1 was released in March 2015 and on a properly
192 				configured Mongo instance Authentication.none is disabled.
193 				However, as a fallback to avoid breakage with old setups,
194 				no authentication is tried in case of an error.
195 				*/
196 				try
197 					scramAuthenticate();
198 				catch (MongoAuthException e)
199 					authenticate();
200 			}
201 
202 		}
203 		else if (m_settings.sslPEMKeyFile != null && m_settings.username != null)
204 		{
205 			certAuthenticate();
206 		}
207 	}
208 
209 	void disconnect()
210 	{
211 		if (m_conn) {
212 			if (m_stream && m_conn.connected) {
213 				m_outRange.flush();
214 
215 				m_stream.finalize();
216 				m_stream = InterfaceProxy!Stream.init;
217 			}
218 
219 			m_conn.close();
220 			m_conn = TCPConnection.init;
221 		}
222 
223 		m_outRange.drop();
224 	}
225 
226 	@property bool connected() const { return m_conn && m_conn.connected; }
227 
228 
229 	void update(string collection_name, UpdateFlags flags, Bson selector, Bson update)
230 	{
231 		scope(failure) disconnect();
232 		send(OpCode.Update, -1, cast(int)0, collection_name, cast(int)flags, selector, update);
233 		if (m_settings.safe) checkForError(collection_name);
234 	}
235 
236 	void insert(string collection_name, InsertFlags flags, Bson[] documents)
237 	{
238 		scope(failure) disconnect();
239 		foreach (d; documents) if (d["_id"].isNull()) d["_id"] = Bson(BsonObjectID.generate());
240 		send(OpCode.Insert, -1, cast(int)flags, collection_name, documents);
241 		if (m_settings.safe) checkForError(collection_name);
242 	}
243 
244 	void query(T)(string collection_name, QueryFlags flags, int nskip, int nret, Bson query, Bson returnFieldSelector, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
245 	{
246 		scope(failure) disconnect();
247 		flags |= m_settings.defQueryFlags;
248 		int id;
249 		if (returnFieldSelector.isNull)
250 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query);
251 		else
252 			id = send(OpCode.Query, -1, cast(int)flags, collection_name, nskip, nret, query, returnFieldSelector);
253 		recvReply!T(id, on_msg, on_doc);
254 	}
255 
256 	void getMore(T)(string collection_name, int nret, long cursor_id, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
257 	{
258 		scope(failure) disconnect();
259 		auto id = send(OpCode.GetMore, -1, cast(int)0, collection_name, nret, cursor_id);
260 		recvReply!T(id, on_msg, on_doc);
261 	}
262 
263 	void delete_(string collection_name, DeleteFlags flags, Bson selector)
264 	{
265 		scope(failure) disconnect();
266 		send(OpCode.Delete, -1, cast(int)0, collection_name, cast(int)flags, selector);
267 		if (m_settings.safe) checkForError(collection_name);
268 	}
269 
270 	void killCursors(long[] cursors)
271 	{
272 		scope(failure) disconnect();
273 		send(OpCode.KillCursors, -1, cast(int)0, cast(int)cursors.length, cursors);
274 	}
275 
276 	MongoErrorDescription getLastError(string db)
277 	{
278 		// Though higher level abstraction level by concept, this function
279 		// is implemented here to allow to check errors upon every request
280 		// on conncetion level.
281 
282 		Bson command_and_options = Bson.emptyObject;
283 		command_and_options["getLastError"] = Bson(1.0);
284 
285 		if(m_settings.w != m_settings.w.init)
286 			command_and_options["w"] = m_settings.w; // Already a Bson struct
287 		if(m_settings.wTimeoutMS != m_settings.wTimeoutMS.init)
288 			command_and_options["wtimeout"] = Bson(m_settings.wTimeoutMS);
289 		if(m_settings.journal)
290 			command_and_options["j"] = Bson(true);
291 		if(m_settings.fsync)
292 			command_and_options["fsync"] = Bson(true);
293 
294 		_MongoErrorDescription ret;
295 
296 		query!Bson(db ~ ".$cmd", QueryFlags.NoCursorTimeout | m_settings.defQueryFlags,
297 			0, -1, command_and_options, Bson(null),
298 			(cursor, flags, first_doc, num_docs) {
299 				logTrace("getLastEror(%s) flags: %s, cursor: %s, documents: %s", db, flags, cursor, num_docs);
300 				enforce(!(flags & ReplyFlags.QueryFailure),
301 					new MongoDriverException(format("MongoDB error: getLastError(%s) call failed.", db))
302 				);
303 				enforce(
304 					num_docs == 1,
305 					new MongoDriverException(format("getLastError(%s) returned %s documents instead of one.", db, num_docs))
306 				);
307 			},
308 			(idx, ref error) {
309 				try {
310 					ret = MongoErrorDescription(
311 						error["err"].opt!string(""),
312 						error["code"].opt!int(-1),
313 						error["connectionId"].opt!int(-1),
314 						error["n"].get!int(),
315 						error["ok"].get!double()
316 					);
317 				} catch (Exception e) {
318 					throw new MongoDriverException(e.msg);
319 				}
320 			}
321 		);
322 
323 		return ret;
324 	}
325 
326 	/** Queries the server for all databases.
327 
328 		Returns:
329 			An input range of $(D MongoDBInfo) values.
330 	*/
331 	auto listDatabases()
332 	{
333 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
334 
335 		auto cmd = Bson(["listDatabases":Bson(1)]);
336 
337 		void on_msg(long cursor, ReplyFlags flags, int first_doc, int num_docs) {
338 			if ((flags & ReplyFlags.QueryFailure))
339 				throw new MongoDriverException("Calling listDatabases failed.");
340 		}
341 
342 		static MongoDBInfo toInfo(const(Bson) db_doc) {
343 			return MongoDBInfo(
344 				db_doc["name"].get!string,
345 				db_doc["sizeOnDisk"].get!double,
346 				db_doc["empty"].get!bool
347 			);
348 		}
349 
350 		Bson result;
351 		void on_doc(size_t idx, ref Bson doc) {
352 			if (doc["ok"].get!double != 1.0)
353 				throw new MongoAuthException("listDatabases failed.");
354 
355 			result = doc["databases"];
356 		}
357 
358 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null), &on_msg, &on_doc);
359 
360 		return result.byValue.map!toInfo;
361 	}
362 
363 	private int recvReply(T)(int reqid, scope ReplyDelegate on_msg, scope DocDelegate!T on_doc)
364 	{
365 		import std.traits;
366 
367 		auto bytes_read = m_bytesRead;
368 		int msglen = recvInt();
369 		int resid = recvInt();
370 		int respto = recvInt();
371 		int opcode = recvInt();
372 
373 		enforce(respto == reqid, "Reply is not for the expected message on a sequential connection!");
374 		enforce(opcode == OpCode.Reply, "Got a non-'Reply' reply!");
375 
376 		auto flags = cast(ReplyFlags)recvInt();
377 		long cursor = recvLong();
378 		int start = recvInt();
379 		int numret = recvInt();
380 
381 		scope (exit) {
382 			if (m_bytesRead - bytes_read < msglen) {
383 				logWarn("MongoDB reply was longer than expected, skipping the rest: %d vs. %d", msglen, m_bytesRead - bytes_read);
384 				ubyte[] dst = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
385 				recv(dst);
386 			} else if (m_bytesRead - bytes_read > msglen) {
387 				logWarn("MongoDB reply was shorter than expected. Dropping connection.");
388 				disconnect();
389 				throw new MongoDriverException("MongoDB reply was too short for data.");
390 			}
391 		}
392 
393 		on_msg(cursor, flags, start, numret);
394 		static if (hasIndirections!T || is(T == Bson))
395 			auto buf = new ubyte[msglen - cast(size_t)(m_bytesRead - bytes_read)];
396 		foreach (i; 0 .. cast(size_t)numret) {
397 			// TODO: directly deserialize from the wire
398 			static if (!hasIndirections!T && !is(T == Bson)) {
399 				ubyte[256] buf = void;
400 				ubyte[] bufsl = buf;
401 				auto bson = () @trusted { return recvBson(bufsl); } ();
402 			} else {
403 				auto bson = () @trusted { return recvBson(buf); } ();
404 			}
405 
406 			static if (is(T == Bson)) on_doc(i, bson);
407 			else {
408 				T doc = deserializeBson!T(bson);
409 				on_doc(i, doc);
410 			}
411 		}
412 
413 		return resid;
414 	}
415 
416 	private int send(ARGS...)(OpCode code, int response_to, ARGS args)
417 	{
418 		if( !connected() ) connect();
419 		int id = nextMessageId();
420 		sendValue(16 + sendLength(args));
421 		sendValue(id);
422 		sendValue(response_to);
423 		sendValue(cast(int)code);
424 		foreach (a; args) sendValue(a);
425 		m_outRange.flush();
426 		return id;
427 	}
428 
429 	private void sendValue(T)(T value)
430 	{
431 		import std.traits;
432 		static if (is(T == int)) sendBytes(toBsonData(value));
433 		else static if (is(T == long)) sendBytes(toBsonData(value));
434 		else static if (is(T == Bson)) sendBytes(value.data);
435 		else static if (is(T == string)) {
436 			sendBytes(cast(const(ubyte)[])value);
437 			sendBytes(cast(const(ubyte)[])"\0");
438 		} else static if (isArray!T) {
439 			foreach (v; value)
440 				sendValue(v);
441 		} else static assert(false, "Unexpected type: "~T.stringof);
442 	}
443 
444 	private void sendBytes(in ubyte[] data){ m_outRange.put(data); }
445 
446 	private int recvInt() { ubyte[int.sizeof] ret; recv(ret); return fromBsonData!int(ret); }
447 	private long recvLong() { ubyte[long.sizeof] ret; recv(ret); return fromBsonData!long(ret); }
448 	private Bson recvBson(ref ubyte[] buf)
449 	@system {
450 		int len = recvInt();
451 		ubyte[] dst;
452 		if (len > buf.length) dst = new ubyte[len];
453 		else {
454 			dst = buf[0 .. len];
455 			buf = buf[len .. $];
456 		}
457 		dst[0 .. 4] = toBsonData(len)[];
458 		recv(dst[4 .. $]);
459 		return Bson(Bson.Type.Object, cast(immutable)dst);
460 	}
461 	private void recv(ubyte[] dst) { enforce(m_stream); m_stream.read(dst); m_bytesRead += dst.length; }
462 
463 	private int nextMessageId() { return m_msgid++; }
464 
465 	private void checkForError(string collection_name)
466 	{
467 		auto coll = collection_name.split(".")[0];
468 		auto err = getLastError(coll);
469 
470 		enforce(
471 			err.code < 0,
472 			new MongoDBException(err)
473 		);
474 	}
475 
476 	private void certAuthenticate()
477 	{
478 		Bson cmd = Bson.emptyObject;
479 		cmd["authenticate"] = Bson(1);
480 		cmd["mechanism"] = Bson("MONGODB-X509");
481 		cmd["user"] = Bson(m_settings.username);
482 		query!Bson("$external.$cmd", QueryFlags.None, 0, -1, cmd, Bson(null),
483 			(cursor, flags, first_doc, num_docs) {
484 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
485 					throw new MongoDriverException("Calling authenticate failed.");
486 			},
487 			(idx, ref doc) {
488 				if (doc["ok"].get!double != 1.0)
489 					throw new MongoAuthException("Authentication failed.");
490 			}
491 		);
492 	}
493 
494 	private void authenticate()
495 	{
496 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
497 
498 		string nonce, key;
499 
500 		auto cmd = Bson(["getnonce":Bson(1)]);
501 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
502 			(cursor, flags, first_doc, num_docs) {
503 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
504 					throw new MongoDriverException("Calling getNonce failed.");
505 			},
506 			(idx, ref doc) {
507 				if (doc["ok"].get!double != 1.0)
508 					throw new MongoDriverException("getNonce failed.");
509 				nonce = doc["nonce"].get!string;
510 				key = toLower(toHexString(md5Of(nonce ~ m_settings.username ~ m_settings.digest)).idup);
511 			}
512 		);
513 
514 		cmd = Bson.emptyObject;
515 		cmd["authenticate"] = Bson(1);
516 		cmd["mechanism"] = Bson("MONGODB-CR");
517 		cmd["nonce"] = Bson(nonce);
518 		cmd["user"] = Bson(m_settings.username);
519 		cmd["key"] = Bson(key);
520 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
521 			(cursor, flags, first_doc, num_docs) {
522 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
523 					throw new MongoDriverException("Calling authenticate failed.");
524 			},
525 			(idx, ref doc) {
526 				if (doc["ok"].get!double != 1.0)
527 					throw new MongoAuthException("Authentication failed.");
528 			}
529 		);
530 	}
531 
532 	private void scramAuthenticate()
533 	{
534 		import vibe.db.mongo.sasl;
535 		string cn = (m_settings.database == string.init ? "admin" : m_settings.database) ~ ".$cmd";
536 
537 		ScramState state;
538 		string payload = state.createInitialRequest(m_settings.username);
539 
540 		auto cmd = Bson.emptyObject;
541 		cmd["saslStart"] = Bson(1);
542 		cmd["mechanism"] = Bson("SCRAM-SHA-1");
543 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
544 		string response;
545 		Bson conversationId;
546 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
547 			(cursor, flags, first_doc, num_docs) {
548 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
549 					throw new MongoDriverException("SASL start failed.");
550 			},
551 			(idx, ref doc) {
552 				if (doc["ok"].get!double != 1.0)
553 					throw new MongoAuthException("Authentication failed.");
554 				response = cast(string)doc["payload"].get!BsonBinData().rawData;
555 				conversationId = doc["conversationId"];
556 			});
557 		payload = state.update(m_settings.digest, response);
558 		cmd = Bson.emptyObject;
559 		cmd["saslContinue"] = Bson(1);
560 		cmd["conversationId"] = conversationId;
561 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
562 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
563 			(cursor, flags, first_doc, num_docs) {
564 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
565 					throw new MongoDriverException("SASL continue failed.");
566 			},
567 			(idx, ref doc) {
568 				if (doc["ok"].get!double != 1.0)
569 					throw new MongoAuthException("Authentication failed.");
570 				response = cast(string)doc["payload"].get!BsonBinData().rawData;
571 			});
572 
573 		payload = state.finalize(response);
574 		cmd = Bson.emptyObject;
575 		cmd["saslContinue"] = Bson(1);
576 		cmd["conversationId"] = conversationId;
577 		cmd["payload"] = Bson(BsonBinData(BsonBinData.Type.generic, payload.representation));
578 		query!Bson(cn, QueryFlags.None, 0, -1, cmd, Bson(null),
579 			(cursor, flags, first_doc, num_docs) {
580 				if ((flags & ReplyFlags.QueryFailure) || num_docs != 1)
581 					throw new MongoDriverException("SASL finish failed.");
582 			},
583 			(idx, ref doc) {
584 				if (doc["ok"].get!double != 1.0)
585 					throw new MongoAuthException("Authentication failed.");
586 			});
587 	}
588 }
589 
/// Operation codes placed in the fourth int of every message header.
/// send() writes them for outgoing messages; recvReply() expects `Reply` on incoming ones.
private enum OpCode : int {
	Reply        = 1, // sent only by DB
	Msg          = 1000, // not referenced by the code visible in this module
	Update       = 2001, // sent by MongoConnection.update
	Insert       = 2002, // sent by MongoConnection.insert
	Reserved1    = 2003, // not referenced by the code visible in this module
	Query        = 2004, // sent by MongoConnection.query
	GetMore      = 2005, // sent by MongoConnection.getMore
	Delete       = 2006, // sent by MongoConnection.delete_
	KillCursors  = 2007 // sent by MongoConnection.killCursors
}
601 
/// Callback invoked once per reply by recvReply() with the cursor id, the reply
/// flags, the index of the first document and the number of returned documents.
alias ReplyDelegate = void delegate(long cursor, ReplyFlags flags, int first_doc, int num_docs) @safe;
/// Callback invoked by recvReply() for every returned document, together with
/// its zero-based index within the reply.
template DocDelegate(T) { alias DocDelegate = void delegate(size_t idx, ref T doc) @safe; }
604 
/// Description of one database as produced by MongoConnection.listDatabases().
struct MongoDBInfo
{
	/// Taken from the "name" field of the database entry.
	string name;
	/// Taken from the "sizeOnDisk" field of the database entry.
	double sizeOnDisk;
	/// Taken from the "empty" field of the database entry.
	bool empty;
}
611 
/**
	Computes the number of bytes that sending the given arguments takes.

	A `string` counts its length plus one for the terminating zero byte, an
	`int` 4 bytes, a `long` 8 bytes, a `Bson` the length of its data, and an
	array the sum of its elements. Several arguments are summed up; no
	arguments yield 0. Unsupported types are rejected at compile time.
*/
private int sendLength(ARGS...)(ARGS args)
{
	import std.traits;
	static if (ARGS.length == 1) {
		alias T = ARGS[0];
		static if (is(T == string)) return cast(int)args[0].length + 1;
		else static if (is(T == int)) return 4;
		else static if (is(T == long)) return 8;
		else static if (is(T == Bson)) return cast(int)args[0].data.length;
		else static if (isArray!T) {
			int ret = 0;
			foreach (el; args[0]) ret += sendLength(el);
			return ret;
		} else static assert(false, "Unexpected type: "~T.stringof);
	}
	else {
		// The argument count is known at compile time, so this loop over the
		// argument tuple is unrolled; with no arguments it yields 0.
		int total = 0;
		foreach (arg; args) total += sendLength(arg);
		return total;
	}
}