1 /**
2 	MongoDB cursor abstraction
3 
4 	Copyright: © 2012-2014 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.db.mongo.cursor;
9 
10 public import vibe.data.bson;
11 
12 import vibe.db.mongo.connection;
13 import vibe.db.mongo.client;
14 
15 import std.array : array;
16 import std.algorithm : map, max, min;
17 import std.exception;
18 
19 
20 /**
21 	Represents a cursor for a MongoDB query.
22 
23 	Use foreach( doc; cursor ) to iterate over the list of documents.
24 
25 	This struct uses reference counting to destroy the underlying MongoDB cursor.
26 */
27 struct MongoCursor(Q = Bson, R = Bson, S = Bson) {
28 	private MongoCursorData!(Q, R, S) m_data;
29 
30 	package this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
31 	{
32 		// TODO: avoid memory allocation, if possible
33 		m_data = new MongoCursorData!(Q, R, S)(client, collection, flags, nskip, nret, query, return_field_selector);
34 	}
35 
36 	this(this)
37 	{
38 		if( m_data ) m_data.m_refCount++;
39 	}
40 
41 	~this()
42 	{
43 		if( m_data && --m_data.m_refCount == 0 ){
44 			m_data.destroy();
45 		}
46 	}
47 
48 	/**
49 		Returns true if there are no more documents for this cursor.
50 
51 		Throws: An exception if there is a query or communication error.
52 	*/
53 	@property bool empty() { return m_data ? m_data.empty() : true; }
54 
55 	/**
56 		Returns the current document of the response.
57 
58 		Use empty and popFront to iterate over the list of documents using an
59 		input range interface. Note that calling this function is only allowed
60 		if empty returns false.
61 	*/
62 	@property R front() { return m_data.front; }
63 
64 	/**
65 		Controls the order in which the query returns matching documents.
66 
67 		This method must be called before starting to iterate, or an exeption
68 		will be thrown. If multiple calls to $(D sort()) are issued, only
69 		the last one will have an effect.
70 
71 		Params:
72 			order = A BSON object convertible value that defines the sort order
73 				of the result. This BSON object must be structured according to
74 				the MongoDB documentation (see below).
75 
76 		Returns: Reference to the modified original curser instance.
77 
78 		Throws:
79 			An exception if there is a query or communication error.
80 			Also throws if the method was called after beginning of iteration.
81 
82 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
83 	*/
84 	MongoCursor sort(T)(T order)
85 	{
86 		m_data.sort(() @trusted { return serializeToBson(order); } ());
87 		return this;
88 	}
89 
90 	///
91 	@safe unittest {
92 		import vibe.core.log;
93 		import vibe.db.mongo.mongo;
94 
95 		void test()
96 		@safe {
97 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
98 			auto coll = db["testcoll"];
99 
100 			// find all entries in reverse date order
101 			foreach (entry; coll.find().sort(["date": -1]))
102 				() @safe { logInfo("Entry: %s", entry); } ();
103 
104 			// the same, but using a struct to avoid memory allocations
105 			static struct Order { int date; }
106 			foreach (entry; coll.find().sort(Order(-1)))
107 				logInfo("Entry: %s", entry);
108 		}
109 	}
110 
111 	/**
112 		Limits the number of documents that the cursor returns.
113 
114 		This method must be called before beginnig iteration in order to have
115 		effect. If multiple calls to limit() are made, the one with the lowest
116 		limit will be chosen.
117 
118 		Params:
119 			count = The maximum number number of documents to return. A value
120 				of zero means unlimited.
121 
122 		Returns: the same cursor
123 
124 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
125 	*/
126 	MongoCursor limit(size_t count)
127 	{
128 		m_data.limit(count);
129 		return this;
130 	}
131 
132 	/**
133 		Skips a given number of elements at the beginning of the cursor.
134 
135 		This method must be called before beginnig iteration in order to have
136 		effect. If multiple calls to skip() are made, the one with the maximum
137 		number will be chosen.
138 
139 		Params:
140 			count = The number of documents to skip.
141 
142 		Returns: the same cursor
143 
144 		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
145 	*/
146 	MongoCursor skip(int count)
147 	{
148 		m_data.skip(count);
149 		return this;
150 	}
151 
152 	@safe unittest {
153 		import vibe.core.log;
154 		import vibe.db.mongo.mongo;
155 
156 		void test()
157 		@safe {
158 			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
159 			auto coll = db["testcoll"];
160 
161 			try { coll.drop(); } catch (Exception) {}
162 
163 			for (int i = 0; i < 10000; i++)
164 				coll.insert(["i": i]);
165 
166 			static struct Order { int i; }
167 			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;
168 
169 			assert(data.length == 2000);
170 			assert(data[0]["i"].get!int == 2000);
171 			assert(data[$ - 1]["i"].get!int == 3999);
172 		}
173 	}
174 
175 	/**
176 		Advances the cursor to the next document of the response.
177 
178 		Note that calling this function is only allowed if empty returns false.
179 	*/
180 	void popFront() { m_data.popFront(); }
181 
182 	/**
183 		Iterates over all remaining documents.
184 
185 		Note that iteration is one-way - elements that have already been visited
186 		will not be visited again if another iteration is done.
187 
188 		Throws: An exception if there is a query or communication error.
189 	*/
190 	auto byPair()
191 	{
192 		import std.typecons : Tuple, tuple;
193 		static struct Rng {
194 			private MongoCursorData!(Q, R, S) data;
195 			@property bool empty() { return data.empty; }
196 			@property Tuple!(size_t, R) front() { return tuple(data.index, data.front); }
197 			void popFront() { data.popFront(); }
198 		}
199 		return Rng(m_data);
200 	}
201 }
202 
203 
204 /**
205 	Internal class exposed through MongoCursor.
206 */
207 private class MongoCursorData(Q, R, S) {
208 	private {
209 		int m_refCount = 1;
210 		MongoClient m_client;
211 		string m_collection;
212 		long m_cursor;
213 		QueryFlags m_flags;
214 		int m_nskip;
215 		int m_nret;
216 		Q m_query;
217 		S m_returnFieldSelector;
218 		Bson m_sort = Bson(null);
219 		int m_offset;
220 		size_t m_currentDoc = 0;
221 		R[] m_documents;
222 		bool m_iterationStarted = false;
223 		size_t m_limit = 0;
224 		bool m_needDestroy = false;
225 	}
226 
227 	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
228 	{
229 		m_client = client;
230 		m_collection = collection;
231 		m_flags = flags;
232 		m_nskip = nskip;
233 		m_nret = nret;
234 		m_query = query;
235 		m_returnFieldSelector = return_field_selector;
236 	}
237 
238 	@property bool empty()
239 	@safe {
240 		if (!m_iterationStarted) startIterating();
241 		if (m_limit > 0 && index >= m_limit) {
242 			destroy();
243 			return true;
244 		}
245 		if( m_currentDoc < m_documents.length )
246 			return false;
247 		if( m_cursor == 0 )
248 			return true;
249 
250 		auto conn = m_client.lockConnection();
251 		conn.getMore!R(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
252 		return m_currentDoc >= m_documents.length;
253 	}
254 
255 	@property size_t index()
256 	@safe {
257 		return m_offset + m_currentDoc;
258 	}
259 
260 	@property R front()
261 	@safe {
262 		if (!m_iterationStarted) startIterating();
263 		assert(!empty(), "Cursor has no more data.");
264 		return m_documents[m_currentDoc];
265 	}
266 
267 	void sort(Bson order)
268 	@safe {
269 		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
270 		m_sort = order;
271 	}
272 
273 	void limit(size_t count)
274 	@safe {
275 		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
276 		if (count > 0) {
277 			if (m_nret == 0 || m_nret > count)
278 				m_nret = min(count, 1024);
279 
280 			if (m_limit == 0 || m_limit > count)
281 				m_limit = count;
282 		}
283 	}
284 
285 	void skip(int count)
286 	@safe {
287 		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
288 		m_nskip = max(m_nskip, count);
289 	}
290 
291 	void popFront()
292 	@safe {
293 		if (!m_iterationStarted) startIterating();
294 		assert(!empty(), "Cursor has no more data.");
295 		m_currentDoc++;
296 	}
297 
298 	private void startIterating()
299 	@safe {
300 		auto conn = m_client.lockConnection();
301 
302 		ubyte[256] selector_buf = void;
303 		ubyte[256] query_buf = void;
304 
305 		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();
306 
307 		Bson query;
308 		static if (is(Q == Bson)) {
309 			query = m_query;
310 		} else {
311 			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
312 		}
313 
314 		Bson full_query;
315 
316 		if (!query["query"].isNull() || !query["$query"].isNull()) {
317 			// TODO: emit deprecation warning
318 			full_query = query;
319 		} else {
320 			full_query = Bson.emptyObject;
321 			full_query["$query"] = query;
322 		}
323 
324 		if (!m_sort.isNull()) full_query["orderby"] = m_sort;
325 
326 		conn.query!R(m_collection, m_flags, m_nskip, m_nret, full_query, selector, &handleReply, &handleDocument);
327 
328 		m_iterationStarted = true;
329 	}
330 
331 	private void destroy()
332 	@safe {
333 		if (m_cursor == 0) return;
334 		auto conn = m_client.lockConnection();
335 		conn.killCursors(() @trusted { return (&m_cursor)[0 .. 1]; } ());
336 		m_cursor = 0;
337 	}
338 
339 	private void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
340 	{
341 		// FIXME: use MongoDB exceptions
342 		enforce(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
343 		enforce(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");
344 
345 		m_cursor = cursor;
346 		m_offset = first_doc;
347 		m_documents.length = num_docs;
348 		m_currentDoc = 0;
349 	}
350 
351 	private void handleDocument(size_t idx, ref R doc)
352 	{
353 		m_documents[idx] = doc;
354 	}
355 }