/**
	MongoCollection class

	Copyright: © 2012-2016 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.db.mongo.collection;

public import vibe.db.mongo.cursor;
public import vibe.db.mongo.connection;
public import vibe.db.mongo.flags;

import vibe.core.log;
import vibe.db.mongo.client;

import core.time;
import std.algorithm : countUntil, find;
import std.array;
import std.conv;
import std.exception;
import std.string;
import std.typecons : Tuple, tuple;


/**
	Represents a single collection inside a MongoDB.

	All methods take arbitrary types for Bson arguments. serializeToBson() is implicitly called on
	them before they are sent to the database. The following example shows some possible ways
	to specify objects.
*/
struct MongoCollection {
	private {
		MongoClient m_client;
		MongoDatabase m_db;
		string m_name;     // collection name without the database prefix
		string m_fullPath; // "dbname.collectionname"
	}

	/**
		Creates a collection handle from a client and a fully qualified name.

		Params:
			client = The client used to communicate with the server, must not be `null`
			fullPath = Collection path of the form `"dbname.collectionname"`; the
				part before the first dot selects the database, the rest is
				the collection name
	*/
	this(MongoClient client, string fullPath)
	@safe {
		assert(client !is null);
		m_client = client;

		// Only the first dot separates database and collection name.
		auto dotidx = fullPath.indexOf('.');
		assert(dotidx > 0, "The collection name passed to MongoCollection must be of the form \"dbname.collectionname\".");

		m_fullPath = fullPath;
		m_db = m_client.getDatabase(fullPath[0 .. dotidx]);
		m_name = fullPath[dotidx+1 .. $];
	}

	/**
		Creates a collection handle from a database and a collection name.

		Params:
			db = The database containing the collection; its client must not be `null`
			name = Name of the collection (excluding the database name)
	*/
	this(ref MongoDatabase db, string name)
	@safe {
		assert(db.client !is null);
		m_client = db.client;
		m_fullPath = db.name ~ "." ~ name;
		m_db = db;
		m_name = name;
	}

	/**
		Returns: Root database to which this collection belongs.
	*/
	@property MongoDatabase database() @safe { return m_db; }

	/**
		Returns: Name of this collection (excluding the database name).
	*/
	@property string name() const @safe { return m_name; }

	/**
		Performs an update operation on documents matching 'selector', updating them with 'update'.

		Params:
			selector = Query expression selecting the documents to update
			update = Update expression or replacement document
			flags = Flags passed on to the connection's update operation

		Throws: Exception if a DB communication error occurred.
		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Updating)
	*/
	void update(T, U)(T selector, U update, UpdateFlags flags = UpdateFlags.None)
	{
		assert(m_client !is null, "Updating uninitialized MongoCollection.");
		auto conn = m_client.lockConnection();
		// Uninitialized buffers handed to serializeToBson; presumably used as
		// scratch storage for small documents - confirm against serializeToBson.
		ubyte[256] selector_buf = void, update_buf = void;
		conn.update(m_fullPath, flags, serializeToBson(selector, selector_buf), serializeToBson(update, update_buf));
	}

	/**
		Inserts new documents into the collection.

		Note that if the `_id` field of the document(s) is not set, typically
		using `BsonObjectID.generate()`, the server will generate IDs
		automatically. If you need to know the IDs of the inserted documents,
		you need to generate them locally.

		Params:
			document_or_documents = A single document, or a value that
				serializes to a BSON array, in which case each element is
				inserted as a separate document
			flags = Flags passed on to the connection's insert operation

		Throws: Exception if a DB communication error occurred.
		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Inserting)
	*/
	void insert(T)(T document_or_documents, InsertFlags flags = InsertFlags.None)
	{
		assert(m_client !is null, "Inserting into uninitialized MongoCollection.");
		auto conn = m_client.lockConnection();
		Bson[] docs;
		Bson bdocs = () @trusted { return serializeToBson(document_or_documents); } ();
		// A BSON array is treated as a list of documents, anything else as
		// a single document wrapped in a one-element slice.
		if( bdocs.type == Bson.Type.Array ) docs = cast(Bson[])bdocs;
		else docs = () @trusted { return (&bdocs)[0 .. 1]; } ();
		conn.insert(m_fullPath, flags, docs);
	}

