1 /** 2 MongoDB cursor abstraction 3 4 Copyright: © 2012-2014 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.cursor; 9 10 public import vibe.data.bson; 11 12 import vibe.db.mongo.connection; 13 import vibe.db.mongo.client; 14 15 import std.array : array; 16 import std.algorithm : map, max, min; 17 import std.exception; 18 19 deprecated alias MongoCursor(Q, R = Bson, S = Bson) = MongoCursor!R; 20 21 /** 22 Represents a cursor for a MongoDB query. 23 24 Use foreach( doc; cursor ) to iterate over the list of documents. 25 26 This struct uses reference counting to destroy the underlying MongoDB cursor. 27 */ 28 struct MongoCursor(DocType = Bson) { 29 private MongoCursorData!DocType m_data; 30 31 package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 32 { 33 // TODO: avoid memory allocation, if possible 34 m_data = new MongoFindCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector); 35 } 36 37 package this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 38 { 39 // TODO: avoid memory allocation, if possible 40 m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents); 41 } 42 43 this(this) 44 { 45 if( m_data ) m_data.m_refCount++; 46 } 47 48 ~this() 49 { 50 if( m_data && --m_data.m_refCount == 0 ){ 51 m_data.destroy(); 52 } 53 } 54 55 /** 56 Returns true if there are no more documents for this cursor. 57 58 Throws: An exception if there is a query or communication error. 59 */ 60 @property bool empty() { return m_data ? m_data.empty() : true; } 61 62 /** 63 Returns the current document of the response. 64 65 Use empty and popFront to iterate over the list of documents using an 66 input range interface. Note that calling this function is only allowed 67 if empty returns false. 68 */ 69 @property DocType front() { return m_data.front; } 70 71 /** 72 Controls the order in which the query returns matching documents. 73 74 This method must be called before starting to iterate, or an exception 75 will be thrown. If multiple calls to $(D sort()) are issued, only 76 the last one will have an effect. 77 78 Params: 79 order = A BSON object convertible value that defines the sort order 80 of the result. This BSON object must be structured according to 81 the MongoDB documentation (see below). 82 83 Returns: Reference to the modified original cursor instance. 84 85 Throws: 86 An exception if there is a query or communication error. 87 Also throws if the method was called after beginning of iteration. 88 89 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort) 90 */ 91 MongoCursor sort(T)(T order) 92 { 93 m_data.sort(() @trusted { return serializeToBson(order); } ()); 94 return this; 95 } 96 97 /// 98 @safe unittest { 99 import vibe.core.log; 100 import vibe.db.mongo.mongo; 101 102 void test() 103 @safe { 104 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 105 auto coll = db["testcoll"]; 106 107 // find all entries in reverse date order 108 foreach (entry; coll.find().sort(["date": -1])) 109 () @safe { logInfo("Entry: %s", entry); } (); 110 111 // the same, but using a struct to avoid memory allocations 112 static struct Order { int date; } 113 foreach (entry; coll.find().sort(Order(-1))) 114 logInfo("Entry: %s", entry); 115 } 116 } 117 118 /** 119 Limits the number of documents that the cursor returns. 120 121 This method must be called before beginning iteration in order to have 122 effect. If multiple calls to limit() are made, the one with the lowest 123 limit will be chosen. 124 125 Params: 126 count = The maximum number number of documents to return. A value 127 of zero means unlimited. 128 129 Returns: the same cursor 130 131 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit) 132 */ 133 MongoCursor limit(size_t count) 134 { 135 m_data.limit(count); 136 return this; 137 } 138 139 /** 140 Skips a given number of elements at the beginning of the cursor. 141 142 This method must be called before beginning iteration in order to have 143 effect. If multiple calls to skip() are made, the one with the maximum 144 number will be chosen. 145 146 Params: 147 count = The number of documents to skip. 148 149 Returns: the same cursor 150 151 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip) 152 */ 153 MongoCursor skip(int count) 154 { 155 m_data.skip(count); 156 return this; 157 } 158 159 @safe unittest { 160 import vibe.core.log; 161 import vibe.db.mongo.mongo; 162 163 void test() 164 @safe { 165 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 166 auto coll = db["testcoll"]; 167 168 try { coll.drop(); } catch (Exception) {} 169 170 for (int i = 0; i < 10000; i++) 171 coll.insert(["i": i]); 172 173 static struct Order { int i; } 174 auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array; 175 176 assert(data.length == 2000); 177 assert(data[0]["i"].get!int == 2000); 178 assert(data[$ - 1]["i"].get!int == 3999); 179 } 180 } 181 182 /** 183 Advances the cursor to the next document of the response. 184 185 Note that calling this function is only allowed if empty returns false. 186 */ 187 void popFront() { m_data.popFront(); } 188 189 /** 190 Iterates over all remaining documents. 191 192 Note that iteration is one-way - elements that have already been visited 193 will not be visited again if another iteration is done. 194 195 Throws: An exception if there is a query or communication error. 196 */ 197 auto byPair() 198 { 199 import std.typecons : Tuple, tuple; 200 static struct Rng { 201 private MongoCursorData!DocType data; 202 @property bool empty() { return data.empty; } 203 @property Tuple!(size_t, DocType) front() { return tuple(data.index, data.front); } 204 void popFront() { data.popFront(); } 205 } 206 return Rng(m_data); 207 } 208 } 209 210 211 /** 212 Internal class exposed through MongoCursor. 213 */ 214 private abstract class MongoCursorData(DocType) { 215 private { 216 int m_refCount = 1; 217 MongoClient m_client; 218 string m_collection; 219 long m_cursor; 220 int m_nskip; 221 int m_nret; 222 Bson m_sort = Bson(null); 223 int m_offset; 224 size_t m_currentDoc = 0; 225 DocType[] m_documents; 226 bool m_iterationStarted = false; 227 size_t m_limit = 0; 228 bool m_needDestroy = false; 229 } 230 231 final @property bool empty() 232 @safe { 233 if (!m_iterationStarted) startIterating(); 234 if (m_limit > 0 && index >= m_limit) { 235 destroy(); 236 return true; 237 } 238 if( m_currentDoc < m_documents.length ) 239 return false; 240 if( m_cursor == 0 ) 241 return true; 242 243 auto conn = m_client.lockConnection(); 244 conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument); 245 return m_currentDoc >= m_documents.length; 246 } 247 248 final @property size_t index() 249 @safe { 250 return m_offset + m_currentDoc; 251 } 252 253 final @property DocType front() 254 @safe { 255 if (!m_iterationStarted) startIterating(); 256 assert(!empty(), "Cursor has no more data."); 257 return m_documents[m_currentDoc]; 258 } 259 260 final void sort(Bson order) 261 @safe { 262 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 263 m_sort = order; 264 } 265 266 final void limit(size_t count) 267 @safe { 268 // A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit. 269 if (count > 0) { 270 if (m_nret == 0 || m_nret > count) 271 m_nret = min(count, 1024); 272 273 if (m_limit == 0 || m_limit > count) 274 m_limit = count; 275 } 276 } 277 278 final void skip(int count) 279 @safe { 280 // A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip. 281 m_nskip = max(m_nskip, count); 282 } 283 284 final void popFront() 285 @safe { 286 if (!m_iterationStarted) startIterating(); 287 assert(!empty(), "Cursor has no more data."); 288 m_currentDoc++; 289 } 290 291 abstract void startIterating() @safe; 292 293 final private void destroy() 294 @safe { 295 if (m_cursor == 0) return; 296 auto conn = m_client.lockConnection(); 297 conn.killCursors(() @trusted { return (&m_cursor)[0 .. 1]; } ()); 298 m_cursor = 0; 299 } 300 301 final private void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs) 302 { 303 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle."); 304 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?"); 305 306 m_cursor = cursor; 307 m_offset = first_doc; 308 m_documents.length = num_docs; 309 m_currentDoc = 0; 310 } 311 312 final private void handleDocument(size_t idx, ref DocType doc) 313 { 314 m_documents[idx] = doc; 315 } 316 } 317 318 /** 319 Internal class implementing MongoCursorData for find queries 320 */ 321 private class MongoFindCursor(Q, R, S) : MongoCursorData!R { 322 private { 323 QueryFlags m_flags; 324 Q m_query; 325 S m_returnFieldSelector; 326 } 327 328 this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 329 { 330 m_client = client; 331 m_collection = collection; 332 m_flags = flags; 333 m_nskip = nskip; 334 m_nret = nret; 335 m_query = query; 336 m_returnFieldSelector = return_field_selector; 337 } 338 339 override void startIterating() 340 @safe { 341 auto conn = m_client.lockConnection(); 342 343 ubyte[256] selector_buf = void; 344 ubyte[256] query_buf = void; 345 346 Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } (); 347 348 Bson query; 349 static if (is(Q == Bson)) { 350 query = m_query; 351 } else { 352 query = () @trusted { return serializeToBson(m_query, query_buf); } (); 353 } 354 355 Bson full_query; 356 357 if (!query["query"].isNull() || !query["$query"].isNull()) { 358 // TODO: emit deprecation warning 359 full_query = query; 360 } else { 361 full_query = Bson.emptyObject; 362 full_query["$query"] = query; 363 } 364 365 if (!m_sort.isNull()) full_query["orderby"] = m_sort; 366 367 conn.query!R(m_collection, m_flags, m_nskip, m_nret, full_query, selector, &handleReply, &handleDocument); 368 369 m_iterationStarted = true; 370 } 371 } 372 373 /** 374 Internal class implementing MongoCursorData for already initialized generic cursors 375 */ 376 private class MongoGenericCursor(DocType) : MongoCursorData!DocType { 377 this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 378 { 379 m_client = client; 380 m_collection = collection; 381 m_cursor = cursor; 382 m_iterationStarted = true; 383 m_documents = existing_documents; 384 } 385 386 override void startIterating() 387 @safe { 388 assert(false, "Calling startIterating on an opaque already initialized cursor"); 389 } 390 }