1 /**
2 	Redis database client implementation.
3 
4 	Copyright: © 2012-2016 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon
7 */
8 module vibe.db.redis.redis;
9 
10 public import vibe.core.net;
11 
12 import vibe.core.connectionpool;
13 import vibe.core.core;
14 import vibe.core.log;
15 import vibe.inet.url;
16 import vibe.internal.allocator;
17 import vibe.internal.freelistref;
18 import vibe.stream.operations;
19 import std.conv;
20 import std.exception;
21 import std.format;
22 import std.range : isInputRange, isOutputRange;
23 import std.string;
24 import std.traits;
25 import std.typecons : Nullable;
26 import std.utf;
27 
28 @safe:
29 
30 
/**
	Creates a pooled Redis client for the server at the given address.

	Params:
		host = Host name or IP address of the Redis server
		port = TCP port of the server, defaults to `RedisClient.defaultPort`

	Returns:
		A newly constructed `RedisClient` for `host`/`port`.
*/
RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort)
{
	auto client = new RedisClient(host, port);
	return client;
}
38 
/**
	Returns a Redis database connection instance corresponding to the given URL.

	The URL must be of the format "redis://server[:port]/dbnum". If no
	database number is given (empty path or just "/"), database 0 is used.
	A query string or fragment following the database number is ignored.

	Params:
		url = URL describing the server and the database index

	Returns:
		A `RedisDatabase` handle for the selected database index.

	Throws:
		`ConvException` if the path component is not a valid integer.
*/
RedisDatabase connectRedisDB(URL url)
{
	auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort);
	// TODO: support password

	// localURI is "/dbnum" plus an optional query/fragment; guard against an
	// empty path instead of slicing blindly (which raised a range violation).
	string dbspec = url.localURI;
	if (dbspec.length > 0 && dbspec[0] == '/') dbspec = dbspec[1 .. $];

	// cut off "?query" / "#fragment" so that only the number is parsed
	size_t end = dbspec.length;
	foreach (i, char ch; dbspec) {
		if (ch == '?' || ch == '#') {
			end = i;
			break;
		}
	}
	dbspec = dbspec[0 .. end];

	// a missing database number selects the default database 0
	long index = dbspec.length > 0 ? dbspec.to!long : 0;
	return cli.getDatabase(index);
}
50 
/**
	A redis client with connection pooling.

	Connections are created lazily by the internal connection pool. Every
	request locks a connection from the pool and applies the stored
	authentication password and the target database index to it before the
	command is sent.
*/
final class RedisClient {
	private {
		ConnectionPool!RedisConnection m_connections;
		string m_authPassword;
		string m_version; // cached result of redisVersion
		long m_selectedDB; // database index used by the client-level request()
	}

	enum defaultPort = 6379;

	/** Creates a client for the given server address.

		Params:
			host = Host name or IP address of the server
			port = TCP port of the server
	*/
	this(string host = "127.0.0.1", ushort port = defaultPort)
	{
		m_connections = new ConnectionPool!RedisConnection({
			return new RedisConnection(host, port);
		});
	}

	/** Returns the Redis version as reported by the `redis_version` field
		of the INFO command.

		The value is queried on first use and cached afterwards. An empty
		string is returned if the field could not be found.
	*/
	@property string redisVersion()
	{
		if (m_version.length == 0) {
			// call the method explicitly instead of shadowing it with a local
			auto lines = this.info().splitLines();
			if (lines.length > 1) {
				foreach (string line; lines) {
					auto fields = line.split(":");
					if (fields.length > 1 && fields[0] == "redis_version") {
						m_version = fields[1];
						break;
					}
				}
			}
		}

		return m_version;
	}

	/** Returns a handle to the given database.
	*/
	RedisDatabase getDatabase(long index) { return RedisDatabase(this, index); }

	/** Creates a RedisSubscriber instance for launching a pubsub listener
	*/
	RedisSubscriber createSubscriber() {
		return RedisSubscriber(this);
	}

	/*
		Connection
	*/

	/// Sets the password used to authenticate to the server. The password is
	/// only stored here and handed to the connection on each request.
	void auth(string password) { m_authPassword = password; }
	/// Echo the given string
	T echo(T, U)(U data) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("ECHO", data); }
	/// Ping the server
	void ping() { request("PING"); }
	/// Close the connection
	void quit() { request("QUIT"); }

	/*
		Server
	*/

	//TODO: BGREWRITEAOF
	//TODO: BGSAVE

	/// Get the value of a configuration parameter
	T getConfig(T)(string parameter) if(isValidRedisValueReturn!T) { return request!T("CONFIG", "GET", parameter); }
	/// Set a configuration parameter to the given value
	void setConfig(T)(string parameter, T value) if(isValidRedisValueType!T) { request("CONFIG", "SET", parameter, value); }
	/// Reset the stats returned by INFO
	void configResetStat() { request("CONFIG", "RESETSTAT"); }

	//TODO: Debug Object
	//TODO: Debug Segfault

	/** Deletes all keys from all databases.

		See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL)
	*/
	void deleteAll() { request("FLUSHALL"); }

	/// Get information and statistics about the server
	string info() { return request!string("INFO"); }
	/// Get the UNIX time stamp of the last successful save to disk
	long lastSave() { return request!long("LASTSAVE"); }
	//TODO monitor
	/// Synchronously save the dataset to disk
	void save() { request("SAVE"); }
	/// Synchronously save the dataset to disk and then shut down the server
	void shutdown() { request("SHUTDOWN"); }
	/// Make the server a slave of another instance, or promote it as master
	void slaveOf(string host, ushort port) { request("SLAVEOF", host, port); }

	//TODO slowlog
	//TODO sync

	/// Sends a command using the client's own database index (m_selectedDB).
	private T request(T = void, ARGS...)(string command, scope ARGS args)
	{
		return requestDB!(T, ARGS)(m_selectedDB, command, args);
	}

	/** Sends a command on a pooled connection after applying the stored
		password and the requested database index to that connection.

		Params:
			db = Index of the database the command should operate on
			command = Name of the Redis command
			args = Arguments of the command

		Returns:
			The reply converted to `T`, or nothing for `T == void`.
	*/
	private T requestDB(T, ARGS...)(long db, string command, scope ARGS args)
	{
		auto conn = m_connections.lockConnection();
		conn.setAuth(m_authPassword);
		conn.setDB(db);
		version (RedisDebug) {
			import std.conv;
			string debugargs = command;
			foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string;
		}

		static if (is(T == void)) {
			version (RedisDebug) logDebug("Redis request: %s => void", debugargs);
			_request!void(conn, command, args);
		} else static if (!isInstanceOf!(RedisReply, T)) {
			auto ret = _request!T(conn, command, args);
			version (RedisDebug) logDebug("Redis request: %s => %s", debugargs, ret.to!string);
			return ret;
		} else {
			// RedisReply values are not converted to a string for logging
			auto ret = _request!T(conn, command, args);
			version (RedisDebug) logDebug("Redis request: %s => RedisReply", debugargs);
			return ret;
		}
	}
}
184 
185 
/**
	Accesses the contents of a Redis database

	All commands are forwarded to the owning `RedisClient` together with the
	index of this database.
*/
struct RedisDatabase {
	private {
		RedisClient m_client;
		long m_index;
	}

