1 /**
2 	Redis database client implementation.
3 
4 	Copyright: © 2012-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Jan Krüger, Sönke Ludwig, Michael Eisendle, Etienne Cimon
7 */
8 module vibe.db.redis.redis;
9 
10 public import vibe.core.net;
11 
12 import vibe.core.connectionpool;
13 import vibe.core.core;
14 import vibe.core.log;
15 import vibe.inet.url;
16 import vibe.internal.allocator;
17 import vibe.internal.freelistref;
18 import vibe.stream.operations;
19 import std.conv;
20 import std.exception;
21 import std.format;
22 import std.range : isInputRange, isOutputRange;
23 import std.string;
24 import std.traits;
25 import std.typecons : Nullable;
26 import std.utf;
27 
28 @safe:
29 
30 
/**
	Creates a RedisClient for the Redis server at the given host and port.

	Params:
		host = Host name or address of the Redis server
		port = TCP port of the server, `RedisClient.defaultPort` if omitted

	Returns:
		A newly constructed RedisClient.
*/
RedisClient connectRedis(string host, ushort port = RedisClient.defaultPort)
{
	auto client = new RedisClient(host, port);
	return client;
}
38 
/**
	Returns a Redis database connection instance corresponding to the given URL.

	The URL has the form
	`redis://[:password@]host[:port][/dbnum][?password=secret]`. If no port is
	given, `RedisClient.defaultPort` is used; if no database number is given,
	database 0 is selected.

	Authentication:
		The password for the Redis AUTH command is taken from the password
		portion of the "userinfo" URL field, e.g. "redis://:password@host".
		If that portion is empty, the value of the "password" query parameter
		is used instead. If both are present, the "userinfo" password wins.

	Examples:
		---
		// connecting with default settings:
		auto redisDB = connectRedisDB(URL("redis://127.0.0.1"));
		---

		---
		// connecting to database 3 of a password protected server
		auto redisDB = connectRedisDB(URL("redis://:password@myremotehost/3"));
		---

	Params:
		url = Redis URI scheme for a Redis database instance

	Returns:
		A new RedisDatabase instance that can be used to access the database.

	Throws:
		An `Exception` if the query string contains a parameter other than
		"password". Converting a non-numeric database path also throws.

	See_also: $(LINK2 https://www.iana.org/assignments/uri-schemes/prov/redis, Redis URI scheme)
*/
RedisDatabase connectRedisDB(URL url)
{
	auto cli = connectRedis(url.host, url.port != 0 ? url.port : RedisClient.defaultPort);

	if (!url.queryString.empty)
	{
		import vibe.inet.webform : FormFields, parseURLEncodedForm;
		auto query = FormFields.init;
		parseURLEncodedForm(url.queryString, query);
		foreach (param, val; query.byKeyValue)
		{
			switch (param)
			{
				/*
				The query parameter is only a fallback: a password given in
				the "userinfo" URL field is applied further below and takes
				precedence, so the query value is only used when the
				"userinfo" password is empty.
				*/
				case "password":
					if (url.password.empty)
						cli.auth(val);
					break;
				default:
					throw new Exception(`Redis config parameter "` ~ param ~ `" isn't supported`);
			}
		}
	}

	/*
	Redis' current optional authentication mechanism does not employ a
	username, but this might change in the future
	*/
	if (!url.password.empty)
		cli.auth(url.password);

	// Test the path itself (not localURI, which also contains the query
	// string) so that URLs with an empty path or a bare "/" select database 0
	// instead of slicing out of bounds or converting an empty string.
	long databaseIndex;
	if (url.pathString.length >= 2)
		databaseIndex = url.pathString[1 .. $].to!long;

	return cli.getDatabase(databaseIndex);
}
112 
/**
	Redis client that runs its commands over a pool of connections.

	Commands issued directly on the client use the database index stored in
	`m_selectedDB`; `getDatabase` returns a handle bound to a specific index.
*/
final class RedisClient {
	private {
		ConnectionPool!RedisConnection m_connections;
		string m_authPassword;
		string m_version;
		long m_selectedDB;
	}

	enum defaultPort = 6379;

	/** Creates the connection pool for the given server.

		The pool's factory callback constructs a `RedisConnection` for
		`host` and `port`.
	*/
	this(string host = "127.0.0.1", ushort port = defaultPort)
	{
		m_connections = new ConnectionPool!RedisConnection({
			return new RedisConnection(host, port);
		});
	}

	/** Closes all pooled connections that are currently not in use.

		Call this before exiting the program to avoid relying on the GC to
		clean up the sockets.
	 */
	void releaseUnusedConnections() @safe
	{
		m_connections.removeUnused((unused) @safe nothrow {
			// Closing may throw with the old vibe.d:core library (not with
			// vibe-core). Both must be supported, so failures are only logged.
			try {
				unused.m_conn.close();
			} catch (Exception ex) {
				logDebug("Failed to close unused Redis connection: %s", ex.msg);
			}
		});
	}

	/** Returns the Redis version reported by the server.

		The value is read from the "redis_version" entry of the INFO reply
		on first use and cached in `m_version` afterwards.
	*/
	@property string redisVersion()
	{
		import std.string;

		// already determined by an earlier call
		if (m_version != "") return m_version;

		auto infoLines = info().splitLines();
		if (infoLines.length <= 1) return m_version;

		foreach (string entry; infoLines) {
			auto fields = entry.split(":");
			if (fields.length <= 1 || fields[0] != "redis_version") continue;
			m_version = fields[1];
			break;
		}

		return m_version;
	}

	/** Returns a handle for the database with the given index.
	*/
	RedisDatabase getDatabase(long index)
	{
		return RedisDatabase(this, index);
	}

	/** Creates a RedisSubscriber instance for launching a pubsub listener.
	*/
	RedisSubscriber createSubscriber()
	{
		return RedisSubscriber(this);
	}

	/*
		Connection
	*/

	/// Stores the password that is passed to a connection (via `setAuth`) before each request
	void auth(string password)
	{
		m_authPassword = password;
	}

	/// Sends ECHO with the given data and returns the reply converted to `T`
	T echo(T, U)(U data)
		if(isValidRedisValueReturn!T && isValidRedisValueType!U)
	{
		return request!T("ECHO", data);
	}

	/// Sends PING to the server
	void ping()
	{
		request("PING");
	}

	/// Sends QUIT to the server
	void quit()
	{
		request("QUIT");
	}

	/*
		Server
	*/

	//TODO: BGREWRITEAOF
	//TODO: BGSAVE

	/// Reads a configuration parameter (CONFIG GET)
	T getConfig(T)(string parameter)
		if(isValidRedisValueReturn!T)
	{
		return request!T("CONFIG", "GET", parameter);
	}

	/// Assigns a value to a configuration parameter (CONFIG SET)
	void setConfig(T)(string parameter, T value)
		if(isValidRedisValueType!T)
	{
		request("CONFIG", "SET", parameter, value);
	}

	/// Resets the statistics returned by INFO (CONFIG RESETSTAT)
	void configResetStat()
	{
		request("CONFIG", "RESETSTAT");
	}

	//TODO: Debug Object
	//TODO: Debug Segfault

	/** Deletes all keys from all databases.

		See_also: $(LINK2 http://redis.io/commands/flushall, FLUSHALL)
	*/
	void deleteAll()
	{
		request("FLUSHALL");
	}

	/// Returns the server's INFO reply (information and statistics) as a string
	string info()
	{
		return request!string("INFO");
	}

	/// Returns the UNIX time stamp of the last successful save to disk (LASTSAVE)
	long lastSave()
	{
		return request!long("LASTSAVE");
	}

	//TODO monitor

	/// Synchronously saves the dataset to disk (SAVE)
	void save()
	{
		request("SAVE");
	}

	/// Synchronously saves the dataset to disk and then shuts down the server (SHUTDOWN)
	void shutdown()
	{
		request("SHUTDOWN");
	}

	/// Makes the server a slave of another instance, or promotes it as master (SLAVEOF)
	void slaveOf(string host, ushort port)
	{
		request("SLAVEOF", host, port);
	}

	//TODO slowlog
	//TODO sync

	/// Runs a command against the database index stored in `m_selectedDB`.
	private T request(T = void, ARGS...)(string command, scope ARGS args)
	{
		return requestDB!(T, ARGS)(m_selectedDB, command, args);
	}

