1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 
12 import vibe.db.mongo.connection;
13 import vibe.db.mongo.client;
14 
15 import std.array : array;
16 import std.algorithm : map, max, min;
17 import std.exception;
18 
19 deprecated alias MongoCursor(Q, R = Bson, S = Bson) = MongoCursor!R;
20 
21 /**
22 	Represents a cursor for a MongoDB query.
23 
24 	Use foreach( doc; cursor ) to iterate over the list of documents.
25 
26 	This struct uses reference counting to destroy the underlying MongoDB cursor.
27 */
28 struct MongoCursor(DocType = Bson) {
29 	private MongoCursorData!DocType m_data;
30 
31 	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
32 	{
33 		// TODO: avoid memory allocation, if possible
34 		m_data = new MongoFindCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
35 	}
36 
37 	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
38 	{
39 		// TODO: avoid memory allocation, if possible
40 		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
41 	}
42 
43 	this(this)
44 	{
45 		if( m_data ) m_data.m_refCount++;
46 	}
47 
48 	~this()
49 	{
50 		if( m_data && --m_data.m_refCount == 0 ){
51 			m_data.destroy();
52 		}
53 	}
54 
55 	/**
56 		Returns true if there are no more documents for this cursor.
57 
58 		Throws: An exception if there is a query or communication error.
59 	*/
60 	@property bool empty() { return m_data ? m_data.empty() : true; }
61 
62 	/**
63 		Returns the current document of the response.
64 
65 		Use empty and popFront to iterate over the list of documents using an
66 		input range interface. Note that calling this function is only allowed
67 		if empty returns false.
68 	*/
69 	@property DocType front() { return m_data.front; }
70 
71 	/**
72 		Controls the order in which the query returns matching documents.
73 
74 		This method must be called before starting to iterate, or an exception
75 		will be thrown. If multiple calls to $(D sort()) are issued, only
76 		the last one will have an effect.
77 
78 		Params:
79 			order = A BSON object convertible value that defines the sort order
80 				of the result. This BSON object must be structured according to
81 				the MongoDB documentation (see below).
82 
83 		Returns: Reference to the modified original cursor instance.
84 
85 		Throws:
86 			An exception if there is a query or communication error.
87 			Also throws if the method was called after beginning of iteration.
88 
89 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
90 	*/
91 	MongoCursor sort(T)(T order)
92 	{
93 		m_data.sort(() @trusted { return serializeToBson(order); } ());
94 		return this;
95 	}
96 
97 	///
98 	@safe unittest {
99 		import vibe.core.log;
100 		import vibe.db.mongo.mongo;
101 
102 		void test()
103 		@safe {
104 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
105 			auto coll = db["testcoll"];
106 
107 			// find all entries in reverse date order
108 			foreach (entry; coll.find().sort(["date": -1]))
109 				() @safe { logInfo("Entry: %s", entry); } ();
110 
111 			// the same, but using a struct to avoid memory allocations
112 			static struct Order { int date; }
113 			foreach (entry; coll.find().sort(Order(-1)))
114 				logInfo("Entry: %s", entry);
115 		}
116 	}
117 
118 	/**
119 		Limits the number of documents that the cursor returns.
120 
121 		This method must be called before beginning iteration in order to have
122 		effect. If multiple calls to limit() are made, the one with the lowest
123 		limit will be chosen.
124 
125 		Params:
126 			count = The maximum number number of documents to return. A value
127 				of zero means unlimited.
128 
129 		Returns: the same cursor
130 
131 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
132 	*/
133 	MongoCursor limit(size_t count)
134 	{
135 		m_data.limit(count);
136 		return this;
137 	}
138 
139 	/**
140 		Skips a given number of elements at the beginning of the cursor.
141 
142 		This method must be called before beginning iteration in order to have
143 		effect. If multiple calls to skip() are made, the one with the maximum
144 		number will be chosen.
145 
146 		Params:
147 			count = The number of documents to skip.
148 
149 		Returns: the same cursor
150 
151 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
152 	*/
153 	MongoCursor skip(int count)
154 	{
155 		m_data.skip(count);
156 		return this;
157 	}
158 
159 	@safe unittest {
160 		import vibe.core.log;
161 		import vibe.db.mongo.mongo;
162 
163 		void test()
164 		@safe {
165 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
166 			auto coll = db["testcoll"];
167 
168 			try { coll.drop(); } catch (Exception) {}
169 
170 			for (int i = 0; i < 10000; i++)
171 				coll.insert(["i": i]);
172 
173 			static struct Order { int i; }
174 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
175 
176 			assert(data.length == 2000);
177 			assert(data[0]["i"].get!int == 2000);
178 			assert(data[$ - 1]["i"].get!int == 3999);
179 		}
180 	}
181 
182 	/**
183 		Advances the cursor to the next document of the response.
184 
185 		Note that calling this function is only allowed if empty returns false.
186 	*/
187 	void popFront() { m_data.popFront(); }
188 
189 	/**
190 		Iterates over all remaining documents.
191 
192 		Note that iteration is one-way - elements that have already been visited
193 		will not be visited again if another iteration is done.
194 
195 		Throws: An exception if there is a query or communication error.
196 	*/
197 	auto byPair()
198 	{
199 		import std.typecons : Tuple, tuple;
200 		static struct Rng {
201 			private MongoCursorData!DocType data;
202 			@property bool empty() { return data.empty; }
203 			@property Tuple!(size_t, DocType) front() { return tuple(data.index, data.front); }
204 			void popFront() { data.popFront(); }
205 		}
206 		return Rng(m_data);
207 	}
208 }
209 
210 
211 /**
212 	Internal class exposed through MongoCursor.
213 */
214 private abstract class MongoCursorData(DocType) {
215 	private {
216 		int m_refCount = 1;
217 		MongoClient m_client;
218 		string m_collection;
219 		long m_cursor;
220 		int m_nskip;
221 		int m_nret;
222 		Bson m_sort = Bson(null);
223 		int m_offset;
224 		size_t m_currentDoc = 0;
225 		DocType[] m_documents;
226 		bool m_iterationStarted = false;
227 		size_t m_limit = 0;
228 		bool m_needDestroy = false;
229 	}
230 
231 	final @property bool empty()
232 	@safe {
233 		if (!m_iterationStarted) startIterating();
234 		if (m_limit > 0 && index >= m_limit) {
235 			destroy();
236 			return true;
237 		}
238 		if( m_currentDoc < m_documents.length )
239 			return false;
240 		if( m_cursor == 0 )
241 			return true;
242 
243 		auto conn = m_client.lockConnection();
244 		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
245 		return m_currentDoc >= m_documents.length;
246 	}
247 
248 	final @property size_t index()
249 	@safe {
250 		return m_offset + m_currentDoc;
251 	}
252 
253 	final @property DocType front()
254 	@safe {
255 		if (!m_iterationStarted) startIterating();
256 		assert(!empty(), "Cursor has no more data.");
257 		return m_documents[m_currentDoc];
258 	}
259 
260 	final void sort(Bson order)
261 	@safe {
262 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
263 		m_sort = order;
264 	}
265 
266 	final void limit(size_t count)
267 	@safe {
268 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
269 		if (count > 0) {
270 			if (m_nret == 0 || m_nret > count)
271 				m_nret = min(count, 1024);
272 
273 			if (m_limit == 0 || m_limit > count)
274 				m_limit = count;
275 		}
276 	}
277 
278 	final void skip(int count)
279 	@safe {
280 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
281 		m_nskip = max(m_nskip, count);
282 	}
283 
284 	final void popFront()
285 	@safe {
286 		if (!m_iterationStarted) startIterating();
287 		assert(!empty(), "Cursor has no more data.");
288 		m_currentDoc++;
289 	}
290 
291 	abstract void startIterating() @safe;
292 
293 	final private void destroy()
294 	@safe {
295 		if (m_cursor == 0) return;
296 		auto conn = m_client.lockConnection();
297 		conn.killCursors(() @trusted { return (&m_cursor)[0 .. 1]; } ());
298 		m_cursor = 0;
299 	}
300 
301 	final private void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
302 	{
303 		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
304 		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
305 
306 		m_cursor = cursor;
307 		m_offset = first_doc;
308 		m_documents.length = num_docs;
309 		m_currentDoc = 0;
310 	}
311 
312 	final private void handleDocument(size_t idx, ref DocType doc)
313 	{
314 		m_documents[idx] = doc;
315 	}
316 }
317 
318 /**
319 	Internal class implementing MongoCursorData for find queries
320  */
321 private class MongoFindCursor(Q, R, S) : MongoCursorData!R {
322 	private {
323 		QueryFlags m_flags;
324 		Q m_query;
325 		S m_returnFieldSelector;
326 	}
327 
328 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
329 	{
330 		m_client = client;
331 		m_collection = collection;
332 		m_flags = flags;
333 		m_nskip = nskip;
334 		m_nret = nret;
335 		m_query = query;
336 		m_returnFieldSelector = return_field_selector;
337 	}
338 
339 	override void startIterating()
340 	@safe {
341 		auto conn = m_client.lockConnection();
342 
343 		ubyte[256] selector_buf = void;
344 		ubyte[256] query_buf = void;
345 
346 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
347 
348 		Bson query;
349 		static if (is(Q == Bson)) {
350 			query = m_query;
351 		} else {
352 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
353 		}
354 
355 		Bson full_query;
356 
357 		if (!query["query"].isNull() || !query["$query"].isNull()) {
358 			// TODO: emit deprecation warning
359 			full_query = query;
360 		} else {
361 			full_query = Bson.emptyObject;
362 			full_query["$query"] = query;
363 		}
364 
365 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
366 
367 		conn.query!R(m_collection, m_flags, m_nskip, m_nret, full_query, selector, &handleReply, &handleDocument);
368 
369 		m_iterationStarted = true;
370 	}
371 }
372 
373 /**
374 	Internal class implementing MongoCursorData for already initialized generic cursors
375  */
376 private class MongoGenericCursor(DocType) : MongoCursorData!DocType {
377 	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
378 	{
379 		m_client = client;
380 		m_collection = collection;
381 		m_cursor = cursor;
382 		m_iterationStarted = true;
383 		m_documents = existing_documents;
384 	}
385 
386 	override void startIterating()
387 	@safe {
388 		assert(false, "Calling startIterating on an opaque already initialized cursor");
389 	}
390 }