1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 public import vibe.db.mongo.impl.crud;
12 
13 import vibe.core.log;
14 
15 import vibe.db.mongo.connection;
16 import vibe.db.mongo.client;
17 
18 import core.time;
19 import std.array : array;
20 import std.algorithm : map, max, min, skipOver;
21 import std.exception;
22 import std.range : chain;
23 
24 
25 /**
26 	Represents a cursor for a MongoDB query.
27 
28 	Use foreach( doc; cursor ) to iterate over the list of documents.
29 
30 	This struct uses reference counting to destroy the underlying MongoDB cursor.
31 */
32 struct MongoCursor(DocType = Bson) {
33 	private IMongoCursorData!DocType m_data;
34 
35 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
36 	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
37 	{
38 		// TODO: avoid memory allocation, if possible
39 		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
40 	}
41 
42 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
43 	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
44 	{
45 		// TODO: avoid memory allocation, if possible
46 		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
47 	}
48 
49 	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
50 	{
51 		Bson command = Bson.emptyObject;
52 		command["find"] = Bson(collection);
53 		command["$db"] = Bson(database);
54 		static if (is(Q == Bson))
55 			command["filter"] = query;
56 		else
57 			command["filter"] = serializeToBson(query);
58 
59 		MongoConnection conn = client.lockConnection();
60 		enforceWireVersionConstraints(options, conn.description.maxWireVersion);
61 
62 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
63 		bool singleBatch;
64 		if (!options.limit.isNull && options.limit.get < 0)
65 		{
66 			singleBatch = true;
67 			options.limit = -options.limit.get;
68 			options.batchSize = cast(int)options.limit.get;
69 		}
70 		if (!options.batchSize.isNull && options.batchSize.get < 0)
71 		{
72 			singleBatch = true;
73 			options.batchSize = -options.batchSize.get;
74 		}
75 		if (singleBatch)
76 			command["singleBatch"] = Bson(true);
77 
78 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
79 		bool allowMaxTime = true;
80 		if (options.cursorType == CursorType.tailable
81 			|| options.cursorType == CursorType.tailableAwait)
82 			command["tailable"] = Bson(true);
83 		else
84 		{
85 			options.maxAwaitTimeMS.nullify();
86 			allowMaxTime = false;
87 		}
88 
89 		if (options.cursorType == CursorType.tailableAwait)
90 			command["awaitData"] = Bson(true);
91 		else
92 		{
93 			options.maxAwaitTimeMS.nullify();
94 			allowMaxTime = false;
95 		}
96 
97 		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
98 		auto optionsBson = serializeToBson(options);
99 		foreach (string key, value; optionsBson.byKeyValue)
100 			command[key] = value;
101 
102 		this(client, command,
103 			options.batchSize.isNull ? 0 : options.batchSize.get,
104 			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
105 				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
106 				: Duration.max);
107 	}
108 
109 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
110 	{
111 		// TODO: avoid memory allocation, if possible
112 		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
113 	}
114 
115 	this(this)
116 	{
117 		if( m_data ) m_data.refCount++;
118 	}
119 
120 	~this() @safe
121 	{
122 		import core.memory : GC;
123 
124 		if (m_data && --m_data.refCount == 0) {
125 			if (m_data.alive) {
126 				// avoid InvalidMemoryOperation errors in case the cursor was
127 				// leaked to the GC
128 				if(GC.inFinalizer) {
129 					logError("MongoCursor instance that has not been fully processed leaked to the GC!");
130 				} else {
131 					try m_data.killCursors();
132 					catch (MongoException e) {
133 						logWarn("MongoDB failed to kill cursors: %s", e.msg);
134 						logDiagnostic("%s", (() @trusted => e.toString)());
135 					}
136 				}
137 			}
138 		}
139 	}
140 
141 	/**
142 		Returns true if there are no more documents for this cursor.
143 
144 		Throws: An exception if there is a query or communication error.
145 	*/
146 	@property bool empty() { return m_data ? m_data.empty() : true; }
147 
148 	/**
149 		Returns the current document of the response.
150 
151 		Use empty and popFront to iterate over the list of documents using an
152 		input range interface. Note that calling this function is only allowed
153 		if empty returns false.
154 	*/
155 	@property DocType front() { return m_data.front; }
156 
157 	/**
158 		Controls the order in which the query returns matching documents.
159 
160 		This method must be called before starting to iterate, or an exception
161 		will be thrown. If multiple calls to $(D sort()) are issued, only
162 		the last one will have an effect.
163 
164 		Params:
165 			order = A BSON object convertible value that defines the sort order
166 				of the result. This BSON object must be structured according to
167 				the MongoDB documentation (see below).
168 
169 		Returns: Reference to the modified original cursor instance.
170 
171 		Throws:
172 			An exception if there is a query or communication error.
173 			Also throws if the method was called after beginning of iteration.
174 
175 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
176 	*/
177 	MongoCursor sort(T)(T order)
178 	{
179 		m_data.sort(() @trusted { return serializeToBson(order); } ());
180 		return this;
181 	}
182 
183 	///
184 	@safe unittest {
185 		import vibe.core.log;
186 		import vibe.db.mongo.mongo;
187 
188 		void test()
189 		@safe {
190 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
191 			auto coll = db["testcoll"];
192 
193 			// find all entries in reverse date order
194 			foreach (entry; coll.find().sort(["date": -1]))
195 				() @safe { logInfo("Entry: %s", entry); } ();
196 
197 			// the same, but using a struct to avoid memory allocations
198 			static struct Order { int date; }
199 			foreach (entry; coll.find().sort(Order(-1)))
200 				logInfo("Entry: %s", entry);
201 		}
202 	}
203 
204 	/**
205 		Limits the number of documents that the cursor returns.
206 
207 		This method must be called before beginning iteration in order to have
208 		effect. If multiple calls to limit() are made, the one with the lowest
209 		limit will be chosen.
210 
211 		Params:
212 			count = The maximum number number of documents to return. A value
213 				of zero means unlimited.
214 
215 		Returns: the same cursor
216 
217 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
218 	*/
219 	MongoCursor limit(long count)
220 	{
221 		m_data.limit(count);
222 		return this;
223 	}
224 
225 	/**
226 		Skips a given number of elements at the beginning of the cursor.
227 
228 		This method must be called before beginning iteration in order to have
229 		effect. If multiple calls to skip() are made, the one with the maximum
230 		number will be chosen.
231 
232 		Params:
233 			count = The number of documents to skip.
234 
235 		Returns: the same cursor
236 
237 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
238 	*/
239 	MongoCursor skip(long count)
240 	{
241 		m_data.skip(count);
242 		return this;
243 	}
244 
245 	@safe unittest {
246 		import vibe.core.log;
247 		import vibe.db.mongo.mongo;
248 
249 		void test()
250 		@safe {
251 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
252 			auto coll = db["testcoll"];
253 
254 			try { coll.drop(); } catch (Exception) {}
255 
256 			for (int i = 0; i < 10000; i++)
257 				coll.insertOne(["i": i]);
258 
259 			static struct Order { int i; }
260 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
261 
262 			assert(data.length == 2000);
263 			assert(data[0]["i"].get!int == 2000);
264 			assert(data[$ - 1]["i"].get!int == 3999);
265 		}
266 	}
267 
268 	/**
269 		Advances the cursor to the next document of the response.
270 
271 		Note that calling this function is only allowed if empty returns false.
272 	*/
273 	void popFront() { m_data.popFront(); }
274 
275 	/**
276 		Iterates over all remaining documents.
277 
278 		Note that iteration is one-way - elements that have already been visited
279 		will not be visited again if another iteration is done.
280 
281 		Throws: An exception if there is a query or communication error.
282 	*/
283 	auto byPair()
284 	{
285 		import std.typecons : Tuple, tuple;
286 		static struct Rng {
287 			private IMongoCursorData!DocType data;
288 			@property bool empty() { return data.empty; }
289 			@property Tuple!(long, DocType) front() { return tuple(data.index, data.front); }
290 			void popFront() { data.popFront(); }
291 		}
292 		return Rng(m_data);
293 	}
294 }
295 
296 /// Actual iteration implementation details for MongoCursor. Abstracted using an
297 /// interface because we still have code for legacy (<3.6) MongoDB servers,
298 /// which may still used with the old legacy overloads.
299 private interface IMongoCursorData(DocType) {
300 	@property bool alive() @safe nothrow;
301 	bool empty() @safe; /// Range implementation
302 	long index() @safe; /// Range implementation
303 	DocType front() @safe; /// Range implementation
304 	void popFront() @safe; /// Range implementation
305 	/// Before iterating, specify a MongoDB sort order
306 	void sort(Bson order) @safe;
307 	/// Before iterating, specify maximum number of returned items
308 	void limit(long count) @safe;
309 	/// Before iterating, skip the specified number of items (when sorted)
310 	void skip(long count) @safe;
311 	/// Kills the MongoDB cursor, further iteration attempts will result in
312 	/// errors. Call this in the destructor.
313 	void killCursors() @safe;
314 	/// Define an reference count property on the class, which is returned by
315 	/// reference with this method.
316 	ref int refCount() @safe;
317 }
318 
319 
320 /**
321 	Deprecated query internals exposed through MongoCursor.
322 */
323 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
324 	private {
325 		int m_refCount = 1;
326 		MongoClient m_client;
327 		string m_collection;
328 		long m_cursor;
329 		long m_nskip;
330 		int m_nret;
331 		Bson m_sort = Bson(null);
332 		int m_offset;
333 		size_t m_currentDoc = 0;
334 		DocType[] m_documents;
335 		bool m_iterationStarted = false;
336 		long m_limit = 0;
337 	}
338 
339 	@property bool alive() @safe nothrow { return m_cursor != 0; }
340 
341 	final bool empty()
342 	@safe {
343 		if (!m_iterationStarted) startIterating();
344 		if (m_limit > 0 && index >= m_limit) {
345 			killCursors();
346 			return true;
347 		}
348 		if( m_currentDoc < m_documents.length )
349 			return false;
350 		if( m_cursor == 0 )
351 			return true;
352 
353 		auto conn = m_client.lockConnection();
354 		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
355 		return m_currentDoc >= m_documents.length;
356 	}
357 
358 	final long index()
359 	@safe {
360 		return m_offset + m_currentDoc;
361 	}
362 
363 	final DocType front()
364 	@safe {
365 		if (!m_iterationStarted) startIterating();
366 		assert(!empty(), "Cursor has no more data.");
367 		return m_documents[m_currentDoc];
368 	}
369 
370 	final void sort(Bson order)
371 	@safe {
372 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
373 		m_sort = order;
374 	}
375 
376 	final void limit(long count)
377 	@safe {
378 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
379 		if (count > 0) {
380 			if (m_nret == 0 || m_nret > count)
381 				m_nret = cast(int)min(count, 1024);
382 
383 			if (m_limit == 0 || m_limit > count)
384 				m_limit = count;
385 		}
386 	}
387 
388 	final void skip(long count)
389 	@safe {
390 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
391 		m_nskip = max(m_nskip, count);
392 	}
393 
394 	final void popFront()
395 	@safe {
396 		if (!m_iterationStarted) startIterating();
397 		assert(!empty(), "Cursor has no more data.");
398 		m_currentDoc++;
399 	}
400 
401 	abstract void startIterating() @safe;
402 
403 	final void killCursors()
404 	@safe {
405 		if (m_cursor == 0) return;
406 		auto conn = m_client.lockConnection();
407 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
408 		m_cursor = 0;
409 	}
410 
411 	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
412 	{
413 		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
414 		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
415 
416 		m_cursor = cursor;
417 		m_offset = first_doc;
418 		m_documents.length = num_docs;
419 		m_currentDoc = 0;
420 	}
421 
422 	final void handleDocument(size_t idx, ref DocType doc)
423 	{
424 		m_documents[idx] = doc;
425 	}
426 
427 	final ref int refCount() { return m_refCount; }
428 }
429 
430 /**
431 	Find + getMore internals exposed through MongoCursor. Unifies the old
432 	LegacyMongoCursorData approach, so it can be used both for find queries and
433 	for custom commands.
434 */
435 private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
436 	private {
437 		int m_refCount = 1;
438 		MongoClient m_client;
439 		Bson m_findQuery;
440 		string m_database;
441 		string m_ns;
442 		string m_collection;
443 		long m_cursor;
444 		int m_batchSize;
445 		Duration m_maxTime;
446 		long m_totalReceived;
447 		size_t m_readDoc;
448 		size_t m_insertDoc;
449 		DocType[] m_documents;
450 		bool m_iterationStarted = false;
451 		long m_queryLimit;
452 	}
453 
454 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
455 	{
456 		m_client = client;
457 		m_findQuery = command;
458 		m_batchSize = batchSize;
459 		m_maxTime = getMoreMaxTime;
460 		m_database = command["$db"].opt!string;
461 	}
462 
463 	@property bool alive() @safe nothrow { return m_cursor != 0; }
464 
465 	bool empty()
466 	@safe {
467 		if (!m_iterationStarted) startIterating();
468 		if (m_queryLimit > 0 && index >= m_queryLimit) {
469 			killCursors();
470 			return true;
471 		}
472 		if( m_readDoc < m_documents.length )
473 			return false;
474 		if( m_cursor == 0 )
475 			return true;
476 
477 		auto conn = m_client.lockConnection();
478 		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
479 			&handleReply, &handleDocument, m_maxTime);
480 		return m_readDoc >= m_documents.length;
481 	}
482 
483 	final long index()
484 	@safe {
485 		assert(m_totalReceived >= m_documents.length);
486 		return m_totalReceived - m_documents.length + m_readDoc;
487 	}
488 
489 	final DocType front()
490 	@safe {
491 		if (!m_iterationStarted) startIterating();
492 		assert(!empty(), "Cursor has no more data.");
493 		return m_documents[m_readDoc];
494 	}
495 
496 	final void sort(Bson order)
497 	@safe {
498 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
499 		m_findQuery["sort"] = order;
500 	}
501 
502 	final void limit(long count)
503 	@safe {
504 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
505 		m_findQuery["limit"] = Bson(count);
506 	}
507 
508 	final void skip(long count)
509 	@safe {
510 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
511 		m_findQuery["skip"] = Bson(count);
512 	}
513 
514 	final void popFront()
515 	@safe {
516 		if (!m_iterationStarted) startIterating();
517 		assert(!empty(), "Cursor has no more data.");
518 		m_readDoc++;
519 	}
520 
521 	private void startIterating()
522 	@safe {
523 		auto conn = m_client.lockConnection();
524 		m_totalReceived = 0;
525 		m_queryLimit = m_findQuery["limit"].opt!long(0);
526 		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
527 		m_iterationStarted = true;
528 	}
529 
530 	final void killCursors()
531 	@safe {
532 		if (m_cursor == 0) return;
533 		auto conn = m_client.lockConnection();
534 		conn.killCursors(m_ns, () @trusted { return (&m_cursor)[0 .. 1]; } ());
535 		m_cursor = 0;
536 	}
537 
538 	final void handleReply(long id, string ns, size_t count)
539 	{
540 		m_cursor = id;
541 		m_ns = ns;
542 		// The qualified collection name is reported here, but when requesting
543 		// data, we need to send the database name and the collection name
544 		// separately, so we have to remove the database prefix:
545 		ns.skipOver(m_database.chain("."));
546 		m_collection = ns;
547 		m_documents.length = count;
548 		m_readDoc = 0;
549 		m_insertDoc = 0;
550 	}
551 
552 	final void handleDocument(ref DocType doc)
553 	{
554 		m_documents[m_insertDoc++] = doc;
555 		m_totalReceived++;
556 	}
557 
558 	final ref int refCount() { return m_refCount; }
559 }
560 
561 /**
562 	Internal class implementing MongoCursorData for find queries
563  */
564 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
565 	private {
566 		QueryFlags m_flags;
567 		Q m_query;
568 		S m_returnFieldSelector;
569 	}
570 
571 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
572 	{
573 		m_client = client;
574 		m_collection = collection;
575 		m_flags = flags;
576 		m_nskip = nskip;
577 		m_nret = nret;
578 		m_query = query;
579 		m_returnFieldSelector = return_field_selector;
580 	}
581 
582 	override void startIterating()
583 	@safe {
584 		auto conn = m_client.lockConnection();
585 
586 		ubyte[256] selector_buf = void;
587 		ubyte[256] query_buf = void;
588 
589 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
590 
591 		Bson query;
592 		static if (is(Q == Bson)) {
593 			query = m_query;
594 		} else {
595 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
596 		}
597 
598 		Bson full_query;
599 
600 		if (!query["query"].isNull() || !query["$query"].isNull()) {
601 			// TODO: emit deprecation warning
602 			full_query = query;
603 		} else {
604 			full_query = Bson.emptyObject;
605 			full_query["$query"] = query;
606 		}
607 
608 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
609 
610 		conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);
611 
612 		m_iterationStarted = true;
613 	}
614 }
615 
616 /**
617 	Internal class implementing MongoCursorData for already initialized generic cursors
618  */
619 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
620 	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
621 	{
622 		m_client = client;
623 		m_collection = collection;
624 		m_cursor = cursor;
625 		m_iterationStarted = true;
626 		m_documents = existing_documents;
627 	}
628 
629 	override void startIterating()
630 	@safe {
631 		assert(false, "Calling startIterating on an opaque already initialized cursor");
632 	}
633 }