1 /**
2 	Redis database client implementation.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon
7 */
8 module vibe.db.redis.redis;
9 
10 public import vibe.core.net;
11 
12 import vibe.core.connectionpool;
13 import vibe.core.core;
14 import vibe.core.log;
15 import vibe.inet.url;
16 import vibe.internal.allocator;
17 import vibe.internal.freelistref;
18 import vibe.stream.operations;
19 import std.conv;
20 import std.exception;
21 import std.format;
22 import std.range : isInputRange, isOutputRange;
23 import std.string;
24 import std.traits;
25 import std.typecons : Nullable;
26 import std.utf;
27 
28 @safe:
29 
30 
31 /**
32 	Returns a RedisClient that can be used to communicate to the specified database server.
33 */
RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort)
{
	// Construct the pooled client for the requested endpoint and hand it back.
	auto client = new RedisClient(host, port);
	return client;
}
38 
39 /**
40 	Returns a Redis database connection instance corresponding to the given URL.
41 
42 	The URL must be of the format "redis://server[:port]/dbnum".
43 
44 	Authentication:
45 		Authenticated connections are supported by using a URL connection string
46 		such as "redis://password@host".
47 
48 	Examples:
49 		---
50 		// connecting with default settings:
51 		auto redisDB = connectRedisDB("redis://127.0.0.1");
52 		---
53 
54 		---
		// connecting using the URL form with a database index and a password
		// ("password" is the only query parameter that is accepted)
		auto redisDB = connectRedisDB("redis://myremotehost/3?password=secret");
57 		---
58 
59 	Params:
60 		url = Redis URI scheme for a Redis database instance
61 
62 	Returns:
63 		A new RedisDatabase instance that can be used to access the database.
64 
65 	See_also: $(LINK2 https://www.iana.org/assignments/uri-schemes/prov/redis, Redis URI scheme)
66 */
RedisDatabase connectRedisDB(URL url)
{
	// Fall back to the default Redis port when the URL does not carry one.
	auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort);

	if (!url.queryString.empty)
	{
		import vibe.inet.webform : FormFields, parseURLEncodedForm;
		auto query = FormFields.init;
		parseURLEncodedForm(url.queryString, query);
		foreach (param, val; query.byKeyValue)
		{
			switch (param)
			{
				/*
				The password to use for the Redis AUTH command comes from either the
				password portion of the "userinfo" URI field or the value from the
				key-value pair from the "query" URI field with the key "password".
				If both are present, the "userinfo" password takes precedence: the
				query value is only used when the "userinfo" password is empty.
				Supplying both should still be avoided.
				*/
				case "password":
					// Use the query password only as a fallback; a userinfo
					// password (applied below) must not be shadowed by it.
					if (url.password.empty)
						cli.auth(val);
					break;
				default:
					// Any other query parameter is rejected explicitly.
					throw new Exception(`Redis config parameter "` ~ param ~ `" isn't supported`);
			}
		}
	}

	/*
	Redis' current optional authentication mechanism does not employ a
	username, but this might change in the future
	*/
	if (!url.password.empty)
		cli.auth(url.password);

	// The database index is the path without its first character. Check the
	// path itself (not the local URI, which also contains the query string) so
	// that an empty or "/" path with a query selects database 0 instead of
	// failing on the slice or on the integer conversion.
	long databaseIndex;
	if (url.pathString.length >= 2)
		databaseIndex = url.pathString[1 .. $].to!long;

	return cli.getDatabase(databaseIndex);
}
112 
113 /**
114 	A redis client with connection pooling.
115 */
final class RedisClient {
	private {
		ConnectionPool!RedisConnection m_connections;
		string m_authPassword;
		string m_version;
		long m_selectedDB;
	}

	/// Port that is used when no explicit port is given.
	enum defaultPort = 6379;

	/** Creates a client for the given server address.

		The connection pool receives a factory delegate that constructs a
		`RedisConnection` for `host` and `port`.
	*/
	this(string host = "127.0.0.1", ushort port = defaultPort)
	{
		m_connections = new ConnectionPool!RedisConnection(
			{ return new RedisConnection(host, port); }
		);
	}

	/** Release all connections that are not in use. Call before exiting the
	 * program to avoid relying on the GC to clean up the sockets.
	 */
	void releaseUnusedConnections() @safe
	{
		// The try/catch exists for the old vibe.d:core library. vibe-core does
		// not need it, but both have to be supported.
		m_connections.removeUnused((unused) @safe nothrow {
			try {
				unused.m_conn.close();
			} catch (Exception e) {
				logDebug("Failed to close unused Redis connection: %s", e.msg);
			}
		});
	}

	/** Returns the Redis version.

		The value is taken from the "redis_version" line of the INFO reply
		and cached in the client once it has been found.
	*/
	@property string redisVersion()
	{
		// Serve the cached value when a version was already determined.
		if (m_version != "")
			return m_version;

		import std.string;
		auto report = info();
		auto rows = report.splitLines();

		// A reply with at most one line is not scanned at all.
		if (rows.length <= 1)
			return m_version;

		foreach (string row; rows) {
			auto fields = row.split(":");
			if (fields.length <= 1 || fields[0] != "redis_version")
				continue;
			m_version = fields[1];
			break;
		}

		return m_version;
	}

	/** Returns a handle to the given database.
	*/
	RedisDatabase getDatabase(long index)
	{
		return RedisDatabase(this, index);
	}

	/** Creates a RedisSubscriber instance for launching a pubsub listener
	*/
	RedisSubscriber createSubscriber()
	{
		return RedisSubscriber(this);
	}

	/*
		Connection
	*/

	/// Stores the password; it is passed to the connection before each request
	void auth(string password)
	{
		m_authPassword = password;
	}

	/// Echo the given string
	T echo(T, U)(U data)
		if (isValidRedisValueReturn!T && isValidRedisValueType!U)
	{
		return request!T("ECHO", data);
	}

	/// Ping the server
	void ping()
	{
		request("PING");
	}

	/// Close the connection
	void quit()
	{
		request("QUIT");
	}

	/*
		Server
	*/

	//TODO: BGREWRITEAOF
	//TODO: BGSAVE

	/// Get the value of a configuration parameter
	T getConfig(T)(string parameter)
		if (isValidRedisValueReturn!T)
	{
		return request!T("CONFIG", "GET", parameter);
	}

	/// Set a configuration parameter to the given value
	void setConfig(T)(string parameter, T value)
		if (isValidRedisValueType!T)
	{
		request("CONFIG", "SET", parameter, value);
	}

	/// Reset the stats returned by INFO
	void configResetStat()
	{
		request("CONFIG", "RESETSTAT");
	}

	//TODO: Debug Object
	//TODO: Debug Segfault

	/** Deletes all keys from all databases.

		See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL)
	*/
	void deleteAll()
	{
		request("FLUSHALL");
	}

	/// Get information and statistics about the server
	string info()
	{
		return request!string("INFO");
	}

	/// Get the UNIX time stamp of the last successful save to disk
	long lastSave()
	{
		return request!long("LASTSAVE");
	}

	//TODO monitor

	/// Synchronously save the dataset to disk
	void save()
	{
		request("SAVE");
	}

	/// Synchronously save the dataset to disk and then shut down the server
	void shutdown()
	{
		request("SHUTDOWN");
	}

	/// Make the server a slave of another instance, or promote it as master
	void slaveOf(string host, ushort port)
	{
		request("SLAVEOF", host, port);
	}

	//TODO slowlog
	//TODO sync

