/**
    Redis database client implementation.

    Copyright: © 2012-2016 Sönke Ludwig
    License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
    Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon
*/
module vibe.db.redis.redis;

public import vibe.core.net;

import vibe.core.connectionpool;
import vibe.core.core;
import vibe.core.log;
import vibe.inet.url;
import vibe.internal.allocator;
import vibe.internal.freelistref;
import vibe.stream.operations;
import std.conv;
import std.exception;
import std.format;
import std.range : isInputRange, isOutputRange;
import std.string;
import std.traits;
import std.typecons : Nullable;
import std.utf;

@safe:


/**
    Returns a RedisClient that can be used to communicate to the specified database server.

    Params:
        host = Host name or address of the Redis server
        port = TCP port of the Redis server, defaults to `RedisClient.defaultPort`
*/
RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort)
{
    return new RedisClient(host, port);
}

/**
    Returns a Redis database connection instance corresponding to the given URL.

    The URL must be of the format "redis://server[:port]/dbnum". If no port
    is given, `RedisClient.defaultPort` is used; if no database number is
    given, database 0 is selected.

    Authentication:
        The password for the Redis AUTH command is taken from the password
        portion of the URL's "userinfo" field (`url.password`). If that is
        empty, the value of a "password" query parameter is used instead.
        When both are present, the "userinfo" password takes precedence.

    Examples:
    ---
    // connecting with default settings:
    auto redisDB = connectRedisDB("redis://127.0.0.1");
    ---

    ---
    // connecting using the URL form with a password and database index 3
    auto redisDB = connectRedisDB("redis://myremotehost/3?password=secret");
    ---

    Params:
        url = Redis URI scheme for a Redis database instance

    Returns:
        A new RedisDatabase instance that can be used to access the database.

    Throws:
        `Exception` if the query string contains a parameter other than
        "password".

    See_also: $(LINK2 https://www.iana.org/assignments/uri-schemes/prov/redis, Redis URI scheme)
*/
RedisDatabase connectRedisDB(URL url)
{
    auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort);

    if (!url.queryString.empty)
    {
        import vibe.inet.webform : FormFields, parseURLEncodedForm;
        auto query = FormFields.init;
        parseURLEncodedForm(url.queryString, query);
        foreach (param, val; query.byKeyValue)
        {
            switch (param)
            {
                /*
                    The password to use for the Redis AUTH command comes from either the
                    password portion of the "userinfo" URI field or the value from the
                    key-value pair from the "query" URI field with the key "password".
                    If both are present, the "userinfo" password wins (it is applied
                    below), so the query value is only used when "userinfo" carries
                    no password.
                */
                case "password":
                    if (url.password.empty)
                        cli.auth(val);
                    break;
                default:
                    throw new Exception(`Redis config parameter "` ~ param ~ `" isn't supported`);
            }
        }
    }

    /*
        Redis' current optional authentication mechanism does not employ a
        username, but this might change in the future
    */
    if (!url.password.empty)
        cli.auth(url.password);

    // Only the path carries the database index, so the guard has to look at
    // the same string that is sliced below.
    long databaseIndex;
    if (url.pathString.length >= 2)
        databaseIndex = url.pathString[1 .. $].to!long;

    return cli.getDatabase(databaseIndex);
}

/**
    A redis client with connection pooling.
*/
final class RedisClient {
    private {
        ConnectionPool!RedisConnection m_connections;
        string m_authPassword;
        string m_version;
        long m_selectedDB;
    }

    /// Port used when none is specified explicitly.
    enum defaultPort = 6379;

    /** Creates a client whose pooled connections are opened to `host`:`port`.
    */
    this(string host = "127.0.0.1", ushort port = defaultPort)
    {
        m_connections = new ConnectionPool!RedisConnection({
            return new RedisConnection(host, port);
        });
    }

    /** Release all connections that are not in use. Call before exiting the
     *  program to avoid relying on the GC to clean up the sockets.
     */
    void releaseUnusedConnections() @safe
    {
        // the try/catch in this is for the old vibe.d:core library, not
        // necessary in vibe-core, but we have to work with both.
        m_connections.removeUnused((conn) @safe nothrow {
            try {
                conn.m_conn.close();
            } catch(Exception e) {
                logDebug("Failed to close unused Redis connection: %s", e.msg);
            }
        });
    }

    /** Returns the Redis server version.

        The value is taken from the "redis_version" line of the INFO reply and
        cached after the first successful lookup. An empty string is returned
        if no such line was found.
    */
    @property string redisVersion()
    {
        if(m_version == "")
        {
            import std.string;
            auto info = info();
            auto lines = info.splitLines();
            if (lines.length > 1) {
                foreach (string line; lines) {
                    auto lineParams = line.split(":");
                    if (lineParams.length > 1 && lineParams[0] == "redis_version") {
                        m_version = lineParams[1];
                        break;
                    }
                }
            }
        }

        return m_version;
    }

    /** Returns a handle to the given database.
    */
    RedisDatabase getDatabase(long index) { return RedisDatabase(this, index); }

    /** Creates a RedisSubscriber instance for launching a pubsub listener
    */
    RedisSubscriber createSubscriber() {
        return RedisSubscriber(this);
    }

    /*
        Connection
    */

    /// Sets the password that is handed to the pooled connection on every request
    void auth(string password) { m_authPassword = password; }
    /// Echo the given string
    T echo(T, U)(U data) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("ECHO", data); }
    /// Ping the server
    void ping() { request("PING"); }
    /// Close the connection
    void quit() { request("QUIT"); }

    /*
        Server
    */

    //TODO: BGREWRITEAOF
    //TODO: BGSAVE

    /// Get the value of a configuration parameter
    T getConfig(T)(string parameter) if(isValidRedisValueReturn!T) { return request!T("CONFIG", "GET", parameter); }
    /// Set a configuration parameter to the given value
    void setConfig(T)(string parameter, T value) if(isValidRedisValueType!T) { request("CONFIG", "SET", parameter, value); }

/// Resets the statistics reported by the INFO command.
    void configResetStat()
    {
        request("CONFIG", "RESETSTAT");
    }

    //TODO: Debug Object
    //TODO: Debug Segfault

    /** Removes every key from every database of the server.

        See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL)
    */
    void deleteAll()
    {
        request("FLUSHALL");
    }

    /// Queries server information and statistics (INFO).
    string info()
    {
        return request!string("INFO");
    }

    /// Queries the UNIX time stamp of the last successful save to disk (LASTSAVE).
    long lastSave()
    {
        return request!long("LASTSAVE");
    }

    //TODO monitor

    /// Saves the dataset to disk synchronously (SAVE).
    void save()
    {
        request("SAVE");
    }

    /// Saves the dataset to disk synchronously and shuts the server down (SHUTDOWN).
    void shutdown()
    {
        request("SHUTDOWN");
    }

    /// Makes the server a slave of the given instance (SLAVEOF).
    void slaveOf(string host, ushort port)
    {
        request("SLAVEOF", host, port);
    }

    //TODO slowlog
    //TODO sync

    // Issues a command against the database stored in m_selectedDB.
    private T request(T = void, ARGS...)(string command, scope ARGS args)
    {
        return requestDB!(T, ARGS)(m_selectedDB, command, args);
    }

    // Locks a pooled connection, applies the stored password and the
    // requested database index to it, then sends the command. With
    // version RedisDebug, the request and its result are logged.
    private T requestDB(T, ARGS...)(long db, string command, scope ARGS args)
    {
        auto connection = m_connections.lockConnection();
        connection.setAuth(m_authPassword);
        connection.setDB(db);

        version (RedisDebug) {
            import std.conv;
            string debugLine = command;
            foreach (idx, Arg; ARGS) debugLine ~= ", " ~ args[idx].to!string;
        }

        static if (is(T == void)) {
            version (RedisDebug) logDebug("Redis request: %s => void", debugLine);
            _request!void(connection, command, args);
        } else {
            auto result = _request!T(connection, command, args);
            version (RedisDebug) {
                // RedisReply results are not stringified for the log
                static if (isInstanceOf!(RedisReply, T))
                    logDebug("Redis request: %s => RedisReply", debugLine);
                else
                    logDebug("Redis request: %s => %s", debugLine, result.to!string);
            }
            return result;
        }
    }
}


