// Written in D programming language
/**
*   Default implementation of IConnectionPool. It consists of four worker threads, each holding
*   its own connection list: closed connections, connecting connections, free connections and
*   querying connections.
*
*   Closed connections thread accepts a connection to a SQL server and waits until the specified
*   moment to start the establishing process. All failed connections are passed into the worker
*   to try again later.
*
*   Connecting connections thread handles the connection establishing procedure. Connection
*   establishing is done in an asynchronous way (non-blocking polling). All new connections are
*   passed to the worker. If a connection fails, it is passed to the closed connections thread,
*   else if it succeeds, it is passed to the free connections thread.
*
*   Free connections thread watches after idle connections. If one wants to make a query, the pool
*   asks the free connections worker for one. If there is no free connection for the specified
*   amount of time, a timeout exception is thrown, else the returned connection is bound to the
*   transaction information and is sent to the querying connections worker. The free connections
*   worker also watches after the health of each connection: if a free connection dies, it is sent
*   to the closed connections worker to try to open it later.
*
*   And finally the most interesting one is the querying connections worker. The worker accepts all
*   requested transactions to be processed on a remote SQL server (several connections could be
*   linked to different servers). The worker starts the transaction, sets up all needed local
*   variables, processes all requested commands, collects the results and transfers them to the
*   pool inner queue of finished transactions. The transaction is ended with "COMMIT;" before the
*   connection is sent to the free connections worker.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.async.pool;

import dlogg.log;
public import pgator.db.pool;
import pgator.db.connection;
import pgator.db.pq.api;
import pgator.util.list;
import std.algorithm;
import std.container;
import std.concurrency;
import std.datetime;
import std.exception;
import std.range;
import core.thread;
import core.atomic;
import vibe.core.core : yield;
import vibe.data.bson;

import pgator.db.async.respond;
import pgator.db.async.transaction;
import pgator.db.async.workers.handler;
import pgator.db.async.workers.closed;
import pgator.db.async.workers.free;
import pgator.db.async.workers.connecting;
import pgator.db.async.workers.query;

/**
*    Describes asynchronous connection pool.
*/
class AsyncPool : IConnectionPool
{
    /**
    *   Stores the timing settings and spawns the four worker threads
    *   (closed, free, connecting and quering checkers), then lets the
    *   workers know about each other via $(B ids.sendTids).
    *
    *   Params:
    *   logger           = Shared logger used by the pool and handed to every worker.
    *   provider         = Factory used by $(B addServer) to allocate connections.
    *   pReconnectTime   = Value returned by $(B reconnectTime).
    *   pFreeConnTimeout = Value returned by $(B freeConnTimeout).
    *   pAliveCheckTime  = Value returned by $(B aliveCheckTime).
    */
    this(shared ILogger logger, shared IConnectionProvider provider, Duration pReconnectTime, Duration pFreeConnTimeout, Duration pAliveCheckTime) shared
    {
        this.logger = logger;
        this.provider = provider;

        mReconnectTime = pReconnectTime;
        mFreeConnTimeout = pFreeConnTimeout;
        mAliveCheckTime = pAliveCheckTime;

        ids = shared ThreadIds(  spawn(&closedChecker, logger, reconnectTime)
                               , spawn(&freeChecker, logger, reconnectTime, aliveCheckTime)
                               , spawn(&connectingChecker, logger, reconnectTime)
                               , spawn(&queringChecker, logger));

        ids.sendTids;
    }

    /**
    *    Adds connection string to a SQL server with
    *    maximum connections count.
    *
    *    The pool will try to reconnect to the sql
    *    server every $(B reconnectTime) if the connection
    *    is dropped (or is down initially).
    *
    *    Params:
    *    connString = Connection string passed to every allocated connection.
    *    connNum    = Number of connections to allocate for the server.
    */
    void addServer(string connString, size_t connNum) shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        TimedConnList failedList;
        DList!(shared IConnection) connsList;
        foreach(i; 0..connNum)
        {
            auto conn = provider.allocate;

            bool failed = false;
            try
            {
                conn.connect(connString);
            }
            catch(ConnectException e)
            {
                failed = true;
                logger.logError(e.msg);

                // Only the way of reading the millisecond part depends on the compiler version
                static if (__VERSION__ < 2066) {
                    immutable long fracMsecs = reconnectTime.fracSec.msecs;
                } else {
                    immutable long fracMsecs = reconnectTime.split!("seconds", "msecs").msecs;
                }
                // Zero-pad the fraction to three digits, otherwise 1 s 5 ms is printed as "1.5"
                immutable string pad = fracMsecs < 10 ? "00" : (fracMsecs < 100 ? "0" : "");
                logger.logDebug("Will retry to connect to ", e.server, " over "
                    , reconnectTime.total!"seconds", ".", pad, fracMsecs, " seconds.");

                auto whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                failedList.insert(TimedConnListElem(conn, whenRetry));
            }

            if(!failed) connsList.insert(conn);
        }

