// Written in D programming language
/**
*   Part of asynchronous pool realization.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.async.workers.free;

import dlogg.log;
import pgator.db.connection;
import pgator.db.async.workers.handler;
import std.container;
import std.concurrency;
import std.range;
import std.datetime;
import core.thread;
//import util;

private alias DList!(shared IConnection) ConnectionList;

/**
*   Worker thread body that owns the list of free (idle) connections.
*
*   The worker alternates between two phases until it is asked to exit:
*   $(OL
*       $(LI Drain the mailbox (1 ms receive timeout per message).)
*       $(LI Walk the free list, handing every connection that reports an
*            error (or fails the periodic aliveness test) to the closed
*            connections checker together with the tick at which a
*            reconnect should be retried.)
*   )
*
*   Messages understood:
*   $(UL
*       $(LI `(Tid sender, bool flag)` - sets the exit flag to `flag` and
*            remembers `sender`; `sender` receives `true` once the worker
*            has finished shutting down.)
*       $(LI `("add", shared IConnection)` - returns a connection to the free
*            list, or forwards it to the oldest waiting "get" requester.)
*       $(LI `(Tid sender, "length")` - replies with `(thisTid, count)` of
*            free connections.)
*       $(LI `(Tid sender, "get")` - replies with `(thisTid, connection)`,
*            or queues `sender` until a connection is added.)
*   )
*   Any other message trips an assertion.
*
*   Params:
*       logger          = logger used for diagnostics
*       reconnectTime   = delay before a failed connection is retried
*       aliveCheckTime  = period between aliveness tests of idle connections
*/
void freeChecker(shared ILogger logger, Duration reconnectTime, Duration aliveCheckTime)
{
    try
    {
        setMaxMailboxSize(thisTid, 0, OnCrowding.block);
        Thread.getThis.isDaemon = true;

        // The retry notice does not depend on the failing connection, so it is
        // built once. Milliseconds are zero-padded to three digits: without the
        // padding 1 s 5 ms was logged as "1.5 seconds".
        static if (__VERSION__ < 2066) {
            immutable long retryMsecs = reconnectTime.fracSec.msecs;
        } else {
            immutable long retryMsecs = reconnectTime.split!("seconds", "msecs").msecs;
        }
        string msecsPadding = "";
        if(retryMsecs >= 0 && retryMsecs < 10)       msecsPadding = "00";
        else if(retryMsecs >= 0 && retryMsecs < 100) msecsPadding = "0";
        immutable string retryNotice = text("Will retry to connect to server over "
            , reconnectTime.total!"seconds", ".", msecsPadding, retryMsecs, " seconds.");

        DList!Tid connRequests;     // requesters waiting for a free connection (FIFO)
        ConnectionList list;        // currently free connections
        // NOTE(review): ThreadIds is declared outside this file; presumably this
        // blocks until the sibling workers' ids arrive - confirm in the handler module.
        auto ids = ThreadIds.receive();
        Tid exitTid;
        auto nextCheckTime = Clock.currSystemTick + cast(TickDuration)aliveCheckTime;

        bool exit = false;
        while(!exit)
        {
            // Phase 1: process every pending message; the loop ends when nothing
            // arrives within the timeout.
            while (receiveTimeout(dur!"msecs"(1)
                , (Tid sender, bool v)
                {
                    exit = v;
                    exitTid = sender;
                }
                , (string com, shared IConnection conn)
                {
                    // NOTE(review): commands other than "add" are silently ignored
                    // here, unlike the (Tid, string) handler below which asserts.
                    if(com == "add")
                    {
                        if(connRequests.empty) list.insert(conn);
                        else
                        {
                            auto reqTid = connRequests.front;
                            connRequests.removeFront;
                            reqTid.send(thisTid, conn); /// TODO: Check case the requester is already gone
                                                        /// then the conn is lost
                        }
                    }
                }
                , (Tid sender, string com)
                {
                    if(com == "length")
                    {
                        sender.send(thisTid, list[].walkLength);
                    } else if(com == "get")
                    {
                        if(list.empty)
                        {
                            connRequests.insert(sender);
                        } else
                        {
                            sender.send(thisTid, list.front);
                            list.removeFront;
                        }
                    } else assert(false, "Invalid command!");
                }
                , (Variant v) { assert(false, "Unhandled message!"); }
            )) {}

            // Phase 2: rebuild the free list, keeping only healthy connections.
            ConnectionList newList;
            bool checkAlive = Clock.currSystemTick > nextCheckTime;
            foreach(conn; list)
            {
                // Hands the connection over to the closed connections checker
                // together with the tick at which a reconnect should be attempted.
                void processFailedConn()
                {
                    logger.logInfo(retryNotice);

                    TickDuration whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                    ids.closedCheckerId.send("add", conn, whenRetry);
                }

                if(conn.pollConnectionStatus == ConnectionStatus.Error)
                {
                    try conn.pollConnectionException();
                    catch(ConnectException e)
                    {
                        logger.logError(e.msg);
                        processFailedConn();
                        continue;
                    }
                }

                if(checkAlive)
                {
                    if(!conn.testAlive)
                    {
                        logger.logError("Connection test on its aliveness is failed!");
                        processFailedConn();
                        continue;
                    }
                }

                newList.insert(conn);
            }
            list.clear;
            list = newList;

            if(checkAlive)
            {
                nextCheckTime = Clock.currSystemTick + cast(TickDuration)aliveCheckTime;
            }
        }

        // also compiler don't allow to put this in scope(exit)
        // NOTE(review): requesters still queued in connRequests receive no reply
        // on shutdown - verify that callers tolerate this.
        foreach(conn; list)
        {
            try
            {
                conn.disconnect();
            }
            catch(Throwable e)
            {
                // Shutdown stays best effort: one failing disconnect must not stop
                // the remaining connections from being closed, but the failure is
                // no longer swallowed silently.
                logger.logError("Failed to disconnect free connection: " ~ e.msg);
            }
        }

        exitTid.send(true);
        logger.logDebug("Free connections thread exited!");
    } catch (Throwable th)
    {
        logger.logError("AsyncPool: free connections thread died!");
        logger.logError(text(th));
    }
}