	private this(RedisClient client, long index)
	{
		m_client = client;
		m_index = index;
	}

	/** The Redis client with which the database is accessed.
	*/
	@property inout(RedisClient) client() inout { return m_client; }

	/** Index of the database.
	*/
	@property long index() const { return m_index; }

	/** Deletes all keys of the database.

		See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB)
	*/
	void deleteAll() { request!void("FLUSHDB"); }
	/// Delete a key
	long del(scope string[] keys...) { return request!long("DEL", keys); }
	/// Determine if a key exists
	bool exists(string key) { return request!bool("EXISTS", key); }
	/// Set a key's time to live in seconds
	bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); }
	/// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds.
	bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); }
	/// Set the expiration for a key as a UNIX timestamp
	bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); }
	/// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC])
	RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); }
	/// Move a key to another database
	bool move(string key, long db) { return request!bool("MOVE", key, db); }
	/// Remove the expiration from a key
	bool persist(string key) { return request!bool("PERSIST", key); }
	//TODO: object
	/// Return a random key from the keyspace
	string randomKey() { return request!string("RANDOMKEY"); }
	/// Rename a key
	void rename(string key, string newkey) { request("RENAME", key, newkey); }
	/// Rename a key, only if the new key does not exist
	bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); }
	//TODO sort
	/// Get the time to live for a key
	long ttl(string key) { return request!long("TTL", key); }
	/// Get the time to live for a key in milliseconds
	long pttl(string key) { return request!long("PTTL", key); }
	/// Determine the type stored at key (string, list, set, zset and hash.)
	string type(string key) { return request!string("TYPE", key); }

	/*
		String Commands
	*/

	/// Append a value to a key
	long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); }
	/// Decrement the integer value of a key by the given amount (one by default)
	long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); }
	/// Get the value of a key
	T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); }
	/// Returns the bit value at offset in the string value stored at key
	bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); }
	/// Get a substring of the string stored at a key
	T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); }
	/// Set the string value of a key and return its old value
	T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); }
	/// Increment the integer value of a key by the given amount (one by default)
	long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); }
	/// Increment the real number value of a key
	// NOTE(review): the reply of INCRBYFLOAT is requested as `long` here; a
	// fractional result presumably cannot be represented - confirm before relying on it.
	long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); }
	/// Get the values of all the given keys
	RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); }

	/// Set multiple keys to multiple values
	void mset(ARGS...)(ARGS args)
	{
		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
		request("MSET", args);
	}

	/// Set multiple keys to multiple values, only if none of the keys exist
	bool msetNX(ARGS...)(ARGS args) {
		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to msetNX must be pairs of key/value");
		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
		// the Redis command is MSETNX (the previous "MSETEX" did not match this method's contract)
		return request!bool("MSETNX", args);
	}

	/// Set the string value of a key
	void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); }
	/// Set the value of a key, only if the key does not exist
	bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); }
	/// Set the value of a key, only if the key already exists
	bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); }
	/// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
	bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); }
	/// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
	bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); }
	/// Sets or clears the bit at offset in the string value stored at key
	bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); }
	/// Set the value and expiration of a key
	void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); }
	/// Overwrite part of a string at key starting at the specified offset
	long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); }
	/// Get the length of the value stored in a key
	long strlen(string key) { return request!long("STRLEN", key); }

	/*
		Hashes
	*/
	/// Delete one or more hash fields
	long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); }
	/// Determine if a hash field exists
	bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); }
	/// Set the value of a single hash field
	void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); }
	/// Set the value of a hash field, only if the field does not exist
	bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); }
	/// Get the value of a hash field.
	T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); }
	/// Get all the fields and values in a hash
	RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); }
	/// Increment the integer value of a hash field
	long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); }
	/// Increment the real number value of a hash field
	// NOTE(review): the reply of HINCRBYFLOAT is requested as `long` here; a
	// fractional result presumably cannot be represented - confirm before relying on it.
	long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); }
	/// Get all the fields in a hash
	RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); }
	/// Get the number of fields in a hash
	long hlen(string key) { return request!long("HLEN", key); }
	/// Get the values of all the given hash fields
	RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); }
	/// Set multiple hash fields to multiple values
	void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); }

	/// Get all the values in a hash
	RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); }

	/*
		Lists
	*/
	/// Get an element from a list by its index
	T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); }
	/// Insert value in the list stored at key before the reference value pivot.
	long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); }
	/// Insert value in the list stored at key after the reference value pivot.
	long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); }
	/// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned.
	long llen(string key) { return request!long("LLEN", key); }
	/// Insert all the specified values at the head of the list stored at key.
	long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); }
	/// Inserts value at the head of the list stored at key, only if key already exists and holds a list.
	long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); }
	/// Insert all the specified values at the tail of the list stored at key.
	long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); }
	/// Inserts value at the tail of the list stored at key, only if key already exists and holds a list.
	long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); }
	/// Returns the specified elements of the list stored at key.
	RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE",  key, start, stop); }
	/// Removes the first count occurrences of elements equal to value from the list stored at key.
	long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); }
	/// Sets the list element at index to value.
	void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); }
	/// Trim an existing list so that it will contain only the specified range of elements specified.
	/// Equivalent to $(D range = range[start .. stop+1])
	void ltrim(string key, long start, long stop) { request("LTRIM",  key, start, stop); }
	/// Removes and returns the last element of the list stored at key.
	T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); }
	/// Removes and returns the first element of the list stored at key.
	T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); }
	/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
	/// the connection when there are no elements to pop from any of the given lists.
	///
	/// Returns: A null value if the reply is empty or starts with a null entry,
	/// otherwise a tuple of the key name and the popped value.
	Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T)
	{
		auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds);
		Nullable!(Tuple!(string, T)) ret;
		if (reply.empty || reply.frontIsNull) return ret;
		// first entry of the reply is the key name, the second one the value
		string rkey = reply.front.convertToType!string();
		reply.popFront();
		ret = tuple(rkey, reply.front.convertToType!T());
		return ret;
	}
	/// Atomically returns and removes the last element (tail) of the list stored at source,
	/// and pushes the element at the first element (head) of the list stored at destination.
	T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); }

	/*
		Sets
	*/
	/// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored.
	/// If key does not exist, a new set is created before adding the specified members.
	long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); }
	/// Returns the set cardinality (number of elements) of the set stored at key.
	long scard(string key) { return request!long("SCARD", key); }
	/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
	RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); }
	/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
	/// If destination already exists, it is overwritten.
	long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); }
	/// Returns the members of the set resulting from the intersection of all the given sets.
	RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); }
	/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
	/// If destination already exists, it is overwritten.
	long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); }
	/// Returns if member is a member of the set stored at key.
	bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); }
	/// Returns all the members of the set value stored at key.
	RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); }
	/// Move member from the set at source to the set at destination. This operation is atomic.
	/// In every given moment the element will appear to be a member of source or destination for other clients.
	bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); }
	/// Removes and returns a random element from the set value stored at key.
	T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); }
	/// Returns a random element from the set stored at key.
	T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); }
	/// Returns count random elements from the set stored at key
	RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); }


	/// Remove the specified members from the set stored at key.
	long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); }
	/// Returns the members of the set resulting from the union of all the given sets.
	RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); }
	/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
	/// Unlike sdiffStore/sinterStore there is no separate destination parameter: `keys` is
	/// forwarded as-is, so the destination key has to be passed as its first element.
	long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); }

	/*
		Sorted Sets
	*/
	/// Add one or more members to a sorted set, or update its score if it already exists
	long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); }
	/// Returns the sorted set cardinality (number of elements) of the sorted set stored at key.
	long zcard(string key) { return request!long("ZCARD", key); }
	/// Returns the number of elements in the sorted set at key with a score between min and max
	long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); }
	/// Increments the score of member in the sorted set stored at key by increment.
	double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); }
	//TODO: zinterstore
	/// Returns the specified range of elements in the sorted set stored at key.
	RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false)
		if(isValidRedisValueType!T)
	{
		if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES");
		else return request!(RedisReply!T)("ZRANGE", key, start, end);
	}

	/// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering,
	/// this command returns all the elements in the sorted set at key with a value between min and max.
	/// A LIMIT clause is only sent if offset is positive or count differs from -1.
	RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1)
		if(isValidRedisValueType!T)
	{
		if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count);
		else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max);
	}

	/// Returns all the elements in the sorted set at key with a score between start and end.
	/// Whether the bounds are inclusive is controlled by RNG ("[]" = inclusive, "()" = exclusive).
	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false)
		if(isValidRedisValueType!T)
	{
		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES");
		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end));
	}

	/// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively,
	/// and returns a range subselection similar to $(D results[offset .. offset+count])
	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false)
		if(isValidRedisValueType!T)
	{
		assert(offset >= 0);
		assert(count >= 0);
		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count);
		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count);
	}

	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high.
	/// Returns -1 if the reply is empty.
	long zrank(T)(string key, T member)
		if (isValidRedisValueType!T)
	{
		auto str = request!string("ZRANK", key, member);
		return str != "" ? parse!long(str) : -1;
	}

	/// Removes the specified members from the sorted set stored at key.
	long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); }
	/// Removes all elements in the sorted set stored at key with rank between start and stop.
	long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); }
	/// Removes all elements in the sorted set stored at key with a score between min and max (inclusive).
	long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));}
	/// Returns the specified range of elements in the sorted set stored at key.
	RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false)
		if(isValidRedisValueType!T)
	{
		if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES");
		else return request!(RedisReply!T)("ZREVRANGE", key, start, end);
	}

	/// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min).
	// NOTE(review): the two bounds are forwarded in declaration order (`min` first), while
	// ZREVRANGEBYSCORE is documented as "key max min" - verify what callers pass here.
	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false)
		if(isValidRedisValueType!T)
	{
		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES");
		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));
	}

	/// Computes an internal list of elements in the sorted set at key with a score between max and min, and
	/// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count])
	// NOTE(review): same argument order caveat as for the overload without LIMIT.
	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false)
		if(isValidRedisValueType!T)
	{
		assert(offset >= 0);
		assert(count >= 0);
		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count);
		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count);
	}

	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low.
	/// Returns -1 if the reply is empty.
	long zrevRank(T)(string key, T member)
		if (isValidRedisValueType!T)
	{
		auto str = request!string("ZREVRANK", key, member);
		return str != "" ? parse!long(str) : -1;
	}

	/// Returns the score of member in the sorted set at key.
	RedisReply!T zscore(T = string, U)(string key, U member)
		if(isValidRedisValueType!T && isValidRedisValueType!U)
	{
		return request!(RedisReply!T)("ZSCORE", key, member);
	}

	/*
		Hyperloglog
	*/

	/// Adds one or more Keys to a HyperLogLog data structure .
	long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); }

	/** Returns the approximated cardinality computed by the HyperLogLog data
		structure stored at the specified key.

		When called with a single key, returns the approximated cardinality
		computed by the HyperLogLog data structure stored at the specified
		variable, which is 0 if the variable does not exist.

		When called with multiple keys, returns the approximated cardinality
		of the union of the HyperLogLogs passed, by internally merging the
		HyperLogLogs stored at the provided keys into a temporary HyperLogLog.
	*/
	long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); }

	/// Merge multiple HyperLogLog values into a new one.
	void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); }


	//TODO: zunionstore

	/*
		Pub / Sub
	*/

	/// Publishes a message to all clients subscribed at the channel.
	/// Returns the number parsed from the reply, or -1 if the reply is empty.
	long publish(string channel, string message)
	{
		auto str = request!string("PUBLISH", channel, message);
		return str != "" ? parse!long(str) : -1;
	}

	/// Inspect the state of the Pub/Sub subsystem
	RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...)
		if(isValidRedisValueType!T)
	{
		return request!(RedisReply!T)("PUBSUB", subcommand, args);
	}

	/*
		TODO: Transactions
	*/
	/// Return the number of keys in the selected database
	long dbSize() { return request!long("DBSIZE"); }

	/*
		LUA Scripts
	*/
	/// Execute a Lua script server side. The number of keys is sent
	/// automatically, followed by the keys and the additional arguments.
	RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args)
		if(isValidRedisValueType!T)
	{
		return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args);
	}
	/// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function.
	RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args)
		if(isValidRedisValueType!T)
	{
		return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args);
	}

	//scriptExists
	//scriptFlush
	//scriptKill

	/// Load a script into the scripts cache, without executing it. Run it using evalSHA.
	string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); }

	/// Run the specified command and arguments in the Redis database server.
	/// The request is forwarded to the client together with the index of this database.
	T request(T = void, ARGS...)(string command, scope ARGS args)
	{
		return m_client.requestDB!(T, ARGS)(m_index, command, args);
	}

	/** Formats a pair of score bounds as Redis interval arguments.

		Params:
			RNG = Two character range specification; '[' / ']' denote an
				inclusive bound, '(' / ')' an exclusive one
			min = First bound; infinities are written as "-inf"/"+inf"
			max = Second bound; infinities are written as "-inf"/"+inf"

		Returns:
			The two formatted arguments in the order `[min, max]`.
	*/
	private static string[2] getMinMaxArgs(string RNG)(double min, double max)
	{
		// TODO: avoid GC allocations
		static assert(RNG.length == 2, "The RNG range specification must be two characters long");

		string[2] ret;
		string mins, maxs;
		mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min);
		maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max);

		static if (RNG[0] == '[') ret[0] = mins;
		else static if (RNG[0] == '(') ret[0] = '('~mins;
		else static assert(false, "Opening range specification must be either '[' or '('.");

		// Redis marks an exclusive bound with a leading '(' on either argument,
		// so the closing ')' is also translated to a '(' prefix.
		static if (RNG[1] == ']') ret[1] = maxs;
		else static if (RNG[1] == ')') ret[1] = '('~maxs;
		else static assert(false, "Closing range specification must be either ']' or ')'.");

		return ret;
	}
}
625 
626 
/**
	A Redis subscription listener.
*/
630 import std.datetime;
631 import std.variant;
632 import std.typecons : Tuple, tuple;
633 import std.container : Array;
634 import std.algorithm : canFind;
635 import std.range : takeOne;
636 import std.array : array;
637 
638 import vibe.core.concurrency;
639 import vibe.core.sync;
640 
/// Handle type for subscribers: a `RedisSubscriberImpl` wrapped in `FreeListRef`.
alias RedisSubscriber = FreeListRef!RedisSubscriberImpl;
642 
643 final class RedisSubscriberImpl {
	private {
		// Client providing the connection pool, password and selected DB used by init().
		RedisClient m_client;
		// Connection used for the pub/sub traffic; acquired in init(), destroyed when the listener exits.
		LockedConnection!RedisConnection m_lockedConnection;
		// Channels for which a subscribe confirmation was received, keyed by channel name.
		bool[string] m_subscriptions;
		// Channels passed to subscribe() while not listening; listen() subscribes to them once started.
		string[] m_pendingSubscriptions;
		// Set by blisten() once it starts listening, cleared when the listener is torn down.
		bool m_listening;
		// Requests termination of the listener loop and of the helper task.
		bool m_stop;
		// Task running blisten(); assigned by listen().
		Task m_listener;
		// Task started by blisten() that watches the connection for incoming data.
		Task m_listenerHelper;
		// Task currently waiting for an Action notification, or Task() if there is none.
		Task m_waiter;
		// Held while a stop/subscribe/unsubscribe operation and its wait for confirmation is in progress.
		InterruptibleRecursiveTaskMutex m_mutex;
		// Held while a command is written to, or a reply is parsed from, m_lockedConnection.
		InterruptibleTaskMutex m_connMutex;
	}
657 
	// Notifications exchanged between the listener task, its helper task and waiting callers.
	private enum Action {
		DATA,        // helper -> listener: data arrived; listener -> helper: one reply was handled
		STOP,        // stop request to the listener / stop confirmation from the helper or listener
		STARTED,     // sent to the waiter once blisten() is listening (or failed to start)
		SUBSCRIBE,   // sent to the waiter for each subscribe confirmation
		UNSUBSCRIBE  // sent to the waiter for each unsubscribe confirmation
	}
665 
666 	@property bool isListening() const {
667 		return m_listening;
668 	}
669 
670 	/// Get a list of channels with active subscriptions
671 	@property string[] subscriptions() const {
672 		return () @trusted { return m_subscriptions.keys; } ();
673 	}
674 
675 	bool hasSubscription(string channel) const {
676 		return (channel in m_subscriptions) !is null && m_subscriptions[channel];
677 	}
678 
679 	this(RedisClient client) {
680 
681 		logTrace("this()");
682 		m_client = client;
683 		m_mutex = new InterruptibleRecursiveTaskMutex;
684 		m_connMutex = new InterruptibleTaskMutex;
685 	}
686 
687 	~this() {
688 		logTrace("~this");
689 		bstop();
690 	}
691 
	/** Stops listening and yields until the listener has confirmed that it stopped.

		Does nothing if the subscriber is not listening.

		Throws: Exception if no STOP notification is received; each wait
		for a notification times out after 3 seconds.
	*/
	void bstop(){
		logTrace("bstop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				// register as the task that receives the listener's STOP notification
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				stop();

				bool stopped;
				// other notifications are consumed and ignored until STOP arrives or a wait times out
				do {
					if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true;  }); } ())
						break;
				} while (!stopped);

				enforce(stopped, "Failed to wait for Redis listener to stop");
			});
		}
		// receiveTimeout needs a task context, which inTask provides if necessary
		inTask(&impl);
	}
