// Written in D programming language
/**
*   This module defines realization of high-level libpq api.
*
*   See_Also: pgator.db.pq.api
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.pq.libpq;

public import pgator.db.pq.api;
import pgator.db.connection;
public import dpq2;
import derelict.util.exception;
import dlogg.log;
import vibe.data.bson;
import std.exception;
import std.string;
import std.regex;
import std.conv;
import core.memory;
import core.exception: RangeError;

alias Dpq2Connection = dpq2.Connection;
//import util;

/**
*   IPGresult implementation backed by an immutable dpq2 Answer.
*/
synchronized class CPGresult : IPGresult
{
    private immutable Answer result;

    /**
    *   Stores the answer to wrap and the logger handed back by logger().
    */
    this(immutable Answer result, shared ILogger plogger) nothrow
    {
        this.result = result;
        this.mLogger = plogger;
    }

    /**
    *   Converts the whole answer into a single Bson object laid out by columns.
    *
    *   Each key is a column name and each value is a Bson array holding that
    *   column's cells (converted with toBson), one element per row.
    *   If two columns share a name, the later column overwrites the earlier one.
    *
    *   Params:
    *       conn = not used by this implementation
    */
    Bson asColumnBson(shared IConnection conn) const
    {
        Bson[string] fields;
        foreach(i; 0..result.columnCount)
        {
            Bson[] rows;
            foreach(j; 0..result.length)
            {
                rows ~= result[j][i].toBson;
            }

            fields[result.columnName(i)] = Bson(rows);
        }

        return Bson(fields);
    }

    private shared(ILogger) mLogger;

    /// Logger passed to the constructor
    protected shared(ILogger) logger()
    {
        return mLogger;
    }

    /// Status code of the wrapped answer
    ExecStatusType resultStatus() nothrow const
    {
        return result.status;
    }

    /// Status of the wrapped answer as a string
    string resStatus() const
    {
        return result.statusString;
    }

    /// Error message attached to the wrapped answer
    string resultErrorMessage() const
    {
        return result.resultErrorMessage;
    }

    /// Number of rows in the answer
    size_t ntuples() nothrow const
    {
        return result.length;
    }

    /// Number of columns in the answer
    size_t nfields() nothrow const
    {
        return result.columnCount;
    }

    /// Name of the column with index colNumber
    string fname(size_t colNumber) const
    {
        return result.columnName(colNumber);
    }

    /// True when the column with index colNumber has format ValueFormat.BINARY
    bool isBinary(size_t colNumber) const
    {
        return result.columnFormat(colNumber) == ValueFormat.BINARY;
    }

    /// Cell at (rowNumber, colNumber) read as a string
    string asString(size_t rowNumber, size_t colNumber) const
    {
        return result[rowNumber][colNumber].as!string;
    }

    /// Cell at (rowNumber, colNumber) read as PGbytea; a fresh mutable copy is returned
    ubyte[] asBytes(size_t rowNumber, size_t colNumber) const
    {
        return result[rowNumber][colNumber].as!PGbytea.dup;
    }

    /// True when the cell at (rowNumber, colNumber) is NULL
    bool getisnull(size_t rowNumber, size_t colNumber) const
    {
        return result[rowNumber].isNULL(colNumber);
    }

    /// Type OID of the column with index colNumber
    OidType ftype(size_t colNumber) const
    {
        return result.OID(colNumber);
    }
}

