1 // Written in D programming language 2 /** 3 * Part of asynchronous pool realization. 4 * 5 * Copyright: © 2014 DSoftOut 6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file. 7 * Authors: NCrashed <ncrashed@gmail.com> 8 */ 9 module pgator.db.async.workers.query; 10 11 import dlogg.log; 12 import pgator.db.connection; 13 import pgator.db.async.transaction; 14 import pgator.db.async.respond; 15 import pgator.db.async.workers.handler; 16 import std.concurrency; 17 import std.container; 18 import std.range; 19 import std.array; 20 import core.thread; 21 import derelict.pq.pq; 22 23 static void queringChecker(shared ILogger logger) 24 { 25 try 26 { 27 setMaxMailboxSize(thisTid, 0, OnCrowding.block); 28 Thread.getThis.isDaemon = true; 29 30 DList!Element list; 31 auto ids = ThreadIds.receive(); 32 33 bool exit = false; 34 Tid exitTid; 35 size_t last = list[].walkLength; 36 while(!exit || last > 0) 37 { 38 while(receiveTimeout(dur!"msecs"(1) 39 , (Tid sender, bool v) 40 { 41 exit = v; 42 exitTid = sender; 43 } 44 , (Tid sender, shared IConnection conn, shared Transaction transaction) 45 { 46 list.insert(new Element(sender, conn, cast(immutable)transaction, logger)); 47 } 48 , (Tid sender, string com) 49 { 50 if(com == "length") 51 { 52 sender.send(thisTid, list[].walkLength); 53 } 54 } 55 , (Variant v) { assert(false, "Unhandled message!"); } 56 )) {} 57 58 DList!Element nextList; 59 foreach(elem; list[]) 60 { 61 final switch(elem.stage) 62 { 63 case Element.Stage.MoreQueries: 64 { 65 elem.postQuery(); 66 nextList.insert(elem); 67 break; 68 } 69 case Element.Stage.Proccessing: 70 { 71 elem.stepQuery(); 72 nextList.insert(elem); 73 break; 74 } 75 case Element.Stage.Finished: 76 { 77 elem.sendRespond(); 78 if(exit) elem.conn.disconnect(); 79 else ids.freeCheckerId.send("add", elem.conn); 80 } 81 } 82 } 83 list.clear(); 84 list = nextList; 85 last = list[].walkLength; 86 } 87 88 exitTid.send(true); 89 logger.logDebug("Quering thread exited!"); 90 } catch (Throwable th) 91 { 92 logger.logError("AsyncPool: quering thread died!"); 93 logger.logError(text(th)); 94 } 95 } 96 97 private class Element 98 { 99 Tid sender; 100 shared IConnection conn; 101 102 immutable Transaction transaction; 103 private 104 { 105 size_t transactPos = 0; 106 size_t currQueryIndex = 0; 107 size_t paramsPassed = 0; 108 immutable string[] varsQueries; 109 size_t localVars = 0; 110 bool transStarted = false; 111 bool transEnded = false; 112 bool commandPosting = false; 113 bool rollbackNeeded = false; 114 bool rollbacked = false; 115 shared(ILogger) logger; 116 } 117 118 enum Stage 119 { 120 MoreQueries, 121 Proccessing, 122 Finished 123 } 124 Stage stage = Stage.MoreQueries; 125 private Respond respond; 126 127 this(Tid sender, shared IConnection conn, immutable Transaction transaction, shared ILogger logger) 128 { 129 this.sender = sender; 130 this.conn = conn; 131 this.transaction = transaction; 132 this.logger = logger; 133 134 auto builder = appender!(string[]); 135 foreach(key, value; transaction.vars) 136 { 137 builder.put(`SET LOCAL "` ~ key ~ `" = '` ~ value ~ `';`); 138 } 139 varsQueries = builder.data.idup; 140 } 141 142 private void wrapError(void delegate() func, bool startRollback = true) 143 { 144 try func(); 145 catch(QueryException e) 146 { 147 respond = Respond(e, conn); 148 if(startRollback) 149 { 150 rollbackNeeded = true; 151 stage = Stage.MoreQueries; 152 } else 153 { 154 stage = Stage.Finished; 155 } 156 return; 157 } 158 catch (Exception e) 159 { 160 respond = Respond(new QueryException("Internal error: "~e.msg), conn); 161 if(startRollback) 162 { 163 rollbackNeeded = true; 164 stage = Stage.MoreQueries; 165 } else 166 { 167 stage = Stage.Finished; 168 } 169 return; 170 } 171 172 stage = Stage.Proccessing; 173 } 174 175 void postQuery() 176 { 177 assert(stage == Stage.MoreQueries); 178 179 try 180 { 181 if(rollbackNeeded) 182 { 183 wrapError((){ conn.postQuery("rollback;", []); }, false); 184 rollbacked = true; 185 return; 186 } 187 188 if(!transStarted) 189 { 190 transStarted = true; 191 wrapError((){ conn.postQuery("begin;", []); }); 192 return; 193 } 194 195 if(localVars < varsQueries.length) 196 { 197 wrapError(() 198 { 199 conn.postQuery(varsQueries[localVars], []); 200 localVars++; 201 }); 202 return; 203 } 204 205 if(transactPos < transaction.commands.length) 206 { 207 commandPosting = true; 208 wrapError(() 209 { 210 if(transactPos < transaction.argnums.length && 211 transaction.argnums[transactPos] + paramsPassed <= transaction.params.length) 212 { 213 auto query = transaction.commands[transactPos]; 214 auto params = transaction.params[paramsPassed .. paramsPassed + transaction.argnums[transactPos]].dup; 215 216 conn.postQuery(query, params); 217 currQueryIndex = transactPos; 218 219 paramsPassed += transaction.argnums[transactPos]; 220 } 221 transactPos++; 222 }); 223 return; 224 } 225 226 if(!transEnded) 227 { 228 commandPosting = false; 229 transEnded = true; 230 wrapError((){ conn.postQuery("commit;", []); }); 231 return; 232 } 233 } 234 catch(Error err) 235 { 236 logger.logError(text("Internal unrecoverable error with transaction: ", err.msg)); 237 logger.logError(transaction.text); 238 logger.logError(text("Stack trace: ", err)); 239 throw err; 240 } 241 assert(false); 242 } 243 244 private bool hasMoreQueries() 245 { 246 if(!rollbackNeeded) 247 { 248 return !transStarted || !transEnded || localVars < varsQueries.length || transactPos < transaction.commands.length; 249 } 250 return !rollbacked; 251 } 252 253 private bool needCollectResult() 254 { 255 return commandPosting; 256 } 257 258 void stepQuery() 259 { 260 assert(stage == Stage.Proccessing); 261 262 try 263 { 264 final switch(conn.pollQueringStatus()) 265 { 266 case QueringStatus.Pending: 267 { 268 return; 269 } 270 case QueringStatus.Error: 271 { 272 try conn.pollQueryException(); 273 catch(QueryException e) 274 { 275 respond = Respond(e, conn); 276 rollbackNeeded = true; 277 return; 278 } 279 catch (Exception e) 280 { 281 respond = Respond(new QueryException("Internal error: "~e.msg), conn); 282 rollbackNeeded = true; 283 return; 284 } 285 break; 286 } 287 case QueringStatus.Finished: 288 { 289 try 290 { 291 if(rollbackNeeded) 292 { 293 rollbacked = true; 294 } 295 296 auto resList = conn.getQueryResult; 297 if(needCollectResult) 298 { 299 if(!respond.collect(resList, conn, transaction.oneRowConstraints[currQueryIndex], currQueryIndex)) 300 { 301 rollbackNeeded = true; 302 commandPosting = false; 303 stage = Stage.MoreQueries; 304 return; 305 } 306 } else // setting vars can fail 307 { 308 bool failed = false; 309 foreach(res; resList) 310 { 311 if(res.resultStatus != ExecStatusType.PGRES_TUPLES_OK && 312 res.resultStatus != ExecStatusType.PGRES_COMMAND_OK) 313 { 314 respond = Respond(new QueryException(res.resultErrorMessage), conn); 315 rollbackNeeded = true; 316 stage = Stage.MoreQueries; 317 failed = true; 318 } 319 res.clear(); 320 } 321 if(failed) return; 322 } 323 324 if(!hasMoreQueries) 325 { 326 stage = Stage.Finished; 327 return; 328 } 329 stage = Stage.MoreQueries; 330 } 331 catch(QueryException e) 332 { 333 respond = Respond(e, conn); 334 rollbackNeeded = true; 335 stage = Stage.MoreQueries; 336 return; 337 } 338 catch (Exception e) 339 { 340 respond = Respond(new QueryException("Internal error: "~e.msg), conn); 341 rollbackNeeded = true; 342 stage = Stage.MoreQueries; 343 return; 344 } 345 } 346 } 347 } 348 catch(Error err) 349 { 350 logger.logError(text("Internal unrecoverable error with transaction: ", err.msg)); 351 logger.logError(transaction.text); 352 logger.logError(text("Stack trace: ", err)); 353 throw err; 354 } 355 } 356 357 void sendRespond() 358 { 359 sender.send(thisTid, cast(shared)transaction, respond); 360 } 361 }