/**
	MongoDB cursor abstraction

	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.db.mongo.cursor;

public import vibe.data.bson;
public import vibe.db.mongo.impl.crud;

import vibe.db.mongo.connection;
import vibe.db.mongo.client;

import std.array : array;
import std.algorithm : map, max, min;
import std.exception;

import core.time;

/**
	Represents a cursor for a MongoDB query.

	Use foreach( doc; cursor ) to iterate over the list of documents.

	This struct uses reference counting to destroy the underlying MongoDB cursor.
	Copies of a cursor share the same iteration state; the server side cursor
	is killed once the last copy goes out of scope.
*/
struct MongoCursor(DocType = Bson) {
	private IMongoCursorData!DocType m_data;

	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
	}

	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
	}

	/**
		Creates a cursor that runs a `find` command built from `query` and
		`options`.

		The command itself is only assembled here; it is handed to the
		command based constructor and sent once iteration starts.

		Params:
			client = Client used to obtain connections.
			database = Name of the database, stored in the command as `$db`.
			collection = Name of the collection to query.
			query = The filter; serialized to BSON unless it already is a `Bson`.
			options = Additional find options. A negative `limit` or
				`batchSize` is converted to its absolute value and requests a
				single batch.
	*/
	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
	{
		Bson command = Bson.emptyObject;
		command["find"] = Bson(collection);
		command["$db"] = Bson(database);
		static if (is(Q == Bson))
			command["filter"] = query;
		else
			command["filter"] = serializeToBson(query);

		MongoConnection conn = client.lockConnection();
		enforceWireVersionConstraints(options, conn.description.maxWireVersion);

		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
		bool singleBatch;
		if (!options.limit.isNull && options.limit.get < 0)
		{
			singleBatch = true;
			options.limit = -options.limit.get;
			options.batchSize = cast(int)options.limit.get;
		}
		if (!options.batchSize.isNull && options.batchSize.get < 0)
		{
			singleBatch = true;
			options.batchSize = -options.batchSize.get;
		}
		if (singleBatch)
			command["singleBatch"] = Bson(true);

		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
		immutable isTailable = options.cursorType == CursorType.tailable
			|| options.cursorType == CursorType.tailableAwait;
		immutable isAwaitData = options.cursorType == CursorType.tailableAwait;

		if (isTailable)
			command["tailable"] = Bson(true);

		// maxAwaitTimeMS is only kept for tailable-await cursors; for every
		// other cursor type it is cleared before the options are serialized.
		if (isAwaitData)
			command["awaitData"] = Bson(true);
		else
			options.maxAwaitTimeMS.nullify();

		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
		auto optionsBson = serializeToBson(options);
		foreach (string key, value; optionsBson.byKeyValue)
			command[key] = value;

		// Time limit handed to getMore: maxAwaitTimeMS if present, otherwise
		// maxTimeMS for tailable-await cursors only, otherwise unlimited.
		Duration getMoreMaxTime = Duration.max;
		if (!options.maxAwaitTimeMS.isNull)
			getMoreMaxTime = options.maxAwaitTimeMS.get.msecs;
		else if (isAwaitData && !options.maxTimeMS.isNull)
			getMoreMaxTime = options.maxTimeMS.get.msecs;

		this(client, command,
			options.batchSize.isNull ? 0 : options.batchSize.get,
			getMoreMaxTime);
	}

	/**
		Creates a cursor from a ready-made command document.

		Params:
			client = Client used to obtain connections.
			command = The command to run when iteration starts.
			batchSize = Batch size passed on to the cursor implementation.
			getMoreMaxTime = Time limit passed on to the cursor implementation
				for its getMore requests.
	*/
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
	}

	this(this)
	{
		if( m_data ) m_data.refCount++;
	}

	~this()
	{
		if( m_data && --m_data.refCount == 0 ){
			m_data.killCursors();
		}
	}

	/**
		Returns true if there are no more documents for this cursor.

		A cursor without underlying data (default initialized) is always empty.

		Throws: An exception if there is a query or communication error.
	*/
	@property bool empty() { return m_data ? m_data.empty() : true; }

	/**
		Returns the current document of the response.

		Use empty and popFront to iterate over the list of documents using an
		input range interface. Note that calling this function is only allowed
		if empty returns false.
	*/
	@property DocType front() { return m_data.front; }

	/**
		Controls the order in which the query returns matching documents.

		This method must be called before starting to iterate; calling it
		afterwards trips an assertion in the cursor implementation. If
		multiple calls to $(D sort()) are issued, only the last one will have
		an effect.

		Params:
			order = A BSON object convertible value that defines the sort order
				of the result. This BSON object must be structured according to
				the MongoDB documentation (see below).

		Returns: A copy of this cursor sharing the same underlying cursor state.

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
	*/
	MongoCursor sort(T)(T order)
	{
		m_data.sort(() @trusted { return serializeToBson(order); } ());
		return this;
	}

	///
	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto coll = db["testcoll"];

			// find all entries in reverse date order
			foreach (entry; coll.find().sort(["date": -1]))
				() @safe { logInfo("Entry: %s", entry); } ();

			// the same, but using a struct to avoid memory allocations
			static struct Order { int date; }
			foreach (entry; coll.find().sort(Order(-1)))
				logInfo("Entry: %s", entry);
		}
	}

	/**
		Limits the number of documents that the cursor returns.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to limit() are made, the result depends on
		the cursor implementation: the legacy (MongoDB <3.6) cursors keep the
		lowest positive limit, while the find command based cursor uses the
		value of the last call.

		Params:
			count = The maximum number of documents to return. A value
				of zero means unlimited.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
	*/
	MongoCursor limit(long count)
	{
		m_data.limit(count);
		return this;
	}

	/**
		Skips a given number of elements at the beginning of the cursor.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to skip() are made, the result depends on
		the cursor implementation: the legacy (MongoDB <3.6) cursors keep the
		maximum number, while the find command based cursor uses the value of
		the last call.

		Params:
			count = The number of documents to skip.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
	*/
	MongoCursor skip(long count)
	{
		m_data.skip(count);
		return this;
	}

	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto coll = db["testcoll"];

			try { coll.drop(); } catch (Exception) {}

			for (int i = 0; i < 10000; i++)
				coll.insertOne(["i": i]);

			static struct Order { int i; }
			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;

			assert(data.length == 2000);
			assert(data[0]["i"].get!int == 2000);
			assert(data[$ - 1]["i"].get!int == 3999);
		}
	}

	/**
		Advances the cursor to the next document of the response.

		Note that calling this function is only allowed if empty returns false.
	*/
	void popFront() { m_data.popFront(); }

	/**
		Iterates over all remaining documents, yielding (index, document)
		tuples.

		Note that iteration is one-way - elements that have already been visited
		will not be visited again if another iteration is done.

		The returned range holds its own reference counted copy of this
		cursor, so the underlying MongoDB cursor stays alive for as long as the
		range exists, even if the cursor it was created from is a temporary.

		Throws: An exception if there is a query or communication error.
	*/
	auto byPair()
	{
		import std.typecons : Tuple, tuple;
		static struct Rng {
			// A full cursor copy (rather than the bare data reference) makes
			// the range take part in the reference counting.
			private MongoCursor cursor;
			@property bool empty() { return cursor.empty; }
			@property Tuple!(long, DocType) front() { return tuple(cursor.m_data.index, cursor.m_data.front); }
			void popFront() { cursor.popFront(); }
		}
		return Rng(this);
	}
}