	/**
		Queries the collection for existing documents.

		If no arguments are passed to find(), all documents of the collection will be returned.

		Params:
			query = Query expression selecting the documents
			returnFieldSelector = Selector for the fields to return
			flags = Flags passed on to the cursor
			num_skip = Passed on to the cursor as the number of documents to skip
			num_docs_per_chunk = Passed on to the cursor as the chunk size

		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Querying)
	*/
	MongoCursor!(T, R, U) find(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None, int num_skip = 0, int num_docs_per_chunk = 0)
	{
		assert(m_client !is null, "Querying uninitialized MongoCollection.");
		return MongoCursor!(T, R, U)(m_client, m_fullPath, flags, num_skip, num_docs_per_chunk, query, returnFieldSelector);
	}

	/// ditto
	MongoCursor!(T, R, typeof(null)) find(R = Bson, T)(T query) { return find!R(query, null); }

	/// ditto
	MongoCursor!(Bson, R, typeof(null)) find(R = Bson)() { return find!R(Bson.emptyObject, null); }

	/** Queries the collection for existing documents.

		Returns:
			By default, a Bson value of the matching document is returned, or $(D Bson(null))
			when no document matched. For types R that are not Bson, the returned value is either
			of type $(D R), or of type $(D Nullable!R), if $(D R) is not a reference/pointer type.

		Throws: Exception if a DB communication error or a query error occurred.
		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Querying)
	*/
	auto findOne(R = Bson, T, U)(T query, U returnFieldSelector, QueryFlags flags = QueryFlags.None)
	{
		import std.traits;
		import std.typecons;

		// Request a single document per chunk and return the first result, if any.
		auto c = find!R(query, returnFieldSelector, flags, 0, 1);
		static if (is(R == Bson)) {
			foreach (doc; c) return doc;
			return Bson(null);
		} else static if (is(R == class) || isPointer!R || isDynamicArray!R || isAssociativeArray!R) {
			// Types that can represent "no result" as null themselves.
			foreach (doc; c) return doc;
			return null;
		} else {
			// Value types are wrapped in Nullable to signal "no result".
			foreach (doc; c) {
				Nullable!R ret;
				ret = doc;
				return ret;
			}
			return Nullable!R.init;
		}
	}
	/// ditto
	auto findOne(R = Bson, T)(T query) { return findOne!R(query, Bson(null)); }

	/**
		Removes documents from the collection.

		Params:
			selector = Query expression selecting the documents to remove; the
				overload without arguments passes an empty object
			flags = Flags passed on to the connection's delete operation

		Throws: Exception if a DB communication error occurred.
		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Removing)
	*/
	void remove(T)(T selector, DeleteFlags flags = DeleteFlags.None)
	{
		assert(m_client !is null, "Removing from uninitialized MongoCollection.");
		auto conn = m_client.lockConnection();
		ubyte[256] selector_buf = void;
		conn.delete_(m_fullPath, flags, serializeToBson(selector, selector_buf));
	}

	/// ditto
	void remove()() { remove(Bson.emptyObject); }

	/**
		Combines a modify and find operation to a single atomic operation.

		Params:
			query = MongoDB query expression to identify the matched document
			update = Update expression for the matched document
			returnFieldSelector = Optional map of fields to return in the response

		Returns:
			The `"value"` field of the server's reply.

		Throws:
			An `Exception` will be thrown if an error occurs in the
			communication with the database server.

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/command/findAndModify)
	*/
	Bson findAndModify(T, U, V)(T query, U update, V returnFieldSelector)
	{
		static struct CMD {
			string findAndModify;
			T query;
			U update;
			V fields;
		}
		CMD cmd;
		cmd.findAndModify = m_name;
		cmd.query = query;
		cmd.update = update;
		cmd.fields = returnFieldSelector;
		auto ret = database.runCommand(cmd);
		// Same failure handling as findAndModifyExt.
		enforce(ret["ok"].get!double != 0, "findAndModify failed.");
		return ret["value"];
	}

