1 /**
2 	MongoCollection class
3 
4 	Copyright: © 2012-2016 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.collection;
9 
10 public import vibe.db.mongo.cursor;
11 public import vibe.db.mongo.connection;
12 public import vibe.db.mongo.flags;
13 
14 import vibe.core.log;
15 import vibe.db.mongo.client;
16 
17 import core.time;
18 import std.algorithm : countUntil, find;
19 import std.array;
20 import std.conv;
21 import std.exception;
22 import std.string;
23 import std.typecons : Tuple, tuple;
24 
25 
26 /**
27   Represents a single collection inside a MongoDB.
28 
29   All methods take arbitrary types for Bson arguments. serializeToBson() is implicitly called on
30   them before they are send to the database. The following example shows some possible ways
31   to specify objects.
32  */
33 struct MongoCollection {
34 	private {
35 		MongoClient m_client;
36 		MongoDatabase m_db;
37 		string m_name;
38 		string m_fullPath;
39 	}
40 
41 	this(MongoClient client, string fullPath)
42 	@safe {
43 		assert(client !is null);
44 		m_client = client;
45 
46 		auto dotidx = fullPath.indexOf('.');
47 		assert(dotidx > 0, "The collection name passed to MongoCollection must be of the form \"dbname.collectionname\".");
48 
49 		m_fullPath = fullPath;
50 		m_db = m_client.getDatabase(fullPath[0 .. dotidx]);
51 		m_name = fullPath[dotidx+1 .. $];
52 	}
53 
54 	this(ref MongoDatabase db, string name)
55 	@safe {
56 		assert(db.client !is null);
57 		m_client = db.client;
58 		m_fullPath = db.name ~ "." ~ name;
59 		m_db = db;
60 		m_name = name;
61 	}
62 
63 	/**
64 	  Returns: Root database to which this collection belongs.
65 	 */
66 	@property MongoDatabase database() @safe { return m_db; }
67 
68 	/**
69 	  Returns: Name of this collection (excluding the database name).
70 	 */
71 	@property string name() const @safe { return m_name; }
72 
73 	/**
74 	  Performs an update operation on documents matching 'selector', updating them with 'update'.
75 
76 	  Throws: Exception if a DB communication error occured.
77 	  See_Also: $(LINK http://www.mongodb.org/display/DOCS/Updating)
78 	 */
79 	void update(T, U)(T selector, U update, UpdateFlags flags = UpdateFlags.None)
80 	{
81 		assert(m_client !is null, "Updating uninitialized MongoCollection.");
82 		auto conn = m_client.lockConnection();
83 		ubyte[256] selector_buf = void, update_buf = void;
84 		conn.update(m_fullPath, flags, serializeToBson(selector, selector_buf), serializeToBson(update, update_buf));
85 	}
86 
87 	/**
88 	  Inserts new documents into the collection.
89 
90 	  Note that if the `_id` field of the document(s) is not set, typically
91 	  using `BsonObjectID.generate()`, the server will generate IDs
92 	  automatically. If you need to know the IDs of the inserted documents,
93 	  you need to generate them locally.
94 
95 	  Throws: Exception if a DB communication error occured.
96 	  See_Also: $(LINK http://www.mongodb.org/display/DOCS/Inserting)
97 	 */
98 	void insert(T)(T document_or_documents, InsertFlags flags = InsertFlags.None)
99 	{
100 		assert(m_client !is null, "Inserting into uninitialized MongoCollection.");
101 		auto conn = m_client.lockConnection();
102 		Bson[] docs;
103 		Bson bdocs = () @trusted { return serializeToBson(document_or_documents); } ();
104 		if( bdocs.type == Bson.Type.Array ) docs = cast(Bson[])bdocs;
105 		else docs = () @trusted { return (&bdocs)[0 .. 1]; } ();
106 		conn.insert(m_fullPath, flags, docs);
107 	}
108 
109 	/**
110 	  Queries the collection for existing documents.
111 
112 	  If no arguments are passed to find(), all documents of the collection will be returned.
113 
114 	  See_Also: $(LINK http://www.mongodb.org/display/DOCS/Querying)
115 	 */
116 	MongoCursor!(T, R, U) find(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None, int num_skip = 0, int num_docs_per_chunk = 0)
117 	{
118 		assert(m_client !is null, "Querying uninitialized MongoCollection.");
119 		return MongoCursor!(T, R, U)(m_client, m_fullPath, flags, num_skip, num_docs_per_chunk, query, returnFieldSelector);
120 	}
121 
122 	/// ditto
123 	MongoCursor!(T, R, typeof(null)) find(R = Bson, T)(T query) { return find!R(query, null); }
124 
125 	/// ditto
126 	MongoCursor!(Bson, R, typeof(null)) find(R = Bson)() { return find!R(Bson.emptyObject, null); }
127 
128 	/** Queries the collection for existing documents.
129 
130 		Returns:
131 			By default, a Bson value of the matching document is returned, or $(D Bson(null))
132 			when no document matched. For types R that are not Bson, the returned value is either
133 			of type $(D R), or of type $(Nullable!R), if $(D R) is not a reference/pointer type.
134 
135 		Throws: Exception if a DB communication error or a query error occured.
136 		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Querying)
137 	 */
138 	auto findOne(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None)
139 	{
140 		import std.traits;
141 		import std.typecons;
142 
143 		auto c = find!R(query, returnFieldSelector, flags, 0, 1);
144 		static if (is(R == Bson)) {
145 			foreach (doc; c) return doc;
146 			return Bson(null);
147 		} else static if (is(R == class) || isPointer!R || isDynamicArray!R || isAssociativeArray!R) {
148 			foreach (doc; c) return doc;
149 			return null;
150 		} else {
151 			foreach (doc; c) {
152 				Nullable!R ret;
153 				ret = doc;
154 				return ret;
155 			}
156 			return Nullable!R.init;
157 		}
158 	}
159 	/// ditto
160 	auto findOne(R = Bson, T)(T query) { return findOne!R(query, Bson(null)); }
161 
162 	/**
163 	  Removes documents from the collection.
164 
165 	  Throws: Exception if a DB communication error occured.
166 	  See_Also: $(LINK http://www.mongodb.org/display/DOCS/Removing)
167 	 */
168 	void remove(T)(T selector, DeleteFlags flags = DeleteFlags.None)
169 	{
170 		assert(m_client !is null, "Removing from uninitialized MongoCollection.");
171 		auto conn = m_client.lockConnection();
172 		ubyte[256] selector_buf = void;
173 		conn.delete_(m_fullPath, flags, serializeToBson(selector, selector_buf));
174 	}
175 
176 	/// ditto
177 	void remove()() { remove(Bson.emptyObject); }
178 
179 	/**
180 		Combines a modify and find operation to a single atomic operation.
181 
182 		Params:
183 			query = MongoDB query expression to identify the matched document
184 			update = Update expression for the matched document
185 			returnFieldSelector = Optional map of fields to return in the response
186 
187 		Throws:
188 			An `Exception` will be thrown if an error occurs in the
189 			communication with the database server.
190 
191 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/command/findAndModify)
192 	 */
193 	Bson findAndModify(T, U, V)(T query, U update, V returnFieldSelector)
194 	{
195 		static struct CMD {
196 			string findAndModify;
197 			T query;
198 			U update;
199 			V fields;
200 		}
201 		CMD cmd;
202 		cmd.findAndModify = m_name;
203 		cmd.query = query;
204 		cmd.update = update;
205 		cmd.fields = returnFieldSelector;
206 		auto ret = database.runCommand(cmd);
207 		if( !ret["ok"].get!double ) throw new Exception("findAndModify failed.");
208 		return ret["value"];
209 	}
210 
211 	/// ditto
212 	Bson findAndModify(T, U)(T query, U update)
213 	{
214 		return findAndModify(query, update, null);
215 	}
216 
217 	/**
218 		Combines a modify and find operation to a single atomic operation with generic options support.
219 
220 		Params:
221 			query = MongoDB query expression to identify the matched document
222 			update = Update expression for the matched document
223 			options = Generic BSON object that contains additional options
224 				fields, such as `"new": true`
225 
226 		Throws:
227 			An `Exception` will be thrown if an error occurs in the
228 			communication with the database server.
229 
230 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/command/findAndModify)
231 	 */
232 	Bson findAndModifyExt(T, U, V)(T query, U update, V options)
233 	{
234 		auto bopt = serializeToBson(options);
235 		assert(bopt.type == Bson.Type.object,
236 			"The options parameter to findAndModifyExt must be a BSON object.");
237 
238 		Bson cmd = Bson.emptyObject;
239 		cmd["findAndModify"] = m_name;
240 		cmd["query"] = serializeToBson(query);
241 		cmd["update"] = serializeToBson(update);
242 		bopt.opApply(delegate int(string key, Bson value) @safe {
243 			cmd[key] = value;
244 			return 0;
245 		});
246 		auto ret = database.runCommand(cmd);
247 		enforce(ret["ok"].get!double != 0, "findAndModifyExt failed.");
248 		return ret["value"];
249 	}
250 
251 	///
252 	unittest {
253 		import vibe.db.mongo.mongo;
254 
255 		void test()
256 		{
257 			auto coll = connectMongoDB("127.0.0.1").getCollection("test");
258 			coll.findAndModifyExt(["name": "foo"], ["$set": ["value": "bar"]], ["new": true]);
259 		}
260 	}
261 
262 	/**
263 		Counts the results of the specified query expression.
264 
265 		Throws Exception if a DB communication error occured.
266 		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-{{count%28%29}})
267 	*/
268 	ulong count(T)(T query)
269 	{
270 		static struct Empty {}
271 		static struct CMD {
272 			string count;
273 			T query;
274 			Empty fields;
275 		}
276 
277 		CMD cmd;
278 		cmd.count = m_name;
279 		cmd.query = query;
280 		auto reply = database.runCommand(cmd);
281 		enforce(reply["ok"].opt!double == 1 || reply["ok"].opt!int == 1, "Count command failed.");
282 		switch (reply["n"].type) with (Bson.Type) {
283 			default: assert(false, "Unsupported data type in BSON reply for COUNT");
284 			case double_: return cast(ulong)reply["n"].get!double; // v2.x
285 			case int_: return reply["n"].get!int; // v3.x
286 			case long_: return reply["n"].get!long; // just in case
287 		}
288 	}
289 
290 	/**
291 		Calculates aggregate values for the data in a collection.
292 
293 		Params:
294 			pipeline = A sequence of data aggregation processes. These can
295 				either be given as separate parameters, or as a single array
296 				parameter.
297 
298 		Returns: An array of documents returned by the pipeline
299 
300 		Throws: Exception if a DB communication error occured
301 
302 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/db.collection.aggregate)
303 	*/
304 	Bson aggregate(ARGS...)(ARGS pipeline)
305 	{
306 		import std.traits;
307 
308 		static if (ARGS.length == 1 && isArray!(ARGS[0]))
309 			alias Pipeline = ARGS[0];
310 		else static struct Pipeline { ARGS args; }
311 
312 		static struct CMD {
313 			string aggregate;
314 			@asArray Pipeline pipeline;
315 		}
316 
317 		CMD cmd;
318 		cmd.aggregate = m_name;
319 		static if (ARGS.length == 1 && isArray!(ARGS[0]))
320 			cmd.pipeline = pipeline[0];
321 		else cmd.pipeline.args = pipeline;
322 		auto ret = database.runCommand(cmd);
323 		enforce(ret["ok"].get!double == 1, "Aggregate command failed.");
324 		return ret["result"];
325 	}
326 
327 	/// Example taken from the MongoDB documentation
328 	unittest {
329 		import vibe.db.mongo.mongo;
330 
331 		void test() {
332 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
333 			auto results = db["coll"].aggregate(
334 				["$match": ["status": "A"]],
335 				["$group": ["_id": Bson("$cust_id"),
336 					"total": Bson(["$sum": Bson("$amount")])]],
337 				["$sort": ["total": -1]]);
338 		}
339 	}
340 
341 	/// The same example, but using an array of arguments
342 	unittest {
343 		import vibe.db.mongo.mongo;
344 
345 		void test() {
346 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
347 
348 			Bson[] args;
349 			args ~= serializeToBson(["$match": ["status": "A"]]);
350 			args ~= serializeToBson(["$group": ["_id": Bson("$cust_id"),
351 					"total": Bson(["$sum": Bson("$amount")])]]);
352 			args ~= serializeToBson(["$sort": ["total": -1]]);
353 
354 			auto results = db["coll"].aggregate(args);
355 		}
356 	}
357 
358 	/**
359 		Returns an input range of all unique values for a certain field for
360 		records matching the given query.
361 
362 		Params:
363 			key = Name of the field for which to collect unique values
364 			query = The query used to select records
365 
366 		Returns:
367 			An input range with items of type `R` (`Bson` by default) is
368 			returned.
369 	*/
370 	auto distinct(R = Bson, Q)(string key, Q query)
371 	{
372 		import std.algorithm : map;
373 
374 		static struct CMD {
375 			string distinct;
376 			string key;
377 			Q query;
378 		}
379 		CMD cmd;
380 		cmd.distinct = m_name;
381 		cmd.key = key;
382 		cmd.query = query;
383 		auto res = m_db.runCommand(cmd);
384 
385 		enforce(res["ok"].get!double != 0, "Distinct query failed: "~res["errmsg"].opt!string);
386 
387 		static if (is(R == Bson)) return res["values"].byValue;
388 		else return res["values"].byValue.map!(b => deserializeBson!R(b));
389 	}
390 
391 	///
392 	unittest {
393 		import std.algorithm : equal;
394 		import vibe.db.mongo.mongo;
395 
396 		void test()
397 		{
398 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
399 			auto coll = db["collection"];
400 
401 			coll.drop();
402 			coll.insert(["a": "first", "b": "foo"]);
403 			coll.insert(["a": "first", "b": "bar"]);
404 			coll.insert(["a": "first", "b": "bar"]);
405 			coll.insert(["a": "second", "b": "baz"]);
406 			coll.insert(["a": "second", "b": "bam"]);
407 
408 			auto result = coll.distinct!string("b", ["a": "first"]);
409 
410 			assert(result.equal(["foo", "bar"]));
411 		}
412 	}
413 
414 	/**
415 		Creates or updates an index.
416 
417 		Note that the overload taking an associative array of field orders
418 		will be removed. Since the order of fields matters, it is
419 		only suitable for single-field indices.
420 	*/
421 	void ensureIndex(scope const(Tuple!(string, int))[] field_orders, IndexFlags flags = IndexFlags.none, Duration expire_time = 0.seconds)
422 	@safe {
423 		// TODO: support 2d indexes
424 
425 		auto key = Bson.emptyObject;
426 		auto indexname = appender!string();
427 		bool first = true;
428 		foreach (fo; field_orders) {
429 			if (!first) indexname.put('_');
430 			else first = false;
431 			indexname.put(fo[0]);
432 			indexname.put('_');
433 			indexname.put(to!string(fo[1]));
434 			key[fo[0]] = Bson(fo[1]);
435 		}
436 
437 		Bson[string] doc;
438 		doc["v"] = 1;
439 		doc["key"] = key;
440 		doc["ns"] = m_fullPath;
441 		doc["name"] = indexname.data;
442 		if (flags & IndexFlags.unique) doc["unique"] = true;
443 		if (flags & IndexFlags.dropDuplicates) doc["dropDups"] = true;
444 		if (flags & IndexFlags.background) doc["background"] = true;
445 		if (flags & IndexFlags.sparse) doc["sparse"] = true;
446 		if (flags & IndexFlags.expireAfterSeconds) doc["expireAfterSeconds"] = expire_time.total!"seconds";
447 		database["system.indexes"].insert(doc);
448 	}
449 	/// ditto
450 	deprecated("Use the overload taking an array of field_orders instead.")
451 	void ensureIndex(int[string] field_orders, IndexFlags flags = IndexFlags.none, ulong expireAfterSeconds = 0)
452 	@safe {
453 		Tuple!(string, int)[] orders;
454 		foreach (k, v; field_orders)
455 			orders ~= tuple(k, v);
456 		ensureIndex(orders, flags, expireAfterSeconds.seconds);
457 	}
458 
459 	void dropIndex(string name)
460 	@safe {
461 		static struct CMD {
462 			string dropIndexes;
463 			string index;
464 		}
465 
466 		CMD cmd;
467 		cmd.dropIndexes = m_name;
468 		cmd.index = name;
469 		auto reply = database.runCommand(cmd);
470 		enforce(reply["ok"].get!double == 1, "dropIndex command failed.");
471 	}
472 
473 	void drop()
474 	@safe {
475 		static struct CMD {
476 			string drop;
477 		}
478 
479 		CMD cmd;
480 		cmd.drop = m_name;
481 		auto reply = database.runCommand(cmd);
482 		enforce(reply["ok"].get!double == 1, "drop command failed.");
483 	}
484 }
485 
486 ///
487 unittest {
488 	import vibe.data.bson;
489 	import vibe.data.json;
490 	import vibe.db.mongo.mongo;
491 
492 	void test()
493 	{
494 		MongoClient client = connectMongoDB("127.0.0.1");
495 		MongoCollection users = client.getCollection("myapp.users");
496 
497 		// canonical version using a Bson object
498 		users.insert(Bson(["name": Bson("admin"), "password": Bson("secret")]));
499 
500 		// short version using a string[string] AA that is automatically
501 		// serialized to Bson
502 		users.insert(["name": "admin", "password": "secret"]);
503 
504 		// BSON specific types are also serialized automatically
505 		auto uid = BsonObjectID.fromString("507f1f77bcf86cd799439011");
506 		Bson usr = users.findOne(["_id": uid]);
507 
508 		// JSON is another possibility
509 		Json jusr = parseJsonString(`{"name": "admin", "password": "secret"}`);
510 		users.insert(jusr);
511 	}
512 }
513 
514 /// Using the type system to define a document "schema"
515 unittest {
516 	import vibe.db.mongo.mongo;
517 	import vibe.data.serialization : name;
518 	import std.typecons : Nullable;
519 
520 	// Nested object within a "User" document
521 	struct Address {
522 		string name;
523 		string street;
524 		int zipCode;
525 	}
526 
527 	// The document structure of the "myapp.users" collection
528 	struct User {
529 		@name("_id") BsonObjectID id; // represented as "_id" in the database
530 		string loginName;
531 		string password;
532 		Address address;
533 	}
534 
535 	void test()
536 	{
537 		MongoClient client = connectMongoDB("127.0.0.1");
538 		MongoCollection users = client.getCollection("myapp.users");
539 
540 		// D values are automatically serialized to the internal BSON format
541 		// upon insertion - see also vibe.data.serialization
542 		User usr;
543 		usr.id = BsonObjectID.generate();
544 		usr.loginName = "admin";
545 		usr.password = "secret";
546 		users.insert(usr);
547 
548 		// find supports direct de-serialization of the returned documents
549 		foreach (usr2; users.find!User()) {
550 			logInfo("User: %s", usr2.loginName);
551 		}
552 
553 		// the same goes for findOne
554 		Nullable!User qusr = users.findOne!User(["_id": usr.id]);
555 		if (!qusr.isNull)
556 			logInfo("User: %s", qusr.loginName);
557 	}
558 }