/**
    Gives access to the contents of a single Redis database.
*/
struct RedisDatabase {
    private {
        RedisClient m_client;
        long m_index;
    }

    private this(RedisClient client, long index)
    {
        m_index = index;
        m_client = client;
    }

    /** The Redis client through which this database is accessed.
    */
    @property inout(RedisClient) client() inout
    {
        return m_client;
    }

    /** The index of this database.
    */
    @property long index() const
    {
        return m_index;
    }

    /** Removes every key of this database.

        See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB)
    */
    void deleteAll()
    {
        request!void("FLUSHDB");
    }

    /// Deletes the given keys (DEL).
    long del(scope string[] keys...)
    {
        return request!long("DEL", keys);
    }

    /// Checks whether a key exists (EXISTS).
    bool exists(string key)
    {
        return request!bool("EXISTS", key);
    }

    /// Sets the time to live of a key in seconds (EXPIRE).
    bool expire(string key, long seconds)
    {
        return request!bool("EXPIRE", key, seconds);
    }

    /// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds.
bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); } // sent as PEXPIRE with the duration in milliseconds
    /// Set the expiration for a key as a UNIX timestamp
    bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); }
    /// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC])
    RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); }
    /// Move a key to another database
    bool move(string key, long db) { return request!bool("MOVE", key, db); }
    /// Remove the expiration from a key
    bool persist(string key) { return request!bool("PERSIST", key); }
    //TODO: object
    /// Return a random key from the keyspace
    string randomKey() { return request!string("RANDOMKEY"); }
    /// Rename a key
    void rename(string key, string newkey) { request("RENAME", key, newkey); }
    /// Rename a key, only if the new key does not exist
    bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); }
    //TODO sort
    /// Get the time to live for a key
    long ttl(string key) { return request!long("TTL", key); }
    /// Get the time to live for a key in milliseconds
    long pttl(string key) { return request!long("PTTL", key); }
    /// Determine the type stored at key (string, list, set, zset and hash.)
    string type(string key) { return request!string("TYPE", key); }

    /*
        String Commands
    */

    /// Append a value to a key
    long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); }
    /// Decrement the integer value of a key by `value` (DECR when `value` is 1, DECRBY otherwise)
    long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); }
    /// Get the value of a key
    T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); }
    /// Returns the bit value at offset in the string value stored at key
    bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); }
    /// Get a substring of the string stored at a key
    T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); }
    /// Set the string value of a key and return its old value
    T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); }
    /// Increment the integer value of a key by `value` (INCR when `value` is 1, INCRBY otherwise)
    long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); }
    /// Increment the real number value of a key
    // NOTE(review): INCRBYFLOAT replies with a floating point number, yet the
    // reply is requested as `long`. Kept unchanged for interface
    // compatibility; confirm whether `double` was intended.
    long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); }
    /// Get the values of all the given keys
    RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); }

    /** Set multiple keys to multiple values.

        Params:
            args = Alternating key/value arguments; every key must be a `string`
    */
    void mset(ARGS...)(ARGS args)
    {
        static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
        foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
        request("MSET", args);
    }

    /** Set multiple keys to multiple values, only if none of the keys exist.

        Params:
            args = Alternating key/value arguments; every key must be a `string`

        See_also: $(LINK2 http://redis.io/commands/msetnx, MSETNX)
    */
    bool msetNX(ARGS...)(ARGS args) {
        static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to msetNX must be pairs of key/value");
        foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
        // The "set only if none exist" command is MSETNX; the previously
        // sent "MSETEX" is a different command name.
        return request!bool("MSETNX", args);
    }

    /// Set the string value of a key
void set(T)(string key, T value)
        if (isValidRedisValueType!T)
    {
        request("SET", key, value);
    }

    /// Stores `value` at `key` unless the key already exists (SETNX).
    bool setNX(T)(string key, T value)
        if (isValidRedisValueType!T)
    {
        return request!bool("SETNX", key, value);
    }

    /// Stores `value` at `key` only when the key already exists (SET ... XX).
    bool setXX(T)(string key, T value)
        if (isValidRedisValueType!T)
    {
        auto status = request!string("SET", key, value, "XX");
        return status == "OK";
    }

    /// Stores `value` at `key` unless the key already exists and sets its time to live, given in D notation, e.g. $(D 5.minutes).
    bool setNX(T)(string key, T value, Duration expire_time)
        if (isValidRedisValueType!T)
    {
        auto status = request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX");
        return status == "OK";
    }

    /// Stores `value` at `key` only when the key already exists and sets its time to live, given in D notation, e.g. $(D 5.minutes).
    bool setXX(T)(string key, T value, Duration expire_time)
        if (isValidRedisValueType!T)
    {
        auto status = request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX");
        return status == "OK";
    }

    /// Sets (`value == true`) or clears the bit at `offset` of the string stored at `key` (SETBIT).
    bool setBit(string key, long offset, bool value)
    {
        string bit = value ? "1" : "0";
        return request!bool("SETBIT", key, offset, bit);
    }

    /// Stores `value` at `key` together with a time to live in seconds (SETEX).
    void setEX(T)(string key, long seconds, T value)
        if (isValidRedisValueType!T)
    {
        request("SETEX", key, seconds, value);
    }

    /// Overwrites part of the string at `key`, starting at `offset` (SETRANGE).
    long setRange(T)(string key, long offset, T value)
        if (isValidRedisValueType!T)
    {
        return request!long("SETRANGE", key, offset, value);
    }

    /// Queries the length of the value stored at `key` (STRLEN).
    long strlen(string key)
    {
        return request!long("STRLEN", key);
    }

    /*
        Hashes
    */

    /// Deletes one or more fields of a hash (HDEL).
    long hdel(string key, scope string[] fields...)
    {
        return request!long("HDEL", key, fields);
    }

    /// Checks whether a hash field exists (HEXISTS).
    bool hexists(string key, string field)
    {
        return request!bool("HEXISTS", key, field);
    }

    /// Sets the value of a single hash field (HSET).
    void hset(T)(string key, string field, T value)
        if (isValidRedisValueType!T)
    {
        request("HSET", key, field, value);
    }

    /// Sets the value of a hash field unless the field already exists (HSETNX).
    bool hsetNX(T)(string key, string field, T value)
        if (isValidRedisValueType!T)
    {
        return request!bool("HSETNX", key, field, value);
    }

    /// Reads the value of a hash field (HGET).
    T hget(T = string)(string key, string field)
        if (isValidRedisValueReturn!T)
    {
        return request!T("HGET", key, field);
    }

    /// Reads all fields and values of a hash (HGETALL).
    RedisReply!T hgetAll(T = string)(string key)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("HGETALL", key);
    }

    /// Increments the integer value of a hash field (HINCRBY).
    long hincr(string key, string field, long value=1)
    {
        return request!long("HINCRBY", key, field, value);
    }

    /// Increments the real number value of a hash field (HINCRBYFLOAT).
    // NOTE(review): the reply is requested as `long` although the increment
    // is a floating point value; confirm whether `double` was intended.
    long hincr(string key, string field, double value)
    {
        return request!long("HINCRBYFLOAT", key, field, value);
    }

    /// Lists all field names of a hash (HKEYS).
    RedisReply!T hkeys(T = string)(string key)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("HKEYS", key);
    }

    /// Queries the number of fields in a hash (HLEN).
    long hlen(string key)
    {
        return request!long("HLEN", key);
    }

    /// Reads the values of all the given hash fields (HMGET).
    RedisReply!T hmget(T = string)(string key, scope string[] fields...)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("HMGET", key, fields);
    }

    /// Sets multiple hash fields to multiple values (HMSET).
    void hmset(ARGS...)(string key, ARGS args)
    {
        request("HMSET", key, args);
    }

    /// Lists all values of a hash (HVALS).
    RedisReply!T hvals(T = string)(string key)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("HVALS", key);
    }

    /*
        Lists
    */

    /// Reads the list element at `index` (LINDEX).
    T lindex(T = string)(string key, long index)
        if (isValidRedisValueReturn!T)
    {
        return request!T("LINDEX", key, index);
    }

    /// Inserts `value` into the list at `key` before the reference value `pivot` (LINSERT ... BEFORE).
    long linsertBefore(T1, T2)(string key, T1 pivot, T2 value)
        if (isValidRedisValueType!T1 && isValidRedisValueType!T2)
    {
        return request!long("LINSERT", key, "BEFORE", pivot, value);
    }

    /// Inserts `value` into the list at `key` after the reference value `pivot` (LINSERT ... AFTER).
    long linsertAfter(T1, T2)(string key, T1 pivot, T2 value)
        if (isValidRedisValueType!T1 && isValidRedisValueType!T2)
    {
        return request!long("LINSERT", key, "AFTER", pivot, value);
    }

    /// Queries the length of the list stored at `key` (LLEN).
    long llen(string key)
    {
        return request!long("LLEN", key);
    }

    /// Inserts all the given values at the head of the list stored at `key` (LPUSH).
    long lpush(ARGS...)(string key, ARGS args)
    {
        return request!long("LPUSH", key, args);
    }

    /// Inserts `value` at the head of the list stored at `key`, only if the key already exists and holds a list (LPUSHX).
    long lpushX(T)(string key, T value)
        if (isValidRedisValueType!T)
    {
        return request!long("LPUSHX", key, value);
    }

    /// Inserts all the given values at the tail of the list stored at `key` (RPUSH).
    long rpush(ARGS...)(string key, ARGS args)
    {
        return request!long("RPUSH", key, args);
    }

    /// Inserts value at the tail of the list stored at key, only if key already exists and holds a list.