714 
	/** Requests the listener to stop, without waiting for it to finish.

		Sets the stop flag and sends a STOP notification to the listener
		task. If there are subscriptions, an UNSUBSCRIBE for one of them is
		sent as well so that the server produces a reply. Does nothing if the
		subscriber is not listening.
	*/
	void stop(){
		logTrace("stop");
		if (!m_listening)
			return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stop = true;
				() @trusted { m_listener.send(Action.STOP); } ();
				// send a message to wake up the listenerHelper from the reply
				if (m_subscriptions.length > 0) {
					m_connMutex.performLocked!(() {
						// only the first key is used (takeOne)
						_request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ());
					});
					// presumably leaves time for the reply to arrive — TODO confirm
					sleep(30.msecs);
				}
			});
		}
		inTask(&impl);
	}
736 
737 	private bool hasNewSubscriptionIn(scope string[] args) {
738 		bool has_new;
739 		foreach (arg; args)
740 			if (!hasSubscription(arg))
741 				has_new = true;
742 		if (!has_new)
743 			return false;
744 
745 		return true;
746 	}
747 
748 	private bool anySubscribed(scope string[] args) {
749 
750 		bool any_subscribed;
751 		foreach (arg ; args) {
752 			if (hasSubscription(arg))
753 				any_subscribed = true;
754 		}
755 		return any_subscribed;
756 	}
757 
758 	/// Completes the subscription for a listener to start receiving pubsub messages
759 	/// on the corresponding channel(s). Returns instantly if already subscribed.
760 	/// If a connection error is thrown here, it stops the listener.
761 	void subscribe(scope string[] args...)
762 	{
763 		logTrace("subscribe");
764 		if (!m_listening) {
765 			foreach (arg; args)
766 				m_pendingSubscriptions ~= arg;
767 			return;
768 		}
769 
770 		if (!hasNewSubscriptionIn(args))
771 			return;
772 
773 		void impl() @safe {
774 
775 			scope(failure) { logTrace("Failure"); bstop(); }
776 			try {
777 				m_mutex.performLocked!({
778 					m_waiter = Task.getThis();
779 					scope(exit) m_waiter = Task();
780 					bool subscribed;
781 					m_connMutex.performLocked!({
782 						_request_void(m_lockedConnection, "SUBSCRIBE", args);
783 					});
784 					while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
785 						if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
786 							break;
787 
788 						subscribed = true;
789 					}
790 					debug {
791 						auto keys = () @trusted { return m_subscriptions.keys; } ();
792 						logTrace("Can find keys?: %s",  keys.canFind(args));
793 						logTrace("Subscriptions: %s", keys);
794 					}
795 					enforce(subscribed, "Could not complete subscription(s).");
796 				});
797 			} catch (Exception e) {
798 				logDebug("Redis subscribe() failed: ", e.msg);
799 			}
800 		}
801 		inTask(&impl);
802 	}
803 
	/** Unsubscribes from the given channel(s).

		Returns immediately if none of the channels is currently subscribed.
		Otherwise an UNSUBSCRIBE command is sent and the call waits for
		confirmations from the listener, with a timeout of 2 seconds per
		confirmation.

		If an exception is thrown here, the listener is stopped using bstop().

		Params:
			args = Names of the channels to unsubscribe from.

		Throws: Exception if the unsubscription was not confirmed.
	*/
	void unsubscribe(scope string[] args...)
	{
		logTrace("unsubscribe");

		void impl() @safe {

			if (!anySubscribed(args))
				return;

			scope(failure) bstop();
			assert(m_listening);

			m_mutex.performLocked!({
				// register as receiver of the UNSUBSCRIBE notifications sent by the listener
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool unsubscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "UNSUBSCRIBE", args);
				});
				// NOTE(review): canFind with an array needle presumably searches for `args`
				// as a contiguous sub-sequence of the keys — confirm for multi-channel calls
				while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
					if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ()) {
						// timed out while waiting for a confirmation
						unsubscribed = false;
						break;
					}
					unsubscribed = true;
				}
				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s",  keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}