/// Actual iteration implementation details for MongoCursor. Abstracted using an
/// interface because we still have code for legacy (<3.6) MongoDB servers,
/// which may still be used with the old legacy overloads.
private interface IMongoCursorData(DocType) {
	// Input range primitives, plus the running index of the current document.
	bool empty() @safe;
	long index() @safe;
	DocType front() @safe;
	void popFront() @safe;

	/// Sets the MongoDB sort order; to be called before iterating.
	void sort(Bson order) @safe;

	/// Sets the maximum number of returned items; to be called before iterating.
	void limit(long count) @safe;

	/// Sets the number of items to skip (when sorted); to be called before
	/// iterating.
	void skip(long count) @safe;

	/// Kills the MongoDB cursor; further iteration attempts will result in
	/// errors. Meant to be called from the destructor of the owning cursor.
	void killCursors() @safe;

	/// Gives by-reference access to the reference count that is stored on the
	/// implementing class.
	ref int refCount() @safe;
}


/**
	Deprecated query internals exposed through MongoCursor.

	Holds the state shared by the legacy (OP_QUERY style) cursor
	implementations; subclasses only provide `startIterating`.
*/
private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		string m_collection;
		long m_cursor;
		long m_nskip;
		int m_nret;
		Bson m_sort = Bson(null);
		int m_offset;
		size_t m_currentDoc = 0;
		DocType[] m_documents;
		bool m_iterationStarted = false;
		long m_limit = 0;
	}

	/// Returns true once no further document is available, fetching the next
	/// batch from the server when the local buffer has been consumed.
	final bool empty()
	@safe {
		if (!m_iterationStarted)
			startIterating();

		// A client side limit ends the iteration early and releases the cursor.
		immutable limitReached = m_limit > 0 && index >= m_limit;
		if (limitReached) {
			killCursors();
			return true;
		}

		// Still documents left in the current batch.
		if (m_currentDoc < m_documents.length)
			return false;

		// Batch consumed and no open cursor left to fetch more from.
		if (m_cursor == 0)
			return true;

		auto connection = m_client.lockConnection();
		connection.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
		return !(m_currentDoc < m_documents.length);
	}

	/// Position of the current document: offset of the current batch plus the
	/// position within it.
	final long index()
	@safe {
		return m_offset + m_currentDoc;
	}

	/// Returns the current document; only valid while `empty` is false.
	final DocType front()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_currentDoc];
	}

	/// Stores the sort order to be used when the query is issued.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_sort = order;
	}

	/// Lowers the limit (and the per-request return count) to `count`.
	final void limit(long count)
	@safe {
		// Zero or negative counts leave the current settings untouched
		// (a limit of 0 is equivalent to setting no limit).
		if (count <= 0)
			return;

		// The number of documents requested per reply is capped at 1024.
		if (m_nret == 0 || m_nret > count)
			m_nret = cast(int)min(count, 1024);

		if (m_limit == 0 || m_limit > count)
			m_limit = count;
	}

	/// Raises the number of skipped documents to `count` if it is larger than
	/// the current value.
	final void skip(long count)
	@safe {
		// skip(0) never changes anything, i.e. it is equivalent to no skip.
		if (count > m_nskip)
			m_nskip = count;
	}

	/// Moves on to the next document; only valid while `empty` is false.
	final void popFront()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_currentDoc++;
	}

	/// Issues the initial query; implemented by the concrete cursor classes.
	abstract void startIterating() @safe;

	/// Kills the server side cursor, if one is still open.
	final void killCursors()
	@safe {
		if (m_cursor != 0) {
			auto connection = m_client.lockConnection();
			connection.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
			m_cursor = 0;
		}
	}

	/// Reply callback: validates the flags and resets the batch buffer.
	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
	{
		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");

		m_cursor = cursor;
		m_offset = first_doc;
		m_documents.length = num_docs;
		m_currentDoc = 0;
	}

	/// Document callback: stores a received document at its batch position.
	final void handleDocument(size_t idx, ref DocType doc)
	{
		m_documents[idx] = doc;
	}

	final ref int refCount() { return m_refCount; }
}