	/** Runs a command on a pooled connection.

		A connection is locked from the pool, the stored password and the
		database index `db` are applied to it, and the command is sent via
		`_request`. With `version (RedisDebug)` the request and its result
		are logged.
	*/
	private T requestDB(T, ARGS...)(long db, string command, scope ARGS args)
	{
		auto conn = m_connections.lockConnection();
		conn.setAuth(m_authPassword);
		conn.setDB(db);

		version (RedisDebug) {
			import std.conv;
			string debugargs = command;
			foreach (i, A; ARGS) debugargs ~= ", " ~ args[i].to!string;
		}

		static if (is(T == void)) {
			version (RedisDebug) logDebug("Redis request: %s => void", debugargs);
			_request!void(conn, command, args);
		} else {
			auto ret = _request!T(conn, command, args);
			version (RedisDebug) {
				// RedisReply results are not converted to text for the log
				static if (isInstanceOf!(RedisReply, T))
					logDebug("Redis request: %s => RedisReply", debugargs);
				else
					logDebug("Redis request: %s => %s", debugargs, ret.to!string);
			}
			return ret;
		}
	}
}
262 
263 
264 /**
265 	Accesses the contents of a Redis database
266 */
267 struct RedisDatabase {
268 	private {
269 		RedisClient m_client;
270 		long m_index;
271 	}
272 
273 	private this(RedisClient client, long index)
274 	{
275 		m_client = client;
276 		m_index = index;
277 	}
278 
279 	/** The Redis client with which the database is accessed.
280 	*/
281 	@property inout(RedisClient) client() inout { return m_client; }
282 
283 	/** Index of the database.
284 	*/
285 	@property long index() const { return m_index; }
286 
287 	/** Deletes all keys of the database.
288 
289 		See_also: $(LINK2 http://redis.io/commands/flushdb, FLUSHDB)
290 	*/
291 	void deleteAll() { request!void("FLUSHDB"); }
292 	/// Delete a key
293 	long del(scope string[] keys...) { return request!long("DEL", keys); }
294 	/// Determine if a key exists
295 	bool exists(string key) { return request!bool("EXISTS", key); }
296 	/// Set a key's time to live in seconds
297 	bool expire(string key, long seconds) { return request!bool("EXPIRE", key, seconds); }
298 	/// Set a key's time to live with D notation. E.g. $(D 5.minutes) for 60 * 5 seconds.
299 	bool expire(string key, Duration timeout) { return request!bool("PEXPIRE", key, timeout.total!"msecs"); }
300 	/// Set the expiration for a key as a UNIX timestamp
301 	bool expireAt(string key, long timestamp) { return request!bool("EXPIREAT", key, timestamp); }
302 	/// Find all keys matching the given glob-style pattern (Supported wildcards: *, ?, [ABC])
303 	RedisReply!T keys(T = string)(string pattern) if(isValidRedisValueType!T) { return request!(RedisReply!T)("KEYS", pattern); }
304 	/// Move a key to another database
305 	bool move(string key, long db) { return request!bool("MOVE", key, db); }
306 	/// Remove the expiration from a key
307 	bool persist(string key) { return request!bool("PERSIST", key); }
308 	//TODO: object
309 	/// Return a random key from the keyspace
310 	string randomKey() { return request!string("RANDOMKEY"); }
311 	/// Rename a key
312 	void rename(string key, string newkey) { request("RENAME", key, newkey); }
313 	/// Rename a key, only if the new key does not exist
314 	bool renameNX(string key, string newkey) { return request!bool("RENAMENX", key, newkey); }
315 	//TODO sort
316 	/// Get the time to live for a key
317 	long ttl(string key) { return request!long("TTL", key); }
318 	/// Get the time to live for a key in milliseconds
319 	long pttl(string key) { return request!long("PTTL", key); }
320 	/// Determine the type stored at key (string, list, set, zset and hash.)
321 	string type(string key) { return request!string("TYPE", key); }
322 
323 	/*
324 		String Commands
325 	*/
326 
327 	/// Append a value to a key
328 	long append(T)(string key, T suffix) if(isValidRedisValueType!T) { return request!long("APPEND", key, suffix); }
329 	/// Decrement the integer value of a key by one
330 	long decr(string key, long value = 1) { return value == 1 ? request!long("DECR", key) : request!long("DECRBY", key, value); }
331 	/// Get the value of a key
332 	T get(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("GET", key); }
333 	/// Returns the bit value at offset in the string value stored at key
334 	bool getBit(string key, long offset) { return request!bool("GETBIT", key, offset); }
335 	/// Get a substring of the string stored at a key
336 	T getRange(T = string)(string key, long start, long end) if(isValidRedisValueReturn!T) { return request!T("GETRANGE", key, start, end); }
337 	/// Set the string value of a key and return its old value
338 	T getSet(T = string, U)(string key, U value) if(isValidRedisValueReturn!T && isValidRedisValueType!U) { return request!T("GETSET", key, value); }
339 	/// Increment the integer value of a key
340 	long incr(string key, long value = 1) { return value == 1 ? request!long("INCR", key) : request!long("INCRBY", key, value); }
341 	/// Increment the real number value of a key
342 	long incr(string key, double value) { return request!long("INCRBYFLOAT", key, value); }
343 	/// Get the values of all the given keys
344 	RedisReply!T mget(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("MGET", keys); }
345 
346 	/// Set multiple keys to multiple values
347 	void mset(ARGS...)(ARGS args)
348 	{
349 		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
350 		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
351 		request("MSET", args);
352 	}
353 
354 	/// Set multiple keys to multiple values, only if none of the keys exist
355 	bool msetNX(ARGS...)(ARGS args) {
356 		static assert(ARGS.length % 2 == 0 && ARGS.length >= 2, "Arguments to mset must be pairs of key/value");
357 		foreach (i, T; ARGS ) static assert(i % 2 != 0 || is(T == string), "Keys must be strings.");
358 		return request!bool("MSETEX", args);
359 	}
360 
361 	/// Set the string value of a key
362 	void set(T)(string key, T value) if(isValidRedisValueType!T) { request("SET", key, value); }
363 	/// Set the value of a key, only if the key does not exist
364 	bool setNX(T)(string key, T value) if(isValidRedisValueType!T) { return request!bool("SETNX", key, value); }
365 	/// Set the value of a key, only if the key already exists
366 	bool setXX(T)(string key, T value) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "XX"); }
367 	/// Set the value of a key, only if the key does not exist, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
368 	bool setNX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "NX"); }
369 	/// Set the value of a key, only if the key already exists, and also set the specified expire time using D notation, e.g. $(D 5.minutes) for 5 minutes.
370 	bool setXX(T)(string key, T value, Duration expire_time) if(isValidRedisValueType!T) { return "OK" == request!string("SET", key, value, "PX", expire_time.total!"msecs", "XX"); }
371 	/// Sets or clears the bit at offset in the string value stored at key
372 	bool setBit(string key, long offset, bool value) { return request!bool("SETBIT", key, offset, value ? "1" : "0"); }
373 	/// Set the value and expiration of a key
374 	void setEX(T)(string key, long seconds, T value) if(isValidRedisValueType!T) { request("SETEX", key, seconds, value); }
375 	/// Overwrite part of a string at key starting at the specified offset
376 	long setRange(T)(string key, long offset, T value) if(isValidRedisValueType!T) { return request!long("SETRANGE", key, offset, value); }
377 	/// Get the length of the value stored in a key
378 	long strlen(string key) { return request!long("STRLEN", key); }
379 
380 	/*
381 		Hashes
382 	*/
383 	/// Delete one or more hash fields
384 	long hdel(string key, scope string[] fields...) { return request!long("HDEL", key, fields); }
385 	/// Determine if a hash field exists
386 	bool hexists(string key, string field) { return request!bool("HEXISTS", key, field); }
387 	/// Set multiple hash fields to multiple values
388 	void hset(T)(string key, string field, T value) if(isValidRedisValueType!T) { request("HSET", key, field, value); }
389 	/// Set the value of a hash field, only if the field does not exist
390 	bool hsetNX(T)(string key, string field, T value) if(isValidRedisValueType!T) { return request!bool("HSETNX", key, field, value); }
391 	/// Get the value of a hash field.
392 	T hget(T = string)(string key, string field) if(isValidRedisValueReturn!T) { return request!T("HGET", key, field); }
393 	/// Get all the fields and values in a hash
394 	RedisReply!T hgetAll(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HGETALL", key); }
395 	/// Increment the integer value of a hash field
396 	long hincr(string key, string field, long value=1) { return request!long("HINCRBY", key, field, value); }
397 	/// Increment the real number value of a hash field
398 	long hincr(string key, string field, double value) { return request!long("HINCRBYFLOAT", key, field, value); }
399 	/// Get all the fields in a hash
400 	RedisReply!T hkeys(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HKEYS", key); }
401 	/// Get the number of fields in a hash
402 	long hlen(string key) { return request!long("HLEN", key); }
403 	/// Get the values of all the given hash fields
404 	RedisReply!T hmget(T = string)(string key, scope string[] fields...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HMGET", key, fields); }
405 	/// Set multiple hash fields to multiple values
406 	void hmset(ARGS...)(string key, ARGS args) { request("HMSET", key, args); }
407 
408 	/// Get all the values in a hash
409 	RedisReply!T hvals(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("HVALS", key); }
410 
411 	/*
412 		Lists
413 	*/
414 	/// Get an element from a list by its index
415 	T lindex(T = string)(string key, long index) if(isValidRedisValueReturn!T) { return request!T("LINDEX", key, index); }
416 	/// Insert value in the list stored at key before the reference value pivot.
417 	long linsertBefore(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "BEFORE", pivot, value); }
418 	/// Insert value in the list stored at key after the reference value pivot.
419 	long linsertAfter(T1, T2)(string key, T1 pivot, T2 value) if(isValidRedisValueType!T1 && isValidRedisValueType!T2) { return request!long("LINSERT", key, "AFTER", pivot, value); }
420 	/// Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned.
421 	long llen(string key) { return request!long("LLEN", key); }
422 	/// Insert all the specified values at the head of the list stored at key.
423 	long lpush(ARGS...)(string key, ARGS args) { return request!long("LPUSH", key, args); }
424 	/// Inserts value at the head of the list stored at key, only if key already exists and holds a list.
425 	long lpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("LPUSHX", key, value); }
426 	/// Insert all the specified values at the tail of the list stored at key.
427 	long rpush(ARGS...)(string key, ARGS args) { return request!long("RPUSH", key, args); }
428 	/// Inserts value at the tail of the list stored at key, only if key already exists and holds a list.
429 	long rpushX(T)(string key, T value) if(isValidRedisValueType!T) { return request!long("RPUSHX", key, value); }
430 	/// Returns the specified elements of the list stored at key.
431 	RedisReply!T lrange(T = string)(string key, long start, long stop) { return request!(RedisReply!T)("LRANGE",  key, start, stop); }
432 	/// Removes the first count occurrences of elements equal to value from the list stored at key.
433 	long lrem(T)(string key, long count, T value) if(isValidRedisValueType!T) { return request!long("LREM", key, count, value); }
434 	/// Sets the list element at index to value.
435 	void lset(T)(string key, long index, T value) if(isValidRedisValueType!T) { request("LSET", key, index, value); }
436 	/// Trim an existing list so that it will contain only the specified range of elements specified.
437 	/// Equivalent to $(D range = range[start .. stop+1])
438 	void ltrim(string key, long start, long stop) { request("LTRIM",  key, start, stop); }
439 	/// Removes and returns the last element of the list stored at key.
440 	T rpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("RPOP", key); }
441 	/// Removes and returns the first element of the list stored at key.
442 	T lpop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("LPOP", key); }
443 	/// BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks
444 	/// the connection when there are no elements to pop from any of the given lists.
445 	Nullable!(Tuple!(string, T)) blpop(T = string)(string key, long seconds) if(isValidRedisValueReturn!T)
446 	{
447 		auto reply = request!(RedisReply!(ubyte[]))("BLPOP", key, seconds);
448 		Nullable!(Tuple!(string, T)) ret;
449 		if (reply.empty || reply.frontIsNull) return ret;
450 		string rkey = reply.front.convertToType!string();
451 		reply.popFront();
452 		ret = tuple(rkey, reply.front.convertToType!T());
453 		return ret;
454 	}
455 	/// Atomically returns and removes the last element (tail) of the list stored at source,
456 	/// and pushes the element at the first element (head) of the list stored at destination.
457 	T rpoplpush(T = string)(string key, string destination) if(isValidRedisValueReturn!T) { return request!T("RPOPLPUSH", key, destination); }
458 
459 	/*
460 		Sets
461 	*/
462 	/// Add the specified members to the set stored at key. Specified members that are already a member of this set are ignored.
463 	/// If key does not exist, a new set is created before adding the specified members.
464 	long sadd(ARGS...)(string key, ARGS args) { return request!long("SADD", key, args); }
465 	/// Returns the set cardinality (number of elements) of the set stored at key.
466 	long scard(string key) { return request!long("SCARD", key); }
467 	/// Returns the members of the set resulting from the difference between the first set and all the successive sets.
468 	RedisReply!T sdiff(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SDIFF", keys); }
469 	/// This command is equal to SDIFF, but instead of returning the resulting set, it is stored in destination.
470 	/// If destination already exists, it is overwritten.
471 	long sdiffStore(string destination, scope string[] keys...) { return request!long("SDIFFSTORE", destination, keys); }
472 	/// Returns the members of the set resulting from the intersection of all the given sets.
473 	RedisReply!T sinter(T = string)(string[] keys) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SINTER", keys); }
474 	/// This command is equal to SINTER, but instead of returning the resulting set, it is stored in destination.
475 	/// If destination already exists, it is overwritten.
476 	long sinterStore(string destination, scope string[] keys...) { return request!long("SINTERSTORE", destination, keys); }
477 	/// Returns if member is a member of the set stored at key.
478 	bool sisMember(T)(string key, T member) if(isValidRedisValueType!T) { return request!bool("SISMEMBER", key, member); }
479 	/// Returns all the members of the set value stored at key.
480 	RedisReply!T smembers(T = string)(string key) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SMEMBERS", key); }
481 	/// Move member from the set at source to the set at destination. This operation is atomic.
482 	/// In every given moment the element will appear to be a member of source or destination for other clients.
483 	bool smove(T)(string source, string destination, T member) if(isValidRedisValueType!T) { return request!bool("SMOVE", source, destination, member); }
484 	/// Removes and returns a random element from the set value stored at key.
485 	T spop(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SPOP", key ); }
486 	/// Returns a random element from the set stored at key.
487 	T srandMember(T = string)(string key) if(isValidRedisValueReturn!T) { return request!T("SRANDMEMBER", key ); }
488 	///returns count random elements from the set stored at key
489 	RedisReply!T srandMember(T = string)(string key, long count) if(isValidRedisValueReturn!T) { return request!(RedisReply!T)("SRANDMEMBER", key, count ); }
490 
491 
492 	/// Remove the specified members from the set stored at key.
493 	long srem(ARGS...)(string key, ARGS args) { return request!long("SREM", key, args); }
494 	/// Returns the members of the set resulting from the union of all the given sets.
495 	RedisReply!T sunion(T = string)(scope string[] keys...) if(isValidRedisValueType!T) { return request!(RedisReply!T)("SUNION", keys); }
496 	/// This command is equal to SUNION, but instead of returning the resulting set, it is stored in destination.
497 	long sunionStore(scope string[] keys...) { return request!long("SUNIONSTORE", keys); }
498 
499 	/*
500 		Sorted Sets
501 	*/
502 	/// Add one or more members to a sorted set, or update its score if it already exists
503 	long zadd(ARGS...)(string key, ARGS args) { return request!long("ZADD", key, args); }
504 	/// Returns the sorted set cardinality (number of elements) of the sorted set stored at key.
505 	long zcard(string key) { return request!long("ZCARD", key); }
506 	/// Returns the number of elements in the sorted set at key with a score between min and max
507 	long zcount(string RNG = "[]")(string key, double min, double max) { return request!long("ZCOUNT", key, getMinMaxArgs!RNG(min, max)); }
508 	/// Increments the score of member in the sorted set stored at key by increment.
509 	double zincrby(T)(string key, double value, T member) if (isValidRedisValueType!T) { return request!double("ZINCRBY", key, value, member); }
510 	//TODO: zinterstore
511 	/// Returns the specified range of elements in the sorted set stored at key.
512 	RedisReply!T zrange(T = string)(string key, long start, long end, bool with_scores = false)
513 		if(isValidRedisValueType!T)
514 	{
515 		if (with_scores) return request!(RedisReply!T)("ZRANGE", key, start, end, "WITHSCORES");
516 		else return request!(RedisReply!T)("ZRANGE", key, start, end);
517 	}
518 
519 	long zlexCount(string key, string min = "-", string max = "+") { return request!long("ZLEXCOUNT", key, min, max); }
520 
521 	/// When all the elements in a sorted set are inserted with the same score, in order to force lexicographical ordering,
522 	/// this command returns all the elements in the sorted set at key with a value between min and max.
523 	RedisReply!T zrangeByLex(T = string)(string key, string min = "-", string max = "+", long offset = 0, long count = -1)
524 		if(isValidRedisValueType!T)
525 	{
526 		if (offset > 0 || count != -1) return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max, "LIMIT", offset, count);
527 		else return request!(RedisReply!T)("ZRANGEBYLEX", key, min, max);
528 	}
529 
530 	/// Returns all the elements in the sorted set at key with a score between start and end inclusively
531 	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, bool with_scores = false)
532 		if(isValidRedisValueType!T)
533 	{
534 		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES");
535 		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end));
536 	}
537 
538 	/// Computes an internal list of elements in the sorted set at key with a score between start and end inclusively,
539 	/// and returns a range subselection similar to $(D results[offset .. offset+count])
540 	RedisReply!T zrangeByScore(T = string, string RNG = "[]")(string key, double start, double end, long offset, long count, bool with_scores = false)
541 		if(isValidRedisValueType!T)
542 	{
543 		assert(offset >= 0);
544 		assert(count >= 0);
545 		if (with_scores) return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "WITHSCORES", "LIMIT", offset, count);
546 		else return request!(RedisReply!T)("ZRANGEBYSCORE", key, getMinMaxArgs!RNG(start, end), "LIMIT", offset, count);
547 	}
548 
549 	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from low to high.
550 	long zrank(T)(string key, T member)
551 		if (isValidRedisValueType!T)
552 	{
553 		auto str = request!string("ZRANK", key, member);
554 		return str != "" ? parse!long(str) : -1;
555 	}
556 
557 	/// Removes the specified members from the sorted set stored at key.
558 	long zrem(ARGS...)(string key, ARGS members) { return request!long("ZREM", key, members); }
559 	/// Removes all elements in the sorted set stored at key with rank between start and stop.
560 	long zremRangeByRank(string key, long start, long stop) { return request!long("ZREMRANGEBYRANK", key, start, stop); }
561 	/// Removes all elements in the sorted set stored at key with a score between min and max (inclusive).
562 	long zremRangeByScore(string RNG = "[]")(string key, double min, double max) { return request!long("ZREMRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));}
563 	/// Returns the specified range of elements in the sorted set stored at key.
564 	RedisReply!T zrevRange(T = string)(string key, long start, long end, bool with_scores = false)
565 		if(isValidRedisValueType!T)
566 	{
567 		if (with_scores) return request!(RedisReply!T)("ZREVRANGE", key, start, end, "WITHSCORES");
568 		else return request!(RedisReply!T)("ZREVRANGE", key, start, end);
569 	}
570 
571 	/// Returns all the elements in the sorted set at key with a score between max and min (including elements with score equal to max or min).
572 	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, bool with_scores=false)
573 		if(isValidRedisValueType!T)
574 	{
575 		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES");
576 		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max));
577 	}
578 
579 	/// Computes an internal list of elements in the sorted set at key with a score between max and min, and
580 	/// returns a window of elements selected in a way equivalent to $(D results[offset .. offset + count])
581 	RedisReply!T zrevRangeByScore(T = string, string RNG = "[]")(string key, double min, double max, long offset, long count, bool with_scores=false)
582 		if(isValidRedisValueType!T)
583 	{
584 		assert(offset >= 0);
585 		assert(count >= 0);
586 		if (with_scores) return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "WITHSCORES", "LIMIT", offset, count);
587 		else return request!(RedisReply!T)("ZREVRANGEBYSCORE", key, getMinMaxArgs!RNG(min, max), "LIMIT", offset, count);
588 	}
589 
590 	/// Returns the rank of member in the sorted set stored at key, with the scores ordered from high to low.
591 	long zrevRank(T)(string key, T member)
592 		if (isValidRedisValueType!T)
593 	{
594 		auto str = request!string("ZREVRANK", key, member);
595 		return str != "" ? parse!long(str) : -1;
596 	}
597 
598 	/// Returns the score of member in the sorted set at key.
599 	RedisReply!T zscore(T = string, U)(string key, U member)
600 		if(isValidRedisValueType!T && isValidRedisValueType!U)
601 	{
602 		return request!(RedisReply!T)("ZSCORE", key, member);
603 	}
604 
605 	/*
606 		Hyperloglog
607 	*/
608 
609 	/// Adds one or more Keys to a HyperLogLog data structure .
610 	long pfadd(ARGS...)(string key, ARGS args) { return request!long("PFADD", key, args); }
611 
612 	/** Returns the approximated cardinality computed by the HyperLogLog data
613 		structure stored at the specified key.
614 
615 		When called with a single key, returns the approximated cardinality
616 		computed by the HyperLogLog data structure stored at the specified
617 		variable, which is 0 if the variable does not exist.
618 
619 		When called with multiple keys, returns the approximated cardinality
620 		of the union of the HyperLogLogs passed, by internally merging the
621 		HyperLogLogs stored at the provided keys into a temporary HyperLogLog.
622 	*/
623 	long pfcount(scope string[] keys...) { return request!long("PFCOUNT", keys); }
624 
625 	/// Merge multiple HyperLogLog values into a new one.
626 	void pfmerge(ARGS...)(string destkey, ARGS args) { request("PFMERGE", destkey, args); }
627 
628 
629 	//TODO: zunionstore
630 
631 	/*
632 		Pub / Sub
633 	*/
634 
635 	/// Publishes a message to all clients subscribed at the channel
636 	long publish(string channel, string message)
637 	{
638 		auto str = request!string("PUBLISH", channel, message);
639 		return str != "" ? parse!long(str) : -1;
640 	}
641 
642 	/// Inspect the state of the Pub/Sub subsystem
643 	RedisReply!T pubsub(T = string)(string subcommand, scope string[] args...)
644 		if(isValidRedisValueType!T)
645 	{
646 		return request!(RedisReply!T)("PUBSUB", subcommand, args);
647 	}
648 
649 	/*
650 		TODO: Transactions
651 	*/
652 	/// Return the number of keys in the selected database
653 	long dbSize() { return request!long("DBSIZE"); }
654 
655 	/*
656 		LUA Scripts
657 	*/
658 	/// Execute a Lua script server side
659 	RedisReply!T eval(T = string, ARGS...)(string lua_code, scope string[] keys, scope ARGS args)
660 		if(isValidRedisValueType!T)
661 	{
662 		return request!(RedisReply!T)("EVAL", lua_code, keys.length, keys, args);
663 	}
664 	/// Evaluates a script cached on the server side by its SHA1 digest. Scripts are cached on the server side using the scriptLoad function.
665 	RedisReply!T evalSHA(T = string, ARGS...)(string sha, scope string[] keys, scope ARGS args)
666 		if(isValidRedisValueType!T)
667 	{
668 		return request!(RedisReply!T)("EVALSHA", sha, keys.length, keys, args);
669 	}
670 
671 	//scriptExists
672 	//scriptFlush
673 	//scriptKill
674 
675 	/// Load a script into the scripts cache, without executing it. Run it using evalSHA.
676 	string scriptLoad(string lua_code) { return request!string("SCRIPT", "LOAD", lua_code); }
677 
678 	/// Run the specified command and arguments in the Redis database server
679 	T request(T = void, ARGS...)(string command, scope ARGS args)
680 	{
681 		return m_client.requestDB!(T, ARGS)(m_index, command, args);
682 	}
683 
684 	private static string[2] getMinMaxArgs(string RNG)(double min, double max)
685 	{
686 		// TODO: avoid GC allocations
687 		static assert(RNG.length == 2, "The RNG range specification must be two characters long");
688 
689 		string[2] ret;
690 		string mins, maxs;
691 		mins = min == -double.infinity ? "-inf" : min == double.infinity ? "+inf" : format(typeFormatString!double, min);
692 		maxs = max == -double.infinity ? "-inf" : max == double.infinity ? "+inf" : format(typeFormatString!double, max);
693 
694 		static if (RNG[0] == '[') ret[0] = mins;
695 		else static if (RNG[0] == '(') ret[0] = '('~mins;
696 		else static assert(false, "Opening range specification mist be either '[' or '('.");
697 
698 		static if (RNG[1] == ']') ret[1] = maxs;
699 		else static if (RNG[1] == ')') ret[1] = '('~maxs;
700 		else static assert(false, "Closing range specification mist be either ']' or ')'.");
701 
702 		return ret;
703 	}
704 }
705 
706 
707 /**
708 	A redis subscription listener
709 */
710 import std.datetime;
711 import std.variant;
712 import std.typecons : Tuple, tuple;
713 import std.container : Array;
714 import std.algorithm : canFind;
715 import std.range : takeOne;
716 import std.array : array;
717 
718 import vibe.core.concurrency;
719 import vibe.core.sync;
720 
/// Handle type for `RedisSubscriberImpl` instances, wrapped in `FreeListRef`.
/// NOTE(review): presumably a reference counted handle - confirm against
/// vibe.internal.freelistref.
alias RedisSubscriber = FreeListRef!RedisSubscriberImpl;
722 
/**
	Listener for Redis Pub/Sub messages.

	The subscriber locks one connection of the client's connection pool and
	runs two tasks while listening: the listener task (see `blisten`), which
	parses the replies sent by the server, and a helper task that waits for
	incoming data and notifies the listener. Tasks that call `subscribe`,
	`unsubscribe`, `bstop` etc. are notified through task messages of type
	`Action`.
*/
final class RedisSubscriberImpl {
	private {
		// client that provides the connection pool and the auth/DB settings
		RedisClient m_client;
		// connection used exclusively for pub/sub, acquired in init()
		LockedConnection!RedisConnection m_lockedConnection;
		// channels (and patterns) confirmed by the server
		bool[string] m_subscriptions;
		// channels requested through subscribe() before listening started
		string[] m_pendingSubscriptions;
		// true between the start of blisten() and its teardown
		bool m_listening;
		// set to request the listener tasks to exit
		bool m_stop;
		// task running blisten()
		Task m_listener;
		// task that waits for incoming data and notifies m_listener
		Task m_listenerHelper;
		// task currently waiting for an Action notification
		Task m_waiter;
		// task blocked in waitForStop()
		Task m_stopWaiter;
		// serializes the public operations
		InterruptibleRecursiveTaskMutex m_mutex;
		// serializes access to the pub/sub connection
		InterruptibleTaskMutex m_connMutex;
	}