	// Runs a command against the client's currently selected database index.
	private T request(T = void, ARGS...)(string command, scope ARGS args)
	{
		return requestDB!(T, ARGS)(m_selectedDB, command, args);
	}

	// Locks a pooled connection, applies the stored password and the requested
	// database index to it, and then performs the command on that connection.
	private T requestDB(T, ARGS...)(long db, string command, scope ARGS args)
	{
		auto conn = m_connections.lockConnection();
		conn.setAuth(m_authPassword);
		conn.setDB(db);

		// With RedisDebug enabled, the command and its arguments are rendered
		// into one string for the log lines below.
		version (RedisDebug) {
			import std.conv;
			string debugargs = command;
			foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string;
		}

		static if (is(T == void)) {
			version (RedisDebug) logDebug("Redis request: %s => void", debugargs);
			_request!void(conn, command, args);
		} else {
			auto ret = _request!T(conn, command, args);
			version (RedisDebug) {
				// RedisReply results are not rendered into the log line.
				static if (isInstanceOf!(RedisReply, T))
					logDebug("Redis request: %s => RedisReply", debugargs);
				else
					logDebug("Redis request: %s => %s", debugargs, ret.to!string);
			}
			return ret;
		}
	}
}
262 
263 
264 /**
265 	Accesses the contents of a Redis database
266 */
267 struct RedisDatabase {
268 	private {
269 		RedisClient m_client;
270 		long m_index;
271 	}
272 
273 	private this(RedisClient client, long index)
274 	{
275 		m_client = client;
276 		m_index = index;
277 	}
278 
279 	/** The Redis client with which the database is accessed.
280 	*/
281 	@property inout(RedisClient) client() inout { return m_client; }
282 
283 	/** Index of the database.
284 	*/
285 	@property long index() const { return m_index; }
286 
287 	/** Deletes all keys of the database.
288 
289 		See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB)
290 	*/
291 	void deleteAll() { request!void("FLUSHDB"); }
292 	/// Delete a key
293 	long del(scope string[] keys...) { return request!long("DEL", keys); }
294 	/// Determine if a key exists
295 	bool exists(string key) { return request!bool("EXISTS", key); }
296 	/// Set a key's time to live in seconds
297 	bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); }
298 	/// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds.
299 	bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); }
300 	/// Set the expiration for a key as a UNIX timestamp
301 	bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); }
302 	/// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC])
303 	RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); }
304 	/// Move a key to another database
305 	bool move(string key, long db) { return request!bool("MOVE", key, db); }
306 	/// Remove the expiration from a key
307 	bool persist(string key) { return request!bool("PERSIST", key); }
308 	//TODO: object
309 	/// Return a random key from the keyspace
310 	string randomKey() { return request!string("RANDOMKEY"); }
311 	/// Rename a key
312 	void rename(string key, string newkey) { request("RENAME", key, newkey); }
313 	/// Rename a key, only if the new key does not exist
314 	bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); }
315 	//TODO sort
316 	/// Get the time to live for a key
317 	long ttl(string key) { return request!long("TTL", key); }
318 	/// Get the time to live for a key in milliseconds
319 	long pttl(string key) { return request!long("PTTL", key); }
320 	/// Determine the type stored at key (string, list, set, zset and hash.)
321 	string type(string key) { return request!string("TYPE", key); }
322 
323 	/*
324 		String Commands
325 	*/
326 
327 	/// Append a value to a key
328 	long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); }
	/// Decrement the integer value of a key (by one unless a different value is given)
330 	long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); }
331 	/// Get the value of a key
332 	T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); }
333 	/// Returns the bit value at offset in the string value stored at key
334 	bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); }
335 	/// Get a substring of the string stored at a key
336 	T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); }
337 	/// Set the string value of a key and return its old value
338 	T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); }
339 	/// Increment the integer value of a key
340 	long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); }
341 	/// Increment the real number value of a key
342 	long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); }
343 	/// Get the values of all the given keys
344 	RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); }
345 
346 	/// Set multiple keys to multiple values
347 	void mset(ARGS...)(ARGS args)
348 	{
349 		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
350 		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
351 		request("MSET", args);
352 	}
353 
354 	/// Set multiple keys to multiple values, only if none of the keys exist
355 	bool msetNX(ARGS...)(ARGS args) {
356 		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
357 		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
358 		return request!bool("MSETEX", args);
359 	}
360 
361 	/// Set the string value of a key
362 	void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); }
363 	/// Set the value of a key, only if the key does not exist
364 	bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); }
365 	/// Set the value of a key, only if the key already exists
366 	bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); }
367 	/// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
368 	bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); }
369 	/// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
370 	bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); }
371 	/// Sets or clears the bit at offset in the string value stored at key
372 	bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); }
373 	/// Set the value and expiration of a key
374 	void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); }
375 	/// Overwrite part of a string at key starting at the specified offset
376 	long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); }
377 	/// Get the length of the value stored in a key
378 	long strlen(string key) { return request!long("STRLEN", key); }
379 
380 	/*
381 		Hashes
382 	*/
383 	/// Delete one or more hash fields
384 	long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); }
385 	/// Determine if a hash field exists
386 	bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); }
	/// Set the value of a single hash field