/**
	Find + getMore internals exposed through MongoCursor. Replaces the old
	LegacyMongoCursorData approach with a single implementation that can be
	used both for find queries and for custom commands.
*/
private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		Bson m_findQuery;
		string m_database;
		string m_collection;
		long m_cursor;
		int m_batchSize;
		Duration m_maxTime;
		long m_totalReceived;
		size_t m_readDoc;
		size_t m_insertDoc;
		DocType[] m_documents;
		bool m_iterationStarted = false;
		long m_queryLimit;
	}

	/// Stores the command and getMore settings; the command is sent when
	/// iteration starts. The database name is taken from the command's `$db`
	/// field.
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		m_client = client;
		m_findQuery = command;
		m_database = command["$db"].opt!string;
		m_batchSize = batchSize;
		m_maxTime = getMoreMaxTime;
	}

	/// Returns true once no further document is available, fetching the next
	/// batch from the server when the local buffer has been consumed.
	bool empty()
	@safe {
		if (!m_iterationStarted)
			startIterating();

		// Enforce the query limit on the client side and release the cursor.
		immutable limitReached = m_queryLimit > 0 && index >= m_queryLimit;
		if (limitReached) {
			killCursors();
			return true;
		}

		// Still documents left in the current batch.
		if (m_readDoc < m_documents.length)
			return false;

		// Batch consumed and no open cursor left to fetch more from.
		if (m_cursor == 0)
			return true;

		auto connection = m_client.lockConnection();
		connection.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
			&handleReply, &handleDocument, m_maxTime);
		return !(m_readDoc < m_documents.length);
	}

	/// Position of the current document, counted over all received batches.
	final long index()
	@safe {
		assert(m_totalReceived >= m_documents.length);
		return m_totalReceived - m_documents.length + m_readDoc;
	}

	/// Returns the current document; only valid while `empty` is false.
	final DocType front()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_readDoc];
	}

	/// Writes the sort order into the pending command.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_findQuery["sort"] = order;
	}

	/// Writes the limit into the pending command.
	final void limit(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_findQuery["limit"] = Bson(count);
	}

	/// Writes the skip count into the pending command.
	final void skip(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_findQuery["skip"] = Bson(count);
	}

	/// Moves on to the next document; only valid while `empty` is false.
	final void popFront()
	@safe {
		if (!m_iterationStarted)
			startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_readDoc++;
	}

	/// Sends the stored command and remembers its limit for client side
	/// enforcement in `empty`.
	private void startIterating()
	@safe {
		auto connection = m_client.lockConnection();
		m_totalReceived = 0;
		m_queryLimit = m_findQuery["limit"].opt!long(0);
		connection.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
		m_iterationStarted = true;
	}

	/// Kills the server side cursor, if one is still open.
	final void killCursors()
	@safe {
		if (m_cursor != 0) {
			auto connection = m_client.lockConnection();
			connection.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
			m_cursor = 0;
		}
	}

	/// Reply callback: records cursor id and namespace and resets the batch
	/// buffer to hold `count` documents.
	final void handleReply(long id, string ns, size_t count)
	{
		m_cursor = id;
		m_collection = ns;
		m_documents.length = count;
		m_readDoc = 0;
		m_insertDoc = 0;
	}

	/// Document callback: appends a received document to the current batch.
	final void handleDocument(ref DocType doc)
	{
		m_documents[m_insertDoc++] = doc;
		m_totalReceived++;
	}

	final ref int refCount() { return m_refCount; }
}