	// Messages exchanged between the listener, its helper and waiting tasks
	private enum Action {
		DATA,        // data is available / one reply has been handled
		STOP,        // the listener should stop / has stopped
		STARTED,     // the listener has started (or failed to start)
		SUBSCRIBE,   // a subscription was confirmed by the server
		UNSUBSCRIBE  // an unsubscription was confirmed by the server
	}

	/// Determines if the listener is currently running
	@property bool isListening() const {
		return m_listening;
	}

	/// Get a list of channels with active subscriptions
	@property string[] subscriptions() const {
		return () @trusted { return m_subscriptions.keys; } ();
	}

	/// Determines if a subscription for the given channel has been confirmed
	bool hasSubscription(string channel) const {
		return (channel in m_subscriptions) !is null && m_subscriptions[channel];
	}

	/// Creates a subscriber that uses the connection pool of the given client.
	this(RedisClient client) {

		logTrace("this()");
		m_client = client;
		m_mutex = new InterruptibleRecursiveTaskMutex;
		m_connMutex = new InterruptibleTaskMutex;
	}

	// FIXME: instead of waiting here, the class must be reference counted
	// and destruction needs to be deferred until everything is stopped
	~this() {
		logTrace("~this");
		waitForStop();
	}

	// Task will block until the listener is finished
	private void waitForStop()
	{
		logTrace("waitForStop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stopWaiter = Task.getThis();
			});
			if (!m_listening) return; // verify again in case the mutex was locked by bstop
			scope(exit) {
				m_mutex.performLocked!({
					m_stopWaiter = Task();
				});
			}
			bool stopped;
			do {
				() @trusted { receive((Action act) { if (act == Action.STOP) stopped = true;  }); } ();
			} while (!stopped);