	/// ditto
	Bson findAndModify(T, U)(T query, U update)
	{
		return findAndModify(query, update, null);
	}

	/**
		Combines a modify and find operation to a single atomic operation with generic options support.

		Params:
			query = MongoDB query expression to identify the matched document
			update = Update expression for the matched document
			options = Generic BSON object that contains additional options
				fields, such as `"new": true`

		Returns:
			The `"value"` field of the server's reply.

		Throws:
			An `Exception` will be thrown if an error occurs in the
			communication with the database server.

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/command/findAndModify)
	*/
	Bson findAndModifyExt(T, U, V)(T query, U update, V options)
	{
		auto bopt = serializeToBson(options);
		assert(bopt.type == Bson.Type.object,
			"The options parameter to findAndModifyExt must be a BSON object.");

		Bson cmd = Bson.emptyObject;
		cmd["findAndModify"] = m_name;
		cmd["query"] = serializeToBson(query);
		cmd["update"] = serializeToBson(update);
		// Copy every option field into the command document.
		bopt.opApply(delegate int(string key, Bson value) @safe {
			cmd[key] = value;
			return 0;
		});
		auto ret = database.runCommand(cmd);
		enforce(ret["ok"].get!double != 0, "findAndModifyExt failed.");
		return ret["value"];
	}

	///
	unittest {
		import vibe.db.mongo.mongo;

		void test()
		{
			auto coll = connectMongoDB("127.0.0.1").getCollection("test");
			coll.findAndModifyExt(["name": "foo"], ["$set": ["value": "bar"]], ["new": true]);
		}
	}

	/**
		Counts the results of the specified query expression.

		Params:
			query = Query expression selecting the documents to count

		Returns:
			The `"n"` field of the server's reply, converted to `ulong`.

		Throws: Exception if a DB communication error occurred.
		See_Also: $(LINK http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-{{count%28%29}})
	*/
	ulong count(T)(T query)
	{
		static struct Empty {}
		static struct CMD {
			string count;
			T query;
			Empty fields;
		}

		CMD cmd;
		cmd.count = m_name;
		cmd.query = query;
		auto reply = database.runCommand(cmd);
		enforce(reply["ok"].opt!double == 1 || reply["ok"].opt!int == 1, "Count command failed.");
		// The numeric type of "n" differs between server versions.
		switch (reply["n"].type) with (Bson.Type) {
			default: assert(false, "Unsupported data type in BSON reply for COUNT");
			case double_: return cast(ulong)reply["n"].get!double; // v2.x
			case int_: return reply["n"].get!int; // v3.x
			case long_: return reply["n"].get!long; // just in case
		}
	}

	/**
		Calculates aggregate values for the data in a collection.

		Params:
			pipeline = A sequence of data aggregation processes. These can
				either be given as separate parameters, or as a single array
				parameter.

		Returns: An array of documents returned by the pipeline

		Throws: Exception if a DB communication error occurred

		See_Also: $(LINK http://docs.mongodb.org/manual/reference/method/db.collection.aggregate)
	*/
	Bson aggregate(ARGS...)(ARGS pipeline)
	{
		import std.traits;

		// A single array argument is used as the pipeline directly; separate
		// arguments are bundled into a struct that is serialized as an array.
		static if (ARGS.length == 1 && isArray!(ARGS[0]))
			alias Pipeline = ARGS[0];
		else static struct Pipeline { ARGS args; }

		static struct CMD {
			string aggregate;
			@asArray Pipeline pipeline;
		}

		CMD cmd;
		cmd.aggregate = m_name;
		static if (ARGS.length == 1 && isArray!(ARGS[0]))
			cmd.pipeline = pipeline[0];
		else cmd.pipeline.args = pipeline;
		auto ret = database.runCommand(cmd);
		enforce(ret["ok"].get!double == 1, "Aggregate command failed.");
		return ret["result"];
	}