long rpushX(T)(string key, T value)
        if (isValidRedisValueType!T)
    {
        return request!long("RPUSHX", key, value);
    }

    /// Reads the elements `start` through `stop` of the list stored at `key` (LRANGE).
    RedisReply!T lrange(T = string)(string key, long start, long stop)
    {
        return request!(RedisReply!T)("LRANGE", key, start, stop);
    }

    /// Removes the first `count` occurrences of elements equal to `value` from the list stored at `key` (LREM).
    long lrem(T)(string key, long count, T value)
        if (isValidRedisValueType!T)
    {
        return request!long("LREM", key, count, value);
    }

    /// Replaces the list element at `index` with `value` (LSET).
    void lset(T)(string key, long index, T value)
        if (isValidRedisValueType!T)
    {
        request("LSET", key, index, value);
    }

    /// Trims an existing list so that it only contains the specified range of elements (LTRIM).
    /// Equivalent to $(D range = range[start .. stop+1])
    void ltrim(string key, long start, long stop)
    {
        request("LTRIM", key, start, stop);
    }

    /// Removes and returns the last element of the list stored at `key` (RPOP).
    T rpop(T = string)(string key)
        if (isValidRedisValueReturn!T)
    {
        return request!T("RPOP", key);
    }

    /// Removes and returns the first element of the list stored at `key` (LPOP).
    T lpop(T = string)(string key)
        if (isValidRedisValueReturn!T)
    {
        return request!T("LPOP", key);
    }

    /// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
    /// the connection when there are no elements to pop from any of the given lists.
Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds)
        if (isValidRedisValueReturn!T)
    {
        alias Popped = Nullable!(Tuple!(string, T));

        auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds);
        Popped result;

        // An empty or null reply leaves the result in its null state.
        if (!reply.empty && !reply.frontIsNull) {
            // first reply element is converted to string, the second one to T
            string sourceKey = reply.front.convertToType!string();
            reply.popFront();
            result = tuple(sourceKey, reply.front.convertToType!T());
        }

        return result;
    }

    /// Atomically removes the last element (tail) of the list stored at `key`, pushes it to
    /// the head of the list stored at `destination` and returns it (RPOPLPUSH).
    T rpoplpush(T = string)(string key, string destination)
        if (isValidRedisValueReturn!T)
    {
        return request!T("RPOPLPUSH", key, destination);
    }

    /*
        Sets
    */

    /// Adds the given members to the set stored at `key` (SADD). Members that are already part of the set are ignored.
    /// If `key` does not exist, a new set is created before adding the members.
    long sadd(ARGS...)(string key, ARGS args)
    {
        return request!long("SADD", key, args);
    }

    /// Queries the cardinality (number of elements) of the set stored at `key` (SCARD).
    long scard(string key)
    {
        return request!long("SCARD", key);
    }

    /// Computes the difference between the first set and all the successive sets (SDIFF).
    RedisReply!T sdiff(T = string)(scope string[] keys...)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("SDIFF", keys);
    }

    /// Like SDIFF, but stores the resulting set in `destination` instead of returning it (SDIFFSTORE).
    /// If `destination` already exists, it is overwritten.
    long sdiffStore(string destination, scope string[] keys...)
    {
        return request!long("SDIFFSTORE", destination, keys);
    }

    /// Returns the members of the set resulting from the intersection of all the given sets.
RedisReply!T sinter(T = string)(string[] keys)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("SINTER", keys);
    }

    /// Like SINTER, but stores the resulting set in `destination` instead of returning it (SINTERSTORE).
    /// If `destination` already exists, it is overwritten.
    long sinterStore(string destination, scope string[] keys...)
    {
        return request!long("SINTERSTORE", destination, keys);
    }

    /// Checks whether `member` is part of the set stored at `key` (SISMEMBER).
    bool sisMember(T)(string key, T member)
        if (isValidRedisValueType!T)
    {
        return request!bool("SISMEMBER", key, member);
    }

    /// Lists all members of the set stored at `key` (SMEMBERS).
    RedisReply!T smembers(T = string)(string key)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("SMEMBERS", key);
    }

    /// Moves `member` from the set at `source` to the set at `destination` (SMOVE). This operation is atomic.
    /// In every given moment the element will appear to be a member of source or destination for other clients.
    bool smove(T)(string source, string destination, T member)
        if (isValidRedisValueType!T)
    {
        return request!bool("SMOVE", source, destination, member);
    }

    /// Removes and returns a random element of the set stored at `key` (SPOP).
    T spop(T = string)(string key)
        if (isValidRedisValueReturn!T)
    {
        return request!T("SPOP", key);
    }

    /// Returns a random element of the set stored at `key` without removing it (SRANDMEMBER).
    T srandMember(T = string)(string key)
        if (isValidRedisValueReturn!T)
    {
        return request!T("SRANDMEMBER", key);
    }

    /// Returns `count` random elements of the set stored at `key` (SRANDMEMBER with count).
    RedisReply!T srandMember(T = string)(string key, long count)
        if (isValidRedisValueReturn!T)
    {
        return request!(RedisReply!T)("SRANDMEMBER", key, count);
    }


    /// Removes the given members from the set stored at `key` (SREM).
    long srem(ARGS...)(string key, ARGS args)
    {
        return request!long("SREM", key, args);
    }

    /// Returns the members of the set resulting from the union of all the given sets.
