// Written in the D programming language
/**
*   Part of the asynchronous pool implementation.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.async.workers.connecting;

import dlogg.log;
import pgator.db.connection;
import pgator.db.async.workers.handler;
import std.algorithm;
import std.container;
import std.concurrency;
import std.datetime;
import std.range;
import core.thread;
//import util;


import std.stdio;

private alias DList!(shared IConnection) ConnectionList;

/**
*   Returns the zeros that must precede $(D msecs) so that it reads as a
*   three-digit decimal fraction of a second (e.g. 5 -> "00", 50 -> "0").
*
*   Values outside 0..99 need no padding and yield an empty string.
*/
private string msecsPadding(long msecs)
{
    if(msecs < 0 || msecs >= 100)
    {
        return "";
    }
    return msecs < 10 ? "00" : "0";
}

/**
*   Body of the worker thread that tracks connections whose connecting
*   attempt has been started but not yet resolved.
*
*   Accepted messages:
*   $(UL
*       $(LI $(D (Tid sender, bool v)) - stores $(D v) as the exit flag and
*            remembers $(D sender); $(D true) is sent to that Tid when the worker leaves its loop.)
*       $(LI $(D ("add", shared IConnection)) - starts tracking the connection.)
*       $(LI $(D (Tid sender, "length")) - replies to $(D sender) with
*            $(D (thisTid, number of tracked connections)).)
*   )
*   Any other message trips an assertion.
*
*   After draining the mailbox, every tracked connection is polled once:
*   pending ones stay tracked, failed ones are logged and sent to
*   $(D ids.closedCheckerId) together with the tick at which a reconnect
*   should be attempted, finished ones are sent to $(D ids.freeCheckerId).
*
*   Params:
*       logger          = logger used for error and debug output
*       reconnectTime   = delay before a failed connection should be retried
*/
void connectingChecker(shared ILogger logger, Duration reconnectTime)
{
    try
    {
        // A size of 0 means the mailbox is unbounded.
        setMaxMailboxSize(thisTid, 0, OnCrowding.block);
        Thread.getThis.isDaemon = true;

        ConnectionList list;
        // Ids of the sibling workers; supplied by the project helper ThreadIds.receive().
        auto ids = ThreadIds.receive();
        Tid exitTid;

        bool exit = false;
        while(!exit)
        {
            // Drain the mailbox: keep receiving until 1 ms passes without a message.
            while(receiveTimeout(dur!"msecs"(1)
                , (Tid sender, bool v)
                {
                    exit = v;
                    exitTid = sender;
                }
                , (string com, shared IConnection conn)
                {
                    if(com == "add")
                    {
                        list.insert(conn);
                    }
                }
                , (Tid sender, string com)
                {
                    if(com == "length")
                    {
                        sender.send(thisTid, list[].walkLength);
                    }
                }
                , (Variant v) { assert(false, "Unhandled message!"); }
            )) {}

            // Connections that are still pending after this pass.
            ConnectionList newList;
            foreach(conn; list)
            {
                final switch(conn.pollConnectionStatus())
                {
                    case ConnectionStatus.Pending:
                    {
                        newList.insert = conn;
                        break;
                    }
                    case ConnectionStatus.Error:
                    {
                        // NOTE(review): if pollConnectionException() returns without throwing
                        // a ConnectException, the connection is dropped from tracking and is
                        // not forwarded anywhere - confirm it always throws in the Error state.
                        try conn.pollConnectionException();
                        catch(ConnectException e)
                        {
                            logger.logError(e.msg);

                            // Millisecond part of the delay; the accessor depends on the compiler version.
                            static if (__VERSION__ < 2066) {
                                long fracMsecs = reconnectTime.fracSec.msecs;
                            } else {
                                long fracMsecs = reconnectTime.split!("seconds", "msecs").msecs;
                            }

                            // The fraction is zero-padded to three digits so that, e.g.,
                            // 1005 ms is reported as "1.005" and not as "1.5".
                            logger.logDebug("Will retry to connect to ", e.server, " over "
                                , reconnectTime.total!"seconds", ".", msecsPadding(fracMsecs), fracMsecs, " seconds.");

                            TickDuration whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                            ids.closedCheckerId.send("add", conn, whenRetry);
                        }
                        break;
                    }
                    case ConnectionStatus.Finished:
                    {
                        ids.freeCheckerId.send("add", conn);
                        break;
                    }
                }
            }
            list.clear();
            list = newList;
        }

        // NOTE(review): this guard is registered only after the loop has finished, so it
        // does not run when the loop above throws; in that case the tracked connections are
        // not disconnected here. It fires when the enclosing try scope is left, i.e. after
        // the two statements below.
        scope(exit)
        {
            foreach(conn; list)
            {
                conn.disconnect();
            }
        }

        exitTid.send(true);
        logger.logDebug("Connecting thread exited!");
    } catch (Throwable th)
    {
        logger.logError("AsyncPool: connecting thread died!");
        logger.logError(text(th));
    }
}