843 
844 	/// Same as subscribe, but uses glob patterns, and does not return instantly if
845 	/// the subscriptions are already registered.
846 	/// throws Exception if the pattern does not yield a new subscription.
847 	void psubscribe(scope string[] args...)
848 	{
849 		logTrace("psubscribe");
850 		void impl() @safe {
851 			scope(failure) bstop();
852 			assert(m_listening);
853 			m_mutex.performLocked!({
854 				m_waiter = Task.getThis();
855 				scope(exit) m_waiter = Task();
856 				bool subscribed;
857 				m_connMutex.performLocked!({
858 					_request_void(m_lockedConnection, "PSUBSCRIBE", args);
859 				});
860 
861 				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
862 					subscribed = false;
863 				else
864 					subscribed = true;
865 
866 				debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ());
867 				enforce(subscribed, "Could not complete subscription(s).");
868 			});
869 		}
870 		inTask(&impl);
871 	}
872 
873 	/// Same as unsubscribe, but uses glob patterns, and does not return instantly if
874 	/// the subscriptions are not registered.
875 	/// throws Exception if the pattern does not yield a new unsubscription.
876 	void punsubscribe(scope string[] args...)
877 	{
878 		logTrace("punsubscribe");
879 		void impl() @safe {
880 			scope(failure) bstop();
881 			assert(m_listening);
882 			m_mutex.performLocked!({
883 				m_waiter = Task.getThis();
884 				scope(exit) m_waiter = Task();
885 				bool unsubscribed;
886 				m_connMutex.performLocked!({
887 					_request_void(m_lockedConnection, "PUNSUBSCRIBE", args);
888 				});
889 				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ())
890 					unsubscribed = false;
891 				else
892 					unsubscribed = true;
893 
894 				debug {
895 					auto keys = () @trusted { return m_subscriptions.keys; } ();
896 					logTrace("Can find keys?: %s",  keys.canFind(args));
897 					logTrace("Subscriptions: %s", keys);
898 				}
899 				enforce(unsubscribed, "Could not complete unsubscription(s).");
900 			});
901 		}
902 		inTask(&impl);
903 	}
904 
905 	private void inTask(scope void delegate() @safe impl) {
906 		logTrace("inTask");
907 		if (Task.getThis() == Task())
908 		{
909 			Throwable ex;
910 			bool done;
911 			Task task = runTask({
912 				logDebug("inTask %s", Task.getThis());
913 				try impl();
914 				catch (Exception e) {
915 					ex = e;
916 				}
917 				done = true;
918 			});
919 			task.join();
920 			logDebug("done");
921 			if (ex)
922 				throw ex;
923 		}
924 		else
925 			impl();
926 	}
927 
928 	private void init(){
929 
930 		logTrace("init");
931 		if (!m_lockedConnection) {
932 			m_lockedConnection = m_client.m_connections.lockConnection();
933 			m_lockedConnection.setAuth(m_client.m_authPassword);
934 			m_lockedConnection.setDB(m_client.m_selectedDB);
935 		}
936 
937 		if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) {
938 			try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port);
939 			catch (Exception e) {
940 				throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e);
941 			}
942 			m_lockedConnection.conn.tcpNoDelay = true;
943 			m_lockedConnection.setAuth(m_client.m_authPassword);
944 			m_lockedConnection.setDB(m_client.m_selectedDB);
945 		}
946 	}
947 
	/** Same as listen, but blocking.

		Prepares the connection, starts a helper task that watches the
		connection for incoming data, and then processes one server reply per
		DATA notification until a stop is requested or no notification
		arrives within `timeout`.

		NOTE(review): the helper task and stop() send their notifications to
		m_listener, which is only assigned in listen() — confirm how a direct
		call of this method is expected to work.

		Params:
			onMessage = Called with the channel name and the message for every "message" reply.
			timeout = Maximum time to wait for a notification; 0.seconds is replaced by 365 days.
	*/
	void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds)
	{
		init();

		// records a confirmed subscription and notifies the waiting task, if any
		void onSubscribe(string channel) @safe {
			logTrace("Callback subscribe(%s)", channel);
			m_subscriptions[channel] = true;
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.SUBSCRIBE); } ();
		}

		// removes a subscription and notifies the waiting task, if any
		void onUnsubscribe(string channel) @safe {
			logTrace("Callback unsubscribe(%s)", channel);
			m_subscriptions.remove(channel);
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.UNSUBSCRIBE); } ();
		}

		void teardown() @safe { // teardown
			logTrace("Redis listener exiting");
			// More publish commands may be sent to this connection after recycling it, so we
			// actively destroy it
			Action act;
			// wait for the listener helper to send its stop message
			while (act != Action.STOP)
				act = () @trusted { return receiveOnly!Action(); } ();
			m_lockedConnection.conn.close();
			m_lockedConnection.destroy();
			m_listening = false;
			return;
		}
		// http://redis.io/topics/pubsub
		/**
				SUBSCRIBE first second
				*3
				$9
				subscribe
				$5
				first
				:1
				*3
				$9
				subscribe
				$6
				second
				:2
			*/
		// This is a simple parser/handler for subscribe/unsubscribe/publish
		// commands sent by redis. The PubSub client protocol is simple enough

		// Reads and handles exactly one reply from the connection.
		void pubsub_handler() {
			TCPConnection conn = m_lockedConnection.conn;
			logTrace("Pubsub handler");
			// consumes the two bytes terminating a bulk string
			void dropCRLF() @safe {
				ubyte[2] crlf;
				conn.read(crlf);
			}
			// reads a decimal number followed by a line terminator
			size_t readArgs() @safe {
				// NOTE(review): holds at most 8 digits; a longer number would index past the buffer
				char[8] ucnt;
				ubyte[1] num;
				size_t i;
				do {
					conn.read(num);
					if (num[0] >= 48 && num[0] <= 57)
						ucnt[i] = num[0];
					else break;
					i++;
				}
				while (true); // ascii
				ubyte[1] b;
				conn.read(b);
				logTrace("Found %s", ucnt);
				// the first terminator byte was consumed by the loop above, the second by the read into `b`
				return ucnt[0 .. i].to!size_t;
			}
			// find the number of arguments in the array
			ubyte[1] symbol;
			conn.read(symbol);
			enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'");
			size_t args = readArgs();
			// get the number of characters in the first string (the command)
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			size_t cnt = readArgs();
			ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } ();
			scope(exit) () @trusted { theAllocator.dispose(cmd); } ();
			conn.read(cmd);
			dropCRLF();
			// find the channel
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			cnt = readArgs();
			ubyte[] str = new ubyte[cnt];
			conn.read(str);
			dropCRLF();
			string channel = () @trusted { return cast(string)str; } ();
			logTrace("chan: %s", channel);

			if (cmd == "message") { // find the message
				conn.read(symbol);
				enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs();
				// a new buffer is allocated, so `channel` keeps referring to the previous one
				str = new ubyte[cnt];
				conn.read(str); // message payload
				string message = () @trusted { return cast(string)str.idup; } ();
				logTrace("msg: %s", message);
				dropCRLF();
				onMessage(channel, message);
			}
			else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions
				bool is_subscribe = (cmd == "subscribe");
				conn.read(symbol);
				enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs(); // number of subscriptions
				logTrace("subscriptions: %d", cnt);
				if (is_subscribe)
					onSubscribe(channel);
				else
					onUnsubscribe(channel);

				// todo: enforce the number of subscriptions?
			}
			// NOTE(review): any other reply type ends up here; this presumably includes the
			// replies to PSUBSCRIBE/PUNSUBSCRIBE and pattern messages — confirm
			else assert(false, "Unrecognized pubsub wire protocol command received");
		}

		// Waits for data and advises the handler
		m_listenerHelper = runTask( {
			while(true) {
				if (!m_stop && m_lockedConnection.conn && m_lockedConnection.conn.waitForData(100.msecs)) {
					// waitForData is given a 100 ms timeout, so the stop flag is re-checked regularly
					if (m_stop)	break;
					else if (m_lockedConnection.conn && !m_lockedConnection.conn.dataAvailableForRead) continue;
					// Data has arrived, this task is in charge of notifying the main handler loop
					logTrace("Notify data arrival");

					() @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue
					() @trusted { m_listener.send(Action.DATA); } ();
					// wait for the listener to acknowledge that it handled the reply
					if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ())
						assert(false);

				} else if (m_stop || !m_lockedConnection.conn) break;
				logTrace("No data arrival in 100 ms...");
			}
			logTrace("Listener Helper exit.");
			// teardown() waits for this notification
			() @trusted { m_listener.send(Action.STOP); } ();
		} );

		m_listening = true;
		logTrace("Redis listener now listening");
		if (m_waiter != Task())
			() @trusted { m_waiter.send(Action.STARTED); } ();

		if (timeout == 0.seconds)
			timeout = 365.days; // make sure 0.seconds is considered as big.

		scope(exit) {
			logTrace("Redis Listener exit.");
			if (!m_stop) {
				stop(); // notifies the listenerHelper
			}
			m_listenerHelper.join();
			// close the data connections
			teardown();

			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.STOP); } ();

			// reset the state so that listening can be started again
			m_listenerHelper = Task();
			m_listener = Task();
			m_stop = false;
		}

		// Start waiting for data notifications to arrive
		while(true) {

			auto handler = (Action act) {
				if (act == Action.STOP) m_stop = true;
				if (m_stop) return;
				logTrace("Calling PubSub Handler");
				m_connMutex.performLocked!({
					pubsub_handler(); // handles one command at a time
				});
				// acknowledge to the helper task that the reply was handled
				() @trusted { m_listenerHelper.send(Action.DATA); } ();
			};

			// leave the loop on timeout or once a stop was requested
			if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) {
				logTrace("Redis Listener stopped");
				break;
			}

		}

	}
