1 /** 2 Redis database client implementation. 3 4 Copyright: © 2012-2016 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon 7 */ 8 module vibe.db.redis.redis; 9 10 public import vibe.core.net; 11 12 import vibe.core.connectionpool; 13 import vibe.core.core; 14 import vibe.core.log; 15 import vibe.inet.url; 16 import vibe.internal.allocator; 17 import vibe.internal.freelistref; 18 import vibe.stream.operations; 19 import std.conv; 20 import std.exception; 21 import std.format; 22 import std.range : isInputRange, isOutputRange; 23 import std.string; 24 import std.traits; 25 import std.typecons : Nullable; 26 import std.utf; 27 28 @safe: 29 30 31 /** 32 Returns a RedisClient that can be used to communicate to the specified database server. 33 */ 34 RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort) 35 { 36 return new RedisClient(host, port); 37 } 38 39 /** 40 Returns a Redis database connection instance corresponding to the given URL. 41 42 The URL must be of the format "redis://server[:port]/dbnum". 43 44 Authentication: 45 Authenticated connections are supported by using a URL connection string 46 such as "redis://password@host". 47 48 Examples: 49 --- 50 // connecting with default settings: 51 auto redisDB = connectRedisDB("redis://127.0.0.1"); 52 --- 53 54 --- 55 // connecting using the URL form with custom settings 56 auto redisDB = connectRedisDB("redis://password:myremotehost/3?maxmemory=10000000"); 57 --- 58 59 Params: 60 url = Redis URI scheme for a Redis database instance 61 62 Returns: 63 A new RedisDatabase instance that can be used to access the database. 64 65 See_also: $(LINK2 https://www.iana.org/assignments/uri-schemes/prov/redis, Redis URI scheme) 66 */ 67 RedisDatabase connectRedisDB(URL url) 68 { 69 auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort); 70 71 if (!url.queryString.empty) 72 { 73 import vibe.inet.webform : FormFields, parseURLEncodedForm; 74 auto query = FormFields.init; 75 parseURLEncodedForm(url.queryString, query); 76 foreach (param, val; query.byKeyValue) 77 { 78 switch (param) 79 { 80 /** 81 The password to use for the Redis AUTH command comes from either the 82 password portion of the "userinfo" URI field or the value from the 83 key-value pair from the "query" URI field with the key "password". 84 If both the password portion of the "userinfo" URI field and a 85 "query" URI field key-value pair with the key "password" are present, 86 the semantics for what password to use for authentication are not 87 well-defined. Such situations therefore ought to be avoided. 88 */ 89 case "password": 90 if (!url.password.empty) 91 cli.auth(val); 92 break; 93 default: 94 throw new Exception(`Redis config parameter "` ~ param ~ `" isn't supported`); 95 } 96 } 97 } 98 99 /* 100 Redis' current optional authentication mechanism does not employ a 101 username, but this might change in the future 102 */ 103 if (!url.password.empty) 104 cli.auth(url.password); 105 106 long databaseIndex; 107 if (url.localURI.length >= 2) 108 databaseIndex = url.pathString[1 .. $].to!long; 109 110 return cli.getDatabase(databaseIndex); 111 } 112 113 /** 114 A redis client with connection pooling. 115 */ 116 final class RedisClient { 117 private { 118 ConnectionPool!RedisConnection m_connections; 119 string m_authPassword; 120 string m_version; 121 long m_selectedDB; 122 } 123 124 enum defaultPort = 6379; 125 126 this(string host = "127.0.0.1", ushort port = defaultPort) 127 { 128 m_connections = new ConnectionPool!RedisConnection({ 129 return new RedisConnection(host, port); 130 }); 131 } 132 133 /** Release all connections that are not in use. Call before exiting the 134 * program to avoid relying on the GC to clean up the sockets. 135 */ 136 void releaseUnusedConnections() @safe 137 { 138 // the try/catch in this is for the old vibe.d:core library, not 139 // necessary in vibe-core, but we have to work with both. 140 m_connections.removeUnused((conn) @safe nothrow { 141 try { 142 conn.m_conn.close(); 143 } catch(Exception e) { 144 logDebug("Failed to close unused Redis connection: %s", e.msg); 145 } 146 }); 147 } 148 149 /// Returns Redis version 150 @property string redisVersion() 151 { 152 if(m_version == "") 153 { 154 import std.string; 155 auto info = info(); 156 auto lines = info.splitLines(); 157 if (lines.length > 1) { 158 foreach (string line; lines) { 159 auto lineParams = line.split(":"); 160 if (lineParams.length > 1 && lineParams[0] == "redis_version") { 161 m_version = lineParams[1]; 162 break; 163 } 164 } 165 } 166 } 167 168 return m_version; 169 } 170 171 /** Returns a handle to the given database. 172 */ 173 RedisDatabase getDatabase(long index) { return RedisDatabase(this, index); } 174 175 /** Creates a RedisSubscriber instance for launching a pubsub listener 176 */ 177 RedisSubscriber createSubscriber() { 178 return RedisSubscriber(this); 179 } 180 181 /* 182 Connection 183 */ 184 185 /// Authenticate to the server 186 void auth(string password) { m_authPassword = password; } 187 /// Echo the given string 188 T echo(T, U)(U data) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("ECHO", data); } 189 /// Ping the server 190 void ping() { request("PING"); } 191 /// Close the connection 192 void quit() { request("QUIT"); } 193 194 /* 195 Server 196 */ 197 198 //TODO: BGREWRITEAOF 199 //TODO: BGSAVE 200 201 /// Get the value of a configuration parameter 202 T getConfig(T)(string parameter) if(isValidRedisValueReturn!T) { return request!T("CONFIG", "GET", parameter); } 203 /// Set a configuration parameter to the given value 204 void setConfig(T)(string parameter, T value) if(isValidRedisValueType!T) { request("CONFIG", "SET", parameter, value); } 205 /// Reset the stats returned by INFO 206 void configResetStat() { request("CONFIG", "RESETSTAT"); } 207 208 //TOOD: Debug Object 209 //TODO: Debug Segfault 210 211 /** Deletes all keys from all databases. 212 213 See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL) 214 */ 215 void deleteAll() { request("FLUSHALL"); } 216 217 /// Get information and statistics about the server 218 string info() { return request!string("INFO"); } 219 /// Get the UNIX time stamp of the last successful save to disk 220 long lastSave() { return request!long("LASTSAVE"); } 221 //TODO monitor 222 /// Synchronously save the dataset to disk 223 void save() { request("SAVE"); } 224 /// Synchronously save the dataset to disk and then shut down the server 225 void shutdown() { request("SHUTDOWN"); } 226 /// Make the server a slave of another instance, or promote it as master 227 void slaveOf(string host, ushort port) { request("SLAVEOF", host, port); } 228 229 //TODO slowlog 230 //TODO sync 231 232 private T request(T = void, ARGS...)(string command, scope ARGS args) 233 { 234 return requestDB!(T, ARGS)(m_selectedDB, command, args); 235 } 236 237 private T requestDB(T, ARGS...)(long db, string command, scope ARGS args) 238 { 239 auto conn = m_connections.lockConnection(); 240 conn.setAuth(m_authPassword); 241 conn.setDB(db); 242 version (RedisDebug) { 243 import std.conv; 244 string debugargs = command; 245 foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string; 246 } 247 248 static if (is(T == void)) { 249 version (RedisDebug) logDebug("Redis request: %s => void", debugargs); 250 _request!void(conn, command, args); 251 } else static if (!isInstanceOf!(RedisReply, T)) { 252 auto ret = _request!T(conn, command, args); 253 version (RedisDebug) logDebug("Redis request: %s => %s", debugargs, ret.to!string); 254 return ret; 255 } else { 256 auto ret = _request!T(conn, command, args); 257 version (RedisDebug) logDebug("Redis request: %s => RedisReply", debugargs); 258 return ret; 259 } 260 } 261 } 262 263 264 /** 265 Accesses the contents of a Redis database 266 */ 267 struct RedisDatabase { 268 private { 269 RedisClient m_client; 270 long m_index; 271 } 272 273 private this(RedisClient client, long index) 274 { 275 m_client = client; 276 m_index = index; 277 } 278 279 /** The Redis client with which the database is accessed. 280 */ 281 @property inout(RedisClient) client() inout { return m_client; } 282 283 /** Index of the database. 284 */ 285 @property long index() const { return m_index; } 286 287 /** Deletes all keys of the database. 288 289 See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB) 290 */ 291 void deleteAll() { request!void("FLUSHDB"); } 292 /// Delete a key 293 long del(scope string[] keys...) { return request!long("DEL", keys); } 294 /// Determine if a key exists 295 bool exists(string key) { return request!bool("EXISTS", key); } 296 /// Set a key's time to live in seconds 297 bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); } 298 /// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds. 299 bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); } 300 /// Set the expiration for a key as a UNIX timestamp 301 bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); } 302 /// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC]) 303 RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); } 304 /// Move a key to another database 305 bool move(string key, long db) { return request!bool("MOVE", key, db); } 306 /// Remove the expiration from a key 307 bool persist(string key) { return request!bool("PERSIST", key); } 308 //TODO: object 309 /// Return a random key from the keyspace 310 string randomKey() { return request!string("RANDOMKEY"); } 311 /// Rename a key 312 void rename(string key, string newkey) { request("RENAME", key, newkey); } 313 /// Rename a key, only if the new key does not exist 314 bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); } 315 //TODO sort 316 /// Get the time to live for a key 317 long ttl(string key) { return request!long("TTL", key); } 318 /// Get the time to live for a key in milliseconds 319 long pttl(string key) { return request!long("PTTL", key); } 320 /// Determine the type stored at key (string, list, set, zset and hash.) 321 string type(string key) { return request!string("TYPE", key); } 322 323 /* 324 String Commands 325 */ 326 327 /// Append a value to a key 328 long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); } 329 /// Decrement the integer value of a key by one 330 long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); } 331 /// Get the value of a key 332 T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); } 333 /// Returns the bit value at offset in the string value stored at key 334 bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); } 335 /// Get a substring of the string stored at a key 336 T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); } 337 /// Set the string value of a key and return its old value 338 T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); } 339 /// Increment the integer value of a key 340 long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); } 341 /// Increment the real number value of a key 342 long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); } 343 /// Get the values of all the given keys 344 RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); } 345 346 /// Set multiple keys to multiple values 347 void mset(ARGS...)(ARGS args) 348 { 349 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 350 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 351 request("MSET", args); 352 } 353 354 /// Set multiple keys to multiple values, only if none of the keys exist 355 bool msetNX(ARGS...)(ARGS args) { 356 static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value"); 357 foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings."); 358 return request!bool("MSETEX", args); 359 } 360 361 /// Set the string value of a key 362 void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); } 363 /// Set the value of a key, only if the key does not exist 364 bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); } 365 /// Set the value of a key, only if the key already exists 366 bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); } 367 /// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 368 bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); } 369 /// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes. 370 bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); } 371 /// Sets or clears the bit at offset in the string value stored at key 372 bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); } 373 /// Set the value and expiration of a key 374 void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); } 375 /// Overwrite part of a string at key starting at the specified offset 376 long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); } 377 /// Get the length of the value stored in a key 378 long strlen(string key) { return request!long("STRLEN", key); } 379 380 /* 381 Hashes 382 */ 383 /// Delete one or more hash fields 384 long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); } 385 /// Determine if a hash field exists 386 bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); } 387 /// Set multiple hash fields to multiple values 388 void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); } 389 /// Set the value of a hash field, only if the field does not exist 390 bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); } 391 /// Get the value of a hash field. 392 T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); } 393 /// Get all the fields and values in a hash 394 RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); } 395 /// Increment the integer value of a hash field 396 long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); } 397 /// Increment the real number value of a hash field 398 long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); } 399 /// Get all the fields in a hash 400 RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); } 401 /// Get the number of fields in a hash 402 long hlen(string key) { return request!long("HLEN", key); } 403 /// Get the values of all the given hash fields 404 RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); } 405 /// Set multiple hash fields to multiple values 406 void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); } 407 408 /// Get all the values in a hash 409 RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); } 410 411 /* 412 Lists 413 */ 414 /// Get an element from a list by its index 415 T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); } 416 /// Insert value in the list stored at key before the reference value pivot. 417 long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); } 418 /// Insert value in the list stored at key after the reference value pivot. 419 long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); } 420 /// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. 421 long llen(string key) { return request!long("LLEN", key); } 422 /// Insert all the specified values at the head of the list stored at key. 423 long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); } 424 /// Inserts value at the head of the list stored at key, only if key already exists and holds a list. 425 long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); } 426 /// Insert all the specified values at the tail of the list stored at key. 427 long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); } 428 /// Inserts value at the tail of the list stored at key, only if key already exists and holds a list. 429 long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); } 430 /// Returns the specified elements of the list stored at key. 431 RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE", key, start, stop); } 432 /// Removes the first count occurrences of elements equal to value from the list stored at key. 433 long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); } 434 /// Sets the list element at index to value. 435 void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); } 436 /// Trim an existing list so that it will contain only the specified range of elements specified. 437 /// Equivalent to $(D range = range[start .. stop+1]) 438 void ltrim(string key, long start, long stop) { request("LTRIM", key, start, stop); } 439 /// Removes and returns the last element of the list stored at key. 440 T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); } 441 /// Removes and returns the first element of the list stored at key. 442 T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); } 443 /// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks 444 /// the connection when there are no elements to pop from any of the given lists. 445 Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T) 446 { 447 auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds); 448 Nullable!(Tuple!(string, T)) ret; 449 if (reply.empty || reply.frontIsNull) return ret; 450 string rkey = reply.front.convertToType!string(); 451 reply.popFront(); 452 ret = tuple(rkey, reply.front.convertToType!T()); 453 return ret; 454 } 455 /// Atomically returns and removes the last element (tail) of the list stored at source, 456 /// and pushes the element at the first element (head) of the list stored at destination. 457 T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); } 458 459 /* 460 Sets 461 */ 462 /// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored. 463 /// If key does not exist, a new set is created before adding the specified members. 464 long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); } 465 /// Returns the set cardinality (number of elements) of the set stored at key. 466 long scard(string key) { return request!long("SCARD", key); } 467 /// Returns the members of the set resulting from the difference between the first set and all the successive sets. 468 RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); } 469 /// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination. 470 /// If destination already exists, it is overwritten. 471 long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); } 472 /// Returns the members of the set resulting from the intersection of all the given sets. 473 RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); } 474 /// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination. 475 /// If destination already exists, it is overwritten. 476 long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); } 477 /// Returns if member is a member of the set stored at key. 478 bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); } 479 /// Returns all the members of the set value stored at key. 480 RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); } 481 /// Move member from the set at source to the set at destination. This operation is atomic. 482 /// In every given moment the element will appear to be a member of source or destination for other clients. 483 bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); } 484 /// Removes and returns a random element from the set value stored at key. 485 T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); } 486 /// Returns a random element from the set stored at key. 487 T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); } 488 ///returns count random elements from the set stored at key 489 RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); } 490 491 492 /// Remove the specified members from the set stored at key. 493 long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); } 494 /// Returns the members of the set resulting from the union of all the given sets. 495 RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); } 496 /// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination. 497 long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); } 498 499 /* 500 Sorted Sets 501 */ 502 /// Add one or more members to a sorted set, or update its score if it already exists 503 long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); } 504 /// Returns the sorted set cardinality (number of elements) of the sorted set stored at key. 505 long zcard(string key) { return request!long("ZCARD", key); } 506 /// Returns the number of elements in the sorted set at key with a score between min and max 507 long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); } 508 /// Increments the score of member in the sorted set stored at key by increment. 509 double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); } 510 //TODO: zinterstore 511 /// Returns the specified range of elements in the sorted set stored at key. 512 RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false) 513 if(isValidRedisValueType!T) 514 { 515 if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES"); 516 else return request!(RedisReply!T)("ZRANGE", key, start, end); 517 } 518 519 long zlexCount(string key, string min = "-", string max = "+") { return request!long("ZLEXCOUNT", key, min, max); } 520 521 /// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering, 522 /// this command returns all the elements in the sorted set at key with a value between min and max. 523 RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1) 524 if(isValidRedisValueType!T) 525 { 526 if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count); 527 else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max); 528 } 529 530 /// Returns all the elements in the sorted set at key with a score between start and end inclusively 531 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false) 532 if(isValidRedisValueType!T) 533 { 534 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES"); 535 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end)); 536 } 537 538 /// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively, 539 /// and returns a range subselection similar to $(D results[offset .. offset+count]) 540 RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false) 541 if(isValidRedisValueType!T) 542 { 543 assert(offset >= 0); 544 assert(count >= 0); 545 if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count); 546 else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count); 547 } 548 549 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high. 550 long zrank(T)(string key, T member) 551 if (isValidRedisValueType!T) 552 { 553 auto str = request!string("ZRANK", key, member); 554 return str != "" ? parse!long(str) : -1; 555 } 556 557 /// Removes the specified members from the sorted set stored at key. 558 long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); } 559 /// Removes all elements in the sorted set stored at key with rank between start and stop. 560 long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); } 561 /// Removes all elements in the sorted set stored at key with a score between min and max (inclusive). 562 long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));} 563 /// Returns the specified range of elements in the sorted set stored at key. 564 RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false) 565 if(isValidRedisValueType!T) 566 { 567 if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES"); 568 else return request!(RedisReply!T)("ZREVRANGE", key, start, end); 569 } 570 571 /// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min). 572 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false) 573 if(isValidRedisValueType!T) 574 { 575 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES"); 576 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max)); 577 } 578 579 /// Computes an internal list of elements in the sorted set at key with a score between max and min, and 580 /// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count]) 581 RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false) 582 if(isValidRedisValueType!T) 583 { 584 assert(offset >= 0); 585 assert(count >= 0); 586 if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count); 587 else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count); 588 } 589 590 /// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low. 591 long zrevRank(T)(string key, T member) 592 if (isValidRedisValueType!T) 593 { 594 auto str = request!string("ZREVRANK", key, member); 595 return str != "" ? parse!long(str) : -1; 596 } 597 598 /// Returns the score of member in the sorted set at key. 599 RedisReply!T zscore(T = string, U)(string key, U member) 600 if(isValidRedisValueType!T && isValidRedisValueType!U) 601 { 602 return request!(RedisReply!T)("ZSCORE", key, member); 603 } 604 605 /* 606 Hyperloglog 607 */ 608 609 /// Adds one or more Keys to a HyperLogLog data structure . 610 long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); } 611 612 /** Returns the approximated cardinality computed by the HyperLogLog data 613 structure stored at the specified key. 614 615 When called with a single key, returns the approximated cardinality 616 computed by the HyperLogLog data structure stored at the specified 617 variable, which is 0 if the variable does not exist. 618 619 When called with multiple keys, returns the approximated cardinality 620 of the union of the HyperLogLogs passed, by internally merging the 621 HyperLogLogs stored at the provided keys into a temporary HyperLogLog. 622 */ 623 long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); } 624 625 /// Merge multiple HyperLogLog values into a new one. 626 void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); } 627 628 629 //TODO: zunionstore 630 631 /* 632 Pub / Sub 633 */ 634 635 /// Publishes a message to all clients subscribed at the channel 636 long publish(string channel, string message) 637 { 638 auto str = request!string("PUBLISH", channel, message); 639 return str != "" ? parse!long(str) : -1; 640 } 641 642 /// Inspect the state of the Pub/Sub subsystem 643 RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...) 644 if(isValidRedisValueType!T) 645 { 646 return request!(RedisReply!T)("PUBSUB", subcommand, args); 647 } 648 649 /* 650 TODO: Transactions 651 */ 652 /// Return the number of keys in the selected database 653 long dbSize() { return request!long("DBSIZE"); } 654 655 /* 656 LUA Scripts 657 */ 658 /// Execute a Lua script server side 659 RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args) 660 if(isValidRedisValueType!T) 661 { 662 return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args); 663 } 664 /// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function. 665 RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args) 666 if(isValidRedisValueType!T) 667 { 668 return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args); 669 } 670 671 //scriptExists 672 //scriptFlush 673 //scriptKill 674 675 /// Load a script into the scripts cache, without executing it. Run it using evalSHA. 676 string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); } 677 678 /// Run the specified command and arguments in the Redis database server 679 T request(T = void, ARGS...)(string command, scope ARGS args) 680 { 681 return m_client.requestDB!(T, ARGS)(m_index, command, args); 682 } 683 684 private static string[2] getMinMaxArgs(string RNG)(double min, double max) 685 { 686 // TODO: avoid GC allocations 687 static assert(RNG.length == 2, "The RNG range specification must be two characters long"); 688 689 string[2] ret; 690 string mins, maxs; 691 mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min); 692 maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max); 693 694 static if (RNG[0] == '[') ret[0] = mins; 695 else static if (RNG[0] == '(') ret[0] = '('~mins; 696 else static assert(false, "Opening range specification mist be either '[' or '('."); 697 698 static if (RNG[1] == ']') ret[1] = maxs; 699 else static if (RNG[1] == ')') ret[1] = '('~maxs; 700 else static assert(false, "Closing range specification mist be either ']' or ')'."); 701 702 return ret; 703 } 704 } 705 706 707 /** 708 A redis subscription listener 709 */ 710 import std.datetime; 711 import std.variant; 712 import std.typecons : Tuple, tuple; 713 import std.container : Array; 714 import std.algorithm : canFind; 715 import std.range : takeOne; 716 import std.array : array; 717 718 import vibe.core.concurrency; 719 import vibe.core.sync; 720 721 alias RedisSubscriber = FreeListRef!RedisSubscriberImpl; 722 723 final class RedisSubscriberImpl { 724 private { 725 RedisClient m_client; 726 LockedConnection!RedisConnection m_lockedConnection; 727 bool[string] m_subscriptions; 728 string[] m_pendingSubscriptions; 729 bool m_listening; 730 bool m_stop; 731 Task m_listener; 732 Task m_listenerHelper; 733 Task m_waiter; 734 Task m_stopWaiter; 735 InterruptibleRecursiveTaskMutex m_mutex; 736 InterruptibleTaskMutex m_connMutex; 737 } 738 739 private enum Action { 740 DATA, 741 STOP, 742 STARTED, 743 SUBSCRIBE, 744 UNSUBSCRIBE 745 } 746 747 @property bool isListening() const { 748 return m_listening; 749 } 750 751 /// Get a list of channels with active subscriptions 752 @property string[] subscriptions() const { 753 return () @trusted { return m_subscriptions.keys; } (); 754 } 755 756 bool hasSubscription(string channel) const { 757 return (channel in m_subscriptions) !is null && m_subscriptions[channel]; 758 } 759 760 this(RedisClient client) { 761 762 logTrace("this()"); 763 m_client = client; 764 m_mutex = new InterruptibleRecursiveTaskMutex; 765 m_connMutex = new InterruptibleTaskMutex; 766 } 767 768 // FIXME: instead of waiting here, the class must be reference counted 769 // and destructions needs to be defered until everything is stopped 770 ~this() { 771 logTrace("~this"); 772 waitForStop(); 773 } 774 775 // Task will block until the listener is finished 776 private void waitForStop() 777 { 778 logTrace("waitForStop"); 779 if (!m_listening) return; 780 781 void impl() @safe { 782 m_mutex.performLocked!({ 783 m_stopWaiter = Task.getThis(); 784 }); 785 if (!m_listening) return; // verify again in case the mutex was locked by bstop 786 scope(exit) { 787 m_mutex.performLocked!({ 788 m_stopWaiter = Task(); 789 }); 790 } 791 bool stopped; 792 do { 793 () @trusted { receive((Action act) { if (act == Action.STOP) stopped = true; }); } (); 794 } while (!stopped); 795 796 enforce(stopped, "Failed to wait for Redis listener to stop"); 797 } 798 inTask(&impl); 799 } 800 801 /// Stop listening and yield until the operation is complete. 802 void bstop(){ 803 logTrace("bstop"); 804 if (!m_listening) return; 805 806 void impl() @safe { 807 m_mutex.performLocked!({ 808 m_waiter = Task.getThis(); 809 scope(exit) m_waiter = Task(); 810 stop(); 811 812 bool stopped; 813 do { 814 if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true; }); } ()) 815 break; 816 } while (!stopped); 817 818 enforce(stopped, "Failed to wait for Redis listener to stop"); 819 }); 820 } 821 inTask(&impl); 822 } 823 824 /// Stop listening asynchroneously 825 void stop(){ 826 logTrace("stop"); 827 if (!m_listening) 828 return; 829 830 void impl() @safe { 831 m_mutex.performLocked!({ 832 m_stop = true; 833 () @trusted { m_listener.send(Action.STOP); } (); 834 // send a message to wake up the listenerHelper from the reply 835 if (m_subscriptions.length > 0) { 836 m_connMutex.performLocked!(() { 837 _request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ()); 838 }); 839 sleep(30.msecs); 840 } 841 }); 842 } 843 inTask(&impl); 844 } 845 846 private bool hasNewSubscriptionIn(scope string[] args) { 847 bool has_new; 848 foreach (arg; args) 849 if (!hasSubscription(arg)) 850 has_new = true; 851 if (!has_new) 852 return false; 853 854 return true; 855 } 856 857 private bool anySubscribed(scope string[] args) { 858 859 bool any_subscribed; 860 foreach (arg ; args) { 861 if (hasSubscription(arg)) 862 any_subscribed = true; 863 } 864 return any_subscribed; 865 } 866 867 /// Completes the subscription for a listener to start receiving pubsub messages 868 /// on the corresponding channel(s). Returns instantly if already subscribed. 869 /// If a connection error is thrown here, it stops the listener. 870 void subscribe(scope string[] args...) 871 { 872 logTrace("subscribe"); 873 if (!m_listening) { 874 foreach (arg; args) 875 m_pendingSubscriptions ~= arg; 876 return; 877 } 878 879 if (!hasNewSubscriptionIn(args)) 880 return; 881 882 void impl() @safe { 883 884 scope(failure) { logTrace("Failure"); bstop(); } 885 try { 886 m_mutex.performLocked!({ 887 m_waiter = Task.getThis(); 888 scope(exit) m_waiter = Task(); 889 bool subscribed; 890 m_connMutex.performLocked!({ 891 _request_void(m_lockedConnection, "SUBSCRIBE", args); 892 }); 893 while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 894 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 895 break; 896 897 subscribed = true; 898 } 899 debug { 900 auto keys = () @trusted { return m_subscriptions.keys; } (); 901 logTrace("Can find keys?: %s", keys.canFind(args)); 902 logTrace("Subscriptions: %s", keys); 903 } 904 enforce(subscribed, "Could not complete subscription(s)."); 905 }); 906 } catch (Exception e) { 907 logDebug("Redis subscribe() failed: ", e.msg); 908 } 909 } 910 inTask(&impl); 911 } 912 913 /// Unsubscribes from the channel(s) specified, returns immediately if none 914 /// is currently being listened. 915 /// If a connection error is thrown here, it stops the listener. 916 void unsubscribe(scope string[] args...) 917 { 918 logTrace("unsubscribe"); 919 920 void impl() @safe { 921 922 if (!anySubscribed(args)) 923 return; 924 925 scope(failure) bstop(); 926 assert(m_listening); 927 928 m_mutex.performLocked!({ 929 m_waiter = Task.getThis(); 930 scope(exit) m_waiter = Task(); 931 bool unsubscribed; 932 m_connMutex.performLocked!({ 933 _request_void(m_lockedConnection, "UNSUBSCRIBE", args); 934 }); 935 while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) { 936 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) { 937 unsubscribed = false; 938 break; 939 } 940 unsubscribed = true; 941 } 942 debug { 943 auto keys = () @trusted { return m_subscriptions.keys; } (); 944 logTrace("Can find keys?: %s", keys.canFind(args)); 945 logTrace("Subscriptions: %s", keys); 946 } 947 enforce(unsubscribed, "Could not complete unsubscription(s)."); 948 }); 949 } 950 inTask(&impl); 951 } 952 953 /// Same as subscribe, but uses glob patterns, and does not return instantly if 954 /// the subscriptions are already registered. 955 /// throws Exception if the pattern does not yield a new subscription. 956 void psubscribe(scope string[] args...) 957 { 958 logTrace("psubscribe"); 959 void impl() @safe { 960 scope(failure) bstop(); 961 assert(m_listening); 962 m_mutex.performLocked!({ 963 m_waiter = Task.getThis(); 964 scope(exit) m_waiter = Task(); 965 bool subscribed; 966 m_connMutex.performLocked!({ 967 _request_void(m_lockedConnection, "PSUBSCRIBE", args); 968 }); 969 970 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ()) 971 subscribed = false; 972 else 973 subscribed = true; 974 975 debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ()); 976 enforce(subscribed, "Could not complete subscription(s)."); 977 }); 978 } 979 inTask(&impl); 980 } 981 982 /// Same as unsubscribe, but uses glob patterns, and does not return instantly if 983 /// the subscriptions are not registered. 984 /// throws Exception if the pattern does not yield a new unsubscription. 985 void punsubscribe(scope string[] args...) 986 { 987 logTrace("punsubscribe"); 988 void impl() @safe { 989 scope(failure) bstop(); 990 assert(m_listening); 991 m_mutex.performLocked!({ 992 m_waiter = Task.getThis(); 993 scope(exit) m_waiter = Task(); 994 bool unsubscribed; 995 m_connMutex.performLocked!({ 996 _request_void(m_lockedConnection, "PUNSUBSCRIBE", args); 997 }); 998 if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); }); } ()) 999 unsubscribed = false; 1000 else 1001 unsubscribed = true; 1002 1003 debug { 1004 auto keys = () @trusted { return m_subscriptions.keys; } (); 1005 logTrace("Can find keys?: %s", keys.canFind(args)); 1006 logTrace("Subscriptions: %s", keys); 1007 } 1008 enforce(unsubscribed, "Could not complete unsubscription(s)."); 1009 }); 1010 } 1011 inTask(&impl); 1012 } 1013 1014 private void inTask(scope void delegate() @safe impl) { 1015 logTrace("inTask"); 1016 if (Task.getThis() == Task()) 1017 { 1018 Throwable ex; 1019 bool done; 1020 Task task = runTask({ 1021 logDebug("inTask %s", Task.getThis()); 1022 try impl(); 1023 catch (Exception e) { 1024 ex = e; 1025 } 1026 done = true; 1027 }); 1028 task.join(); 1029 logDebug("done"); 1030 if (ex) 1031 throw ex; 1032 } 1033 else 1034 impl(); 1035 } 1036 1037 private void init(){ 1038 1039 logTrace("init"); 1040 if (!m_lockedConnection) { 1041 m_lockedConnection = m_client.m_connections.lockConnection(); 1042 m_lockedConnection.setAuth(m_client.m_authPassword); 1043 m_lockedConnection.setDB(m_client.m_selectedDB); 1044 } 1045 1046 if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) { 1047 try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port); 1048 catch (Exception e) { 1049 throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e); 1050 } 1051 m_lockedConnection.conn.tcpNoDelay = true; 1052 m_lockedConnection.setAuth(m_client.m_authPassword); 1053 m_lockedConnection.setDB(m_client.m_selectedDB); 1054 } 1055 } 1056 1057 // Same as listen, but blocking 1058 void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds) 1059 { 1060 init(); 1061 1062 void onSubscribe(string channel) @safe { 1063 logTrace("Callback subscribe(%s)", channel); 1064 m_subscriptions[channel] = true; 1065 if (m_waiter != Task()) 1066 () @trusted { m_waiter.send(Action.SUBSCRIBE); } (); 1067 } 1068 1069 void onUnsubscribe(string channel) @safe { 1070 logTrace("Callback unsubscribe(%s)", channel); 1071 m_subscriptions.remove(channel); 1072 if (m_waiter != Task()) 1073 () @trusted { m_waiter.send(Action.UNSUBSCRIBE); } (); 1074 } 1075 1076 void teardown() @safe { // teardown 1077 logTrace("Redis listener exiting"); 1078 // More publish commands may be sent to this connection after recycling it, so we 1079 // actively destroy it 1080 m_lockedConnection.conn.close(); 1081 m_lockedConnection.destroy(); 1082 m_listening = false; 1083 } 1084 // http://redis.io/topics/pubsub 1085 /** 1086 SUBSCRIBE first second 1087 *3 1088 $9 1089 subscribe 1090 $5 1091 first 1092 :1 1093 *3 1094 $9 1095 subscribe 1096 $6 1097 second 1098 :2 1099 */ 1100 // This is a simple parser/handler for subscribe/unsubscribe/publish 1101 // commands sent by redis. The PubSub client protocol is simple enough 1102 1103 void pubsub_handler() { 1104 TCPConnection conn = m_lockedConnection.conn; 1105 logTrace("Pubsub handler"); 1106 void dropCRLF() @safe { 1107 ubyte[2] crlf; 1108 conn.read(crlf); 1109 } 1110 size_t readArgs() @safe { 1111 char[8] ucnt; 1112 ubyte[1] num; 1113 size_t i; 1114 do { 1115 conn.read(num); 1116 if (num[0] >= 48 && num[0] <= 57) 1117 ucnt[i] = num[0]; 1118 else break; 1119 i++; 1120 } 1121 while (true); // ascii 1122 ubyte[1] b; 1123 conn.read(b); 1124 logTrace("Found %s", ucnt); 1125 // the new line is consumed when num is not in range. 1126 return ucnt[0 .. i].to!size_t; 1127 } 1128 // find the number of arguments in the array 1129 ubyte[1] symbol; 1130 conn.read(symbol); 1131 enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'"); 1132 size_t args = readArgs(); 1133 // get the number of characters in the first string (the command) 1134 conn.read(symbol); 1135 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1136 size_t cnt = readArgs(); 1137 ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } (); 1138 scope(exit) () @trusted { theAllocator.dispose(cmd); } (); 1139 conn.read(cmd); 1140 dropCRLF(); 1141 // find the channel 1142 conn.read(symbol); 1143 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1144 cnt = readArgs(); 1145 ubyte[] str = new ubyte[cnt]; 1146 conn.read(str); 1147 dropCRLF(); 1148 string channel = () @trusted { return cast(string)str; } (); 1149 logTrace("chan: %s", channel); 1150 1151 if (cmd == "message") { // find the message 1152 conn.read(symbol); 1153 enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'"); 1154 cnt = readArgs(); 1155 str = new ubyte[cnt]; 1156 conn.read(str); // channel 1157 string message = () @trusted { return cast(string)str.idup; } (); 1158 logTrace("msg: %s", message); 1159 dropCRLF(); 1160 onMessage(channel, message); 1161 } 1162 else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions 1163 bool is_subscribe = (cmd == "subscribe"); 1164 conn.read(symbol); 1165 enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'"); 1166 cnt = readArgs(); // number of subscriptions 1167 logTrace("subscriptions: %d", cnt); 1168 if (is_subscribe) 1169 onSubscribe(channel); 1170 else 1171 onUnsubscribe(channel); 1172 1173 // todo: enforce the number of subscriptions? 1174 } 1175 else assert(false, "Unrecognized pubsub wire protocol command received"); 1176 } 1177 1178 // Waits for data and advises the handler 1179 m_listenerHelper = runTask(() nothrow { 1180 loop: while(true) { 1181 if (m_stop || !m_lockedConnection.conn) break; 1182 1183 try { 1184 const waitResult = m_lockedConnection.conn.waitForDataEx(100.msecs); 1185 if (m_stop) break; 1186 1187 final switch (waitResult) { 1188 case WaitForDataStatus.noMoreData: 1189 break loop; 1190 case WaitForDataStatus.timeout: 1191 logTrace("No data arrival in 100 ms..."); 1192 continue loop; 1193 case WaitForDataStatus.dataAvailable: 1194 } 1195 1196 // Data has arrived, this task is in charge of notifying the main handler loop 1197 logTrace("Notify data arrival"); 1198 1199 () @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue 1200 () @trusted { m_listener.send(Action.DATA); } (); 1201 if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ()) 1202 assert(false); 1203 } catch (Exception e) { 1204 logException(e, "Redis listen task failed - stopping to listen"); 1205 break; 1206 } 1207 } 1208 1209 logTrace("Listener Helper exit."); 1210 try () @trusted { m_listener.send(Action.STOP); } (); 1211 catch (Exception e) assert(false, e.msg); 1212 } ); 1213 1214 m_listening = true; 1215 logTrace("Redis listener now listening"); 1216 if (m_waiter != Task()) 1217 () @trusted { m_waiter.send(Action.STARTED); } (); 1218 1219 if (timeout == 0.seconds) 1220 timeout = 365.days; // make sure 0.seconds is considered as big. 1221 1222 scope(exit) { 1223 logTrace("Redis Listener exit."); 1224 if (!m_stop) { 1225 stop(); // notifies the listenerHelper 1226 } 1227 m_listenerHelper.join(); 1228 // close the data connections 1229 teardown(); 1230 1231 if (m_waiter != Task()) 1232 () @trusted { m_waiter.send(Action.STOP); } (); 1233 if (m_stopWaiter != Task()) 1234 () @trusted { m_stopWaiter.send(Action.STOP); } (); 1235 1236 m_listenerHelper = Task(); 1237 m_listener = Task(); 1238 m_stop = false; 1239 } 1240 1241 // Start waiting for data notifications to arrive 1242 while(true) { 1243 1244 auto handler = (Action act) { 1245 if (act == Action.STOP) m_stop = true; 1246 if (m_stop) return; 1247 logTrace("Calling PubSub Handler"); 1248 m_connMutex.performLocked!({ 1249 pubsub_handler(); // handles one command at a time 1250 }); 1251 () @trusted { m_listenerHelper.send(Action.DATA); } (); 1252 }; 1253 1254 if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) { 1255 logTrace("Redis Listener stopped"); 1256 break; 1257 } 1258 1259 } 1260 1261 } 1262 1263 /// Waits for messages and calls the callback with the channel and the message as arguments. 1264 /// The timeout is passed over to the listener, which closes after the period of inactivity. 1265 /// Use 0.seconds timeout to specify a very long time (365 days) 1266 /// Errors will be sent to Callback Delegate on channel "Error". 1267 Task listen(void delegate(string, string) @safe nothrow callback, Duration timeout = 0.seconds) 1268 { 1269 logTrace("Listen"); 1270 void impl() @safe { 1271 logTrace("Listen"); 1272 m_waiter = Task.getThis(); 1273 scope(exit) m_waiter = Task(); 1274 Throwable ex; 1275 m_listener = runTask(() nothrow { 1276 try blisten(callback, timeout); 1277 catch(Exception e) { 1278 ex = e; 1279 if (m_waiter != Task() && !m_listening) { 1280 try () @trusted { m_waiter.send(Action.STARTED); } (); 1281 catch (Exception e) assert(false, e.msg); 1282 return; 1283 } 1284 callback("Error", e.msg); 1285 } 1286 }); 1287 m_mutex.performLocked!({ 1288 import std.datetime : usecs; 1289 () @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } (); 1290 if (ex) throw ex; 1291 enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired"); 1292 }); 1293 1294 foreach (channel; m_pendingSubscriptions) { 1295 subscribe(channel); 1296 } 1297 1298 m_pendingSubscriptions = null; 1299 } 1300 inTask(&impl); 1301 return m_listener; 1302 } 1303 } 1304 1305 1306 1307 /** Range interface to a single Redis reply. 1308 */ 1309 struct RedisReply(T = ubyte[]) { 1310 static assert(isInputRange!RedisReply); 1311 1312 private { 1313 uint m_magic = 0x15f67ab3; 1314 RedisConnection m_conn; 1315 LockedConnection!RedisConnection m_lockedConnection; 1316 } 1317 1318 alias ElementType = T; 1319 1320 private this(RedisConnection conn) 1321 { 1322 m_conn = conn; 1323 auto ctx = &conn.m_replyContext; 1324 assert(ctx.refCount == 0); 1325 *ctx = RedisReplyContext.init; 1326 ctx.refCount++; 1327 } 1328 1329 this(this) 1330 { 1331 assert(m_magic == 0x15f67ab3); 1332 if (m_conn) { 1333 auto ctx = &m_conn.m_replyContext; 1334 assert(ctx.refCount > 0); 1335 ctx.refCount++; 1336 } 1337 } 1338 1339 ~this() 1340 { 1341 assert(m_magic == 0x15f67ab3); 1342 if (m_conn) { 1343 if (!--m_conn.m_replyContext.refCount) 1344 drop(); 1345 } 1346 } 1347 1348 @property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; } 1349 1350 /** Returns the current element of the reply. 1351 1352 Note that byte and character arrays may be returned as slices to a 1353 temporary buffer. This buffer will be invalidated on the next call to 1354 $(D popFront), so it needs to be duplicated for permanent storage. 1355 */ 1356 @property T front() 1357 { 1358 assert(!empty, "Accessing the front of an empty RedisReply!"); 1359 auto ctx = &m_conn.m_replyContext; 1360 if (!ctx.hasData) readData(); 1361 1362 ubyte[] ret = ctx.data; 1363 1364 return convertToType!T(ret); 1365 } 1366 1367 @property bool frontIsNull() 1368 const { 1369 assert(!empty, "Accessing the front of an empty RedisReply!"); 1370 return m_conn.m_replyContext.frontIsNull; 1371 } 1372 1373 /** Pops the current element of the reply 1374 */ 1375 void popFront() 1376 { 1377 assert(!empty, "Popping the front of an empty RedisReply!"); 1378 1379 auto ctx = &m_conn.m_replyContext; 1380 1381 if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire 1382 clearData(); 1383 ctx.index++; 1384 1385 if (ctx.index >= ctx.length && ctx.refCount == 1) { 1386 ctx.refCount = 0; 1387 m_conn = null; 1388 m_lockedConnection.destroy(); 1389 } 1390 } 1391 1392 1393 /// Legacy property for hasNext/next based iteration 1394 @property bool hasNext() const { return !empty; } 1395 1396 /// Legacy property for hasNext/next based iteration 1397 TN next(TN : E[], E)() 1398 { 1399 assert(hasNext, "end of reply"); 1400 1401 auto ret = front.dup; 1402 popFront(); 1403 return () @trusted { return cast(TN)ret; } (); 1404 } 1405 1406 void drop() 1407 { 1408 if (!m_conn) return; 1409 while (!empty) popFront(); 1410 } 1411 1412 private void readData() 1413 { 1414 auto ctx = &m_conn.m_replyContext; 1415 assert(!ctx.hasData && ctx.initialized); 1416 1417 if (ctx.multi) 1418 readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ()); 1419 } 1420 1421 private void clearData() 1422 { 1423 auto ctx = &m_conn.m_replyContext; 1424 ctx.data = null; 1425 ctx.hasData = false; 1426 } 1427 1428 private @property void lockedConnection(ref LockedConnection!RedisConnection conn) 1429 { 1430 assert(m_conn !is null); 1431 m_lockedConnection = conn; 1432 } 1433 1434 private void initialize() 1435 { 1436 assert(m_conn !is null); 1437 auto ctx = &m_conn.m_replyContext; 1438 assert(!ctx.initialized); 1439 ctx.initialized = true; 1440 1441 ubyte[] ln = m_conn.conn.readLine(); 1442 1443 switch (ln[0]) { 1444 default: 1445 m_conn.conn.close(); 1446 throw new Exception(format("Unknown reply type: %s", cast(char)ln[0])); 1447 case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1448 case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ()); 1449 case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break; 1450 case '$': 1451 readBulk(() @trusted { return cast(string)ln; } ()); 1452 break; 1453 case '*': 1454 if (ln.startsWith(cast(const(ubyte)[])"*-1")) { 1455 ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array 1456 } else { 1457 ctx.multi = true; 1458 scope (failure) m_conn.conn.close(); 1459 ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ()); 1460 } 1461 break; 1462 } 1463 } 1464 1465 private void readBulk(string sizeLn) 1466 { 1467 assert(m_conn !is null); 1468 auto ctx = &m_conn.m_replyContext; 1469 if (sizeLn.startsWith("$-1")) { 1470 ctx.frontIsNull = true; 1471 ctx.hasData = true; 1472 ctx.data = null; 1473 } else { 1474 auto size = to!size_t(sizeLn[1 .. $]); 1475 auto data = new ubyte[size]; 1476 m_conn.conn.read(data); 1477 m_conn.conn.readLine(); 1478 ctx.frontIsNull = false; 1479 ctx.hasData = true; 1480 ctx.data = data; 1481 } 1482 } 1483 } 1484 1485 class RedisProtocolException : Exception { 1486 this(string message, string file = __FILE__, size_t line = __LINE__, Exception next = null) 1487 { 1488 super(message, file, line, next); 1489 } 1490 } 1491 1492 template isValidRedisValueReturn(T) 1493 { 1494 import std.typecons; 1495 static if (isInstanceOf!(Nullable, T)) { 1496 enum isValidRedisValueReturn = isValidRedisValueType!(typeof(T.init.get())); 1497 } else static if (isInstanceOf!(RedisReply, T)) { 1498 enum isValidRedisValueReturn = isValidRedisValueType!(T.ElementType); 1499 } else enum isValidRedisValueReturn = isValidRedisValueType!T; 1500 } 1501 1502 template isValidRedisValueType(T) 1503 { 1504 enum isValidRedisValueType = is(T : const(char)[]) || is(T : const(ubyte)[]) || is(T == long) || is(T == double) || is(T == bool); 1505 } 1506 1507 private RedisReply!T getReply(T = ubyte)(RedisConnection conn) 1508 { 1509 auto repl = RedisReply!T(conn); 1510 repl.initialize(); 1511 return repl; 1512 } 1513 1514 private struct RedisReplyContext { 1515 long refCount = 0; 1516 ubyte[] data; 1517 bool hasData; 1518 bool multi = false; 1519 bool initialized = false; 1520 bool frontIsNull = false; 1521 long length = 1; 1522 long index = 0; 1523 ubyte[128] dataBuffer; 1524 } 1525 1526 private final class RedisConnection { 1527 private { 1528 string m_host; 1529 ushort m_port; 1530 TCPConnection m_conn; 1531 string m_password; 1532 long m_selectedDB; 1533 RedisReplyContext m_replyContext; 1534 } 1535 1536 this(string host, ushort port) 1537 { 1538 m_host = host; 1539 m_port = port; 1540 } 1541 1542 @property TCPConnection conn() nothrow { return m_conn; } 1543 @property void conn(TCPConnection conn) nothrow { m_conn = conn; } 1544 1545 void setAuth(string password) 1546 { 1547 if (m_password == password) return; 1548 _request_reply(this, "AUTH", password); 1549 m_password = password; 1550 } 1551 1552 void setDB(long index) 1553 { 1554 if (index == m_selectedDB) return; 1555 _request_reply(this, "SELECT", index); 1556 m_selectedDB = index; 1557 } 1558 1559 private static long countArgs(ARGS...)(scope ARGS args) 1560 { 1561 long ret = 0; 1562 foreach (i, A; ARGS) { 1563 static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) { 1564 foreach (arg; args[i]) 1565 ret += countArgs(arg); 1566 } else ret++; 1567 } 1568 return ret; 1569 } 1570 1571 unittest { 1572 assert(countArgs() == 0); 1573 assert(countArgs(1, 2, 3) == 3); 1574 assert(countArgs("1", ["2", "3", "4"]) == 4); 1575 assert(countArgs([["1", "2"], ["3"]]) == 3); 1576 } 1577 1578 private static void writeArgs(R, ARGS...)(R dst, scope ARGS args) 1579 if (isOutputRange!(R, char)) 1580 { 1581 foreach (i, A; ARGS) { 1582 static if (is(A == bool)) { 1583 writeArgs(dst, args[i] ? "1" : "0"); 1584 } else static if (is(A : long) || is(A : real) || is(A == string)) { 1585 auto alen = formattedLength(args[i]); 1586 enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n"; 1587 dst.formattedWrite(fmt, alen, args[i]); 1588 } else static if (is(A : const(ubyte[])) || is(A : const(char[]))) { 1589 dst.formattedWrite("$%s\r\n", args[i].length); 1590 dst.put(args[i]); 1591 dst.put("\r\n"); 1592 } else static if (isArray!A) { 1593 foreach (arg; args[i]) 1594 writeArgs(dst, arg); 1595 } else static assert(false, "Unsupported Redis argument type: " ~ A.stringof); 1596 } 1597 } 1598 1599 unittest { 1600 import std.array : appender; 1601 auto dst = appender!string; 1602 writeArgs(dst, false, true, ["2", "3"], "4", 5.0); 1603 assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n"); 1604 } 1605 1606 private static long formattedLength(ARG)(scope ARG arg) 1607 { 1608 static if (is(ARG == string)) return arg.length; 1609 else { 1610 import vibe.internal.rangeutil; 1611 long length; 1612 auto rangeCnt = RangeCounter(() @trusted { return &length; } ()); 1613 rangeCnt.formattedWrite(typeFormatString!ARG, arg); 1614 return length; 1615 } 1616 } 1617 } 1618 1619 private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args) 1620 { 1621 import vibe.stream.wrapper; 1622 1623 if (!conn.conn || !conn.conn.connected) { 1624 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1625 catch (Exception e) { 1626 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1627 } 1628 conn.conn.tcpNoDelay = true; 1629 } 1630 1631 auto nargs = conn.countArgs(args); 1632 auto rng = streamOutputRange(conn.conn); 1633 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1634 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1635 } 1636 1637 private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args) 1638 { 1639 import vibe.stream.wrapper; 1640 1641 if (!conn.conn || !conn.conn.connected) { 1642 try conn.conn = connectTCP(conn.m_host, conn.m_port); 1643 catch (Exception e) { 1644 throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e); 1645 } 1646 conn.conn.tcpNoDelay = true; 1647 } 1648 1649 auto nargs = conn.countArgs(args); 1650 auto rng = streamOutputRange(conn.conn); 1651 formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command); 1652 RedisConnection.writeArgs(() @trusted { return &rng; } (), args); 1653 rng.flush(); 1654 1655 return conn.getReply!T; 1656 } 1657 1658 private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args) 1659 { 1660 import std.typecons; 1661 static if (isInstanceOf!(RedisReply, T)) { 1662 auto reply = _request_reply!(T.ElementType)(conn, command, args); 1663 reply.lockedConnection = conn; 1664 return reply; 1665 } else static if (is(T == void)) { 1666 _request_reply(conn, command, args); 1667 } else static if (isInstanceOf!(Nullable, T)) { 1668 alias TB = typeof(T.init.get()); 1669 auto reply = _request_reply!TB(conn, command, args); 1670 T ret; 1671 if (!reply.frontIsNull) ret = reply.front; 1672 return ret; 1673 } else { 1674 auto reply = _request_reply!T(conn, command, args); 1675 return reply.front; 1676 } 1677 } 1678 1679 private T convertToType(T)(ubyte[] data) /// NOTE: data must be unique! 1680 { 1681 static if (isSomeString!T) () @trusted { validate(cast(T)data); } (); 1682 1683 static if (is(T == ubyte[])) return data; 1684 else static if (is(T == string)) return cast(T)data.idup; 1685 else static if (is(T == bool)) return data[0] == '1'; 1686 else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) { 1687 auto str = () @trusted { return cast(string)data; } (); 1688 return parse!T(str); 1689 } 1690 else static assert(false, "Unsupported Redis reply type: " ~ T.stringof); 1691 } 1692 1693 private template typeFormatString(T) 1694 { 1695 static if (isFloatingPoint!T) enum typeFormatString = "%.16g"; 1696 else enum typeFormatString = "%s"; 1697 }