1 /** 2 Redis database client implementation. 3 4 Copyright: © 2012-2016 RejectedSoftware e.K. 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon 7 */ 8 module vibe.db.redis.redis; 9 10 public import vibe.core.net; 11 12 import vibe.core.connectionpool; 13 import vibe.core.core; 14 import vibe.core.log; 15 import vibe.inet.url; 16 import vibe.internal.allocator; 17 import vibe.internal.freelistref; 18 import vibe.stream.operations; 19 import std.conv; 20 import std.exception; 21 import std.format; 22 import std.range : isInputRange, isOutputRange; 23 import std.string; 24 import std.traits; 25 import std.typecons : Nullable; 26 import std.utf; 27 28 @safe: 29 30 31 /** 32 Returns a RedisClient that can be used to communicate to the specified database server. 33 */ 34 RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort) 35 { 36 return new RedisClient(host, port); 37 } 38 39 /** 40 Returns a Redis database connecction instance corresponding to the given URL. 41 42 The URL must be of the format "redis://server[:port]/dbnum". 43 */ 44 RedisDatabase connectRedisDB(URL url) 45 { 46 auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort); 47 // TODO: support password 48 return cli.getDatabase(url.localURI[1 .. $].to!long); 49 } 50 51 /** 52 A redis client with connection pooling. 53 */ 54 final class RedisClient { 55 private { 56 ConnectionPool!RedisConnection m_connections; 57 string m_authPassword; 58 string m_version; 59 long m_selectedDB; 60 } 61 62 enum defaultPort = 6379; 63 64 this(string host = "127.0.0.1", ushort port = defaultPort) 65 { 66 m_connections = new ConnectionPool!RedisConnection({ 67 return new RedisConnection(host, port); 68 }); 69 } 70 71 /// Returns Redis version 72 @property string redisVersion() 73 { 74 if(m_version == "") 75 { 76 import std.string; 77 auto info = info(); 78 auto lines = info.splitLines(); 79 if (lines.length > 1) { 80 foreach (string line; lines) { 81 auto lineParams = line.split(":"); 82 if (lineParams.length > 1 && lineParams[0] == "redis_version") { 83 m_version = lineParams[1]; 84 break; 85 } 86 } 87 } 88 } 89 90 return m_version; 91 } 92 93 /** Returns a handle to the given database. 94 */ 95 RedisDatabase getDatabase(long index) { return RedisDatabase(this, index); } 96 97 /** Creates a RedisSubscriber instance for launching a pubsub listener 98 */ 99 RedisSubscriber createSubscriber() { 100 return RedisSubscriber(this); 101 } 102 103 /* 104 Connection 105 */ 106 107 /// Authenticate to the server 108 void auth(string password) { m_authPassword = password; } 109 /// Echo the given string 110 T echo(T, U)(U data) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("ECHO", data); } 111 /// Ping the server 112 void ping() { request("PING"); } 113 /// Close the connection 114 void quit() { request("QUIT"); } 115 116 /* 117 Server 118 */ 119 120 //TODO: BGREWRITEAOF 121 //TODO: BGSAVE 122 123 /// Get the value of a configuration parameter 124 T getConfig(T)(string parameter) if(isValidRedisValueReturn!T) { return request!T("CONFIG", "GET", parameter); } 125 /// Set a configuration parameter to the given value 126 void setConfig(T)(string parameter, T value) if(isValidRedisValueType!T) { request("CONFIG", "SET", parameter, value); } 127 /// Reset the stats returned by INFO 128 void configResetStat() { request("CONFIG", "RESETSTAT"); } 129 130 //TOOD: Debug Object 131 //TODO: Debug Segfault 132 133 /** Deletes all keys from all databases. 134 135 See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL) 136 */ 137 void deleteAll() { request("FLUSHALL"); } 138 139 /// Get information and statistics about the server 140 string info() { return request!string("INFO"); } 141 /// Get the UNIX time stamp of the last successful save to disk 142 long lastSave() { return request!long("LASTSAVE"); } 143 //TODO monitor 144 /// Synchronously save the dataset to disk 145 void save() { request("SAVE"); } 146 /// Synchronously save the dataset to disk and then shut down the server 147 void shutdown() { request("SHUTDOWN"); } 148 /// Make the server a slave of another instance, or promote it as master 149 void slaveOf(string host, ushort port) { request("SLAVEOF", host, port); } 150 151 //TODO slowlog 152 //TODO sync 153 154 private T request(T = void, ARGS...)(string command, scope ARGS args) 155 { 156 return requestDB!(T, ARGS)(m_selectedDB, command, args); 157 } 158 159 private T requestDB(T, ARGS...)(long db, string command, scope ARGS args) 160 { 161 auto conn = m_connections.lockConnection(); 162 conn.setAuth(m_authPassword); 163 conn.setDB(db); 164 version (RedisDebug) { 165 import std.conv; 166 string debugargs = command; 167 foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string; 168 } 169 170 static if (is(T == void)) { 171 version (RedisDebug) logDebug("Redis request: %s => void", debugargs); 172 _request!void(conn, command, args); 173 } else static if (!isInstanceOf!(RedisReply, T)) { 174 auto ret = _request!T(conn, command, args); 175 version (RedisDebug) logDebug("Redis request: %s => %s", debugargs, ret.to!string); 176 return ret; 177 } else { 178 auto ret = _request!T(conn, command, args); 179 version (RedisDebug) logDebug("Redis request: %s => RedisReply", debugargs); 180 return ret; 181 } 182 } 183 } 184 185 186 /** 187 Accesses the contents of a Redis database 188 */ 189 struct RedisDatabase { 190 private { 191 RedisClient m_client; 192 long m_index; 193 } 194 195 private this(RedisClient client, long index) 196 { 197 m_client = client; 198 m_index = index; 199 } 200 201 /** The Redis client with which the database is accessed. 202 */ 203 @property inout(RedisClient) client() inout { return m_client; } 204 205 /** Index of the database. 206 */ 207 @property long index() const { return m_index; } 208 209 /** Deletes all keys of the database. 210 211 See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB) 212 */ 213 void deleteAll() { request!void("FLUSHDB"); } 214 /// Delete a key 215 long del(scope string[] keys...) { return request!long("DEL", keys); } 216 /// Determine if a key exists 217 bool exists(string key) { return request!bool("EXISTS", key); } 218 /// Set a key's time to live in seconds 219 bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); } 220 /// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds. 221 bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); } 222 /// Set the expiration for a key as a UNIX timestamp 223 bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); } 224 /// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC]) 225 RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); } 226 /// Move a key to another database 227 bool move(string key, long db) { return request!bool("MOVE", key, db); } 228 /// Remove the expiration from a key 229 bool persist(string key) { return request!bool("PERSIST", key); } 230 //TODO: object 231 /// Return a random key from the keyspace 232 string randomKey() { return request!string("RANDOMKEY"); } 233 /// Rename a key 234 void rename(string key, string newkey) { request("RENAME", key, newkey); } 235 /// Rename a key, only if the new key does not exist 236 bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); } 237 //TODO sort 238 /// Get the time to live for a key 239 long ttl(string key) { return request!long("TTL", key); } 240 /// Get the time to live for a key in milliseconds 241 long pttl(string key) { return request!long("PTTL", key); } 242 /// Determine the type stored at key (string, list, set, zset and hash.) 243 string type(string key) { return request!string("TYPE", key); } 244 245 /* 246 String Commands 247 */ 248 249 /// Append a value to a key 250 long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); } 251 /// Decrement the integer value of a key by one 252 long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); } 253 /// Get the value of a key 254 T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); } 255 /// Returns the bit value at offset in the string value stored at key 256 bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); } 257 /// Get a substring of the string stored at a key 258 T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); } 259 /// Set the string value of a key and return its old value 260 T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); } 261 /// Increment the integer value of a key 262 long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); } 263 /// Increment the real number value of a key 264 long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); } 265 /// Get the values of all the given keys 266 RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); } 267 268 /// Set multiple keys to multiple values 269 void mset(ARGS...)(ARGS args) 270 { 271 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 272 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 273 request("MSET", args); 274 } 275 276 /// Set multiple keys to multiple values, only if none of the keys exist 277 bool msetNX(ARGS...)(ARGS args) { 278 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 279 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 280 return request!bool("MSETEX", args); 281 } 282 283 /// Set the string value of a key 284 void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); } 285 /// Set the value of a key, only if the key does not exist 286 bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); } 287 /// Set the value of a key, only if the key already exists 288 bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); } 289 /// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 290 bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); } 291 /// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 292 bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); } 293 /// Sets or clears the bit at offset in the string value stored at key 294 bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); } 295 /// Set the value and expiration of a key 296 void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); } 297 /// Overwrite part of a string at key starting at the specified offset 298 long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); } 299 /// Get the length of the value stored in a key 300 long strlen(string key) { return request!long("STRLEN", key); } 301 302 /* 303 Hashes 304 */ 305 /// Delete one or more hash fields 306 long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); } 307 /// Determine if a hash field exists 308 bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); } 309 /// Set multiple hash fields to multiple values 310 void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); } 311 /// Set the value of a hash field, only if the field does not exist 312 bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); } 313 /// Get the value of a hash field. 314 T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); } 315 /// Get all the fields and values in a hash 316 RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); } 317 /// Increment the integer value of a hash field 318 long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); } 319 /// Increment the real number value of a hash field 320 long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); } 321 /// Get all the fields in a hash 322 RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); } 323 /// Get the number of fields in a hash 324 long hlen(string key) { return request!long("HLEN", key); } 325 /// Get the values of all the given hash fields 326 RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); } 327 /// Set multiple hash fields to multiple values 328 void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); } 329 330 /// Get all the values in a hash 331 RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); } 332 333 /* 334 Lists 335 */ 336 /// Get an element from a list by its index 337 T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); } 338 /// Insert value in the list stored at key before the reference value pivot. 339 long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); } 340 /// Insert value in the list stored at key after the reference value pivot. 341 long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); } 342 /// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. 343 long llen(string key) { return request!long("LLEN", key); } 344 /// Insert all the specified values at the head of the list stored at key. 345 long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); } 346 /// Inserts value at the head of the list stored at key, only if key already exists and holds a list. 347 long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); } 348 /// Insert all the specified values at the tail of the list stored at key. 349 long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); } 350 /// Inserts value at the tail of the list stored at key, only if key already exists and holds a list. 351 long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); } 352 /// Returns the specified elements of the list stored at key. 353 RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE", key, start, stop); } 354 /// Removes the first count occurrences of elements equal to value from the list stored at key. 355 long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); } 356 /// Sets the list element at index to value. 357 void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); } 358 /// Trim an existing list so that it will contain only the specified range of elements specified. 359 /// Equivalent to $(D range = range[start .. stop+1]) 360 void ltrim(string key, long start, long stop) { request("LTRIM", key, start, stop); } 361 /// Removes and returns the last element of the list stored at key. 362 T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); } 363 /// Removes and returns the first element of the list stored at key. 364 T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); } 365 /// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks 366 /// the connection when there are no elements to pop from any of the given lists. 367 Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T) 368 { 369 auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds); 370 Nullable!(Tuple!(string, T)) ret; 371 if (reply.empty || reply.frontIsNull) return ret; 372 string rkey = reply.front.convertToType!string(); 373 reply.popFront(); 374 ret = tuple(rkey, reply.front.convertToType!T()); 375 return ret; 376 } 377 /// Atomically returns and removes the last element (tail) of the list stored at source, 378 /// and pushes the element at the first element (head) of the list stored at destination. 379 T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); } 380 381 /* 382 Sets 383 */ 384 /// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored. 385 /// If key does not exist, a new set is created before adding the specified members. 386 long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); } 387 /// Returns the set cardinality (number of elements) of the set stored at key. 388 long scard(string key) { return request!long("SCARD", key); } 389 /// Returns the members of the set resulting from the difference between the first set and all the successive sets. 390 RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); } 391 /// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination. 392 /// If destination already exists, it is overwritten. 393 long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); } 394 /// Returns the members of the set resulting from the intersection of all the given sets. 395 RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); } 396 /// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination. 397 /// If destination already exists, it is overwritten. 398 long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); } 399 /// Returns if member is a member of the set stored at key. 400 bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); } 401 /// Returns all the members of the set value stored at key. 402 RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); } 403 /// Move member from the set at source to the set at destination. This operation is atomic. 404 /// In every given moment the element will appear to be a member of source or destination for other clients. 405 bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); } 406 /// Removes and returns a random element from the set value stored at key. 407 T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); } 408 /// Returns a random element from the set stored at key. 409 T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); } 410 ///returns count random elements from the set stored at key 411 RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); } 412 413 414 /// Remove the specified members from the set stored at key. 415 long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); } 416 /// Returns the members of the set resulting from the union of all the given sets. 417 RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); } 418 /// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination. 419 long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); } 420 421 /* 422 Sorted Sets 423 */ 424 /// Add one or more members to a sorted set, or update its score if it already exists 425 long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); } 426 /// Returns the sorted set cardinality (number of elements) of the sorted set stored at key. 427 long zcard(string key) { return request!long("ZCARD", key); } 428 /// Returns the number of elements in the sorted set at key with a score between min and max 429 long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); } 430 /// Increments the score of member in the sorted set stored at key by increment. 431 double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); } 432 //TODO: zinterstore 433 /// Returns the specified range of elements in the sorted set stored at key. 434 RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false) 435 if(isValidRedisValueType!T) 436 { 437 if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES"); 438 else return request!(RedisReply!T)("ZRANGE", key, start, end); 439 } 440 441 /// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, 442 /// this command returns all the elements in the sorted set at key with a value between min and max. 443 RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1) 444 if(isValidRedisValueType!T) 445 { 446 if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count); 447 else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max); 448 } 449 450 /// Returns all the elements in the sorted set at key with a score between start and end inclusively 451 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false) 452 if(isValidRedisValueType!T) 453 { 454 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES"); 455 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end)); 456 } 457 458 /// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively, 459 /// and returns a range subselection similar to $(D results[offset .. offset+count]) 460 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false) 461 if(isValidRedisValueType!T) 462 { 463 assert(offset >= 0); 464 assert(count >= 0); 465 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count); 466 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count); 467 } 468 469 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high. 470 long zrank(T)(string key, T member) 471 if (isValidRedisValueType!T) 472 { 473 auto str = request!string("ZRANK", key, member); 474 return str != "" ? parse!long(str) : -1; 475 } 476 477 /// Removes the specified members from the sorted set stored at key. 478 long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); } 479 /// Removes all elements in the sorted set stored at key with rank between start and stop. 480 long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); } 481 /// Removes all elements in the sorted set stored at key with a score between min and max (inclusive). 482 long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));} 483 /// Returns the specified range of elements in the sorted set stored at key. 484 RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false) 485 if(isValidRedisValueType!T) 486 { 487 if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES"); 488 else return request!(RedisReply!T)("ZREVRANGE", key, start, end); 489 } 490 491 /// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min). 492 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false) 493 if(isValidRedisValueType!T) 494 { 495 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES"); 496 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max)); 497 } 498 499 /// Computes an internal list of elements in the sorted set at key with a score between max and min, and 500 /// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count]) 501 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false) 502 if(isValidRedisValueType!T) 503 { 504 assert(offset >= 0); 505 assert(count >= 0); 506 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count); 507 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count); 508 } 509 510 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low. 511 long zrevRank(T)(string key, T member) 512 if (isValidRedisValueType!T) 513 { 514 auto str = request!string("ZREVRANK", key, member); 515 return str != "" ? parse!long(str) : -1; 516 } 517 518 /// Returns the score of member in the sorted set at key. 519 RedisReply!T zscore(T = string, U)(string key, U member) 520 if(isValidRedisValueType!T && isValidRedisValueType!U) 521 { 522 return request!(RedisReply!T)("ZSCORE", key, member); 523 } 524 525 /* 526 Hyperloglog 527 */ 528 529 /// Adds one or more Keys to a HyperLogLog data structure . 530 long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); } 531 532 /** Returns the approximated cardinality computed by the HyperLogLog data 533 structure stored at the specified key. 534 535 When called with a single key, returns the approximated cardinality 536 computed by the HyperLogLog data structure stored at the specified 537 variable, which is 0 if the variable does not exist. 538 539 When called with multiple keys, returns the approximated cardinality 540 of the union of the HyperLogLogs passed, by internally merging the 541 HyperLogLogs stored at the provided keys into a temporary HyperLogLog. 542 */ 543 long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); } 544 545 /// Merge multiple HyperLogLog values into a new one. 546 void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); } 547 548 549 //TODO: zunionstore 550 551 /* 552 Pub / Sub 553 */ 554 555 /// Publishes a message to all clients subscribed at the channel 556 long publish(string channel, string message) 557 { 558 auto str = request!string("PUBLISH", channel, message); 559 return str != "" ? parse!long(str) : -1; 560 } 561 562 /// Inspect the state of the Pub/Sub subsystem 563 RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...) 564 if(isValidRedisValueType!T) 565 { 566 return request!(RedisReply!T)("PUBSUB", subcommand, args); 567 } 568 569 /* 570 TODO: Transactions 571 */ 572 /// Return the number of keys in the selected database 573 long dbSize() { return request!long("DBSIZE"); } 574 575 /* 576 LUA Scripts 577 */ 578 /// Execute a Lua script server side 579 RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args) 580 if(isValidRedisValueType!T) 581 { 582 return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args); 583 } 584 /// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function. 585 RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args) 586 if(isValidRedisValueType!T) 587 { 588 return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args); 589 } 590 591 //scriptExists 592 //scriptFlush 593 //scriptKill 594 595 /// Load a script into the scripts cache, without executing it. Run it using evalSHA. 596 string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); } 597 598 /// Run the specified command and arguments in the Redis database server 599 T request(T = void, ARGS...)(string command, scope ARGS args) 600 { 601 return m_client.requestDB!(T, ARGS)(m_index, command, args); 602 } 603 604 private static string[2] getMinMaxArgs(string RNG)(double min, double max) 605 { 606 // TODO: avoid GC allocations 607 static assert(RNG.length == 2, "The RNG range specification must be two characters long"); 608 609 string[2] ret; 610 string mins, maxs; 611 mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min); 612 maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max); 613 614 static if (RNG[0] == '[') ret[0] = mins; 615 else static if (RNG[0] == '(') ret[0] = '('~mins; 616 else static assert(false, "Opening range specification mist be either '[' or '('."); 617 618 static if (RNG[1] == ']') ret[1] = maxs; 619 else static if (RNG[1] == ')') ret[1] = '('~maxs; 620 else static assert(false, "Closing range specification mist be either ']' or ')'."); 621 622 return ret; 623 } 624 } 625 626 627 /** 628 A redis subscription listener 629 */ 630 import std.datetime; 631 import std.variant; 632 import std.typecons : Tuple, tuple; 633 import std.container : Array; 634 import std.algorithm : canFind; 635 import std.range : takeOne; 636 import std.array : array; 637 638 import vibe.core.concurrency; 639 import vibe.core.sync; 640 641 alias RedisSubscriber = FreeListRef!RedisSubscriberImpl; 642 643 final class RedisSubscriberImpl { 644 private { 645 RedisClient m_client; 646 LockedConnection!RedisConnection m_lockedConnection; 647 bool[string] m_subscriptions; 648 string[] m_pendingSubscriptions; 649 bool m_listening; 650 bool m_stop; 651 Task m_listener; 652 Task m_listenerHelper; 653 Task m_waiter; 654 InterruptibleRecursiveTaskMutex m_mutex; 655 InterruptibleTaskMutex m_connMutex; 656 } 657 658 private enum Action { 659 DATA, 660 STOP, 661 STARTED, 662 SUBSCRIBE, 663 UNSUBSCRIBE 664 } 665 666 @property bool isListening() const { 667 return m_listening; 668 } 669 670 /// Get a list of channels with active subscriptions 671 @property string[] subscriptions() const { 672 return () @trusted { return m_subscriptions.keys; } (); 673 } 674 675 bool hasSubscription(string channel) const { 676 return (channel in m_subscriptions) !is null && m_subscriptions[channel]; 677 } 678 679 this(RedisClient client) { 680 681 logTrace("this()"); 682 m_client = client; 683 m_mutex = new InterruptibleRecursiveTaskMutex; 684 m_connMutex = new InterruptibleTaskMutex; 685 } 686 687 ~this() { 688 logTrace("~this"); 689 bstop(); 690 } 691 692 /// Stop listening and yield until the operation is complete. 693 void bstop(){ 694 logTrace("bstop"); 695 if (!m_listening) return; 696 697 void impl() @safe { 698 m_mutex.performLocked!({ 699 m_waiter = Task.getThis(); 700 scope(exit) m_waiter = Task(); 701 stop(); 702 703 bool stopped; 704 do { 705 if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true; }); } ()) 706 break; 707 } while (!stopped); 708 709 enforce(stopped, "Failed to wait for Redis listener to stop"); 710 }); 711 } 712 inTask(&impl); 713 } 714 715 /// Stop listening asynchroneously 716 void stop(){ 717 logTrace("stop"); 718 if (!m_listening) 719 return; 720 721 void impl() @safe { 722 m_mutex.performLocked!({ 723 m_stop = true; 724 () @trusted { m_listener.send(Action.STOP); } (); 725 // send a message to wake up the listenerHelper from the reply 726 if (m_subscriptions.length > 0) { 727 m_connMutex.performLocked!(() { 728 _request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ()); 729 }); 730 sleep(30.msecs); 731 } 732 }); 733 } 734 inTask(&impl); 735 } 736 737 private bool hasNewSubscriptionIn(scope string[] args) { 738 bool has_new; 739 foreach (arg; args) 740 if (!hasSubscription(arg)) 741 has_new = true; 742 if (!has_new) 743 return false; 744 745 return true; 746 } 747 748 private bool anySubscribed(scope string[] args) { 749 750 bool any_subscribed; 751 foreach (arg ; args) { 752 if (hasSubscription(arg)) 753 any_subscribed = true; 754 } 755 return any_subscribed; 756 } 757 758 /// Completes the subscription for a listener to start receiving pubsub messages 759 /// on the corresponding channel(s). Returns instantly if already subscribed. 760 /// If a connection error is thrown here, it stops the listener. 761 void subscribe(scope string[] args...) 762 { 763 logTrace("subscribe"); 764 if (!m_listening) { 765 foreach (arg; args) 766 m_pendingSubscriptions ~= arg; 767 return; 768 } 769 770 if (!hasNewSubscriptionIn(args)) 771 return; 772 773 void impl() @safe { 774 775 scope(failure) { logTrace("Failure"); bstop(); } 776 try { 777 m_mutex.performLocked!({ 778 m_waiter = Task.getThis(); 779 scope(exit) m_waiter = Task(); 780 bool subscribed; 781 m_connMutex.performLocked!({ 782 _request_void(m_lockedConnection, "SUBSCRIBE", args); 783 }); 784 while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 785 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 786 break; 787 788 subscribed = true; 789 } 790 debug { 791 auto keys = () @trusted { return m_subscriptions.keys; } (); 792 logTrace("Can find keys?: %s", keys.canFind(args)); 793 logTrace("Subscriptions: %s", keys); 794 } 795 enforce(subscribed, "Could not complete subscription(s)."); 796 }); 797 } catch (Exception e) { 798 logDebug("Redis subscribe() failed: ", e.msg); 799 } 800 } 801 inTask(&impl); 802 } 803 804 /// Unsubscribes from the channel(s) specified, returns immediately if none 805 /// is currently being listened. 806 /// If a connection error is thrown here, it stops the listener. 807 void unsubscribe(scope string[] args...) 808 { 809 logTrace("unsubscribe"); 810 811 void impl() @safe { 812 813 if (!anySubscribed(args)) 814 return; 815 816 scope(failure) bstop(); 817 assert(m_listening); 818 819 m_mutex.performLocked!({ 820 m_waiter = Task.getThis(); 821 scope(exit) m_waiter = Task(); 822 bool unsubscribed; 823 m_connMutex.performLocked!({ 824 _request_void(m_lockedConnection, "UNSUBSCRIBE", args); 825 }); 826 while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 827 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) { 828 unsubscribed = false; 829 break; 830 } 831 unsubscribed = true; 832 } 833 debug { 834 auto keys = () @trusted { return m_subscriptions.keys; } (); 835 logTrace("Can find keys?: %s", keys.canFind(args)); 836 logTrace("Subscriptions: %s", keys); 837 } 838 enforce(unsubscribed, "Could not complete unsubscription(s)."); 839 }); 840 } 841 inTask(&impl); 842 } 843 844 /// Same as subscribe, but uses glob patterns, and does not return instantly if 845 /// the subscriptions are already registered. 846 /// throws Exception if the pattern does not yield a new subscription. 847 void psubscribe(scope string[] args...) 848 { 849 logTrace("psubscribe"); 850 void impl() @safe { 851 scope(failure) bstop(); 852 assert(m_listening); 853 m_mutex.performLocked!({ 854 m_waiter = Task.getThis(); 855 scope(exit) m_waiter = Task(); 856 bool subscribed; 857 m_connMutex.performLocked!({ 858 _request_void(m_lockedConnection, "PSUBSCRIBE", args); 859 }); 860 861 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 862 subscribed = false; 863 else 864 subscribed = true; 865 866 debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ()); 867 enforce(subscribed, "Could not complete subscription(s)."); 868 }); 869 } 870 inTask(&impl); 871 } 872 873 /// Same as unsubscribe, but uses glob patterns, and does not return instantly if 874 /// the subscriptions are not registered. 875 /// throws Exception if the pattern does not yield a new unsubscription. 876 void punsubscribe(scope string[] args...) 877 { 878 logTrace("punsubscribe"); 879 void impl() @safe { 880 scope(failure) bstop(); 881 assert(m_listening); 882 m_mutex.performLocked!({ 883 m_waiter = Task.getThis(); 884 scope(exit) m_waiter = Task(); 885 bool unsubscribed; 886 m_connMutex.performLocked!({ 887 _request_void(m_lockedConnection, "PUNSUBSCRIBE", args); 888 }); 889 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) 890 unsubscribed = false; 891 else 892 unsubscribed = true; 893 894 debug { 895 auto keys = () @trusted { return m_subscriptions.keys; } (); 896 logTrace("Can find keys?: %s", keys.canFind(args)); 897 logTrace("Subscriptions: %s", keys); 898 } 899 enforce(unsubscribed, "Could not complete unsubscription(s)."); 900 }); 901 } 902 inTask(&impl); 903 } 904 905 private void inTask(scope void delegate() @safe impl) { 906 logTrace("inTask"); 907 if (Task.getThis() == Task()) 908 { 909 Throwable ex; 910 bool done; 911 Task task = runTask({ 912 logDebug("inTask %s", Task.getThis()); 913 try impl(); 914 catch (Exception e) { 915 ex = e; 916 } 917 done = true; 918 }); 919 task.join(); 920 logDebug("done"); 921 if (ex) 922 throw ex; 923 } 924 else 925 impl(); 926 } 927 928 private void init(){ 929 930 logTrace("init"); 931 if (!m_lockedConnection) { 932 m_lockedConnection = m_client.m_connections.lockConnection(); 933 m_lockedConnection.setAuth(m_client.m_authPassword); 934 m_lockedConnection.setDB(m_client.m_selectedDB); 935 } 936 937 if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) { 938 try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port); 939 catch (Exception e) { 940 throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e); 941 } 942 m_lockedConnection.conn.tcpNoDelay = true; 943 m_lockedConnection.setAuth(m_client.m_authPassword); 944 m_lockedConnection.setDB(m_client.m_selectedDB); 945 } 946 } 947 948 // Same as listen, but blocking 949 void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds) 950 { 951 init(); 952 953 void onSubscribe(string channel) @safe { 954 logTrace("Callback subscribe(%s)", channel); 955 m_subscriptions[channel] = true; 956 if (m_waiter != Task()) 957 () @trusted { m_waiter.send(Action.SUBSCRIBE); } (); 958 } 959 960 void onUnsubscribe(string channel) @safe { 961 logTrace("Callback unsubscribe(%s)", channel); 962 m_subscriptions.remove(channel); 963 if (m_waiter != Task()) 964 () @trusted { m_waiter.send(Action.UNSUBSCRIBE); } (); 965 } 966 967 void teardown() @safe { // teardown 968 logTrace("Redis listener exiting"); 969 // More publish commands may be sent to this connection after recycling it, so we 970 // actively destroy it 971 Action act; 972 // wait for the listener helper to send its stop message 973 while (act != Action.STOP) 974 act = () @trusted { return receiveOnly!Action(); } (); 975 m_lockedConnection.conn.close(); 976 m_lockedConnection.destroy(); 977 m_listening = false; 978 return; 979 } 980 // http://redis.io/topics/pubsub 981 /** 982 SUBSCRIBE first second 983 *3 984 $9 985 subscribe 986 $5 987 first 988 :1 989 *3 990 $9 991 subscribe 992 $6 993 second 994 :2 995 */ 996 // This is a simple parser/handler for subscribe/unsubscribe/publish 997 // commands sent by redis. The PubSub client protocol is simple enough 998 999 void pubsub_handler() { 1000 TCPConnection conn = m_lockedConnection.conn; 1001 logTrace("Pubsub handler"); 1002 void dropCRLF() @safe { 1003 ubyte[2] crlf; 1004 conn.read(crlf); 1005 } 1006 size_t readArgs() @safe { 1007 char[8] ucnt; 1008 ubyte[1] num; 1009 size_t i; 1010 do { 1011 conn.read(num); 1012 if (num[0] >= 48 && num[0] <= 57) 1013 ucnt[i] = num[0]; 1014 else break; 1015 i++; 1016 } 1017 while (true); // ascii 1018 ubyte[1] b; 1019 conn.read(b); 1020 logTrace("Found %s", ucnt); 1021 // the new line is consumed when num is not in range. 1022 return ucnt[0 .. i].to!size_t; 1023 } 1024 // find the number of arguments in the array 1025 ubyte[1] symbol; 1026 conn.read(symbol); 1027 enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'"); 1028 size_t args = readArgs(); 1029 // get the number of characters in the first string (the command) 1030 conn.read(symbol); 1031 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1032 size_t cnt = readArgs(); 1033 ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } (); 1034 scope(exit) () @trusted { theAllocator.dispose(cmd); } (); 1035 conn.read(cmd); 1036 dropCRLF(); 1037 // find the channel 1038 conn.read(symbol); 1039 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1040 cnt = readArgs(); 1041 ubyte[] str = new ubyte[cnt]; 1042 conn.read(str); 1043 dropCRLF(); 1044 string channel = () @trusted { return cast(string)str; } (); 1045 logTrace("chan: %s", channel); 1046 1047 if (cmd == "message") { // find the message 1048 conn.read(symbol); 1049 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1050 cnt = readArgs(); 1051 str = new ubyte[cnt]; 1052 conn.read(str); // channel 1053 string message = () @trusted { return cast(string)str.idup; } (); 1054 logTrace("msg: %s", message); 1055 dropCRLF(); 1056 onMessage(channel, message); 1057 } 1058 else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions 1059 bool is_subscribe = (cmd == "subscribe"); 1060 conn.read(symbol); 1061 enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'"); 1062 cnt = readArgs(); // number of subscriptions 1063 logTrace("subscriptions: %d", cnt); 1064 if (is_subscribe) 1065 onSubscribe(channel); 1066 else 1067 onUnsubscribe(channel); 1068 1069 // todo: enforce the number of subscriptions? 1070 } 1071 else assert(false, "Unrecognized pubsub wire protocol command received"); 1072 } 1073 1074 // Waits for data and advises the handler 1075 m_listenerHelper = runTask( { 1076 while(true) { 1077 if (!m_stop && m_lockedConnection.conn && m_lockedConnection.conn.waitForData(100.msecs)) { 1078 // We check every 5 seconds if this task should stay active 1079 if (m_stop) break; 1080 else if (m_lockedConnection.conn && !m_lockedConnection.conn.dataAvailableForRead) continue; 1081 // Data has arrived, this task is in charge of notifying the main handler loop 1082 logTrace("Notify data arrival"); 1083 1084 () @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue 1085 () @trusted { m_listener.send(Action.DATA); } (); 1086 if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ()) 1087 assert(false); 1088 1089 } else if (m_stop || !m_lockedConnection.conn) break; 1090 logTrace("No data arrival in 100 ms..."); 1091 } 1092 logTrace("Listener Helper exit."); 1093 () @trusted { m_listener.send(Action.STOP); } (); 1094 } ); 1095 1096 m_listening = true; 1097 logTrace("Redis listener now listening"); 1098 if (m_waiter != Task()) 1099 () @trusted { m_waiter.send(Action.STARTED); } (); 1100 1101 if (timeout == 0.seconds) 1102 timeout = 365.days; // make sure 0.seconds is considered as big. 1103 1104 scope(exit) { 1105 logTrace("Redis Listener exit."); 1106 if (!m_stop) { 1107 stop(); // notifies the listenerHelper 1108 } 1109 m_listenerHelper.join(); 1110 // close the data connections 1111 teardown(); 1112 1113 if (m_waiter != Task()) 1114 () @trusted { m_waiter.send(Action.STOP); } (); 1115 1116 m_listenerHelper = Task(); 1117 m_listener = Task(); 1118 m_stop = false; 1119 } 1120 1121 // Start waiting for data notifications to arrive 1122 while(true) { 1123 1124 auto handler = (Action act) { 1125 if (act == Action.STOP) m_stop = true; 1126 if (m_stop) return; 1127 logTrace("Calling PubSub Handler"); 1128 m_connMutex.performLocked!({ 1129 pubsub_handler(); // handles one command at a time 1130 }); 1131 () @trusted { m_listenerHelper.send(Action.DATA); } (); 1132 }; 1133 1134 if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) { 1135 logTrace("Redis Listener stopped"); 1136 break; 1137 } 1138 1139 } 1140 1141 } 1142 /// ditto 1143 deprecated("Use an @safe message callback") 1144 void blisten(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds) 1145 { 1146 blisten((string ch, string msg) @trusted => onMessage(ch, msg)); 1147 } 1148 1149 /// Waits for messages and calls the callback with the channel and the message as arguments. 1150 /// The timeout is passed over to the listener, which closes after the period of inactivity. 1151 /// Use 0.seconds timeout to specify a very long time (365 days) 1152 /// Errors will be sent to Callback Delegate on channel "Error". 1153 Task listen(void delegate(string, string) @safe callback, Duration timeout = 0.seconds) 1154 { 1155 logTrace("Listen"); 1156 void impl() @safe { 1157 logTrace("Listen"); 1158 m_waiter = Task.getThis(); 1159 scope(exit) m_waiter = Task(); 1160 Throwable ex; 1161 m_listener = runTask({ 1162 try blisten(callback, timeout); 1163 catch(Exception e) { 1164 ex = e; 1165 if (m_waiter != Task() && !m_listening) { 1166 () @trusted { m_waiter.send(Action.STARTED); } (); 1167 return; 1168 } 1169 callback("Error", e.msg); 1170 } 1171 }); 1172 m_mutex.performLocked!({ 1173 import std.datetime : usecs; 1174 () @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } (); 1175 if (ex) throw ex; 1176 enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired"); 1177 }); 1178 1179 foreach (channel; m_pendingSubscriptions) { 1180 subscribe(channel); 1181 } 1182 1183 m_pendingSubscriptions = null; 1184 } 1185 inTask(&impl); 1186 return m_listener; 1187 } 1188 /// ditto 1189 deprecated("Use an @safe message callback") 1190 Task listen(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds) 1191 { 1192 return listen((string ch, string msg) @trusted => onMessage(ch, msg)); 1193 } 1194 } 1195 1196 1197 1198 /** Range interface to a single Redis reply. 1199 */ 1200 struct RedisReply(T = ubyte[]) { 1201 static assert(isInputRange!RedisReply); 1202 1203 private { 1204 uint m_magic = 0x15f67ab3; 1205 RedisConnection m_conn; 1206 LockedConnection!RedisConnection m_lockedConnection; 1207 } 1208 1209 alias ElementType = T; 1210 1211 private this(RedisConnection conn) 1212 { 1213 m_conn = conn; 1214 auto ctx = &conn.m_replyContext; 1215 assert(ctx.refCount == 0); 1216 *ctx = RedisReplyContext.init; 1217 ctx.refCount++; 1218 } 1219 1220 this(this) 1221 { 1222 assert(m_magic == 0x15f67ab3); 1223 if (m_conn) { 1224 auto ctx = &m_conn.m_replyContext; 1225 assert(ctx.refCount > 0); 1226 ctx.refCount++; 1227 } 1228 } 1229 1230 ~this() 1231 { 1232 assert(m_magic == 0x15f67ab3); 1233 if (m_conn) { 1234 if (!--m_conn.m_replyContext.refCount) 1235 drop(); 1236 } 1237 } 1238 1239 @property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; } 1240 1241 /** Returns the current element of the reply. 1242 1243 Note that byte and character arrays may be returned as slices to a 1244 temporary buffer. This buffer will be invalidated on the next call to 1245 $(D popFront), so it needs to be duplicated for permanent storage. 1246 */ 1247 @property T front() 1248 { 1249 assert(!empty, "Accessing the front of an empty RedisReply!"); 1250 auto ctx = &m_conn.m_replyContext; 1251 if (!ctx.hasData) readData(); 1252 1253 ubyte[] ret = ctx.data; 1254 1255 return convertToType!T(ret); 1256 } 1257 1258 @property bool frontIsNull() 1259 const { 1260 assert(!empty, "Accessing the front of an empty RedisReply!"); 1261 return m_conn.m_replyContext.frontIsNull; 1262 } 1263 1264 /** Pops the current element of the reply 1265 */ 1266 void popFront() 1267 { 1268 assert(!empty, "Popping the front of an empty RedisReply!"); 1269 1270 auto ctx = &m_conn.m_replyContext; 1271 1272 if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire 1273 clearData(); 1274 ctx.index++; 1275 1276 if (ctx.index >= ctx.length && ctx.refCount == 1) { 1277 ctx.refCount = 0; 1278 m_conn = null; 1279 m_lockedConnection.destroy(); 1280 } 1281 } 1282 1283 1284 /// Legacy property for hasNext/next based iteration 1285 @property bool hasNext() const { return !empty; } 1286 1287 /// Legacy property for hasNext/next based iteration 1288 TN next(TN : E[], E)() 1289 { 1290 assert(hasNext, "end of reply"); 1291 1292 auto ret = front.dup; 1293 popFront(); 1294 return () @trusted { return cast(TN)ret; } (); 1295 } 1296 1297 void drop() 1298 { 1299 if (!m_conn) return; 1300 while (!empty) popFront(); 1301 } 1302 1303 private void readData() 1304 { 1305 auto ctx = &m_conn.m_replyContext; 1306 assert(!ctx.hasData && ctx.initialized); 1307 1308 if (ctx.multi) 1309 readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ()); 1310 } 1311 1312 private void clearData() 1313 { 1314 auto ctx = &m_conn.m_replyContext; 1315 ctx.data = null; 1316 ctx.hasData = false; 1317 } 1318 1319 private @property void lockedConnection(ref LockedConnection!RedisConnection conn) 1320 { 1321 assert(m_conn !is null); 1322 m_lockedConnection = conn; 1323 } 1324 1325 private void initialize() 1326 { 1327 assert(m_conn !is null); 1328 auto ctx = &m_conn.m_replyContext; 1329 assert(!ctx.initialized); 1330 ctx.initialized = true; 1331 1332 ubyte[] ln = m_conn.conn.readLine(); 1333 1334 switch (ln[0]) { 1335 default: 1336 m_conn.conn.close(); 1337 throw new Exception(format("Unknown reply type: %s", cast(char)ln[0])); 1338 case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1339 case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ()); 1340 case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1341 case '$': 1342 readBulk(() @trusted { return cast(string)ln; } ()); 1343 break; 1344 case '*': 1345 if (ln.startsWith(cast(const(ubyte)[])"*-1")) { 1346 ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array 1347 } else { 1348 ctx.multi = true; 1349 scope (failure) m_conn.conn.close(); 1350 ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ()); 1351 } 1352 break; 1353 } 1354 } 1355 1356 private void readBulk(string sizeLn) 1357 { 1358 assert(m_conn !is null); 1359 auto ctx = &m_conn.m_replyContext; 1360 if (sizeLn.startsWith("$-1")) { 1361 ctx.frontIsNull = true; 1362 ctx.hasData = true; 1363 ctx.data = null; 1364 } else { 1365 auto size = to!size_t(sizeLn[1 .. $]); 1366 auto data = new ubyte[size]; 1367 m_conn.conn.read(data); 1368 m_conn.conn.readLine(); 1369 ctx.frontIsNull = false; 1370 ctx.hasData = true; 1371 ctx.data = data; 1372 } 1373 } 1374 } 1375 1376 class RedisProtocolException : Exception { 1377 this(string message, string file = __FILE__, size_t line = __LINE__, Exception next = null) 1378 { 1379 super(message, file, line, next); 1380 } 1381 } 1382 1383 template isValidRedisValueReturn(T) 1384 { 1385 import std.typecons; 1386 static if (isInstanceOf!(Nullable, T)) { 1387 enum isValidRedisValueReturn = isValidRedisValueType!(typeof(T.init.get())); 1388 } else static if (isInstanceOf!(RedisReply, T)) { 1389 enum isValidRedisValueReturn = isValidRedisValueType!(T.ElementType); 1390 } else enum isValidRedisValueReturn = isValidRedisValueType!T; 1391 } 1392 1393 template isValidRedisValueType(T) 1394 { 1395 enum isValidRedisValueType = is(T : const(char)[]) || is(T : const(ubyte)[]) || is(T == long) || is(T == double) || is(T == bool); 1396 } 1397 1398 private RedisReply!T getReply(T = ubyte)(RedisConnection conn) 1399 { 1400 auto repl = RedisReply!T(conn); 1401 repl.initialize(); 1402 return repl; 1403 } 1404 1405 private struct RedisReplyContext { 1406 long refCount = 0; 1407 ubyte[] data; 1408 bool hasData; 1409 bool multi = false; 1410 bool initialized = false; 1411 bool frontIsNull = false; 1412 long length = 1; 1413 long index = 0; 1414 ubyte[128] dataBuffer; 1415 } 1416 1417 private final class RedisConnection { 1418 private { 1419 string m_host; 1420 ushort m_port; 1421 TCPConnection m_conn; 1422 string m_password; 1423 long m_selectedDB; 1424 RedisReplyContext m_replyContext; 1425 } 1426 1427 this(string host, ushort port) 1428 { 1429 m_host = host; 1430 m_port = port; 1431 } 1432 1433 @property TCPConnection conn() { return m_conn; } 1434 @property void conn(TCPConnection conn) { m_conn = conn; } 1435 1436 void setAuth(string password) 1437 { 1438 if (m_password == password) return; 1439 _request_reply(this, "AUTH", password); 1440 m_password = password; 1441 } 1442 1443 void setDB(long index) 1444 { 1445 if (index == m_selectedDB) return; 1446 _request_reply(this, "SELECT", index); 1447 m_selectedDB = index; 1448 } 1449 1450 private static long countArgs(ARGS...)(scope ARGS args) 1451 { 1452 long ret = 0; 1453 foreach (i, A; ARGS) { 1454 static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) { 1455 foreach (arg; args[i]) 1456 ret += countArgs(arg); 1457 } else ret++; 1458 } 1459 return ret; 1460 } 1461 1462 unittest { 1463 assert(countArgs() == 0); 1464 assert(countArgs(1, 2, 3) == 3); 1465 assert(countArgs("1", ["2", "3", "4"]) == 4); 1466 assert(countArgs([["1", "2"], ["3"]]) == 3); 1467 } 1468 1469 private static void writeArgs(R, ARGS...)(R dst, scope ARGS args) 1470 if (isOutputRange!(R, char)) 1471 { 1472 foreach (i, A; ARGS) { 1473 static if (is(A == bool)) { 1474 writeArgs(dst, args[i] ? "1" : "0"); 1475 } else static if (is(A : long) || is(A : real) || is(A == string)) { 1476 auto alen = formattedLength(args[i]); 1477 enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n"; 1478 dst.formattedWrite(fmt, alen, args[i]); 1479 } else static if (is(A : const(ubyte[])) || is(A : const(char[]))) { 1480 dst.formattedWrite("$%s\r\n", args[i].length); 1481 dst.put(args[i]); 1482 dst.put("\r\n"); 1483 } else static if (isArray!A) { 1484 foreach (arg; args[i]) 1485 writeArgs(dst, arg); 1486 } else static assert(false, "Unsupported Redis argument type: " ~ A.stringof); 1487 } 1488 } 1489 1490 unittest { 1491 import std.array : appender; 1492 auto dst = appender!string; 1493 writeArgs(dst, false, true, ["2", "3"], "4", 5.0); 1494 assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n"); 1495 } 1496 1497 private static long formattedLength(ARG)(scope ARG arg) 1498 { 1499 static if (is(ARG == string)) return arg.length; 1500 else { 1501 import vibe.internal.rangeutil; 1502 long length; 1503 auto rangeCnt = RangeCounter(() @trusted { return &length; } ()); 1504 rangeCnt.formattedWrite(typeFormatString!ARG, arg); 1505 return length; 1506 } 1507 } 1508 } 1509 1510 private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args) 1511 { 1512 import vibe.stream.wrapper; 1513 1514 if (!conn.conn || !conn.conn.connected) { 1515 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1516 catch (Exception e) { 1517 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1518 } 1519 conn.conn.tcpNoDelay = true; 1520 } 1521 1522 auto nargs = conn.countArgs(args); 1523 auto rng = streamOutputRange(conn.conn); 1524 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1525 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1526 } 1527 1528 private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args) 1529 { 1530 import vibe.stream.wrapper; 1531 1532 if (!conn.conn || !conn.conn.connected) { 1533 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1534 catch (Exception e) { 1535 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1536 } 1537 conn.conn.tcpNoDelay = true; 1538 } 1539 1540 auto nargs = conn.countArgs(args); 1541 auto rng = streamOutputRange(conn.conn); 1542 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1543 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1544 rng.flush(); 1545 1546 return conn.getReply!T; 1547 } 1548 1549 private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args) 1550 { 1551 import std.typecons; 1552 static if (isInstanceOf!(RedisReply, T)) { 1553 auto reply = _request_reply!(T.ElementType)(conn, command, args); 1554 reply.lockedConnection = conn; 1555 return reply; 1556 } else static if (is(T == void)) { 1557 _request_reply(conn, command, args); 1558 } else static if (isInstanceOf!(Nullable, T)) { 1559 alias TB = typeof(T.init.get()); 1560 auto reply = _request_reply!TB(conn, command, args); 1561 T ret; 1562 if (!reply.frontIsNull) ret = reply.front; 1563 return ret; 1564 } else { 1565 auto reply = _request_reply!T(conn, command, args); 1566 return reply.front; 1567 } 1568 } 1569 1570 private T convertToType(T)(ubyte[] data) /// NOTE: data must be unique! 1571 { 1572 static if (isSomeString!T) () @trusted { validate(cast(T)data); } (); 1573 1574 static if (is(T == ubyte[])) return data; 1575 else static if (is(T == string)) return cast(T)data.idup; 1576 else static if (is(T == bool)) return data[0] == '1'; 1577 else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) { 1578 auto str = () @trusted { return cast(string)data; } (); 1579 return parse!T(str); 1580 } 1581 else static assert(false, "Unsupported Redis reply type: " ~ T.stringof); 1582 } 1583 1584 private template typeFormatString(T) 1585 { 1586 static if (isFloatingPoint!T) enum typeFormatString = "%.16g"; 1587 else enum typeFormatString = "%s"; 1588 }