1142 	/// ditto
1143 	deprecated("Use an @safe message callback")
1144 	void blisten(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds)
1145 	{
1146 		blisten((string ch, string msg) @trusted => onMessage(ch, msg));
1147 	}
1148 
	/** Waits for messages and calls the callback with the channel and the message as arguments.

		The listener runs in its own task (using blisten). This call waits up
		to 2 seconds for the listener to report that it started and then
		subscribes to all channels that were requested before listening.

		The timeout is passed over to the listener, which closes after the period of inactivity.
		Use 0.seconds timeout to specify a very long time (365 days).
		Exceptions thrown by the listener after it has started are passed to
		the callback on channel "Error".

		Params:
			callback = Called with the channel name and the message text.
			timeout = Inactivity timeout of the listener.

		Returns: The task running the listener.

		Throws: Exception if the listener fails to start.
	*/
	Task listen(void delegate(string, string) @safe callback, Duration timeout = 0.seconds)
	{
		logTrace("Listen");
		void impl() @safe {
			logTrace("Listen");
			// register as receiver of the STARTED notification
			m_waiter = Task.getThis();
			scope(exit) m_waiter = Task();
			// exception thrown by blisten, if any
			Throwable ex;
			m_listener = runTask({
				try blisten(callback, timeout);
				catch(Exception e) {
					ex = e;
					// failed before listening began: wake up the waiter so that it can re-throw
					if (m_waiter != Task() && !m_listening) {
						() @trusted { m_waiter.send(Action.STARTED); } ();
						return;
					}
					callback("Error", e.msg);
				}
			});
			m_mutex.performLocked!({
				import std.datetime : usecs;
				() @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } ();
				if (ex) throw ex;
				enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired");
			});

			// apply the subscriptions requested while the listener was not running
			foreach (channel; m_pendingSubscriptions) {
				subscribe(channel);
			}

			m_pendingSubscriptions = null;
		}
		inTask(&impl);
		return m_listener;
	}
