// Written in D programming language
/**
*   Module describes connection pool to data bases. Pool handles
*   several connections to one or more sql servers. If connection
*   is lost, pool tries to reconnect over $(B reconnectTime) duration.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.pool;

import pgator.db.connection;
import pgator.db.pq.api;
import std.algorithm;
import std.datetime;
import std.range;
import core.time;
import vibe.data.bson;

/**
*    The exception is thrown when there is no free connection
*    for $(B freeConnTimeout) duration while trying to lock one.
*/
class ConnTimeoutException : Exception
{
    @safe pure nothrow this(string file = __FILE__, size_t line = __LINE__)
    {
        super("There is no any free connection to SQL servers!", file, line);
    }
}

/**
*   The exception is thrown when an invalid transaction interface is passed to
*   $(B isTransactionReady) and $(B getTransaction) methods.
*/
class UnknownTransactionException : Exception
{
    @safe pure nothrow this(string file = __FILE__, size_t line = __LINE__)
    {
        super("There is no such transaction that is processing now!", file, line);
    }
}

/**
*   The exception is thrown when something bad has happened while
*   passing a query to the server or loading a response from the server.
*   This exception has no bearing on the SQL errors.
*/
class QueryProcessingException : Exception
{
    @safe pure nothrow this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

/**
*   The exception is thrown when a sql query was marked as a one row respond and
*   the query returns multiple rows. Transaction of the query is rolled back.
*/
class OneRowConstraintException : QueryProcessingException
{
    @safe pure nothrow this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

/**
*    Pool handles several connections to one or more SQL servers. If
*    connection is lost, pool tries to reconnect over $(B reconnectTime)
*    duration.
*
*    Example:
*    --------
*    scope(exit) pool.finalize();
*    std.stdio.writeln(pool.timeZone); // know server time zone
*
*    import pgator.db.pq.types.time; // for PGTimeStamp
*
*    // blocking querying
*    auto time = pool.execTransaction(["SELECT TIMESTAMP 'epoch' as field;"]).deserializeBson!PGTimeStamp;
*    assert(time == SysTime.fromSimpleString("1970-Jan-01 00:00:00Z"));
*
*    // or can use asynchronous alternative
*    auto transaction = pool.postTransaction(["SELECT TIMESTAMP 'epoch' as field;"]);
*    while(!isTransactionReady(transaction))
*    {
*       // do something
*    }
*    time = getTransaction(transaction).deserializeBson!PGTimeStamp;
*    assert(time == SysTime.fromSimpleString("1970-Jan-01 00:00:00Z"));
*    --------
*/
interface IConnectionPool
{
    /**
    *    Adds connection string to a SQL server with
    *    maximum connections count.
    *
    *    The pool will try to reconnect to the sql
    *    server every $(B reconnectTime) if connection
    *    is dropped (or is down initially).
    */
    void addServer(string connString, size_t connNum) shared;

    /**
    *   Performs several SQL $(B commands) on single connection
    *   wrapped in a transaction (BEGIN/COMMIT in PostgreSQL).
    *   Each command should use '$n' notation to refer $(B params)
    *   values. Before any command occurs in transaction the
    *   local SQL variables are set from $(B vars).
    *
    *   $(B argnums) array holds information about which parameter
    *   from $(B params) should be passed to a query in $(B commands).
    *   If there are no params, the $(B argnums) can be empty.
    *
    *   $(B oneRowConstraint) stores info which query in $(B commands) has
    *   to have a one row respond. If some query with the constraint switched on
    *   returns 0 or more than 1 row - transaction is rolled back and
    *   OneRowConstraintException is thrown.
    *
    *   Contract: $(B commands) must not be null and the sum of $(B argnums)
    *   must be equal to the length of $(B params). Empty (or null)
    *   $(B params) and $(B argnums) are accepted, as these are the
    *   declared default values.
    *
    *   Example:
    *   -----------
    *   pool.execTransaction(["com1", "com2"],
    *       ["par1", "par2", "par3"], [1, 2]);
    *   // com1 is fed with par1
    *   // com2 is fed with par2 and par3
    *   -----------
    *
    *   Throws: ConnTimeoutException, QueryProcessingException, OneRowConstraintException
    */
    InputRange!(immutable Bson) execTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    in
    {
        // Only commands is mandatory: the defaults of params and argnums are
        // `[]`, which is a null slice, so they must not be null-checked here.
        assert(commands !is null, "null reference");
        // Seeded fold: reduce without a seed throws on an empty range, and an
        // empty argnums is the default. Accumulate in size_t to match params.length.
        assert(reduce!"a+b"(cast(size_t) 0, argnums) == params.length, "length of params is not equal argument count summ!");
    }