			enforce(stopped, "Failed to wait for Redis listener to stop");
		}
		inTask(&impl);
	}

	/// Stop listening and yield until the operation is complete.
	/// Throws an exception if no STOP notification arrives within 3 seconds.
	void bstop(){
		logTrace("bstop");
		if (!m_listening) return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				stop();

				bool stopped;
				do {
					if (!() @trusted { return receiveTimeout(3.seconds, (Action act) { if (act == Action.STOP) stopped = true;  }); } ())
						break;
				} while (!stopped);

				enforce(stopped, "Failed to wait for Redis listener to stop");
			});
		}
		inTask(&impl);
	}

	/// Stop listening asynchronously
	void stop(){
		logTrace("stop");
		if (!m_listening)
			return;

		void impl() @safe {
			m_mutex.performLocked!({
				m_stop = true;
				() @trusted { m_listener.send(Action.STOP); } ();
				// send a message to wake up the listenerHelper from the reply
				if (m_subscriptions.length > 0) {
					m_connMutex.performLocked!(() {
						_request_void(m_lockedConnection, "UNSUBSCRIBE", () @trusted { return cast(string[]) m_subscriptions.keys.takeOne.array; } ());
					});
					sleep(30.msecs);
				}
			});
		}
		inTask(&impl);
	}

	// Returns true if at least one of the channels is not subscribed yet
	private bool hasNewSubscriptionIn(scope string[] args) {
		foreach (arg; args)
			if (!hasSubscription(arg))
				return true;
		return false;
	}

	// Returns true if at least one of the channels is currently subscribed
	private bool anySubscribed(scope string[] args) {
		foreach (arg; args)
			if (hasSubscription(arg))
				return true;
		return false;
	}

	/// Completes the subscription for a listener to start receiving pubsub messages
	/// on the corresponding channel(s). Returns instantly if already subscribed.
	/// If the listener is not running yet, the channels are queued and get
	/// subscribed once `listen` has started the listener.
	/// Failures to complete the subscription are written to the debug log.
	void subscribe(scope string[] args...)
	{
		logTrace("subscribe");
		if (!m_listening) {
			foreach (arg; args)
				m_pendingSubscriptions ~= arg;
			return;
		}

		if (!hasNewSubscriptionIn(args))
			return;

		void impl() @safe {

			scope(failure) { logTrace("Failure"); bstop(); }
			try {
				m_mutex.performLocked!({
					m_waiter = Task.getThis();
					scope(exit) m_waiter = Task();
					m_connMutex.performLocked!({
						_request_void(m_lockedConnection, "SUBSCRIBE", args);
					});
					// Wait until every requested channel has been confirmed. The
					// channels are checked one by one, because the order of the
					// keys of m_subscriptions is unspecified.
					while (hasNewSubscriptionIn(args)) {
						if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
							break;
					}
					const subscribed = !hasNewSubscriptionIn(args);
					debug {
						auto keys = () @trusted { return m_subscriptions.keys; } ();
						logTrace("Can find keys?: %s",  subscribed);
						logTrace("Subscriptions: %s", keys);
					}
					enforce(subscribed, "Could not complete subscription(s).");
				});
			} catch (Exception e) {
				logDebug("Redis subscribe() failed: %s", e.msg);
			}
		}
		inTask(&impl);
	}

	/// Unsubscribes from the channel(s) specified, returns immediately if none
	/// is currently being listened.
	/// If an error is thrown here, it stops the listener.
	void unsubscribe(scope string[] args...)
	{
		logTrace("unsubscribe");

		void impl() @safe {

			if (!anySubscribed(args))
				return;

			scope(failure) bstop();
			assert(m_listening);

			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "UNSUBSCRIBE", args);
				});
				// Wait until none of the channels is subscribed anymore. The
				// channels are checked one by one, because the order of the
				// keys of m_subscriptions is unspecified.
				while (anySubscribed(args)) {
					if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ())
						break;
				}
				const unsubscribed = !anySubscribed(args);
				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s",  !unsubscribed);
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}

	/// Same as subscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are already registered.
	/// throws Exception if the pattern does not yield a new subscription.
	void psubscribe(scope string[] args...)
	{
		logTrace("psubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool subscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PSUBSCRIBE", args);
				});

				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.SUBSCRIBE);  }); } ())
					subscribed = false;
				else
					subscribed = true;

				debug logTrace("Subscriptions: %s", () @trusted { return m_subscriptions.keys; } ());
				enforce(subscribed, "Could not complete subscription(s).");
			});
		}
		inTask(&impl);
	}

	/// Same as unsubscribe, but uses glob patterns, and does not return instantly if
	/// the subscriptions are not registered.
	/// throws Exception if the pattern does not yield a new unsubscription.
	void punsubscribe(scope string[] args...)
	{
		logTrace("punsubscribe");
		void impl() @safe {
			scope(failure) bstop();
			assert(m_listening);
			m_mutex.performLocked!({
				m_waiter = Task.getThis();
				scope(exit) m_waiter = Task();
				bool unsubscribed;
				m_connMutex.performLocked!({
					_request_void(m_lockedConnection, "PUNSUBSCRIBE", args);
				});
				if (!() @trusted { return receiveTimeout(2.seconds, (Action act) { enforce(act == Action.UNSUBSCRIBE);  }); } ())
					unsubscribed = false;
				else
					unsubscribed = true;

				debug {
					auto keys = () @trusted { return m_subscriptions.keys; } ();
					logTrace("Can find keys?: %s",  keys.canFind(args));
					logTrace("Subscriptions: %s", keys);
				}
				enforce(unsubscribed, "Could not complete unsubscription(s).");
			});
		}
		inTask(&impl);
	}

	// Runs impl within a task. If the caller is not running inside of a task,
	// a new task is started and joined; exceptions thrown by impl are
	// rethrown to the caller in both cases.
	private void inTask(scope void delegate() @safe impl) {
		logTrace("inTask");
		if (Task.getThis() == Task())
		{
			Throwable ex;
			Task task = runTask({
				logDebug("inTask %s", Task.getThis());
				try impl();
				catch (Exception e) {
					ex = e;
				}
			});
			task.join();
			logDebug("done");
			if (ex)
				throw ex;
		}
		else
			impl();
	}

	// Acquires the pub/sub connection from the pool (if not done yet) and
	// makes sure that it is connected, authenticated and has the database
	// of the client selected.
	private void init(){

		logTrace("init");
		if (!m_lockedConnection) {
			m_lockedConnection = m_client.m_connections.lockConnection();
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}

		if (!m_lockedConnection.conn || !m_lockedConnection.conn.connected) {
			try m_lockedConnection.conn = connectTCP(m_lockedConnection.m_host, m_lockedConnection.m_port);
			catch (Exception e) {
				throw new Exception(format("Failed to connect to Redis server at %s:%s.", m_lockedConnection.m_host, m_lockedConnection.m_port), __FILE__, __LINE__, e);
			}
			m_lockedConnection.conn.tcpNoDelay = true;
			m_lockedConnection.setAuth(m_client.m_authPassword);
			m_lockedConnection.setDB(m_client.m_selectedDB);
		}
	}

	/// Same as listen, but blocking: runs the listener loop in the calling
	/// task until it is stopped, the timeout expires or an error occurs.
	///
	/// `onMessage` is called with the channel and the payload of each
	/// received message. For messages received through a pattern
	/// subscription, the channel the message was published to is passed.
	void blisten(void delegate(string, string) @safe onMessage, Duration timeout = 0.seconds)
	{
		init();

		void onSubscribe(string channel) @safe {
			logTrace("Callback subscribe(%s)", channel);
			m_subscriptions[channel] = true;
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.SUBSCRIBE); } ();
		}

		void onUnsubscribe(string channel) @safe {
			logTrace("Callback unsubscribe(%s)", channel);
			m_subscriptions.remove(channel);
			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.UNSUBSCRIBE); } ();
		}

		void teardown() @safe { // teardown
			logTrace("Redis listener exiting");
			// More publish commands may be sent to this connection after recycling it, so we
			// actively destroy it
			m_lockedConnection.conn.close();
			m_lockedConnection.destroy();
			m_listening = false;
		}
		// http://redis.io/topics/pubsub
		/**
			 	SUBSCRIBE first second
				*3
				$9
				subscribe
				$5
				first
				:1
				*3
				$9
				subscribe
				$6
				second
				:2
			*/
		// This is a simple parser/handler for subscribe/unsubscribe/publish
		// commands sent by redis. The PubSub client protocol is simple enough

		void pubsub_handler() {
			TCPConnection conn = m_lockedConnection.conn;
			logTrace("Pubsub handler");
			void dropCRLF() @safe {
				ubyte[2] crlf;
				conn.read(crlf);
			}
			size_t readArgs() @safe {
				char[20] ucnt; // size_t.max has 20 decimal digits
				ubyte[1] num;
				size_t i;
				do {
					conn.read(num);
					if (num[0] >= 48 && num[0] <= 57) {
						enforce(i < ucnt.length, "Numeric field of pubsub reply is too long");
						ucnt[i] = num[0];
					}
					else break;
					i++;
				}
				while (true); // ascii
				ubyte[1] b;
				conn.read(b);
				// only log the digits that were actually read
				logTrace("Found %s", ucnt[0 .. i]);
				// the new line is consumed when num is not in range.
				return ucnt[0 .. i].to!size_t;
			}
			ubyte[1] symbol;
			// reads a complete bulk string ("$<len>\r\n<data>\r\n") into a new buffer
			ubyte[] readBulkString() @safe {
				conn.read(symbol);
				enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
				size_t len = readArgs();
				ubyte[] buf = new ubyte[len];
				conn.read(buf);
				dropCRLF();
				return buf;
			}
			// find the number of arguments in the array
			conn.read(symbol);
			enforce(symbol[0] == '*', "Expected '*', got '" ~ symbol.to!string ~ "'");
			size_t args = readArgs();
			// get the number of characters in the first string (the command)
			conn.read(symbol);
			enforce(symbol[0] == '$', "Expected '$', got '" ~ symbol.to!string ~ "'");
			size_t cnt = readArgs();
			ubyte[] cmd = () @trusted { return theAllocator.makeArray!ubyte(cnt); } ();
			scope(exit) () @trusted { theAllocator.dispose(cmd); } ();
			conn.read(cmd);
			dropCRLF();
			// find the channel (or the pattern in case of the pattern based replies)
			ubyte[] str = readBulkString();
			string channel = () @trusted { return cast(string)str; } ();
			logTrace("chan: %s", channel);

			if (cmd == "message" || cmd == "pmessage") { // find the message
				if (cmd == "pmessage") {
					// pattern messages carry the matched pattern first, followed
					// by the channel that the message was published to
					ubyte[] chan = readBulkString();
					channel = () @trusted { return cast(string)chan; } ();
					logTrace("chan: %s", channel);
				}
				ubyte[] payload = readBulkString();
				string message = () @trusted { return cast(string)payload; } ();
				logTrace("msg: %s", message);
				onMessage(channel, message);
			}
			else if (cmd == "subscribe" || cmd == "unsubscribe" || cmd == "psubscribe" || cmd == "punsubscribe") { // find the remaining subscriptions
				bool is_subscribe = (cmd == "subscribe" || cmd == "psubscribe");
				conn.read(symbol);
				enforce(symbol[0] == ':', "Expected ':', got '" ~ symbol.to!string ~ "'");
				cnt = readArgs(); // number of subscriptions
				logTrace("subscriptions: %d", cnt);
				if (is_subscribe)
					onSubscribe(channel);
				else
					onUnsubscribe(channel);

				// todo: enforce the number of subscriptions?
			}
			// unexpected data from the server stops the listener instead of halting the process
			else throw new RedisProtocolException("Unrecognized pubsub wire protocol command received");
		}

		// Waits for data and advises the handler
		m_listenerHelper = runTask(() nothrow {
			loop: while(true) {
				if (m_stop || !m_lockedConnection.conn) break;

				try {
					const waitResult = m_lockedConnection.conn.waitForDataEx(100.msecs);
					if (m_stop) break;

					final switch (waitResult) {
						case WaitForDataStatus.noMoreData:
							break loop;
						case WaitForDataStatus.timeout:
							logTrace("No data arrival in 100 ms...");
							continue loop;
						case WaitForDataStatus.dataAvailable:
					}

					// Data has arrived, this task is in charge of notifying the main handler loop
					logTrace("Notify data arrival");

					() @trusted { receiveTimeout(0.seconds, (Variant v) {}); } (); // clear message queue
					() @trusted { m_listener.send(Action.DATA); } ();
					if (!() @trusted { return receiveTimeout(5.seconds, (Action act) { assert(act == Action.DATA); }); } ())
						assert(false);
				} catch (Exception e) {
					logException(e, "Redis listen task failed - stopping to listen");
					break;
				}
			}

			logTrace("Listener Helper exit.");
			try () @trusted { m_listener.send(Action.STOP); } ();
			catch (Exception e) assert(false, e.msg);
		} );

		m_listening = true;
		logTrace("Redis listener now listening");
		if (m_waiter != Task())
			() @trusted { m_waiter.send(Action.STARTED); } ();

		if (timeout == 0.seconds)
			timeout = 365.days; // make sure 0.seconds is considered as big.

		scope(exit) {
			logTrace("Redis Listener exit.");
			if (!m_stop) {
				stop(); // notifies the listenerHelper
			}
			m_listenerHelper.join();
			// close the data connections
			teardown();

			if (m_waiter != Task())
				() @trusted { m_waiter.send(Action.STOP); } ();
			if (m_stopWaiter != Task())
				() @trusted { m_stopWaiter.send(Action.STOP); } ();

			m_listenerHelper = Task();
			m_listener = Task();
			m_stop = false;
		}

		// Start waiting for data notifications to arrive
		while(true) {

			auto handler = (Action act) {
				if (act == Action.STOP) m_stop = true;
				if (m_stop) return;
				logTrace("Calling PubSub Handler");
				m_connMutex.performLocked!({
					pubsub_handler(); // handles one command at a time
				});
				() @trusted { m_listenerHelper.send(Action.DATA); } ();
			};

			if (!() @trusted { return receiveTimeout(timeout, handler); } () || m_stop) {
				logTrace("Redis Listener stopped");
				break;
			}

		}

	}

	/// Waits for messages and calls the callback with the channel and the message as arguments.
	/// The timeout is passed over to the listener, which closes after the period of inactivity.
	/// Use 0.seconds timeout to specify a very long time (365 days)
	/// Errors will be sent to Callback Delegate on channel "Error".
	/// Channels passed to `subscribe` before the listener was started are
	/// subscribed once the listener is running.
	/// Returns: The task that runs the listener.
	Task listen(void delegate(string, string) @safe nothrow callback, Duration timeout = 0.seconds)
	{
		logTrace("Listen");
		void impl() @safe {
			logTrace("Listen");
			m_waiter = Task.getThis();
			scope(exit) m_waiter = Task();
			Throwable ex;
			m_listener = runTask(() nothrow {
				try blisten(callback, timeout);
				catch(Exception e) {
					ex = e;
					// the listener failed before it started: wake up the waiting task
					if (m_waiter != Task() && !m_listening) {
						try () @trusted { m_waiter.send(Action.STARTED); } ();
						catch (Exception e2) assert(false, e2.msg);
						return;
					}
					callback("Error", e.msg);
				}
			});
			m_mutex.performLocked!({
				() @trusted { receiveTimeout(2.seconds, (Action act) { assert( act == Action.STARTED); }); } ();
				if (ex) throw ex;
				enforce(m_listening, "Failed to start listening, timeout of 2 seconds expired");
			});

			foreach (channel; m_pendingSubscriptions) {
				subscribe(channel);
			}

			m_pendingSubscriptions = null;
		}
		inTask(&impl);
		return m_listener;
	}
}
1304 
1305 
1306 
/** Range interface to a single Redis reply.

	The reply is read lazily from the connection. All copies of a reply share
	the reply state stored in the connection (`RedisReplyContext`), which
	counts the number of copies. When the last copy is destroyed, any unread
	elements are skipped.
*/
struct RedisReply(T = ubyte[]) {
	static assert(isInputRange!RedisReply);