388 	void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); }
389 	/// Set the value of a hash field, only if the field does not exist
390 	bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); }
391 	/// Get the value of a hash field.
392 	T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); }
393 	/// Get all the fields and values in a hash
394 	RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); }
395 	/// Increment the integer value of a hash field
396 	long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); }
397 	/// Increment the real number value of a hash field
398 	long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); }
399 	/// Get all the fields in a hash
400 	RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); }
401 	/// Get the number of fields in a hash
402 	long hlen(string key) { return request!long("HLEN", key); }
403 	/// Get the values of all the given hash fields
404 	RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); }
405 	/// Set multiple hash fields to multiple values
406 	void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); }
407 
408 	/// Get all the values in a hash
409 	RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); }
410 
411 	/*
412 		Lists
413 	*/
414 	/// Get an element from a list by its index
415 	T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); }
416 	/// Insert value in the list stored at key before the reference value pivot.
417 	long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); }
418 	/// Insert value in the list stored at key after the reference value pivot.
419 	long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); }
420 	/// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned.
421 	long llen(string key) { return request!long("LLEN", key); }
422 	/// Insert all the specified values at the head of the list stored at key.
423 	long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); }
424 	/// Inserts value at the head of the list stored at key, only if key already exists and holds a list.
425 	long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); }
426 	/// Insert all the specified values at the tail of the list stored at key.
427 	long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); }
428 	/// Inserts value at the tail of the list stored at key, only if key already exists and holds a list.
429 	long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); }
430 	/// Returns the specified elements of the list stored at key.
431 	RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE",  key, start, stop); }
432 	/// Removes the first count occurrences of elements equal to value from the list stored at key.
433 	long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); }
434 	/// Sets the list element at index to value.
435 	void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); }
436 	/// Trim an existing list so that it will contain only the specified range of elements specified.
437 	/// Equivalent to $(D range = range[start .. stop+1])
438 	void ltrim(string key, long start, long stop) { request("LTRIM",  key, start, stop); }
439 	/// Removes and returns the last element of the list stored at key.
440 	T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); }
441 	/// Removes and returns the first element of the list stored at key.
442 	T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); }
443 	/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
444 	/// the connection when there are no elements to pop from any of the given lists.
445 	Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T)
446 	{
447 		auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds);
448 		Nullable!(Tuple!(string, T)) ret;
449 		if (reply.empty || reply.frontIsNull) return ret;
450 		string rkey = reply.front.convertToType!string();
451 		reply.popFront();
452 		ret = tuple(rkey, reply.front.convertToType!T());
453 		return ret;
454 	}
455 	/// Atomically returns and removes the last element (tail) of the list stored at source,
456 	/// and pushes the element at the first element (head) of the list stored at destination.
457 	T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); }
458 
459 	/*
460 		Sets
461 	*/
462 	/// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored.
463 	/// If key does not exist, a new set is created before adding the specified members.
464 	long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); }
465 	/// Returns the set cardinality (number of elements) of the set stored at key.
466 	long scard(string key) { return request!long("SCARD", key); }
467 	/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
468 	RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); }
469 	/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
470 	/// If destination already exists, it is overwritten.
471 	long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); }
472 	/// Returns the members of the set resulting from the intersection of all the given sets.
473 	RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); }
474 	/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
475 	/// If destination already exists, it is overwritten.
476 	long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); }
477 	/// Returns if member is a member of the set stored at key.
478 	bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); }
479 	/// Returns all the members of the set value stored at key.
480 	RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); }
481 	/// Move member from the set at source to the set at destination. This operation is atomic.
482 	/// In every given moment the element will appear to be a member of source or destination for other clients.
483 	bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); }
484 	/// Removes and returns a random element from the set value stored at key.
485 	T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); }
486 	/// Returns a random element from the set stored at key.
487 	T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); }
488 	///returns count random elements from the set stored at key
489 	RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); }
490 
491 
492 	/// Remove the specified members from the set stored at key.
493 	long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); }
494 	/// Returns the members of the set resulting from the union of all the given sets.
495 	RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); }
496 	/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
497 	long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); }
498 
499 	/*
500 		Sorted Sets
501 	*/
502 	/// Add one or more members to a sorted set, or update its score if it already exists
503 	long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); }
504 	/// Returns the sorted set cardinality (number of elements) of the sorted set stored at key.
505 	long zcard(string key) { return request!long("ZCARD", key); }
506 	/// Returns the number of elements in the sorted set at key with a score between min and max
507 	long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); }
508 	/// Increments the score of member in the sorted set stored at key by increment.
509 	double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); }
510 	//TODO: zinterstore
511 	/// Returns the specified range of elements in the sorted set stored at key.
512 	RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false)
513 		if(isValidRedisValueType!T)
514 	{
515 		if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES");
516 		else return request!(RedisReply!T)("ZRANGE", key, start, end);
517 	}
518 
519 	long zlexCount(string key, string min = "-", string max = "+") { return request!long("ZLEXCOUNT", key, min, max); }
520 
521 	/// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering,
522 	/// this command returns all the elements in the sorted set at key with a value between min and max.
523 	RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1)
524 		if(isValidRedisValueType!T)
525 	{
526 		if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count);
527 		else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max);
528 	}
529 
530 	/// Returns all the elements in the sorted set at key with a score between start and end inclusively
531 	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false)
532 		if(isValidRedisValueType!T)
533 	{
534 		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES");
535 		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end));
536 	}
537 
538 	/// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively,
539 	/// and returns a range subselection similar to $(D results[offset .. offset+count])
540 	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false)
541 		if(isValidRedisValueType!T)
542 	{
543 		assert(offset >= 0);
544 		assert(count >= 0);
545 		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count);
546 		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count);
547 	}
548 
549 	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high.
550 	long zrank(T)(string key, T member)
551 		if (isValidRedisValueType!T)
552 	{
553 		auto str = request!string("ZRANK", key, member);
554 		return str != "" ? parse!long(str) : -1;
555 	}
556 
557 	/// Removes the specified members from the sorted set stored at key.
558 	long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); }
559 	/// Removes all elements in the sorted set stored at key with rank between start and stop.
560 	long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); }
561 	/// Removes all elements in the sorted set stored at key with a score between min and max (inclusive).
562 	long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));}
563 	/// Returns the specified range of elements in the sorted set stored at key.
564 	RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false)
565 		if(isValidRedisValueType!T)
566 	{
567 		if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES");
568 		else return request!(RedisReply!T)("ZREVRANGE", key, start, end);
569 	}
570 
571 	/// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min).
572 	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false)
573 		if(isValidRedisValueType!T)
574 	{
575 		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES");
576 		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));
577 	}
578 
579 	/// Computes an internal list of elements in the sorted set at key with a score between max and min, and
580 	/// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count])
581 	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false)
582 		if(isValidRedisValueType!T)
583 	{
584 		assert(offset >= 0);
585 		assert(count >= 0);
586 		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count);
587 		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count);
588 	}
589 
590 	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low.
591 	long zrevRank(T)(string key, T member)
592 		if (isValidRedisValueType!T)
593 	{
594 		auto str = request!string("ZREVRANK", key, member);
595 		return str != "" ? parse!long(str) : -1;
596 	}
597 
598 	/// Returns the score of member in the sorted set at key.
599 	RedisReply!T zscore(T = string, U)(string key, U member)
600 		if(isValidRedisValueType!T && isValidRedisValueType!U)
601 	{
602 		return request!(RedisReply!T)("ZSCORE", key, member);
603 	}
604 
605 	/*
606 		Hyperloglog
607 	*/
608 
609 	/// Adds one or more Keys to a HyperLogLog data structure .
610 	long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); }
611 
612 	/** Returns the approximated cardinality computed by the HyperLogLog data
613 		structure stored at the specified key.
614 
615 		When called with a single key, returns the approximated cardinality
616 		computed by the HyperLogLog data structure stored at the specified
617 		variable, which is 0 if the variable does not exist.
618 
619 		When called with multiple keys, returns the approximated cardinality
620 		of the union of the HyperLogLogs passed, by internally merging the
621 		HyperLogLogs stored at the provided keys into a temporary HyperLogLog.
622 	*/
623 	long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); }
624 
625 	/// Merge multiple HyperLogLog values into a new one.
626 	void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); }
627 
628 
629 	//TODO: zunionstore
630 
631 	/*
632 		Pub / Sub
633 	*/
634 
635 	/// Publishes a message to all clients subscribed at the channel
636 	long publish(string channel, string message)
637 	{
638 		auto str = request!string("PUBLISH", channel, message);
639 		return str != "" ? parse!long(str) : -1;
640 	}
641 
642 	/// Inspect the state of the Pub/Sub subsystem
643 	RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...)
644 		if(isValidRedisValueType!T)
645 	{
646 		return request!(RedisReply!T)("PUBSUB", subcommand, args);
647 	}
648 
649 	/*
650 		TODO: Transactions
651 	*/
652 	/// Return the number of keys in the selected database
653 	long dbSize() { return request!long("DBSIZE"); }
654 
655 	/*
656 		LUA Scripts
657 	*/
658 	/// Execute a Lua script server side
659 	RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args)
660 		if(isValidRedisValueType!T)
661 	{
662 		return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args);
663 	}
664 	/// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function.
665 	RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args)
666 		if(isValidRedisValueType!T)
667 	{
668 		return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args);
669 	}
670 
671 	//scriptExists
672 	//scriptFlush
673 	//scriptKill
674 
675 	/// Load a script into the scripts cache, without executing it. Run it using evalSHA.
676 	string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); }
677 
678 	/// Run the specified command and arguments in the Redis database server
679 	T request(T = void, ARGS...)(string command, scope ARGS args)
680 	{
681 		return m_client.requestDB!(T, ARGS)(m_index, command, args);
682 	}
683 
684 	private static string[2] getMinMaxArgs(string RNG)(double min, double max)
685 	{
686 		// TODO: avoid GC allocations
687 		static assert(RNG.length == 2, "The RNG range specification must be two characters long");
688 
689 		string[2] ret;
690 		string mins, maxs;
691 		mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min);
692 		maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max);
693 
694 		static if (RNG[0] == '[') ret[0] = mins;
695 		else static if (RNG[0] == '(') ret[0] = '('~mins;
696 		else static assert(false, "Opening range specification mist be either '[' or '('.");
697 
698 		static if (RNG[1] == ']') ret[1] = maxs;
699 		else static if (RNG[1] == ')') ret[1] = '('~maxs;
700 		else static assert(false, "Closing range specification mist be either ']' or ')'.");
701 
702 		return ret;
703 	}
704 }
705 
706 
707 /**
708 	A redis subscription listener
709 */
710 import std.datetime;
711 import std.variant;
712 import std.typecons : Tuple, tuple;
713 import std.container : Array;
714 import std.algorithm : canFind;
715 import std.range : takeOne;
716 import std.array : array;
717 
718 import vibe.core.concurrency;
719 import vibe.core.sync;
720 
/// Handle type for a Redis subscription listener: a `FreeListRef` wrapping `RedisSubscriberImpl`.
alias RedisSubscriber = FreeListRef!RedisSubscriberImpl;
722 
723 final class RedisSubscriberImpl {
	private {
		// Client this subscriber belongs to; init() takes the connection pool,
		// password and selected DB from it.
		RedisClient m_client;
		// Connection locked in init() and closed/destroyed when blisten() exits.
		LockedConnection!RedisConnection m_lockedConnection;
		// Channels for which a "subscribe" reply has been received and no
		// "unsubscribe" reply yet (maintained by the callbacks in blisten()).
		bool[string] m_subscriptions;
		// Channels passed to subscribe() while not listening; listen() subscribes them.
		string[] m_pendingSubscriptions;
		// Set while blisten() is running, cleared in its teardown.
		bool m_listening;
		// Stop request flag; set by stop()/the listener loop, reset when blisten() exits.
		bool m_stop;
		// Task started by listen() that runs blisten().
		Task m_listener;
		// Task started by blisten() that waits for socket data and notifies m_listener.
		Task m_listenerHelper;
		// Task currently waiting for a STARTED/SUBSCRIBE/UNSUBSCRIBE/STOP notification.
		Task m_waiter;
		// Task blocked in waitForStop(), notified with STOP when blisten() exits.
		Task m_stopWaiter;
		// Serializes the control operations (stop/subscribe/unsubscribe/...).
		InterruptibleRecursiveTaskMutex m_mutex;
		// Held while a command is written to, or a reply is read from, the connection.
		InterruptibleTaskMutex m_connMutex;
	}
738 
	// Messages exchanged between the listener task, its helper task and waiting tasks.
	private enum Action {
		// helper -> listener: data is available; listener -> helper: one reply was handled
		DATA,
		// request for the listener to stop, and notification to waiters that it has stopped
		STOP,
		// sent to the waiting task once blisten() is listening (or failed to start)
		STARTED,
		// sent to the waiting task when a "subscribe" reply was processed
		SUBSCRIBE,
		// sent to the waiting task when an "unsubscribe" reply was processed
		UNSUBSCRIBE
	}
746 
	/// True while the listener (blisten) is running.
	@property bool isListening() const {
		return m_listening;
	}
750 
	/// Get a list of channels with active subscriptions.
	/// The returned array is built from the keys of the internal subscription table.
	@property string[] subscriptions() const {
		return () @trusted { return m_subscriptions.keys; } ();
	}
755 
756 	bool hasSubscription(string channel) const {
757 		return (channel in m_subscriptions) !is null && m_subscriptions[channel];
758 	}
759 
760 	this(RedisClient client) {
761 
762 		logTrace("this()");
763 		m_client = client;
764 		m_mutex = new InterruptibleRecursiveTaskMutex;
765 		m_connMutex = new InterruptibleTaskMutex;
766 	}
767 
	// FIXME: instead of waiting here, the class must be reference counted
	// and destruction needs to be deferred until everything is stopped.
	// Blocks (via waitForStop) until a running listener has finished.
	~this() {
		logTrace("~this");
		waitForStop();
	}
774 
	// Blocks until the listener is finished. Returns immediately when not listening.
	// Registers the current task as m_stopWaiter and waits, without timeout,
	// for an Action.STOP message, which blisten() sends on exit.
	private void waitForStop()
	scope {
		logTrace("waitForStop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stopWaiter = Task.getThis();
			});
			if (!m_listening) return; // verify again in case the mutex was locked by bstop
			// always unregister again, even if receive() throws
			scope(exit) {
				m_mutex.performLocked!({
					m_stopWaiter = Task();
				});
			}
			bool stopped;
			do {
				// other Action values are consumed and ignored
				() @trusted { receive((Action act) { if (act == Action.STOP) stopped = true;  }); } ();
			} while (!stopped);

			enforce(stopped, "Failed to wait for Redis listener to stop");
		}
		inTask(&impl);
	}
