/**
	MongoDB cursor abstraction

	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.db.mongo.cursor;

public import vibe.data.bson;
public import vibe.db.mongo.impl.crud;

import vibe.db.mongo.connection;
import vibe.db.mongo.client;

import core.time;
import std.array : array;
import std.algorithm : map, max, min, skipOver;
import std.exception;
import std.range : chain;


/**
	Represents a cursor for a MongoDB query.

	Use foreach( doc; cursor ) to iterate over the list of documents.

	This struct uses reference counting to destroy the underlying MongoDB cursor.
*/
struct MongoCursor(DocType = Bson) {
	private IMongoCursorData!DocType m_data;

	/**
		Creates a cursor backed by the legacy query implementation
		(`MongoQueryCursor`).
	*/
	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(Q, S)(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoQueryCursor!(Q, DocType, S)(client, collection, flags, nskip, nret, query, return_field_selector);
	}

	/**
		Creates a cursor backed by the legacy implementation for an already
		opened cursor id and an initial set of documents
		(`MongoGenericCursor`).
	*/
	deprecated("Old (MongoDB <3.6) style cursor iteration no longer supported")
	package this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoGenericCursor!DocType(client, collection, cursor, existing_documents);
	}

	/**
		Builds a `find` command from the given query and options and creates
		a cursor for it.

		Params:
			client = Client used to obtain connections
			database = Name of the database, stored as `$db` in the command
			collection = Name of the collection, stored as `find` in the command
			query = Filter; used as is if it is a `Bson`, serialized to BSON
				otherwise
			options = Find options; checked against the wire version of the
				connection and then merged into the command
	*/
	this(Q)(MongoClient client, string database, string collection, Q query, FindOptions options)
	{
		Bson command = Bson.emptyObject;
		command["find"] = Bson(collection);
		command["$db"] = Bson(database);
		static if (is(Q == Bson))
			command["filter"] = query;
		else
			command["filter"] = serializeToBson(query);

		MongoConnection conn = client.lockConnection();
		enforceWireVersionConstraints(options, conn.description.maxWireVersion);

		// A negative limit or batch size is translated to its absolute value
		// combined with the singleBatch flag, see
		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#mapping-op_query-behavior-to-the-find-command-limit-and-batchsize-fields
		bool singleBatch;
		if (!options.limit.isNull && options.limit.get < 0)
		{
			singleBatch = true;
			options.limit = -options.limit.get;
			options.batchSize = cast(int)options.limit.get;
		}
		if (!options.batchSize.isNull && options.batchSize.get < 0)
		{
			singleBatch = true;
			options.batchSize = -options.batchSize.get;
		}
		if (singleBatch)
			command["singleBatch"] = Bson(true);

		// https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#semantics-of-maxtimems-for-a-driver
		// After the two checks below, allowMaxTime is only still true (and
		// maxAwaitTimeMS only still set) for CursorType.tailableAwait.
		bool allowMaxTime = true;
		if (options.cursorType == CursorType.tailable
			|| options.cursorType == CursorType.tailableAwait)
			command["tailable"] = Bson(true);
		else
		{
			options.maxAwaitTimeMS.nullify();
			allowMaxTime = false;
		}

		if (options.cursorType == CursorType.tailableAwait)
			command["awaitData"] = Bson(true);
		else
		{
			options.maxAwaitTimeMS.nullify();
			allowMaxTime = false;
		}

		// see table: https://github.com/mongodb/specifications/blob/525dae0aa8791e782ad9dd93e507b60c55a737bb/source/find_getmore_killcursors_commands.rst#find
		auto optionsBson = serializeToBson(options);
		foreach (string key, value; optionsBson.byKeyValue)
			command[key] = value;

		// Time limit handed to the cursor: maxAwaitTimeMS if it is set,
		// else maxTimeMS if allowed and set, else Duration.max.
		this(client, command,
			options.batchSize.isNull ? 0 : options.batchSize.get,
			!options.maxAwaitTimeMS.isNull ? options.maxAwaitTimeMS.get.msecs
				: allowMaxTime && !options.maxTimeMS.isNull ? options.maxTimeMS.get.msecs
				: Duration.max);
	}

	/**
		Creates a cursor for a ready-made command document.

		Params:
			client = Client used to obtain connections
			command = The command to run when iteration starts
			batchSize = Batch size forwarded to the cursor implementation
			getMoreMaxTime = Time limit forwarded to the cursor implementation
	*/
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		// TODO: avoid memory allocation, if possible
		m_data = new MongoFindCursor!DocType(client, command, batchSize, getMoreMaxTime);
	}

	/// Copying a cursor adds a reference to the shared cursor data.
	this(this)
	{
		if( m_data ) m_data.refCount++;
	}

	/// Drops one reference; the last one going away kills the MongoDB cursor.
	~this()
	{
		if( m_data && --m_data.refCount == 0 ){
			m_data.killCursors();
		}
	}

	/**
		Returns true if there are no more documents for this cursor.

		Throws: An exception if there is a query or communication error.
	*/
	@property bool empty() { return m_data ? m_data.empty() : true; }

	/**
		Returns the current document of the response.

		Use empty and popFront to iterate over the list of documents using an
		input range interface. Note that calling this function is only allowed
		if empty returns false.
	*/
	@property DocType front() { return m_data.front; }

	/**
		Controls the order in which the query returns matching documents.

		This method must be called before starting to iterate, or an exception
		will be thrown. If multiple calls to $(D sort()) are issued, only
		the last one will have an effect.

		Params:
			order = A BSON object convertible value that defines the sort order
				of the result. This BSON object must be structured according to
				the MongoDB documentation (see below).

		Returns: Reference to the modified original cursor instance.

		Throws:
			An exception if there is a query or communication error.
			Also throws if the method was called after beginning of iteration.

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.sort)
	*/
	MongoCursor sort(T)(T order)
	{
		m_data.sort(() @trusted { return serializeToBson(order); } ());
		return this;
	}

	///
	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto coll = db["testcoll"];

			// find all entries in reverse date order
			foreach (entry; coll.find().sort(["date": -1]))
				() @safe { logInfo("Entry: %s", entry); } ();

			// the same, but using a struct to avoid memory allocations
			static struct Order { int date; }
			foreach (entry; coll.find().sort(Order(-1)))
				logInfo("Entry: %s", entry);
		}
	}

	/**
		Limits the number of documents that the cursor returns.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to limit() are made, the one with the lowest
		limit will be chosen.

		Params:
			count = The maximum number of documents to return. A value
				of zero means unlimited.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.limit)
	*/
	MongoCursor limit(long count)
	{
		m_data.limit(count);
		return this;
	}

	/**
		Skips a given number of elements at the beginning of the cursor.

		This method must be called before beginning iteration in order to have
		effect. If multiple calls to skip() are made, the one with the maximum
		number will be chosen.

		Params:
			count = The number of documents to skip.

		Returns: the same cursor

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/cursor.skip)
	*/
	MongoCursor skip(long count)
	{
		m_data.skip(count);
		return this;
	}

	@safe unittest {
		import vibe.core.log;
		import vibe.db.mongo.mongo;

		void test()
		@safe {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto coll = db["testcoll"];

			try { coll.drop(); } catch (Exception) {}

			for (int i = 0; i < 10000; i++)
				coll.insertOne(["i": i]);

			static struct Order { int i; }
			auto data = coll.find().sort(Order(1)).skip(2000).limit(2000).array;

			assert(data.length == 2000);
			assert(data[0]["i"].get!int == 2000);
			assert(data[$ - 1]["i"].get!int == 3999);
		}
	}

	/**
		Advances the cursor to the next document of the response.

		Note that calling this function is only allowed if empty returns false.
	*/
	void popFront() { m_data.popFront(); }

	/**
		Iterates over all remaining documents.

		Each element is a tuple of the document's index, as reported by the
		cursor, and the document itself.

		Note that iteration is one-way - elements that have already been visited
		will not be visited again if another iteration is done.

		Throws: An exception if there is a query or communication error.
	*/
	auto byPair()
	{
		import std.typecons : Tuple, tuple;
		static struct Rng {
			// Holds a reference-counted copy of the cursor instead of the bare
			// data object. Otherwise the destructor of the originating cursor
			// (e.g. a temporary in `coll.find().byPair`) could drop the count
			// to zero and kill the MongoDB cursor while this range is still
			// being iterated.
			private MongoCursor cursor;
			@property bool empty() { return cursor.empty; }
			@property Tuple!(long, DocType) front() { return tuple(cursor.m_data.index, cursor.front); }
			void popFront() { cursor.popFront(); }
		}
		// Copying `this` runs the postblit and thereby increments the count.
		return Rng(this);
	}
}