    /**
    *    Asynchronous way to execute transaction. User can check
    *    transaction status by calling $(B isTransactionReady) method.
    *    When $(B isTransactionReady) method returns true, the
    *    transaction can be finalized by $(B getTransaction) method.
    *
    *    Returns: Specific interface to distinguish the query
    *             among others.
    *
    *    $(B argnums) array holds information about which parameter
    *    from $(B params) should be passed to a query in $(B commands).
    *    If there are no params, the $(B argnums) can be empty.
    *
    *    $(B oneRowConstraint) stores info which query in $(B commands) has
    *    to have a one row respond. If some query with the constraint switched on
    *    returns 0 or more than 1 row - transaction is rolled back and
    *    OneRowConstraintException is thrown.
    *
    *    Contract: $(B commands) must not be null and the sum of $(B argnums)
    *    must be equal to the length of $(B params). Empty (or null)
    *    $(B params) and $(B argnums) are accepted, as these are the
    *    declared default values.
    *
    *    Example:
    *    -----------
    *    pool.postTransaction(["com1", "com2"],
    *       ["par1", "par2", "par3"], [1, 2]);
    *    // com1 is fed with par1
    *    // com2 is fed with par2 and par3
    *    -----------
    *
    *    See_Also: isTransactionReady, getTransaction.
    *    Throws: ConnTimeoutException
    */
    immutable(ITransaction) postTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    in
    {
        // Only commands is mandatory: the defaults of params and argnums are
        // `[]`, which is a null slice, so they must not be null-checked here.
        assert(commands !is null, "null reference");
        // Seeded fold: reduce without a seed throws on an empty range, and an
        // empty argnums is the default. Accumulate in size_t to match params.length.
        assert(reduce!"a+b"(cast(size_t) 0, argnums) == params.length, "length of params is not equal argument count summ!");
    }

    /**
    *    Returns true if transaction processing is finished (doesn't
    *    matter the actual reason, error or transaction object is invalid,
    *    or successful completion).
    *
    *    If the method returns true, then $(B getTransaction) method
    *    can be called in non-blocking manner.
    *
    *    See_Also: postTransaction, getTransaction.
    */
    bool isTransactionReady(immutable ITransaction transaction) shared;

    /**
    *    Retrieves SQL result from specified transaction.
    *
    *    If previously called $(B isTransactionReady) returns true,
    *    then the method is not blocking, else it falls back
    *    to $(B execTransaction) behavior.
    *
    *    See_Also: postTransaction, isTransactionReady
    *    Throws: UnknownTransactionException, QueryProcessingException, OneRowConstraintException
    */
    InputRange!(immutable Bson) getTransaction(immutable ITransaction transaction) shared;

    /**
    *    If connection to a SQL server is down,
    *    the pool tries to reestablish it every
    *    time units returned by the method.
    */
    Duration reconnectTime() @property shared;

    /**
    *    If there is no free connection for
    *    specified duration while trying to
    *    initialize SQL query, then the pool
    *    throws $(B ConnTimeoutException) exception.
    */
    Duration freeConnTimeout() @property shared;

    /**
    *    Returns current alive connections number.
    */
    size_t activeConnections() @property shared;

    /**
    *    Returns current frozen connections number.
    */
    size_t inactiveConnections() @property shared;

    /**
    *    Awaits all queries to finish and then closes each connection.
    */
    synchronized void finalize();

    /**
    *   Returns date format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    DateFormat dateFormat() @property shared;

    /**
    *   Returns timestamp format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    TimestampFormat timestampFormat() @property shared;

    /**
    *   Returns server time zone used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    immutable(TimeZone) timeZone() @property shared;

    /**
    *    Returns first free connection from the pool.
    *    Throws: ConnTimeoutException
    */
    protected shared(IConnection) fetchFreeConnection() shared;

    /**
    *    Transaction that should be executed by one of connections
    *    on remote SQL server. Client code shouldn't know anything
    *    about the interface but only store it as unique key to acquire
    *    finished transaction.
    */
    protected interface ITransaction {}

    /**
    *   Returns $(B true) if the pool logs all transactions.
    */
    bool loggingAllTransactions() shared const;

    /**
    *   Enables/disables logging for all transactions.
    */
    void loggingAllTransactions(bool val) shared;
}