800 
	/// Stop listening and yield until the operation is complete.
	///
	/// Calls stop() and then waits for an Action.STOP notification, giving up
	/// when no message arrives within 3 seconds.
	/// Throws: Exception if the stop notification was not received.
	void bstop(){
		logTrace("bstop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				// register as the task to be notified by the listener
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				stop();

				bool stopped;
				do {
					// receiveTimeout returns false when nothing arrived in time
					if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true;  }); } ())
						break;
				} while (!stopped);

				enforce(stopped, "Failed to wait for Redis listener to stop");
			});
		}
		inTask(&impl);
	}
823 
	/// Stop listening asynchronously.
	///
	/// Sets the stop flag and sends Action.STOP to the listener task. If there
	/// are active subscriptions, an UNSUBSCRIBE for one of them is written to
	/// the connection so that the server sends a reply, followed by a 30 ms sleep.
	void stop(){
		logTrace("stop");
		if (!m_listening)
			return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stop = true;
				() @trusted { m_listener.send(Action.STOP); } ();
				// send a message to wake up the listenerHelper from the reply
				if (m_subscriptions.length > 0) {
					m_connMutex.performLocked!(() {
						// only a single (arbitrary) subscribed channel is unsubscribed here
						_request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ());
					});
					sleep(30.msecs);
				}
			});
		}
		inTask(&impl);
	}