RedisReply!T sunion(T = string)(scope string[] keys...)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("SUNION", keys);
    }

    /// Like SUNION, but stores the resulting set instead of returning it (SUNIONSTORE).
    /// All of `keys` are forwarded as-is, so the first entry is the one sent in the destination position.
    long sunionStore(scope string[] keys...)
    {
        return request!long("SUNIONSTORE", keys);
    }

    /*
        Sorted Sets
    */

    /// Adds one or more members to a sorted set, or updates the score of existing members (ZADD).
    long zadd(ARGS...)(string key, ARGS args)
    {
        return request!long("ZADD", key, args);
    }

    /// Queries the cardinality (number of elements) of the sorted set stored at `key` (ZCARD).
    long zcard(string key)
    {
        return request!long("ZCARD", key);
    }

    /// Counts the elements of the sorted set at `key` with a score between `min` and `max` (ZCOUNT).
    /// `RNG` selects inclusive (`[`/`]`) or exclusive (`(`/`)`) bounds.
    long zcount(string RNG = "[]")(string key, double min, double max)
    {
        return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max));
    }

    /// Increments the score of `member` in the sorted set stored at `key` by `value` (ZINCRBY).
    double zincrby(T)(string key, double value, T member)
        if (isValidRedisValueType!T)
    {
        return request!double("ZINCRBY", key, value, member);
    }

    //TODO: zinterstore

    /// Reads the elements `start` through `end` of the sorted set stored at `key` (ZRANGE),
    /// optionally with their scores.
    RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false)
        if (isValidRedisValueType!T)
    {
        if (!with_scores)
            return request!(RedisReply!T)("ZRANGE", key, start, end);

        return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES");
    }

    /// Counts the elements of the sorted set at `key` with a value between `min` and `max` (ZLEXCOUNT).
    long zlexCount(string key, string min = "-", string max = "+")
    {
        return request!long("ZLEXCOUNT", key, min, max);
    }

    /// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering,
    /// this command returns all the elements in the sorted set at key with a value between min and max.
RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1)
        if (isValidRedisValueType!T)
    {
        // With the default offset/count no LIMIT clause is sent.
        if (offset <= 0 && count == -1)
            return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max);

        return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count);
    }

    /// Reads all elements of the sorted set at `key` with a score between `start` and `end` (ZRANGEBYSCORE).
    /// Bounds are inclusive by default; `RNG` selects inclusive (`[`/`]`) or exclusive (`(`/`)`) bounds.
    RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false)
        if (isValidRedisValueType!T)
    {
        if (!with_scores)
            return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end));

        return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES");
    }

    /// Same as the overload without `offset`/`count`, but additionally sends a LIMIT clause,
    /// selecting a window similar to $(D results[offset .. offset+count]).
    /// Both `offset` and `count` must be non-negative.
    RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false)
        if (isValidRedisValueType!T)
    {
        assert(offset >= 0);
        assert(count >= 0);

        if (!with_scores)
            return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count);

        return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count);
    }

    /// Queries the rank of `member` in the sorted set stored at `key`, with the scores ordered
    /// from low to high (ZRANK). Returns -1 if the reply is empty.
    long zrank(T)(string key, T member)
        if (isValidRedisValueType!T)
    {
        auto rank = request!string("ZRANK", key, member);
        if (rank == "")
            return -1;
        return parse!long(rank);
    }

    /// Removes the specified members from the sorted set stored at key.
long zrem(ARGS...)(string key, ARGS members)
    {
        return request!long("ZREM", key, members);
    }

    /// Removes all elements of the sorted set at `key` with a rank between `start` and `stop` (ZREMRANGEBYRANK).
    long zremRangeByRank(string key, long start, long stop)
    {
        return request!long("ZREMRANGEBYRANK", key, start, stop);
    }

    /// Removes all elements of the sorted set at `key` with a score between `min` and `max` (ZREMRANGEBYSCORE).
    /// Bounds are inclusive by default; `RNG` selects inclusive (`[`/`]`) or exclusive (`(`/`)`) bounds.
    long zremRangeByScore(string RNG = "[]")(string key, double min, double max)
    {
        return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));
    }

    /// Reads the elements `start` through `end` of the sorted set stored at `key` (ZREVRANGE),
    /// optionally with their scores.
    RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false)
        if (isValidRedisValueType!T)
    {
        if (!with_scores)
            return request!(RedisReply!T)("ZREVRANGE", key, start, end);

        return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES");
    }

    /// Reads all elements of the sorted set at `key` with a score between max and min, including
    /// elements with a score equal to max or min by default (ZREVRANGEBYSCORE).
    // NOTE(review): the bounds are sent in parameter order (`min` first, then
    // `max`) - confirm against the argument order ZREVRANGEBYSCORE expects.
    RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false)
        if (isValidRedisValueType!T)
    {
        if (!with_scores)
            return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));

        return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES");
    }

    /// Same as the overload without `offset`/`count`, but additionally sends a LIMIT clause,
    /// selecting a window equivalent to $(D results[offset .. offset + count]).
    /// Both `offset` and `count` must be non-negative.
    RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false)
        if (isValidRedisValueType!T)
    {
        assert(offset >= 0);
        assert(count >= 0);

        if (!with_scores)
            return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count);

        return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count);
    }

    /// Queries the rank of `member` in the sorted set stored at `key`, with the scores ordered
    /// from high to low (ZREVRANK). Returns -1 if the reply is empty.
    long zrevRank(T)(string key, T member)
        if (isValidRedisValueType!T)
    {
        auto rank = request!string("ZREVRANK", key, member);
        if (rank == "")
            return -1;
        return parse!long(rank);
    }

    /// Queries the score of `member` in the sorted set at `key` (ZSCORE).
    RedisReply!T zscore(T = string, U)(string key, U member)
        if (isValidRedisValueType!T && isValidRedisValueType!U)
    {
        return request!(RedisReply!T)("ZSCORE", key, member);
    }

    /*
        Hyperloglog
    */

    /// Adds one or more elements to the HyperLogLog data structure stored at `key` (PFADD).
    long pfadd(ARGS...)(string key, ARGS args)
    {
        return request!long("PFADD", key, args);
    }

    /** Returns the approximated cardinality computed by the HyperLogLog data
        structure stored at the specified key.

        When called with a single key, returns the approximated cardinality
        computed by the HyperLogLog data structure stored at the specified
        variable, which is 0 if the variable does not exist.

        When called with multiple keys, returns the approximated cardinality
        of the union of the HyperLogLogs passed, by internally merging the
        HyperLogLogs stored at the provided keys into a temporary HyperLogLog.
    */
    long pfcount(scope string[] keys...)
    {
        return request!long("PFCOUNT", keys);
    }

    /// Merge multiple HyperLogLog values into a new one.
void pfmerge(ARGS...)(string destkey, ARGS args)
    {
        request("PFMERGE", destkey, args);
    }


    //TODO: zunionstore

    /*
        Pub / Sub
    */

    /// Publishes `message` on `channel` (PUBLISH). Returns the number parsed from the
    /// reply, or -1 if the reply is empty.
    long publish(string channel, string message)
    {
        auto receivers = request!string("PUBLISH", channel, message);
        if (receivers == "")
            return -1;
        return parse!long(receivers);
    }

    /// Inspects the state of the Pub/Sub subsystem (PUBSUB).
    RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("PUBSUB", subcommand, args);
    }

    /*
        TODO: Transactions
    */

    /// Queries the number of keys in the selected database (DBSIZE).
    long dbSize()
    {
        return request!long("DBSIZE");
    }

    /*
        LUA Scripts
    */

    /// Executes a Lua script on the server (EVAL). The number of keys is sent
    /// ahead of `keys`, followed by the additional `args`.
    RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args);
    }

    /// Evaluates a script cached on the server side by its SHA1 digest (EVALSHA).
    /// Scripts are cached on the server side using the scriptLoad function.
    RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args)
        if (isValidRedisValueType!T)
    {
        return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args);
    }

    //scriptExists
    //scriptFlush
    //scriptKill

    /// Load a script into the scripts cache, without executing it. Run it using evalSHA.