1188 	/// ditto
1189 	deprecated("Use an @safe message callback")
1190 	Task listen(void delegate(string, string) @system onMessage, Duration timeout = 0.seconds)
1191 	{
1192 		return listen((string ch, string msg) @trusted => onMessage(ch, msg));
1193 	}
1194 }
1195 
1196 
1197 
/** Range interface to a single Redis reply.

	The parsing state is stored in the connection's reply context and is
	shared between all copies of a reply using a reference count. When the
	last copy is destroyed, all remaining elements are read and discarded.
*/
struct RedisReply(T = ubyte[]) {
	static assert(isInputRange!RedisReply);

	private {
		uint m_magic = 0x15f67ab3; // marker verified in the postblit and the destructor
		RedisConnection m_conn;
		LockedConnection!RedisConnection m_lockedConnection;
	}

	alias ElementType = T;

	/// Binds the reply to `conn` and resets the connection's reply context.
	private this(RedisConnection conn)
	{
		m_conn = conn;
		auto ctx = &conn.m_replyContext;
		// a previous reply on this connection must have been finished
		assert(ctx.refCount == 0);
		*ctx = RedisReplyContext.init;
		ctx.refCount++;
	}

	/// Copies share the reply context; each copy adds a reference.
	this(this)
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			auto ctx = &m_conn.m_replyContext;
			assert(ctx.refCount > 0);
			ctx.refCount++;
		}
	}

	/// Releases a reference; the last reference drops all unread elements.
	~this()
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			if (!--m_conn.m_replyContext.refCount)
				drop();
		}
	}

	/// True if there is no connection attached or all elements have been consumed.
	@property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; }

	/** Returns the current element of the reply.

		Note that byte and character arrays may be returned as slices to a
		temporary buffer. This buffer will be invalidated on the next call to
		$(D popFront), so it needs to be duplicated for permanent storage.
	*/
	@property T front()
	{
		assert(!empty, "Accessing the front of an empty RedisReply!");
		auto ctx = &m_conn.m_replyContext;
		if (!ctx.hasData) readData();

		ubyte[] ret = ctx.data;

		return convertToType!T(ret);
	}

	/// Returns the null flag stored in the reply context for the current element.
	@property bool frontIsNull()
	const {
		assert(!empty, "Accessing the front of an empty RedisReply!");
		return m_conn.m_replyContext.frontIsNull;
	}

	/** Pops the current element of the reply.

		After the last element has been popped, and if this is the only
		remaining reference, the connection is released.
	*/
	void popFront()
	{
		assert(!empty, "Popping the front of an empty RedisReply!");

		auto ctx = &m_conn.m_replyContext;

		if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire
		clearData();
		ctx.index++;

		if (ctx.index >= ctx.length && ctx.refCount == 1) {
			ctx.refCount = 0;
			m_conn = null;
			m_lockedConnection.destroy();
		}
	}


	/// Legacy property for hasNext/next based iteration
	@property bool hasNext() const { return !empty; }

	/// Legacy property for hasNext/next based iteration.
	/// Returns a duplicate of the current element and advances the reply.
	TN next(TN : E[], E)()
	{
		assert(hasNext, "end of reply");

		auto ret = front.dup;
		popFront();
		return () @trusted { return cast(TN)ret; } ();
	}

	/// Reads and discards all remaining elements of the reply.
	void drop()
	{
		if (!m_conn) return;
		while (!empty) popFront();
	}

	/// Reads the current element from the connection; only multi replies need a read here.
	private void readData()
	{
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.hasData && ctx.initialized);

		if (ctx.multi)
			readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ());
	}

	/// Forgets the data of the current element.
	private void clearData()
	{
		auto ctx = &m_conn.m_replyContext;
		ctx.data = null;
		ctx.hasData = false;
	}

	/// Stores the locked connection, which is destroyed once the reply has been consumed.
	private @property void lockedConnection(ref LockedConnection!RedisConnection conn)
	{
		assert(m_conn !is null);
		m_lockedConnection = conn;
	}

	/** Reads the first line of the reply and sets up the reply context accordingly.

		Throws:
			RedisProtocolException if the reply line is empty.
			Exception for error ('-') replies, carrying the server's message,
			and for unknown reply types.
	*/
	private void initialize()
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.initialized);
		ctx.initialized = true;

		ubyte[] ln = m_conn.conn.readLine();

		// without this check an empty line would fail with a range violation on ln[0]
		if (ln.length == 0) {
			m_conn.conn.close();
			throw new RedisProtocolException("Received an empty reply line from the Redis server");
		}

		switch (ln[0]) {
			default:
				m_conn.conn.close();
				throw new Exception(format("Unknown reply type: %s", cast(char)ln[0]));
			case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ());
			case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '$':
				readBulk(() @trusted { return cast(string)ln; } ());
				break;
			case '*':
				if (ln.startsWith(cast(const(ubyte)[])"*-1")) {
					ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array
				} else {
					ctx.multi = true;
					scope (failure) m_conn.conn.close();
					ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ());
				}
				break;
		}
	}

	/** Reads the payload of a bulk element.

		Params:
			sizeLn = The size line of the element, including the leading '$'.
	*/
	private void readBulk(string sizeLn)
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		if (sizeLn.startsWith("$-1")) {
			// null element
			ctx.frontIsNull = true;
			ctx.hasData = true;
			ctx.data = null;
		} else {
			auto size = to!size_t(sizeLn[1 .. $]);
			auto data = new ubyte[size];
			m_conn.conn.read(data);
			m_conn.conn.readLine(); // consume the line end following the payload
			ctx.frontIsNull = false;
			ctx.hasData = true;
			ctx.data = data;
		}
	}
}
1375 
/// Exception type for Redis protocol errors.
class RedisProtocolException : Exception
{
	/// Passes all arguments on to the constructor of `Exception`.
	this(string message, string file = __FILE__, size_t line = __LINE__, Exception next = null)
	{
		super(message, file, line, next);
	}
}
1382 
/// Evaluates to true if `T` can be used as the return type of a request:
/// either a valid value type itself, or a `Nullable`/`RedisReply` of one.
template isValidRedisValueReturn(T)
{
	import std.typecons;

	// determine the value type that actually gets converted
	static if (isInstanceOf!(Nullable, T))
		alias Payload = typeof(T.init.get());
	else static if (isInstanceOf!(RedisReply, T))
		alias Payload = T.ElementType;
	else
		alias Payload = T;

	enum isValidRedisValueReturn = isValidRedisValueType!Payload;
}
1392 
/// Evaluates to true for types convertible to `const(char)[]` or
/// `const(ubyte)[]`, and for exactly `long`, `double` and `bool`.
template isValidRedisValueType(T)
{
	static if (is(T : const(char)[]) || is(T : const(ubyte)[]))
		enum isValidRedisValueType = true;
	else
		enum isValidRedisValueType = is(T == long) || is(T == double) || is(T == bool);
}
1397 
/** Creates a reply bound to `conn` and reads its first line.

	The default element type is `ubyte[]`, matching the defaults of
	`RedisReply` and `_request_reply`. The previous default `ubyte` is not a
	type that `convertToType` supports.
*/
private RedisReply!T getReply(T = ubyte[])(RedisConnection conn)
{
	auto repl = RedisReply!T(conn);
	repl.initialize();
	return repl;
}
1404 
/// Parsing state of the reply currently read from a connection, shared by all copies of a RedisReply.
private struct RedisReplyContext {
	long refCount;          // number of RedisReply copies referring to this context
	ubyte[] data;           // data of the current element
	bool hasData;           // whether the current element has been read
	bool multi;             // set for replies starting with '*'
	bool initialized;       // set once the first reply line is being processed
	bool frontIsNull;       // set when the current element is a "$-1" bulk
	long length = 1;        // number of elements in the reply
	long index;             // index of the current element
	ubyte[128] dataBuffer;  // NOTE(review): not referenced in the code visible here
}
1416 
/// A single connection to a Redis server, together with the password and
/// database index last applied to it and the state of the current reply.
private final class RedisConnection {
	private {
		string m_host;
		ushort m_port;
		TCPConnection m_conn;
		string m_password;
		long m_selectedDB;
		RedisReplyContext m_replyContext;
	}