845 
846 	private bool hasNewSubscriptionIn(scope string[] args) {
847 		bool has_new;
848 		foreach (arg; args)
849 			if (!hasSubscription(arg))
850 				has_new = true;
851 		if (!has_new)
852 			return false;
853 
854 		return true;
855 	}
856 
857 	private bool anySubscribed(scope string[] args) {
858 
859 		bool any_subscribed;
860 		foreach (arg ; args) {
861 			if (hasSubscription(arg))
862 				any_subscribed = true;
863 		}
864 		return any_subscribed;
865 	}
866 
867 	/// Completes the subscription for a listener to start receiving pubsub messages
868 	/// on the corresponding channel(s). Returns instantly if already subscribed.
869 	/// If a connection error is thrown here, it stops the listener.
870 	void subscribe(scope string[] args...)
871 	{
872 		logTrace("subscribe");
873 		if (!m_listening) {
874 			foreach (arg; args)
875 				m_pendingSubscriptions ~= arg;
876 			return;
877 		}
878 
879 		if (!hasNewSubscriptionIn(args))
880 			return;
881 
882 		void impl() @safe {
883 
884 			scope(failure) { logTrace("Failure"); bstop(); }
885 			try {
886 				m_mutex.performLocked!({
887 					m_waiter = Task.getThis();
888 					scope(exit) m_waiter = Task();
889 					bool subscribed;
890 					m_connMutex.performLocked!({
891 						_request_void(m_lockedConnection, "SUBSCRIBE", args);
892 					});
893 					while(!() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
894 						if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
895 							break;
896 
897 						subscribed = true;
898 					}
899 					debug {
900 						auto keys = () @trusted { return m_subscriptions.keys; } ();
901 						logTrace("Can find keys?: %s",  keys.canFind(args));
902 						logTrace("Subscriptions: %s", keys);
903 					}
904 					enforce(subscribed, "Could not complete subscription(s).");
905 				});
906 			} catch (Exception e) {
907 				logDebug("Redis subscribe() failed: ", e.msg);
908 			}
909 		}
910 		inTask(&impl);
911 	}
912 
	/// Unsubscribes from the channel(s) specified, returns immediately if none
	/// is currently being listened.
	/// If an exception is thrown here, it stops the listener (via bstop).
	/// Throws: Exception if no confirmation arrived within 2 seconds.
	void unsubscribe(scope string[] args...)
	{
		logTrace("unsubscribe");

		void impl() @safe {

			if (!anySubscribed(args))
				return;

			scope(failure) bstop();
			assert(m_listening);

			m_mutex.performLocked!({
				// register as the task to be notified by the listener callbacks
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool unsubscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "UNSUBSCRIBE", args);
				});
				// NOTE(review): canFind receives the whole `args` array as a single
				// needle - confirm this matches the intent for multiple channels.
				while(() @trusted { return m_subscriptions.byKey.canFind(args); } ()) {
					if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ()) {
						unsubscribed = false;
						break;
					}
					unsubscribed = true;
				}
				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s",  keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}
952 
	/// Same as subscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are already registered.
	/// Requires the listener to be running (asserted).
	/// Throws: Exception if no SUBSCRIBE notification arrives within 2 seconds.
	///
	/// NOTE(review): the reply parser in blisten() only recognizes "message",
	/// "subscribe" and "unsubscribe" and asserts on any other command name -
	/// confirm how the server's replies to PSUBSCRIBE are expected to be handled.
	void psubscribe(scope string[] args...)
	{
		logTrace("psubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				// register as the task to be notified by the listener callbacks
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool subscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PSUBSCRIBE", args);
				});

				// a single confirmation is awaited, regardless of the number of patterns
				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
					subscribed = false;
				else
					subscribed = true;

				debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ());
				enforce(subscribed, "Could not complete subscription(s).");
			});
		}
		inTask(&impl);
	}
981 
	/// Same as unsubscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are not registered.
	/// Requires the listener to be running (asserted).
	/// Throws: Exception if no UNSUBSCRIBE notification arrives within 2 seconds.
	///
	/// NOTE(review): the reply parser in blisten() only recognizes "message",
	/// "subscribe" and "unsubscribe" and asserts on any other command name -
	/// confirm how the server's replies to PUNSUBSCRIBE are expected to be handled.
	void punsubscribe(scope string[] args...)
	{
		logTrace("punsubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				// register as the task to be notified by the listener callbacks
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool unsubscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PUNSUBSCRIBE", args);
				});
				// a single confirmation is awaited, regardless of the number of patterns
				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ())
					unsubscribed = false;
				else
					unsubscribed = true;

				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s",  keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}
1013 
1014 	private static void inTask(scope void delegate() @safe impl)
1015 	@trusted {
1016 		logTrace("inTask");
1017 		if (Task.getThis() == Task())
1018 		{
1019 			Throwable ex;
1020 			bool done;
1021 			Task task = runTask({
1022 				logDebug("inTask %s", Task.getThis());
1023 				try impl();
1024 				catch (Exception e) {
1025 					ex = e;
1026 				}
1027 				done = true;
1028 			});
1029 			task.join();
1030 			logDebug("done");
1031 			if (ex)
1032 				throw ex;
1033 		}
1034 		else
1035 			impl();
1036 	}
1037 
	// Ensures that a locked, connected connection is available for listening.
	// Locks a connection from the client's pool if none is held yet and
	// (re)connects it when necessary, applying the client's password and
	// selected database.
	// Throws: Exception if the TCP connection cannot be established.
	private void init(){

		logTrace("init");
		if (!m_lockedConnection) {
			m_lockedConnection = m_client.m_connections.lockConnection();
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}

		if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) {
			try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port);
			catch (Exception e) {
				// wrap the original error, keeping it as the chained exception
				throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e);
			}
			m_lockedConnection.conn.tcpNoDelay = true;
			// NOTE(review): setAuth/setDB return early when the requested value equals
			// the one cached in the RedisConnection - confirm that the state is
			// actually re-sent on a fresh TCP connection.
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}
	}
