1 /** 2 Redis database client implementation. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon 7 */ 8 module vibe.db.redis.redis; 9 10 public import vibe.core.net; 11 12 import vibe.core.connectionpool; 13 import vibe.core.core; 14 import vibe.core.log; 15 import vibe.inet.url; 16 import vibe.container.internal.utilallocator; 17 import vibe.internal.freelistref; 18 import vibe.stream.operations; 19 import std.conv; 20 import std.exception; 21 import std.format; 22 import std.range : isInputRange, isOutputRange; 23 import std.string; 24 import std.traits; 25 import std.typecons : Nullable; 26 import std.utf; 27 28 @safe: 29 30 31 /** 32 Returns a RedisClient that can be used to communicate to the specified database server. 33 */ 34 RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort) 35 { 36 return new RedisClient(host, port); 37 } 38 39 /** 40 Returns a Redis database connection instance corresponding to the given URL. 41 42 The URL must be of the format "redis://server[:port]/dbnum". 43 44 Authentication: 45 Authenticated connections are supported by using a URL connection string 46 such as "redis://password@host". 47 48 Examples: 49 --- 50 // connecting with default settings: 51 auto redisDB = connectRedisDB("redis://127.0.0.1"); 52 --- 53 54 --- 55 // connecting using the URL form with custom settings 56 auto redisDB = connectRedisDB("redis://password:myremotehost/3?maxmemory=10000000"); 57 --- 58 59 Params: 60 url = Redis URI scheme for a Redis database instance 61 62 Returns: 63 A new RedisDatabase instance that can be used to access the database. 64 65 See_also: $(LINK2 https://www.iana.org/assignments/uri-schemes/prov/redis, Redis URI scheme) 66 */ 67 RedisDatabase connectRedisDB(URL url) 68 { 69 auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort); 70 71 if (!url.queryString.empty) 72 { 73 import vibe.inet.webform : FormFields, parseURLEncodedForm; 74 auto query = FormFields.init; 75 parseURLEncodedForm(url.queryString, query); 76 foreach (param, val; query.byKeyValue) 77 { 78 switch (param) 79 { 80 /** 81 The password to use for the Redis AUTH command comes from either the 82 password portion of the "userinfo" URI field or the value from the 83 key-value pair from the "query" URI field with the key "password". 84 If both the password portion of the "userinfo" URI field and a 85 "query" URI field key-value pair with the key "password" are present, 86 the semantics for what password to use for authentication are not 87 well-defined. Such situations therefore ought to be avoided. 88 */ 89 case "password": 90 if (!url.password.empty) 91 cli.auth(val); 92 break; 93 default: 94 throw new Exception(`Redis config parameter "` ~ param ~ `" isn't supported`); 95 } 96 } 97 } 98 99 /* 100 Redis' current optional authentication mechanism does not employ a 101 username, but this might change in the future 102 */ 103 if (!url.password.empty) 104 cli.auth(url.password); 105 106 long databaseIndex; 107 if (url.localURI.length >= 2) 108 databaseIndex = url.pathString[1 .. $].to!long; 109 110 return cli.getDatabase(databaseIndex); 111 } 112 113 /** 114 A redis client with connection pooling. 115 */ 116 final class RedisClient { 117 private { 118 ConnectionPool!RedisConnection m_connections; 119 string m_authPassword; 120 string m_version; 121 long m_selectedDB; 122 } 123 124 enum defaultPort = 6379; 125 126 this(string host = "127.0.0.1", ushort port = defaultPort) 127 { 128 m_connections = new ConnectionPool!RedisConnection({ 129 return new RedisConnection(host, port); 130 }); 131 } 132 133 /** Release all connections that are not in use. Call before exiting the 134 * program to avoid relying on the GC to clean up the sockets. 135 */ 136 void releaseUnusedConnections() @safe 137 { 138 // the try/catch in this is for the old vibe.d:core library, not 139 // necessary in vibe-core, but we have to work with both. 140 m_connections.removeUnused((conn) @safe nothrow { 141 try { 142 conn.m_conn.close(); 143 } catch(Exception e) { 144 logDebug("Failed to close unused Redis connection: %s", e.msg); 145 } 146 }); 147 } 148 149 /// Returns Redis version 150 @property string redisVersion() 151 { 152 if(m_version == "") 153 { 154 import std.string; 155 auto info = info(); 156 auto lines = info.splitLines(); 157 if (lines.length > 1) { 158 foreach (string line; lines) { 159 auto lineParams = line.split(":"); 160 if (lineParams.length > 1 && lineParams[0] == "redis_version") { 161 m_version = lineParams[1]; 162 break; 163 } 164 } 165 } 166 } 167 168 return m_version; 169 } 170 171 /** Returns a handle to the given database. 172 */ 173 RedisDatabase getDatabase(long index) { return RedisDatabase(this, index); } 174 175 /** Creates a RedisSubscriber instance for launching a pubsub listener 176 */ 177 RedisSubscriber createSubscriber() { 178 return RedisSubscriber(this); 179 } 180 181 /* 182 Connection 183 */ 184 185 /// Authenticate to the server 186 void auth(string password) { m_authPassword = password; } 187 /// Echo the given string 188 T echo(T, U)(U data) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("ECHO", data); } 189 /// Ping the server 190 void ping() { request("PING"); } 191 /// Close the connection 192 void quit() { request("QUIT"); } 193 194 /* 195 Server 196 */ 197 198 //TODO: BGREWRITEAOF 199 //TODO: BGSAVE 200 201 /// Get the value of a configuration parameter 202 T getConfig(T)(string parameter) if(isValidRedisValueReturn!T) { return request!T("CONFIG", "GET", parameter); } 203 /// Set a configuration parameter to the given value 204 void setConfig(T)(string parameter, T value) if(isValidRedisValueType!T) { request("CONFIG", "SET", parameter, value); } 205 /// Reset the stats returned by INFO 206 void configResetStat() { request("CONFIG", "RESETSTAT"); } 207 208 //TOOD: Debug Object 209 //TODO: Debug Segfault 210 211 /** Deletes all keys from all databases. 212 213 See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL) 214 */ 215 void deleteAll() { request("FLUSHALL"); } 216 217 /// Get information and statistics about the server 218 string info() { return request!string("INFO"); } 219 /// Get the UNIX time stamp of the last successful save to disk 220 long lastSave() { return request!long("LASTSAVE"); } 221 //TODO monitor 222 /// Synchronously save the dataset to disk 223 void save() { request("SAVE"); } 224 /// Synchronously save the dataset to disk and then shut down the server 225 void shutdown() { request("SHUTDOWN"); } 226 /// Make the server a slave of another instance, or promote it as master 227 void slaveOf(string host, ushort port) { request("SLAVEOF", host, port); } 228 229 //TODO slowlog 230 //TODO sync 231 232 private T request(T = void, ARGS...)(string command, scope ARGS args) 233 { 234 return requestDB!(T, ARGS)(m_selectedDB, command, args); 235 } 236 237 private T requestDB(T, ARGS...)(long db, string command, scope ARGS args) 238 { 239 auto conn = m_connections.lockConnection(); 240 conn.setAuth(m_authPassword); 241 conn.setDB(db); 242 version (RedisDebug) { 243 import std.conv; 244 string debugargs = command; 245 foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string; 246 } 247 248 static if (is(T == void)) { 249 version (RedisDebug) logDebug("Redis request: %s => void", debugargs); 250 _request!void(conn, command, args); 251 } else static if (!isInstanceOf!(RedisReply, T)) { 252 auto ret = _request!T(conn, command, args); 253 version (RedisDebug) logDebug("Redis request: %s => %s", debugargs, ret.to!string); 254 return ret; 255 } else { 256 auto ret = _request!T(conn, command, args); 257 version (RedisDebug) logDebug("Redis request: %s => RedisReply", debugargs); 258 return ret; 259 } 260 } 261 } 262 263 264 /** 265 Accesses the contents of a Redis database 266 */ 267 struct RedisDatabase { 268 private { 269 RedisClient m_client; 270 long m_index; 271 } 272 273 private this(RedisClient client, long index) 274 { 275 m_client = client; 276 m_index = index; 277 } 278 279 /** The Redis client with which the database is accessed. 280 */ 281 @property inout(RedisClient) client() inout { return m_client; } 282 283 /** Index of the database. 284 */ 285 @property long index() const { return m_index; } 286 287 /** Deletes all keys of the database. 288 289 See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB) 290 */ 291 void deleteAll() { request!void("FLUSHDB"); } 292 /// Delete a key 293 long del(scope string[] keys...) { return request!long("DEL", keys); } 294 /// Determine if a key exists 295 bool exists(string key) { return request!bool("EXISTS", key); } 296 /// Set a key's time to live in seconds 297 bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); } 298 /// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds. 299 bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); } 300 /// Set the expiration for a key as a UNIX timestamp 301 bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); } 302 /// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC]) 303 RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); } 304 /// Move a key to another database 305 bool move(string key, long db) { return request!bool("MOVE", key, db); } 306 /// Remove the expiration from a key 307 bool persist(string key) { return request!bool("PERSIST", key); } 308 //TODO: object 309 /// Return a random key from the keyspace 310 string randomKey() { return request!string("RANDOMKEY"); } 311 /// Rename a key 312 void rename(string key, string newkey) { request("RENAME", key, newkey); } 313 /// Rename a key, only if the new key does not exist 314 bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); } 315 //TODO sort 316 /// Get the time to live for a key 317 long ttl(string key) { return request!long("TTL", key); } 318 /// Get the time to live for a key in milliseconds 319 long pttl(string key) { return request!long("PTTL", key); } 320 /// Determine the type stored at key (string, list, set, zset and hash.) 321 string type(string key) { return request!string("TYPE", key); } 322 323 /* 324 String Commands 325 */ 326 327 /// Append a value to a key 328 long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); } 329 /// Decrement the integer value of a key by one 330 long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); } 331 /// Get the value of a key 332 T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); } 333 /// Returns the bit value at offset in the string value stored at key 334 bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); } 335 /// Get a substring of the string stored at a key 336 T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); } 337 /// Set the string value of a key and return its old value 338 T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); } 339 /// Increment the integer value of a key 340 long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); } 341 /// Increment the real number value of a key 342 long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); } 343 /// Get the values of all the given keys 344 RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); } 345 346 /// Set multiple keys to multiple values 347 void mset(ARGS...)(ARGS args) 348 { 349 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 350 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 351 request("MSET", args); 352 } 353 354 /// Set multiple keys to multiple values, only if none of the keys exist 355 bool msetNX(ARGS...)(ARGS args) { 356 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 357 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 358 return request!bool("MSETEX", args); 359 } 360 361 /// Set the string value of a key 362 void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); } 363 /// Set the value of a key, only if the key does not exist 364 bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); } 365 /// Set the value of a key, only if the key already exists 366 bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); } 367 /// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 368 bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); } 369 /// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 370 bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); } 371 /// Sets or clears the bit at offset in the string value stored at key 372 bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); } 373 /// Set the value and expiration of a key 374 void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); } 375 /// Overwrite part of a string at key starting at the specified offset 376 long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); } 377 /// Get the length of the value stored in a key 378 long strlen(string key) { return request!long("STRLEN", key); } 379 380 /* 381 Hashes 382 */ 383 /// Delete one or more hash fields 384 long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); } 385 /// Determine if a hash field exists 386 bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); } 387 /// Set multiple hash fields to multiple values 388 void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); } 389 /// Set the value of a hash field, only if the field does not exist 390 bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); } 391 /// Get the value of a hash field. 392 T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); } 393 /// Get all the fields and values in a hash 394 RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); } 395 /// Increment the integer value of a hash field 396 long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); } 397 /// Increment the real number value of a hash field 398 long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); } 399 /// Get all the fields in a hash 400 RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); } 401 /// Get the number of fields in a hash 402 long hlen(string key) { return request!long("HLEN", key); } 403 /// Get the values of all the given hash fields 404 RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); } 405 /// Set multiple hash fields to multiple values 406 void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); } 407 408 /// Get all the values in a hash 409 RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); } 410 411 /* 412 Lists 413 */ 414 /// Get an element from a list by its index 415 T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); } 416 /// Insert value in the list stored at key before the reference value pivot. 417 long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); } 418 /// Insert value in the list stored at key after the reference value pivot. 419 long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); } 420 /// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. 421 long llen(string key) { return request!long("LLEN", key); } 422 /// Insert all the specified values at the head of the list stored at key. 423 long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); } 424 /// Inserts value at the head of the list stored at key, only if key already exists and holds a list. 425 long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); } 426 /// Insert all the specified values at the tail of the list stored at key. 427 long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); } 428 /// Inserts value at the tail of the list stored at key, only if key already exists and holds a list. 429 long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); } 430 /// Returns the specified elements of the list stored at key. 431 RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE", key, start, stop); } 432 /// Removes the first count occurrences of elements equal to value from the list stored at key. 433 long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); } 434 /// Sets the list element at index to value. 435 void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); } 436 /// Trim an existing list so that it will contain only the specified range of elements specified. 437 /// Equivalent to $(D range = range[start .. stop+1]) 438 void ltrim(string key, long start, long stop) { request("LTRIM", key, start, stop); } 439 /// Removes and returns the last element of the list stored at key. 440 T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); } 441 /// Removes and returns the first element of the list stored at key. 442 T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); } 443 /// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks 444 /// the connection when there are no elements to pop from any of the given lists. 445 Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T) 446 { 447 auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds); 448 Nullable!(Tuple!(string, T)) ret; 449 if (reply.empty || reply.frontIsNull) return ret; 450 string rkey = reply.front.convertToType!string(); 451 reply.popFront(); 452 ret = tuple(rkey, reply.front.convertToType!T()); 453 return ret; 454 } 455 /// Atomically returns and removes the last element (tail) of the list stored at source, 456 /// and pushes the element at the first element (head) of the list stored at destination. 457 T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); } 458 459 /* 460 Sets 461 */ 462 /// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored. 463 /// If key does not exist, a new set is created before adding the specified members. 464 long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); } 465 /// Returns the set cardinality (number of elements) of the set stored at key. 466 long scard(string key) { return request!long("SCARD", key); } 467 /// Returns the members of the set resulting from the difference between the first set and all the successive sets. 468 RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); } 469 /// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination. 470 /// If destination already exists, it is overwritten. 471 long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); } 472 /// Returns the members of the set resulting from the intersection of all the given sets. 473 RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); } 474 /// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination. 475 /// If destination already exists, it is overwritten. 476 long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); } 477 /// Returns if member is a member of the set stored at key. 478 bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); } 479 /// Returns all the members of the set value stored at key. 480 RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); } 481 /// Move member from the set at source to the set at destination. This operation is atomic. 482 /// In every given moment the element will appear to be a member of source or destination for other clients. 483 bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); } 484 /// Removes and returns a random element from the set value stored at key. 485 T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); } 486 /// Returns a random element from the set stored at key. 487 T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); } 488 ///returns count random elements from the set stored at key 489 RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); } 490 491 492 /// Remove the specified members from the set stored at key. 493 long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); } 494 /// Returns the members of the set resulting from the union of all the given sets. 495 RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); } 496 /// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination. 497 long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); } 498 499 /* 500 Sorted Sets 501 */ 502 /// Add one or more members to a sorted set, or update its score if it already exists 503 long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); } 504 /// Returns the sorted set cardinality (number of elements) of the sorted set stored at key. 505 long zcard(string key) { return request!long("ZCARD", key); } 506 /// Returns the number of elements in the sorted set at key with a score between min and max 507 long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); } 508 /// Increments the score of member in the sorted set stored at key by increment. 509 double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); } 510 //TODO: zinterstore 511 /// Returns the specified range of elements in the sorted set stored at key. 512 RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false) 513 if(isValidRedisValueType!T) 514 { 515 if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES"); 516 else return request!(RedisReply!T)("ZRANGE", key, start, end); 517 } 518 519 long zlexCount(string key, string min = "-", string max = "+") { return request!long("ZLEXCOUNT", key, min, max); } 520 521 /// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, 522 /// this command returns all the elements in the sorted set at key with a value between min and max. 523 RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1) 524 if(isValidRedisValueType!T) 525 { 526 if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count); 527 else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max); 528 } 529 530 /// Returns all the elements in the sorted set at key with a score between start and end inclusively 531 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false) 532 if(isValidRedisValueType!T) 533 { 534 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES"); 535 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end)); 536 } 537 538 /// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively, 539 /// and returns a range subselection similar to $(D results[offset .. offset+count]) 540 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false) 541 if(isValidRedisValueType!T) 542 { 543 assert(offset >= 0); 544 assert(count >= 0); 545 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count); 546 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count); 547 } 548 549 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high. 550 long zrank(T)(string key, T member) 551 if (isValidRedisValueType!T) 552 { 553 auto str = request!string("ZRANK", key, member); 554 return str != "" ? parse!long(str) : -1; 555 } 556 557 /// Removes the specified members from the sorted set stored at key. 558 long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); } 559 /// Removes all elements in the sorted set stored at key with rank between start and stop. 560 long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); } 561 /// Removes all elements in the sorted set stored at key with a score between min and max (inclusive). 562 long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));} 563 /// Returns the specified range of elements in the sorted set stored at key. 564 RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false) 565 if(isValidRedisValueType!T) 566 { 567 if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES"); 568 else return request!(RedisReply!T)("ZREVRANGE", key, start, end); 569 } 570 571 /// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min). 572 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false) 573 if(isValidRedisValueType!T) 574 { 575 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES"); 576 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max)); 577 } 578 579 /// Computes an internal list of elements in the sorted set at key with a score between max and min, and 580 /// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count]) 581 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false) 582 if(isValidRedisValueType!T) 583 { 584 assert(offset >= 0); 585 assert(count >= 0); 586 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count); 587 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count); 588 } 589 590 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low. 591 long zrevRank(T)(string key, T member) 592 if (isValidRedisValueType!T) 593 { 594 auto str = request!string("ZREVRANK", key, member); 595 return str != "" ? parse!long(str) : -1; 596 } 597 598 /// Returns the score of member in the sorted set at key. 599 RedisReply!T zscore(T = string, U)(string key, U member) 600 if(isValidRedisValueType!T && isValidRedisValueType!U) 601 { 602 return request!(RedisReply!T)("ZSCORE", key, member); 603 } 604 605 /* 606 Hyperloglog 607 */ 608 609 /// Adds one or more Keys to a HyperLogLog data structure . 610 long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); } 611 612 /** Returns the approximated cardinality computed by the HyperLogLog data 613 structure stored at the specified key. 614 615 When called with a single key, returns the approximated cardinality 616 computed by the HyperLogLog data structure stored at the specified 617 variable, which is 0 if the variable does not exist. 618 619 When called with multiple keys, returns the approximated cardinality 620 of the union of the HyperLogLogs passed, by internally merging the 621 HyperLogLogs stored at the provided keys into a temporary HyperLogLog. 622 */ 623 long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); } 624 625 /// Merge multiple HyperLogLog values into a new one. 626 void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); } 627 628 629 //TODO: zunionstore 630 631 /* 632 Pub / Sub 633 */ 634 635 /// Publishes a message to all clients subscribed at the channel 636 long publish(string channel, string message) 637 { 638 auto str = request!string("PUBLISH", channel, message); 639 return str != "" ? parse!long(str) : -1; 640 } 641 642 /// Inspect the state of the Pub/Sub subsystem 643 RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...) 644 if(isValidRedisValueType!T) 645 { 646 return request!(RedisReply!T)("PUBSUB", subcommand, args); 647 } 648 649 /* 650 TODO: Transactions 651 */ 652 /// Return the number of keys in the selected database 653 long dbSize() { return request!long("DBSIZE"); } 654 655 /* 656 LUA Scripts 657 */ 658 /// Execute a Lua script server side 659 RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args) 660 if(isValidRedisValueType!T) 661 { 662 return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args); 663 } 664 /// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function. 665 RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args) 666 if(isValidRedisValueType!T) 667 { 668 return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args); 669 } 670 671 //scriptExists 672 //scriptFlush 673 //scriptKill 674 675 /// Load a script into the scripts cache, without executing it. Run it using evalSHA. 676 string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); } 677 678 /// Run the specified command and arguments in the Redis database server 679 T request(T = void, ARGS...)(string command, scope ARGS args) 680 { 681 return m_client.requestDB!(T, ARGS)(m_index, command, args); 682 } 683 684 private static string[2] getMinMaxArgs(string RNG)(double min, double max) 685 { 686 // TODO: avoid GC allocations 687 static assert(RNG.length == 2, "The RNG range specification must be two characters long"); 688 689 string[2] ret; 690 string mins, maxs; 691 mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min); 692 maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max); 693 694 static if (RNG[0] == '[') ret[0] = mins; 695 else static if (RNG[0] == '(') ret[0] = '('~mins; 696 else static assert(false, "Opening range specification mist be either '[' or '('."); 697 698 static if (RNG[1] == ']') ret[1] = maxs; 699 else static if (RNG[1] == ')') ret[1] = '('~maxs; 700 else static assert(false, "Closing range specification mist be either ']' or ')'."); 701 702 return ret; 703 } 704 } 705 706 707 /** 708 A redis subscription listener 709 */ 710 import std.datetime; 711 import std.variant; 712 import std.typecons : Tuple, tuple; 713 import std.container : Array; 714 import std.algorithm : canFind; 715 import std.range : takeOne; 716 import std.array : array; 717 718 import vibe.core.concurrency; 719 import vibe.core.sync; 720 721 alias RedisSubscriber = FreeListRef!RedisSubscriberImpl; 722 723 final class RedisSubscriberImpl { 724 private { 725 RedisClient m_client; 726 LockedConnection!RedisConnection m_lockedConnection; 727 bool[string] m_subscriptions; 728 string[] m_pendingSubscriptions; 729 bool m_listening; 730 bool m_stop; 731 Task m_listener; 732 Task m_listenerHelper; 733 Task m_waiter; 734 Task m_stopWaiter; 735 InterruptibleRecursiveTaskMutex m_mutex; 736 InterruptibleTaskMutex m_connMutex; 737 } 738 739 private enum Action { 740 DATA, 741 STOP, 742 STARTED, 743 SUBSCRIBE, 744 UNSUBSCRIBE 745 } 746 747 @property bool isListening() const { 748 return m_listening; 749 } 750 751 /// Get a list of channels with active subscriptions 752 @property string[] subscriptions() const { 753 return () @trusted { return m_subscriptions.keys; } (); 754 } 755 756 bool hasSubscription(string channel) const { 757 return (channel in m_subscriptions) !is null && m_subscriptions[channel]; 758 } 759 760 this(RedisClient client) { 761 762 logTrace("this()"); 763 m_client = client; 764 m_mutex = new InterruptibleRecursiveTaskMutex; 765 m_connMutex = new InterruptibleTaskMutex; 766 } 767 768 // FIXME: instead of waiting here, the class must be reference counted 769 // and destructions needs to be defered until everything is stopped 770 ~this() { 771 logTrace("~this"); 772 waitForStop(); 773 } 774 775 // Task will block until the listener is finished 776 private void waitForStop() 777 scope { 778 logTrace("waitForStop"); 779 if (!m_listening) return; 780 781 void impl() @safe { 782 m_mutex.performLocked!({ 783 m_stopWaiter = Task.getThis(); 784 }); 785 if (!m_listening) return; // verify again in case the mutex was locked by bstop 786 scope(exit) { 787 m_mutex.performLocked!({ 788 m_stopWaiter = Task(); 789 }); 790 } 791 bool stopped; 792 do { 793 () @trusted { receive((Action act) { if (act == Action.STOP) stopped = true; }); } (); 794 } while (!stopped); 795 796 enforce(stopped, "Failed to wait for Redis listener to stop"); 797 } 798 inTask(&impl); 799 } 800 801 /// Stop listening and yield until the operation is complete. 802 void bstop(){ 803 logTrace("bstop"); 804 if (!m_listening) return; 805 806 void impl() @safe { 807 m_mutex.performLocked!({ 808 m_waiter = Task.getThis(); 809 scope(exit) m_waiter = Task(); 810 stop(); 811 812 bool stopped; 813 do { 814 if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true; }); } ()) 815 break; 816 } while (!stopped); 817 818 enforce(stopped, "Failed to wait for Redis listener to stop"); 819 }); 820 } 821 inTask(&impl); 822 } 823 824 /// Stop listening asynchroneously 825 void stop(){ 826 logTrace("stop"); 827 if (!m_listening) 828 return; 829 830 void impl() @safe { 831 m_mutex.performLocked!({ 832 m_stop = true; 833 () @trusted { m_listener.send(Action.STOP); } (); 834 // send a message to wake up the listenerHelper from the reply 835 if (m_subscriptions.length > 0) { 836 m_connMutex.performLocked!(() { 837 _request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ()); 838 }); 839 sleep(30.msecs); 840 } 841 }); 842 } 843 inTask(&impl); 844 } 845 846 private bool hasNewSubscriptionIn(scope string[] args) { 847 bool has_new; 848 foreach (arg; args) 849 if (!hasSubscription(arg)) 850 has_new = true; 851 if (!has_new) 852 return false; 853 854 return true; 855 } 856 857 private bool anySubscribed(scope string[] args) { 858 859 bool any_subscribed; 860 foreach (arg ; args) { 861 if (hasSubscription(arg)) 862 any_subscribed = true; 863 } 864 return any_subscribed; 865 } 866 867 /// Completes the subscription for a listener to start receiving pubsub messages 868 /// on the corresponding channel(s). Returns instantly if already subscribed. 869 /// If a connection error is thrown here, it stops the listener. 870 void subscribe(scope string[] args...) 871 { 872 logTrace("subscribe"); 873 if (!m_listening) { 874 foreach (arg; args) 875 m_pendingSubscriptions ~= arg; 876 return; 877 } 878 879 if (!hasNewSubscriptionIn(args)) 880 return; 881 882 void impl() @safe { 883 884 scope(failure) { logTrace("Failure"); bstop(); } 885 try { 886 m_mutex.performLocked!({ 887 m_waiter = Task.getThis(); 888 scope(exit) m_waiter = Task(); 889 bool subscribed; 890 m_connMutex.performLocked!({ 891 _request_void(m_lockedConnection, "SUBSCRIBE", args); 892 }); 893 while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 894 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 895 break; 896 897 subscribed = true; 898 } 899 debug { 900 auto keys = () @trusted { return m_subscriptions.keys; } (); 901 logTrace("Can find keys?: %s", keys.canFind(args)); 902 logTrace("Subscriptions: %s", keys); 903 } 904 enforce(subscribed, "Could not complete subscription(s)."); 905 }); 906 } catch (Exception e) { 907 logDebug("Redis subscribe() failed: ", e.msg); 908 } 909 } 910 inTask(&impl); 911 } 912 913 /// Unsubscribes from the channel(s) specified, returns immediately if none 914 /// is currently being listened. 915 /// If a connection error is thrown here, it stops the listener. 916 void unsubscribe(scope string[] args...) 917 { 918 logTrace("unsubscribe"); 919 920 void impl() @safe { 921 922 if (!anySubscribed(args)) 923 return; 924 925 scope(failure) bstop(); 926 assert(m_listening); 927 928 m_mutex.performLocked!({ 929 m_waiter = Task.getThis(); 930 scope(exit) m_waiter = Task(); 931 bool unsubscribed; 932 m_connMutex.performLocked!({ 933 _request_void(m_lockedConnection, "UNSUBSCRIBE", args); 934 }); 935 while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 936 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) { 937 unsubscribed = false; 938 break; 939 } 940 unsubscribed = true; 941 } 942 debug { 943 auto keys = () @trusted { return m_subscriptions.keys; } (); 944 logTrace("Can find keys?: %s", keys.canFind(args)); 945 logTrace("Subscriptions: %s", keys); 946 } 947 enforce(unsubscribed, "Could not complete unsubscription(s)."); 948 }); 949 } 950 inTask(&impl); 951 } 952 953 /// Same as subscribe, but uses glob patterns, and does not return instantly if 954 /// the subscriptions are already registered. 955 /// throws Exception if the pattern does not yield a new subscription. 956 void psubscribe(scope string[] args...) 957 { 958 logTrace("psubscribe"); 959 void impl() @safe { 960 scope(failure) bstop(); 961 assert(m_listening); 962 m_mutex.performLocked!({ 963 m_waiter = Task.getThis(); 964 scope(exit) m_waiter = Task(); 965 bool subscribed; 966 m_connMutex.performLocked!({ 967 _request_void(m_lockedConnection, "PSUBSCRIBE", args); 968 }); 969 970 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 971 subscribed = false; 972 else 973 subscribed = true; 974 975 debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ()); 976 enforce(subscribed, "Could not complete subscription(s)."); 977 }); 978 } 979 inTask(&impl); 980 } 981 982 /// Same as unsubscribe, but uses glob patterns, and does not return instantly if 983 /// the subscriptions are not registered. 984 /// throws Exception if the pattern does not yield a new unsubscription. 985 void punsubscribe(scope string[] args...) 986 { 987 logTrace("punsubscribe"); 988 void impl() @safe { 989 scope(failure) bstop(); 990 assert(m_listening); 991 m_mutex.performLocked!({ 992 m_waiter = Task.getThis(); 993 scope(exit) m_waiter = Task(); 994 bool unsubscribed; 995 m_connMutex.performLocked!({ 996 _request_void(m_lockedConnection, "PUNSUBSCRIBE", args); 997 }); 998 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) 999 unsubscribed = false; 1000 else 1001 unsubscribed = true; 1002 1003 debug { 1004 auto keys = () @trusted { return m_subscriptions.keys; } (); 1005 logTrace("Can find keys?: %s", keys.canFind(args)); 1006 logTrace("Subscriptions: %s", keys); 1007 } 1008 enforce(unsubscribed, "Could not complete unsubscription(s)."); 1009 }); 1010 } 1011 inTask(&impl); 1012 } 1013 1014 private static void inTask(scope void delegate() @safe impl) 1015 @trusted { 1016 logTrace("inTask"); 1017 if (Task.getThis() == Task()) 1018 { 1019 Throwable ex; 1020 bool done; 1021 Task task = runTask({ 1022 logDebug("inTask %s", Task.getThis()); 1023 try impl(); 1024 catch (Exception e) { 1025 ex = e; 1026 } 1027 done = true; 1028 }); 1029 task.join(); 1030 logDebug("done"); 1031 if (ex) 1032 throw ex; 1033 } 1034 else 1035 impl(); 1036 } 1037 1038 private void init(){ 1039 1040 logTrace("init"); 1041 if (!m_lockedConnection) { 1042 m_lockedConnection = m_client.m_connections.lockConnection(); 1043 m_lockedConnection.setAuth(m_client.m_authPassword); 1044 m_lockedConnection.setDB(m_client.m_selectedDB); 1045 } 1046 1047 if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) { 1048 try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port); 1049 catch (Exception e) { 1050 throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e); 1051 } 1052 m_lockedConnection.conn.tcpNoDelay = true; 1053 m_lockedConnection.setAuth(m_client.m_authPassword); 1054 m_lockedConnection.setDB(m_client.m_selectedDB); 1055 } 1056 } 1057 1058 // Same as listen, but blocking 1059 void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds) 1060 { 1061 init(); 1062 1063 void onSubscribe(string channel) @safe { 1064 logTrace("Callback subscribe(%s)", channel); 1065 m_subscriptions[channel] = true; 1066 if (m_waiter != Task()) 1067 () @trusted { m_waiter.send(Action.SUBSCRIBE); } (); 1068 } 1069 1070 void onUnsubscribe(string channel) @safe { 1071 logTrace("Callback unsubscribe(%s)", channel); 1072 m_subscriptions.remove(channel); 1073 if (m_waiter != Task()) 1074 () @trusted { m_waiter.send(Action.UNSUBSCRIBE); } (); 1075 } 1076 1077 void teardown() @safe { // teardown 1078 logTrace("Redis listener exiting"); 1079 // More publish commands may be sent to this connection after recycling it, so we 1080 // actively destroy it 1081 m_lockedConnection.conn.close(); 1082 m_lockedConnection.destroy(); 1083 m_listening = false; 1084 } 1085 // http://redis.io/topics/pubsub 1086 /** 1087 SUBSCRIBE first second 1088 *3 1089 $9 1090 subscribe 1091 $5 1092 first 1093 :1 1094 *3 1095 $9 1096 subscribe 1097 $6 1098 second 1099 :2 1100 */ 1101 // This is a simple parser/handler for subscribe/unsubscribe/publish 1102 // commands sent by redis. The PubSub client protocol is simple enough 1103 1104 void pubsub_handler() { 1105 TCPConnection conn = m_lockedConnection.conn; 1106 logTrace("Pubsub handler"); 1107 void dropCRLF() @safe { 1108 ubyte[2] crlf; 1109 conn.read(crlf); 1110 } 1111 size_t readArgs() @safe { 1112 char[8] ucnt; 1113 ubyte[1] num; 1114 size_t i; 1115 do { 1116 conn.read(num); 1117 if (num[0] >= 48 && num[0] <= 57) 1118 ucnt[i] = num[0]; 1119 else break; 1120 i++; 1121 } 1122 while (true); // ascii 1123 ubyte[1] b; 1124 conn.read(b); 1125 logTrace("Found %s", ucnt); 1126 // the new line is consumed when num is not in range. 1127 return ucnt[0 .. i].to!size_t; 1128 } 1129 // find the number of arguments in the array 1130 ubyte[1] symbol; 1131 conn.read(symbol); 1132 enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'"); 1133 size_t args = readArgs(); 1134 // get the number of characters in the first string (the command) 1135 conn.read(symbol); 1136 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1137 size_t cnt = readArgs(); 1138 ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } (); 1139 scope(exit) () @trusted { theAllocator.dispose(cmd); } (); 1140 conn.read(cmd); 1141 dropCRLF(); 1142 // find the channel 1143 conn.read(symbol); 1144 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1145 cnt = readArgs(); 1146 ubyte[] str = new ubyte[cnt]; 1147 conn.read(str); 1148 dropCRLF(); 1149 string channel = () @trusted { return cast(string)str; } (); 1150 logTrace("chan: %s", channel); 1151 1152 if (cmd == "message") { // find the message 1153 conn.read(symbol); 1154 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1155 cnt = readArgs(); 1156 str = new ubyte[cnt]; 1157 conn.read(str); // channel 1158 string message = () @trusted { return cast(string)str.idup; } (); 1159 logTrace("msg: %s", message); 1160 dropCRLF(); 1161 onMessage(channel, message); 1162 } 1163 else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions 1164 bool is_subscribe = (cmd == "subscribe"); 1165 conn.read(symbol); 1166 enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'"); 1167 cnt = readArgs(); // number of subscriptions 1168 logTrace("subscriptions: %d", cnt); 1169 if (is_subscribe) 1170 onSubscribe(channel); 1171 else 1172 onUnsubscribe(channel); 1173 1174 // todo: enforce the number of subscriptions? 1175 } 1176 else assert(false, "Unrecognized pubsub wire protocol command received"); 1177 } 1178 1179 // Waits for data and advises the handler 1180 m_listenerHelper = runTask(() nothrow { 1181 loop: while(true) { 1182 if (m_stop || !m_lockedConnection.conn) break; 1183 1184 try { 1185 const waitResult = m_lockedConnection.conn.waitForDataEx(100.msecs); 1186 if (m_stop) break; 1187 1188 final switch (waitResult) { 1189 case WaitForDataStatus.noMoreData: 1190 break loop; 1191 case WaitForDataStatus.timeout: 1192 logTrace("No data arrival in 100 ms..."); 1193 continue loop; 1194 case WaitForDataStatus.dataAvailable: 1195 } 1196 1197 // Data has arrived, this task is in charge of notifying the main handler loop 1198 logTrace("Notify data arrival"); 1199 1200 () @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue 1201 () @trusted { m_listener.send(Action.DATA); } (); 1202 if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ()) 1203 assert(false); 1204 } catch (Exception e) { 1205 logException(e, "Redis listen task failed - stopping to listen"); 1206 break; 1207 } 1208 } 1209 1210 logTrace("Listener Helper exit."); 1211 try () @trusted { m_listener.send(Action.STOP); } (); 1212 catch (Exception e) assert(false, e.msg); 1213 } ); 1214 1215 m_listening = true; 1216 logTrace("Redis listener now listening"); 1217 if (m_waiter != Task()) 1218 () @trusted { m_waiter.send(Action.STARTED); } (); 1219 1220 if (timeout == 0.seconds) 1221 timeout = 365.days; // make sure 0.seconds is considered as big. 1222 1223 scope(exit) { 1224 logTrace("Redis Listener exit."); 1225 if (!m_stop) { 1226 stop(); // notifies the listenerHelper 1227 } 1228 m_listenerHelper.join(); 1229 // close the data connections 1230 teardown(); 1231 1232 if (m_waiter != Task()) 1233 () @trusted { m_waiter.send(Action.STOP); } (); 1234 if (m_stopWaiter != Task()) 1235 () @trusted { m_stopWaiter.send(Action.STOP); } (); 1236 1237 m_listenerHelper = Task(); 1238 m_listener = Task(); 1239 m_stop = false; 1240 } 1241 1242 // Start waiting for data notifications to arrive 1243 while(true) { 1244 1245 auto handler = (Action act) { 1246 if (act == Action.STOP) m_stop = true; 1247 if (m_stop) return; 1248 logTrace("Calling PubSub Handler"); 1249 m_connMutex.performLocked!({ 1250 pubsub_handler(); // handles one command at a time 1251 }); 1252 () @trusted { m_listenerHelper.send(Action.DATA); } (); 1253 }; 1254 1255 if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) { 1256 logTrace("Redis Listener stopped"); 1257 break; 1258 } 1259 1260 } 1261 1262 } 1263 1264 /// Waits for messages and calls the callback with the channel and the message as arguments. 1265 /// The timeout is passed over to the listener, which closes after the period of inactivity. 1266 /// Use 0.seconds timeout to specify a very long time (365 days) 1267 /// Errors will be sent to Callback Delegate on channel "Error". 1268 Task listen(void delegate(string, string) @safe nothrow callback, Duration timeout = 0.seconds) 1269 { 1270 logTrace("Listen"); 1271 void impl() @safe { 1272 logTrace("Listen"); 1273 m_waiter = Task.getThis(); 1274 scope(exit) m_waiter = Task(); 1275 Throwable ex; 1276 m_listener = runTask(() nothrow { 1277 try blisten(callback, timeout); 1278 catch(Exception e) { 1279 ex = e; 1280 if (m_waiter != Task() && !m_listening) { 1281 try () @trusted { m_waiter.send(Action.STARTED); } (); 1282 catch (Exception e) assert(false, e.msg); 1283 return; 1284 } 1285 callback("Error", e.msg); 1286 } 1287 }); 1288 m_mutex.performLocked!({ 1289 import std.datetime : usecs; 1290 () @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } (); 1291 if (ex) throw ex; 1292 enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired"); 1293 }); 1294 1295 foreach (channel; m_pendingSubscriptions) { 1296 subscribe(channel); 1297 } 1298 1299 m_pendingSubscriptions = null; 1300 } 1301 inTask(&impl); 1302 return m_listener; 1303 } 1304 } 1305 1306 1307 1308 /** Range interface to a single Redis reply. 1309 */ 1310 struct RedisReply(T = ubyte[]) { 1311 static assert(isInputRange!RedisReply); 1312 1313 private { 1314 uint m_magic = 0x15f67ab3; 1315 RedisConnection m_conn; 1316 LockedConnection!RedisConnection m_lockedConnection; 1317 } 1318 1319 alias ElementType = T; 1320 1321 private this(RedisConnection conn) 1322 { 1323 m_conn = conn; 1324 auto ctx = &conn.m_replyContext; 1325 assert(ctx.refCount == 0); 1326 *ctx = RedisReplyContext.init; 1327 ctx.refCount++; 1328 } 1329 1330 this(this) 1331 { 1332 assert(m_magic == 0x15f67ab3); 1333 if (m_conn) { 1334 auto ctx = &m_conn.m_replyContext; 1335 assert(ctx.refCount > 0); 1336 ctx.refCount++; 1337 } 1338 } 1339 1340 ~this() 1341 { 1342 assert(m_magic == 0x15f67ab3); 1343 if (m_conn) { 1344 if (!--m_conn.m_replyContext.refCount) 1345 drop(); 1346 } 1347 } 1348 1349 @property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; } 1350 1351 /** Returns the current element of the reply. 1352 1353 Note that byte and character arrays may be returned as slices to a 1354 temporary buffer. This buffer will be invalidated on the next call to 1355 $(D popFront), so it needs to be duplicated for permanent storage. 1356 */ 1357 @property T front() 1358 { 1359 assert(!empty, "Accessing the front of an empty RedisReply!"); 1360 auto ctx = &m_conn.m_replyContext; 1361 if (!ctx.hasData) readData(); 1362 1363 ubyte[] ret = ctx.data; 1364 1365 return convertToType!T(ret); 1366 } 1367 1368 @property bool frontIsNull() 1369 const { 1370 assert(!empty, "Accessing the front of an empty RedisReply!"); 1371 return m_conn.m_replyContext.frontIsNull; 1372 } 1373 1374 /** Pops the current element of the reply 1375 */ 1376 void popFront() 1377 { 1378 assert(!empty, "Popping the front of an empty RedisReply!"); 1379 1380 auto ctx = &m_conn.m_replyContext; 1381 1382 if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire 1383 clearData(); 1384 ctx.index++; 1385 1386 if (ctx.index >= ctx.length && ctx.refCount == 1) { 1387 ctx.refCount = 0; 1388 m_conn = null; 1389 m_lockedConnection.destroy(); 1390 } 1391 } 1392 1393 1394 /// Legacy property for hasNext/next based iteration 1395 @property bool hasNext() const { return !empty; } 1396 1397 /// Legacy property for hasNext/next based iteration 1398 TN next(TN : E[], E)() 1399 { 1400 assert(hasNext, "end of reply"); 1401 1402 auto ret = front.dup; 1403 popFront(); 1404 return () @trusted { return cast(TN)ret; } (); 1405 } 1406 1407 void drop() 1408 { 1409 if (!m_conn) return; 1410 while (!empty) popFront(); 1411 } 1412 1413 private void readData() 1414 { 1415 auto ctx = &m_conn.m_replyContext; 1416 assert(!ctx.hasData && ctx.initialized); 1417 1418 if (ctx.multi) 1419 readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ()); 1420 } 1421 1422 private void clearData() 1423 { 1424 auto ctx = &m_conn.m_replyContext; 1425 ctx.data = null; 1426 ctx.hasData = false; 1427 } 1428 1429 private @property void lockedConnection(ref LockedConnection!RedisConnection conn) 1430 { 1431 assert(m_conn !is null); 1432 m_lockedConnection = conn; 1433 } 1434 1435 private void initialize() 1436 { 1437 assert(m_conn !is null); 1438 auto ctx = &m_conn.m_replyContext; 1439 assert(!ctx.initialized); 1440 ctx.initialized = true; 1441 1442 ubyte[] ln = m_conn.conn.readLine(); 1443 1444 switch (ln[0]) { 1445 default: 1446 m_conn.conn.close(); 1447 throw new Exception(format("Unknown reply type: %s", cast(char)ln[0])); 1448 case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1449 case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ()); 1450 case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1451 case '$': 1452 readBulk(() @trusted { return cast(string)ln; } ()); 1453 break; 1454 case '*': 1455 if (ln.startsWith(cast(const(ubyte)[])"*-1")) { 1456 ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array 1457 } else { 1458 ctx.multi = true; 1459 scope (failure) m_conn.conn.close(); 1460 ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ()); 1461 } 1462 break; 1463 } 1464 } 1465 1466 private void readBulk(string sizeLn) 1467 { 1468 assert(m_conn !is null); 1469 auto ctx = &m_conn.m_replyContext; 1470 if (sizeLn.startsWith("$-1")) { 1471 ctx.frontIsNull = true; 1472 ctx.hasData = true; 1473 ctx.data = null; 1474 } else { 1475 auto size = to!size_t(sizeLn[1 .. $]); 1476 auto data = new ubyte[size]; 1477 m_conn.conn.read(data); 1478 m_conn.conn.readLine(); 1479 ctx.frontIsNull = false; 1480 ctx.hasData = true; 1481 ctx.data = data; 1482 } 1483 } 1484 } 1485 1486 class RedisProtocolException : Exception { 1487 this(string message, string file = __FILE__, size_t line = __LINE__, Exception next = null) 1488 { 1489 super(message, file, line, next); 1490 } 1491 } 1492 1493 template isValidRedisValueReturn(T) 1494 { 1495 import std.typecons; 1496 static if (isInstanceOf!(Nullable, T)) { 1497 enum isValidRedisValueReturn = isValidRedisValueType!(typeof(T.init.get())); 1498 } else static if (isInstanceOf!(RedisReply, T)) { 1499 enum isValidRedisValueReturn = isValidRedisValueType!(T.ElementType); 1500 } else enum isValidRedisValueReturn = isValidRedisValueType!T; 1501 } 1502 1503 template isValidRedisValueType(T) 1504 { 1505 enum isValidRedisValueType = is(T : const(char)[]) || is(T : const(ubyte)[]) || is(T == long) || is(T == double) || is(T == bool); 1506 } 1507 1508 private RedisReply!T getReply(T = ubyte)(RedisConnection conn) 1509 { 1510 auto repl = RedisReply!T(conn); 1511 repl.initialize(); 1512 return repl; 1513 } 1514 1515 private struct RedisReplyContext { 1516 long refCount = 0; 1517 ubyte[] data; 1518 bool hasData; 1519 bool multi = false; 1520 bool initialized = false; 1521 bool frontIsNull = false; 1522 long length = 1; 1523 long index = 0; 1524 ubyte[128] dataBuffer; 1525 } 1526 1527 private final class RedisConnection { 1528 private { 1529 string m_host; 1530 ushort m_port; 1531 TCPConnection m_conn; 1532 string m_password; 1533 long m_selectedDB; 1534 RedisReplyContext m_replyContext; 1535 } 1536 1537 this(string host, ushort port) 1538 { 1539 m_host = host; 1540 m_port = port; 1541 } 1542 1543 @property TCPConnection conn() nothrow { return m_conn; } 1544 @property void conn(TCPConnection conn) nothrow { m_conn = conn; } 1545 1546 void setAuth(string password) 1547 { 1548 if (m_password == password) return; 1549 _request_reply(this, "AUTH", password); 1550 m_password = password; 1551 } 1552 1553 void setDB(long index) 1554 { 1555 if (index == m_selectedDB) return; 1556 _request_reply(this, "SELECT", index); 1557 m_selectedDB = index; 1558 } 1559 1560 private static long countArgs(ARGS...)(scope ARGS args) 1561 { 1562 long ret = 0; 1563 foreach (i, A; ARGS) { 1564 static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) { 1565 foreach (arg; args[i]) 1566 ret += countArgs(arg); 1567 } else ret++; 1568 } 1569 return ret; 1570 } 1571 1572 unittest { 1573 assert(countArgs() == 0); 1574 assert(countArgs(1, 2, 3) == 3); 1575 assert(countArgs("1", ["2", "3", "4"]) == 4); 1576 assert(countArgs([["1", "2"], ["3"]]) == 3); 1577 } 1578 1579 private static void writeArgs(R, ARGS...)(R dst, scope ARGS args) 1580 if (isOutputRange!(R, char)) 1581 { 1582 foreach (i, A; ARGS) { 1583 static if (is(A == bool)) { 1584 writeArgs(dst, args[i] ? "1" : "0"); 1585 } else static if (is(A : long) || is(A : real) || is(A == string)) { 1586 auto alen = formattedLength(args[i]); 1587 enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n"; 1588 dst.formattedWrite(fmt, alen, args[i]); 1589 } else static if (is(A : const(ubyte[])) || is(A : const(char[]))) { 1590 dst.formattedWrite("$%s\r\n", args[i].length); 1591 dst.put(args[i]); 1592 dst.put("\r\n"); 1593 } else static if (isArray!A) { 1594 foreach (arg; args[i]) 1595 writeArgs(dst, arg); 1596 } else static assert(false, "Unsupported Redis argument type: " ~ A.stringof); 1597 } 1598 } 1599 1600 unittest { 1601 import std.array : appender; 1602 auto dst = appender!string; 1603 writeArgs(dst, false, true, ["2", "3"], "4", 5.0); 1604 assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n"); 1605 } 1606 1607 private static long formattedLength(ARG)(scope ARG arg) 1608 { 1609 static if (is(ARG == string)) return arg.length; 1610 else { 1611 import vibe.internal.rangeutil; 1612 long length; 1613 auto rangeCnt = RangeCounter(() @trusted { return &length; } ()); 1614 rangeCnt.formattedWrite(typeFormatString!ARG, arg); 1615 return length; 1616 } 1617 } 1618 } 1619 1620 private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args) 1621 { 1622 import vibe.stream.wrapper; 1623 1624 if (!conn.conn || !conn.conn.connected) { 1625 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1626 catch (Exception e) { 1627 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1628 } 1629 conn.conn.tcpNoDelay = true; 1630 } 1631 1632 auto nargs = conn.countArgs(args); 1633 auto rng = streamOutputRange(conn.conn); 1634 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1635 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1636 } 1637 1638 private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args) 1639 { 1640 import vibe.stream.wrapper; 1641 1642 if (!conn.conn || !conn.conn.connected) { 1643 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1644 catch (Exception e) { 1645 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1646 } 1647 conn.conn.tcpNoDelay = true; 1648 } 1649 1650 auto nargs = conn.countArgs(args); 1651 auto rng = streamOutputRange(conn.conn); 1652 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1653 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1654 rng.flush(); 1655 1656 return conn.getReply!T; 1657 } 1658 1659 private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args) 1660 { 1661 import std.typecons; 1662 static if (isInstanceOf!(RedisReply, T)) { 1663 auto reply = _request_reply!(T.ElementType)(conn, command, args); 1664 reply.lockedConnection = conn; 1665 return reply; 1666 } else static if (is(T == void)) { 1667 _request_reply(conn, command, args); 1668 } else static if (isInstanceOf!(Nullable, T)) { 1669 alias TB = typeof(T.init.get()); 1670 auto reply = _request_reply!TB(conn, command, args); 1671 T ret; 1672 if (!reply.frontIsNull) ret = reply.front; 1673 return ret; 1674 } else { 1675 auto reply = _request_reply!T(conn, command, args); 1676 return reply.front; 1677 } 1678 } 1679 1680 private T convertToType(T)(ubyte[] data) /// NOTE: data must be unique! 1681 { 1682 static if (isSomeString!T) () @trusted { validate(cast(T)data); } (); 1683 1684 static if (is(T == ubyte[])) return data; 1685 else static if (is(T == string)) return cast(T)data.idup; 1686 else static if (is(T == bool)) return data[0] == '1'; 1687 else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) { 1688 auto str = () @trusted { return cast(string)data; } (); 1689 return parse!T(str); 1690 } 1691 else static assert(false, "Unsupported Redis reply type: " ~ T.stringof); 1692 } 1693 1694 private template typeFormatString(T) 1695 { 1696 static if (isFloatingPoint!T) enum typeFormatString = "%.16g"; 1697 else enum typeFormatString = "%s"; 1698 }