1 // Written in D programming language 2 /** 3 * Part of asynchronous pool realization. 4 * 5 * Copyright: © 2014 DSoftOut 6 * License: Subject to the terms of the MIT license, as written in the included LICENSE file. 7 * Authors: NCrashed <ncrashed@gmail.com> 8 */ 9 module pgator.db.async.workers.closed; 10 11 import dlogg.log; 12 import pgator.db.connection; 13 import pgator.db.async.workers.handler; 14 import std.container; 15 import std.typecons; 16 import std.concurrency; 17 import std.range; 18 import core.time; 19 import core.thread; 20 21 alias Tuple!(shared IConnection, "conn", TickDuration, "duration") TimedConnListElem; 22 alias DList!TimedConnListElem TimedConnList; 23 24 static void closedChecker(shared ILogger logger, Duration reconnectTime) 25 { 26 try 27 { 28 setMaxMailboxSize(thisTid, 0, OnCrowding.block); 29 Thread.getThis.isDaemon = true; 30 31 TimedConnList list; 32 auto ids = ThreadIds.receive(); 33 Tid exitTid; 34 35 bool exit = false; 36 while(!exit) 37 { 38 while (receiveTimeout(dur!"msecs"(1) 39 , (Tid sender, bool v) 40 { 41 exit = v; 42 exitTid = sender; 43 } 44 , (string com, shared(IConnection) conn, TickDuration time) 45 { 46 if(com == "add") 47 { 48 list.insert(TimedConnListElem(conn, time)); 49 } 50 } 51 , (Tid sender, string com) 52 { 53 if(com == "length") 54 { 55 sender.send(thisTid, list[].walkLength); 56 } 57 } 58 , (Variant v) { assert(false, "Unhandled message!"); } 59 )) {} 60 61 TimedConnList nextList; 62 foreach(elem; list) 63 { 64 auto conn = elem.conn; 65 auto time = elem.duration; 66 67 if(TickDuration.currSystemTick > time) 68 { 69 try 70 { 71 scope(success) 72 { 73 ids.connectingCheckerId.send("add", conn); 74 } 75 76 conn.reconnect(); 77 } catch(ConnectException e) 78 { 79 static if (__VERSION__ < 2066) { 80 logger.logDebug("Connection to server ",e.server," is still failing! Will retry over " 81 , reconnectTime.total!"seconds", ".", reconnectTime.fracSec.msecs, " seconds."); 82 } else { 83 logger.logDebug("Connection to server ",e.server," is still failing! Will retry over " 84 , reconnectTime.total!"seconds", ".", reconnectTime.split!("seconds", "msecs").msecs, " seconds."); 85 } 86 elem.duration = TickDuration.currSystemTick + cast(TickDuration)reconnectTime; 87 nextList.insert(elem); 88 } 89 } else 90 { 91 nextList.insert(elem); 92 } 93 } 94 list.clear; 95 list = nextList; 96 } 97 98 scope(exit) 99 { 100 foreach(elem; list) 101 { 102 elem.conn.disconnect(); 103 } 104 } 105 106 exitTid.send(true); 107 logger.logDebug("Closed connections thread exited!"); 108 } catch (Throwable th) 109 { 110 logger.logError("AsyncPool: closed connections thread died!"); 111 logger.logError(text(th)); 112 } 113 }