	private {
		// guard value that is checked in the postblit and in the destructor
		uint m_magic = 0x15f67ab3;
		// connection that the reply is read from, null once the reply is exhausted
		RedisConnection m_conn;
		// pool lock that is released after the last element has been popped
		LockedConnection!RedisConnection m_lockedConnection;
	}

	alias ElementType = T;

	// Starts a new reply on the given connection, resetting its reply state.
	private this(RedisConnection conn)
	{
		m_conn = conn;
		auto ctx = &conn.m_replyContext;
		assert(ctx.refCount == 0);
		*ctx = RedisReplyContext.init;
		ctx.refCount++;
	}

	this(this)
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			auto ctx = &m_conn.m_replyContext;
			assert(ctx.refCount > 0);
			ctx.refCount++;
		}
	}

	~this()
	{
		assert(m_magic == 0x15f67ab3);
		if (m_conn) {
			// the last copy skips all unread elements
			if (!--m_conn.m_replyContext.refCount)
				drop();
		}
	}

	/// Determines if all elements of the reply have been consumed.
	@property bool empty() const { return !m_conn || m_conn.m_replyContext.index >= m_conn.m_replyContext.length; }

	/** Returns the current element of the reply.

		Note that byte and character arrays may be returned as slices to a
		temporary buffer. This buffer will be invalidated on the next call to
		$(D popFront), so it needs to be duplicated for permanent storage.
	*/
	@property T front()
	{
		assert(!empty, "Accessing the front of an empty RedisReply!");
		auto ctx = &m_conn.m_replyContext;
		if (!ctx.hasData) readData();

		ubyte[] ret = ctx.data;

		return convertToType!T(ret);
	}

	/// Determines if the current element is a null bulk reply ("$-1").
	@property bool frontIsNull()
	const {
		assert(!empty, "Accessing the front of an empty RedisReply!");
		return m_conn.m_replyContext.frontIsNull;
	}

	/** Pops the current element of the reply

		If this was the last element and no other copy of the reply exists,
		the connection is released.
	*/
	void popFront()
	{
		assert(!empty, "Popping the front of an empty RedisReply!");

		auto ctx = &m_conn.m_replyContext;

		if (!ctx.hasData) readData(); // ensure that we actually read the data entry from the wire
		clearData();
		ctx.index++;

		if (ctx.index >= ctx.length && ctx.refCount == 1) {
			ctx.refCount = 0;
			m_conn = null;
			m_lockedConnection.destroy();
		}
	}


	/// Legacy property for hasNext/next based iteration
	@property bool hasNext() const { return !empty; }

	/// Legacy property for hasNext/next based iteration
	/// Returns a duplicate of the current element and advances the reply.
	TN next(TN : E[], E)()
	{
		assert(hasNext, "end of reply");

		auto ret = front.dup;
		popFront();
		return () @trusted { return cast(TN)ret; } ();
	}

	/// Skips all remaining elements of the reply.
	void drop()
	{
		if (!m_conn) return;
		while (!empty) popFront();
	}

	// Reads the next element of a multi bulk reply from the connection.
	private void readData()
	{
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.hasData && ctx.initialized);

		if (ctx.multi)
			readBulk(() @trusted { return cast(string)m_conn.conn.readLine(); } ());
	}

	// Forgets the data of the current element.
	private void clearData()
	{
		auto ctx = &m_conn.m_replyContext;
		ctx.data = null;
		ctx.hasData = false;
	}

	// Stores the pool lock, so that the connection stays locked until the
	// reply has been consumed.
	private @property void lockedConnection(ref LockedConnection!RedisConnection conn)
	{
		assert(m_conn !is null);
		m_lockedConnection = conn;
	}

	// Reads the first line of the reply and sets up the reply state
	// according to the reply type. Error replies ('-') are thrown as
	// exceptions.
	private void initialize()
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		assert(!ctx.initialized);
		ctx.initialized = true;

		ubyte[] ln = m_conn.conn.readLine();

		// an empty line carries no type marker - report it as a protocol
		// error instead of failing with a range violation on ln[0]
		if (ln.length == 0) {
			m_conn.conn.close();
			throw new RedisProtocolException("Received an empty reply line from the Redis server");
		}

		switch (ln[0]) {
			default:
				m_conn.conn.close();
				throw new Exception(format("Unknown reply type: %s", cast(char)ln[0]));
			case '+': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '-': throw new Exception(() @trusted { return cast(string)ln[1 .. $]; } ());
			case ':': ctx.data = ln[1 .. $]; ctx.hasData = true; break;
			case '$':
				readBulk(() @trusted { return cast(string)ln; } ());
				break;
			case '*':
				if (ln.startsWith(cast(const(ubyte)[])"*-1")) {
					ctx.length = 0; // TODO: make this NIL reply distinguishable from a 0-length array
				} else {
					ctx.multi = true;
					scope (failure) m_conn.conn.close();
					ctx.length = to!long(() @trusted { return cast(string)ln[1 .. $]; } ());
				}
				break;
		}
	}

	// Reads the payload of a bulk reply, given its size line ("$<size>").
	// A size line starting with "$-1" denotes a null element.
	private void readBulk(string sizeLn)
	{
		assert(m_conn !is null);
		auto ctx = &m_conn.m_replyContext;
		if (sizeLn.startsWith("$-1")) {
			ctx.frontIsNull = true;
			ctx.hasData = true;
			ctx.data = null;
		} else {
			auto size = to!size_t(sizeLn[1 .. $]);
			auto data = new ubyte[size];
			m_conn.conn.read(data);
			m_conn.conn.readLine(); // skip the line break that follows the payload
			ctx.frontIsNull = false;
			ctx.hasData = true;
			ctx.data = data;
		}
	}
}
1484 
/// Exception type for violations of the Redis wire protocol.
class RedisProtocolException : Exception {
	/// Forwards all arguments to the constructor of `Exception`.
	this(
		string message,
		string file = __FILE__,
		size_t line = __LINE__,
		Exception next = null)
	{
		super(message, file, line, next);
	}
}
1491 
/** Determines if `T` is a valid return type for a Redis request.

	For `Nullable` and `RedisReply` instances, the wrapped value/element type
	is checked with `isValidRedisValueType`, otherwise `T` itself.
*/
template isValidRedisValueReturn(T)
{
	import std.typecons;
	static if (isInstanceOf!(RedisReply, T))
		alias Value = T.ElementType;
	else static if (isInstanceOf!(Nullable, T))
		alias Value = typeof(T.init.get());
	else
		alias Value = T;

	enum isValidRedisValueReturn = isValidRedisValueType!Value;
}
1501 
/// Determines if `T` is a supported Redis value type: anything that converts
/// to a constant character or byte array, or exactly `long`, `double` or `bool`.
enum bool isValidRedisValueType(T) =
	is(T : const(char)[])
	|| is(T : const(ubyte)[])
	|| is(T == long)
	|| is(T == double)
	|| is(T == bool);
