1 // Written in D programming language 2 /** 3 * Module describes a real connection to PostgreSQL server. 4 * 5 * Copyright: © 2014 DSoftOut 6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file. 7 * Authors: NCrashed <ncrashed@gmail.com> 8 */ 9 module pgator.db.pq.connection; 10 11 import derelict.pq.pq; 12 import pgator.db.connection; 13 import pgator.db.pq.api; 14 import pgator.util..string; 15 import dlogg.log; 16 import std.algorithm; 17 import std.conv; 18 import std.container; 19 import std.range; 20 import std.datetime; 21 import vibe.data.bson; 22 23 /** 24 * PostgreSQL specific connection type. Although it can use 25 * different data base backend, the class is defined to 26 * support only PostgreSQL. 27 */ 28 synchronized class PQConnection : IConnection 29 { 30 this(shared ILogger logger, shared IPostgreSQL api) 31 { 32 this.logger = logger; 33 this.api = api; 34 } 35 36 /** 37 * Tries to establish connection with a SQL server described 38 * in $(B connString). 39 * 40 * Throws: ConnectException 41 */ 42 void connect(string connString) 43 { 44 if(conn !is null) conn.finish; 45 46 try 47 { 48 conn = api.startConnect(connString); 49 reconnecting = false; 50 lastConnString = connString; 51 52 initNoticeCallbacks(); 53 } catch(PGException e) 54 { 55 logger.logError(text("Failed to connect to SQL server, reason:", e.msg)); 56 throw new ConnectException(server, e.msg); 57 } 58 } 59 60 /** 61 * Tries to establish connection with a SQL server described 62 * in previous call of $(B connect). 63 * 64 * Should throw ReconnectException if method cannot get stored 65 * connection string (the $(B connect) method wasn't called). 66 * 67 * Throws: ConnectException, ReconnectException 68 */ 69 void reconnect() 70 { 71 if(conn is null) throw new ReconnectException(server); 72 73 try 74 { 75 /// reset cannot reset connection with restarted postgres server 76 /// replaced with plain connect for now 77 /// see issue #57 fo more info 78 //conn.resetStart(); 79 //reconnecting = true; 80 81 conn = api.startConnect(lastConnString); 82 reconnecting = false; 83 84 initNoticeCallbacks(); 85 } catch(PGReconnectException e) 86 { 87 logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg)); 88 throw new ConnectException(server, e.msg); 89 } 90 } 91 92 /** 93 * Returns current status of connection. 94 */ 95 ConnectionStatus pollConnectionStatus() nothrow 96 in 97 { 98 assert(conn !is null, "Connection start wasn't established!"); 99 } 100 body 101 { 102 savedException = null; 103 PostgresPollingStatusType val; 104 if(reconnecting) val = conn.resetPoll; 105 else val = conn.poll; 106 107 switch(val) 108 { 109 case PostgresPollingStatusType.PGRES_POLLING_OK: 110 { 111 switch(conn.status) 112 { 113 case(ConnStatusType.CONNECTION_OK): 114 { 115 return ConnectionStatus.Finished; 116 } 117 case(ConnStatusType.CONNECTION_NEEDED): 118 { 119 savedException = cast(shared)(new ConnectException(server, "Connection wasn't tried to be established!")); 120 return ConnectionStatus.Error; 121 } 122 case(ConnStatusType.CONNECTION_BAD): 123 { 124 savedException = cast(shared)(new ConnectException(server, conn.errorMessage)); 125 return ConnectionStatus.Error; 126 } 127 default: 128 { 129 return ConnectionStatus.Pending; 130 } 131 } 132 } 133 case PostgresPollingStatusType.PGRES_POLLING_FAILED: 134 { 135 savedException = cast(shared)(new ConnectException(server, conn.errorMessage)); 136 return ConnectionStatus.Error; 137 } 138 default: 139 { 140 return ConnectionStatus.Pending; 141 } 142 } 143 } 144 145 /** 146 * If connection process is ended with error state, then 147 * throws ConnectException, else do nothing. 148 * 149 * Throws: ConnectException 150 */ 151 void pollConnectionException() 152 { 153 if(savedException !is null) throw cast()savedException; 154 } 155 156 /** 157 * Initializes querying process in non-blocking manner. 158 * Throws: QueryException 159 */ 160 void postQuery(string com, string[] params = []) 161 in 162 { 163 assert(conn !is null, "Connection start wasn't established!"); 164 } 165 body 166 { 167 try conn.sendQueryParams(com, params); 168 catch (PGQueryException e) 169 { 170 throw new QueryException(e.msg); 171 } 172 } 173 174 /** 175 * Returns quering status of connection. 176 */ 177 QueringStatus pollQueringStatus() nothrow 178 in 179 { 180 assert(conn !is null, "Connection start wasn't established!"); 181 } 182 body 183 { 184 savedQueryException = null; 185 try conn.consumeInput(); 186 catch (Exception e) // PGQueryException 187 { 188 savedQueryException = cast(shared)(new QueryException(e.msg)); 189 while(conn.getResult !is null) {} 190 return QueringStatus.Error; 191 } 192 193 if(conn.isBusy) return QueringStatus.Pending; 194 else return QueringStatus.Finished; 195 } 196 197 /** 198 * Sending senseless query to the server to check if the connection is 199 * actually alive (e.g. nothing can detect fail after postgresql restart but 200 * query). 201 */ 202 bool testAlive() nothrow 203 { 204 try 205 { 206 auto reses = execQuery("SELECT 'pgator_ping';"); 207 foreach(res; reses) 208 { 209 res.clear(); 210 } 211 } catch(Exception e) 212 { 213 return false; 214 } 215 return true; 216 } 217 218 /** 219 * If quering process is ended with error state, then 220 * throws QueryException, else do nothing. 221 * 222 * Throws: QueryException 223 */ 224 void pollQueryException() 225 { 226 if (savedQueryException !is null) throw cast()savedQueryException; 227 } 228 229 /** 230 * Returns query result, if $(B pollQueringStatus) shows that 231 * query is processed without errors, else blocks the caller 232 * until the answer is arrived. 233 */ 234 InputRange!(shared IPGresult) getQueryResult() 235 in 236 { 237 assert(conn !is null, "Connection start wasn't established!"); 238 } 239 body 240 { 241 if(conn.isBusy) 242 { 243 while(pollQueringStatus != QueringStatus.Finished) pollQueryException(); 244 } 245 246 auto builder = appender!(IPGresult[]); 247 shared IPGresult res = conn.getResult; 248 while(res !is null) 249 { 250 builder.put(cast()res); 251 res = conn.getResult; 252 } 253 254 return (cast(shared(IPGresult)[])builder.data[]).inputRangeObject; 255 } 256 257 /** 258 * Closes connection to the SQL server instantly. 259 * 260 * Also should interrupt connections in progress. 261 * 262 * Calls $(B callback) when closed. 263 */ 264 void disconnect() nothrow 265 in 266 { 267 assert(conn !is null, "Connection start wasn't established!"); 268 } 269 body 270 { 271 scope(failure) {} 272 conn.finish; 273 conn = null; 274 savedException = null; 275 savedQueryException = null; 276 } 277 278 /** 279 * Returns SQL server name (domain) the connection is desired to connect to. 280 * If connection isn't ever established (or tried) the method returns empty string. 281 */ 282 string server() nothrow const @property 283 { 284 scope(failure) return ""; 285 286 return conn.host; 287 } 288 289 /** 290 * Returns current date output format and ambitious values converting behavior. 291 * Throws: QueryException 292 */ 293 DateFormat dateFormat() @property 294 { 295 auto result = execQuery("SHOW DateStyle;"); 296 297 if(result.empty) throw new QueryException("DateFormat query expected result!"); 298 299 auto res = result.front.asColumnBson(this)["DateStyle"].deserializeBson!(string[]); 300 assert(res.length == 1); 301 auto vals = res[0].split(", "); 302 assert(vals.length == 2); 303 return DateFormat(vals[0], vals[1]); 304 } 305 306 /** 307 * Returns actual timestamp representation format used in server. 308 * 309 * Note: This property tells particular HAVE_INT64_TIMESTAMP version flag that is used 310 * by remote server. 311 */ 312 TimestampFormat timestampFormat() @property 313 { 314 try 315 { 316 auto res = conn.parameterStatus("integer_datetimes"); 317 if(res == "on") 318 { 319 return TimestampFormat.Int64; 320 } else 321 { 322 return TimestampFormat.Float8; 323 } 324 } catch(PGParamNotExistException e) 325 { 326 logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume HAVE_INT64_TIMESTAMP.")); 327 return TimestampFormat.Int64; 328 } 329 } 330 331 /** 332 * Returns server time zone. This value is important to handle 333 * time stamps with time zone specified as libpq doesn't send 334 * the information with time stamp. 335 * 336 * Note: Will fallback to UTC value if server protocol doesn't support acquiring of 337 * 'TimeZone' parameter or server returns invalid time zone name. 338 */ 339 immutable(TimeZone) timeZone() @property 340 { 341 try 342 { 343 auto res = conn.parameterStatus("TimeZone"); 344 345 try 346 { 347 return TimeZone.getTimeZone(res); 348 } catch(DateTimeException e) 349 { 350 logger.logInfo(text("Cannot parse time zone value '", res, "'. Assume UTC.")); 351 return UTC(); 352 } 353 354 } catch(PGParamNotExistException e) 355 { 356 logger.logInfo(text("Server doesn't support '", e.param,"' parameter! Assume UTC.")); 357 return UTC(); 358 } 359 } 360 361 /** 362 * Returns true if the connection stores info/warning/error messages. 363 */ 364 bool hasRaisedMsgs() 365 { 366 return mRaisedMsgs.length != 0; 367 } 368 369 /** 370 * Returns all saved info/warning/error messages from the connection. 371 */ 372 InputRange!string raisedMsgs() 373 { 374 return ((cast(string[])mRaisedMsgs)[]).inputRangeObject; 375 } 376 377 /** 378 * Cleaning inner buffer for info/warning/error messages. 379 */ 380 void clearRaisedMsgs() 381 { 382 mRaisedMsgs = []; 383 } 384 385 private 386 { 387 bool reconnecting = false; 388 string lastConnString; 389 shared ILogger logger; 390 shared IPostgreSQL api; 391 shared IPGconn conn; 392 shared ConnectException savedException; 393 shared QueryException savedQueryException; 394 395 shared string[] mRaisedMsgs; 396 397 extern(C) static void noticeProcessor(void* arg, char* message) nothrow 398 { 399 auto pqConn = cast(shared PQConnection)arg; 400 if(!pqConn) return; 401 402 try pqConn.addRaisedMsg(fromStringz(message).idup); 403 catch(Throwable e) 404 { 405 pqConn.logger.logError(text("Failed to add raised msg at libpq connection! Reason: ", e.msg)); 406 } 407 } 408 409 void initNoticeCallbacks() 410 { 411 assert(conn); 412 conn.setNoticeProcessor(¬iceProcessor, cast(void*)this); 413 } 414 415 void addRaisedMsg(string msg) 416 { 417 mRaisedMsgs ~= msg; 418 } 419 } 420 } 421 422 synchronized class PQConnProvider : IConnectionProvider 423 { 424 this(shared ILogger logger, shared IPostgreSQL api) 425 { 426 this.logger = logger; 427 this.api = api; 428 } 429 430 shared(IConnection) allocate() 431 { 432 return new shared PQConnection(logger, api); 433 } 434 435 private shared ILogger logger; 436 private shared IPostgreSQL api; 437 }