1 // Written in D programming language
2 /**
*   Default implementation of IConnectionPool. It consists of four worker threads, each holding
*   its own connection list: closed connections, connecting connections, free connections and
*   querying connections.
6 *
*   The closed connections thread accepts a connection to a SQL server and waits until the specified
*   moment to start the establishing process. All failed connections are passed to this worker to be
*   retried later.
10 *
*   The connecting connections thread handles the connection establishing procedure. Establishing
*   is done in an asynchronous way (non-blocking polling). All new connections are passed to
*   this worker. If a connection fails, it is passed to the closed connections thread; if it succeeds,
*   it is passed to the free connections thread.
15 *
*   The free connections thread watches over idle connections. When one wants to make a query, the pool
*   asks the free connections worker for a connection. If no free connection appears within the specified
*   amount of time, a timeout exception is thrown; otherwise the returned connection is bound to the
*   transaction information and sent to the querying connections worker. The free connections worker also
*   watches the health of each connection: if a free connection dies, it is sent to the closed connections
*   worker to be reopened later.
21 *   
*   And finally the most interesting one is the querying connections worker. The worker accepts all requested
*   transactions to be processed on a remote SQL server (several connections could be linked to different servers).
*   The worker starts the transaction, sets up all needed local variables, executes all requested commands,
*   collects the results and transfers them to the pool's inner queue of finished transactions. The transaction
*   is ended with "COMMIT;" before the connection is sent to the free connections worker.
27 *
28 *   Copyright: © 2014 DSoftOut
29 *   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
30 *   Authors: NCrashed <ncrashed@gmail.com>
31 */
32 module pgator.db.async.pool;
33 
34 import dlogg.log;
35 public import pgator.db.pool;
36 import pgator.db.connection;
37 import pgator.db.pq.api;
38 import pgator.util.list;
39 import std.algorithm;
40 import std.container;
41 import std.concurrency;
42 import std.datetime;
43 import std.exception;
44 import std.range;
45 import core.thread;
46 import core.atomic;
47 import vibe.core.core : yield;
48 import vibe.data.bson;  
49 
50 import pgator.db.async.respond;
51 import pgator.db.async.transaction;
52 import pgator.db.async.workers.handler;
53 import pgator.db.async.workers.closed;
54 import pgator.db.async.workers.free;
55 import pgator.db.async.workers.connecting;
56 import pgator.db.async.workers.query; 
57  
58 /**
59 *    Describes asynchronous connection pool.
60 */
61 class AsyncPool : IConnectionPool
62 {
63     this(shared ILogger logger, shared IConnectionProvider provider, Duration pReconnectTime, Duration pFreeConnTimeout, Duration pAliveCheckTime) shared
64     {
65         this.logger = logger;
66         this.provider = provider;
67 
68         mReconnectTime   = pReconnectTime;
69         mFreeConnTimeout = pFreeConnTimeout;      
70         mAliveCheckTime  = pAliveCheckTime;
71         
72         ids = shared ThreadIds(  spawn(&closedChecker, logger, reconnectTime)
73                         , spawn(&freeChecker, logger, reconnectTime, aliveCheckTime)
74                         , spawn(&connectingChecker, logger, reconnectTime)
75                         , spawn(&queringChecker, logger));
76 
77         ids.sendTids;
78     }
79     
80     /**
81     *    Adds connection string to a SQL server with
82     *    maximum connections count.
83     *
    *    The pool will try to reconnect to the sql 
    *    server every $(B reconnectTime) if the connection
    *    is dropped (or is down initially).
87     */
88     void addServer(string connString, size_t connNum) shared
89     {
90         ///TODO: move to contract when issue with contracts is fixed
91         assert(!finalized, "Pool was finalized!");
92         
93         TimedConnList failedList;
94         DList!(shared IConnection) connsList;
95         foreach(i; 0..connNum)
96         {
97             auto conn = provider.allocate;
98 
99             bool failed = false;
100             try
101             {
102                 conn.connect(connString);
103             }
104             catch(ConnectException e)
105             {
106                 failed = true;
107                 logger.logError(e.msg);
108                 static if (__VERSION__ < 2066) {
109 	                logger.logDebug("Will retry to connect to ", e.server, " over "
110 	                       , reconnectTime.total!"seconds", ".", reconnectTime.fracSec.msecs, " seconds.");
111                 } else {
112 	                logger.logDebug("Will retry to connect to ", e.server, " over "
113 	                       , reconnectTime.total!"seconds", ".", reconnectTime.split!("seconds", "msecs").msecs, " seconds.");
114                 }
115                 
116                 auto whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
117                 failedList.insert(TimedConnListElem(conn, whenRetry));
118             }
119             
120             if(!failed) connsList.insert(conn);
121         }
122 
123         foreach(conn; connsList)
124             ids.connectingCheckerId.send("add", conn);
125         foreach(elem; failedList)
126             ids.closedCheckerId.send("add", elem.conn, elem.duration);
127     }
128     
129     /**
    *   Performs several SQL $(B commands) on a single connection
    *   wrapped in a transaction (BEGIN/COMMIT in PostgreSQL).
    *   Each command should use '$n' notation to refer to $(B params)
    *   values. Before any command runs in the transaction, the
    *   local SQL variables are set from $(B vars). 
135     *
136     *   Throws: ConnTimeoutException, QueryProcessingException
137     */
138     InputRange!(immutable Bson) execTransaction(string[] commands
139         , string[] params = [], uint[] argnums = []
140         , string[string] vars = null, bool[] oneRowConstraint = []) shared
141     {
142         ///TODO: move to contract when issue with contracts is fixed
143         assert(!finalized, "Pool was finalized!");
144         
145         /// Workaround for gdc
146         if(vars is null)
147         {
148             string[string] empty;
149             vars = empty;
150         }
151         
152         auto transaction = postTransaction(commands, params, argnums, vars, oneRowConstraint);
153         while(!isTransactionReady(transaction)) yield;
154         return getTransaction(transaction);
155     }
156     
157     /**
158     *   Asynchronous way to execute transaction. User can check
159     *   transaction status by calling $(B isTransactionReady) method.
160     *   When $(B isTransactionReady) method returns true, the
161     *   transaction can be finalized by $(B getTransaction) method.
162     * 
    *   Returns: Specific interface to distinguish the query
    *            among others.
165     *   See_Also: isTransactionReady, getTransaction.
166     *   Throws: ConnTimeoutException
167     */
168     immutable(ITransaction) postTransaction(string[] commands
169         , string[] params = [], uint[] argnums = []
170         , string[string] vars = null, bool[] oneRowConstraint = []) shared
171     {
172         ///TODO: move to contract when issue with contracts are fixed
173         assert(!finalized, "Pool was finalized!");
174         assert(oneRowConstraint.length == 0 || oneRowConstraint.length == commands.length
175             , "oneRowConstraint have to have length equal to commands length!");
176         
177         if(oneRowConstraint.length == 0)
178         {
179             oneRowConstraint = new bool[commands.length];
180             oneRowConstraint[] = false;
181         }
182         
183         /// Workaround for gdc
184         if(vars is null)
185         {
186             string[string] empty;
187             vars = empty;
188         }
189         
190         if(params.length == 0 && argnums.length == 0)
191         {
192             argnums = 0u.repeat.take(commands.length).array;
193         }
194         
195         auto conn = fetchFreeConnection();
196         auto transaction = new immutable Transaction(commands, params, argnums, vars, oneRowConstraint);
197         processingTransactions.insert(cast(shared ITransaction)transaction); 
198         
199         ids.queringCheckerId.send(thisTid, conn, cast(shared)transaction);
200         
201         if(loggingAllTransactions)
202         {
203             logger.logInfo("Transaction is posted:");
204             logger.logInfo(transaction.text);
205         }
206         
207         return transaction;
208     }
209     
210     /**
211     *   Returns true if transaction processing is finished (doesn't
212     *   matter the actual reason, error or transaction object is invalid,
213     *   or successful completion).
214     *
215     *   If the method returns true, then $(B getTransaction) method
216     *   can be called in non-blocking manner.
217     *
218     *   See_Also: postTransaction, getTransaction.
219     */
220     bool isTransactionReady(immutable ITransaction transaction) shared
221     {
222         ///TODO: move to contract when issue with contracts is fixed
223         assert(!finalized, "Pool was finalized!");
224         
225         scope(failure) return true;
226 
227         if(processingTransactions[].find(cast(shared)transaction).empty)
228             return true; 
229             
230         fetchResponds();
231 
232         if(transaction in awaitingResponds) return true;
233         else return false;
234     }
235     
236     /**
237     *   Retrieves SQL result from specified transaction.
238     *   
239     *   If previously called $(B isTransactionReady) returns true,
240     *   then the method is not blocking, else it falls back
241     *   to $(B execTransaction) behavior.
242     *
243     *   See_Also: postTransaction, isTransactionReady
244     *   Throws: UnknownTransactionException, QueryProcessingException
245     */
246     InputRange!(immutable Bson) getTransaction(immutable ITransaction transaction) shared
247     {
248         ///TODO: move to contract when issue with contracts is fixed
249         assert(!finalized, "Pool was finalized!");
250         
251         if(processingTransactions[].find(cast(shared)transaction).empty)
252             throw new UnknownTransactionException();
253                 
254         if(transaction in awaitingResponds) 
255         {
256             processingTransactions.removeOne(cast(shared)transaction);
257             
258             auto tr = cast(Transaction)transaction;
259             assert(tr);
260             
261             auto respond = awaitingResponds[transaction];
262             awaitingResponds.remove(transaction);
263             
264             void logMessages(void delegate(string msg) sink)
265             {
266                 if(respond.msgs.length != 0)
267                 {
268                     sink("Following messages were raised from db:");
269                     foreach(msg; respond.msgs) 
270                     {
271                         if(msg.length > 0 && msg[$-1] == '\n') sink(msg[0 .. $-1]);
272                         else sink(msg);
273                     }
274                 }
275             }
276             
277             if(respond.failed)
278             {
279                 if(respond.onRowConstaintFailed)
280                 {
281                     logger.logError("Transaction failure: ");
282                     logger.logError(text(tr));
283                     auto errMsg = text("Transaction ", respond.constraintFailQueryId, " fired single row constraint!" );
284                     logger.logError(errMsg);
285                     throw new OneRowConstraintException(errMsg);
286                 } 
287                 else
288                 {
289                     logger.logError("Transaction failure:");
290                     logger.logError(text(tr));
291                     logMessages((s) => logger.logError(s));
292                     logger.logError(respond.exception);
293                     
294                     throw new QueryProcessingException(respond.exception);
295                 }
296             }
297             else
298             {
299                 if(respond.msgs.length != 0)
300                 {
301                     logMessages((s) => logger.logInfo(s));
302                     logger.logInfo("For transaction:");
303                     logger.logInfo(text(tr));
304                 }
305                 
306                 return respond.result[].inputRangeObject;
307             }
308         } else
309         {
310             while(!isTransactionReady(transaction)) yield;
311             return getTransaction(transaction);
312         }
313     }
314     
315     
316     private void fetchResponds() shared
317     {
318         receiveTimeout(dur!"msecs"(1),
319             (Tid tid, shared Transaction transaction, Respond respond)
320             {
321                 assert(cast(immutable)transaction !in awaitingResponds);
322                 awaitingResponds[cast(immutable)transaction] = respond;
323             }
324         );
325     }
326     
327     /**
328     *    If connection to a SQL server is down,
329     *    the pool tries to reestablish it every
330     *    time units returned by the method. 
331     */
332     Duration reconnectTime() @property shared
333     {
334         ///TODO: move to contract when issue with contracts is fixed
335         assert(!finalized, "Pool was finalized!");
336         
337         return mReconnectTime;
338     }
339     
340     /**
341     *    If there is no free connection for 
342     *    specified duration while trying to
343     *    initialize SQL query, then the pool
344     *    throws $(B ConnTimeoutException) exception.
345     */
346     Duration freeConnTimeout() @property shared
347     {
348         ///TODO: move to contract when issue with contracts is fixed
349         assert(!finalized, "Pool was finalized!");
350         
351         return mFreeConnTimeout;
352     }
353     
354     /**
    *   Free connections are checked periodically by
    *   sending senseless queries. Don't make this time
    *   too small - huge overhead for the network. Don't make
    *   this time too big - you wouldn't be able to detect
    *   failed connections due to server restarts, etc.
360     */
361     Duration aliveCheckTime() @property shared
362     {
363         ///TODO: move to contract when issue with contracts is fixed
364         assert(!finalized, "Pool was finalized!");
365         
366         return mAliveCheckTime;
367     }
368     
369     /**
370     *   Returns current alive connections number.
371     *   Warning: The method displays count of active connections at the moment,
372     *            returned value can become invalid as soon as it returned due
373     *            async nature of the pool.
374     */
375     size_t activeConnections() @property shared
376     {
377         ///TODO: move to contract when issue with contracts is fixed
378         assert(!finalized, "Pool was finalized!");
379         
380         size_t freeCount, queringCount;
381         ids.freeCheckerId.send(thisTid, "length");
382         ids.queringCheckerId.send(thisTid, "length");
383         
384         foreach(i;0..2) 
385             try enforce(receiveTimeout(dur!"seconds"(1),
386                 (Tid sender, size_t answer) 
387                 {
388                     if(sender == ids.freeCheckerId)
389                         freeCount = answer;
390                     else if(sender == ids.queringCheckerId)
391                         queringCount = answer;
392                 }
393             ), "Async pool internal problem! Workers don't respond!");
394             catch (LinkTerminated e)
395             {
396                 logger.logError("Free conn or quering conn worker is dead!"); 
397                 freeCount = 0;
398                 queringCount = 0;
399             }
400 
401         return freeCount + queringCount;
402     }
403     
404     /**
    *   Returns current frozen (closed or connecting) connections number.
    *   Warning: The method displays count of inactive connections at the moment,
    *            returned value can become invalid as soon as it is returned due to
    *            the async nature of the pool.
409     */
410     size_t inactiveConnections() @property shared
411     {
412         ///TODO: move to contract when issue with contracts is fixed
413         assert(!finalized, "Pool was finalized!");
414         
415         size_t closedCount, connectingCount;
416         ids.closedCheckerId.send(thisTid, "length");
417         ids.connectingCheckerId.send(thisTid, "length");
418         
419         foreach(i;0..2)
420             try enforce(receiveTimeout(dur!"seconds"(1),
421                 (Tid sender, size_t answer) 
422                 {
423                     if(sender == ids.closedCheckerId)
424                         closedCount = answer;
425                     else if(sender == ids.connectingCheckerId)
426                         connectingCount = answer;
427                 }
428             ), "Async pool internal problem! Workers don't respond!");
429             catch (LinkTerminated e)
430             {
431                 logger.logError("Closed conn or connecting conn worker is dead!");
432                 closedCount = 0;
433                 connectingCount = 0;
434             }
435 
436         return closedCount + connectingCount;
437     }
438     
439     size_t totalConnections() @property shared
440     {
441         ///TODO: move to contract when issue with contracts is fixed
442         assert(!finalized, "Pool was finalized!");
443         
444         size_t freeCount, queringCount;
445         size_t closedCount, connectingCount;
446         ids.freeCheckerId.send(thisTid, "length");
447         ids.queringCheckerId.send(thisTid, "length");
448         ids.closedCheckerId.send(thisTid, "length");
449         ids.connectingCheckerId.send(thisTid, "length");
450         
451         foreach(i;0..4)
452             try enforce(receiveTimeout(dur!"seconds"(1),
453                 (Tid sender, size_t answer) 
454                 {
455                     if(sender == ids.freeCheckerId)
456                         freeCount = answer;
457                     else if(sender == ids.queringCheckerId)
458                         queringCount = answer;
459                     else if(sender == ids.closedCheckerId)
460                         closedCount = answer;
461                     else if(sender == ids.connectingCheckerId)
462                         connectingCount = answer;
463                 }
464             ), "Async pool internal problem! Workers don't respond!");
465             catch (LinkTerminated e)
466             {
467                 logger.logError("One of workers is dead!");
468                 freeCount = 0;
469                 queringCount = 0;
470                 closedCount = 0;
471                 connectingCount = 0;
472             }
473         return freeCount + queringCount + closedCount + connectingCount;
474     }
475     
476     /**
    *    Awaits all queries to finish and then closes each connection.
    *    Does nothing if the pool has already been finalized.
479     */
480     synchronized void finalize()
481     {
482         if(finalized) return;
483         ids.finalize(logger);
484         finalized = true;
485     }
486     
487     /**
488     *   Returns first free connection from the pool.
489     *   Throws: ConnTimeoutException
490     */
491     protected shared(IConnection) fetchFreeConnection() shared
492     {
493         ///TODO: move to contract when issue with contracts is fixed
494         assert(!finalized, "Pool was finalized!");
495                 
496         ids.freeCheckerId.send(thisTid, "get");
497         shared IConnection res;
498         enforceEx!ConnTimeoutException(receiveTimeout(freeConnTimeout,
499                 (Tid sender, shared IConnection conn) 
500                 {
501                     res = conn;
502                 }
503             ));
504         return res;
505     }
506 
507     /**
508     *   Returns date format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
511     *   TODO: Make a way to get such configs for particular connection.
512     */
513     DateFormat dateFormat() @property shared
514     {
515         ///TODO: move to contract when issue with contracts is fixed
516         assert(!finalized, "Pool was finalized!");
517         
518         return fetchFreeConnection.dateFormat;
519     }
520     
521     /**
522     *   Returns timestamp format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
525     *   TODO: Make a way to get such configs for particular connection.
526     */
527     TimestampFormat timestampFormat() @property shared
528     {
529         ///TODO: move to contract when issue with contracts is fixed
530         assert(!finalized, "Pool was finalized!");
531         
532         return fetchFreeConnection.timestampFormat;
533     }
534     
535     /**
536     *   Returns server time zone used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
539     *   TODO: Make a way to get such configs for particular connection.
540     */
541     immutable(TimeZone) timeZone() @property shared
542     {
543         ///TODO: move to contract when issue with contracts is fixed
544         assert(!finalized, "Pool was finalized!");
545         
546         return fetchFreeConnection.timeZone;
547     }
548     
549     /**
550     *   Returns $(B true) if the pool logs all transactions.
551     */
552     bool loggingAllTransactions() shared const
553     {
554         return mLoggingAll;
555     }
556     
557     /**
558     *   Enables/disables logging for all transactions.
559     */
560     void loggingAllTransactions(bool val) shared
561     {
562         mLoggingAll.atomicStore(val);
563     }
564     
    private
    {
       // Logger used by the pool and handed to every spawned worker.
       shared ILogger logger;
       // Transactions that were posted and not yet collected by getTransaction.
       // NOTE: `__gshared` gives the variable static storage, so this list is
       // one per process (shared by all AsyncPool instances), not one per pool.
       __gshared DList!(shared ITransaction) processingTransactions;
       // Responds received by fetchResponds, keyed by their transaction;
       // an entry is removed when getTransaction consumes it.
       Respond[immutable ITransaction] awaitingResponds;
       // Allocates the connection objects in addServer.
       IConnectionProvider provider;
       // Timing settings passed to the constructor; exposed through the
       // reconnectTime, freeConnTimeout and aliveCheckTime properties.
       Duration mReconnectTime;
       Duration mFreeConnTimeout;
       Duration mAliveCheckTime;
       
       // Ids of the four worker threads spawned by the constructor.
       shared ThreadIds ids;
       // Set by finalize(); most methods assert that it is still false.
       bool finalized = false;
       // Backing field of the loggingAllTransactions property.
       bool mLoggingAll = false;
   }
579 }