1506 
/** Creates a reply for the given connection and reads its header line.

	The default element type matches the default of `RedisReply` (`ubyte[]`).
*/
private RedisReply!T getReply(T = ubyte[])(RedisConnection conn)
{
	auto repl = RedisReply!T(conn);
	repl.initialize();
	return repl;
}
1513 
/// Per-connection state of the reply that is currently being read. It is
/// shared by all copies of a `RedisReply`.
private struct RedisReplyContext {
	// number of RedisReply copies that reference this context
	long refCount = 0;
	// payload of the current element
	ubyte[] data;
	// true if the current element has already been read
	bool hasData;
	// true for multi bulk ('*') replies
	bool multi = false;
	// true once the header line of the reply has been processed
	bool initialized = false;
	// true if the current element is a null bulk reply ("$-1")
	bool frontIsNull = false;
	// number of elements of the reply
	long length = 1;
	// index of the current element
	long index = 0;
	// NOTE(review): not referenced by the code visible here - confirm whether it is still needed
	ubyte[128] dataBuffer;
}
1525 
/** State of a single connection to a Redis server.

	Apart from the TCP connection itself, the password and the database index
	that were last sent to the server are remembered, so that `setAuth` and
	`setDB` only issue a command when something changes. Authentication and
	the selected database are properties of the server side connection, so
	the remembered values are discarded whenever the TCP connection is
	replaced or found to be disconnected.
*/
private final class RedisConnection {
	private {
		string m_host;
		ushort m_port;
		TCPConnection m_conn;
		// password that was sent with the last successful AUTH
		string m_password;
		// database index that was sent with the last successful SELECT
		long m_selectedDB;
		RedisReplyContext m_replyContext;
	}

