1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 public import vibe.db.mongo.impl.crud;
12 
13 import vibe.core.log;
14 
15 import vibe.db.mongo.connection;
16 import vibe.db.mongo.client;
17 
18 import core.time;
19 import std.array : array;
20 import std.algorithm : map, max, min, skipOver;
21 import std.exception;
22 import std.range : chain;
23 
24 
25 /**
26 	Represents a cursor for a MongoDB query.
27 
28 	Use foreach( doc; cursor ) to iterate over the list of documents.
29 
30 	This struct uses reference counting to destroy the underlying MongoDB cursor.
31 */
32 struct MongoCursor(DocType = Bson) {
33 	private IMongoCursorData!DocType m_data;
34 
35 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
36 	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
37 	{
38 		// TODO: avoid memory allocation, if possible
39 		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
40 	}
41 
42 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
43 	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
44 	{
45 		// TODO: avoid memory allocation, if possible
46 		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
47 	}
48 
49 	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
50 	{
51 		Bson command = Bson.emptyObject;
52 		command["find"] = Bson(collection);
53 		command["$db"] = Bson(database);
54 		static if (is(Q == Bson))
55 			command["filter"] = query;
56 		else
57 			command["filter"] = serializeToBson(query);
58 
59 		MongoConnection conn = client.lockConnection();
60 		enforceWireVersionConstraints(options, conn.description.maxWireVersion);
61 
62 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
63 		bool singleBatch;
64 		if (!options.limit.isNull && options.limit.get < 0)
65 		{
66 			singleBatch = true;
67 			options.limit = -options.limit.get;
68 			options.batchSize = cast(int)options.limit.get;
69 		}
70 		if (!options.batchSize.isNull && options.batchSize.get < 0)
71 		{
72 			singleBatch = true;
73 			options.batchSize = -options.batchSize.get;
74 		}
75 		if (singleBatch)
76 			command["singleBatch"] = Bson(true);
77 
78 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
79 		bool allowMaxTime = true;
80 		if (options.cursorType == CursorType.tailable
81 			|| options.cursorType == CursorType.tailableAwait)
82 			command["tailable"] = Bson(true);
83 		else
84 		{
85 			options.maxAwaitTimeMS.nullify();
86 			allowMaxTime = false;
87 		}
88 
89 		if (options.cursorType == CursorType.tailableAwait)
90 			command["awaitData"] = Bson(true);
91 		else
92 		{
93 			options.maxAwaitTimeMS.nullify();
94 			allowMaxTime = false;
95 		}
96 
97 		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
98 		auto optionsBson = serializeToBson(options);
99 		foreach (string key, value; optionsBson.byKeyValue)
100 			command[key] = value;
101 
102 		this(client, command,
103 			options.batchSize.isNull ? 0 : options.batchSize.get,
104 			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
105 				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
106 				: Duration.max);
107 	}
108 
109 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
110 	{
111 		// TODO: avoid memory allocation, if possible
112 		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
113 	}
114 
115 	this(this)
116 	{
117 		if( m_data ) m_data.refCount++;
118 	}
119 
120 	~this() @safe
121 	{
122 		if( m_data && --m_data.refCount == 0 ){
123 			try {
124 				m_data.killCursors();
125 			} catch (MongoException e) {
126 				logWarn("MongoDB failed to kill cursors: %s", e.msg);
127 				logDiagnostic("%s", (() @trusted => e.toString)());
128 			}
129 		}
130 	}
131 
132 	/**
133 		Returns true if there are no more documents for this cursor.
134 
135 		Throws: An exception if there is a query or communication error.
136 	*/
137 	@property bool empty() { return m_data ? m_data.empty() : true; }
138 
139 	/**
140 		Returns the current document of the response.
141 
142 		Use empty and popFront to iterate over the list of documents using an
143 		input range interface. Note that calling this function is only allowed
144 		if empty returns false.
145 	*/
146 	@property DocType front() { return m_data.front; }
147 
148 	/**
149 		Controls the order in which the query returns matching documents.
150 
151 		This method must be called before starting to iterate, or an exception
152 		will be thrown. If multiple calls to $(D sort()) are issued, only
153 		the last one will have an effect.
154 
155 		Params:
156 			order = A BSON object convertible value that defines the sort order
157 				of the result. This BSON object must be structured according to
158 				the MongoDB documentation (see below).
159 
160 		Returns: Reference to the modified original cursor instance.
161 
162 		Throws:
163 			An exception if there is a query or communication error.
164 			Also throws if the method was called after beginning of iteration.
165 
166 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
167 	*/
168 	MongoCursor sort(T)(T order)
169 	{
170 		m_data.sort(() @trusted { return serializeToBson(order); } ());
171 		return this;
172 	}
173 
174 	///
175 	@safe unittest {
176 		import vibe.core.log;
177 		import vibe.db.mongo.mongo;
178 
179 		void test()
180 		@safe {
181 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
182 			auto coll = db["testcoll"];
183 
184 			// find all entries in reverse date order
185 			foreach (entry; coll.find().sort(["date": -1]))
186 				() @safe { logInfo("Entry: %s", entry); } ();
187 
188 			// the same, but using a struct to avoid memory allocations
189 			static struct Order { int date; }
190 			foreach (entry; coll.find().sort(Order(-1)))
191 				logInfo("Entry: %s", entry);
192 		}
193 	}
194 
195 	/**
196 		Limits the number of documents that the cursor returns.
197 
198 		This method must be called before beginning iteration in order to have
199 		effect. If multiple calls to limit() are made, the one with the lowest
200 		limit will be chosen.
201 
202 		Params:
203 			count = The maximum number number of documents to return. A value
204 				of zero means unlimited.
205 
206 		Returns: the same cursor
207 
208 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
209 	*/
210 	MongoCursor limit(long count)
211 	{
212 		m_data.limit(count);
213 		return this;
214 	}
215 
216 	/**
217 		Skips a given number of elements at the beginning of the cursor.
218 
219 		This method must be called before beginning iteration in order to have
220 		effect. If multiple calls to skip() are made, the one with the maximum
221 		number will be chosen.
222 
223 		Params:
224 			count = The number of documents to skip.
225 
226 		Returns: the same cursor
227 
228 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
229 	*/
230 	MongoCursor skip(long count)
231 	{
232 		m_data.skip(count);
233 		return this;
234 	}
235 
236 	@safe unittest {
237 		import vibe.core.log;
238 		import vibe.db.mongo.mongo;
239 
240 		void test()
241 		@safe {
242 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
243 			auto coll = db["testcoll"];
244 
245 			try { coll.drop(); } catch (Exception) {}
246 
247 			for (int i = 0; i < 10000; i++)
248 				coll.insertOne(["i": i]);
249 
250 			static struct Order { int i; }
251 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
252 
253 			assert(data.length == 2000);
254 			assert(data[0]["i"].get!int == 2000);
255 			assert(data[$ - 1]["i"].get!int == 3999);
256 		}
257 	}
258 
259 	/**
260 		Advances the cursor to the next document of the response.
261 
262 		Note that calling this function is only allowed if empty returns false.
263 	*/
264 	void popFront() { m_data.popFront(); }
265 
266 	/**
267 		Iterates over all remaining documents.
268 
269 		Note that iteration is one-way - elements that have already been visited
270 		will not be visited again if another iteration is done.
271 
272 		Throws: An exception if there is a query or communication error.
273 	*/
274 	auto byPair()
275 	{
276 		import std.typecons : Tuple, tuple;
277 		static struct Rng {
278 			private IMongoCursorData!DocType data;
279 			@property bool empty() { return data.empty; }
280 			@property Tuple!(long, DocType) front() { return tuple(data.index, data.front); }
281 			void popFront() { data.popFront(); }
282 		}
283 		return Rng(m_data);
284 	}
285 }
286 
287 /// Actual iteration implementation details for MongoCursor. Abstracted using an
288 /// interface because we still have code for legacy (<3.6) MongoDB servers,
289 /// which may still used with the old legacy overloads.
290 private interface IMongoCursorData(DocType) {
291 	bool empty() @safe; /// Range implementation
292 	long index() @safe; /// Range implementation
293 	DocType front() @safe; /// Range implementation
294 	void popFront() @safe; /// Range implementation
295 	/// Before iterating, specify a MongoDB sort order
296 	void sort(Bson order) @safe;
297 	/// Before iterating, specify maximum number of returned items
298 	void limit(long count) @safe;
299 	/// Before iterating, skip the specified number of items (when sorted)
300 	void skip(long count) @safe;
301 	/// Kills the MongoDB cursor, further iteration attempts will result in
302 	/// errors. Call this in the destructor.
303 	void killCursors() @safe;
304 	/// Define an reference count property on the class, which is returned by
305 	/// reference with this method.
306 	ref int refCount() @safe;
307 }
308 
309 
310 /**
311 	Deprecated query internals exposed through MongoCursor.
312 */
313 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
314 	private {
315 		int m_refCount = 1;
316 		MongoClient m_client;
317 		string m_collection;
318 		long m_cursor;
319 		long m_nskip;
320 		int m_nret;
321 		Bson m_sort = Bson(null);
322 		int m_offset;
323 		size_t m_currentDoc = 0;
324 		DocType[] m_documents;
325 		bool m_iterationStarted = false;
326 		long m_limit = 0;
327 	}
328 
329 	final bool empty()
330 	@safe {
331 		if (!m_iterationStarted) startIterating();
332 		if (m_limit > 0 && index >= m_limit) {
333 			killCursors();
334 			return true;
335 		}
336 		if( m_currentDoc < m_documents.length )
337 			return false;
338 		if( m_cursor == 0 )
339 			return true;
340 
341 		auto conn = m_client.lockConnection();
342 		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
343 		return m_currentDoc >= m_documents.length;
344 	}
345 
346 	final long index()
347 	@safe {
348 		return m_offset + m_currentDoc;
349 	}
350 
351 	final DocType front()
352 	@safe {
353 		if (!m_iterationStarted) startIterating();
354 		assert(!empty(), "Cursor has no more data.");
355 		return m_documents[m_currentDoc];
356 	}
357 
358 	final void sort(Bson order)
359 	@safe {
360 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
361 		m_sort = order;
362 	}
363 
364 	final void limit(long count)
365 	@safe {
366 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
367 		if (count > 0) {
368 			if (m_nret == 0 || m_nret > count)
369 				m_nret = cast(int)min(count, 1024);
370 
371 			if (m_limit == 0 || m_limit > count)
372 				m_limit = count;
373 		}
374 	}
375 
376 	final void skip(long count)
377 	@safe {
378 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
379 		m_nskip = max(m_nskip, count);
380 	}
381 
382 	final void popFront()
383 	@safe {
384 		if (!m_iterationStarted) startIterating();
385 		assert(!empty(), "Cursor has no more data.");
386 		m_currentDoc++;
387 	}
388 
389 	abstract void startIterating() @safe;
390 
391 	final void killCursors()
392 	@safe {
393 		auto conn = m_client.lockConnection();
394 		if (m_cursor == 0) return;
395 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
396 		m_cursor = 0;
397 	}
398 
399 	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
400 	{
401 		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
402 		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
403 
404 		m_cursor = cursor;
405 		m_offset = first_doc;
406 		m_documents.length = num_docs;
407 		m_currentDoc = 0;
408 	}
409 
410 	final void handleDocument(size_t idx, ref DocType doc)
411 	{
412 		m_documents[idx] = doc;
413 	}
414 
415 	final ref int refCount() { return m_refCount; }
416 }
417 
418 /**
419 	Find + getMore internals exposed through MongoCursor. Unifies the old
420 	LegacyMongoCursorData approach, so it can be used both for find queries and
421 	for custom commands.
422 */
423 private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
424 	private {
425 		int m_refCount = 1;
426 		MongoClient m_client;
427 		Bson m_findQuery;
428 		string m_database;
429 		string m_ns;
430 		string m_collection;
431 		long m_cursor;
432 		int m_batchSize;
433 		Duration m_maxTime;
434 		long m_totalReceived;
435 		size_t m_readDoc;
436 		size_t m_insertDoc;
437 		DocType[] m_documents;
438 		bool m_iterationStarted = false;
439 		long m_queryLimit;
440 	}
441 
442 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
443 	{
444 		m_client = client;
445 		m_findQuery = command;
446 		m_batchSize = batchSize;
447 		m_maxTime = getMoreMaxTime;
448 		m_database = command["$db"].opt!string;
449 	}
450 
451 	bool empty()
452 	@safe {
453 		if (!m_iterationStarted) startIterating();
454 		if (m_queryLimit > 0 && index >= m_queryLimit) {
455 			killCursors();
456 			return true;
457 		}
458 		if( m_readDoc < m_documents.length )
459 			return false;
460 		if( m_cursor == 0 )
461 			return true;
462 
463 		auto conn = m_client.lockConnection();
464 		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
465 			&handleReply, &handleDocument, m_maxTime);
466 		return m_readDoc >= m_documents.length;
467 	}
468 
469 	final long index()
470 	@safe {
471 		assert(m_totalReceived >= m_documents.length);
472 		return m_totalReceived - m_documents.length + m_readDoc;
473 	}
474 
475 	final DocType front()
476 	@safe {
477 		if (!m_iterationStarted) startIterating();
478 		assert(!empty(), "Cursor has no more data.");
479 		return m_documents[m_readDoc];
480 	}
481 
482 	final void sort(Bson order)
483 	@safe {
484 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
485 		m_findQuery["sort"] = order;
486 	}
487 
488 	final void limit(long count)
489 	@safe {
490 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
491 		m_findQuery["limit"] = Bson(count);
492 	}
493 
494 	final void skip(long count)
495 	@safe {
496 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
497 		m_findQuery["skip"] = Bson(count);
498 	}
499 
500 	final void popFront()
501 	@safe {
502 		if (!m_iterationStarted) startIterating();
503 		assert(!empty(), "Cursor has no more data.");
504 		m_readDoc++;
505 	}
506 
507 	private void startIterating()
508 	@safe {
509 		auto conn = m_client.lockConnection();
510 		m_totalReceived = 0;
511 		m_queryLimit = m_findQuery["limit"].opt!long(0);
512 		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
513 		m_iterationStarted = true;
514 	}
515 
516 	final void killCursors()
517 	@safe {
518 		auto conn = m_client.lockConnection();
519 		if (m_cursor == 0) return;
520 		conn.killCursors(m_ns, () @trusted { return (&m_cursor)[0 .. 1]; } ());
521 		m_cursor = 0;
522 	}
523 
524 	final void handleReply(long id, string ns, size_t count)
525 	{
526 		m_cursor = id;
527 		m_ns = ns;
528 		// The qualified collection name is reported here, but when requesting
529 		// data, we need to send the database name and the collection name
530 		// separately, so we have to remove the database prefix:
531 		ns.skipOver(m_database.chain("."));
532 		m_collection = ns;
533 		m_documents.length = count;
534 		m_readDoc = 0;
535 		m_insertDoc = 0;
536 	}
537 
538 	final void handleDocument(ref DocType doc)
539 	{
540 		m_documents[m_insertDoc++] = doc;
541 		m_totalReceived++;
542 	}
543 
544 	final ref int refCount() { return m_refCount; }
545 }
546 
547 /**
548 	Internal class implementing MongoCursorData for find queries
549  */
550 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
551 	private {
552 		QueryFlags m_flags;
553 		Q m_query;
554 		S m_returnFieldSelector;
555 	}
556 
557 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
558 	{
559 		m_client = client;
560 		m_collection = collection;
561 		m_flags = flags;
562 		m_nskip = nskip;
563 		m_nret = nret;
564 		m_query = query;
565 		m_returnFieldSelector = return_field_selector;
566 	}
567 
568 	override void startIterating()
569 	@safe {
570 		auto conn = m_client.lockConnection();
571 
572 		ubyte[256] selector_buf = void;
573 		ubyte[256] query_buf = void;
574 
575 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
576 
577 		Bson query;
578 		static if (is(Q == Bson)) {
579 			query = m_query;
580 		} else {
581 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
582 		}
583 
584 		Bson full_query;
585 
586 		if (!query["query"].isNull() || !query["$query"].isNull()) {
587 			// TODO: emit deprecation warning
588 			full_query = query;
589 		} else {
590 			full_query = Bson.emptyObject;
591 			full_query["$query"] = query;
592 		}
593 
594 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
595 
596 		conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);
597 
598 		m_iterationStarted = true;
599 	}
600 }
601 
602 /**
603 	Internal class implementing MongoCursorData for already initialized generic cursors
604  */
605 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
606 	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
607 	{
608 		m_client = client;
609 		m_collection = collection;
610 		m_cursor = cursor;
611 		m_iterationStarted = true;
612 		m_documents = existing_documents;
613 	}
614 
615 	override void startIterating()
616 	@safe {
617 		assert(false, "Calling startIterating on an opaque already initialized cursor");
618 	}
619 }