1 // Written in D programming language
2 /**
3 *   Module describes a real connection to PostgreSQL server.
4 *
5 *   Copyright: © 2014 DSoftOut
6 *   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *   Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.pq.connection;
10 
11 import dpq2.connection;
12 import pgator.db.connection;
13 import pgator.db.pq.api;
14 import pgator.util.string;
15 import dlogg.log;
16 import std.algorithm;
17 import std.conv;
18 import std.container;
19 import std.range;
20 import std.datetime;
21 import vibe.data.bson;
22 
23 alias Dpq2ConnectException = ConnException;
24 
25 /**
26 *   PostgreSQL specific connection type. Although it can use
27 *   different data base backend, the class is defined to 
28 *   support only PostgreSQL.
29 */
30 synchronized class PQConnection : IConnection
31 {
32     this(shared ILogger logger, shared IPostgreSQL api)
33     {
34         this.logger = logger;
35         this.api = api;
36     }
37     
38     /**
39     *    Tries to establish connection with a SQL server described
40     *    in $(B connString). 
41     *
42     *    Throws: ConnectException
43     */
44     void connect(string connString)
45     {
46         if(conn !is null) conn.finish;
47         
48         try
49         {
50             conn = api.startConnect(connString);
51             reconnecting = false;
52             lastConnString = connString;
53             
54             initNoticeCallbacks();
55         } catch(Dpq2ConnectException e)
56         {
57             logger.logError(text("Failed to connect to SQL server, reason:", e.msg));
58             throw new ConnectException(server, e.msg);
59         }
60     }
61     
62     /**
63     *   Tries to establish connection with a SQL server described
64     *   in previous call of $(B connect). 
65     *
66     *   Should throw ReconnectException if method cannot get stored
67     *   connection string (the $(B connect) method wasn't called).
68     *
69     *   Throws: ConnectException, ReconnectException
70     */
71     void reconnect()
72     {
73         if(conn is null) throw new ReconnectException(server);
74         
75         try
76         {
77             /// reset cannot reset connection with restarted postgres server
78             /// replaced with plain connect for now
79             /// see issue #57 fo more info
80             //conn.resetStart();
81             //reconnecting = true;
82             
83             conn = api.startConnect(lastConnString);
84             reconnecting = false;
85             
86             initNoticeCallbacks();
87         } catch(Dpq2ConnectException e)
88         {
89             logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg));
90             throw new ConnectException(server, e.msg);
91         }
92     }
93     
94     /**
95     *   Returns current status of connection.
96     */
97     ConnectionStatus pollConnectionStatus() nothrow
98     in
99     {
100         assert(conn !is null, "Connection start wasn't established!");
101     }
102     body
103     {
104         savedException = null;
105         PostgresPollingStatusType val;
106         if(reconnecting) val = conn.resetPoll;
107         else val = conn.poll;
108         
109         switch(val)
110         {
111             case PGRES_POLLING_OK:
112             {
113                 switch(conn.status)
114                 {
115                     case(CONNECTION_OK):
116                     {
117                         return ConnectionStatus.Finished;
118                     }
119                     case(CONNECTION_NEEDED):
120                     {
121                         savedException = cast(shared)(new ConnectException(server, "Connection wasn't tried to be established!"));
122                         return ConnectionStatus.Error;
123                     }
124                     case(CONNECTION_BAD):
125                     {
126                         savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
127                         return ConnectionStatus.Error;
128                     }
129                     default:
130                     {
131                         return ConnectionStatus.Pending;
132                     }
133                 }
134             }
135             case PGRES_POLLING_FAILED:
136             {
137                 savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
138                 return ConnectionStatus.Error;
139             }
140             default:
141             {
142                 return ConnectionStatus.Pending;
143             } 
144         }
145     }
146     
147     /**
148     *   If connection process is ended with error state, then
149     *   throws ConnectException, else do nothing.
150     *
151     *   Throws: ConnectException
152     */    
153     void pollConnectionException()
154     {
155         if(savedException !is null) throw cast()savedException;
156     }
157     
158     /**
159     *   Initializes querying process in non-blocking manner.
160     *   Throws: QueryException
161     */
162     void postQuery(string com, string[] params = [])
163     in
164     {
165         assert(conn !is null, "Connection start wasn't established!");
166     }
167     body
168     {
169         try conn.sendQueryParams(com, params);
170         catch (ConnException e)
171         {
172             throw new QueryException(e.msg);
173         }
174     }
175     
176     /**
177     *   Returns quering status of connection.
178     */
179     QueringStatus pollQueringStatus()
180     in
181     {
182         assert(conn !is null, "Connection start wasn't established!");
183     }
184     body
185     {
186         savedQueryException = null;
187         try conn.consumeInput();
188         catch (Exception e) // PGQueryException
189         {
190             savedQueryException = cast(shared)(new QueryException(e.msg));
191             while(conn.getResult !is null) {}
192             return QueringStatus.Error;
193         }
194         
195         if(conn.isBusy) return QueringStatus.Pending;
196         else return QueringStatus.Finished;
197     }
198     
199     /**
200     *   Sending senseless query to the server to check if the connection is
201     *   actually alive (e.g. nothing can detect fail after postgresql restart but
202     *   query).
203     */    
204     bool testAlive()
205     {
206         try
207         {
208             execQuery("SELECT 'pgator_ping';");
209         }
210         catch(Dpq2ConnectException e)
211         {
212             return false;
213         }
214 
215         return true;
216     }
217     
218     /**
219     *   If quering process is ended with error state, then
220     *   throws QueryException, else do nothing.
221     *
222     *   Throws: QueryException
223     */
224     void pollQueryException()
225     {
226         if (savedQueryException !is null) throw cast()savedQueryException;
227     }
228     
229     /**
230     *   Returns query result, if $(B pollQueringStatus) shows that
231     *   query is processed without errors, else blocks the caller
232     *   until the answer is arrived.
233     */
234     InputRange!(shared IPGresult) getQueryResult()
235     in
236     {
237         assert(conn !is null, "Connection start wasn't established!");
238     }
239     body
240     {
241         if(conn.isBusy)
242         {
243             while(pollQueringStatus != QueringStatus.Finished) pollQueryException();
244         }
245         
246         auto builder = appender!(IPGresult[]);
247         shared IPGresult res = conn.getResult;
248         while(res !is null)
249         {
250             builder.put(cast()res);
251             res = conn.getResult;
252         }
253         
254         return (cast(shared(IPGresult)[])builder.data[]).inputRangeObject;
255     }
256     
257     /**
258     *    Closes connection to the SQL server instantly.    
259     *    
260     *    Also should interrupt connections in progress.
261     *
262     *    Calls $(B callback) when closed.
263     */
264     void disconnect() nothrow
265     in
266     {
267         assert(conn !is null, "Connection start wasn't established!");
268     }
269     body
270     {
271         scope(failure) {}
272         conn.finish;
273         conn = null;
274         savedException = null;
275         savedQueryException = null;
276     }
277     
278     /**
279     *   Returns SQL server name (domain) the connection is desired to connect to.
280     *   If connection isn't ever established (or tried) the method returns empty string.
281     */
282     string server() nothrow const @property
283     {
284         scope(failure) return "";
285         
286         return (cast(shared) conn).server();
287     }
288     
289     /**
290     *   Returns current date output format and ambitious values converting behavior.
291     *   Throws: QueryException
292     */
293     DateFormat dateFormat() @property
294     {
295         auto result = execQuery("SHOW DateStyle;");
296         
297         if(result.empty) throw new QueryException("DateFormat query expected result!");
298         
299         auto res = result.front.asColumnBson(this)["DateStyle"].deserializeBson!(string[]);
300         assert(res.length == 1);
301         auto vals = res[0].split(", ");
302         assert(vals.length == 2);
303         return DateFormat(vals[0], vals[1]);
304     }
305     
306     /**
307     *   Returns actual timestamp representation format used in server.
308     *
309     *   Note: This property tells particular HAVE_INT64_TIMESTAMP version flag that is used
310     *         by remote server.
311     */
312     TimestampFormat timestampFormat() @property
313     {
314         auto res = conn.parameterStatus("integer_datetimes");
315         if(res == "on")
316         {
317             return TimestampFormat.Int64;
318         } else
319         {
320             return TimestampFormat.Float8;
321         }
322     }
323     
324     /**
325     *   Returns server time zone. This value is important to handle 
326     *   time stamps with time zone specified as libpq doesn't send
327     *   the information with time stamp.
328     *
329     *   Note: Will fallback to UTC value if server protocol doesn't support acquiring of
330     *         'TimeZone' parameter or server returns invalid time zone name.
331     */
332     immutable(TimeZone) timeZone() @property
333     {
334             auto res = conn.parameterStatus("TimeZone");
335 
336             return TimeZone.getTimeZone(res);
337     }
338     
339     /**
340     *   Returns true if the connection stores info/warning/error messages.
341     */
342     bool hasRaisedMsgs()
343     {
344         return mRaisedMsgs.length != 0;
345     }
346     
347     /**
348     *   Returns all saved info/warning/error messages from the connection.
349     */
350     InputRange!string raisedMsgs()
351     {
352         return ((cast(string[])mRaisedMsgs)[]).inputRangeObject;
353     }
354     
355     /**
356     *   Cleaning inner buffer for info/warning/error messages.
357     */
358     void clearRaisedMsgs()
359     {
360         mRaisedMsgs = [];
361     }
362     
363     private
364     {
365         bool reconnecting = false;
366         string lastConnString;
367         shared ILogger logger;
368         shared IPostgreSQL api;
369         shared IPGconn conn;
370         shared ConnectException savedException;
371         shared QueryException savedQueryException;
372         
373         shared string[] mRaisedMsgs;
374         
375         extern(C) static void noticeProcessor(void* arg, char* message) nothrow @nogc
376         {
377             auto pqConn = cast(shared PQConnection)arg;
378             if(!pqConn) return;
379             //FIXME: need logging here
380             
381             //~ try pqConn.addRaisedMsg(fromStringz(message).idup);
382             //~ catch(Throwable e)
383             //~ {
384                 //~ pqConn.logger.logError(text("Failed to add raised msg at libpq connection! Reason: ", e.msg));
385             //~ }
386         }
387         
388         void initNoticeCallbacks()
389         {
390             assert(conn);
391             conn.setNoticeProcessor(&noticeProcessor, cast(void*)this);
392         }
393         
394         void addRaisedMsg(string msg)
395         {
396             mRaisedMsgs ~= msg;
397         }
398     }
399 }
400 
401 synchronized class PQConnProvider : IConnectionProvider
402 {
403     this(shared ILogger logger, shared IPostgreSQL api)
404     {
405         this.logger = logger;
406         this.api = api;
407     }
408     
409     shared(IConnection) allocate()
410     {
411         return new shared PQConnection(logger, api);
412     }
413     
414     private shared ILogger logger;
415     private shared IPostgreSQL api;
416 }