	this(string host, ushort port)
	{
		m_host = host;
		m_port = port;
	}

	/// The underlying TCP connection
	@property TCPConnection conn() nothrow { return m_conn; }
	/// ditto
	@property void conn(TCPConnection conn) nothrow
	{
		m_conn = conn;
		// a new connection starts out unauthenticated with database 0 selected
		resetSessionState();
	}

	/// Sends AUTH, unless the password has already been sent over the
	/// current connection.
	void setAuth(string password)
	{
		syncSessionState();
		if (m_password == password) return;
		_request_reply(this, "AUTH", password);
		m_password = password;
	}

	/// Sends SELECT, unless the database is already selected on the
	/// current connection.
	void setDB(long index)
	{
		syncSessionState();
		if (index == m_selectedDB) return;
		_request_reply(this, "SELECT", index);
		m_selectedDB = index;
	}

	// Forgets the remembered password and database index.
	private void resetSessionState() nothrow
	{
		m_password = null;
		m_selectedDB = 0;
	}

	// Forgets the remembered state if the connection has been lost, so that
	// AUTH/SELECT are sent again once the connection gets reestablished.
	private void syncSessionState()
	{
		if (!m_conn || !m_conn.connected)
			resetSessionState();
	}

	// Counts the number of protocol level arguments, where the elements of
	// arrays (other than byte and character arrays) count individually.
	private static long countArgs(ARGS...)(scope ARGS args)
	{
		long ret = 0;
		foreach (i, A; ARGS) {
			static if (isArray!A && !(is(A : const(ubyte[])) || is(A : const(char[])))) {
				foreach (arg; args[i])
					ret += countArgs(arg);
			} else ret++;
		}
		return ret;
	}