/// Actual iteration implementation details for MongoCursor. Abstracted using an
/// interface because we still have code for legacy (<3.6) MongoDB servers,
/// which may still be used with the old legacy overloads.
private interface IMongoCursorData(DocType) {
	bool empty() @safe; /// Range implementation
	long index() @safe; /// Range implementation
	DocType front() @safe; /// Range implementation
	void popFront() @safe; /// Range implementation
	/// Before iterating, specify a MongoDB sort order
	void sort(Bson order) @safe;
	/// Before iterating, specify maximum number of returned items
	void limit(long count) @safe;
	/// Before iterating, skip the specified number of items (when sorted)
	void skip(long count) @safe;
	/// Kills the MongoDB cursor, further iteration attempts will result in
	/// errors. Call this in the destructor.
	void killCursors() @safe;
	/// Define a reference count property on the class, which is returned by
	/// reference with this method.
	ref int refCount() @safe;
}


/**
	Deprecated query internals exposed through MongoCursor.
*/
private deprecated abstract class LegacyMongoCursorData(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		string m_collection;
		long m_cursor;
		long m_nskip;
		int m_nret;
		Bson m_sort = Bson(null);
		int m_offset;
		size_t m_currentDoc = 0;
		DocType[] m_documents;
		bool m_iterationStarted = false;
		long m_limit = 0;
	}

	/// Starts iteration if necessary and requests more documents once the
	/// current batch is consumed. Kills the cursor when the limit is reached.
	final bool empty()
	@safe {
		if (!m_iterationStarted) startIterating();
		if (m_limit > 0 && index >= m_limit) {
			killCursors();
			return true;
		}
		if( m_currentDoc < m_documents.length )
			return false;
		// a cursor id of zero means there is nothing left to request
		if( m_cursor == 0 )
			return true;

		auto conn = m_client.lockConnection();
		conn.getMore!DocType(m_collection, m_nret, m_cursor, &handleReply, &handleDocument);
		return m_currentDoc >= m_documents.length;
	}

	/// Offset of the current batch plus the position within the batch.
	final long index()
	@safe {
		return m_offset + m_currentDoc;
	}

	/// Returns the current document; must only be called if not empty.
	final DocType front()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_currentDoc];
	}

	/// Stores the sort order; only allowed before iteration has started.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_sort = order;
	}

	/// Lowers the limit (and the number of documents requested per batch,
	/// capped at 1024) if `count` is positive and smaller than the current
	/// value.
	final void limit(long count)
	@safe {
		// A limit() value of 0 (e.g. “.limit(0)”) is equivalent to setting no limit.
		if (count > 0) {
			if (m_nret == 0 || m_nret > count)
				m_nret = cast(int)min(count, 1024);

			if (m_limit == 0 || m_limit > count)
				m_limit = count;
		}
	}

	/// Raises the number of skipped documents to `count` if it is larger
	/// than the current value.
	final void skip(long count)
	@safe {
		// A skip() value of 0 (e.g. “.skip(0)”) is equivalent to setting no skip.
		m_nskip = max(m_nskip, count);
	}

	/// Advances to the next document; must only be called if not empty.
	final void popFront()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_currentDoc++;
	}

	/// Issues the initial request; implemented by the concrete cursor types.
	abstract void startIterating() @safe;

	/// Kills the server side cursor, if one is open, and resets the cursor id.
	final void killCursors()
	@safe {
		if (m_cursor == 0) return;
		auto conn = m_client.lockConnection();
		conn.killCursors(m_collection, () @trusted { return (&m_cursor)[0 .. 1]; } ());
		m_cursor = 0;
	}

	/// Reply callback: validates the reply flags and prepares the document
	/// buffer for `num_docs` documents.
	final void handleReply(long cursor, ReplyFlags flags, int first_doc, int num_docs)
	{
		enforce!MongoDriverException(!(flags & ReplyFlags.CursorNotFound), "Invalid cursor handle.");
		enforce!MongoDriverException(!(flags & ReplyFlags.QueryFailure), "Query failed. Does the database exist?");

		m_cursor = cursor;
		m_offset = first_doc;
		m_documents.length = num_docs;
		m_currentDoc = 0;
	}

	/// Document callback: stores the document at position `idx` of the batch.
	final void handleDocument(size_t idx, ref DocType doc)
	{
		m_documents[idx] = doc;
	}

	final ref int refCount() { return m_refCount; }
}