	/// Example taken from the MongoDB documentation
	unittest {
		import vibe.db.mongo.mongo;

		void test() {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto results = db["coll"].aggregate(
				["$match": ["status": "A"]],
				["$group": ["_id": Bson("$cust_id"),
					"total": Bson(["$sum": Bson("$amount")])]],
				["$sort": ["total": -1]]);
		}
	}

	/// The same example, but using an array of arguments
	unittest {
		import vibe.db.mongo.mongo;

		void test() {
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");

			Bson[] args;
			args ~= serializeToBson(["$match": ["status": "A"]]);
			args ~= serializeToBson(["$group": ["_id": Bson("$cust_id"),
					"total": Bson(["$sum": Bson("$amount")])]]);
			args ~= serializeToBson(["$sort": ["total": -1]]);

			auto results = db["coll"].aggregate(args);
		}
	}

	/**
		Returns an input range of all unique values for a certain field for
		records matching the given query.

		Params:
			key = Name of the field for which to collect unique values
			query = The query used to select records

		Returns:
			An input range with items of type `R` (`Bson` by default) is
			returned.

		Throws: Exception if the server reports a failure.
	*/
	auto distinct(R = Bson, Q)(string key, Q query)
	{
		import std.algorithm : map;

		static struct CMD {
			string distinct;
			string key;
			Q query;
		}
		CMD cmd;
		cmd.distinct = m_name;
		cmd.key = key;
		cmd.query = query;
		auto res = m_db.runCommand(cmd);

		enforce(res["ok"].get!double != 0, "Distinct query failed: "~res["errmsg"].opt!string);

		// Values are only deserialized when a non-Bson element type is requested.
		static if (is(R == Bson)) return res["values"].byValue;
		else return res["values"].byValue.map!(b => deserializeBson!R(b));
	}

	///
	unittest {
		import std.algorithm : equal;
		import vibe.db.mongo.mongo;

		void test()
		{
			auto db = connectMongoDB("127.0.0.1").getDatabase("test");
			auto coll = db["collection"];

			coll.drop();
			coll.insert(["a": "first", "b": "foo"]);
			coll.insert(["a": "first", "b": "bar"]);
			coll.insert(["a": "first", "b": "bar"]);
			coll.insert(["a": "second", "b": "baz"]);
			coll.insert(["a": "second", "b": "bam"]);

			auto result = coll.distinct!string("b", ["a": "first"]);

			assert(result.equal(["foo", "bar"]));
		}
	}

	/**
		Creates or updates an index.

		The index name is built by joining `field_order` pairs of all
		entries with underscores (e.g. `"a_1_b_-1"`).

		Note that the overload taking an associative array of field orders
		will be removed. Since the order of fields matters, it is
		only suitable for single-field indices.

		Params:
			field_orders = Pairs of field name and order value, in index order
			flags = Index options; each set flag adds the corresponding
				field to the index document
			expire_time = Written as `expireAfterSeconds` (in whole seconds)
				if `IndexFlags.expireAfterSeconds` is set
	*/
	void ensureIndex(scope const(Tuple!(string, int))[] field_orders, IndexFlags flags = IndexFlags.none, Duration expire_time = 0.seconds)
	@safe {
		// TODO: support 2d indexes

		auto key = Bson.emptyObject;
		auto indexname = appender!string();
		bool first = true;
		foreach (fo; field_orders) {
			if (!first) indexname.put('_');
			else first = false;
			indexname.put(fo[0]);
			indexname.put('_');
			indexname.put(to!string(fo[1]));
			key[fo[0]] = Bson(fo[1]);
		}

		Bson[string] doc;
		doc["v"] = 1;
		doc["key"] = key;
		doc["ns"] = m_fullPath;
		doc["name"] = indexname.data;
		if (flags & IndexFlags.unique) doc["unique"] = true;
		if (flags & IndexFlags.dropDuplicates) doc["dropDups"] = true;
		if (flags & IndexFlags.background) doc["background"] = true;
		if (flags & IndexFlags.sparse) doc["sparse"] = true;
		if (flags & IndexFlags.expireAfterSeconds) doc["expireAfterSeconds"] = expire_time.total!"seconds";
		// The index is created by inserting its description into "system.indexes".
		database["system.indexes"].insert(doc);
	}
	/// ditto
	deprecated("Use the overload taking an array of field_orders instead.")
	void ensureIndex(int[string] field_orders, IndexFlags flags = IndexFlags.none, ulong expireAfterSeconds = 0)
	@safe {
		Tuple!(string, int)[] orders;
		foreach (k, v; field_orders)
			orders ~= tuple(k, v);
		ensureIndex(orders, flags, expireAfterSeconds.seconds);
	}