string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); }

	/// Run the specified command and arguments in the Redis database server
	T request(T = void, ARGS...)(string command, scope ARGS args)
	{
		return m_client.requestDB!(T, ARGS)(m_index, command, args);
	}

	/** Formats a score interval as the two string arguments of a range command.

		Params:
			RNG = Two character range specification. The first character is
				'[' (inclusive) or '(' (exclusive) for the lower bound, the
				second is ']' (inclusive) or ')' (exclusive) for the upper bound.
			min = Lower bound; negative/positive infinity map to "-inf"/"+inf"
			max = Upper bound; negative/positive infinity map to "-inf"/"+inf"

		Returns:
			A two element array holding the formatted lower and upper bound.
	*/
	private static string[2] getMinMaxArgs(string RNG)(double min, double max)
	{
		// TODO: avoid GC allocations
		static assert(RNG.length == 2, "The RNG range specification must be two characters long");

		// Formats a single bound, mapping the infinities to Redis' spelling.
		static string formatBound(double value)
		{
			if (value == -double.infinity) return "-inf";
			if (value == double.infinity) return "+inf";
			return format(typeFormatString!double, value);
		}

		string[2] ret;
		string mins = formatBound(min);
		string maxs = formatBound(max);

		static if (RNG[0] == '[') ret[0] = mins;
		else static if (RNG[0] == '(') ret[0] = '('~mins;
		else static assert(false, "Opening range specification must be either '[' or '('.");

		// Redis marks an exclusive bound by prefixing the score with '(' - this
		// applies to the upper bound as well, hence '(' (not ')') is prepended.
		static if (RNG[1] == ']') ret[1] = maxs;
		else static if (RNG[1] == ')') ret[1] = '('~maxs;
		else static assert(false, "Closing range specification must be either ']' or ')'.");

		return ret;
	}
}


import std.datetime;
import std.variant;
import std.typecons : Tuple, tuple;
import std.container : Array;
import std.algorithm : canFind;
import std.range : takeOne;
import std.array : array;

import vibe.core.concurrency;
import vibe.core.sync;

/// Handle type for a Redis subscription listener.
alias RedisSubscriber = FreeListRef!RedisSubscriberImpl;

/**
	A redis subscription listener
*/
final class RedisSubscriberImpl {
	private {
		RedisClient m_client;
		LockedConnection!RedisConnection m_lockedConnection;
		bool[string] m_subscriptions;
		string[] m_pendingSubscriptions;
		bool m_listening;
		bool m_stop;
		Task m_listener;
		Task m_listenerHelper;
		Task m_waiter;
		Task m_stopWaiter;
InterruptibleRecursiveTaskMutex m_mutex;
		InterruptibleTaskMutex m_connMutex;
	}

	// Message kinds exchanged between the listener tasks and waiting callers.
	private enum Action {
		DATA,
		STOP,
		STARTED,
		SUBSCRIBE,
		UNSUBSCRIBE
	}

	/// Whether the listener is currently running.
	@property bool isListening()
	const {
		return m_listening;
	}

	/// Get a list of channels with active subscriptions
	@property string[] subscriptions()
	const {
		return () @trusted { return m_subscriptions.keys; } ();
	}

	/// Whether `channel` is registered and flagged as subscribed.
	bool hasSubscription(string channel)
	const {
		auto entry = channel in m_subscriptions;
		return entry !is null && *entry;
	}

	/// Creates a subscriber bound to `client`; no connection is opened here.
	this(RedisClient client)
	{
		logTrace("this()");
		m_client = client;
		m_mutex = new InterruptibleRecursiveTaskMutex;
		m_connMutex = new InterruptibleTaskMutex;
	}

	// FIXME: instead of waiting here, the class must be reference counted
	// and destruction needs to be deferred until everything is stopped
	~this()
	{
		logTrace("~this");
		waitForStop();
	}

	// Task will block until the listener is finished
	private void waitForStop()
	{
		logTrace("waitForStop");
		if (!m_listening) return;

		void impl() @safe {
			// register this task as the one to notify when the listener exits
			m_mutex.performLocked!({
				m_stopWaiter = Task.getThis();
			});
			if (!m_listening) return; // verify again in case the mutex was locked by bstop
			scope(exit) {
				m_mutex.performLocked!({
					m_stopWaiter = Task();
				});
			}

			// swallow every message until the STOP notification arrives
			bool got_stop;
			while (!got_stop) {
				() @trusted { receive((Action act) { if (act == Action.STOP) got_stop = true; }); } ();
			}

			enforce(got_stop, "Failed to wait for Redis listener to stop");
		}
		inTask(&impl);
	}

	/// Stop listening and yield until the operation is complete.
void bstop()
	{
		logTrace("bstop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				stop();

				// wait for the STOP notification, giving up after 3 seconds of silence
				bool stopped;
				while (!stopped) {
					immutable got_message = () @trusted {
						return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true; });
					} ();
					if (!got_message)
						break;
				}