	/// Stores the server address; no connection is opened here.
	this(string host, ushort port)
	{
		m_port = port;
		m_host = host;
	}

	/// The underlying TCP connection.
	@property TCPConnection conn() { return m_conn; }
	/// ditto
	@property void conn(TCPConnection conn) { m_conn = conn; }

	/// Sends AUTH unless `password` equals the password applied last.
	void setAuth(string password)
	{
		if (m_password != password) {
			_request_reply(this, "AUTH", password);
			m_password = password;
		}
	}

	/// Sends SELECT unless `index` equals the database selected last.
	void setDB(long index)
	{
		if (index != m_selectedDB) {
			_request_reply(this, "SELECT", index);
			m_selectedDB = index;
		}
	}

	/// Counts the protocol arguments produced by `args`. Arrays other than
	/// byte/character arrays are counted element by element, recursively.
	private static long countArgs(ARGS...)(scope ARGS args)
	{
		long total = 0;
		foreach (idx, Arg; ARGS) {
			enum isBlob = is(Arg : const(ubyte[])) || is(Arg : const(char[]));
			static if (isArray!Arg && !isBlob) {
				foreach (element; args[idx])
					total += countArgs(element);
			} else {
				total++;
			}
		}
		return total;
	}

	unittest {
		assert(countArgs() == 0);
		assert(countArgs(1, 2, 3) == 3);
		assert(countArgs("1", ["2", "3", "4"]) == 4);
		assert(countArgs([["1", "2"], ["3"]]) == 3);
	}

