1 // Written in D programming language
2 /**
3 *   This module defines realization of high-level libpq api.
4 *
5 *   See_Also: pgator.db.pq.api
6 *
7 *   Copyright: © 2014 DSoftOut
8 *   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
9 *   Authors: NCrashed <ncrashed@gmail.com>
10 */
11 module pgator.db.pq.libpq;
12 
13 public import pgator.db.pq.api;
14 import pgator.db.connection;
15 public import dpq2;
16 import derelict.util.exception;
17 import dlogg.log;
18 import vibe.data.bson;
19 import std.exception;
20 import std.string;
21 import std.regex;
22 import std.conv;
23 import core.memory;
24 import core.exception: RangeError;
25 
26 alias Dpq2Connection = dpq2.Connection;
27 //import util;
28 
29 synchronized class CPGresult : IPGresult
30 {
31     private immutable Answer result;
32 
33     this(immutable Answer result, shared ILogger plogger) nothrow
34     {
35         this.result = result;
36         this.mLogger = plogger;
37     }
38 
39     Bson asColumnBson(shared IConnection conn) const
40     {
41         Bson[string] fields;
42         foreach(i; 0..result.columnCount)
43         {
44             Bson[] rows;
45             foreach(j; 0..result.length)
46             {
47                 rows ~= result[j][i].toBson;
48             }
49 
50             fields[result.columnName(i)] = Bson(rows);
51         }
52 
53         return Bson(fields);
54     }
55 
56     private shared(ILogger) mLogger;
57     
58     protected shared(ILogger) logger()
59     {
60         return mLogger;
61     }
62 
63     ExecStatusType resultStatus() nothrow const
64     {
65         return result.status;
66     }
67 
68     string resStatus() const
69     {
70         return result.statusString;
71     }
72     
73     string resultErrorMessage() const
74     {
75         return result.resultErrorMessage;
76     }
77     
78     size_t ntuples() nothrow const
79     {
80         return result.length;
81     }
82     
83     size_t nfields() nothrow const
84     {
85         return result.columnCount;
86     }
87     
88     string fname(size_t colNumber) const
89     {
90         return result.columnName(colNumber);
91     }
92     
93     bool isBinary(size_t colNumber) const
94     {
95         return result.columnFormat(colNumber) == ValueFormat.BINARY;
96     }
97 
98     string asString(size_t rowNumber, size_t colNumber) const
99     {
100         return result[rowNumber][colNumber].as!string;
101     }
102 
103     ubyte[] asBytes(size_t rowNumber, size_t colNumber) const
104     {
105         return result[rowNumber][colNumber].as!PGbytea.dup;
106     }
107 
108     bool getisnull(size_t rowNumber, size_t colNumber) const
109     {
110         return result[rowNumber].isNULL(colNumber);
111     }
112 
113     OidType ftype(size_t colNumber) const
114     {
115         return result.OID(colNumber);
116     }
117 }
118 
119 synchronized class CPGconn : IPGconn
120 {
121     private Dpq2Connection sharedConn;    
122     private shared(ILogger) mLogger;
123 
124     @property
125     private Dpq2Connection conn() const nothrow // nonshared conn for compatibility with dpq2
126     {
127         return cast(Dpq2Connection) sharedConn;
128     }
129 
130     this(Dpq2Connection conn, shared ILogger plogger) nothrow
131     {
132         this.sharedConn = cast(shared) conn;
133         this.mLogger = plogger;
134     }
135 
136     protected shared(ILogger) logger() nothrow
137     {
138         return mLogger;
139     }
140 
141     PostgresPollingStatusType poll() nothrow
142     {
143         return conn.poll;
144     }
145 
146     ConnStatusType status() nothrow
147     {
148         return conn.status;
149     }
150 
151     /**
152     *   Prototype: PQfinish
153     *   Note: this function should be called even
154     *   there was an error.
155     */
156     void finish() nothrow
157     {
158         conn.disconnect();
159     }
160 
161     bool flush() nothrow const
162     {
163         return flush();
164     }
165 
166     void resetStart()
167     {
168         conn.resetStart();
169     }
170 
171     PostgresPollingStatusType resetPoll() nothrow
172     {
173         return conn.resetPoll();
174     }
175 
176     string errorMessage() const nothrow @property
177     {
178         return conn.errorMessage();
179     }
180 
181     void sendQuery(string command)
182     {
183         conn.sendQuery(command);
184     }
185 
186     void sendQueryParams(string command, string[] paramValues)
187     {
188         QueryParams params;
189         params.sqlCommand = command;
190         params.resultFormat = ValueFormat.BINARY;
191         params.args.length = paramValues.length;
192 
193         foreach(i, ref p; params.args)
194         {
195             p.value = paramValues[i];
196         }
197 
198         conn.sendQuery(params);
199     }
200 
201     /**
202     *   Like sendQueryParams but uses libpq escaping functions
203     *   and sendQuery. 
204     *   
205     *   The main advantage of the function is ability to handle
206     *   multiple SQL commands in one query.
207     *   Throws: PGQueryException
208     */
209     void sendQueryParamsExt(string command, string[] paramValues)
210     {
211         sendQuery(escapeParams(command, paramValues));
212     }
213 
214     /**
215     *   Prototype: PQgetResult
216     *   Note: Even when PQresultStatus indicates a fatal error, 
217     *         PQgetResult should be called until it returns a null pointer 
218     *         to allow libpq to process the error information completely.
219     *   Note: A null pointer is returned when the command is complete and t
220     *         here will be no more results.
221     */
222     shared(IPGresult) getResult()
223     {
224         auto r = conn.getResult();
225 
226         if(r is null) return null;
227 
228         auto a = r.getAnswer();
229 
230         return new shared CPGresult(a, logger);
231     }
232 
233     void consumeInput()
234     {
235         conn.consumeInput();
236     }
237 
238     bool isBusy() nothrow
239     {
240         return conn.isBusy();
241     }
242     
243     /**
244     *   Prototype: PQescapeLiteral
245     *   Throws: PGEscapeException
246     */
247     string escapeLiteral(string msg)
248     {
249         return conn.escapeLiteral(msg);
250     }
251     
252     /**
253     *   Escaping query like PQexecParams does. This function
254     *   enables use of multiple SQL commands in one query.
255     */
256     private string escapeParams(string query, string[] args)
257     {
258         foreach(i, arg; args)
259         {
260             auto reg = regex(text(`\$`, i));
261             query = query.replaceAll(reg, escapeLiteral(arg));
262         }
263         return query;
264     }
265 
266     string parameterStatus(string param)
267     {
268         return conn.parameterStatus(param);
269     }
270 
271     /**
272     *   Prototype: PQsetNoticeProcessor
273     */
274     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
275     {
276         return conn.setNoticeProcessor(proc, arg);
277     }
278 
279     string server()
280     {
281         return conn.host;
282     }
283 }
284 
285 synchronized class PostgreSQL : IPostgreSQL
286 {
287     this(shared ILogger plogger)
288     {
289         this.mLogger = plogger;
290     }
291 
292     private shared(ILogger) mLogger;
293     
294     protected shared(ILogger) logger()
295     {
296         return mLogger;
297     }
298     
299     shared(IPGconn) startConnect(string conninfo)
300     {
301         auto c = new Connection;
302         c.connString = conninfo;
303         c.connectNonblockingStart();
304 
305         return new shared CPGconn(c, logger);
306     }
307 }