/**
	Generic connection pool for reusing persistent connections across fibers.

	Copyright: © 2012 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.core.connectionpool;

import vibe.core.log;
import vibe.core.driver;

import core.thread;
import vibe.core.sync;
import vibe.internal.freelistref;

/**
	Generic connection pool class.

	The connection pool is creating connections using the supplied factory
	function as needed whenever `lockConnection` is called. Connections are
	associated to the calling fiber, as long as any copy of the returned
	`LockedConnection` object still exists. Connections that are not associated
	to any fiber will be kept in a pool of open connections for later reuse.

	Note that, after retrieving a connection with `lockConnection`, the caller
	has to make sure that the connection is actually physically open and to
	reopen it if necessary. The `ConnectionPool` class has no knowledge of the
	internals of the connection objects.
*/
class ConnectionPool(Connection)
{
@safe:
	private {
		// Callback used to create a connection when no pooled one is free.
		Connection delegate() m_connectionFactory;
		// Every connection created so far, whether currently locked or idle.
		Connection[] m_connections;
		// Number of live `LockedConnection` copies per connection; an absent
		// entry or a value of zero means the connection is free for reuse.
		int[const(Connection)] m_lockCount;
		// Limits the number of connections that may be locked at once.
		FreeListRef!LocalTaskSemaphore m_sem;
		// Thread that constructed the pool (debug builds only); used to assert
		// that `lockConnection` is not called from another thread.
		debug Thread m_thread;
	}

	/** Creates a new pool.

		Params:
			connection_factory = Callback invoked to create a new connection
				whenever `lockConnection` finds no free pooled connection.
			max_concurrent = Initial lock limit handed to the internal
				semaphore (see `maxConcurrency`).
	*/
	this(Connection delegate() @safe connection_factory, uint max_concurrent = uint.max)
	{
		m_connectionFactory = connection_factory;
		() @trusted { m_sem = FreeListRef!LocalTaskSemaphore(max_concurrent); } ();
		debug m_thread = () @trusted { return Thread.getThis(); } ();
	}

	/// Scheduled for deprecation - use an `@safe` callback instead
	this(Connection delegate() connection_factory, uint max_concurrent = uint.max)
	@system {
		this(cast(Connection delegate() @safe)connection_factory, max_concurrent);
	}

	/** Determines the maximum number of concurrently open connections.

		Attempting to lock more connections than this number will cause the
		calling fiber to be blocked until one of the locked connections
		becomes available for reuse.
	*/
	@property void maxConcurrency(uint max_concurrent) {
		m_sem.maxLocks = max_concurrent;
	}
	/// ditto
	@property uint maxConcurrency() {
		return m_sem.maxLocks;
	}

	/** Retrieves a connection to temporarily associate with the calling fiber.

		The returned `LockedConnection` object uses RAII and reference counting
		to determine when to unlock the connection.

		If the connection factory throws, the semaphore slot acquired for this
		call is released again before the exception propagates, so a failed
		connection attempt does not reduce the number of lockable connections.

		Returns:
			A `LockedConnection` wrapping either a reused free connection or a
			newly created one.
	*/
	LockedConnection!Connection lockConnection()
	{
		debug assert(m_thread is () @trusted { return Thread.getThis(); } (), "ConnectionPool was called from a foreign thread!");

		m_sem.lock();
		// From here on the slot is owned by this call. Normally it is handed
		// over to the returned LockedConnection, whose destructor unlocks it.
		// If anything below throws (most notably the factory), nobody would
		// ever release it - so give it back explicitly on failure.
		scope (failure) () @trusted { m_sem.unlock(); } ();

		// Look for a pooled connection that is currently not locked.
		size_t cidx = size_t.max;
		foreach( i, c; m_connections ){
			auto plc = c in m_lockCount;
			if( !plc || *plc == 0 ){
				cidx = i;
				break;
			}
		}

		Connection conn;
		if( cidx != size_t.max ){
			logTrace("returning %s connection %d of %d", Connection.stringof, cidx, m_connections.length);
			conn = m_connections[cidx];
		} else {
			logDebug("creating new %s connection, all %d are in use", Connection.stringof, m_connections.length);
			conn = m_connectionFactory(); // NOTE: may block
			logDebug(" ... %s", () @trusted { return cast(void*)conn; } ());
		}
		m_lockCount[conn] = 1;
		// A freshly created connection is only added to the pool after the
		// factory has returned, so it cannot be handed out twice while the
		// factory is still running.
		if( cidx == size_t.max ){
			m_connections ~= conn;
			logDebug("Now got %d connections", m_connections.length);
		}
		auto ret = LockedConnection!Connection(this, conn);
		return ret;
	}
}

/**
	Reference counted handle to a connection locked from a `ConnectionPool`.

	Copying the handle increments the pool's lock count for the connection and
	destroying a copy decrements it. When the count drops to zero, the pool's
	semaphore is unlocked and the connection becomes available for reuse.
	The handle implicitly converts to the wrapped connection via `alias this`.
*/
struct LockedConnection(Connection) {
@safe:
	private {
		// Pool that issued the connection; null for a default-initialized value.
		ConnectionPool!Connection m_pool;
		// Task that locked the connection; copies and destruction are asserted
		// to happen in this same task.
		Task m_task;
		// The wrapped connection; null once this handle has been destroyed.
		Connection m_conn;
		// Sentinel checked in the postblit and destructor (debug builds only)
		// to detect a corrupted value.
		debug uint m_magic = 0xB1345AC2;
	}

	// Wraps `conn`, whose lock count has already been set by `pool`.
	private this(ConnectionPool!Connection pool, Connection conn)
	{
		assert(conn !is null);
		m_pool = pool;
		m_conn = conn;
		m_task = Task.getThis();
	}

	// Postblit: every copy holds one additional lock on the connection.
	this(this)
	{
		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
		if( m_conn ){
			auto fthis = Task.getThis();
			assert(fthis is m_task);
			m_pool.m_lockCount[m_conn]++;
			logTrace("conn %s copy %d", () @trusted { return cast(void*)m_conn; } (), m_pool.m_lockCount[m_conn]);
		}
	}

	// Drops this copy's lock; the last copy to go unlocks the pool semaphore.
	~this()
	{
		debug assert(m_magic == 0xB1345AC2, "LockedConnection value corrupted.");
		if( m_conn ){
			auto fthis = Task.getThis();
			assert(fthis is m_task, "Locked connection destroyed in foreign task.");
			auto plc = m_conn in m_pool.m_lockCount;
			assert(plc !is null);
			assert(*plc >= 1);
			//logTrace("conn %s destroy %d", cast(void*)m_conn, *plc-1);
			if( --*plc == 0 ){
				() @trusted { m_pool.m_sem.unlock(); } ();
				//logTrace("conn %s release", cast(void*)m_conn);
			}
			m_conn = null;
		}
	}


	/// Current lock count of the wrapped connection; 0 if it has no entry in
	/// the pool's lock table or if this handle is not bound to a pool.
	@property int __refCount() const {
		// A default-initialized handle has no pool; avoid dereferencing null.
		if( m_pool is null ) return 0;
		return m_pool.m_lockCount.get(m_conn, 0);
	}
	/// The wrapped connection (null after the handle has been destroyed).
	@property inout(Connection) __conn() inout { return m_conn; }

	alias __conn this;
}