	/// Writes each argument to `dst` as a length-prefixed bulk string.
	/// Booleans are written as "1"/"0"; arrays of arguments are flattened.
	private static void writeArgs(R, ARGS...)(R dst, scope ARGS args)
		if (isOutputRange!(R, char))
	{
		foreach (idx, Arg; ARGS) {
			static if (is(Arg == bool)) {
				writeArgs(dst, args[idx] ? "1" : "0");
			} else static if (is(Arg : long) || is(Arg : real) || is(Arg == string)) {
				// the length prefix must match the formatted representation
				enum fmt = "$%d\r\n"~typeFormatString!Arg~"\r\n";
				auto textLength = formattedLength(args[idx]);
				dst.formattedWrite(fmt, textLength, args[idx]);
			} else static if (is(Arg : const(ubyte[])) || is(Arg : const(char[]))) {
				dst.formattedWrite("$%s\r\n", args[idx].length);
				dst.put(args[idx]);
				dst.put("\r\n");
			} else static if (isArray!Arg) {
				foreach (element; args[idx])
					writeArgs(dst, element);
			} else static assert(false, "Unsupported Redis argument type: " ~ Arg.stringof);
		}
	}

	unittest {
		import std.array : appender;
		auto dst = appender!string;
		writeArgs(dst, false, true, ["2", "3"], "4", 5.0);
		assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n");
	}

	/// Returns the number of characters `arg` occupies when written with `typeFormatString`.
	private static long formattedLength(ARG)(scope ARG arg)
	{
		static if (is(ARG == string)) {
			return arg.length;
		} else {
			import vibe.internal.rangeutil;
			long count;
			auto counter = RangeCounter(() @trusted { return &count; } ());
			counter.formattedWrite(typeFormatString!ARG, arg);
			return count;
		}
	}
}
1509 
/** Sends a command without reading a reply.

	Opens the TCP connection first if it is not connected.

	Params:
		conn = Connection to send the command on.
		command = Name of the command.
		args = Arguments of the command.

	Throws: Exception if the connection cannot be established.
*/
private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	if (!conn.conn || !conn.conn.connected) {
		try conn.conn = connectTCP(conn.m_host, conn.m_port);
		catch (Exception e) {
			throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
		// The cached password/database describe the previous socket. Reset them so
		// that the next setAuth/setDB call sends AUTH/SELECT on the new connection.
		conn.m_password = null;
		conn.m_selectedDB = 0;
	}

	auto nargs = conn.countArgs(args);
	auto rng = streamOutputRange(conn.conn);
	// header: number of elements (command + arguments), then the command itself
	formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command);
	RedisConnection.writeArgs(() @trusted { return &rng; } (), args);
	// send explicitly, as _request_reply does
	rng.flush();
}
1527 
/** Sends a command and returns the reply to it.

	Opens the TCP connection first if it is not connected.

	Params:
		conn = Connection to send the command on.
		command = Name of the command.
		args = Arguments of the command.

	Returns: The reply, with its first line already processed.

	Throws: Exception if the connection cannot be established.
*/
private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	if (!conn.conn || !conn.conn.connected) {
		try conn.conn = connectTCP(conn.m_host, conn.m_port);
		catch (Exception e) {
			throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
		// The cached password/database describe the previous socket. Reset them so
		// that the next setAuth/setDB call sends AUTH/SELECT on the new connection.
		conn.m_password = null;
		conn.m_selectedDB = 0;
	}

	auto nargs = conn.countArgs(args);
	auto rng = streamOutputRange(conn.conn);
	// header: number of elements (command + arguments), then the command itself
	formattedWrite(() @trusted { return &rng; } (), "*%d\r\n$%d\r\n%s\r\n", nargs + 1, command.length, command);
	RedisConnection.writeArgs(() @trusted { return &rng; } (), args);
	rng.flush();

	return conn.getReply!T;
}
1548 
/** Sends a command and converts the reply according to the requested return type.

	$(UL
		$(LI `void`: the reply is not returned)
		$(LI `RedisReply!E`: the reply itself is returned and keeps the connection locked)
		$(LI `Nullable!E`: the first element, or a null value if the reply reports a null front)
		$(LI any other type: the first element of the reply)
	)
*/
private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args)
{
	import std.typecons;
	static if (is(T == void)) {
		_request_reply(conn, command, args);
	} else static if (isInstanceOf!(RedisReply, T)) {
		auto result = _request_reply!(T.ElementType)(conn, command, args);
		result.lockedConnection = conn;
		return result;
	} else static if (isInstanceOf!(Nullable, T)) {
		alias Value = typeof(T.init.get());
		auto result = _request_reply!Value(conn, command, args);
		T value;
		if (!result.frontIsNull)
			value = result.front;
		return value;
	} else {
		auto result = _request_reply!T(conn, command, args);
		return result.front;
	}
}
1569 
/** Converts the raw data of a reply element to the requested type.

	String types are validated as UTF first. `ubyte[]` is returned as is,
	`string` is returned as an immutable copy, `bool` is true if the data
	starts with '1' (empty data yields false), and numeric types are parsed
	from the textual representation.

	Params:
		data = Raw element data. NOTE: data must be unique!

	Throws: An exception if string validation or number parsing fails.
*/
private T convertToType(T)(ubyte[] data)
{
	static if (isSomeString!T) () @trusted { validate(cast(T)data); } ();

	static if (is(T == ubyte[])) return data;
	else static if (is(T == string)) return cast(T)data.idup;
	// guard against empty data instead of failing with a range violation
	else static if (is(T == bool)) return data.length > 0 && data[0] == '1';
	else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) {
		auto str = () @trusted { return cast(string)data; } ();
		return parse!T(str);
	}
	else static assert(false, "Unsupported Redis reply type: " ~ T.stringof);
}
1583 
/// Format specifier used to write a value of type `T`: "%.16g" for floating point types, "%s" otherwise.
private template typeFormatString(T)
{
	enum typeFormatString = isFloatingPoint!T ? "%.16g" : "%s";
}