1 // Written in D programming language
2 /**
3 *    Module describes connection pool to data bases. Pool handles
4 *    several connections to one or more sql servers. If connection
5 *    is lost, pool tries to reconnect over $(B reconnectTime) duration.
6 *    
7 *    Copyright: © 2014 DSoftOut
8 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
9 *    Authors: NCrashed <ncrashed@gmail.com>
10 */
11 module pgator.db.pool;
12 
13 import pgator.db.connection;
14 import pgator.db.pq.api;
15 import std.algorithm;
16 import std.datetime;
17 import std.range;
18 import core.time;
19 import vibe.data.bson;
20 
/**
*    The exception is thrown when no free connection becomes available
*    within the $(B freeConnTimeout) duration while trying to lock one.
*/
class ConnTimeoutException : Exception
{
    /**
    *   Builds the exception with its fixed description.
    *
    *   Params:
    *       file = source file reported as the throw site (defaults to the caller's file)
    *       line = source line reported as the throw site (defaults to the caller's line)
    */
    this(string file = __FILE__, size_t line = __LINE__) @safe pure nothrow
    {
        // The text is fixed: this exception never carries a caller-supplied message.
        enum description = "There is no any free connection to SQL servers!";
        super(description, file, line);
    }
}
32 
33 /**
34 *   The exception is thrown when invalid transaction interface is passed to
35 *   $(B isTransactionReady) and $(B getTransaction) methods.
36 */
class UnknownTransactionException : Exception
{
    /**
    *   Builds the exception with its fixed description.
    *
    *   Params:
    *       file = source file reported as the throw site (defaults to the caller's file)
    *       line = source line reported as the throw site (defaults to the caller's line)
    */
    this(string file = __FILE__, size_t line = __LINE__) @safe pure nothrow
    {
        // The text is fixed: this exception never carries a caller-supplied message.
        enum description = "There is no such transaction that is processing now!";
        super(description, file, line);
    }
}
44 
/**
*   The exception is thrown when something bad has happened while
*   passing a query to the server or loading the result from the server.
*   This exception has no bearing on SQL errors.
*/
class QueryProcessingException : Exception
{
    /**
    *   Builds the exception from a caller-supplied description.
    *
    *   Params:
    *       msg  = description of the failure, forwarded unchanged to $(B Exception)
    *       file = source file reported as the throw site (defaults to the caller's file)
    *       line = source line reported as the throw site (defaults to the caller's line)
    */
    this(string msg,
         string file = __FILE__,
         size_t line = __LINE__) @safe pure nothrow
    {
        super(msg, file, line);
    }
}
57 
/**
*   The exception is thrown when a SQL query was marked as a one-row response
*   and the query returns multiple rows. The transaction of the query is rolled back.
*/
class OneRowConstraintException : QueryProcessingException
{
    /**
    *   Builds the exception from a caller-supplied description.
    *
    *   Params:
    *       msg  = description of the violated constraint, forwarded unchanged to the base class
    *       file = source file reported as the throw site (defaults to the caller's file)
    *       line = source line reported as the throw site (defaults to the caller's line)
    */
    this(string msg,
         string file = __FILE__,
         size_t line = __LINE__) @safe pure nothrow
    {
        super(msg, file, line);
    }
}
69 
/**
*    Pool handles several connections to one or more SQL servers. If 
*    connection is lost, pool tries to reconnect over $(B reconnectTime) 
*    duration.
*
*    Example:
*    --------
*    scope(exit) pool.finalize();
*    std.stdio.writeln(pool.timeZone); // know server time zone
*    
*    import pgator.db.pq.types.time; // for PGTimeStamp
*
*    // blocking querying
*    auto time = pool.execTransaction(["SELECT TIMESTAMP 'epoch' as field;"]).deserializeBson!PGTimeStamp;
*    assert(time == SysTime.fromSimpleString("1970-Jan-01 00:00:00Z"));
*
*    // or can use asynchronous alternative
*    auto transaction = pool.postTransaction(["SELECT TIMESTAMP 'epoch' as field;"]);
*    while(!pool.isTransactionReady(transaction))
*    {
*       // do something
*    }
*    time = pool.getTransaction(transaction).deserializeBson!PGTimeStamp;
*    assert(time == SysTime.fromSimpleString("1970-Jan-01 00:00:00Z"));
*    --------
*/
interface IConnectionPool
{
    /**
    *    Adds connection string to a SQL server with
    *    maximum connections count.
    *
    *    The pool will try to reconnect to the SQL
    *    server every $(B reconnectTime) if connection
    *    is dropped (or is down initially).
    *
    *    Params:
    *        connString = connection string of the SQL server
    *        connNum    = maximum connections count for that server
    */
    void addServer(string connString, size_t connNum) shared;
    