/**
	Find + getMore internals exposed through MongoCursor. Unifies the old
	LegacyMongoCursorData approach, so it can be used both for find queries and
	for custom commands.
*/
private class MongoFindCursor(DocType) : IMongoCursorData!DocType {
	private {
		int m_refCount = 1;
		MongoClient m_client;
		Bson m_findQuery;
		string m_database;
		// Qualified namespace exactly as reported in the last reply
		string m_ns;
		// Namespace with the database prefix removed, used for getMore
		string m_collection;
		long m_cursor;
		int m_batchSize;
		Duration m_maxTime;
		long m_totalReceived;
		size_t m_readDoc;
		size_t m_insertDoc;
		DocType[] m_documents;
		bool m_iterationStarted = false;
		long m_queryLimit;
	}

	/**
		Stores the command to run once iteration starts.

		Params:
			client = Client used to obtain connections
			command = Command document; its `$db` field, if present,
				determines the database name
			batchSize = Batch size passed to each getMore request
			getMoreMaxTime = Time limit passed to each getMore request
	*/
	this(MongoClient client, Bson command, int batchSize = 0, Duration getMoreMaxTime = Duration.max)
	{
		m_client = client;
		m_findQuery = command;
		m_batchSize = batchSize;
		m_maxTime = getMoreMaxTime;
		m_database = command["$db"].opt!string;
	}