/**
*   IPGconn implementation that forwards every call to a dpq2 connection.
*/
synchronized class CPGconn : IPGconn
{
    private Dpq2Connection sharedConn;
    private shared(ILogger) mLogger;

    @property
    private Dpq2Connection conn() const nothrow // nonshared conn for compatibility with dpq2
    {
        return cast(Dpq2Connection) sharedConn;
    }

    /**
    *   Stores the dpq2 connection to forward to and the logger handed back by logger().
    */
    this(Dpq2Connection conn, shared ILogger plogger) nothrow
    {
        this.sharedConn = cast(shared) conn;
        this.mLogger = plogger;
    }

    /// Logger passed to the constructor
    protected shared(ILogger) logger() nothrow
    {
        return mLogger;
    }

    /// Forwards to the connection's poll
    PostgresPollingStatusType poll() nothrow
    {
        return conn.poll;
    }

    /// Forwards to the connection's status
    ConnStatusType status() nothrow
    {
        return conn.status;
    }

    /**
    *   Prototype: PQfinish
    *   Note: this function should be called even
    *   if there was an error.
    */
    void finish() nothrow
    {
        conn.disconnect();
    }

    /**
    *   Forwards to the connection's flush and returns its result.
    *
    *   NOTE(review): this used to call itself and recursed without end;
    *   it now delegates like every other wrapper in this class. Confirm that
    *   the meaning of the returned flag matches the IPGconn contract.
    */
    bool flush() nothrow const
    {
        return conn.flush();
    }

    /// Forwards to the connection's resetStart
    void resetStart()
    {
        conn.resetStart();
    }

    /// Forwards to the connection's resetPoll
    PostgresPollingStatusType resetPoll() nothrow
    {
        return conn.resetPoll();
    }

    /// Forwards to the connection's errorMessage
    string errorMessage() const nothrow @property
    {
        return conn.errorMessage();
    }

    /// Sends command to the connection without parameters
    void sendQuery(string command)
    {
        conn.sendQuery(command);
    }

    /**
    *   Sends command together with paramValues as separate query arguments.
    *   The result format is requested as ValueFormat.BINARY.
    */
    void sendQueryParams(string command, string[] paramValues)
    {
        QueryParams params;
        params.sqlCommand = command;
        params.resultFormat = ValueFormat.BINARY;
        params.args.length = paramValues.length;

        foreach(i, ref p; params.args)
        {
            p.value = paramValues[i];
        }

        conn.sendQuery(params);
    }

    /**
    *   Like sendQueryParams but uses libpq escaping functions
    *   and sendQuery.
    *
    *   The main advantage of the function is ability to handle
    *   multiple SQL commands in one query.
    *   Throws: PGQueryException
    */
    void sendQueryParamsExt(string command, string[] paramValues)
    {
        sendQuery(escapeParams(command, paramValues));
    }

    /**
    *   Prototype: PQgetResult
    *   Note: Even when PQresultStatus indicates a fatal error,
    *       PQgetResult should be called until it returns a null pointer
    *       to allow libpq to process the error information completely.
    *   Note: A null pointer is returned when the command is complete and
    *       there will be no more results.
    */
    shared(IPGresult) getResult()
    {
        auto r = conn.getResult();

        if(r is null) return null;

        auto a = r.getAnswer();

        return new shared CPGresult(a, logger);
    }

    /// Forwards to the connection's consumeInput
    void consumeInput()
    {
        conn.consumeInput();
    }

    /// Forwards to the connection's isBusy
    bool isBusy() nothrow
    {
        return conn.isBusy();
    }

    /**
    *   Prototype: PQescapeLiteral
    *   Throws: PGEscapeException
    */
    string escapeLiteral(string msg)
    {
        return conn.escapeLiteral(msg);
    }

    /**
    *   Escaping query like PQexecParams does. This function
    *   enables use of multiple SQL commands in one query.
    *
    *   Placeholders are numbered from one, as in PQexecParams: $1 is replaced
    *   with the escaped args[0], $2 with the escaped args[1] and so on.
    *   A placeholder whose number is zero or greater than args.length is left
    *   in the query unchanged.
    *
    *   Params:
    *       query = SQL text containing $N placeholders
    *       args  = values to escape with escapeLiteral and substitute
    *
    *   Returns: the query with every in-range placeholder substituted
    */
    private string escapeParams(string query, string[] args)
    {
        // Every argument is escaped exactly once, including unused ones,
        // so a value that cannot be escaped is always reported.
        auto literals = new string[args.length];
        foreach(i, arg; args)
        {
            literals[i] = escapeLiteral(arg);
        }

        // The whole run of digits is captured, so $1 can never match
        // the beginning of $10.
        auto placeholder = regex(`\$(\d+)`);

        // One pass with a callback: the substituted text is neither rescanned
        // for placeholders nor interpreted as a replacement format string,
        // so a '$' inside an argument value is kept literally.
        return query.replaceAll!((m)
        {
            size_t number;
            try
            {
                number = m[1].to!size_t;
            }
            catch(ConvException e)
            {
                // Number does not fit size_t, cannot refer to an argument
                return m.hit;
            }

            if(number == 0 || number > literals.length)
            {
                return m.hit;
            }

            return literals[number - 1];
        })(placeholder);
    }

    /// Forwards to the connection's parameterStatus
    string parameterStatus(string param)
    {
        return conn.parameterStatus(param);
    }

    /**
    *   Prototype: PQsetNoticeProcessor
    */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        return conn.setNoticeProcessor(proc, arg);
    }

    /// Host of the underlying connection
    string server()
    {
        return conn.host;
    }
}

/**
*   IPostgreSQL implementation that creates CPGconn connections.
*/
synchronized class PostgreSQL : IPostgreSQL
{
    /// Stores the logger that is passed on to created connections
    this(shared ILogger plogger)
    {
        this.mLogger = plogger;
    }

    private shared(ILogger) mLogger;

    /// Logger passed to the constructor
    protected shared(ILogger) logger()
    {
        return mLogger;
    }

    /**
    *   Creates a dpq2 connection configured with conninfo, starts a
    *   non-blocking connect on it and returns it wrapped in a CPGconn.
    */
    shared(IPGconn) startConnect(string conninfo)
    {
        auto c = new Dpq2Connection;
        c.connString = conninfo;
        c.connectNonblockingStart();

        return new shared CPGconn(c, logger);
    }
}