1057 
	// Same as listen, but blocking.
	//
	// Runs the listener loop in the calling task until it is stopped, the
	// connection has no more data, or no notification arrives within `timeout`
	// (0.seconds is treated as 365 days). `onMessage` is called with
	// (channel, message) for every "message" reply.
	//
	// A helper task waits for incoming data on the connection and notifies
	// m_listener with Action.DATA; the loop below then parses one reply and
	// acknowledges it to the helper.
	//
	// NOTE(review): m_listener is only assigned in listen(); confirm what is
	// expected when blisten() is called directly.
	void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds)
	{
		init();

		// records the confirmed channel and wakes up a waiting subscribe() call
		void onSubscribe(string channel) @safe {
			logTrace("Callback subscribe(%s)", channel);
			m_subscriptions[channel] = true;
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.SUBSCRIBE); } ();
		}

		// forgets the channel and wakes up a waiting unsubscribe() call
		void onUnsubscribe(string channel) @safe {
			logTrace("Callback unsubscribe(%s)", channel);
			m_subscriptions.remove(channel);
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.UNSUBSCRIBE); } ();
		}

		void teardown() @safe { // teardown
			logTrace("Redis listener exiting");
			// More publish commands may be sent to this connection after recycling it, so we
			// actively destroy it
			m_lockedConnection.conn.close();
			m_lockedConnection.destroy();
			m_listening = false;
		}
		// http://redis.io/topics/pubsub
		/**
				SUBSCRIBE first second
				*3
				$9
				subscribe
				$5
				first
				:1
				*3
				$9
				subscribe
				$6
				second
				:2
			*/
		// This is a simple parser/handler for subscribe/unsubscribe/publish
		// commands sent by redis. The PubSub client protocol is simple enough

		// Reads exactly one reply array from the connection and dispatches it.
		void pubsub_handler() {
			TCPConnection conn = m_lockedConnection.conn;
			logTrace("Pubsub handler");
			// consumes the two bytes terminating a bulk string
			void dropCRLF() @safe {
				ubyte[2] crlf;
				conn.read(crlf);
			}
			// reads a decimal number followed by a two byte line terminator;
			// the number may have at most 8 digits (size of `ucnt`)
			size_t readArgs() @safe {
				char[8] ucnt;
				ubyte[1] num;
				size_t i;
				do {
					conn.read(num);
					if (num[0] >= 48 && num[0] <= 57)
						ucnt[i] = num[0];
					else break;
					i++;
				}
				while (true); // ascii
				ubyte[1] b;
				conn.read(b);
				logTrace("Found %s", ucnt);
				// the first terminator byte was consumed by the loop above when it
				// hit a non-digit, the second one by the read into `b`.
				return ucnt[0 .. i].to!size_t;
			}
			// find the number of arguments in the array
			ubyte[1] symbol;
			conn.read(symbol);
			enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'");
			size_t args = readArgs();
			// get the number of characters in the first string (the command)
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			size_t cnt = readArgs();
			ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } ();
			scope(exit) () @trusted { theAllocator.dispose(cmd); } ();
			conn.read(cmd);
			dropCRLF();
			// find the channel
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			cnt = readArgs();
			ubyte[] str = new ubyte[cnt];
			conn.read(str);
			dropCRLF();
			string channel = () @trusted { return cast(string)str; } ();
			logTrace("chan: %s", channel);

			if (cmd == "message") { // find the message
				conn.read(symbol);
				enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs();
				// a new buffer is allocated, so `channel` keeps referring to the old one
				str = new ubyte[cnt];
				conn.read(str); // message payload
				string message = () @trusted { return cast(string)str.idup; } ();
				logTrace("msg: %s", message);
				dropCRLF();
				onMessage(channel, message);
			}
			else if (cmd == "subscribe" || cmd == "unsubscribe") { // find the remaining subscriptions
				bool is_subscribe = (cmd == "subscribe");
				conn.read(symbol);
				enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs(); // number of subscriptions
				logTrace("subscriptions: %d", cnt);
				if (is_subscribe)
					onSubscribe(channel);
				else
					onUnsubscribe(channel);

				// todo: enforce the number of subscriptions?
			}
			// any other command name (e.g. pattern subscription replies) ends up here
			else assert(false, "Unrecognized pubsub wire protocol command received");
		}

		// Waits for data and advises the handler
		m_listenerHelper = runTask(() nothrow {
			loop: while(true) {
				if (m_stop || !m_lockedConnection.conn) break;

				try {
					const waitResult = m_lockedConnection.conn.waitForDataEx(100.msecs);
					if (m_stop) break;

					final switch (waitResult) {
						case WaitForDataStatus.noMoreData:
							break loop;
						case WaitForDataStatus.timeout:
							logTrace("No data arrival in 100 ms...");
							continue loop;
						case WaitForDataStatus.dataAvailable:
					}

					// Data has arrived, this task is in charge of notifying the main handler loop
					logTrace("Notify data arrival");

					() @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue
					() @trusted { m_listener.send(Action.DATA); } ();
					// wait for the listener to acknowledge that the reply was handled
					if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ())
						assert(false);
				} catch (Exception e) {
					logException(e, "Redis listen task failed - stopping to listen");
					break;
				}
			}

			logTrace("Listener Helper exit.");
			// make sure the listener loop terminates as well
			try () @trusted { m_listener.send(Action.STOP); } ();
			catch (Exception e) assert(false, e.msg);
		} );

		m_listening = true;
		logTrace("Redis listener now listening");
		if (m_waiter != Task())
			() @trusted { m_waiter.send(Action.STARTED); } ();

		if (timeout == 0.seconds)
			timeout = 365.days; // make sure 0.seconds is considered as big.

		// cleanup: stop the helper, close the connection, notify waiters, reset state
		scope(exit) {
			logTrace("Redis Listener exit.");
			if (!m_stop) {
				stop(); // notifies the listenerHelper
			}
			m_listenerHelper.join();
			// close the data connections
			teardown();

			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.STOP); } ();
			if (m_stopWaiter != Task())
				() @trusted { m_stopWaiter.send(Action.STOP); } ();

			m_listenerHelper = Task();
			m_listener = Task();
			m_stop = false;
		}

		// Start waiting for data notifications to arrive
		while(true) {

			// handles a single notification: STOP ends the loop, anything else
			// triggers parsing of one reply followed by an acknowledgement
			auto handler = (Action act) {
				if (act == Action.STOP) m_stop = true;
				if (m_stop) return;
				logTrace("Calling PubSub Handler");
				m_connMutex.performLocked!({
					pubsub_handler(); // handles one command at a time
				});
				() @trusted { m_listenerHelper.send(Action.DATA); } ();
			};

			// leave when nothing arrived within `timeout` or a stop was requested
			if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) {
				logTrace("Redis Listener stopped");
				break;
			}

		}

	}
1263 
	/// Waits for messages and calls the callback with the channel and the message as arguments.
	/// The timeout is passed over to the listener, which closes after the period of inactivity.
	/// Use 0.seconds timeout to specify a very long time (365 days)
	/// Errors will be sent to Callback Delegate on channel "Error".
	///
	/// The listener runs in a separate task. This function waits up to 2 seconds
	/// for it to start and then subscribes all channels that were queued by
	/// subscribe() while not listening.
	///
	/// Returns: The task running the listener.
	/// Throws: The exception that made the listener fail to start, or an
	///   Exception if it is not listening after the wait.
	Task listen(void delegate(string, string) @safe nothrow callback, Duration timeout = 0.seconds)
	{
		logTrace("Listen");
		void impl() @safe {
			logTrace("Listen");
			// register for the STARTED notification sent by blisten()
			m_waiter = Task.getThis();
			scope(exit) m_waiter = Task();
			Throwable ex;
			m_listener = runTask(() nothrow {
				try blisten(callback, timeout);
				catch(Exception e) {
					ex = e;
					// failure before the listener came up: wake the waiting task,
					// which rethrows `ex`
					if (m_waiter != Task() && !m_listening) {
						try () @trusted { m_waiter.send(Action.STARTED); } ();
						catch (Exception e) assert(false, e.msg);
						return;
					}
					// failure while listening: report through the callback
					callback("Error", e.msg);
				}
			});
			m_mutex.performLocked!({
				import std.datetime : usecs;
				() @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } ();
				if (ex) throw ex;
				enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired");
			});

			// subscribe the channels queued by subscribe() before listening started
			foreach (channel; m_pendingSubscriptions) {
				subscribe(channel);
			}

			m_pendingSubscriptions = null;
		}
		inTask(&impl);
		return m_listener;
	}
1304 }
1305 
1306 
1307 
/** Range interface to a single Redis reply.

	All copies of a reply share the reply context stored in the underlying
	connection, which counts the live copies. When the last copy is destroyed,
	the remaining elements are read and discarded from the connection.
*/
struct RedisReply(T = ubyte[]) {
	static assert(isInputRange!RedisReply);

	private {
		// sanity marker checked in the postblit and the destructor
		uint m_magic = 0x15f67ab3;
		// connection the reply is read from; null once the reply is exhausted
		RedisConnection m_conn;
		// optional pool lock held until the reply has been fully consumed
		LockedConnection!RedisConnection m_lockedConnection;
	}

	alias ElementType = T;

	// Binds the reply to `conn`, resetting the connection's reply context.
	// No other reply may be active on the connection (asserted).
	private this(RedisConnection conn)
	{
		m_conn = conn;
		auto ctx = &conn.m_replyContext;
		assert(ctx.refCount == 0);
		*ctx = RedisReplyContext.init;
		ctx.refCount++;
	}