	/**
		Drops an index of this collection.

		Runs a command with the fields `dropIndexes` (set to the collection
		name) and `index` (set to `name`).

		Params:
			name = Name of the index to drop

		Throws: Exception if the `"ok"` field of the reply is not 1.
	*/
	void dropIndex(string name)
	@safe {
		static struct CMD {
			string dropIndexes;
			string index;
		}

		CMD cmd;
		cmd.dropIndexes = m_name;
		cmd.index = name;
		auto reply = database.runCommand(cmd);
		enforce(reply["ok"].get!double == 1, "dropIndex command failed.");
	}

	/**
		Drops this collection.

		Runs a command with the field `drop` set to the collection name.

		Throws: Exception if the `"ok"` field of the reply is not 1.
	*/
	void drop()
	@safe {
		static struct CMD {
			string drop;
		}

		CMD cmd;
		cmd.drop = m_name;
		auto reply = database.runCommand(cmd);
		enforce(reply["ok"].get!double == 1, "drop command failed.");
	}
}

///
unittest {
	import vibe.data.bson;
	import vibe.data.json;
	import vibe.db.mongo.mongo;

	void test()
	{
		MongoClient client = connectMongoDB("127.0.0.1");
		MongoCollection users = client.getCollection("myapp.users");

		// canonical version using a Bson object
		users.insert(Bson(["name": Bson("admin"), "password": Bson("secret")]));

		// short version using a string[string] AA that is automatically
		// serialized to Bson
		users.insert(["name": "admin", "password": "secret"]);

		// BSON specific types are also serialized automatically
		auto uid = BsonObjectID.fromString("507f1f77bcf86cd799439011");
		Bson usr = users.findOne(["_id": uid]);

		// JSON is another possibility
		Json jusr = parseJsonString(`{"name": "admin", "password": "secret"}`);
		users.insert(jusr);
	}
}

/// Using the type system to define a document "schema"
unittest {
	import vibe.db.mongo.mongo;
	import vibe.data.serialization : name;
	import std.typecons : Nullable;

	// Nested object within a "User" document
	struct Address {
		string name;
		string street;
		int zipCode;
	}

	// The document structure of the "myapp.users" collection
	struct User {
		@name("_id") BsonObjectID id; // represented as "_id" in the database
		string loginName;
		string password;
		Address address;
	}

	void test()
	{
		MongoClient client = connectMongoDB("127.0.0.1");
		MongoCollection users = client.getCollection("myapp.users");

		// D values are automatically serialized to the internal BSON format
		// upon insertion - see also vibe.data.serialization
		User usr;
		usr.id = BsonObjectID.generate();
		usr.loginName = "admin";
		usr.password = "secret";
		users.insert(usr);

		// find supports direct de-serialization of the returned documents
		foreach (usr2; users.find!User()) {
			logInfo("User: %s", usr2.loginName);
		}

		// the same goes for findOne
		Nullable!User qusr = users.findOne!User(["_id": usr.id]);
		// access the payload explicitly via .get instead of relying on
		// Nullable's implicit conversion
		if (!qusr.isNull)
			logInfo("User: %s", qusr.get.loginName);
	}
}