	/// Starts iteration if necessary and requests the next batch once the
	/// current one is consumed. Kills the cursor when the limit is reached.
	bool empty()
	@safe {
		if (!m_iterationStarted) startIterating();
		if (m_queryLimit > 0 && index >= m_queryLimit) {
			killCursors();
			return true;
		}
		if( m_readDoc < m_documents.length )
			return false;
		// a cursor id of zero means there is nothing left to request
		if( m_cursor == 0 )
			return true;

		auto conn = m_client.lockConnection();
		conn.getMore!DocType(m_cursor, m_database, m_collection, m_batchSize,
			&handleReply, &handleDocument, m_maxTime);
		return m_readDoc >= m_documents.length;
	}

	/// Number of documents received before the current batch plus the read
	/// position within the current batch.
	final long index()
	@safe {
		assert(m_totalReceived >= m_documents.length);
		return m_totalReceived - m_documents.length + m_readDoc;
	}

	/// Returns the current document; must only be called if not empty.
	final DocType front()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		return m_documents[m_readDoc];
	}

	/// Sets the `sort` field of the command; only allowed before iteration.
	final void sort(Bson order)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		m_findQuery["sort"] = order;
	}

	/// Sets the `limit` field of the command; only allowed before iteration.
	final void limit(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		// NOTE(review): this overwrites any previously set limit, whereas the
		// documentation of MongoCursor.limit states that the lowest limit of
		// multiple calls is chosen - confirm which behavior is intended.
		m_findQuery["limit"] = Bson(count);
	}

	/// Sets the `skip` field of the command; only allowed before iteration.
	final void skip(long count)
	@safe {
		assert(!m_iterationStarted, "Cursor cannot be modified after beginning iteration");
		// NOTE(review): this overwrites any previously set value, whereas the
		// documentation of MongoCursor.skip states that the maximum of
		// multiple calls is chosen - confirm which behavior is intended.
		m_findQuery["skip"] = Bson(count);
	}

	/// Advances to the next document; must only be called if not empty.
	final void popFront()
	@safe {
		if (!m_iterationStarted) startIterating();
		assert(!empty(), "Cursor has no more data.");
		m_readDoc++;
	}

	/// Sends the stored command and remembers its limit for `empty`.
	private void startIterating()
	@safe {
		auto conn = m_client.lockConnection();
		m_totalReceived = 0;
		m_queryLimit = m_findQuery["limit"].opt!long(0);
		conn.startFind!DocType(m_findQuery, &handleReply, &handleDocument);
		m_iterationStarted = true;
	}

	/// Kills the server side cursor, if one is open, and resets the cursor id.
	final void killCursors()
	@safe {
		if (m_cursor == 0) return;
		auto conn = m_client.lockConnection();
		// Pass the qualified namespace rather than m_collection, which has
		// lost its database prefix in handleReply.
		// NOTE(review): presumably MongoConnection.killCursors needs the
		// database part to address the cursor - confirm against its
		// implementation.
		conn.killCursors(m_ns, () @trusted { return (&m_cursor)[0 .. 1]; } ());
		m_cursor = 0;
	}

	/// Reply callback: records cursor id and namespace and prepares the
	/// document buffer for `count` documents.
	final void handleReply(long id, string ns, size_t count)
	{
		m_cursor = id;
		// keep the unmodified namespace, skipOver below alters `ns`
		m_ns = ns;
		// The qualified collection name is reported here, but when requesting
		// data, we need to send the database name and the collection name
		// separately, so we have to remove the database prefix:
		ns.skipOver(m_database.chain("."));
		m_collection = ns;
		m_documents.length = count;
		m_readDoc = 0;
		m_insertDoc = 0;
	}

	/// Document callback: appends the document to the current batch.
	final void handleDocument(ref DocType doc)
	{
		m_documents[m_insertDoc++] = doc;
		m_totalReceived++;
	}

	final ref int refCount() { return m_refCount; }
}

