1 // Written in D programming language
2 /**
3 *    Part of asynchronous pool realization.
4 *    
5 *    Copyright: © 2014 DSoftOut
6 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *    Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.async.workers.query;
10 
11 import dlogg.log;
12 import pgator.db.connection;
13 import pgator.db.async.transaction;
14 import pgator.db.async.respond;
15 import pgator.db.async.workers.handler;
16 import std.concurrency;
17 import std.container;
18 import std.range;
19 import std.array;
20 import core.thread;
21 import derelict.pq.pq;
22 
23 static void queringChecker(shared ILogger logger)
24 {
25     try
26     {
27         setMaxMailboxSize(thisTid, 0, OnCrowding.block);
28         Thread.getThis.isDaemon = true;
29        
30         DList!Element list;
31         auto ids = ThreadIds.receive();
32       
33         bool exit = false;
34         Tid exitTid;
35         size_t last = list[].walkLength;
36         while(!exit || last > 0)
37         {
38             while(receiveTimeout(dur!"msecs"(1)
39                 , (Tid sender, bool v) 
40                 {
41                     exit = v; 
42                     exitTid = sender;
43                 }
44                 , (Tid sender, shared IConnection conn, shared Transaction transaction) 
45                 {
46                     list.insert(new Element(sender, conn, cast(immutable)transaction, logger));
47                 }
48                 , (Tid sender, string com) 
49                 {
50                     if(com == "length")
51                     {
52                         sender.send(thisTid, list[].walkLength);
53                     }
54                 }
55                 , (Variant v) { assert(false, "Unhandled message!"); }
56             )) {}
57            
58             DList!Element nextList;
59             foreach(elem; list[])
60             {
61                 final switch(elem.stage)
62                 {
63                     case Element.Stage.MoreQueries:
64                     {
65                         elem.postQuery();
66                         nextList.insert(elem);
67                         break;             
68                     }
69                     case Element.Stage.Proccessing:
70                     {
71                         elem.stepQuery();
72                         nextList.insert(elem);
73                         break;            
74                     }
75                     case Element.Stage.Finished:
76                     {
77                         elem.sendRespond();
78                         if(exit) elem.conn.disconnect();
79                         else ids.freeCheckerId.send("add", elem.conn);
80                     }
81                 }
82             }
83             list.clear();
84             list = nextList;
85             last = list[].walkLength;
86         }
87 
88         exitTid.send(true);
89         logger.logDebug("Quering thread exited!");
90     } catch (Throwable th)
91     {
92         logger.logError("AsyncPool: quering thread died!");
93         logger.logError(text(th));
94     }
95 }
96 
97 private class Element
98 {
99     Tid sender;
100     shared IConnection conn;
101        
102     immutable Transaction transaction;
103     private
104     {
105         size_t transactPos = 0;
106         size_t currQueryIndex = 0;
107         size_t paramsPassed = 0;
108         immutable string[] varsQueries;
109         size_t localVars = 0;
110         bool transStarted = false;
111         bool transEnded = false;
112         bool commandPosting = false;
113         bool rollbackNeeded = false;
114         bool rollbacked = false;
115         shared(ILogger) logger;
116     }
117        
118     enum Stage
119     {
120         MoreQueries,
121         Proccessing,
122         Finished
123     }
124     Stage stage = Stage.MoreQueries;
125     private Respond respond;
126        
127     this(Tid sender, shared IConnection conn, immutable Transaction transaction, shared ILogger logger)
128     {
129         this.sender = sender;
130         this.conn = conn;
131         this.transaction = transaction;
132         this.logger = logger;
133         
134         auto builder = appender!(string[]);   
135         foreach(key, value; transaction.vars)
136         {
137             builder.put(`SET LOCAL "` ~ key ~ `" = '` ~ value ~ `';`);
138         } 
139         varsQueries = builder.data.idup;
140     }
141        
142     private void wrapError(void delegate() func, bool startRollback = true)
143     {
144         try func();
145         catch(QueryException e)
146         {
147             respond = Respond(e, conn);         
148             if(startRollback)
149             {       
150                 rollbackNeeded = true; 
151                 stage = Stage.MoreQueries;
152             } else
153             {
154                 stage = Stage.Finished;
155             }
156             return;
157         }
158         catch (Exception e)
159         {
160             respond = Respond(new QueryException("Internal error: "~e.msg), conn);
161             if(startRollback)
162             {       
163                 rollbackNeeded = true;
164                 stage = Stage.MoreQueries;
165             } else
166             {
167                 stage = Stage.Finished;
168             }
169             return;
170         }
171            
172         stage = Stage.Proccessing;   
173     }
174         
175     void postQuery()
176     {
177         assert(stage == Stage.MoreQueries); 
178 
179         try
180         {
181             if(rollbackNeeded)
182             {
183                 wrapError((){ conn.postQuery("rollback;", []); }, false);
184                 rollbacked = true;
185                 return;
186             }
187                
188             if(!transStarted)
189             {
190                 transStarted = true; 
191                 wrapError((){ conn.postQuery("begin;", []); });            
192                 return;
193             }
194                
195             if(localVars < varsQueries.length)
196             {
197                 wrapError(()
198                 { 
199                     conn.postQuery(varsQueries[localVars], []); 
200                     localVars++;
201                 });              
202                 return;
203             }
204                
205             if(transactPos < transaction.commands.length)
206             {
207                 commandPosting = true;
208                 wrapError(()
209                 { 
210                     if(transactPos < transaction.argnums.length &&
211                         transaction.argnums[transactPos] + paramsPassed <= transaction.params.length)
212                     {
213                         auto query = transaction.commands[transactPos];
214                         auto params = transaction.params[paramsPassed .. paramsPassed + transaction.argnums[transactPos]].dup;
215                                
216                         conn.postQuery(query, params);  
217                         currQueryIndex = transactPos; 
218                         
219                         paramsPassed += transaction.argnums[transactPos];
220                     }
221                     transactPos++; 
222                 });             
223                 return;
224             }
225                
226             if(!transEnded)
227             {
228                 commandPosting = false;
229                 transEnded = true;
230                 wrapError((){ conn.postQuery("commit;", []); });           
231                 return;
232             }
233         }
234         catch(Error err)
235         {
236             logger.logError(text("Internal unrecoverable error with transaction: ", err.msg));
237             logger.logError(transaction.text);
238             logger.logError(text("Stack trace: ", err));
239             throw err;
240         }
241         assert(false);
242     }
243        
244     private bool hasMoreQueries()
245     {
246         if(!rollbackNeeded)
247         {
248             return !transStarted || !transEnded || localVars < varsQueries.length || transactPos < transaction.commands.length;
249         }
250         return !rollbacked;
251     }
252        
253     private bool needCollectResult()
254     {
255         return commandPosting;
256     }
257        
258     void stepQuery()
259     {
260         assert(stage == Stage.Proccessing);                
261         
262         try
263         {   
264             final switch(conn.pollQueringStatus())
265             {
266                 case QueringStatus.Pending:
267                 { 
268                     return;                
269                 }
270                 case QueringStatus.Error:
271                 {
272                     try conn.pollQueryException();
273                     catch(QueryException e)
274                     {
275                         respond = Respond(e, conn);
276                         rollbackNeeded = true;
277                         return;
278                     } 
279                     catch (Exception e)
280                     {
281                         respond = Respond(new QueryException("Internal error: "~e.msg), conn);
282                         rollbackNeeded = true;
283                         return;
284                     }
285                     break;
286                 }
287                 case QueringStatus.Finished:
288                 {
289                     try 
290                     {
291                         if(rollbackNeeded)
292                         {
293                             rollbacked = true;
294                         }
295                           
296                         auto resList = conn.getQueryResult;
297                         if(needCollectResult) 
298                         {
299                             if(!respond.collect(resList, conn, transaction.oneRowConstraints[currQueryIndex], currQueryIndex))
300                             {
301                                 rollbackNeeded = true;  
302                                 commandPosting = false;
303                                 stage = Stage.MoreQueries;
304                                 return;
305                             }
306                         } else // setting vars can fail
307                         {
308                             bool failed = false;
309                             foreach(res; resList)
310                             {
311                                 if(res.resultStatus != ExecStatusType.PGRES_TUPLES_OK &&
312                                    res.resultStatus != ExecStatusType.PGRES_COMMAND_OK)
313                                 {
314                                     respond = Respond(new QueryException(res.resultErrorMessage), conn);
315                                     rollbackNeeded = true;  
316                                     stage = Stage.MoreQueries;                         
317                                     failed = true;
318                                 }
319                                 res.clear();
320                             }
321                             if(failed) return;
322                         }
323                                          
324                         if(!hasMoreQueries)
325                         {
326                             stage = Stage.Finished; 
327                             return;
328                         }
329                         stage = Stage.MoreQueries;
330                     }
331                     catch(QueryException e)
332                     {
333                         respond = Respond(e, conn);
334                         rollbackNeeded = true;  
335                         stage = Stage.MoreQueries;                          
336                         return;
337                     } 
338                     catch (Exception e)
339                     {
340                         respond = Respond(new QueryException("Internal error: "~e.msg), conn);
341                         rollbackNeeded = true; 
342                         stage = Stage.MoreQueries; 
343                         return;
344                     }
345                 }
346             }
347         }
348         catch(Error err)
349         {
350             logger.logError(text("Internal unrecoverable error with transaction: ", err.msg));
351             logger.logError(transaction.text);
352             logger.logError(text("Stack trace: ", err));
353             throw err;
354         }
355     }
356        
357     void sendRespond()
358     {
359         sender.send(thisTid, cast(shared)transaction, respond);            
360     }
361 }