1 /**
2 	MongoDB cursor abstraction
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
10 public import vibe.data.bson;
11 public import vibe.db.mongo.impl.crud;
13 import vibe.db.mongo.connection;
14 import vibe.db.mongo.client;
16 import std.array : array;
17 import std.algorithm : map, max, min;
18 import std.exception;
20 import core.time;
/**
	Represents a cursor for a MongoDB query.

	Use foreach( doc; cursor ) to iterate over the list of documents.

	This struct uses reference counting to destroy the underlying MongoDB
	cursor: every copy increments the counter of the shared cursor data and
	the server-side cursor is killed once the last copy is destroyed.
*/
struct MongoCursor(DocType = Bson) {
	// Shared, reference counted iteration state. `null` for a default
	// initialized cursor, which then behaves like an empty range.
	private IMongoCursorData!DocType m_data;

	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
	}

	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
	}

	/**
		Builds a `find` command from the given query and options and creates a
		cursor for it. The command is only sent once iteration starts.

		Params:
			client = Client used to obtain connections.
			database = Name of the database, stored in the command as `$db`.
			collection = Name of the collection, stored in the command as `find`.
			query = Filter; used as is if it is a `Bson`, serialized otherwise.
			options = Find options; all serialized fields are merged into the
				command.
	*/
	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
	{
		Bson command = Bson.emptyObject;
		command["find"] = Bson(collection);
		command["$db"] = Bson(database);
		static if (is(Q == Bson))
			command["filter"] = query;
		else
			command["filter"] = serializeToBson(query);

		MongoConnection conn = client.lockConnection();
		enforceWireVersionConstraints(options, conn.description.maxWireVersion);

		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
		// A negative limit or batch size requests a single batch; the value
		// itself is made positive before being sent.
		bool singleBatch;
		if (!options.limit.isNull && options.limit.get < 0)
		{
			singleBatch = true;
			options.limit = -options.limit.get;
			options.batchSize = cast(int)options.limit.get;
		}
		if (!options.batchSize.isNull && options.batchSize.get < 0)
		{
			singleBatch = true;
			options.batchSize = -options.batchSize.get;
		}
		if (singleBatch)
			command["singleBatch"] = Bson(true);

		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
		// After the two checks below `allowMaxTime` is only still true, and
		// `maxAwaitTimeMS` only still set, for tailableAwait cursors.
		bool allowMaxTime = true;
		if (options.cursorType == CursorType.tailable
			|| options.cursorType == CursorType.tailableAwait)
			command["tailable"] = Bson(true);
		else
		{
			options.maxAwaitTimeMS.nullify();
			allowMaxTime = false;
		}

		if (options.cursorType == CursorType.tailableAwait)
			command["awaitData"] = Bson(true);
		else
		{
			options.maxAwaitTimeMS.nullify();
			allowMaxTime = false;
		}

		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
		auto optionsBson = serializeToBson(options);
		foreach (string key, value; optionsBson.byKeyValue)
			command[key] = value;

		// getMore time limit: maxAwaitTimeMS if set, else maxTimeMS if
		// permitted, else unlimited.
		this(client, command,
			options.batchSize.isNull ? 0 : options.batchSize.get,
			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
				: Duration.max);
	}

	/**
		Creates a cursor for an already assembled command document.

		Params:
			client = Client used to obtain connections.
			command = The command to send when iteration starts.
			batchSize = Batch size forwarded to the cursor implementation.
			getMoreMaxTime = Time limit forwarded to the cursor implementation.
	*/
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
	}

	// Copying shares the cursor data and bumps its reference count.
	this(this)
	{
		if( m_data ) m_data.refCount++;
	}

	// The last owner kills the server-side cursor.
	~this()
	{
		if( m_data && --m_data.refCount == 0 ){
			m_data.killCursors();
		}
	}

	/**
		Returns true if there are no more documents for this cursor.

		Throws: An exception if there is a query or communication error.
	*/
	@property bool empty() { return m_data ? m_data.empty() : true; }

	/**
		Returns the current document of the response.

		Use empty and popFront to iterate over the list of documents using an
		input range interface. Note that calling this function is only allowed
		if empty returns false.
	*/
	@property DocType front() { return m_data.front; }

	/**
		Controls the order in which the query returns matching documents.

		This method must be called before starting to iterate; the cursor
		implementations in this module check this with an assertion. If
		multiple calls to $(D sort()) are issued, only the last one will have
		an effect.

		Params:
			order = A BSON object convertible value that defines the sort order
				of the result. This BSON object must be structured according to
				the MongoDB documentation (see below).

		Returns: Reference to the modified original cursor instance.

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
	*/
	MongoCursor sort(T)(T order)
	{
		m_data.sort(() @trusted { return serializeToBson(order); } ());
		return this;
	}

	///
	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("").getDatabase("test");
			auto coll = db["testcoll"];

			// find all entries in reverse date order
			foreach (entry; coll.find().sort(["date": -1]))
				() @safe { logInfo("Entry: %s", entry); } ();

			// the same, but using a struct to avoid memory allocations
			static struct Order { int date; }
			foreach (entry; coll.find().sort(Order(-1)))
				logInfo("Entry: %s", entry);
		}
	}

	/**
		Limits the number of documents that the cursor returns.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to limit() are made, the one with the lowest
		limit will be chosen.

		Params:
			count = The maximum number of documents to return. A value
				of zero means unlimited.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
	*/
	MongoCursor limit(long count)
	{
		m_data.limit(count);
		return this;
	}

	/**
		Skips a given number of elements at the beginning of the cursor.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to skip() are made, the one with the maximum
		number will be chosen.

		Params:
			count = The number of documents to skip.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
	*/
	MongoCursor skip(long count)
	{
		m_data.skip(count);
		return this;
	}

	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("").getDatabase("test");
			auto coll = db["testcoll"];

			try { coll.drop(); } catch (Exception) {}

			for (int i = 0; i < 10000; i++)
				coll.insertOne(["i": i]);

			static struct Order { int i; }
			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;

			assert(data.length == 2000);
			assert(data[0]["i"].get!int == 2000);
			assert(data[$ - 1]["i"].get!int == 3999);
		}
	}

	/**
		Advances the cursor to the next document of the response.

		Note that calling this function is only allowed if empty returns false.
	*/
	void popFront() { m_data.popFront(); }

	/**
		Iterates over all remaining documents, yielding (index, document)
		tuples.

		Note that iteration is one-way - elements that have already been visited
		will not be visited again if another iteration is done.

		The returned range shares ownership of the underlying cursor, so it
		stays valid even if this `MongoCursor` instance is destroyed first.

		Throws: An exception if there is a query or communication error.
	*/
	auto byPair()
	{
		import std.typecons : Tuple, tuple;
		static struct Rng {
			// A full cursor copy (instead of the bare data reference) takes
			// part in the reference counting. Otherwise the cursor could be
			// killed while this range is still being iterated, e.g. for
			// `coll.find().byPair` where the cursor is a temporary.
			private MongoCursor cursor;
			@property bool empty() { return cursor.empty; }
			@property Tuple!(long, DocType) front() { return tuple(cursor.m_data.index, cursor.m_data.front); }
			void popFront() { cursor.popFront(); }
		}
		return Rng(this);
	}
}
/// Actual iteration implementation details for MongoCursor. Abstracted using an
/// interface because we still have code for legacy (<3.6) MongoDB servers,
/// which may still be used with the old legacy overloads.
private interface IMongoCursorData(DocType) {
	bool empty() @safe; /// Range implementation
	long index() @safe; /// Index of the current document, as reported by MongoCursor.byPair
	DocType front() @safe; /// Range implementation
	void popFront() @safe; /// Range implementation
	/// Before iterating, specify a MongoDB sort order
	void sort(Bson order) @safe;
	/// Before iterating, specify the maximum number of returned items
	void limit(long count) @safe;
	/// Before iterating, skip the specified number of items (when sorted)
	void skip(long count) @safe;
	/// Kills the MongoDB cursor; further iteration attempts will result in
	/// errors. Call this in the destructor.
	void killCursors() @safe;
	/// Defines a reference count property on the class, which is returned by
	/// reference from this method so that it can be modified in place.
	ref int refCount() @safe;
}
/**
	Deprecated query internals exposed through MongoCursor.

	Holds the state shared by the legacy cursor implementations and implements
	the whole range interface on top of it. Subclasses only have to provide
	`startIterating`, which issues the initial request.
*/
private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		string m_collection;
		long m_cursor;              // server cursor id, 0 once there is none (left)
		long m_nskip;
		int m_nret;
		Bson m_sort = Bson(null);
		int m_offset;               // `first_doc` of the most recent reply
		size_t m_currentDoc;        // read position within m_documents
		DocType[] m_documents;      // documents of the most recent reply
		bool m_iterationStarted;
		long m_limit;               // 0 means "no limit"
	}

	/// Range primitive; may start the query or request another batch.
	final bool empty()
	@safe {
		if (!m_iterationStarted)
			startIterating();

		// The requested number of documents has been handed out.
		immutable limitReached = m_limit > 0 && index >= m_limit;
		if (limitReached) {
			killCursors();
			return true;
		}

		// Documents of the current batch are still pending.
		if (m_currentDoc < m_documents.length) return false;

		// Batch consumed and no cursor left to ask for more.
		if (m_cursor == 0) return true;

		auto connection = m_client.lockConnection();
		connection.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
		return !(m_currentDoc < m_documents.length);
	}

	/// Offset of the current batch plus the read position within it.
	final long index()
	@safe {
		return m_offset + m_currentDoc;
	}

	/// Range primitive; returns the document at the read position.
	final DocType front()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_currentDoc];
	}

	/// Stores the sort order; only allowed before iteration has started.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_sort = order;
	}

	/// Lowers the limit (and the per-request document count) to `count`.
	final void limit(long count)
	@safe {
		// Zero (and anything below) is treated as "no limit requested".
		if (count <= 0) return;

		// The number of documents per request is capped at 1024.
		if (m_nret == 0 || count < m_nret)
			m_nret = count < 1024 ? cast(int)count : 1024;

		if (m_limit == 0 || count < m_limit)
			m_limit = count;
	}

	/// Raises the number of skipped documents to `count` if that is larger.
	final void skip(long count)
	@safe {
		// Zero never exceeds the stored value, so skip(0) changes nothing.
		if (count > m_nskip)
			m_nskip = count;
	}

	/// Range primitive; moves the read position to the next document.
	final void popFront()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_currentDoc++;
	}

	/// Issues the initial request; implemented by the concrete cursor types.
	abstract void startIterating() @safe;

	/// Kills the server cursor, if there is one, and forgets its id.
	final void killCursors()
	@safe {
		if (m_cursor != 0) {
			auto connection = m_client.lockConnection();
			// View the single cursor id as a one element slice.
			auto ids = () @trusted { return (&m_cursor)[0 .. 1]; } ();
			connection.killCursors(m_collection, ids);
			m_cursor = 0;
		}
	}

	/// Reply callback: validates the flags and resets the batch state.
	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
	{
		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");

		m_currentDoc = 0;
		m_documents.length = num_docs;
		m_offset = first_doc;
		m_cursor = cursor;
	}

	/// Document callback: stores the document at its position in the batch.
	final void handleDocument(size_t idx, ref DocType doc)
	{
		m_documents[idx] = doc;
	}

	final ref int refCount() { return m_refCount; }
}
/**
	Find + getMore internals exposed through MongoCursor. Unifies the old
	LegacyMongoCursorData approach, so it can be used both for find queries and
	for custom commands.

	The command document is kept until iteration starts, so that `sort`,
	`limit` and `skip` can still adjust it.
*/
private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		Bson m_findQuery;           // command sent by startIterating
		string m_database;          // taken from the command's "$db" field
		string m_collection;        // `ns` string passed to handleReply
		long m_cursor;              // cursor id of the last reply, 0 if none
		int m_batchSize;
		Duration m_maxTime;
		long m_totalReceived;       // documents received over all batches
		size_t m_readDoc;           // read position within m_documents
		size_t m_insertDoc;         // write position while a batch arrives
		DocType[] m_documents;      // documents of the current batch
		bool m_iterationStarted = false;
		long m_queryLimit;          // "limit" of the command, 0 if unset
	}

	/**
		Params:
			client = Client used to obtain connections.
			command = Command document sent when iteration starts.
			batchSize = Batch size passed to each getMore request.
			getMoreMaxTime = Time limit passed to each getMore request.
	*/
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		m_client = client;
		m_findQuery = command;
		m_batchSize = batchSize;
		m_maxTime = getMoreMaxTime;
		m_database = command["$db"].opt!string;
	}

	/// Range primitive; may send the command or request another batch.
	bool empty()
	@safe {
		if (!m_iterationStarted) startIterating();
		// The requested number of documents has been handed out.
		if (m_queryLimit > 0 && index >= m_queryLimit) {
			killCursors();
			return true;
		}
		if( m_readDoc < m_documents.length )
			return false;
		// Batch consumed and no cursor left to ask for more.
		if( m_cursor == 0 )
			return true;

		auto conn = m_client.lockConnection();
		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
			&handleReply, &handleDocument, m_maxTime);
		return m_readDoc >= m_documents.length;
	}

	/// Number of documents received before the current batch plus the read
	/// position within the current batch.
	final long index()
	@safe {
		assert(m_totalReceived >= m_documents.length);
		return m_totalReceived - m_documents.length + m_readDoc;
	}

	/// Range primitive; returns the document at the read position.
	final DocType front()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_readDoc];
	}

	/// Sets the "sort" field of the command; the last call wins.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_findQuery["sort"] = order;
	}

	/**
		Lowers the "limit" field of the command to `count`.

		Implements the contract documented on `MongoCursor.limit` and mirrors
		`LegacyMongoCursorData.limit`: the lowest limit of several calls is
		kept and non-positive values, which stand for "unlimited", never
		replace an existing limit.
	*/
	final void limit(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		if (count <= 0) return;

		// Read the current limit the same way startIterating does.
		immutable current = m_findQuery["limit"].opt!long(0);
		if (current <= 0 || count < current)
			m_findQuery["limit"] = Bson(count);
	}

	/**
		Raises the "skip" field of the command to `count`.

		Implements the contract documented on `MongoCursor.skip` and mirrors
		`LegacyMongoCursorData.skip`: the largest value of several calls is
		kept.
	*/
	final void skip(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		immutable current = m_findQuery["skip"].opt!long(0);
		if (count > current)
			m_findQuery["skip"] = Bson(count);
	}

	/// Range primitive; moves the read position to the next document.
	final void popFront()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_readDoc++;
	}

	/// Sends the stored command and remembers its limit for `empty`.
	private void startIterating()
	@safe {
		auto conn = m_client.lockConnection();
		m_totalReceived = 0;
		m_queryLimit = m_findQuery["limit"].opt!long(0);
		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
		// Only set after startFind returned, so a failed attempt is retried
		// by the next range primitive call.
		m_iterationStarted = true;
	}

	/// Kills the server cursor, if there is one, and forgets its id.
	final void killCursors()
	@safe {
		if (m_cursor == 0) return;
		auto conn = m_client.lockConnection();
		// View the single cursor id as a one element slice.
		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
		m_cursor = 0;
	}

	/// Reply callback: stores cursor id and namespace and makes room for
	/// `count` documents.
	final void handleReply(long id, string ns, size_t count)
	{
		m_cursor = id;
		m_collection = ns;
		m_documents.length = count;
		m_readDoc = 0;
		m_insertDoc = 0;
	}

	/// Document callback: appends the document to the current batch.
	final void handleDocument(ref DocType doc)
	{
		m_documents[m_insertDoc++] = doc;
		m_totalReceived++;
	}

	final ref int refCount() { return m_refCount; }
}
/**
	Internal class implementing the legacy cursor data for find queries.

	Keeps the query and the field selector in their original types and only
	serializes them when the query is sent.
*/
private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
	private {
		QueryFlags m_flags;
		Q m_query;
		S m_returnFieldSelector;
	}

	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		// state owned by the base class
		m_client = client;
		m_collection = collection;
		m_nskip = nskip;
		m_nret = nret;

		// state specific to this cursor type
		m_flags = flags;
		m_query = query;
		m_returnFieldSelector = return_field_selector;
	}

	/// Serializes query and selector, wraps the query if needed and sends it.
	override void startIterating()
	@safe {
		auto connection = m_client.lockConnection();

		// Scratch buffers handed to serializeToBson.
		ubyte[256] selector_buf = void;
		ubyte[256] query_buf = void;

		Bson fields = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();

		Bson filter;
		static if (!is(Q == Bson)) {
			filter = () @trusted { return serializeToBson(m_query, query_buf); } ();
		} else {
			filter = m_query;
		}

		Bson request;

		if (query_is_plain(filter)) {
			// A plain filter gets wrapped so that "orderby" can sit next to it.
			request = Bson.emptyObject;
			request["$query"] = filter;
		} else {
			// TODO: emit deprecation warning
			request = filter;
		}

		if (!m_sort.isNull()) request["orderby"] = m_sort;

		connection.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, request, fields, &handleReply, &handleDocument);

		m_iterationStarted = true;
	}

	/// True if the filter has neither a "query" nor a "$query" field.
	private static bool query_is_plain(Bson filter)
	@safe {
		return filter["query"].isNull() && filter["$query"].isNull();
	}
}
/**
	Internal class implementing the legacy cursor data for generic cursors that
	have already been initialized, i.e. whose cursor id and first documents
	are supplied by the caller.
 */
private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		m_client = client;
		m_collection = collection;

		// Take over the running cursor together with its first documents and
		// mark iteration as started, so that startIterating is never invoked.
		m_documents = existing_documents;
		m_cursor = cursor;
		m_iterationStarted = true;
	}

	/// Must never be reached, see the constructor.
	override void startIterating()
	@safe {
		assert(false, "Calling startIterating on an opaque already initialized cursor");
	}
}