/**
	Internal class implementing MongoCursorData for find queries
*/
private deprecated class MongoQueryCursor(Q, R, S) : LegacyMongoCursorData!R {
	private {
		QueryFlags m_flags;
		Q m_query;
		S m_returnFieldSelector;
	}

	/// Stores the query parameters; the query is sent when iteration starts.
	this(MongoClient client, string collection, QueryFlags flags, int nskip, int nret, Q query, S return_field_selector)
	{
		m_client = client;
		m_collection = collection;
		m_flags = flags;
		m_nskip = nskip;
		m_nret = nret;
		m_query = query;
		m_returnFieldSelector = return_field_selector;
	}

	/// Serializes query and field selector, wraps the query into a `$query`
	/// object where necessary, applies the sort order and sends the query.
	override void startIterating()
	@safe {
		auto conn = m_client.lockConnection();

		// buffers handed to serializeToBson for the selector and the query
		ubyte[256] selector_buf = void;
		ubyte[256] query_buf = void;

		Bson selector = () @trusted { return serializeToBson(m_returnFieldSelector, selector_buf); } ();

		Bson query;
		static if (is(Q == Bson)) {
			query = m_query;
		} else {
			query = () @trusted { return serializeToBson(m_query, query_buf); } ();
		}

		Bson full_query;

		// a query that already has a "query" or "$query" field is sent as is
		if (!query["query"].isNull() || !query["$query"].isNull()) {
			// TODO: emit deprecation warning
			full_query = query;
		} else {
			full_query = Bson.emptyObject;
			full_query["$query"] = query;
		}

		if (!m_sort.isNull()) full_query["orderby"] = m_sort;

		conn.query!R(m_collection, m_flags, cast(int)m_nskip, cast(int)m_nret, full_query, selector, &handleReply, &handleDocument);

		m_iterationStarted = true;
	}
}

/**
	Internal class implementing MongoCursorData for already initialized generic cursors
*/
private deprecated class MongoGenericCursor(DocType) : LegacyMongoCursorData!DocType {
	/// Adopts an existing cursor id together with the documents already
	/// received for it; the cursor counts as already iterating.
	this(MongoClient client, string collection, long cursor, DocType[] existing_documents)
	{
		m_client = client;
		m_collection = collection;
		m_cursor = cursor;
		m_iterationStarted = true;
		m_documents = existing_documents;
	}

	/// Must never be called, since iteration is marked as started on
	/// construction.
	override void startIterating()
	@safe {
		assert(false, "Calling startIterating on an opaque already initialized cursor");
	}
}