1 /**
2 	Generic connection pool for reusing persistent connections across fibers.
3 
4 	Copyright: © 2012 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.core.connectionpool;
9 
10 import vibe.core.log;
11 import vibe.core.driver;
12 
13 import core.thread;
14 import vibe.core.sync;
15 import vibe.internal.freelistref;
16 
17 /**
18 	Generic connection pool class.
19 
20 	The connection pool is creating connections using the supplied factory
21 	function as needed whenever `lockConnection` is called. Connections are
22 	associated to the calling fiber, as long as any copy of the returned
23 	`LockedConnection` object still exists. Connections that are not associated
24 	to any fiber will be kept in a pool of open connections for later reuse.
25 
26 	Note that, after retrieving a connection with `lockConnection`, the caller
27 	has to make sure that the connection is actually physically open and to
28 	reopen it if necessary. The `ConnectionPool` class has no knowledge of the
29 	internals of the connection objects.
30 */
31 class ConnectionPool(Connection)
32 {
33 	@safe:
34 	private {
35 		Connection delegate() m_connectionFactory;
36 		Connection[] m_connections;
37 		int[const(Connection)] m_lockCount;
38 		FreeListRef!LocalTaskSemaphore m_sem;
39 		debug Thread m_thread;
40 	}
41 
42 	this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max)
43 	{
44 		m_connectionFactory = connection_factory;
45 		() @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } ();
46 		debug m_thread = () @trusted { return Thread.getThis(); } ();
47 	}
48 
49 	/// Scheduled for deprecation - use an `@safe` callback instead
50 	this(Connection delegate() connection_factory, uint max_concurrent = uint.max)
51 	@system {
52 		this(cast(Connection delegate() @safe)connection_factory, max_concurrent);
53 	}
54 
55 	/** Determines the maximum number of concurrently open connections.
56 
57 		Attempting to lock more connections that this number will cause the
58 		calling fiber to be blocked until one of the locked connections
59 		becomes available for reuse.
60 	*/
61 	@property void maxConcurrency(uint max_concurrent) {
62 		m_sem.maxLocks = max_concurrent;
63 	}
64 	/// ditto
65 	@property uint maxConcurrency() {
66 		return m_sem.maxLocks;
67 	}
68 
69 	/** Retrieves a connection to temporarily associate with the calling fiber.
70 
71 		The returned `LockedConnection` object uses RAII and reference counting
72 		to determine when to unlock the connection.
73 	*/
74 	LockedConnection!Connection lockConnection()
75 	{
76 		debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!");
77 
78 		m_sem.lock();
79 		size_t cidx = size_t.max;
80 		foreach( i, c; m_connections ){
81 			auto plc = c in m_lockCount;
82 			if( !plc || *plc == 0 ){
83 				cidx = i;
84 				break;
85 			}
86 		}
87 
88 		Connection conn;
89 		if( cidx != size_t.max ){
90 			logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length);
91 			conn = m_connections[cidx];
92 		} else {
93 			logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length);
94 			conn = m_connectionFactory(); // NOTE: may block
95 			logDebug(" ... %s", () @trusted { return cast(void*)conn; } ());
96 		}
97 		m_lockCount[conn] = 1;
98 		if( cidx == size_t.max ){
99 			m_connections ~= conn;
100 			logDebug("Now got %d connections", m_connections.length);
101 		}
102 		auto ret = LockedConnection!Connection(this, conn);
103 		return ret;
104 	}
105 }
106 
107 struct LockedConnection(Connection) {
108 	@safe:
109 	private {
110 		ConnectionPool!Connection m_pool;
111 		Task m_task;
112 		Connection m_conn;
113 		debug uint m_magic = 0xB1345AC2;
114 	}
115 
116 	private this(ConnectionPool!Connection pool, Connection conn)
117 	{
118 		assert(conn !is null);
119 		m_pool = pool;
120 		m_conn = conn;
121 		m_task = Task.getThis();
122 	}
123 
124 	this(this)
125 	{
126 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
127 		if( m_conn ){
128 			auto fthis = Task.getThis();
129 			assert(fthis is m_task);
130 			m_pool.m_lockCount[m_conn]++;
131 			logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]);
132 		}
133 	}
134 
135 	~this()
136 	{
137 		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
138 		if( m_conn ){
139 			auto fthis = Task.getThis();
140 			assert(fthis is m_task, "Locked connection destroyed in foreign task.");
141 			auto plc = m_conn in m_pool.m_lockCount;
142 			assert(plc !is null);
143 			assert(*plc >= 1);
144 			//logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1);
145 			if( --*plc == 0 ){
146 				() @trusted { m_pool.m_sem.unlock(); } ();
147 				//logTrace("conn %s release", cast(void*)m_conn);
148 			}
149 			m_conn = null;
150 		}
151 	}
152 
153 
154 	@property int __refCount() const { return m_pool.m_lockCount.get(m_conn, 0); }
155 	@property inout(Connection) __conn() inout { return m_conn; }
156 
157 	alias __conn this;
158 }