1 /** 2 MongoDB cursor abstraction 3 4 Copyright: © 2012-2014 RejectedSoftware e.K. 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.cursor; 9 10 public import vibe.data.bson; 11 12 import vibe.db.mongo.connection; 13 import vibe.db.mongo.client; 14 15 import std.array : array; 16 import std.algorithm : map, max, min; 17 import std.exception; 18 19 20 /** 21 Represents a cursor for a MongoDB query. 22 23 Use foreach( doc; cursor ) to iterate over the list of documents. 24 25 This struct uses reference counting to destroy the underlying MongoDB cursor. 26 */ 27 struct MongoCursor(Q = Bson, R = Bson, S = Bson) { 28 private MongoCursorData!(Q, R, S) m_data; 29 30 package this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 31 { 32 // TODO: avoid memory allocation, if possible 33 m_data = new MongoCursorData!(Q, R, S)(client, collection, flags, nskip, nret, query, return_field_selector); 34 } 35 36 this(this) 37 { 38 if( m_data ) m_data.m_refCount++; 39 } 40 41 ~this() 42 { 43 if( m_data && --m_data.m_refCount == 0 ){ 44 m_data.destroy(); 45 } 46 } 47 48 /** 49 Returns true if there are no more documents for this cursor. 50 51 Throws: An exception if there is a query or communication error. 52 */ 53 @property bool empty() { return m_data ? m_data.empty() : true; } 54 55 /** 56 Returns the current document of the response. 57 58 Use empty and popFront to iterate over the list of documents using an 59 input range interface. Note that calling this function is only allowed 60 if empty returns false. 61 */ 62 @property R front() { return m_data.front; } 63 64 /** 65 Controls the order in which the query returns matching documents. 66 67 This method must be called before starting to iterate, or an exeption 68 will be thrown. If multiple calls to $(D sort()) are issued, only 69 the last one will have an effect. 70 71 Params: 72 order = A BSON object convertible value that defines the sort order 73 of the result. This BSON object must be structured according to 74 the MongoDB documentation (see below). 75 76 Returns: Reference to the modified original curser instance. 77 78 Throws: 79 An exception if there is a query or communication error. 80 Also throws if the method was called after beginning of iteration. 81 82 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort) 83 */ 84 MongoCursor sort(T)(T order) 85 { 86 m_data.sort(() @trusted { return serializeToBson(order); } ()); 87 return this; 88 } 89 90 /// 91 @safe unittest { 92 import vibe.core.log; 93 import vibe.db.mongo.mongo; 94 95 void test() 96 @safe { 97 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 98 auto coll = db["testcoll"]; 99 100 // find all entries in reverse date order 101 foreach (entry; coll.find().sort(["date": -1])) 102 () @safe { logInfo("Entry: %s", entry); } (); 103 104 // the same, but using a struct to avoid memory allocations 105 static struct Order { int date; } 106 foreach (entry; coll.find().sort(Order(-1))) 107 logInfo("Entry: %s", entry); 108 } 109 } 110 111 /** 112 Limits the number of documents that the cursor returns. 113 114 This method must be called before beginnig iteration in order to have 115 effect. If multiple calls to limit() are made, the one with the lowest 116 limit will be chosen. 117 118 Params: 119 count = The maximum number number of documents to return. A value 120 of zero means unlimited. 121 122 Returns: the same cursor 123 124 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit) 125 */ 126 MongoCursor limit(size_t count) 127 { 128 m_data.limit(count); 129 return this; 130 } 131 132 /** 133 Skips a given number of elements at the beginning of the cursor. 134 135 This method must be called before beginnig iteration in order to have 136 effect. If multiple calls to skip() are made, the one with the maximum 137 number will be chosen. 138 139 Params: 140 count = The number of documents to skip. 141 142 Returns: the same cursor 143 144 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip) 145 */ 146 MongoCursor skip(int count) 147 { 148 m_data.skip(count); 149 return this; 150 } 151 152 @safe unittest { 153 import vibe.core.log; 154 import vibe.db.mongo.mongo; 155 156 void test() 157 @safe { 158 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 159 auto coll = db["testcoll"]; 160 161 try { coll.drop(); } catch (Exception) {} 162 163 for (int i = 0; i < 10000; i++) 164 coll.insert(["i": i]); 165 166 static struct Order { int i; } 167 auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array; 168 169 assert(data.length == 2000); 170 assert(data[0]["i"].get!int == 2000); 171 assert(data[$ - 1]["i"].get!int == 3999); 172 } 173 } 174 175 /** 176 Advances the cursor to the next document of the response. 177 178 Note that calling this function is only allowed if empty returns false. 179 */ 180 void popFront() { m_data.popFront(); } 181 182 /** 183 Iterates over all remaining documents. 184 185 Note that iteration is one-way - elements that have already been visited 186 will not be visited again if another iteration is done. 187 188 Throws: An exception if there is a query or communication error. 189 */ 190 auto byPair() 191 { 192 import std.typecons : Tuple, tuple; 193 static struct Rng { 194 private MongoCursorData!(Q, R, S) data; 195 @property bool empty() { return data.empty; } 196 @property Tuple!(size_t, R) front() { return tuple(data.index, data.front); } 197 void popFront() { data.popFront(); } 198 } 199 return Rng(m_data); 200 } 201 } 202 203 204 /** 205 Internal class exposed through MongoCursor. 206 */ 207 private class MongoCursorData(Q, R, S) { 208 private { 209 int m_refCount = 1; 210 MongoClient m_client; 211 string m_collection; 212 long m_cursor; 213 QueryFlags m_flags; 214 int m_nskip; 215 int m_nret; 216 Q m_query; 217 S m_returnFieldSelector; 218 Bson m_sort = Bson(null); 219 int m_offset; 220 size_t m_currentDoc = 0; 221 R[] m_documents; 222 bool m_iterationStarted = false; 223 size_t m_limit = 0; 224 bool m_needDestroy = false; 225 } 226 227 this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 228 { 229 m_client = client; 230 m_collection = collection; 231 m_flags = flags; 232 m_nskip = nskip; 233 m_nret = nret; 234 m_query = query; 235 m_returnFieldSelector = return_field_selector; 236 } 237 238 @property bool empty() 239 @safe { 240 if (!m_iterationStarted) startIterating(); 241 if (m_limit > 0 && index >= m_limit) { 242 destroy(); 243 return true; 244 } 245 if( m_currentDoc < m_documents.length ) 246 return false; 247 if( m_cursor == 0 ) 248 return true; 249 250 auto conn = m_client.lockConnection(); 251 conn.getMore!R(m_collection, m_nret, m_cursor, &handleReply, &handleDocument); 252 return m_currentDoc >= m_documents.length; 253 } 254 255 @property size_t index() 256 @safe { 257 return m_offset + m_currentDoc; 258 } 259 260 @property R front() 261 @safe { 262 if (!m_iterationStarted) startIterating(); 263 assert(!empty(), "Cursor has no more data."); 264 return m_documents[m_currentDoc]; 265 } 266 267 void sort(Bson order) 268 @safe { 269 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 270 m_sort = order; 271 } 272 273 void limit(size_t count) 274 @safe { 275 // A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit. 276 if (count > 0) { 277 if (m_nret == 0 || m_nret > count) 278 m_nret = min(count, 1024); 279 280 if (m_limit == 0 || m_limit > count) 281 m_limit = count; 282 } 283 } 284 285 void skip(int count) 286 @safe { 287 // A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip. 288 m_nskip = max(m_nskip, count); 289 } 290 291 void popFront() 292 @safe { 293 if (!m_iterationStarted) startIterating(); 294 assert(!empty(), "Cursor has no more data."); 295 m_currentDoc++; 296 } 297 298 private void startIterating() 299 @safe { 300 auto conn = m_client.lockConnection(); 301 302 ubyte[256] selector_buf = void; 303 ubyte[256] query_buf = void; 304 305 Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } (); 306 307 Bson query; 308 static if (is(Q == Bson)) { 309 query = m_query; 310 } else { 311 query = () @trusted { return serializeToBson(m_query, query_buf); } (); 312 } 313 314 Bson full_query; 315 316 if (!query["query"].isNull() || !query["$query"].isNull()) { 317 // TODO: emit deprecation warning 318 full_query = query; 319 } else { 320 full_query = Bson.emptyObject; 321 full_query["$query"] = query; 322 } 323 324 if (!m_sort.isNull()) full_query["orderby"] = m_sort; 325 326 conn.query!R(m_collection, m_flags, m_nskip, m_nret, full_query, selector, &handleReply, &handleDocument); 327 328 m_iterationStarted = true; 329 } 330 331 private void destroy() 332 @safe { 333 if (m_cursor == 0) return; 334 auto conn = m_client.lockConnection(); 335 conn.killCursors(() @trusted { return (&m_cursor)[0 .. 1]; } ()); 336 m_cursor = 0; 337 } 338 339 private void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs) 340 { 341 // FIXME: use MongoDB exceptions 342 enforce(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle."); 343 enforce(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?"); 344 345 m_cursor = cursor; 346 m_offset = first_doc; 347 m_documents.length = num_docs; 348 m_currentDoc = 0; 349 } 350 351 private void handleDocument(size_t idx, ref R doc) 352 { 353 m_documents[idx] = doc; 354 } 355 }