1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 public import vibe.db.mongo.impl.crud;
12 
13 import vibe.db.mongo.connection;
14 import vibe.db.mongo.client;
15 
16 import core.time;
17 import std.array : array;
18 import std.algorithm : map, max, min, skipOver;
19 import std.exception;
20 import std.range : chain;
21 
22 
23 /**
24 	Represents a cursor for a MongoDB query.
25 
26 	Use foreach( doc; cursor ) to iterate over the list of documents.
27 
28 	This struct uses reference counting to destroy the underlying MongoDB cursor.
29 */
30 struct MongoCursor(DocType = Bson) {
31 	private IMongoCursorData!DocType m_data;
32 
33 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
34 	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
35 	{
36 		// TODO: avoid memory allocation, if possible
37 		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
38 	}
39 
40 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
41 	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
42 	{
43 		// TODO: avoid memory allocation, if possible
44 		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
45 	}
46 
47 	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
48 	{
49 		Bson command = Bson.emptyObject;
50 		command["find"] = Bson(collection);
51 		command["$db"] = Bson(database);
52 		static if (is(Q == Bson))
53 			command["filter"] = query;
54 		else
55 			command["filter"] = serializeToBson(query);
56 
57 		MongoConnection conn = client.lockConnection();
58 		enforceWireVersionConstraints(options, conn.description.maxWireVersion);
59 
60 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
61 		bool singleBatch;
62 		if (!options.limit.isNull && options.limit.get < 0)
63 		{
64 			singleBatch = true;
65 			options.limit = -options.limit.get;
66 			options.batchSize = cast(int)options.limit.get;
67 		}
68 		if (!options.batchSize.isNull && options.batchSize.get < 0)
69 		{
70 			singleBatch = true;
71 			options.batchSize = -options.batchSize.get;
72 		}
73 		if (singleBatch)
74 			command["singleBatch"] = Bson(true);
75 
76 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
77 		bool allowMaxTime = true;
78 		if (options.cursorType == CursorType.tailable
79 			|| options.cursorType == CursorType.tailableAwait)
80 			command["tailable"] = Bson(true);
81 		else
82 		{
83 			options.maxAwaitTimeMS.nullify();
84 			allowMaxTime = false;
85 		}
86 
87 		if (options.cursorType == CursorType.tailableAwait)
88 			command["awaitData"] = Bson(true);
89 		else
90 		{
91 			options.maxAwaitTimeMS.nullify();
92 			allowMaxTime = false;
93 		}
94 
95 		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
96 		auto optionsBson = serializeToBson(options);
97 		foreach (string key, value; optionsBson.byKeyValue)
98 			command[key] = value;
99 
100 		this(client, command,
101 			options.batchSize.isNull ? 0 : options.batchSize.get,
102 			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
103 				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
104 				: Duration.max);
105 	}
106 
107 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
108 	{
109 		// TODO: avoid memory allocation, if possible
110 		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
111 	}
112 
113 	this(this)
114 	{
115 		if( m_data ) m_data.refCount++;
116 	}
117 
118 	~this()
119 	{
120 		if( m_data && --m_data.refCount == 0 ){
121 			m_data.killCursors();
122 		}
123 	}
124 
125 	/**
126 		Returns true if there are no more documents for this cursor.
127 
128 		Throws: An exception if there is a query or communication error.
129 	*/
130 	@property bool empty() { return m_data ? m_data.empty() : true; }
131 
132 	/**
133 		Returns the current document of the response.
134 
135 		Use empty and popFront to iterate over the list of documents using an
136 		input range interface. Note that calling this function is only allowed
137 		if empty returns false.
138 	*/
139 	@property DocType front() { return m_data.front; }
140 
141 	/**
142 		Controls the order in which the query returns matching documents.
143 
144 		This method must be called before starting to iterate, or an exception
145 		will be thrown. If multiple calls to $(D sort()) are issued, only
146 		the last one will have an effect.
147 
148 		Params:
149 			order = A BSON object convertible value that defines the sort order
150 				of the result. This BSON object must be structured according to
151 				the MongoDB documentation (see below).
152 
153 		Returns: Reference to the modified original cursor instance.
154 
155 		Throws:
156 			An exception if there is a query or communication error.
157 			Also throws if the method was called after beginning of iteration.
158 
159 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
160 	*/
161 	MongoCursor sort(T)(T order)
162 	{
163 		m_data.sort(() @trusted { return serializeToBson(order); } ());
164 		return this;
165 	}
166 
167 	///
168 	@safe unittest {
169 		import vibe.core.log;
170 		import vibe.db.mongo.mongo;
171 
172 		void test()
173 		@safe {
174 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
175 			auto coll = db["testcoll"];
176 
177 			// find all entries in reverse date order
178 			foreach (entry; coll.find().sort(["date": -1]))
179 				() @safe { logInfo("Entry: %s", entry); } ();
180 
181 			// the same, but using a struct to avoid memory allocations
182 			static struct Order { int date; }
183 			foreach (entry; coll.find().sort(Order(-1)))
184 				logInfo("Entry: %s", entry);
185 		}
186 	}
187 
188 	/**
189 		Limits the number of documents that the cursor returns.
190 
191 		This method must be called before beginning iteration in order to have
192 		effect. If multiple calls to limit() are made, the one with the lowest
193 		limit will be chosen.
194 
195 		Params:
196 			count = The maximum number number of documents to return. A value
197 				of zero means unlimited.
198 
199 		Returns: the same cursor
200 
201 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
202 	*/
203 	MongoCursor limit(long count)
204 	{
205 		m_data.limit(count);
206 		return this;
207 	}
208 
209 	/**
210 		Skips a given number of elements at the beginning of the cursor.
211 
212 		This method must be called before beginning iteration in order to have
213 		effect. If multiple calls to skip() are made, the one with the maximum
214 		number will be chosen.
215 
216 		Params:
217 			count = The number of documents to skip.
218 
219 		Returns: the same cursor
220 
221 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
222 	*/
223 	MongoCursor skip(long count)
224 	{
225 		m_data.skip(count);
226 		return this;
227 	}
228 
229 	@safe unittest {
230 		import vibe.core.log;
231 		import vibe.db.mongo.mongo;
232 
233 		void test()
234 		@safe {
235 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
236 			auto coll = db["testcoll"];
237 
238 			try { coll.drop(); } catch (Exception) {}
239 
240 			for (int i = 0; i < 10000; i++)
241 				coll.insertOne(["i": i]);
242 
243 			static struct Order { int i; }
244 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
245 
246 			assert(data.length == 2000);
247 			assert(data[0]["i"].get!int == 2000);
248 			assert(data[$ - 1]["i"].get!int == 3999);
249 		}
250 	}
251 
252 	/**
253 		Advances the cursor to the next document of the response.
254 
255 		Note that calling this function is only allowed if empty returns false.
256 	*/
257 	void popFront() { m_data.popFront(); }
258 
259 	/**
260 		Iterates over all remaining documents.
261 
262 		Note that iteration is one-way - elements that have already been visited
263 		will not be visited again if another iteration is done.
264 
265 		Throws: An exception if there is a query or communication error.
266 	*/
267 	auto byPair()
268 	{
269 		import std.typecons : Tuple, tuple;
270 		static struct Rng {
271 			private IMongoCursorData!DocType data;
272 			@property bool empty() { return data.empty; }
273 			@property Tuple!(long, DocType) front() { return tuple(data.index, data.front); }
274 			void popFront() { data.popFront(); }
275 		}
276 		return Rng(m_data);
277 	}
278 }
279 
280 /// Actual iteration implementation details for MongoCursor. Abstracted using an
281 /// interface because we still have code for legacy (<3.6) MongoDB servers,
282 /// which may still used with the old legacy overloads.
283 private interface IMongoCursorData(DocType) {
284 	bool empty() @safe; /// Range implementation
285 	long index() @safe; /// Range implementation
286 	DocType front() @safe; /// Range implementation
287 	void popFront() @safe; /// Range implementation
288 	/// Before iterating, specify a MongoDB sort order
289 	void sort(Bson order) @safe;
290 	/// Before iterating, specify maximum number of returned items
291 	void limit(long count) @safe;
292 	/// Before iterating, skip the specified number of items (when sorted)
293 	void skip(long count) @safe;
294 	/// Kills the MongoDB cursor, further iteration attempts will result in
295 	/// errors. Call this in the destructor.
296 	void killCursors() @safe;
297 	/// Define an reference count property on the class, which is returned by
298 	/// reference with this method.
299 	ref int refCount() @safe;
300 }
301 
302 
303 /**
304 	Deprecated query internals exposed through MongoCursor.
305 */
306 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
307 	private {
308 		int m_refCount = 1;
309 		MongoClient m_client;
310 		string m_collection;
311 		long m_cursor;
312 		long m_nskip;
313 		int m_nret;
314 		Bson m_sort = Bson(null);
315 		int m_offset;
316 		size_t m_currentDoc = 0;
317 		DocType[] m_documents;
318 		bool m_iterationStarted = false;
319 		long m_limit = 0;
320 	}
321 
322 	final bool empty()
323 	@safe {
324 		if (!m_iterationStarted) startIterating();
325 		if (m_limit > 0 && index >= m_limit) {
326 			killCursors();
327 			return true;
328 		}
329 		if( m_currentDoc < m_documents.length )
330 			return false;
331 		if( m_cursor == 0 )
332 			return true;
333 
334 		auto conn = m_client.lockConnection();
335 		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
336 		return m_currentDoc >= m_documents.length;
337 	}
338 
339 	final long index()
340 	@safe {
341 		return m_offset + m_currentDoc;
342 	}
343 
344 	final DocType front()
345 	@safe {
346 		if (!m_iterationStarted) startIterating();
347 		assert(!empty(), "Cursor has no more data.");
348 		return m_documents[m_currentDoc];
349 	}
350 
351 	final void sort(Bson order)
352 	@safe {
353 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
354 		m_sort = order;
355 	}
356 
357 	final void limit(long count)
358 	@safe {
359 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
360 		if (count > 0) {
361 			if (m_nret == 0 || m_nret > count)
362 				m_nret = cast(int)min(count, 1024);
363 
364 			if (m_limit == 0 || m_limit > count)
365 				m_limit = count;
366 		}
367 	}
368 
369 	final void skip(long count)
370 	@safe {
371 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
372 		m_nskip = max(m_nskip, count);
373 	}
374 
375 	final void popFront()
376 	@safe {
377 		if (!m_iterationStarted) startIterating();
378 		assert(!empty(), "Cursor has no more data.");
379 		m_currentDoc++;
380 	}
381 
382 	abstract void startIterating() @safe;
383 
384 	final void killCursors()
385 	@safe {
386 		if (m_cursor == 0) return;
387 		auto conn = m_client.lockConnection();
388 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
389 		m_cursor = 0;
390 	}
391 
392 	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
393 	{
394 		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
395 		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
396 
397 		m_cursor = cursor;
398 		m_offset = first_doc;
399 		m_documents.length = num_docs;
400 		m_currentDoc = 0;
401 	}
402 
403 	final void handleDocument(size_t idx, ref DocType doc)
404 	{
405 		m_documents[idx] = doc;
406 	}
407 
408 	final ref int refCount() { return m_refCount; }
409 }
410 
411 /**
412 	Find + getMore internals exposed through MongoCursor. Unifies the old
413 	LegacyMongoCursorData approach, so it can be used both for find queries and
414 	for custom commands.
415 */
416 private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
417 	private {
418 		int m_refCount = 1;
419 		MongoClient m_client;
420 		Bson m_findQuery;
421 		string m_database;
422 		string m_collection;
423 		long m_cursor;
424 		int m_batchSize;
425 		Duration m_maxTime;
426 		long m_totalReceived;
427 		size_t m_readDoc;
428 		size_t m_insertDoc;
429 		DocType[] m_documents;
430 		bool m_iterationStarted = false;
431 		long m_queryLimit;
432 	}
433 
434 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
435 	{
436 		m_client = client;
437 		m_findQuery = command;
438 		m_batchSize = batchSize;
439 		m_maxTime = getMoreMaxTime;
440 		m_database = command["$db"].opt!string;
441 	}
442 
443 	bool empty()
444 	@safe {
445 		if (!m_iterationStarted) startIterating();
446 		if (m_queryLimit > 0 && index >= m_queryLimit) {
447 			killCursors();
448 			return true;
449 		}
450 		if( m_readDoc < m_documents.length )
451 			return false;
452 		if( m_cursor == 0 )
453 			return true;
454 
455 		auto conn = m_client.lockConnection();
456 		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
457 			&handleReply, &handleDocument, m_maxTime);
458 		return m_readDoc >= m_documents.length;
459 	}
460 
461 	final long index()
462 	@safe {
463 		assert(m_totalReceived >= m_documents.length);
464 		return m_totalReceived - m_documents.length + m_readDoc;
465 	}
466 
467 	final DocType front()
468 	@safe {
469 		if (!m_iterationStarted) startIterating();
470 		assert(!empty(), "Cursor has no more data.");
471 		return m_documents[m_readDoc];
472 	}
473 
474 	final void sort(Bson order)
475 	@safe {
476 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
477 		m_findQuery["sort"] = order;
478 	}
479 
480 	final void limit(long count)
481 	@safe {
482 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
483 		m_findQuery["limit"] = Bson(count);
484 	}
485 
486 	final void skip(long count)
487 	@safe {
488 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
489 		m_findQuery["skip"] = Bson(count);
490 	}
491 
492 	final void popFront()
493 	@safe {
494 		if (!m_iterationStarted) startIterating();
495 		assert(!empty(), "Cursor has no more data.");
496 		m_readDoc++;
497 	}
498 
499 	private void startIterating()
500 	@safe {
501 		auto conn = m_client.lockConnection();
502 		m_totalReceived = 0;
503 		m_queryLimit = m_findQuery["limit"].opt!long(0);
504 		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
505 		m_iterationStarted = true;
506 	}
507 
508 	final void killCursors()
509 	@safe {
510 		if (m_cursor == 0) return;
511 		auto conn = m_client.lockConnection();
512 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
513 		m_cursor = 0;
514 	}
515 
516 	final void handleReply(long id, string ns, size_t count)
517 	{
518 		m_cursor = id;
519 		// The qualified collection name is reported here, but when requesting
520 		// data, we need to send the database name and the collection name
521 		// separately, so we have to remove the database prefix:
522 		ns.skipOver(m_database.chain("."));
523 		m_collection = ns;
524 		m_documents.length = count;
525 		m_readDoc = 0;
526 		m_insertDoc = 0;
527 	}
528 
529 	final void handleDocument(ref DocType doc)
530 	{
531 		m_documents[m_insertDoc++] = doc;
532 		m_totalReceived++;
533 	}
534 
535 	final ref int refCount() { return m_refCount; }
536 }
537 
538 /**
539 	Internal class implementing MongoCursorData for find queries
540  */
541 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
542 	private {
543 		QueryFlags m_flags;
544 		Q m_query;
545 		S m_returnFieldSelector;
546 	}
547 
548 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
549 	{
550 		m_client = client;
551 		m_collection = collection;
552 		m_flags = flags;
553 		m_nskip = nskip;
554 		m_nret = nret;
555 		m_query = query;
556 		m_returnFieldSelector = return_field_selector;
557 	}
558 
559 	override void startIterating()
560 	@safe {
561 		auto conn = m_client.lockConnection();
562 
563 		ubyte[256] selector_buf = void;
564 		ubyte[256] query_buf = void;
565 
566 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
567 
568 		Bson query;
569 		static if (is(Q == Bson)) {
570 			query = m_query;
571 		} else {
572 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
573 		}
574 
575 		Bson full_query;
576 
577 		if (!query["query"].isNull() || !query["$query"].isNull()) {
578 			// TODO: emit deprecation warning
579 			full_query = query;
580 		} else {
581 			full_query = Bson.emptyObject;
582 			full_query["$query"] = query;
583 		}
584 
585 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
586 
587 		conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);
588 
589 		m_iterationStarted = true;
590 	}
591 }
592 
593 /**
594 	Internal class implementing MongoCursorData for already initialized generic cursors
595  */
596 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
597 	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
598 	{
599 		m_client = client;
600 		m_collection = collection;
601 		m_cursor = cursor;
602 		m_iterationStarted = true;
603 		m_documents = existing_documents;
604 	}
605 
606 	override void startIterating()
607 	@safe {
608 		assert(false, "Calling startIterating on an opaque already initialized cursor");
609 	}
610 }