				enforce(stopped, "Failed to wait for Redis listener to stop");
			});
		}
		inTask(&impl);
	}

	/// Stop listening asynchronously
	void stop()
	{
		logTrace("stop");
		if (!m_listening)
			return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stop = true;
				() @trusted { m_listener.send(Action.STOP); } ();
				// send a message to wake up the listenerHelper from the reply
				if (m_subscriptions.length > 0) {
					m_connMutex.performLocked!(() {
						auto first_channel = () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ();
						_request_void(m_lockedConnection, "UNSUBSCRIBE", first_channel);
					});
					sleep(30.msecs);
				}
			});
		}
		inTask(&impl);
	}

	// True if at least one of the given channels is not subscribed yet.
	private bool hasNewSubscriptionIn(scope string[] args)
	{
		foreach (arg; args) {
			if (!hasSubscription(arg))
				return true;
		}
		return false;
	}

	// True if at least one of the given channels is currently subscribed.
	private bool anySubscribed(scope string[] args)
	{
		foreach (arg; args) {
			if (hasSubscription(arg))
				return true;
		}
		return false;
	}

	/// Completes the subscription for a listener to start receiving pubsub messages
	/// on the corresponding channel(s). Returns instantly if already subscribed.
	/// If a connection error is thrown here, it stops the listener.
	void subscribe(scope string[] args...)
{
		logTrace("subscribe");

		// Not listening yet: remember the channels, listen() subscribes them later.
		if (!m_listening) {
			foreach (arg; args)
				m_pendingSubscriptions ~= arg;
			return;
		}

		if (!hasNewSubscriptionIn(args))
			return;

		void impl() @safe {

			scope(failure) { logTrace("Failure"); bstop(); }
			try {
				m_mutex.performLocked!({
					m_waiter = Task.getThis();
					scope(exit) m_waiter = Task();
					bool subscribed;
					m_connMutex.performLocked!({
						_request_void(m_lockedConnection, "SUBSCRIBE", args);
					});
					// wait for SUBSCRIBE notifications until the channels show up,
					// giving up after 2 seconds without a notification
					while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
						if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); }); } ())
							break;

						subscribed = true;
					}
					debug {
						auto keys = () @trusted { return m_subscriptions.keys; } ();
						logTrace("Can find keys?: %s", keys.canFind(args));
						logTrace("Subscriptions: %s", keys);
					}
					enforce(subscribed, "Could not complete subscription(s).");
				});
			} catch (Exception e) {
				// The format specifier was missing, so the message was never logged.
				logDebug("Redis subscribe() failed: %s", e.msg);
			}
		}
		inTask(&impl);
	}

	/// Unsubscribes from the channel(s) specified, returns immediately if none
	/// is currently being listened.
	/// If a connection error is thrown here, it stops the listener.
	void unsubscribe(scope string[] args...)
{
		logTrace("unsubscribe");

		void impl() @safe {

			if (!anySubscribed(args))
				return;

			scope(failure) bstop();
			assert(m_listening);

			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool unsubscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "UNSUBSCRIBE", args);
				});
				// wait for UNSUBSCRIBE notifications while the channels are still
				// registered; a 2 second silence aborts the wait as a failure
				while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
					immutable notified = () @trusted {
						return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); });
					} ();
					unsubscribed = notified;
					if (!notified)
						break;
				}
				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s", keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}

	/// Same as subscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are already registered.
	/// throws Exception if the pattern does not yield a new subscription.
	void psubscribe(scope string[] args...)
{
		logTrace("psubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PSUBSCRIBE", args);
				});

				// a single SUBSCRIBE notification within 2 seconds counts as success
				immutable subscribed = () @trusted {
					return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE); });
				} ();

				debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ());
				enforce(subscribed, "Could not complete subscription(s).");
			});
		}
		inTask(&impl);
	}

	/// Same as unsubscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are not registered.
	/// throws Exception if the pattern does not yield a new unsubscription.
	void punsubscribe(scope string[] args...)
	{
		logTrace("punsubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PUNSUBSCRIBE", args);
				});

				// a single UNSUBSCRIBE notification within 2 seconds counts as success
				immutable unsubscribed = () @trusted {
					return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE); });
				} ();

				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s", keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}

	// Runs `impl` inside of a task. When called from outside of any task, a new
	// task is started and joined, and an Exception thrown by `impl` is rethrown
	// in the caller. When already inside of a task, `impl` is called directly.
	private void inTask(scope void delegate() @safe impl) {
		logTrace("inTask");
		if (Task.getThis() == Task())
		{
			Throwable ex;
			Task task = runTask({
				logDebug("inTask %s", Task.getThis());
				try impl();
				catch (Exception e) {
					ex = e;
				}
			});
			task.join();
			logDebug("done");
			if (ex)
				throw ex;
		}
		else
			impl();
	}

	// Locks a connection from the client's pool (if none is held yet) and makes
	// sure it is connected, authenticated and has the client's database selected.
	private void init(){

		logTrace("init");
		if (!m_lockedConnection) {
			m_lockedConnection = m_client.m_connections.lockConnection();
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}

		if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) {
			try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port);
			catch (Exception e) {
				throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e);
			}
			m_lockedConnection.conn.tcpNoDelay = true;
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}
	}

	// Same as listen, but blocking
	void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds)
	{
		init();

		// records the subscription and wakes up a caller waiting in (p)subscribe
		void onSubscribe(string channel) @safe {
			logTrace("Callback subscribe(%s)", channel);
			m_subscriptions[channel] = true;
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.SUBSCRIBE); } ();
		}

		// drops the subscription and wakes up a caller waiting in (p)unsubscribe
		void onUnsubscribe(string channel) @safe {
			logTrace("Callback unsubscribe(%s)", channel);
			m_subscriptions.remove(channel);
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.UNSUBSCRIBE); } ();
		}

		void teardown() @safe { // teardown
			logTrace("Redis listener exiting");
			// More publish commands may be sent to this connection after recycling it, so we
			// actively destroy it
			m_lockedConnection.conn.close();
			m_lockedConnection.destroy();
			m_listening = false;
		}
		// http://redis.io/topics/pubsub
		/**
			SUBSCRIBE first second
			*3
			$9
			subscribe
			$5
			first
			:1
			*3
			$9
			subscribe
			$6
			second
			:2
		*/
		// This is a simple parser/handler for subscribe/unsubscribe/publish
		// commands sent by redis. The PubSub client protocol is simple enough

		void pubsub_handler() {
			TCPConnection conn = m_lockedConnection.conn;
			logTrace("Pubsub handler");
			void dropCRLF() @safe {
				ubyte[2] crlf;
				conn.read(crlf);
			}
			// Reads a decimal number followed by CRLF from the connection.
			size_t readArgs() @safe {
				char[8] ucnt;
				ubyte[1] num;
				size_t i;
				do {
					conn.read(num);
					if (num[0] >= 48 && num[0] <= 57) { // ascii '0' .. '9'
						// fail with a regular exception instead of writing past the buffer
						enforce(i < ucnt.length, "Redis pubsub length prefix is too long");
						ucnt[i] = num[0];
					}
					else break;
					i++;
				}
				while (true);
				// the byte that ended the loop was the '\r', this one is the '\n'
				ubyte[1] b;
				conn.read(b);
				// only log the digits that were actually read
				logTrace("Found %s", ucnt[0 .. i]);
				return ucnt[0 .. i].to!size_t;
			}
			// find the number of arguments in the array
			ubyte[1] symbol;
			conn.read(symbol);
			enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'");
			size_t args = readArgs();
			// get the number of characters in the first string (the command)
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			size_t cnt = readArgs();
			ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } ();
			scope(exit) () @trusted { theAllocator.dispose(cmd); } ();
			conn.read(cmd);
			dropCRLF();
			// find the channel
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			cnt = readArgs();
			ubyte[] str = new ubyte[cnt];
			conn.read(str);
			dropCRLF();
			string channel = () @trusted { return cast(string)str; } ();
			logTrace("chan: %s", channel);

			if (cmd == "message") { // find the message
				conn.read(symbol);
				enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs();
				// a fresh buffer is used, `channel` still refers to the previous one
				str = new ubyte[cnt];
				conn.read(str); // message payload
				string message = () @trusted { return cast(string)str.idup; } ();
				logTrace("msg: %s", message);
				dropCRLF();
				onMessage(channel, message);
			}
			else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions
				bool is_subscribe = (cmd == "subscribe");
				conn.read(symbol);
				enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs(); // number of subscriptions
				logTrace("subscriptions: %d", cnt);
				if (is_subscribe)
					onSubscribe(channel);
				else
					onUnsubscribe(channel);

				// todo: enforce the number of subscriptions?
			}
			else assert(false, "Unrecognized pubsub wire protocol command received");
		}

		// Waits for data and advises the handler
		m_listenerHelper = runTask(() nothrow {
			loop: while(true) {
				if (m_stop || !m_lockedConnection.conn) break;

				try {
					const waitResult = m_lockedConnection.conn.waitForDataEx(100.msecs);
					if (m_stop) break;

					final switch (waitResult) {
						case WaitForDataStatus.noMoreData:
							break loop;
						case WaitForDataStatus.timeout:
							logTrace("No data arrival in 100 ms...");
							continue loop;
						case WaitForDataStatus.dataAvailable:
					}

					// Data has arrived, this task is in charge of notifying the main handler loop
					logTrace("Notify data arrival");

					() @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue
					() @trusted { m_listener.send(Action.DATA); } ();
					if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ())
						assert(false);
				} catch (Exception e) {
					logException(e, "Redis listen task failed - stopping to listen");
					break;
				}
			}

			logTrace("Listener Helper exit.");
			try () @trusted { m_listener.send(Action.STOP); } ();
			catch (Exception e) assert(false, e.msg);
		} );

		m_listening = true;
		logTrace("Redis listener now listening");
		if (m_waiter != Task())
			() @trusted { m_waiter.send(Action.STARTED); } ();

		if (timeout == 0.seconds)
			timeout = 365.days; // make sure 0.seconds is considered as big.

		scope(exit) {
			logTrace("Redis Listener exit.");
			if (!m_stop) {
				stop(); // notifies the listenerHelper
			}
			m_listenerHelper.join();
			// close the data connections
			teardown();

			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.STOP); } ();
			if (m_stopWaiter != Task())
				() @trusted { m_stopWaiter.send(Action.STOP); } ();

			m_listenerHelper = Task();
			m_listener = Task();
			m_stop = false;
		}

		// Start waiting for data notifications to arrive
		while(true) {

			auto handler = (Action act) {
				if (act == Action.STOP) m_stop = true;
				if (m_stop) return;
				logTrace("Calling PubSub Handler");
				m_connMutex.performLocked!({
					pubsub_handler(); // handles one command at a time
				});
				() @trusted { m_listenerHelper.send(Action.DATA); } ();
			};

			if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) {
				logTrace("Redis Listener stopped");
				break;
			}

		}

	}
	/// ditto
	deprecated("Use an @safe message callback")
	void blisten(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds)
	{
		// forward the timeout as well - it was silently dropped before
		blisten((string ch, string msg) @trusted => onMessage(ch, msg), timeout);
	}

	/// Waits for messages and calls the callback with the channel and the message as arguments.
	/// The timeout is passed over to the listener, which closes after the period of inactivity.
	/// Use 0.seconds timeout to specify a very long time (365 days)
	/// Errors will be sent to Callback Delegate on channel "Error".