	// Copies share the context, so only the reference count is incremented.
	this(this)
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			auto ctx = &m_conn.m_replyContext;
			assert(ctx.refCount > 0);
			ctx.refCount++;
		}
	}

	// The last copy drains the unread part of the reply.
	~this()
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			if (!--m_conn.m_replyContext.refCount)
				drop();
		}
	}

	/// True if there is no connection attached or all elements have been consumed.
	@property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; }

	/** Returns the current element of the reply.

		Note that byte and character arrays may be returned as slices to a
		temporary buffer. This buffer will be invalidated on the next call to
		$(D popFront), so it needs to be duplicated for permanent storage.
	*/
	@property T front()
	{
		assert(!empty, "Accessing the front of an empty RedisReply!");
		auto ctx = &m_conn.m_replyContext;
		// elements of multi replies are read from the wire on first access
		if (!ctx.hasData) readData();

		ubyte[] ret = ctx.data;

		return convertToType!T(ret);
	}

	/// Reports the null flag recorded in the reply context for the current element.
	@property bool frontIsNull()
	const {
		assert(!empty, "Accessing the front of an empty RedisReply!");
		return m_conn.m_replyContext.frontIsNull;
	}

	/** Pops the current element of the reply.

		When the last element is popped and this is the only copy, the
		connection (and its pool lock) is released.
	*/
	void popFront()
	{
		assert(!empty, "Popping the front of an empty RedisReply!");

		auto ctx = &m_conn.m_replyContext;

		if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire
		clearData();
		ctx.index++;

		if (ctx.index >= ctx.length && ctx.refCount == 1) {
			ctx.refCount = 0;
			m_conn = null;
			m_lockedConnection.destroy();
		}
	}


	/// Legacy property for hasNext/next based iteration
	@property bool hasNext() const { return !empty; }

	/// Legacy property for hasNext/next based iteration.
	/// Returns a duplicate of the current element and advances the range.
	TN next(TN : E[], E)()
	{
		assert(hasNext, "end of reply");

		auto ret = front.dup;
		popFront();
		return () @trusted { return cast(TN)ret; } ();
	}

	/// Consumes and discards all remaining elements.
	void drop()
	{
		if (!m_conn) return;
		while (!empty) popFront();
	}

	// Reads the next element of a multi reply from the connection.
	// Does nothing for non-multi replies.
	private void readData()
	{
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.hasData && ctx.initialized);

		if (ctx.multi)
			readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ());
	}

	// Forgets the data of the current element.
	private void clearData()
	{
		auto ctx = &m_conn.m_replyContext;
		ctx.data = null;
		ctx.hasData = false;
	}

	// Stores the pool lock so that it lives as long as the reply is being read.
	private @property void lockedConnection(ref LockedConnection!RedisConnection conn)
	{
		assert(m_conn !is null);
		m_lockedConnection = conn;
	}

	// Reads the first line of the reply and sets up the context according to
	// the reply type marker ('+', '-', ':', '$' or '*').
	//
	// Throws: Exception carrying the server message for error replies ('-');
	//   Exception (after closing the connection) for empty or unknown reply lines.
	private void initialize()
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.initialized);
		ctx.initialized = true;

		ubyte[] ln = m_conn.conn.readLine();

		// An empty line carries no type marker. Treat it like any other protocol
		// violation instead of running into a range violation on ln[0].
		if (ln.length == 0) {
			m_conn.conn.close();
			throw new Exception("Unknown reply type: empty reply line");
		}

		switch (ln[0]) {
			default:
				m_conn.conn.close();
				throw new Exception(format("Unknown reply type: %s", cast(char)ln[0]));
			case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ());
			case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '$':
				readBulk(() @trusted { return cast(string)ln; } ());
				break;
			case '*':
				if (ln.startsWith(cast(const(ubyte)[])"*-1")) {
					ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array
				} else {
					ctx.multi = true;
					// an unparsable element count leaves the stream in an unknown state
					scope (failure) m_conn.conn.close();
					ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ());
				}
				break;
		}
	}

	// Reads a bulk string whose size line ("$<n>") has already been read.
	// "$-1" denotes a null value and carries no payload.
	private void readBulk(string sizeLn)
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		if (sizeLn.startsWith("$-1")) {
			ctx.frontIsNull = true;
			ctx.hasData = true;
			ctx.data = null;
		} else {
			auto size = to!size_t(sizeLn[1 .. $]);
			auto data = new ubyte[size];
			m_conn.conn.read(data);
			// consume the line terminator following the payload
			m_conn.conn.readLine();
			ctx.frontIsNull = false;
			ctx.hasData = true;
			ctx.data = data;
		}
	}
}
1485 
/// Exception type for Redis protocol errors; forwards all arguments to `Exception`.
class RedisProtocolException : Exception {
	this(
		string message,
		string file = __FILE__,
		size_t line = __LINE__,
		Exception next = null)
	{
		super(message, file, line, next);
	}
}
1492 
/// Determines whether `T` is usable as the return type of a request:
/// a valid value type, or a `Nullable` / `RedisReply` of one.
template isValidRedisValueReturn(T)
{
	import std.typecons;

	// strip the wrapper, if any, and validate the wrapped type
	static if (isInstanceOf!(Nullable, T))
		alias Payload = typeof(T.init.get());
	else static if (isInstanceOf!(RedisReply, T))
		alias Payload = T.ElementType;
	else
		alias Payload = T;

	enum isValidRedisValueReturn = isValidRedisValueType!Payload;
}
1502 
/// Determines whether `T` is accepted as a Redis value: anything convertible
/// to `const(char)[]` or `const(ubyte)[]`, or exactly `long`, `double` or `bool`.
template isValidRedisValueType(T)
{
	enum isText = is(T : const(char)[]);
	enum isBinary = is(T : const(ubyte)[]);
	enum isScalar = is(T == long) || is(T == double) || is(T == bool);

	enum isValidRedisValueType = isText || isBinary || isScalar;
}
1507 
// Creates a reply bound to `conn` and reads its header line from the connection.
// NOTE(review): the default `T = ubyte` differs from RedisReply's default of
// `ubyte[]`; the call visible in this file passes T explicitly - confirm intent.
private RedisReply!T getReply(T = ubyte)(RedisConnection conn)
{
	auto reply = RedisReply!T(conn);
	reply.initialize();
	return reply;
}
1514 
// Per-connection state of the reply that is currently being read.
// It is shared by all copies of the corresponding RedisReply.
private struct RedisReplyContext {
	// number of RedisReply copies referring to this context
	long refCount;
	// payload of the current element, if `hasData` is set
	ubyte[] data;
	// whether the current element has already been read
	bool hasData;
	// set for array ('*') replies
	bool multi;
	// set once the reply header line has been processed
	bool initialized;
	// set when the current element is a null bulk string
	bool frontIsNull;
	// number of elements in the reply
	long length = 1;
	// index of the current element
	long index;
	ubyte[128] dataBuffer;
}
1526 
// A single connection to a Redis server, together with the session state
// (password, selected database) that was last sent over it and the context
// of the reply that is currently being read.
private final class RedisConnection {
	private {
		string m_host;
		ushort m_port;
		TCPConnection m_conn;
		// password last sent with AUTH over the current TCP connection
		string m_password;
		// database index last sent with SELECT over the current TCP connection
		long m_selectedDB;
		RedisReplyContext m_replyContext;
	}

	// Only stores the address; the TCP connection is established on demand
	// by the request functions.
	this(string host, ushort port)
	{
		m_host = host;
		m_port = port;
	}

	@property TCPConnection conn() nothrow { return m_conn; }

	// Installs a new TCP connection. The cached session state is reset along
	// with it, so that setAuth/setDB send AUTH/SELECT again instead of
	// skipping them based on what was sent over the previous connection.
	@property void conn(TCPConnection conn) nothrow
	{
		m_conn = conn;
		m_password = null;
		m_selectedDB = 0;
	}

	// Sends AUTH unless `password` equals the one already in effect.
	void setAuth(string password)
	{
		if (m_password == password) return;
		_request_reply(this, "AUTH", password);
		m_password = password;
	}

	// Sends SELECT unless `index` equals the database already in effect.
	void setDB(long index)
	{
		if (index == m_selectedDB) return;
		_request_reply(this, "SELECT", index);
		m_selectedDB = index;
	}

