1 /** 2 MongoDB cursor abstraction 3 4 Copyright: © 2012-2014 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.cursor; 9 10 public import vibe.data.bson; 11 public import vibe.db.mongo.impl.crud; 12 13 import vibe.core.log; 14 15 import vibe.db.mongo.connection; 16 import vibe.db.mongo.client; 17 18 import core.time; 19 import std.array : array; 20 import std.algorithm : map, max, min, skipOver; 21 import std.exception; 22 import std.range : chain; 23 24 25 /** 26 Represents a cursor for a MongoDB query. 27 28 Use foreach( doc; cursor ) to iterate over the list of documents. 29 30 This struct uses reference counting to destroy the underlying MongoDB cursor. 31 */ 32 struct MongoCursor(DocType = Bson) { 33 private IMongoCursorData!DocType m_data; 34 35 deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported") 36 package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 37 { 38 // TODO: avoid memory allocation, if possible 39 m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector); 40 } 41 42 deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported") 43 package this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 44 { 45 // TODO: avoid memory allocation, if possible 46 m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents); 47 } 48 49 this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options) 50 { 51 Bson command = Bson.emptyObject; 52 command["find"] = Bson(collection); 53 command["$db"] = Bson(database); 54 static if (is(Q == Bson)) 55 command["filter"] = query; 56 else 57 command["filter"] = serializeToBson(query); 58 59 MongoConnection conn = client.lockConnection(); 60 enforceWireVersionConstraints(options, conn.description.maxWireVersion); 61 62 // https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields 63 bool singleBatch; 64 if (!options.limit.isNull && options.limit.get < 0) 65 { 66 singleBatch = true; 67 options.limit = -options.limit.get; 68 options.batchSize = cast(int)options.limit.get; 69 } 70 if (!options.batchSize.isNull && options.batchSize.get < 0) 71 { 72 singleBatch = true; 73 options.batchSize = -options.batchSize.get; 74 } 75 if (singleBatch) 76 command["singleBatch"] = Bson(true); 77 78 // https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver 79 bool allowMaxTime = true; 80 if (options.cursorType == CursorType.tailable 81 || options.cursorType == CursorType.tailableAwait) 82 command["tailable"] = Bson(true); 83 else 84 { 85 options.maxAwaitTimeMS.nullify(); 86 allowMaxTime = false; 87 } 88 89 if (options.cursorType == CursorType.tailableAwait) 90 command["awaitData"] = Bson(true); 91 else 92 { 93 options.maxAwaitTimeMS.nullify(); 94 allowMaxTime = false; 95 } 96 97 // see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find 98 auto optionsBson = serializeToBson(options); 99 foreach (string key, value; optionsBson.byKeyValue) 100 command[key] = value; 101 102 this(client, command, 103 options.batchSize.isNull ? 0 : options.batchSize.get, 104 !options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs 105 : allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs 106 : Duration.max); 107 } 108 109 this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max) 110 { 111 // TODO: avoid memory allocation, if possible 112 m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime); 113 } 114 115 this(this) 116 { 117 if( m_data ) m_data.refCount++; 118 } 119 120 ~this() @safe 121 { 122 import core.memory : GC; 123 124 if (m_data && --m_data.refCount == 0) { 125 if (m_data.alive) { 126 // avoid InvalidMemoryOperation errors in case the cursor was 127 // leaked to the GC 128 if(GC.inFinalizer) { 129 logError("MongoCursor instance that has not been fully processed leaked to the GC!"); 130 } else { 131 try m_data.killCursors(); 132 catch (MongoException e) { 133 logWarn("MongoDB failed to kill cursors: %s", e.msg); 134 logDiagnostic("%s", (() @trusted => e.toString)()); 135 } 136 } 137 } 138 } 139 } 140 141 /** 142 Returns true if there are no more documents for this cursor. 143 144 Throws: An exception if there is a query or communication error. 145 */ 146 @property bool empty() { return m_data ? m_data.empty() : true; } 147 148 /** 149 Returns the current document of the response. 150 151 Use empty and popFront to iterate over the list of documents using an 152 input range interface. Note that calling this function is only allowed 153 if empty returns false. 154 */ 155 @property DocType front() { return m_data.front; } 156 157 /** 158 Controls the order in which the query returns matching documents. 159 160 This method must be called before starting to iterate, or an exception 161 will be thrown. If multiple calls to $(D sort()) are issued, only 162 the last one will have an effect. 163 164 Params: 165 order = A BSON object convertible value that defines the sort order 166 of the result. This BSON object must be structured according to 167 the MongoDB documentation (see below). 168 169 Returns: Reference to the modified original cursor instance. 170 171 Throws: 172 An exception if there is a query or communication error. 173 Also throws if the method was called after beginning of iteration. 174 175 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort) 176 */ 177 MongoCursor sort(T)(T order) 178 { 179 m_data.sort(() @trusted { return serializeToBson(order); } ()); 180 return this; 181 } 182 183 /// 184 @safe unittest { 185 import vibe.core.log; 186 import vibe.db.mongo.mongo; 187 188 void test() 189 @safe { 190 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 191 auto coll = db["testcoll"]; 192 193 // find all entries in reverse date order 194 foreach (entry; coll.find().sort(["date": -1])) 195 () @safe { logInfo("Entry: %s", entry); } (); 196 197 // the same, but using a struct to avoid memory allocations 198 static struct Order { int date; } 199 foreach (entry; coll.find().sort(Order(-1))) 200 logInfo("Entry: %s", entry); 201 } 202 } 203 204 /** 205 Limits the number of documents that the cursor returns. 206 207 This method must be called before beginning iteration in order to have 208 effect. If multiple calls to limit() are made, the one with the lowest 209 limit will be chosen. 210 211 Params: 212 count = The maximum number number of documents to return. A value 213 of zero means unlimited. 214 215 Returns: the same cursor 216 217 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit) 218 */ 219 MongoCursor limit(long count) 220 { 221 m_data.limit(count); 222 return this; 223 } 224 225 /** 226 Skips a given number of elements at the beginning of the cursor. 227 228 This method must be called before beginning iteration in order to have 229 effect. If multiple calls to skip() are made, the one with the maximum 230 number will be chosen. 231 232 Params: 233 count = The number of documents to skip. 234 235 Returns: the same cursor 236 237 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip) 238 */ 239 MongoCursor skip(long count) 240 { 241 m_data.skip(count); 242 return this; 243 } 244 245 @safe unittest { 246 import vibe.core.log; 247 import vibe.db.mongo.mongo; 248 249 void test() 250 @safe { 251 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 252 auto coll = db["testcoll"]; 253 254 try { coll.drop(); } catch (Exception) {} 255 256 for (int i = 0; i < 10000; i++) 257 coll.insertOne(["i": i]); 258 259 static struct Order { int i; } 260 auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array; 261 262 assert(data.length == 2000); 263 assert(data[0]["i"].get!int == 2000); 264 assert(data[$ - 1]["i"].get!int == 3999); 265 } 266 } 267 268 /** 269 Advances the cursor to the next document of the response. 270 271 Note that calling this function is only allowed if empty returns false. 272 */ 273 void popFront() { m_data.popFront(); } 274 275 /** 276 Iterates over all remaining documents. 277 278 Note that iteration is one-way - elements that have already been visited 279 will not be visited again if another iteration is done. 280 281 Throws: An exception if there is a query or communication error. 282 */ 283 auto byPair() 284 { 285 import std.typecons : Tuple, tuple; 286 static struct Rng { 287 private IMongoCursorData!DocType data; 288 @property bool empty() { return data.empty; } 289 @property Tuple!(long, DocType) front() { return tuple(data.index, data.front); } 290 void popFront() { data.popFront(); } 291 } 292 return Rng(m_data); 293 } 294 } 295 296 /// Actual iteration implementation details for MongoCursor. Abstracted using an 297 /// interface because we still have code for legacy (<3.6) MongoDB servers, 298 /// which may still used with the old legacy overloads. 299 private interface IMongoCursorData(DocType) { 300 @property bool alive() @safe nothrow; 301 bool empty() @safe; /// Range implementation 302 long index() @safe; /// Range implementation 303 DocType front() @safe; /// Range implementation 304 void popFront() @safe; /// Range implementation 305 /// Before iterating, specify a MongoDB sort order 306 void sort(Bson order) @safe; 307 /// Before iterating, specify maximum number of returned items 308 void limit(long count) @safe; 309 /// Before iterating, skip the specified number of items (when sorted) 310 void skip(long count) @safe; 311 /// Kills the MongoDB cursor, further iteration attempts will result in 312 /// errors. Call this in the destructor. 313 void killCursors() @safe; 314 /// Define an reference count property on the class, which is returned by 315 /// reference with this method. 316 ref int refCount() @safe; 317 } 318 319 320 /** 321 Deprecated query internals exposed through MongoCursor. 322 */ 323 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType { 324 private { 325 int m_refCount = 1; 326 MongoClient m_client; 327 string m_collection; 328 long m_cursor; 329 long m_nskip; 330 int m_nret; 331 Bson m_sort = Bson(null); 332 int m_offset; 333 size_t m_currentDoc = 0; 334 DocType[] m_documents; 335 bool m_iterationStarted = false; 336 long m_limit = 0; 337 } 338 339 @property bool alive() @safe nothrow { return m_cursor != 0; } 340 341 final bool empty() 342 @safe { 343 if (!m_iterationStarted) startIterating(); 344 if (m_limit > 0 && index >= m_limit) { 345 killCursors(); 346 return true; 347 } 348 if( m_currentDoc < m_documents.length ) 349 return false; 350 if( m_cursor == 0 ) 351 return true; 352 353 auto conn = m_client.lockConnection(); 354 conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument); 355 return m_currentDoc >= m_documents.length; 356 } 357 358 final long index() 359 @safe { 360 return m_offset + m_currentDoc; 361 } 362 363 final DocType front() 364 @safe { 365 if (!m_iterationStarted) startIterating(); 366 assert(!empty(), "Cursor has no more data."); 367 return m_documents[m_currentDoc]; 368 } 369 370 final void sort(Bson order) 371 @safe { 372 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 373 m_sort = order; 374 } 375 376 final void limit(long count) 377 @safe { 378 // A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit. 379 if (count > 0) { 380 if (m_nret == 0 || m_nret > count) 381 m_nret = cast(int)min(count, 1024); 382 383 if (m_limit == 0 || m_limit > count) 384 m_limit = count; 385 } 386 } 387 388 final void skip(long count) 389 @safe { 390 // A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip. 391 m_nskip = max(m_nskip, count); 392 } 393 394 final void popFront() 395 @safe { 396 if (!m_iterationStarted) startIterating(); 397 assert(!empty(), "Cursor has no more data."); 398 m_currentDoc++; 399 } 400 401 abstract void startIterating() @safe; 402 403 final void killCursors() 404 @safe { 405 if (m_cursor == 0) return; 406 auto conn = m_client.lockConnection(); 407 conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ()); 408 m_cursor = 0; 409 } 410 411 final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs) 412 { 413 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle."); 414 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?"); 415 416 m_cursor = cursor; 417 m_offset = first_doc; 418 m_documents.length = num_docs; 419 m_currentDoc = 0; 420 } 421 422 final void handleDocument(size_t idx, ref DocType doc) 423 { 424 m_documents[idx] = doc; 425 } 426 427 final ref int refCount() { return m_refCount; } 428 } 429 430 /** 431 Find + getMore internals exposed through MongoCursor. Unifies the old 432 LegacyMongoCursorData approach, so it can be used both for find queries and 433 for custom commands. 434 */ 435 private class MongoFindCursor(DocType) : IMongoCursorData!DocType { 436 private { 437 int m_refCount = 1; 438 MongoClient m_client; 439 Bson m_findQuery; 440 string m_database; 441 string m_ns; 442 string m_collection; 443 long m_cursor; 444 int m_batchSize; 445 Duration m_maxTime; 446 long m_totalReceived; 447 size_t m_readDoc; 448 size_t m_insertDoc; 449 DocType[] m_documents; 450 bool m_iterationStarted = false; 451 long m_queryLimit; 452 } 453 454 this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max) 455 { 456 m_client = client; 457 m_findQuery = command; 458 m_batchSize = batchSize; 459 m_maxTime = getMoreMaxTime; 460 m_database = command["$db"].opt!string; 461 } 462 463 @property bool alive() @safe nothrow { return m_cursor != 0; } 464 465 bool empty() 466 @safe { 467 if (!m_iterationStarted) startIterating(); 468 if (m_queryLimit > 0 && index >= m_queryLimit) { 469 killCursors(); 470 return true; 471 } 472 if( m_readDoc < m_documents.length ) 473 return false; 474 if( m_cursor == 0 ) 475 return true; 476 477 auto conn = m_client.lockConnection(); 478 conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize, 479 &handleReply, &handleDocument, m_maxTime); 480 return m_readDoc >= m_documents.length; 481 } 482 483 final long index() 484 @safe { 485 assert(m_totalReceived >= m_documents.length); 486 return m_totalReceived - m_documents.length + m_readDoc; 487 } 488 489 final DocType front() 490 @safe { 491 if (!m_iterationStarted) startIterating(); 492 assert(!empty(), "Cursor has no more data."); 493 return m_documents[m_readDoc]; 494 } 495 496 final void sort(Bson order) 497 @safe { 498 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 499 m_findQuery["sort"] = order; 500 } 501 502 final void limit(long count) 503 @safe { 504 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 505 m_findQuery["limit"] = Bson(count); 506 } 507 508 final void skip(long count) 509 @safe { 510 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 511 m_findQuery["skip"] = Bson(count); 512 } 513 514 final void popFront() 515 @safe { 516 if (!m_iterationStarted) startIterating(); 517 assert(!empty(), "Cursor has no more data."); 518 m_readDoc++; 519 } 520 521 private void startIterating() 522 @safe { 523 auto conn = m_client.lockConnection(); 524 m_totalReceived = 0; 525 m_queryLimit = m_findQuery["limit"].opt!long(0); 526 conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument); 527 m_iterationStarted = true; 528 } 529 530 final void killCursors() 531 @safe { 532 if (m_cursor == 0) return; 533 auto conn = m_client.lockConnection(); 534 conn.killCursors(m_ns, () @trusted { return (&m_cursor)[0 .. 1]; } ()); 535 m_cursor = 0; 536 } 537 538 final void handleReply(long id, string ns, size_t count) 539 { 540 m_cursor = id; 541 m_ns = ns; 542 // The qualified collection name is reported here, but when requesting 543 // data, we need to send the database name and the collection name 544 // separately, so we have to remove the database prefix: 545 ns.skipOver(m_database.chain(".")); 546 m_collection = ns; 547 m_documents.length = count; 548 m_readDoc = 0; 549 m_insertDoc = 0; 550 } 551 552 final void handleDocument(ref DocType doc) 553 { 554 m_documents[m_insertDoc++] = doc; 555 m_totalReceived++; 556 } 557 558 final ref int refCount() { return m_refCount; } 559 } 560 561 /** 562 Internal class implementing MongoCursorData for find queries 563 */ 564 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R { 565 private { 566 QueryFlags m_flags; 567 Q m_query; 568 S m_returnFieldSelector; 569 } 570 571 this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 572 { 573 m_client = client; 574 m_collection = collection; 575 m_flags = flags; 576 m_nskip = nskip; 577 m_nret = nret; 578 m_query = query; 579 m_returnFieldSelector = return_field_selector; 580 } 581 582 override void startIterating() 583 @safe { 584 auto conn = m_client.lockConnection(); 585 586 ubyte[256] selector_buf = void; 587 ubyte[256] query_buf = void; 588 589 Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } (); 590 591 Bson query; 592 static if (is(Q == Bson)) { 593 query = m_query; 594 } else { 595 query = () @trusted { return serializeToBson(m_query, query_buf); } (); 596 } 597 598 Bson full_query; 599 600 if (!query["query"].isNull() || !query["$query"].isNull()) { 601 // TODO: emit deprecation warning 602 full_query = query; 603 } else { 604 full_query = Bson.emptyObject; 605 full_query["$query"] = query; 606 } 607 608 if (!m_sort.isNull()) full_query["orderby"] = m_sort; 609 610 conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument); 611 612 m_iterationStarted = true; 613 } 614 } 615 616 /** 617 Internal class implementing MongoCursorData for already initialized generic cursors 618 */ 619 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType { 620 this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 621 { 622 m_client = client; 623 m_collection = collection; 624 m_cursor = cursor; 625 m_iterationStarted = true; 626 m_documents = existing_documents; 627 } 628 629 override void startIterating() 630 @safe { 631 assert(false, "Calling startIterating on an opaque already initialized cursor"); 632 } 633 }