Task listen(void delegate(string, string) @safe nothrow callback, Duration timeout = 0.seconds)
	{
		logTrace("Listen");
		void impl() @safe {
			logTrace("Listen");
			m_waiter = Task.getThis();
			scope(exit) m_waiter = Task();
			Throwable ex;
			m_listener = runTask(() nothrow {
				try blisten(callback, timeout);
				catch(Exception e) {
					ex = e;
					// failed before the listener came up: wake up the waiting caller,
					// which then rethrows `ex`
					if (m_waiter != Task() && !m_listening) {
						try () @trusted { m_waiter.send(Action.STARTED); } ();
						catch (Exception send_ex) assert(false, send_ex.msg);
						return;
					}
					callback("Error", e.msg);
				}
			});
			m_mutex.performLocked!({
				() @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } ();
				if (ex) throw ex;
				enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired");
			});

			// apply the subscriptions that were requested before listening started
			foreach (channel; m_pendingSubscriptions) {
				subscribe(channel);
			}

			m_pendingSubscriptions = null;
		}
		inTask(&impl);
		return m_listener;
	}
	/// ditto
	deprecated("Use an `@safe` message callback")
	Task listen(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds)
	{
		// forward the timeout as well - it was silently dropped before
		return listen((string ch, string msg) @trusted => onMessage(ch, msg), timeout);
	}
	/// ditto
	deprecated("Use a `nothrow` message callback")
	Task listen(void delegate(string, string) @safe callback, Duration timeout = 0.seconds)
	{
		return listen((ch, msg) @safe nothrow {
			try callback(ch, msg);
			catch (Exception e) logException(e, "Uncaught exception in Redis listen callback");
		}, timeout);
	}
}



/** Range interface to a single Redis reply.
*/
struct RedisReply(T = ubyte[]) {
	static assert(isInputRange!RedisReply);

	private {
		uint m_magic = 0x15f67ab3;
		RedisConnection m_conn;
		LockedConnection!RedisConnection m_lockedConnection;
	}

	alias ElementType = T;

	// Starts a new reply on `conn`, resetting the connection's reply context.
	private this(RedisConnection conn)
	{
		m_conn = conn;
		auto ctx = &conn.m_replyContext;
		assert(ctx.refCount == 0);
		*ctx = RedisReplyContext.init;
		ctx.refCount++;
	}

	// Copies share the reply context of the connection, tracked by refCount.
	this(this)
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			auto ctx = &m_conn.m_replyContext;
			assert(ctx.refCount > 0);
			ctx.refCount++;
		}
	}

	// The last copy going away drops all remaining elements of the reply.
	~this()
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			if (!--m_conn.m_replyContext.refCount)
				drop();
		}
	}

	@property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; }

	/** Returns the current element of the reply.

		Note that byte and character arrays may be returned as slices to a
		temporary buffer. This buffer will be invalidated on the next call to
		$(D popFront), so it needs to be duplicated for permanent storage.
	*/
	@property T front()
	{
		assert(!empty, "Accessing the front of an empty RedisReply!");
		auto ctx = &m_conn.m_replyContext;
		if (!ctx.hasData) readData();

		ubyte[] ret = ctx.data;

		return convertToType!T(ret);
	}

	/// Whether the current element was sent as a null bulk reply ("$-1").
	@property bool frontIsNull()
	const {
		assert(!empty, "Accessing the front of an empty RedisReply!");
		return m_conn.m_replyContext.frontIsNull;
	}

	/** Pops the current element of the reply
	*/
	void popFront()
	{
		assert(!empty, "Popping the front of an empty RedisReply!");

		auto ctx = &m_conn.m_replyContext;

		if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire
		clearData();
		ctx.index++;

		// last element consumed by the only remaining copy: release the connection
		if (ctx.index >= ctx.length && ctx.refCount == 1) {
			ctx.refCount = 0;
			m_conn = null;
			m_lockedConnection.destroy();
		}
	}


	/// Legacy property for hasNext/next based iteration
	@property bool hasNext() const { return !empty; }

	/// Legacy property for hasNext/next based iteration
	TN next(TN : E[], E)()
	{
		assert(hasNext, "end of reply");

		auto ret = front.dup;
		popFront();
		return () @trusted { return cast(TN)ret; } ();
	}

	/// Consumes all remaining elements of the reply.
	void drop()
	{
		if (!m_conn) return;
		while (!empty) popFront();
	}

	// Reads the next element of a multi bulk reply from the connection.
	private void readData()
	{
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.hasData && ctx.initialized);

		if (ctx.multi)
			readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ());
	}

	private void clearData()
	{
		auto ctx = &m_conn.m_replyContext;
		ctx.data = null;
		ctx.hasData = false;
	}

	private @property void lockedConnection(ref LockedConnection!RedisConnection conn)
	{
		assert(m_conn !is null);
		m_lockedConnection = conn;
	}

	// Reads the first line of the reply and sets up the reply context
	// according to the type marker found in its first byte.
	private void initialize()
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.initialized);
		ctx.initialized = true;

		ubyte[] ln = m_conn.conn.readLine();

		// An empty line carries no type marker; report it as a protocol error
		// instead of indexing past the end of the line below.
		if (ln.length == 0) {
			m_conn.conn.close();
			throw new RedisProtocolException("Empty reply line received from Redis server");
		}

		switch (ln[0]) {
			default:
				m_conn.conn.close();
				throw new Exception(format("Unknown reply type: %s", cast(char)ln[0]));
			case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ());
			case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '$':
				readBulk(() @trusted { return cast(string)ln; } ());
				break;
			case '*':
				if (ln.startsWith(cast(const(ubyte)[])"*-1")) {
					ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array
				} else {
					ctx.multi = true;
					scope (failure) m_conn.conn.close();
					ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ());
				}
				break;
		}
	}

	// Reads a bulk element whose "$<size>" line is passed in `sizeLn`.
	private void readBulk(string sizeLn)
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		if (sizeLn.startsWith("$-1")) {
			ctx.frontIsNull = true;
			ctx.hasData = true;
			ctx.data = null;
		} else {
			auto size = to!size_t(sizeLn[1 .. $]);
			auto data = new ubyte[size];
			m_conn.conn.read(data);
			m_conn.conn.readLine(); // consume the line break that follows the payload
			ctx.frontIsNull = false;
			ctx.hasData = true;
			ctx.data = data;
		}
	}
}