	// Counts the number of protocol arguments that `args` expands to.
	// Arrays other than byte/character arrays are flattened recursively.
	private static long countArgs(ARGS...)(scope ARGS args)
	{
		long ret = 0;
		foreach (i, A; ARGS) {
			static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) {
				foreach (arg; args[i])
					ret += countArgs(arg);
			} else ret++;
		}
		return ret;
	}

	unittest {
		assert(countArgs() == 0);
		assert(countArgs(1, 2, 3) == 3);
		assert(countArgs("1", ["2", "3", "4"]) == 4);
		assert(countArgs([["1", "2"], ["3"]]) == 3);
	}

	// Writes each argument as a bulk string ("$<len>\r\n<data>\r\n") to `dst`.
	// Booleans are written as "1"/"0"; arrays other than byte/character
	// arrays are flattened recursively.
	private static void writeArgs(R, ARGS...)(R dst, scope ARGS args)
		if (isOutputRange!(R, char))
	{
		foreach (i, A; ARGS) {
			static if (is(A == bool)) {
				writeArgs(dst, args[i] ? "1" : "0");
			} else static if (is(A : long) || is(A : real) || is(A == string)) {
				// the length prefix requires knowing the formatted size up front
				auto alen = formattedLength(args[i]);
				enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n";
				dst.formattedWrite(fmt, alen, args[i]);
			} else static if (is(A : const(ubyte[])) || is(A : const(char[]))) {
				dst.formattedWrite("$%s\r\n", args[i].length);
				dst.put(args[i]);
				dst.put("\r\n");
			} else static if (isArray!A) {
				foreach (arg; args[i])
					writeArgs(dst, arg);
			} else static assert(false, "Unsupported Redis argument type: " ~ A.stringof);
		}
	}

	unittest {
		import std.array : appender;
		auto dst = appender!string;
		writeArgs(dst, false, true, ["2", "3"], "4", 5.0);
		assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n");
	}

	// Number of characters `arg` occupies when written with its type's format
	// string. Strings are measured directly, other types by formatting into a
	// counting range.
	private static long formattedLength(ARG)(scope ARG arg)
	{
		static if (is(ARG == string)) return arg.length;
		else {
			import vibe.internal.rangeutil;
			long length;
			auto rangeCnt = RangeCounter(() @trusted { return &length; } ());
			rangeCnt.formattedWrite(typeFormatString!ARG, arg);
			return length;
		}
	}
}
1619 
// Writes a command with its arguments to the connection without reading a
// reply, (re)connecting first if necessary.
// NOTE(review): unlike _request_reply there is no explicit flush() here;
// presumably the output range flushes when it goes out of scope - confirm.
// Throws: Exception if the TCP connection cannot be established.
private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	if (!conn.conn || !conn.conn.connected) {
		try conn.conn = connectTCP(conn.m_host, conn.m_port);
		catch (Exception e) {
			throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
	}

	// array header: the command itself counts as one element
	auto argCount = conn.countArgs(args);
	auto output = streamOutputRange(conn.conn);
	formattedWrite(() @trusted { return &output; } (), "*%d\r\n$%d\r\n%s\r\n", argCount + 1, command.length, command);
	RedisConnection.writeArgs(() @trusted { return &output; } (), args);
}
1637 
/**
	Sends a command to the Redis server and returns the reply object for it.

	The TCP connection is (re-)established first if it does not exist or is
	no longer connected. The request is written as `*<count>\r\n`, followed by
	the command name and the arguments as encoded by
	`RedisConnection.writeArgs`, and is flushed before the reply is requested
	from the connection.

	Params:
		conn = The connection used to send the request
		command = Name of the Redis command
		args = Arguments of the command

	Returns:
		The result of `conn.getReply!T`.

	Throws:
		`Exception` if the connection to the server cannot be established.
*/
private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	if (!(conn.conn && conn.conn.connected)) {
		try {
			conn.conn = connectTCP(conn.m_host, conn.m_port);
		} catch (Exception e) {
			throw new Exception(format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port), __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
	}

	// the command name itself counts as one element of the request
	immutable elementCount = conn.countArgs(args) + 1;

	auto output = streamOutputRange(conn.conn);
	auto sink = () @trusted { return &output; } ();
	formattedWrite(sink, "*%d\r\n$%d\r\n%s\r\n", elementCount, command.length, command);
	RedisConnection.writeArgs(sink, args);

	// make sure the whole request is sent before waiting for the answer
	output.flush();

	return conn.getReply!T;
}
1658 
/**
	Performs a request and converts the reply to the requested result type.

	Depending on `T`, the result is handled as follows:
	$(UL
		$(LI `void`: the request is performed and the reply object is discarded)
		$(LI `RedisReply!U`: the reply is returned with `conn` stored in its
			`lockedConnection` property)
		$(LI `Nullable!U`: the first reply element is returned, or a null
			value if `frontIsNull` is set)
		$(LI any other type: the first reply element is returned)
	)

	Params:
		conn = The locked connection used to perform the request
		command = Name of the Redis command
		args = Arguments of the command

	Returns:
		The reply converted to `T` (nothing for `void`).
*/
private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args)
{
	import std.typecons;

	static if (is(T == void)) {
		// the reply is requested, but its contents are of no interest
		_request_reply(conn, command, args);
	} else static if (isInstanceOf!(RedisReply, T)) {
		auto result = _request_reply!(T.ElementType)(conn, command, args);
		// hand the locked connection over to the reply that is returned
		result.lockedConnection = conn;
		return result;
	} else static if (isInstanceOf!(Nullable, T)) {
		alias Payload = typeof(T.init.get());
		auto reply = _request_reply!Payload(conn, command, args);
		T result;
		if (reply.frontIsNull) return result;
		result = reply.front;
		return result;
	} else {
		auto reply = _request_reply!T(conn, command, args);
		return reply.front;
	}
}
1679 
/**
	Converts the raw payload of a reply to the requested type.

	Supported target types are `ubyte[]` (returned as is), `string` (validated
	as UTF and copied), `bool` (`true` if the payload starts with '1') and the
	numeric types `int`, `long`, `size_t` and `double` (parsed from the textual
	representation). Any other type is rejected at compile time.

	An empty payload converts to `false` for `bool` instead of triggering an
	out of bounds access.

	Note that `data` must be unique, because it is returned directly for
	`ubyte[]` and reinterpreted as immutable character data while parsing.

	Params:
		data = The raw bytes to convert

	Returns:
		The converted value.

	Throws:
		An exception is thrown by `validate` for invalid UTF in string data
		and by `parse` for malformed numbers.
*/
private T convertToType(T)(ubyte[] data)
{
	static if (isSomeString!T) () @trusted { validate(cast(T)data); } ();

	static if (is(T == ubyte[])) return data;
	else static if (is(T == string)) return cast(T)data.idup;
	else static if (is(T == bool)) {
		// guard against empty input - indexing would raise a RangeError
		return data.length > 0 && data[0] == '1';
	}
	else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) {
		// parse requires an lvalue, as it advances the string that it reads from
		auto str = () @trusted { return cast(string)data; } ();
		return parse!T(str);
	}
	else static assert(false, "Unsupported Redis reply type: " ~ T.stringof);
}
1693 
/**
	Yields the format specifier used to serialize a value of type `T`.

	Floating point types are printed using "%.16g", everything else uses "%s".
*/
private template typeFormatString(T)
{
	enum typeFormatString = isFloatingPoint!T ? "%.16g" : "%s";
}