1 // Written in D programming language 2 /** 3 * Module describes a real connection to PostgreSQL server. 4 * 5 * Copyright: © 2014 DSoftOut 6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file. 7 * Authors: NCrashed <ncrashed@gmail.com> 8 */ 9 module pgator.db.pq.connection; 10 11 import dpq2.connection; 12 import pgator.db.connection; 13 import pgator.db.pq.api; 14 import pgator.util.string; 15 import dlogg.log; 16 import std.algorithm; 17 import std.conv; 18 import std.container; 19 import std.range; 20 import std.datetime; 21 import vibe.data.bson; 22 23 alias Dpq2ConnectException = ConnException; 24 25 /** 26 * PostgreSQL specific connection type. Although it can use 27 * different data base backend, the class is defined to 28 * support only PostgreSQL. 29 */ 30 synchronized class PQConnection : IConnection 31 { 32 this(shared ILogger logger, shared IPostgreSQL api) 33 { 34 this.logger = logger; 35 this.api = api; 36 } 37 38 /** 39 * Tries to establish connection with a SQL server described 40 * in $(B connString). 41 * 42 * Throws: ConnectException 43 */ 44 void connect(string connString) 45 { 46 if(conn !is null) conn.finish; 47 48 try 49 { 50 conn = api.startConnect(connString); 51 reconnecting = false; 52 lastConnString = connString; 53 54 initNoticeCallbacks(); 55 } catch(Dpq2ConnectException e) 56 { 57 logger.logError(text("Failed to connect to SQL server, reason:", e.msg)); 58 throw new ConnectException(server, e.msg); 59 } 60 } 61 62 /** 63 * Tries to establish connection with a SQL server described 64 * in previous call of $(B connect). 65 * 66 * Should throw ReconnectException if method cannot get stored 67 * connection string (the $(B connect) method wasn't called). 68 * 69 * Throws: ConnectException, ReconnectException 70 */ 71 void reconnect() 72 { 73 if(conn is null) throw new ReconnectException(server); 74 75 try 76 { 77 /// reset cannot reset connection with restarted postgres server 78 /// replaced with plain connect for now 79 /// see issue #57 fo more info 80 //conn.resetStart(); 81 //reconnecting = true; 82 83 conn = api.startConnect(lastConnString); 84 reconnecting = false; 85 86 initNoticeCallbacks(); 87 } catch(Dpq2ConnectException e) 88 { 89 logger.logError(text("Failed to reconnect to SQL server, reason:", e.msg)); 90 throw new ConnectException(server, e.msg); 91 } 92 } 93 94 /** 95 * Returns current status of connection. 96 */ 97 ConnectionStatus pollConnectionStatus() nothrow 98 in 99 { 100 assert(conn !is null, "Connection start wasn't established!"); 101 } 102 body 103 { 104 savedException = null; 105 PostgresPollingStatusType val; 106 if(reconnecting) val = conn.resetPoll; 107 else val = conn.poll; 108 109 switch(val) 110 { 111 case PGRES_POLLING_OK: 112 { 113 switch(conn.status) 114 { 115 case(CONNECTION_OK): 116 { 117 return ConnectionStatus.Finished; 118 } 119 case(CONNECTION_NEEDED): 120 { 121 savedException = cast(shared)(new ConnectException(server, "Connection wasn't tried to be established!")); 122 return ConnectionStatus.Error; 123 } 124 case(CONNECTION_BAD): 125 { 126 savedException = cast(shared)(new ConnectException(server, conn.errorMessage)); 127 return ConnectionStatus.Error; 128 } 129 default: 130 { 131 return ConnectionStatus.Pending; 132 } 133 } 134 } 135 case PGRES_POLLING_FAILED: 136 { 137 savedException = cast(shared)(new ConnectException(server, conn.errorMessage)); 138 return ConnectionStatus.Error; 139 } 140 default: 141 { 142 return ConnectionStatus.Pending; 143 } 144 } 145 } 146 147 /** 148 * If connection process is ended with error state, then 149 * throws ConnectException, else do nothing. 150 * 151 * Throws: ConnectException 152 */ 153 void pollConnectionException() 154 { 155 if(savedException !is null) throw cast()savedException; 156 } 157 158 /** 159 * Initializes querying process in non-blocking manner. 160 * Throws: QueryException 161 */ 162 void postQuery(string com, string[] params = []) 163 in 164 { 165 assert(conn !is null, "Connection start wasn't established!"); 166 } 167 body 168 { 169 try conn.sendQueryParams(com, params); 170 catch (ConnException e) 171 { 172 throw new QueryException(e.msg); 173 } 174 } 175 176 /** 177 * Returns quering status of connection. 178 */ 179 QueringStatus pollQueringStatus() 180 in 181 { 182 assert(conn !is null, "Connection start wasn't established!"); 183 } 184 body 185 { 186 savedQueryException = null; 187 try conn.consumeInput(); 188 catch (Exception e) // PGQueryException 189 { 190 savedQueryException = cast(shared)(new QueryException(e.msg)); 191 while(conn.getResult !is null) {} 192 return QueringStatus.Error; 193 } 194 195 if(conn.isBusy) return QueringStatus.Pending; 196 else return QueringStatus.Finished; 197 } 198 199 /** 200 * Sending senseless query to the server to check if the connection is 201 * actually alive (e.g. nothing can detect fail after postgresql restart but 202 * query). 203 */ 204 bool testAlive() 205 { 206 try 207 { 208 execQuery("SELECT 'pgator_ping';"); 209 } 210 catch(Dpq2ConnectException e) 211 { 212 return false; 213 } 214 215 return true; 216 } 217 218 /** 219 * If quering process is ended with error state, then 220 * throws QueryException, else do nothing. 221 * 222 * Throws: QueryException 223 */ 224 void pollQueryException() 225 { 226 if (savedQueryException !is null) throw cast()savedQueryException; 227 } 228 229 /** 230 * Returns query result, if $(B pollQueringStatus) shows that 231 * query is processed without errors, else blocks the caller 232 * until the answer is arrived. 233 */ 234 InputRange!(shared IPGresult) getQueryResult() 235 in 236 { 237 assert(conn !is null, "Connection start wasn't established!"); 238 } 239 body 240 { 241 if(conn.isBusy) 242 { 243 while(pollQueringStatus != QueringStatus.Finished) pollQueryException(); 244 } 245 246 auto builder = appender!(IPGresult[]); 247 shared IPGresult res = conn.getResult; 248 while(res !is null) 249 { 250 builder.put(cast()res); 251 res = conn.getResult; 252 } 253 254 return (cast(shared(IPGresult)[])builder.data[]).inputRangeObject; 255 } 256 257 /** 258 * Closes connection to the SQL server instantly. 259 * 260 * Also should interrupt connections in progress. 261 * 262 * Calls $(B callback) when closed. 263 */ 264 void disconnect() nothrow 265 in 266 { 267 assert(conn !is null, "Connection start wasn't established!"); 268 } 269 body 270 { 271 scope(failure) {} 272 conn.finish; 273 conn = null; 274 savedException = null; 275 savedQueryException = null; 276 } 277 278 /** 279 * Returns SQL server name (domain) the connection is desired to connect to. 280 * If connection isn't ever established (or tried) the method returns empty string. 281 */ 282 string server() nothrow const @property 283 { 284 scope(failure) return ""; 285 286 return (cast(shared) conn).server(); 287 } 288 289 /** 290 * Returns current date output format and ambitious values converting behavior. 291 * Throws: QueryException 292 */ 293 DateFormat dateFormat() @property 294 { 295 auto result = execQuery("SHOW DateStyle;"); 296 297 if(result.empty) throw new QueryException("DateFormat query expected result!"); 298 299 auto res = result.front.asColumnBson(this)["DateStyle"].deserializeBson!(string[]); 300 assert(res.length == 1); 301 auto vals = res[0].split(", "); 302 assert(vals.length == 2); 303 return DateFormat(vals[0], vals[1]); 304 } 305 306 /** 307 * Returns actual timestamp representation format used in server. 308 * 309 * Note: This property tells particular HAVE_INT64_TIMESTAMP version flag that is used 310 * by remote server. 311 */ 312 TimestampFormat timestampFormat() @property 313 { 314 auto res = conn.parameterStatus("integer_datetimes"); 315 if(res == "on") 316 { 317 return TimestampFormat.Int64; 318 } else 319 { 320 return TimestampFormat.Float8; 321 } 322 } 323 324 /** 325 * Returns server time zone. This value is important to handle 326 * time stamps with time zone specified as libpq doesn't send 327 * the information with time stamp. 328 * 329 * Note: Will fallback to UTC value if server protocol doesn't support acquiring of 330 * 'TimeZone' parameter or server returns invalid time zone name. 331 */ 332 immutable(TimeZone) timeZone() @property 333 { 334 auto res = conn.parameterStatus("TimeZone"); 335 336 return TimeZone.getTimeZone(res); 337 } 338 339 /** 340 * Returns true if the connection stores info/warning/error messages. 341 */ 342 bool hasRaisedMsgs() 343 { 344 return mRaisedMsgs.length != 0; 345 } 346 347 /** 348 * Returns all saved info/warning/error messages from the connection. 349 */ 350 InputRange!string raisedMsgs() 351 { 352 return ((cast(string[])mRaisedMsgs)[]).inputRangeObject; 353 } 354 355 /** 356 * Cleaning inner buffer for info/warning/error messages. 357 */ 358 void clearRaisedMsgs() 359 { 360 mRaisedMsgs = []; 361 } 362 363 private 364 { 365 bool reconnecting = false; 366 string lastConnString; 367 shared ILogger logger; 368 shared IPostgreSQL api; 369 shared IPGconn conn; 370 shared ConnectException savedException; 371 shared QueryException savedQueryException; 372 373 shared string[] mRaisedMsgs; 374 375 extern(C) static void noticeProcessor(void* arg, char* message) nothrow @nogc 376 { 377 auto pqConn = cast(shared PQConnection)arg; 378 if(!pqConn) return; 379 //FIXME: need logging here 380 381 //~ try pqConn.addRaisedMsg(fromStringz(message).idup); 382 //~ catch(Throwable e) 383 //~ { 384 //~ pqConn.logger.logError(text("Failed to add raised msg at libpq connection! Reason: ", e.msg)); 385 //~ } 386 } 387 388 void initNoticeCallbacks() 389 { 390 assert(conn); 391 conn.setNoticeProcessor(¬iceProcessor, cast(void*)this); 392 } 393 394 void addRaisedMsg(string msg) 395 { 396 mRaisedMsgs ~= msg; 397 } 398 } 399 } 400 401 synchronized class PQConnProvider : IConnectionProvider 402 { 403 this(shared ILogger logger, shared IPostgreSQL api) 404 { 405 this.logger = logger; 406 this.api = api; 407 } 408 409 shared(IConnection) allocate() 410 { 411 return new shared PQConnection(logger, api); 412 } 413 414 private shared ILogger logger; 415 private shared IPostgreSQL api; 416 }