/// Exception type for violations of the Redis wire protocol.
class RedisProtocolException : Exception {
	this(string message, string file = __FILE__, size_t line = __LINE__, Exception next = null)
	{
		super(message, file, line, next);
	}
}

/// Determines if `T` can be used as a request return type: a valid value
/// type, or a `Nullable`/`RedisReply` wrapping one.
template isValidRedisValueReturn(T)
{
	import std.typecons;
	static if (isInstanceOf!(Nullable, T)) {
		enum isValidRedisValueReturn = isValidRedisValueType!(typeof(T.init.get()));
	} else static if (isInstanceOf!(RedisReply, T)) {
		enum isValidRedisValueReturn = isValidRedisValueType!(T.ElementType);
	} else enum isValidRedisValueReturn = isValidRedisValueType!T;
}

/// Determines if `T` is a character/byte array, `long`, `double` or `bool`.
template isValidRedisValueType(T)
{
	enum isValidRedisValueType = is(T : const(char)[]) || is(T : const(ubyte)[]) || is(T == long) || is(T == double) || is(T == bool);
}

// Creates and initializes the reply object for a request that was just sent.
// The element type defaults to `ubyte[]`, matching `RedisReply`'s own default
// (the previous default `ubyte` is not a supported reply type).
private RedisReply!T getReply(T = ubyte[])(RedisConnection conn)
{
	auto repl = RedisReply!T(conn);
	repl.initialize();
	return repl;
}

// State of the reply that is currently being read from a connection.
private struct RedisReplyContext {
	long refCount = 0;
	ubyte[] data;
	bool hasData;
	bool multi = false;
	bool initialized = false;
	bool frontIsNull = false;
	long length = 1;
	long index = 0;
	ubyte[128] dataBuffer;
}

private final class RedisConnection {
	private {
		string m_host;
		ushort m_port;
		TCPConnection m_conn;
		string m_password;
		long m_selectedDB;
		RedisReplyContext m_replyContext;
	}

	this(string host, ushort port)
	{
		m_host = host;
		m_port = port;
	}

	@property TCPConnection conn() nothrow { return m_conn; }
	@property void conn(TCPConnection conn) nothrow { m_conn = conn; }

	// Sends AUTH unless the given password is already the active one.
	void setAuth(string password)
	{
		if (m_password == password) return;
_request_reply(this, "AUTH", password);
		m_password = password;
	}

	// Sends SELECT unless the given database is already the selected one.
	void setDB(long index)
	{
		if (index == m_selectedDB) return;
		_request_reply(this, "SELECT", index);
		m_selectedDB = index;
	}

	// Counts the protocol level arguments. Arrays other than byte/character
	// arrays are flattened recursively, everything else counts as one.
	private static long countArgs(ARGS...)(scope ARGS args)
	{
		long ret = 0;
		foreach (i, A; ARGS) {
			static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) {
				foreach (arg; args[i])
					ret += countArgs(arg);
			} else ret++;
		}
		return ret;
	}

	unittest {
		assert(countArgs() == 0);
		assert(countArgs(1, 2, 3) == 3);
		assert(countArgs("1", ["2", "3", "4"]) == 4);
		assert(countArgs([["1", "2"], ["3"]]) == 3);
	}

	// Writes each argument as a bulk string ("$<length>\r\n<data>\r\n") to `dst`.
	// Booleans are sent as "1"/"0", arrays of arguments are flattened.
	private static void writeArgs(R, ARGS...)(R dst, scope ARGS args)
		if (isOutputRange!(R, char))
	{
		foreach (i, A; ARGS) {
			static if (is(A == bool)) {
				writeArgs(dst, args[i] ? "1" : "0");
			} else static if (is(A : long) || is(A : real) || is(A == string)) {
				auto alen = formattedLength(args[i]);
				enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n";
				dst.formattedWrite(fmt, alen, args[i]);
			} else static if (is(A : const(ubyte[])) || is(A : const(char[]))) {
				dst.formattedWrite("$%s\r\n", args[i].length);
				dst.put(args[i]);
				dst.put("\r\n");
			} else static if (isArray!A) {
				foreach (arg; args[i])
					writeArgs(dst, arg);
			} else static assert(false, "Unsupported Redis argument type: " ~ A.stringof);
		}
	}

	unittest {
		import std.array : appender;
		auto dst = appender!string;
		writeArgs(dst, false, true, ["2", "3"], "4", 5.0);
		assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n");
	}

	// Length of `arg` once formatted with its type's format string.
	private static long formattedLength(ARG)(scope ARG arg)
	{
		static if (is(ARG == string)) return arg.length;
		else {
			import vibe.internal.rangeutil;
			long length;
			auto rangeCnt = RangeCounter(() @trusted { return &length; } ());
			rangeCnt.formattedWrite(typeFormatString!ARG, arg);
			return length;
		}
	}
}

// Opens the TCP connection of `conn` if it is missing or disconnected.
// Connection failures are wrapped in an exception naming host and port.
private void _ensure_connected(RedisConnection conn)
{
	if (conn.conn && conn.conn.connected) return;

	try conn.conn = connectTCP(conn.m_host, conn.m_port);
	catch (Exception e) {
		throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e);
	}
	conn.conn.tcpNoDelay = true;
}

// Sends a command without reading a reply.
private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	_ensure_connected(conn);

	// the command name itself counts as one argument of the request array
	auto nargs = conn.countArgs(args);
	auto rng = streamOutputRange(conn.conn);
	formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command);
	RedisConnection.writeArgs(() @trusted { return &rng; } (), args);
}

// Sends a command, flushes the output and starts reading the reply.
private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	_ensure_connected(conn);

	// the command name itself counts as one argument of the request array
	auto nargs = conn.countArgs(args);
	auto rng = streamOutputRange(conn.conn);
	formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command);
	RedisConnection.writeArgs(() @trusted { return &rng; } (), args);
	rng.flush();

	return conn.getReply!T;
}

// Sends a command and converts the reply according to the return type `T`:
// a `RedisReply` is returned as a range (keeping the connection locked),
// `void` discards the reply, `Nullable` stays null for a null reply and any
// other type yields the converted first element.
private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args)
{
	import std.typecons;
	static if (isInstanceOf!(RedisReply, T)) {
		auto reply = _request_reply!(T.ElementType)(conn, command, args);
		reply.lockedConnection = conn;
		return reply;
	} else static if (is(T == void)) {
		_request_reply(conn, command, args);
	} else static if (isInstanceOf!(Nullable, T)) {
		alias TB = typeof(T.init.get());
		auto reply = _request_reply!TB(conn, command, args);
		T ret;
		if (!reply.frontIsNull) ret = reply.front;
		return ret;
	} else {
		auto reply = _request_reply!T(conn, command, args);
		return reply.front;
	}
}

// Converts the raw reply bytes to `T`. String types are UTF validated first.
private T convertToType(T)(ubyte[] data) /// NOTE: data must be unique!
{
	static if (isSomeString!T) () @trusted { validate(cast(T)data); } ();

	static if (is(T == ubyte[])) return data;
	else static if (is(T == string)) return cast(T)data.idup;
	// an empty payload is treated as `false` instead of indexing out of bounds
	else static if (is(T == bool)) return data.length > 0 && data[0] == '1';
	else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) {
		auto str = () @trusted { return cast(string)data; } ();
		return parse!T(str);
	}
	else static assert(false, "Unsupported Redis reply type: " ~ T.stringof);
}

// Format string used to send a value of type `T`: "%.16g" for floating
// point types, "%s" for everything else.
private template typeFormatString(T)
{
	static if (isFloatingPoint!T) enum typeFormatString = "%.16g";
	else enum typeFormatString = "%s";
}