1 // Written in D programming language
2 /**
3 * Module describes a real connection to PostgreSQL server.
4 *
5 * Copyright: © 2014 DSoftOut
6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 * Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.pq.connection;
10
import derelict.pq.pq;
import pgator.db.connection;
import pgator.db.pq.api;
import pgator.util.string;
import dlogg.log;
import std.algorithm;
import std.conv;
import std.container;
import std.range;
import std.datetime;
import vibe.data.bson;
22
/**
*   PostgreSQL-specific connection type. Although a connection could in
*   principle use a different database backend, this class is defined to
*   support only PostgreSQL.
*/
synchronized class PQConnection : IConnection
{
    /**
    *   Creates a connection object. No connection to a server is started
    *   until $(B connect) is called.
    *
    *   Params:
    *       logger = receives error and info messages produced by the connection
    *       api    = libpq binding used to start the low-level connection
    */
    this(shared ILogger logger, shared IPostgreSQL api)
    {
        this.logger = logger;
        this.api = api;
    }

    /**
    *   Tries to establish connection with a SQL server described
    *   in $(B connString).
    *
    *   A previously started connection is finished first. On success the
    *   connection string is remembered for $(B reconnect).
    *
    *   Throws: ConnectException
    */
    void connect(string connString)
    {
        if(conn !is null) conn.finish;

        try
        {
            conn = api.startConnect(connString);
            reconnecting = false;
            lastConnString = connString;

            initNoticeCallbacks();
        } catch(PGException e)
        {
            logger.logError(text("Failed to connect to SQL server, reason:", e.msg));
            throw new ConnectException(server, e.msg);
        }
    }

    /**
    *   Tries to establish connection with a SQL server described
    *   in previous call of $(B connect).
    *
    *   Throws ReconnectException if there is no connection to restart
    *   (the $(B connect) method wasn't called or the connection was
    *   disconnected).
    *
    *   Throws: ConnectException, ReconnectException
    */
    void reconnect()
    {
        if(conn is null) throw new ReconnectException(server);

        try
        {
            /// reset cannot reset connection with restarted postgres server
            /// replaced with plain connect for now
            /// see issue #57 for more info
            //conn.resetStart();
            //reconnecting = true;

            // NOTE(review): the previous handle is replaced without calling
            // conn.finish (connect() does finish it) - confirm whether the old
            // libpq connection is released elsewhere.
            conn = api.startConnect(lastConnString);
            reconnecting = false;

            initNoticeCallbacks();
        } catch(PGReconnectException e)
        {
            logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg));
            throw new ConnectException(server, e.msg);
        } catch(PGException e)
        {
            // startConnect is the same call connect() uses, and connect() catches
            // PGException for it. Catch it here too, so that a failed reconnect is
            // reported as the documented ConnectException.
            logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg));
            throw new ConnectException(server, e.msg);
        }
    }

    /**
    *   Returns current status of connection.
    *
    *   Any previously saved connection error is dropped first. If the
    *   polling ends in an error state, the error is saved and can be
    *   rethrown with $(B pollConnectionException).
    */
    ConnectionStatus pollConnectionStatus() nothrow
    in
    {
        assert(conn !is null, "Connection start wasn't established!");
    }
    body
    {
        savedException = null;
        PostgresPollingStatusType val;
        if(reconnecting) val = conn.resetPoll;
        else val = conn.poll;

        switch(val)
        {
            case PostgresPollingStatusType.PGRES_POLLING_OK:
            {
                // Polling is done, the final verdict depends on the connection status
                switch(conn.status)
                {
                    case(ConnStatusType.CONNECTION_OK):
                    {
                        return ConnectionStatus.Finished;
                    }
                    case(ConnStatusType.CONNECTION_NEEDED):
                    {
                        savedException = cast(shared)(new ConnectException(server, "Connection wasn't tried to be established!"));
                        return ConnectionStatus.Error;
                    }
                    case(ConnStatusType.CONNECTION_BAD):
                    {
                        savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
                        return ConnectionStatus.Error;
                    }
                    default:
                    {
                        return ConnectionStatus.Pending;
                    }
                }
            }
            case PostgresPollingStatusType.PGRES_POLLING_FAILED:
            {
                savedException = cast(shared)(new ConnectException(server, conn.errorMessage));
                return ConnectionStatus.Error;
            }
            default:
            {
                return ConnectionStatus.Pending;
            }
        }
    }

    /**
    *   If connection process is ended with error state, then
    *   throws ConnectException, else do nothing.
    *
    *   Throws: ConnectException
    */
    void pollConnectionException()
    {
        if(savedException !is null) throw cast()savedException;
    }

    /**
    *   Initializes querying process in non-blocking manner.
    *
    *   Params:
    *       com    = text of the query
    *       params = values for the query parameters
    *
    *   Throws: QueryException
    */
    void postQuery(string com, string[] params = [])
    in
    {
        assert(conn !is null, "Connection start wasn't established!");
    }
    body
    {
        try conn.sendQueryParams(com, params);
        catch (PGQueryException e)
        {
            throw new QueryException(e.msg);
        }
    }

    /**
    *   Returns querying status of connection.
    *
    *   Any previously saved query error is dropped first. If consuming
    *   the input fails, the error is saved for $(B pollQueryException),
    *   pending results are drained and the Error status is returned.
    */
    QueringStatus pollQueringStatus() nothrow
    in
    {
        assert(conn !is null, "Connection start wasn't established!");
    }
    body
    {
        savedQueryException = null;
        try conn.consumeInput();
        catch (Exception e) // PGQueryException
        {
            savedQueryException = cast(shared)(new QueryException(e.msg));
            // Drain whatever results are still queued on the connection
            while(conn.getResult !is null) {}
            return QueringStatus.Error;
        }

        if(conn.isBusy) return QueringStatus.Pending;
        else return QueringStatus.Finished;
    }

    /**
    *   Sending senseless query to the server to check if the connection is
    *   actually alive (e.g. nothing can detect fail after postgresql restart but
    *   query).
    *
    *   Returns: true if the query succeeded, false if it raised an exception.
    */
    bool testAlive() nothrow
    {
        try
        {
            auto reses = execQuery("SELECT 'pgator_ping';");
            foreach(res; reses)
            {
                res.clear();
            }
        } catch(Exception e)
        {
            return false;
        }
        return true;
    }

    /**
    *   If querying process is ended with error state, then
    *   throws QueryException, else do nothing.
    *
    *   Throws: QueryException
    */
    void pollQueryException()
    {
        if (savedQueryException !is null) throw cast()savedQueryException;
    }

    /**
    *   Returns query result, if $(B pollQueringStatus) shows that
    *   query is processed without errors, else blocks the caller
    *   until the answer is arrived.
    *
    *   While waiting, the status is polled in a loop and a saved query
    *   error is rethrown.
    *
    *   Throws: QueryException
    */
    InputRange!(shared IPGresult) getQueryResult()
    in
    {
        assert(conn !is null, "Connection start wasn't established!");
    }
    body
    {
        if(conn.isBusy)
        {
            while(pollQueringStatus != QueringStatus.Finished) pollQueryException();
        }

        // Collect every result the connection has queued for the query
        auto builder = appender!(IPGresult[]);
        shared IPGresult res = conn.getResult;
        while(res !is null)
        {
            builder.put(cast()res);
            res = conn.getResult;
        }

        return (cast(shared(IPGresult)[])builder.data[]).inputRangeObject;
    }

    /**
    *   Closes connection to the SQL server instantly.
    *
    *   Also should interrupt connections in progress.
    *
    *   Saved connection and query errors are dropped. After the call
    *   $(B reconnect) throws ReconnectException until $(B connect) is
    *   called again.
    */
    void disconnect() nothrow
    in
    {
        assert(conn !is null, "Connection start wasn't established!");
    }
    body
    {
        scope(failure) {}
        conn.finish;
        conn = null;
        savedException = null;
        savedQueryException = null;
    }

    /**
    *   Returns SQL server name (domain) the connection is desired to connect to.
    *   If connection isn't ever established (or tried) the method returns empty string.
    */
    string server() nothrow const @property
    {
        scope(failure) return "";

        // No connection handle yet (or already disconnected): nothing to ask.
        // reconnect() relies on this when it reports a missing connection.
        if(conn is null) return "";

        return conn.host;
    }

    /**
    *   Returns current date output format and ambiguous values converting behavior.
    *
    *   The value is requested from the server with "SHOW DateStyle;" and is
    *   expected to consist of two parts separated by ", ".
    *
    *   Throws: QueryException
    */
    DateFormat dateFormat() @property
    {
        auto result = execQuery("SHOW DateStyle;");

        if(result.empty) throw new QueryException("DateFormat query expected result!");

        auto res = result.front.asColumnBson(this)["DateStyle"].deserializeBson!(string[]);
        // The reply comes from the server, so it is validated at runtime
        // instead of with asserts that are compiled out of release builds.
        if(res.length != 1)
        {
            throw new QueryException(text("DateFormat query expected exactly one row, but got ", res.length, "!"));
        }

        auto vals = res[0].split(", ");
        if(vals.length != 2)
        {
            throw new QueryException(text("DateFormat query returned unexpected value '", res[0], "'!"));
        }

        return DateFormat(vals[0], vals[1]);
    }

    /**
    *   Returns actual timestamp representation format used in server.
    *
    *   Note: This property tells particular HAVE_INT64_TIMESTAMP version flag that is used
    *         by remote server. Int64 is assumed when the server doesn't report the
    *         'integer_datetimes' parameter.
    */
    TimestampFormat timestampFormat() @property
    {
        try
        {
            auto res = conn.parameterStatus("integer_datetimes");
            if(res == "on")
            {
                return TimestampFormat.Int64;
            } else
            {
                return TimestampFormat.Float8;
            }
        } catch(PGParamNotExistException e)
        {
            logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume HAVE_INT64_TIMESTAMP."));
            return TimestampFormat.Int64;
        }
    }

    /**
    *   Returns server time zone. This value is important to handle
    *   time stamps with time zone specified as libpq doesn't send
    *   the information with time stamp.
    *
    *   Note: Will fallback to UTC value if server protocol doesn't support acquiring of
    *         'TimeZone' parameter or server returns invalid time zone name.
    */
    immutable(TimeZone) timeZone() @property
    {
        try
        {
            auto res = conn.parameterStatus("TimeZone");

            try
            {
                return TimeZone.getTimeZone(res);
            } catch(DateTimeException e)
            {
                logger.logInfo(text("Cannot parse time zone value '", res, "'. Assume UTC."));
                return UTC();
            }

        } catch(PGParamNotExistException e)
        {
            logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume UTC."));
            return UTC();
        }
    }

    /**
    *   Returns true if the connection stores info/warning/error messages.
    */
    bool hasRaisedMsgs()
    {
        return mRaisedMsgs.length != 0;
    }

    /**
    *   Returns all saved info/warning/error messages from the connection.
    */
    InputRange!string raisedMsgs()
    {
        return ((cast(string[])mRaisedMsgs)[]).inputRangeObject;
    }

    /**
    *   Cleaning inner buffer for info/warning/error messages.
    */
    void clearRaisedMsgs()
    {
        mRaisedMsgs = [];
    }

    private
    {
        /// Selects resetPoll instead of poll in pollConnectionStatus
        bool reconnecting = false;
        /// Connection string of the last connect call, reused by reconnect
        string lastConnString;
        shared ILogger logger;
        shared IPostgreSQL api;
        /// Low-level connection handle, null until connect and after disconnect
        shared IPGconn conn;
        /// Error saved by the last pollConnectionStatus call
        shared ConnectException savedException;
        /// Error saved by the last pollQueringStatus call
        shared QueryException savedQueryException;

        /// Messages collected through the notice processor
        shared string[] mRaisedMsgs;

        /**
        *   Callback registered with setNoticeProcessor. $(B arg) is the
        *   PQConnection passed at registration, $(B message) is the
        *   zero-terminated text of the notice. The message is copied into
        *   the raised messages buffer.
        */
        extern(C) static void noticeProcessor(void* arg, char* message) nothrow
        {
            auto pqConn = cast(shared PQConnection)arg;
            if(!pqConn) return;

            try pqConn.addRaisedMsg(fromStringz(message).idup);
            catch(Throwable e)
            {
                pqConn.logger.logError(text("Failed to add raised msg at libpq connection! Reason: ", e.msg));
            }
        }

        /// Registers noticeProcessor on the current connection with this object as its argument
        void initNoticeCallbacks()
        {
            assert(conn);
            conn.setNoticeProcessor(&noticeProcessor, cast(void*)this);
        }

        /// Appends a message to the raised messages buffer
        void addRaisedMsg(string msg)
        {
            mRaisedMsgs ~= msg;
        }
    }
}
421
/**
*   Connection provider that hands out new $(B PQConnection) objects
*   sharing one logger and one PostgreSQL API binding.
*/
synchronized class PQConnProvider : IConnectionProvider
{
    private
    {
        /// Logger passed to every allocated connection
        shared ILogger logger;
        /// PostgreSQL API binding passed to every allocated connection
        shared IPostgreSQL api;
    }

    /**
    *   Remembers the $(B logger) and $(B api) that are handed to each
    *   connection created by $(B allocate).
    */
    this(shared ILogger logger, shared IPostgreSQL api)
    {
        this.api = api;
        this.logger = logger;
    }

    /**
    *   Creates a new $(B PQConnection) that uses the stored logger and API.
    */
    shared(IConnection) allocate()
    {
        auto connection = new shared PQConnection(logger, api);
        return connection;
    }
}