1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 public import vibe.db.mongo.impl.crud;
12 
13 import vibe.db.mongo.connection;
14 import vibe.db.mongo.client;
15 
16 import std.array : array;
17 import std.algorithm : map, max, min;
18 import std.exception;
19 
20 import core.time;
21 
22 /**
23 	Represents a cursor for a MongoDB query.
24 
25 	Use foreach( doc; cursor ) to iterate over the list of documents.
26 
27 	This struct uses reference counting to destroy the underlying MongoDB cursor.
28 */
29 struct MongoCursor(DocType = Bson) {
30 	private IMongoCursorData!DocType m_data;
31 
32 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
33 	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
34 	{
35 		// TODO: avoid memory allocation, if possible
36 		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
37 	}
38 
39 	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
40 	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
41 	{
42 		// TODO: avoid memory allocation, if possible
43 		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
44 	}
45 
46 	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
47 	{
48 		Bson command = Bson.emptyObject;
49 		command["find"] = Bson(collection);
50 		command["$db"] = Bson(database);
51 		static if (is(Q == Bson))
52 			command["filter"] = query;
53 		else
54 			command["filter"] = serializeToBson(query);
55 
56 		MongoConnection conn = client.lockConnection();
57 		enforceWireVersionConstraints(options, conn.description.maxWireVersion);
58 
59 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
60 		bool singleBatch;
61 		if (!options.limit.isNull && options.limit.get < 0)
62 		{
63 			singleBatch = true;
64 			options.limit = -options.limit.get;
65 			options.batchSize = cast(int)options.limit.get;
66 		}
67 		if (!options.batchSize.isNull && options.batchSize.get < 0)
68 		{
69 			singleBatch = true;
70 			options.batchSize = -options.batchSize.get;
71 		}
72 		if (singleBatch)
73 			command["singleBatch"] = Bson(true);
74 
75 		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
76 		bool allowMaxTime = true;
77 		if (options.cursorType == CursorType.tailable
78 			|| options.cursorType == CursorType.tailableAwait)
79 			command["tailable"] = Bson(true);
80 		else
81 		{
82 			options.maxAwaitTimeMS.nullify();
83 			allowMaxTime = false;
84 		}
85 
86 		if (options.cursorType == CursorType.tailableAwait)
87 			command["awaitData"] = Bson(true);
88 		else
89 		{
90 			options.maxAwaitTimeMS.nullify();
91 			allowMaxTime = false;
92 		}
93 
94 		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
95 		auto optionsBson = serializeToBson(options);
96 		foreach (string key, value; optionsBson.byKeyValue)
97 			command[key] = value;
98 
99 		this(client, command,
100 			options.batchSize.isNull ? 0 : options.batchSize.get,
101 			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
102 				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
103 				: Duration.max);
104 	}
105 
106 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
107 	{
108 		// TODO: avoid memory allocation, if possible
109 		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
110 	}
111 
112 	this(this)
113 	{
114 		if( m_data ) m_data.refCount++;
115 	}
116 
117 	~this()
118 	{
119 		if( m_data && --m_data.refCount == 0 ){
120 			m_data.killCursors();
121 		}
122 	}
123 
124 	/**
125 		Returns true if there are no more documents for this cursor.
126 
127 		Throws: An exception if there is a query or communication error.
128 	*/
129 	@property bool empty() { return m_data ? m_data.empty() : true; }
130 
131 	/**
132 		Returns the current document of the response.
133 
134 		Use empty and popFront to iterate over the list of documents using an
135 		input range interface. Note that calling this function is only allowed
136 		if empty returns false.
137 	*/
138 	@property DocType front() { return m_data.front; }
139 
140 	/**
141 		Controls the order in which the query returns matching documents.
142 
143 		This method must be called before starting to iterate, or an exception
144 		will be thrown. If multiple calls to $(D sort()) are issued, only
145 		the last one will have an effect.
146 
147 		Params:
148 			order = A BSON object convertible value that defines the sort order
149 				of the result. This BSON object must be structured according to
150 				the MongoDB documentation (see below).
151 
152 		Returns: Reference to the modified original cursor instance.
153 
154 		Throws:
155 			An exception if there is a query or communication error.
156 			Also throws if the method was called after beginning of iteration.
157 
158 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
159 	*/
160 	MongoCursor sort(T)(T order)
161 	{
162 		m_data.sort(() @trusted { return serializeToBson(order); } ());
163 		return this;
164 	}
165 
166 	///
167 	@safe unittest {
168 		import vibe.core.log;
169 		import vibe.db.mongo.mongo;
170 
171 		void test()
172 		@safe {
173 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
174 			auto coll = db["testcoll"];
175 
176 			// find all entries in reverse date order
177 			foreach (entry; coll.find().sort(["date": -1]))
178 				() @safe { logInfo("Entry: %s", entry); } ();
179 
180 			// the same, but using a struct to avoid memory allocations
181 			static struct Order { int date; }
182 			foreach (entry; coll.find().sort(Order(-1)))
183 				logInfo("Entry: %s", entry);
184 		}
185 	}
186 
187 	/**
188 		Limits the number of documents that the cursor returns.
189 
190 		This method must be called before beginning iteration in order to have
191 		effect. If multiple calls to limit() are made, the one with the lowest
192 		limit will be chosen.
193 
194 		Params:
195 			count = The maximum number number of documents to return. A value
196 				of zero means unlimited.
197 
198 		Returns: the same cursor
199 
200 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
201 	*/
202 	MongoCursor limit(long count)
203 	{
204 		m_data.limit(count);
205 		return this;
206 	}
207 
208 	/**
209 		Skips a given number of elements at the beginning of the cursor.
210 
211 		This method must be called before beginning iteration in order to have
212 		effect. If multiple calls to skip() are made, the one with the maximum
213 		number will be chosen.
214 
215 		Params:
216 			count = The number of documents to skip.
217 
218 		Returns: the same cursor
219 
220 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
221 	*/
222 	MongoCursor skip(long count)
223 	{
224 		m_data.skip(count);
225 		return this;
226 	}
227 
228 	@safe unittest {
229 		import vibe.core.log;
230 		import vibe.db.mongo.mongo;
231 
232 		void test()
233 		@safe {
234 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
235 			auto coll = db["testcoll"];
236 
237 			try { coll.drop(); } catch (Exception) {}
238 
239 			for (int i = 0; i < 10000; i++)
240 				coll.insertOne(["i": i]);
241 
242 			static struct Order { int i; }
243 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
244 
245 			assert(data.length == 2000);
246 			assert(data[0]["i"].get!int == 2000);
247 			assert(data[$ - 1]["i"].get!int == 3999);
248 		}
249 	}
250 
251 	/**
252 		Advances the cursor to the next document of the response.
253 
254 		Note that calling this function is only allowed if empty returns false.
255 	*/
256 	void popFront() { m_data.popFront(); }
257 
258 	/**
259 		Iterates over all remaining documents.
260 
261 		Note that iteration is one-way - elements that have already been visited
262 		will not be visited again if another iteration is done.
263 
264 		Throws: An exception if there is a query or communication error.
265 	*/
266 	auto byPair()
267 	{
268 		import std.typecons : Tuple, tuple;
269 		static struct Rng {
270 			private IMongoCursorData!DocType data;
271 			@property bool empty() { return data.empty; }
272 			@property Tuple!(long, DocType) front() { return tuple(data.index, data.front); }
273 			void popFront() { data.popFront(); }
274 		}
275 		return Rng(m_data);
276 	}
277 }
278 
279 /// Actual iteration implementation details for MongoCursor. Abstracted using an
280 /// interface because we still have code for legacy (<3.6) MongoDB servers,
281 /// which may still used with the old legacy overloads.
282 private interface IMongoCursorData(DocType) {
283 	bool empty() @safe; /// Range implementation
284 	long index() @safe; /// Range implementation
285 	DocType front() @safe; /// Range implementation
286 	void popFront() @safe; /// Range implementation
287 	/// Before iterating, specify a MongoDB sort order
288 	void sort(Bson order) @safe;
289 	/// Before iterating, specify maximum number of returned items
290 	void limit(long count) @safe;
291 	/// Before iterating, skip the specified number of items (when sorted)
292 	void skip(long count) @safe;
293 	/// Kills the MongoDB cursor, further iteration attempts will result in
294 	/// errors. Call this in the destructor.
295 	void killCursors() @safe;
296 	/// Define an reference count property on the class, which is returned by
297 	/// reference with this method.
298 	ref int refCount() @safe;
299 }
300 
301 
302 /**
303 	Deprecated query internals exposed through MongoCursor.
304 */
305 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
306 	private {
307 		int m_refCount = 1;
308 		MongoClient m_client;
309 		string m_collection;
310 		long m_cursor;
311 		long m_nskip;
312 		int m_nret;
313 		Bson m_sort = Bson(null);
314 		int m_offset;
315 		size_t m_currentDoc = 0;
316 		DocType[] m_documents;
317 		bool m_iterationStarted = false;
318 		long m_limit = 0;
319 	}
320 
321 	final bool empty()
322 	@safe {
323 		if (!m_iterationStarted) startIterating();
324 		if (m_limit > 0 && index >= m_limit) {
325 			killCursors();
326 			return true;
327 		}
328 		if( m_currentDoc < m_documents.length )
329 			return false;
330 		if( m_cursor == 0 )
331 			return true;
332 
333 		auto conn = m_client.lockConnection();
334 		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
335 		return m_currentDoc >= m_documents.length;
336 	}
337 
338 	final long index()
339 	@safe {
340 		return m_offset + m_currentDoc;
341 	}
342 
343 	final DocType front()
344 	@safe {
345 		if (!m_iterationStarted) startIterating();
346 		assert(!empty(), "Cursor has no more data.");
347 		return m_documents[m_currentDoc];
348 	}
349 
350 	final void sort(Bson order)
351 	@safe {
352 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
353 		m_sort = order;
354 	}
355 
356 	final void limit(long count)
357 	@safe {
358 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
359 		if (count > 0) {
360 			if (m_nret == 0 || m_nret > count)
361 				m_nret = cast(int)min(count, 1024);
362 
363 			if (m_limit == 0 || m_limit > count)
364 				m_limit = count;
365 		}
366 	}
367 
368 	final void skip(long count)
369 	@safe {
370 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
371 		m_nskip = max(m_nskip, count);
372 	}
373 
374 	final void popFront()
375 	@safe {
376 		if (!m_iterationStarted) startIterating();
377 		assert(!empty(), "Cursor has no more data.");
378 		m_currentDoc++;
379 	}
380 
381 	abstract void startIterating() @safe;
382 
383 	final void killCursors()
384 	@safe {
385 		if (m_cursor == 0) return;
386 		auto conn = m_client.lockConnection();
387 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
388 		m_cursor = 0;
389 	}
390 
391 	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
392 	{
393 		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
394 		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
395 
396 		m_cursor = cursor;
397 		m_offset = first_doc;
398 		m_documents.length = num_docs;
399 		m_currentDoc = 0;
400 	}
401 
402 	final void handleDocument(size_t idx, ref DocType doc)
403 	{
404 		m_documents[idx] = doc;
405 	}
406 
407 	final ref int refCount() { return m_refCount; }
408 }
409 
410 /**
411 	Find + getMore internals exposed through MongoCursor. Unifies the old
412 	LegacyMongoCursorData approach, so it can be used both for find queries and
413 	for custom commands.
414 */
415 private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
416 	private {
417 		int m_refCount = 1;
418 		MongoClient m_client;
419 		Bson m_findQuery;
420 		string m_database;
421 		string m_collection;
422 		long m_cursor;
423 		int m_batchSize;
424 		Duration m_maxTime;
425 		long m_totalReceived;
426 		size_t m_readDoc;
427 		size_t m_insertDoc;
428 		DocType[] m_documents;
429 		bool m_iterationStarted = false;
430 		long m_queryLimit;
431 	}
432 
433 	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
434 	{
435 		m_client = client;
436 		m_findQuery = command;
437 		m_batchSize = batchSize;
438 		m_maxTime = getMoreMaxTime;
439 		m_database = command["$db"].opt!string;
440 	}
441 
442 	bool empty()
443 	@safe {
444 		if (!m_iterationStarted) startIterating();
445 		if (m_queryLimit > 0 && index >= m_queryLimit) {
446 			killCursors();
447 			return true;
448 		}
449 		if( m_readDoc < m_documents.length )
450 			return false;
451 		if( m_cursor == 0 )
452 			return true;
453 
454 		auto conn = m_client.lockConnection();
455 		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
456 			&handleReply, &handleDocument, m_maxTime);
457 		return m_readDoc >= m_documents.length;
458 	}
459 
460 	final long index()
461 	@safe {
462 		assert(m_totalReceived >= m_documents.length);
463 		return m_totalReceived - m_documents.length + m_readDoc;
464 	}
465 
466 	final DocType front()
467 	@safe {
468 		if (!m_iterationStarted) startIterating();
469 		assert(!empty(), "Cursor has no more data.");
470 		return m_documents[m_readDoc];
471 	}
472 
473 	final void sort(Bson order)
474 	@safe {
475 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
476 		m_findQuery["sort"] = order;
477 	}
478 
479 	final void limit(long count)
480 	@safe {
481 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
482 		m_findQuery["limit"] = Bson(count);
483 	}
484 
485 	final void skip(long count)
486 	@safe {
487 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
488 		m_findQuery["skip"] = Bson(count);
489 	}
490 
491 	final void popFront()
492 	@safe {
493 		if (!m_iterationStarted) startIterating();
494 		assert(!empty(), "Cursor has no more data.");
495 		m_readDoc++;
496 	}
497 
498 	private void startIterating()
499 	@safe {
500 		auto conn = m_client.lockConnection();
501 		m_totalReceived = 0;
502 		m_queryLimit = m_findQuery["limit"].opt!long(0);
503 		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
504 		m_iterationStarted = true;
505 	}
506 
507 	final void killCursors()
508 	@safe {
509 		if (m_cursor == 0) return;
510 		auto conn = m_client.lockConnection();
511 		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
512 		m_cursor = 0;
513 	}
514 
515 	final void handleReply(long id, string ns, size_t count)
516 	{
517 		m_cursor = id;
518 		m_collection = ns;
519 		m_documents.length = count;
520 		m_readDoc = 0;
521 		m_insertDoc = 0;
522 	}
523 
524 	final void handleDocument(ref DocType doc)
525 	{
526 		m_documents[m_insertDoc++] = doc;
527 		m_totalReceived++;
528 	}
529 
530 	final ref int refCount() { return m_refCount; }
531 }
532 
533 /**
534 	Internal class implementing MongoCursorData for find queries
535  */
536 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
537 	private {
538 		QueryFlags m_flags;
539 		Q m_query;
540 		S m_returnFieldSelector;
541 	}
542 
543 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
544 	{
545 		m_client = client;
546 		m_collection = collection;
547 		m_flags = flags;
548 		m_nskip = nskip;
549 		m_nret = nret;
550 		m_query = query;
551 		m_returnFieldSelector = return_field_selector;
552 	}
553 
554 	override void startIterating()
555 	@safe {
556 		auto conn = m_client.lockConnection();
557 
558 		ubyte[256] selector_buf = void;
559 		ubyte[256] query_buf = void;
560 
561 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
562 
563 		Bson query;
564 		static if (is(Q == Bson)) {
565 			query = m_query;
566 		} else {
567 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
568 		}
569 
570 		Bson full_query;
571 
572 		if (!query["query"].isNull() || !query["$query"].isNull()) {
573 			// TODO: emit deprecation warning
574 			full_query = query;
575 		} else {
576 			full_query = Bson.emptyObject;
577 			full_query["$query"] = query;
578 		}
579 
580 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
581 
582 		conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);
583 
584 		m_iterationStarted = true;
585 	}
586 }
587 
588 /**
589 	Internal class implementing MongoCursorData for already initialized generic cursors
590  */
591 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
592 	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
593 	{
594 		m_client = client;
595 		m_collection = collection;
596 		m_cursor = cursor;
597 		m_iterationStarted = true;
598 		m_documents = existing_documents;
599 	}
600 
601 	override void startIterating()
602 	@safe {
603 		assert(false, "Calling startIterating on an opaque already initialized cursor");
604 	}
605 }