	unittest {
		assert(countArgs() == 0);
		assert(countArgs(1, 2, 3) == 3);
		assert(countArgs("1", ["2", "3", "4"]) == 4);
		assert(countArgs([["1", "2"], ["3"]]) == 3);
	}

	// Writes each argument as a bulk string ("$<length>\r\n<data>\r\n").
	// Booleans are written as "1"/"0", arrays are flattened.
	private static void writeArgs(R, ARGS...)(R dst, scope ARGS args)
		if (isOutputRange!(R, char))
	{
		foreach (i, A; ARGS) {
			static if (is(A == bool)) {
				writeArgs(dst, args[i] ? "1" : "0");
			} else static if (is(A : long) || is(A : real) || is(A == string)) {
				auto alen = formattedLength(args[i]);
				enum fmt = "$%d\r\n"~typeFormatString!A~"\r\n";
				dst.formattedWrite(fmt, alen, args[i]);
			} else static if (is(A : const(ubyte[])) || is(A : const(char[]))) {
				dst.formattedWrite("$%s\r\n", args[i].length);
				dst.put(args[i]);
				dst.put("\r\n");
			} else static if (isArray!A) {
				foreach (arg; args[i])
					writeArgs(dst, arg);
			} else static assert(false, "Unsupported Redis argument type: " ~ A.stringof);
		}
	}

	unittest {
		import std.array : appender;
		auto dst = appender!string;
		writeArgs(dst, false, true, ["2", "3"], "4", 5.0);
		assert(dst.data == "$1\r\n0\r\n$1\r\n1\r\n$1\r\n2\r\n$1\r\n3\r\n$1\r\n4\r\n$1\r\n5\r\n");
	}

	// Determines the length of the textual form of an argument without
	// building the string.
	private static long formattedLength(ARG)(scope ARG arg)
	{
		static if (is(ARG == string)) return arg.length;
		else {
			import vibe.internal.rangeutil;
			long length;
			auto rangeCnt = RangeCounter(() @trusted { return &length; } ());
			rangeCnt.formattedWrite(typeFormatString!ARG, arg);
			return length;
		}
	}
}
1618 
/**
	Writes a command and its arguments to the connection without reading a
	reply.

	If the connection has no underlying TCP connection, or it is no longer
	connected, a new TCP connection is established first and `tcpNoDelay`
	is enabled on it.

	The request is written as an argument count header ("*<n>") followed by
	the command name and each argument as a bulk string.

	NOTE(review): unlike `_request_reply`, this function does not call
	`flush()` on the output range explicitly - presumably the range flushes
	when it goes out of scope; confirm against `streamOutputRange`.

	Params:
		conn = The connection to send the command on
		command = Name of the Redis command
		args = Arguments of the command

	Throws:
		An `Exception` that chains the original error if the TCP connection
		cannot be established.
*/
private void _request_void(ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	// lazily (re-)establish the TCP connection
	if (!conn.conn || !conn.conn.connected) {
		try {
			conn.conn = connectTCP(conn.m_host, conn.m_port);
		} catch (Exception e) {
			auto message = format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port);
			throw new Exception(message, __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
	}

	auto argumentCount = conn.countArgs(args);
	auto output = streamOutputRange(conn.conn);
	// the range is handed on by pointer, so all writes share one instance
	auto sink = () @trusted { return &output; } ();

	// the command name itself counts as one element of the request
	formattedWrite(sink, "*%d\r\n$%d\r\n%s\r\n", argumentCount + 1, command.length, command);
	RedisConnection.writeArgs(sink, args);
}
1636 
/**
	Sends a command with its arguments and returns the reply of the server.

	If the connection has no underlying TCP connection, or it is no longer
	connected, a new TCP connection is established first and `tcpNoDelay`
	is enabled on it. The request is flushed before the reply is obtained
	using `conn.getReply`.

	Params:
		T = Element type of the returned reply (defaults to `ubyte[]`)
		conn = The connection to send the command on
		command = Name of the Redis command
		args = Arguments of the command

	Returns:
		The `RedisReply!T` obtained from the connection.

	Throws:
		An `Exception` that chains the original error if the TCP connection
		cannot be established.
*/
private RedisReply!T _request_reply(T = ubyte[], ARGS...)(RedisConnection conn, string command, scope ARGS args)
{
	import vibe.stream.wrapper;

	// lazily (re-)establish the TCP connection
	if (!conn.conn || !conn.conn.connected) {
		try {
			conn.conn = connectTCP(conn.m_host, conn.m_port);
		} catch (Exception e) {
			auto message = format("Failed to connect to Redis server at %s:%s.", conn.m_host, conn.m_port);
			throw new Exception(message, __FILE__, __LINE__, e);
		}
		conn.conn.tcpNoDelay = true;
	}

	auto argumentCount = conn.countArgs(args);
	auto output = streamOutputRange(conn.conn);
	// the range is handed on by pointer, so all writes share one instance
	auto sink = () @trusted { return &output; } ();

	// the command name itself counts as one element of the request
	formattedWrite(sink, "*%d\r\n$%d\r\n%s\r\n", argumentCount + 1, command.length, command);
	RedisConnection.writeArgs(sink, args);

	// make sure the complete request is sent before waiting for the reply
	output.flush();

	return conn.getReply!T;
}
1657 
/**
	Performs a request on a locked connection and converts the reply to the
	requested result type.

	The result type `T` selects the behavior at compile time:
	$(UL
		$(LI `void`: the request is sent and the reply is not returned.)
		$(LI `RedisReply!E`: the reply is returned as is, after the locked
			connection has been stored in its `lockedConnection` field.)
		$(LI `Nullable!E`: the first reply element is returned, or a null
			`Nullable` if `frontIsNull` is set.)
		$(LI Anything else: the first element of the reply is returned.)
	)

	Params:
		conn = The locked connection to perform the request on
		command = Name of the Redis command
		args = Arguments of the command
*/
private T _request(T, ARGS...)(LockedConnection!RedisConnection conn, string command, scope ARGS args)
{
	import std.typecons;

	static if (is(T == void)) {
		_request_reply(conn, command, args);
	} else static if (isInstanceOf!(RedisReply, T)) {
		auto fullReply = _request_reply!(T.ElementType)(conn, command, args);
		// pass the locked connection on to the returned reply
		fullReply.lockedConnection = conn;
		return fullReply;
	} else static if (isInstanceOf!(Nullable, T)) {
		alias Payload = typeof(T.init.get());
		auto nullableReply = _request_reply!Payload(conn, command, args);
		T result;
		// a null reply leaves the result in its initial (null) state
		if (nullableReply.frontIsNull) return result;
		result = nullableReply.front;
		return result;
	} else {
		auto valueReply = _request_reply!T(conn, command, args);
		return valueReply.front;
	}
}
1678 
/**
	Converts the raw payload of a reply to the requested type.

	Supported target types:
	$(UL
		$(LI `ubyte[]`: the data is returned unmodified.)
		$(LI `string`: the data is validated as UTF and returned as an
			immutable copy.)
		$(LI `bool`: `true` if the first byte is the character '1'; an empty
			payload yields `false`.)
		$(LI `int`, `long`, `size_t`, `double`: the textual representation
			is parsed using `std.conv.parse`.)
	)
	Any other type is rejected at compile time.

	Params:
		data = Raw reply data. NOTE: data must be unique!

	Returns:
		The value of `data` converted to `T`.

	Throws:
		An exception from `std.utf.validate` for invalid string data, or
		from `std.conv.parse` for numbers that cannot be parsed.
*/
private T convertToType(T)(ubyte[] data)
{
	static if (isSomeString!T) () @trusted { validate(cast(T)data); } ();

	static if (is(T == ubyte[])) return data;
	else static if (is(T == string)) return cast(T)data.idup;
	else static if (is(T == bool)) {
		// Guard against an empty payload: indexing `data[0]` unconditionally
		// would raise a RangeError instead of producing a result.
		return data.length > 0 && data[0] == '1';
	}
	else static if (is(T == int) || is(T == long) || is(T == size_t) || is(T == double)) {
		// parse() takes its input by reference, hence the local string view
		auto str = () @trusted { return cast(string)data; } ();
		return parse!T(str);
	}
	else static assert(false, "Unsupported Redis reply type: " ~ T.stringof);
}
1692 
/**
	Yields the format specifier used to serialize a value of type `T`.

	Floating point types are formatted using "%.16g", i.e. with up to 16
	significant digits; every other type uses the generic "%s" specifier.

	NOTE(review): 16 significant digits do not round-trip every `double`
	exactly (17 are required in general) - confirm that this is intended.
*/
private enum typeFormatString(T) = isFloatingPoint!T ? "%.16g" : "%s";