    /**
    *   Performs several SQL $(B commands) on single connection
    *   wrapped in a transaction (BEGIN/COMMIT in PostgreSQL).
    *   Each command should use '$n' notation to refer $(B params)
    *   values. Before any command occurs in transaction the
    *   local SQL variables are set from $(B vars). 
    *
    *   $(B argnums) array holds information about which parameter
    *   from $(B params) should be passed to a query in $(B commands).
    *   If there are no params, the $(B argnums) can be empty.
    *
    *   $(B oneRowConstraint) stores info which query in $(B commands) has
    *   to have a one row response. If some query with the constraint switched on
    *   returns 0 or greater than 1 - transaction is rolled back and OneRowConstraintException
    *   is thrown.
    *
    *   Example:
    *   -----------
    *   pool.execTransaction(["com1", "com2"],
    *       ["par1", "par2", "par3"], [1, 2]);
    *   // com1 is fed with par1
    *   // com2 is fed with par2 and par3
    *   -----------
    *
    *   Throws: ConnTimeoutException, QueryProcessingException, OneRowConstraintException
    */
    InputRange!(immutable Bson) execTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    in
    {
        // Only `commands` is required. The declared defaults of `params` and
        // `argnums` are `[]`, which is a null slice in D, so they must not be
        // null-checked or the default call would violate its own contract.
        assert(commands !is null, "null reference");
        // Seeded reduce: the unseeded form throws on an empty `argnums`.
        // A size_t accumulator matches the type of `params.length`.
        assert(reduce!"a+b"(cast(size_t) 0, argnums) == params.length
            , "length of params is not equal argument count summ!");
    }
    
    /**
    *   Asynchronous way to execute transaction. User can check
    *   transaction status by calling $(B isTransactionReady) method.
    *   When $(B isTransactionReady) method returns true, the
    *   transaction can be finalized by $(B getTransaction) method.
    * 
    *   Returns: Specific interface to distinct the query
    *            among others.
    *
    *   $(B argnums) array holds information about which parameter
    *   from $(B params) should be passed to a query in $(B commands).
    *   If there are no params, the $(B argnums) can be empty.
    *
    *   $(B oneRowConstraint) stores info which query in $(B commands) has
    *   to have a one row response. If some query with the constraint switched on
    *   returns 0 or greater than 1 - transaction is rolled back and OneRowConstraintException
    *   is thrown.
    *
    *   Example:
    *   -----------
    *   pool.postTransaction(["com1", "com2"],
    *       ["par1", "par2", "par3"], [1, 2]);
    *   // com1 is fed with par1
    *   // com2 is fed with par2 and par3
    *   -----------
    *
    *   See_Also: isTransactionReady, getTransaction.
    *   Throws: ConnTimeoutException
    */
    immutable(ITransaction) postTransaction(string[] commands
        , string[] params = [], uint[] argnums = []
        , string[string] vars = null, bool[] oneRowConstraint = []) shared
    in
    {
        // Same contract as execTransaction: the default (null) `params` and
        // `argnums` are valid, only `commands` has to be supplied.
        assert(commands !is null, "null reference");
        // Seeded reduce so that an empty `argnums` sums to 0 instead of throwing.
        assert(reduce!"a+b"(cast(size_t) 0, argnums) == params.length
            , "length of params is not equal argument count summ!");
    }
    
    /**
    *   Returns true if transaction processing is finished (doesn't
    *   matter the actual reason, error or transaction object is invalid,
    *   or successful completion).
    *
    *   If the method returns true, then $(B getTransaction) method
    *   can be called in non-blocking manner.
    *
    *   See_Also: postTransaction, getTransaction.
    */
    bool isTransactionReady(immutable ITransaction transaction) shared;
    
    /**
    *   Retrieves SQL result from specified transaction.
    *   
    *   If previously called $(B isTransactionReady) returns true,
    *   then the method is not blocking, else it falls back
    *   to $(B execTransaction) behavior.
    *
    *   See_Also: postTransaction, isTransactionReady
    *   Throws: UnknownTransactionException, QueryProcessingException, OneRowConstraintException
    */
    InputRange!(immutable Bson) getTransaction(immutable ITransaction transaction) shared;
    
    /**
    *    If connection to a SQL server is down,
    *    the pool tries to reestablish it every
    *    time units returned by the method. 
    */
    Duration reconnectTime() @property shared;
    
    /**
    *    If there is no free connection for 
    *    specified duration while trying to
    *    initialize SQL query, then the pool
    *    throws $(B ConnTimeoutException) exception.
    */
    Duration freeConnTimeout() @property shared;
    
    /**
    *    Returns current alive connections number.
    */
    size_t activeConnections() @property shared;

    /**
    *    Returns current frozen connections number.
    */
    size_t inactiveConnections() @property shared;
        
    /**
    *    Awaits all queries to finish and then closes each connection.
    */
    synchronized void finalize();
    
    /**
    *   Returns date format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    DateFormat dateFormat() @property shared;
    
    /**
    *   Returns timestamp format used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    TimestampFormat timestampFormat() @property shared;
    
    /**
    *   Returns server time zone used in ONE OF sql servers.
    *   Warning: This method can be trusted only if the pool conns are connected
    *            to the same sql server.
    *   TODO: Make a way to get such configs for particular connection.
    */
    immutable(TimeZone) timeZone() @property shared;
    
    /**
    *   Returns first free connection from the pool.
    *   Throws: ConnTimeoutException
    */
    protected shared(IConnection) fetchFreeConnection() shared;
    
    /**
    *   Transaction that should be executed by one of connections
    *   on remote SQL server. Client code shouldn't know anything
    *   about the interface but only store it as unique key to acquire
    *   finished transaction. 
    */
    protected interface ITransaction {}
    
    /**
    *   Returns $(B true) if the pool logs all transactions.
    */
    bool loggingAllTransactions() shared const;
    
    /**
    *   Enables/disables logging for all transactions.
    */
    void loggingAllTransactions(bool val) shared;
}