/**
	Internal class implementing the legacy cursor data for find queries
*/
private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
	private {
		QueryFlags m_flags;
		Q m_query;
		S m_returnFieldSelector;
	}

	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		m_client = client;
		m_collection = collection;
		m_flags = flags;
		m_nskip = nskip;
		m_nret = nret;
		m_query = query;
		m_returnFieldSelector = return_field_selector;
	}

	/// Serializes query and field selector and issues the legacy query.
	override void startIterating()
	@safe {
		auto connection = m_client.lockConnection();

		// Scratch buffers handed to the BSON serializer.
		ubyte[256] selector_buf = void;
		ubyte[256] query_buf = void;

		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();

		Bson query;
		static if (is(Q == Bson)) {
			query = m_query;
		} else {
			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
		}

		// A query that has neither a "query" nor a "$query" field gets wrapped
		// into a "$query" field; otherwise it is passed on unchanged.
		immutable needsWrapping = query["query"].isNull() && query["$query"].isNull();

		Bson full_query;
		if (needsWrapping) {
			full_query = Bson.emptyObject;
			full_query["$query"] = query;
		} else {
			// TODO: emit deprecation warning
			full_query = query;
		}

		if (!m_sort.isNull())
			full_query["orderby"] = m_sort;

		connection.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);

		m_iterationStarted = true;
	}
}

/**
	Internal class implementing the legacy cursor data for already initialized generic cursors
*/
private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		m_client = client;
		m_collection = collection;
		m_cursor = cursor;
		m_documents = existing_documents;
		// The cursor already exists, so iteration counts as started.
		m_iterationStarted = true;
	}

	override void startIterating()
	@safe {
		assert(false, "Calling startIterating on an opaque already initialized cursor");
	}
}