1 // Written in D programming language
2 /**
3 *   Module describes a real connection to PostgreSQL server.
4 *
5 *   Copyright: © 2014 DSoftOut
6 *   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *   Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.pq.connection;
10 
11 import derelict.pq.pq;
12 import pgator.db.connection;
13 import pgator.db.pq.api;
14 import pgator.util..string;
15 import dlogg.log;
16 import std.algorithm;
17 import std.conv;
18 import std.container;
19 import std.range;
20 import std.datetime;
21 import vibe.data.bson;
22 
23 /**
24 *   PostgreSQL specific connection type. Although it can use
25 *   different data base backend, the class is defined to 
26 *   support only PostgreSQL.
27 */
28 synchronized class PQConnection : IConnection
29 {
30     this(shared ILogger logger, shared IPostgreSQL api)
31     {
32         this.logger = logger;
33         this.api = api;
34     }
35     
36     /**
37     *    Tries to establish connection with a SQL server described
38     *    in $(B connString). 
39     *
40     *    Throws: ConnectException
41     */
42     void connect(string connString)
43     {
44         if(conn !is null) conn.finish;
45         
46         try
47         {
48             conn = api.startConnect(connString);
49             reconnecting = false;
50             lastConnString = connString;
51             
52             initNoticeCallbacks();
53         } catch(PGException e)
54         {
55             logger.logError(text("Failed to connect to SQL server, reason:", e.msg));
56             throw new ConnectException(server, e.msg);
57         }
58     }
59     
60     /**
61     *   Tries to establish connection with a SQL server described
62     *   in previous call of $(B connect). 
63     *
64     *   Should throw ReconnectException if method cannot get stored
65     *   connection string (the $(B connect) method wasn't called).
66     *
67     *   Throws: ConnectException, ReconnectException
68     */
69     void reconnect()
70     {
71         if(conn is null) throw new ReconnectException(server);
72         
73         try
74         {
75             /// reset cannot reset connection with restarted postgres server
76             /// replaced with plain connect for now
77             /// see issue #57 fo more info
78             //conn.resetStart();
79             //reconnecting = true;
80             
81             conn = api.startConnect(lastConnString);
82             reconnecting = false;
83             
84             initNoticeCallbacks();
85         } catch(PGReconnectException e)
86         {
87             logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg));
88             throw new ConnectException(server, e.msg);
89         }
90     }
91     
92     /**
93     *   Returns current status of connection.
94     */
95     ConnectionStatus pollConnectionStatus() nothrow
96     in
97     {
98         assert(conn !is null, "Connection start wasn't established!");
99     }
100     body
101     {
102         savedException = null;
103         PostgresPollingStatusType val;
104         if(reconnecting) val = conn.resetPoll;
105         else val = conn.poll;
106         
107         switch(val)
108         {
109             case PostgresPollingStatusType.PGRES_POLLING_OK:
110             {
111                 switch(conn.status)
112                 {
113                     case(ConnStatusType.CONNECTION_OK):
114                     {
115                         return ConnectionStatus.Finished;
116                     }
117                     case(ConnStatusType.CONNECTION_NEEDED):
118                     {
119                         savedException = cast(shared)(new ConnectException(server, "Connection wasn't tried to be established!"));
120                         return ConnectionStatus.Error;
121                     }
122                     case(ConnStatusType.CONNECTION_BAD):
123                     {
124                         savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
125                         return ConnectionStatus.Error;
126                     }
127                     default:
128                     {
129                         return ConnectionStatus.Pending;
130                     }
131                 }
132             }
133             case PostgresPollingStatusType.PGRES_POLLING_FAILED:
134             {
135                 savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
136                 return ConnectionStatus.Error;
137             }
138             default:
139             {
140                 return ConnectionStatus.Pending;
141             } 
142         }
143     }
144     
145     /**
146     *   If connection process is ended with error state, then
147     *   throws ConnectException, else do nothing.
148     *
149     *   Throws: ConnectException
150     */    
151     void pollConnectionException()
152     {
153         if(savedException !is null) throw cast()savedException;
154     }
155     
156     /**
157     *   Initializes querying process in non-blocking manner.
158     *   Throws: QueryException
159     */
160     void postQuery(string com, string[] params = [])
161     in
162     {
163         assert(conn !is null, "Connection start wasn't established!");
164     }
165     body
166     {
167         try conn.sendQueryParams(com, params);
168         catch (PGQueryException e)
169         {
170             throw new QueryException(e.msg);
171         }
172     }
173     
174     /**
175     *   Returns quering status of connection.
176     */
177     QueringStatus pollQueringStatus() nothrow
178     in
179     {
180         assert(conn !is null, "Connection start wasn't established!");
181     }
182     body
183     {
184         savedQueryException = null;
185         try conn.consumeInput();
186         catch (Exception e) // PGQueryException
187         {
188             savedQueryException = cast(shared)(new QueryException(e.msg));
189             while(conn.getResult !is null) {}
190             return QueringStatus.Error;
191         }
192         
193         if(conn.isBusy) return QueringStatus.Pending;
194         else return QueringStatus.Finished;
195     }
196     
197     /**
198     *   Sending senseless query to the server to check if the connection is
199     *   actually alive (e.g. nothing can detect fail after postgresql restart but
200     *   query).
201     */    
202     bool testAlive() nothrow
203     {
204         try
205         {
206             auto reses = execQuery("SELECT 'pgator_ping';");
207             foreach(res; reses)
208             {
209                 res.clear();
210             }
211         } catch(Exception e)
212         {
213             return false;
214         }
215         return true;
216     }
217     
218     /**
219     *   If quering process is ended with error state, then
220     *   throws QueryException, else do nothing.
221     *
222     *   Throws: QueryException
223     */
224     void pollQueryException()
225     {
226         if (savedQueryException !is null) throw cast()savedQueryException;
227     }
228     
229     /**
230     *   Returns query result, if $(B pollQueringStatus) shows that
231     *   query is processed without errors, else blocks the caller
232     *   until the answer is arrived.
233     */
234     InputRange!(shared IPGresult) getQueryResult()
235     in
236     {
237         assert(conn !is null, "Connection start wasn't established!");
238     }
239     body
240     {
241         if(conn.isBusy)
242         {
243             while(pollQueringStatus != QueringStatus.Finished) pollQueryException();
244         }
245         
246         auto builder = appender!(IPGresult[]);
247         shared IPGresult res = conn.getResult;
248         while(res !is null)
249         {
250             builder.put(cast()res);
251             res = conn.getResult;
252         }
253         
254         return (cast(shared(IPGresult)[])builder.data[]).inputRangeObject;
255     }
256     
257     /**
258     *    Closes connection to the SQL server instantly.    
259     *    
260     *    Also should interrupt connections in progress.
261     *
262     *    Calls $(B callback) when closed.
263     */
264     void disconnect() nothrow
265     in
266     {
267         assert(conn !is null, "Connection start wasn't established!");
268     }
269     body
270     {
271         scope(failure) {}
272         conn.finish;
273         conn = null;
274         savedException = null;
275         savedQueryException = null;
276     }
277     
278     /**
279     *   Returns SQL server name (domain) the connection is desired to connect to.
280     *   If connection isn't ever established (or tried) the method returns empty string.
281     */
282     string server() nothrow const @property
283     {
284         scope(failure) return "";
285         
286         return conn.host;
287     }
288     
289     /**
290     *   Returns current date output format and ambitious values converting behavior.
291     *   Throws: QueryException
292     */
293     DateFormat dateFormat() @property
294     {
295         auto result = execQuery("SHOW DateStyle;");
296         
297         if(result.empty) throw new QueryException("DateFormat query expected result!");
298         
299         auto res = result.front.asColumnBson(this)["DateStyle"].deserializeBson!(string[]);
300         assert(res.length == 1);
301         auto vals = res[0].split(", ");
302         assert(vals.length == 2);
303         return DateFormat(vals[0], vals[1]);
304     }
305     
306     /**
307     *   Returns actual timestamp representation format used in server.
308     *
309     *   Note: This property tells particular HAVE_INT64_TIMESTAMP version flag that is used
310     *         by remote server.
311     */
312     TimestampFormat timestampFormat() @property
313     {
314         try
315         {
316             auto res = conn.parameterStatus("integer_datetimes");
317             if(res == "on")
318             {
319                 return TimestampFormat.Int64;
320             } else
321             {
322                 return TimestampFormat.Float8;
323             }
324         } catch(PGParamNotExistException e)
325         {
326             logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume HAVE_INT64_TIMESTAMP."));
327             return TimestampFormat.Int64; 
328         }
329     }
330     
331     /**
332     *   Returns server time zone. This value is important to handle 
333     *   time stamps with time zone specified as libpq doesn't send
334     *   the information with time stamp.
335     *
336     *   Note: Will fallback to UTC value if server protocol doesn't support acquiring of
337     *         'TimeZone' parameter or server returns invalid time zone name.
338     */
339     immutable(TimeZone) timeZone() @property
340     {
341         try
342         {
343             auto res = conn.parameterStatus("TimeZone");
344 
345             try
346             {
347                 return TimeZone.getTimeZone(res);
348             } catch(DateTimeException e)
349             {
350                 logger.logInfo(text("Cannot parse time zone value '", res, "'. Assume UTC."));
351                 return UTC();
352             }
353 
354         } catch(PGParamNotExistException e)
355         {
356             logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume UTC."));
357             return UTC(); 
358         }
359     }
360     
361     /**
362     *   Returns true if the connection stores info/warning/error messages.
363     */
364     bool hasRaisedMsgs()
365     {
366         return mRaisedMsgs.length != 0;
367     }
368     
369     /**
370     *   Returns all saved info/warning/error messages from the connection.
371     */
372     InputRange!string raisedMsgs()
373     {
374         return ((cast(string[])mRaisedMsgs)[]).inputRangeObject;
375     }
376     
377     /**
378     *   Cleaning inner buffer for info/warning/error messages.
379     */
380     void clearRaisedMsgs()
381     {
382         mRaisedMsgs = [];
383     }
384     
385     private
386     {
387         bool reconnecting = false;
388         string lastConnString;
389         shared ILogger logger;
390         shared IPostgreSQL api;
391         shared IPGconn conn;
392         shared ConnectException savedException;
393         shared QueryException savedQueryException;
394         
395         shared string[] mRaisedMsgs;
396         
397         extern(C) static void noticeProcessor(void* arg, char* message) nothrow
398         {
399             auto pqConn = cast(shared PQConnection)arg;
400             if(!pqConn) return;
401             
402             try pqConn.addRaisedMsg(fromStringz(message).idup);
403             catch(Throwable e)
404             {
405                 pqConn.logger.logError(text("Failed to add raised msg at libpq connection! Reason: ", e.msg));
406             }
407         }
408         
409         void initNoticeCallbacks()
410         {
411             assert(conn);
412             conn.setNoticeProcessor(&noticeProcessor, cast(void*)this);
413         }
414         
415         void addRaisedMsg(string msg)
416         {
417             mRaisedMsgs ~= msg;
418         }
419     }
420 }
421 
422 synchronized class PQConnProvider : IConnectionProvider
423 {
424     this(shared ILogger logger, shared IPostgreSQL api)
425     {
426         this.logger = logger;
427         this.api = api;
428     }
429     
430     shared(IConnection) allocate()
431     {
432         return new shared PQConnection(logger, api);
433     }
434     
435     private shared ILogger logger;
436     private shared IPostgreSQL api;
437 }