1 /** 2 MongoDB cursor abstraction 3 4 Copyright: © 2012-2014 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.db.mongo.cursor; 9 10 public import vibe.data.bson; 11 public import vibe.db.mongo.impl.crud; 12 13 import vibe.core.log; 14 15 import vibe.db.mongo.connection; 16 import vibe.db.mongo.client; 17 18 import core.time; 19 import std.array : array; 20 import std.algorithm : map, max, min, skipOver; 21 import std.exception; 22 import std.range : chain; 23 24 25 /** 26 Represents a cursor for a MongoDB query. 27 28 Use foreach( doc; cursor ) to iterate over the list of documents. 29 30 This struct uses reference counting to destroy the underlying MongoDB cursor. 31 */ 32 struct MongoCursor(DocType = Bson) { 33 private IMongoCursorData!DocType m_data; 34 35 deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported") 36 package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 37 { 38 // TODO: avoid memory allocation, if possible 39 m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector); 40 } 41 42 deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported") 43 package this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 44 { 45 // TODO: avoid memory allocation, if possible 46 m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents); 47 } 48 49 this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options) 50 { 51 Bson command = Bson.emptyObject; 52 command["find"] = Bson(collection); 53 command["$db"] = Bson(database); 54 static if (is(Q == Bson)) 55 command["filter"] = query; 56 else 57 command["filter"] = serializeToBson(query); 58 59 MongoConnection conn = client.lockConnection(); 60 enforceWireVersionConstraints(options, conn.description.maxWireVersion); 61 62 // https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields 63 bool singleBatch; 64 if (!options.limit.isNull && options.limit.get < 0) 65 { 66 singleBatch = true; 67 options.limit = -options.limit.get; 68 options.batchSize = cast(int)options.limit.get; 69 } 70 if (!options.batchSize.isNull && options.batchSize.get < 0) 71 { 72 singleBatch = true; 73 options.batchSize = -options.batchSize.get; 74 } 75 if (singleBatch) 76 command["singleBatch"] = Bson(true); 77 78 // https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver 79 bool allowMaxTime = true; 80 if (options.cursorType == CursorType.tailable 81 || options.cursorType == CursorType.tailableAwait) 82 command["tailable"] = Bson(true); 83 else 84 { 85 options.maxAwaitTimeMS.nullify(); 86 allowMaxTime = false; 87 } 88 89 if (options.cursorType == CursorType.tailableAwait) 90 command["awaitData"] = Bson(true); 91 else 92 { 93 options.maxAwaitTimeMS.nullify(); 94 allowMaxTime = false; 95 } 96 97 // see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find 98 auto optionsBson = serializeToBson(options); 99 foreach (string key, value; optionsBson.byKeyValue) 100 command[key] = value; 101 102 this(client, command, 103 options.batchSize.isNull ? 0 : options.batchSize.get, 104 !options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs 105 : allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs 106 : Duration.max); 107 } 108 109 this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max) 110 { 111 // TODO: avoid memory allocation, if possible 112 m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime); 113 } 114 115 this(this) 116 { 117 if( m_data ) m_data.refCount++; 118 } 119 120 ~this() @safe 121 { 122 if( m_data && --m_data.refCount == 0 ){ 123 try { 124 m_data.killCursors(); 125 } catch (MongoException e) { 126 logWarn("MongoDB failed to kill cursors: %s", e.msg); 127 logDiagnostic("%s", (() @trusted => e.toString)()); 128 } 129 } 130 } 131 132 /** 133 Returns true if there are no more documents for this cursor. 134 135 Throws: An exception if there is a query or communication error. 136 */ 137 @property bool empty() { return m_data ? m_data.empty() : true; } 138 139 /** 140 Returns the current document of the response. 141 142 Use empty and popFront to iterate over the list of documents using an 143 input range interface. Note that calling this function is only allowed 144 if empty returns false. 145 */ 146 @property DocType front() { return m_data.front; } 147 148 /** 149 Controls the order in which the query returns matching documents. 150 151 This method must be called before starting to iterate, or an exception 152 will be thrown. If multiple calls to $(D sort()) are issued, only 153 the last one will have an effect. 154 155 Params: 156 order = A BSON object convertible value that defines the sort order 157 of the result. This BSON object must be structured according to 158 the MongoDB documentation (see below). 159 160 Returns: Reference to the modified original cursor instance. 161 162 Throws: 163 An exception if there is a query or communication error. 164 Also throws if the method was called after beginning of iteration. 165 166 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort) 167 */ 168 MongoCursor sort(T)(T order) 169 { 170 m_data.sort(() @trusted { return serializeToBson(order); } ()); 171 return this; 172 } 173 174 /// 175 @safe unittest { 176 import vibe.core.log; 177 import vibe.db.mongo.mongo; 178 179 void test() 180 @safe { 181 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 182 auto coll = db["testcoll"]; 183 184 // find all entries in reverse date order 185 foreach (entry; coll.find().sort(["date": -1])) 186 () @safe { logInfo("Entry: %s", entry); } (); 187 188 // the same, but using a struct to avoid memory allocations 189 static struct Order { int date; } 190 foreach (entry; coll.find().sort(Order(-1))) 191 logInfo("Entry: %s", entry); 192 } 193 } 194 195 /** 196 Limits the number of documents that the cursor returns. 197 198 This method must be called before beginning iteration in order to have 199 effect. If multiple calls to limit() are made, the one with the lowest 200 limit will be chosen. 201 202 Params: 203 count = The maximum number number of documents to return. A value 204 of zero means unlimited. 205 206 Returns: the same cursor 207 208 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit) 209 */ 210 MongoCursor limit(long count) 211 { 212 m_data.limit(count); 213 return this; 214 } 215 216 /** 217 Skips a given number of elements at the beginning of the cursor. 218 219 This method must be called before beginning iteration in order to have 220 effect. If multiple calls to skip() are made, the one with the maximum 221 number will be chosen. 222 223 Params: 224 count = The number of documents to skip. 225 226 Returns: the same cursor 227 228 See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip) 229 */ 230 MongoCursor skip(long count) 231 { 232 m_data.skip(count); 233 return this; 234 } 235 236 @safe unittest { 237 import vibe.core.log; 238 import vibe.db.mongo.mongo; 239 240 void test() 241 @safe { 242 auto db = connectMongoDB("127.0.0.1").getDatabase("test"); 243 auto coll = db["testcoll"]; 244 245 try { coll.drop(); } catch (Exception) {} 246 247 for (int i = 0; i < 10000; i++) 248 coll.insertOne(["i": i]); 249 250 static struct Order { int i; } 251 auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array; 252 253 assert(data.length == 2000); 254 assert(data[0]["i"].get!int == 2000); 255 assert(data[$ - 1]["i"].get!int == 3999); 256 } 257 } 258 259 /** 260 Advances the cursor to the next document of the response. 261 262 Note that calling this function is only allowed if empty returns false. 263 */ 264 void popFront() { m_data.popFront(); } 265 266 /** 267 Iterates over all remaining documents. 268 269 Note that iteration is one-way - elements that have already been visited 270 will not be visited again if another iteration is done. 271 272 Throws: An exception if there is a query or communication error. 273 */ 274 auto byPair() 275 { 276 import std.typecons : Tuple, tuple; 277 static struct Rng { 278 private IMongoCursorData!DocType data; 279 @property bool empty() { return data.empty; } 280 @property Tuple!(long, DocType) front() { return tuple(data.index, data.front); } 281 void popFront() { data.popFront(); } 282 } 283 return Rng(m_data); 284 } 285 } 286 287 /// Actual iteration implementation details for MongoCursor. Abstracted using an 288 /// interface because we still have code for legacy (<3.6) MongoDB servers, 289 /// which may still used with the old legacy overloads. 290 private interface IMongoCursorData(DocType) { 291 bool empty() @safe; /// Range implementation 292 long index() @safe; /// Range implementation 293 DocType front() @safe; /// Range implementation 294 void popFront() @safe; /// Range implementation 295 /// Before iterating, specify a MongoDB sort order 296 void sort(Bson order) @safe; 297 /// Before iterating, specify maximum number of returned items 298 void limit(long count) @safe; 299 /// Before iterating, skip the specified number of items (when sorted) 300 void skip(long count) @safe; 301 /// Kills the MongoDB cursor, further iteration attempts will result in 302 /// errors. Call this in the destructor. 303 void killCursors() @safe; 304 /// Define an reference count property on the class, which is returned by 305 /// reference with this method. 306 ref int refCount() @safe; 307 } 308 309 310 /** 311 Deprecated query internals exposed through MongoCursor. 312 */ 313 private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType { 314 private { 315 int m_refCount = 1; 316 MongoClient m_client; 317 string m_collection; 318 long m_cursor; 319 long m_nskip; 320 int m_nret; 321 Bson m_sort = Bson(null); 322 int m_offset; 323 size_t m_currentDoc = 0; 324 DocType[] m_documents; 325 bool m_iterationStarted = false; 326 long m_limit = 0; 327 } 328 329 final bool empty() 330 @safe { 331 if (!m_iterationStarted) startIterating(); 332 if (m_limit > 0 && index >= m_limit) { 333 killCursors(); 334 return true; 335 } 336 if( m_currentDoc < m_documents.length ) 337 return false; 338 if( m_cursor == 0 ) 339 return true; 340 341 auto conn = m_client.lockConnection(); 342 conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument); 343 return m_currentDoc >= m_documents.length; 344 } 345 346 final long index() 347 @safe { 348 return m_offset + m_currentDoc; 349 } 350 351 final DocType front() 352 @safe { 353 if (!m_iterationStarted) startIterating(); 354 assert(!empty(), "Cursor has no more data."); 355 return m_documents[m_currentDoc]; 356 } 357 358 final void sort(Bson order) 359 @safe { 360 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 361 m_sort = order; 362 } 363 364 final void limit(long count) 365 @safe { 366 // A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit. 367 if (count > 0) { 368 if (m_nret == 0 || m_nret > count) 369 m_nret = cast(int)min(count, 1024); 370 371 if (m_limit == 0 || m_limit > count) 372 m_limit = count; 373 } 374 } 375 376 final void skip(long count) 377 @safe { 378 // A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip. 379 m_nskip = max(m_nskip, count); 380 } 381 382 final void popFront() 383 @safe { 384 if (!m_iterationStarted) startIterating(); 385 assert(!empty(), "Cursor has no more data."); 386 m_currentDoc++; 387 } 388 389 abstract void startIterating() @safe; 390 391 final void killCursors() 392 @safe { 393 auto conn = m_client.lockConnection(); 394 if (m_cursor == 0) return; 395 conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ()); 396 m_cursor = 0; 397 } 398 399 final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs) 400 { 401 enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle."); 402 enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?"); 403 404 m_cursor = cursor; 405 m_offset = first_doc; 406 m_documents.length = num_docs; 407 m_currentDoc = 0; 408 } 409 410 final void handleDocument(size_t idx, ref DocType doc) 411 { 412 m_documents[idx] = doc; 413 } 414 415 final ref int refCount() { return m_refCount; } 416 } 417 418 /** 419 Find + getMore internals exposed through MongoCursor. Unifies the old 420 LegacyMongoCursorData approach, so it can be used both for find queries and 421 for custom commands. 422 */ 423 private class MongoFindCursor(DocType) : IMongoCursorData!DocType { 424 private { 425 int m_refCount = 1; 426 MongoClient m_client; 427 Bson m_findQuery; 428 string m_database; 429 string m_ns; 430 string m_collection; 431 long m_cursor; 432 int m_batchSize; 433 Duration m_maxTime; 434 long m_totalReceived; 435 size_t m_readDoc; 436 size_t m_insertDoc; 437 DocType[] m_documents; 438 bool m_iterationStarted = false; 439 long m_queryLimit; 440 } 441 442 this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max) 443 { 444 m_client = client; 445 m_findQuery = command; 446 m_batchSize = batchSize; 447 m_maxTime = getMoreMaxTime; 448 m_database = command["$db"].opt!string; 449 } 450 451 bool empty() 452 @safe { 453 if (!m_iterationStarted) startIterating(); 454 if (m_queryLimit > 0 && index >= m_queryLimit) { 455 killCursors(); 456 return true; 457 } 458 if( m_readDoc < m_documents.length ) 459 return false; 460 if( m_cursor == 0 ) 461 return true; 462 463 auto conn = m_client.lockConnection(); 464 conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize, 465 &handleReply, &handleDocument, m_maxTime); 466 return m_readDoc >= m_documents.length; 467 } 468 469 final long index() 470 @safe { 471 assert(m_totalReceived >= m_documents.length); 472 return m_totalReceived - m_documents.length + m_readDoc; 473 } 474 475 final DocType front() 476 @safe { 477 if (!m_iterationStarted) startIterating(); 478 assert(!empty(), "Cursor has no more data."); 479 return m_documents[m_readDoc]; 480 } 481 482 final void sort(Bson order) 483 @safe { 484 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 485 m_findQuery["sort"] = order; 486 } 487 488 final void limit(long count) 489 @safe { 490 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 491 m_findQuery["limit"] = Bson(count); 492 } 493 494 final void skip(long count) 495 @safe { 496 assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration"); 497 m_findQuery["skip"] = Bson(count); 498 } 499 500 final void popFront() 501 @safe { 502 if (!m_iterationStarted) startIterating(); 503 assert(!empty(), "Cursor has no more data."); 504 m_readDoc++; 505 } 506 507 private void startIterating() 508 @safe { 509 auto conn = m_client.lockConnection(); 510 m_totalReceived = 0; 511 m_queryLimit = m_findQuery["limit"].opt!long(0); 512 conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument); 513 m_iterationStarted = true; 514 } 515 516 final void killCursors() 517 @safe { 518 auto conn = m_client.lockConnection(); 519 if (m_cursor == 0) return; 520 conn.killCursors(m_ns, () @trusted { return (&m_cursor)[0 .. 1]; } ()); 521 m_cursor = 0; 522 } 523 524 final void handleReply(long id, string ns, size_t count) 525 { 526 m_cursor = id; 527 m_ns = ns; 528 // The qualified collection name is reported here, but when requesting 529 // data, we need to send the database name and the collection name 530 // separately, so we have to remove the database prefix: 531 ns.skipOver(m_database.chain(".")); 532 m_collection = ns; 533 m_documents.length = count; 534 m_readDoc = 0; 535 m_insertDoc = 0; 536 } 537 538 final void handleDocument(ref DocType doc) 539 { 540 m_documents[m_insertDoc++] = doc; 541 m_totalReceived++; 542 } 543 544 final ref int refCount() { return m_refCount; } 545 } 546 547 /** 548 Internal class implementing MongoCursorData for find queries 549 */ 550 private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R { 551 private { 552 QueryFlags m_flags; 553 Q m_query; 554 S m_returnFieldSelector; 555 } 556 557 this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector) 558 { 559 m_client = client; 560 m_collection = collection; 561 m_flags = flags; 562 m_nskip = nskip; 563 m_nret = nret; 564 m_query = query; 565 m_returnFieldSelector = return_field_selector; 566 } 567 568 override void startIterating() 569 @safe { 570 auto conn = m_client.lockConnection(); 571 572 ubyte[256] selector_buf = void; 573 ubyte[256] query_buf = void; 574 575 Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } (); 576 577 Bson query; 578 static if (is(Q == Bson)) { 579 query = m_query; 580 } else { 581 query = () @trusted { return serializeToBson(m_query, query_buf); } (); 582 } 583 584 Bson full_query; 585 586 if (!query["query"].isNull() || !query["$query"].isNull()) { 587 // TODO: emit deprecation warning 588 full_query = query; 589 } else { 590 full_query = Bson.emptyObject; 591 full_query["$query"] = query; 592 } 593 594 if (!m_sort.isNull()) full_query["orderby"] = m_sort; 595 596 conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument); 597 598 m_iterationStarted = true; 599 } 600 } 601 602 /** 603 Internal class implementing MongoCursorData for already initialized generic cursors 604 */ 605 private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType { 606 this(MongoClient client, string collection, long cursor, DocType[] existing_documents) 607 { 608 m_client = client; 609 m_collection = collection; 610 m_cursor = cursor; 611 m_iterationStarted = true; 612 m_documents = existing_documents; 613 } 614 615 override void startIterating() 616 @safe { 617 assert(false, "Calling startIterating on an opaque already initialized cursor"); 618 } 619 }