        // Connections whose connect call did not throw go to the connecting worker,
        // the failed ones go to the closed worker together with their retry moment.
        foreach(conn; connsList)
            ids.connectingCheckerId.send("add", conn);
        foreach(elem; failedList)
            ids.closedCheckerId.send("add", elem.conn, elem.duration);
    }

    /**
    *   Performs several SQL $(B commands) on single connection
    *   wrapped in a transaction (BEGIN/COMMIT in PostgreSQL).
    *   Each command should use '$n' notation to refer $(B params)
    *   values. Before any command occurs in transaction the
    *   local SQL variables are set from $(B vars).
    *
    *   The call posts the transaction and yields until it is ready.
    *
    *   Throws: ConnTimeoutException, QueryProcessingException
    */
    InputRange!(immutable Bson) execTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        /// Workaround for gdc
        if(vars is null)
        {
            string[string] empty;
            vars = empty;
        }

        auto transaction = postTransaction(commands, params, argnums, vars, oneRowConstraint);
        while(!isTransactionReady(transaction)) yield;
        return getTransaction(transaction);
    }

    /**
    *   Asynchronous way to execute transaction. User can check
    *   transaction status by calling $(B isTransactionReady) method.
    *   When $(B isTransactionReady) method returns true, the
    *   transaction can be finalized by $(B getTransaction) method.
    *
    *   If $(B oneRowConstraint) is empty, it is filled with $(B false)
    *   for every command. If both $(B params) and $(B argnums) are empty,
    *   $(B argnums) is filled with zeros for every command.
    *
    *   Returns: Specific interface to distinct the query
    *            among others.
    *   See_Also: isTransactionReady, getTransaction.
    *   Throws: ConnTimeoutException
    */
    immutable(ITransaction) postTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    {
        ///TODO: move to contract when issue with contracts are fixed
        assert(!finalized, "Pool was finalized!");
        assert(oneRowConstraint.length == 0 || oneRowConstraint.length == commands.length
            , "oneRowConstraint have to have length equal to commands length!");

        if(oneRowConstraint.length == 0)
        {
            oneRowConstraint = new bool[commands.length];
            oneRowConstraint[] = false;
        }

        /// Workaround for gdc
        if(vars is null)
        {
            string[string] empty;
            vars = empty;
        }

        if(params.length == 0 && argnums.length == 0)
        {
            argnums = 0u.repeat.take(commands.length).array;
        }

        auto conn = fetchFreeConnection();
        auto transaction = new immutable Transaction(commands, params, argnums, vars, oneRowConstraint);
        processingTransactions.insert(cast(shared ITransaction)transaction);

        // The calling thread id is sent along with the connection and the transaction
        ids.queringCheckerId.send(thisTid, conn, cast(shared)transaction);

        if(loggingAllTransactions)
        {
            logger.logInfo("Transaction is posted:");
            logger.logInfo(transaction.text);
        }

        return transaction;
    }

    /**
    *   Returns true if transaction processing is finished (doesn't
    *   matter the actual reason, error or transaction object is invalid,
    *   or successful completion).
    *
    *   If the method returns true, then $(B getTransaction) method
    *   can be called in non-blocking manner.
    *
    *   See_Also: postTransaction, getTransaction.
    */
    bool isTransactionReady(immutable ITransaction transaction) shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        // Anything thrown below is reported as "ready"
        scope(failure) return true;

        // Unknown transaction: nothing to wait for
        if(processingTransactions[].find(cast(shared)transaction).empty)
            return true;

        fetchResponds();

        return (transaction in awaitingResponds) !is null;
    }

    /**
    *   Retrieves SQL result from specified transaction.
    *
    *   If previously called $(B isTransactionReady) returns true,
    *   then the method is not blocking, else it falls back
    *   to $(B execTransaction) behavior.
    *
    *   See_Also: postTransaction, isTransactionReady
    *   Throws: UnknownTransactionException, QueryProcessingException,
    *           OneRowConstraintException
    */
    InputRange!(immutable Bson) getTransaction(immutable ITransaction transaction) shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        if(processingTransactions[].find(cast(shared)transaction).empty)
            throw new UnknownTransactionException();

        if(transaction in awaitingResponds)
        {
            processingTransactions.removeOne(cast(shared)transaction);

            auto tr = cast(Transaction)transaction;
            assert(tr);

            auto respond = awaitingResponds[transaction];
            awaitingResponds.remove(transaction);

            // Passes every message raised by the db to the sink, without trailing line break
            void logMessages(void delegate(string msg) sink)
            {
                if(respond.msgs.length != 0)
                {
                    sink("Following messages were raised from db:");
                    foreach(msg; respond.msgs)
                    {
                        if(msg.length > 0 && msg[$-1] == '\n') sink(msg[0 .. $-1]);
                        else sink(msg);
                    }
                }
            }

            if(respond.failed)
            {
                if(respond.onRowConstaintFailed)
                {
                    logger.logError("Transaction failure: ");
                    logger.logError(text(tr));
                    auto errMsg = text("Transaction ", respond.constraintFailQueryId, " fired single row constraint!");
                    logger.logError(errMsg);
                    throw new OneRowConstraintException(errMsg);
                }
                else
                {
                    logger.logError("Transaction failure:");
                    logger.logError(text(tr));
                    logMessages((s) => logger.logError(s));
                    logger.logError(respond.exception);

                    throw new QueryProcessingException(respond.exception);
                }
            }
            else
            {
                if(respond.msgs.length != 0)
                {
                    logMessages((s) => logger.logInfo(s));
                    logger.logInfo("For transaction:");
                    logger.logInfo(text(tr));
                }

                return respond.result[].inputRangeObject;
            }
        } else
        {
            // Respond is not here yet: wait for it and retry
            while(!isTransactionReady(transaction)) yield;
            return getTransaction(transaction);
        }
    }


    /**
    *   Waits up to one millisecond for a finished transaction message
    *   and stores its respond in $(B awaitingResponds).
    */
    private void fetchResponds() shared
    {
        receiveTimeout(dur!"msecs"(1),
            (Tid tid, shared Transaction transaction, Respond respond)
            {
                assert(cast(immutable)transaction !in awaitingResponds);
                awaitingResponds[cast(immutable)transaction] = respond;
            }
        );
    }

    /**
    *    If connection to a SQL server is down,
    *    the pool tries to reestablish it every
    *    time units returned by the method.
    */
    Duration reconnectTime() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        return mReconnectTime;
    }

    /**
    *    If there is no free connection for
    *    specified duration while trying to
    *    initialize SQL query, then the pool
    *    throws $(B ConnTimeoutException) exception.
    */
    Duration freeConnTimeout() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        return mFreeConnTimeout;
    }

    /**
    *   Free connections are checked over the time by
    *   sending senseless queries. Don't make this time
    *   too small - huge overhead for network. Don't make
    *   this time too big - you wouldn't be able to detect
    *   failed connections due to server restarting etc.
    */
    Duration aliveCheckTime() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        return mAliveCheckTime;
    }

    /**
    *    Returns current alive connections number
    *    (free connections plus quering connections).
    *    Warning: The method displays count of active connections at the moment,
    *             returned value can become invalid as soon as it returned due
    *             async nature of the pool.
    */
    size_t activeConnections() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        size_t freeCount, queringCount;
        ids.freeCheckerId.send(thisTid, "length");
        ids.queringCheckerId.send(thisTid, "length");

        // One receive per request; the sender id tells which worker answered
        foreach(i;0..2)
            try enforce(receiveTimeout(dur!"seconds"(1),
                (Tid sender, size_t answer)
                {
                    if(sender == ids.freeCheckerId)
                        freeCount = answer;
                    else if(sender == ids.queringCheckerId)
                        queringCount = answer;
                }
            ), "Async pool internal problem! Workers don't respond!");
            catch (LinkTerminated e)
            {
                logger.logError("Free conn or quering conn worker is dead!");
                freeCount = 0;
                queringCount = 0;
            }

        return freeCount + queringCount;
    }

    /**
    *    Returns current frozen connections number
    *    (closed connections plus connecting connections).
    *    Warning: The method displays count of inactive connections at the moment,
    *             returned value can become invalid as soon as it returned due
    *             async nature of the pool.
    */
    size_t inactiveConnections() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        size_t closedCount, connectingCount;
        ids.closedCheckerId.send(thisTid, "length");
        ids.connectingCheckerId.send(thisTid, "length");

        // One receive per request; the sender id tells which worker answered
        foreach(i;0..2)
            try enforce(receiveTimeout(dur!"seconds"(1),
                (Tid sender, size_t answer)
                {
                    if(sender == ids.closedCheckerId)
                        closedCount = answer;
                    else if(sender == ids.connectingCheckerId)
                        connectingCount = answer;
                }
            ), "Async pool internal problem! Workers don't respond!");
            catch (LinkTerminated e)
            {
                logger.logError("Closed conn or connecting conn worker is dead!");
                closedCount = 0;
                connectingCount = 0;
            }

        return closedCount + connectingCount;
    }

    /**
    *    Returns the sum of connection counts reported by all four workers.
    *    Warning: returned value can become invalid as soon as it returned due
    *             async nature of the pool.
    */
    size_t totalConnections() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        size_t freeCount, queringCount;
        size_t closedCount, connectingCount;
        ids.freeCheckerId.send(thisTid, "length");
        ids.queringCheckerId.send(thisTid, "length");
        ids.closedCheckerId.send(thisTid, "length");
        ids.connectingCheckerId.send(thisTid, "length");

        // One receive per request; the sender id tells which worker answered
        foreach(i;0..4)
            try enforce(receiveTimeout(dur!"seconds"(1),
                (Tid sender, size_t answer)
                {
                    if(sender == ids.freeCheckerId)
                        freeCount = answer;
                    else if(sender == ids.queringCheckerId)
                        queringCount = answer;
                    else if(sender == ids.closedCheckerId)
                        closedCount = answer;
                    else if(sender == ids.connectingCheckerId)
                        connectingCount = answer;
                }
            ), "Async pool internal problem! Workers don't respond!");
            catch (LinkTerminated e)
            {
                logger.logError("One of workers is dead!");
                freeCount = 0;
                queringCount = 0;
                closedCount = 0;
                connectingCount = 0;
            }
        return freeCount + queringCount + closedCount + connectingCount;
    }

    /**
    *    Finalizes the worker threads through $(B ids.finalize) and marks
    *    the pool as finalized. Repeated calls do nothing.
    */
    synchronized void finalize()
    {
        if(finalized) return;
        ids.finalize(logger);
        finalized = true;
    }

    /**
    *   Returns first free connection from the pool.
    *   Waits for the answer of the free connections worker
    *   no longer than $(B freeConnTimeout).
    *   Throws: ConnTimeoutException
    */
    protected shared(IConnection) fetchFreeConnection() shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        ids.freeCheckerId.send(thisTid, "get");
        shared IConnection res;
        enforceEx!ConnTimeoutException(receiveTimeout(freeConnTimeout,
                (Tid sender, shared IConnection conn)
                {
                    res = conn;
                }
            ));
        return res;
    }

    /**
    *   Returns date format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    DateFormat dateFormat() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        // NOTE(review): the fetched connection is not handed back to the free worker here -
        // confirm the worker keeps ownership on "get", otherwise each call loses a connection.
        return fetchFreeConnection.dateFormat;
    }

    /**
    *   Returns timestamp format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    TimestampFormat timestampFormat() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        // NOTE(review): the fetched connection is not handed back to the free worker here -
        // confirm the worker keeps ownership on "get", otherwise each call loses a connection.
        return fetchFreeConnection.timestampFormat;
    }

    /**
    *   Returns server time zone used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    immutable(TimeZone) timeZone() @property shared
    {
        ///TODO: move to contract when issue with contracts is fixed
        assert(!finalized, "Pool was finalized!");

        // NOTE(review): the fetched connection is not handed back to the free worker here -
        // confirm the worker keeps ownership on "get", otherwise each call loses a connection.
        return fetchFreeConnection.timeZone;
    }

    /**
    *   Returns $(B true) if the pool logs all transactions.
    */
    bool loggingAllTransactions() shared const
    {
        // The flag is written with atomicStore, so it is read atomically as well
        return atomicLoad(mLoggingAll);
    }

    /**
    *   Enables/disables logging for all transactions.
    */
    void loggingAllTransactions(bool val) shared
    {
        mLoggingAll.atomicStore(val);
    }

    private
    {
        shared ILogger logger;
        /// Transactions that were posted and not yet retrieved by getTransaction
        __gshared DList!(shared ITransaction) processingTransactions;
        /// Responds received from the quering worker, keyed by transaction
        Respond[immutable ITransaction] awaitingResponds;
        IConnectionProvider provider;
        Duration mReconnectTime;
        Duration mFreeConnTimeout;
        Duration mAliveCheckTime;

        shared ThreadIds ids;
        bool finalized = false;
        bool mLoggingAll = false;
    }
}