1 // Written in D programming language
2 /**
*    Part of the asynchronous pool implementation.
4 *    
5 *    Copyright: © 2014 DSoftOut
6 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *    Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.async.workers.closed;
10 
11 import dlogg.log;
12 import pgator.db.connection;
13 import pgator.db.async.workers.handler;
14 import std.container;
15 import std.typecons;
16 import std.concurrency;
17 import std.range;
18 import core.time;
19 import core.thread;
20 
/// A connection paired with a system-tick timestamp; `closedChecker` attempts a
/// reconnect once `TickDuration.currSystemTick` exceeds `duration`.
alias TimedConnListElem = Tuple!(shared IConnection, "conn", TickDuration, "duration");

/// Doubly-linked list of timed connection entries.
alias TimedConnList = DList!TimedConnListElem;
23    
/**
*    Worker thread body that periodically tries to reconnect queued connections.
*
*    The worker first obtains the ids of its sibling threads via `ThreadIds.receive()`
*    and then loops until it is told to exit. Each iteration drains the mailbox and
*    then walks the list of queued connections.
*
*    Messages handled:
*    $(UL
*        $(LI `(Tid sender, bool v)` - sets the exit flag to `v` and remembers `sender`;
*             `true` is sent back to that `Tid` when the loop finishes.)
*        $(LI `("add", shared IConnection conn, TickDuration time)` - queues `conn`;
*             a reconnect is attempted once the current system tick exceeds `time`.)
*        $(LI `(Tid sender, "length")` - replies to `sender` with `(thisTid, <list length>)`.)
*    )
*    Any other message trips an assertion.
*
*    A connection that reconnects successfully is sent with the "add" command to
*    `ids.connectingCheckerId` and dropped from the list. When `reconnect` throws
*    `ConnectException`, the entry is rescheduled `reconnectTime` into the future.
*
*    Params:
*        logger        = logger used for debug and error output
*        reconnectTime = delay before the next attempt after a failed reconnect
*
*    Any `Throwable` escaping the body is logged and swallowed.
*/
static void closedChecker(shared ILogger logger, Duration reconnectTime)
{
    // Function-local import: only needed to zero-pad the delay in the log message.
    import std.string : format;

    try 
    {
        // Per the std.concurrency documentation a maximum size of 0 means "unlimited".
        setMaxMailboxSize(thisTid, 0, OnCrowding.block);
        Thread.getThis.isDaemon = true;
           
        TimedConnList list;
        auto ids = ThreadIds.receive();
        Tid exitTid;

        // Loop-invariant, human readable retry delay. The millisecond remainder is
        // zero-padded to three digits so that e.g. 5 s 50 ms reads "5.050", not "5.50".
        immutable long retryMsecs = reconnectTime.total!"msecs";
        string retryDelayText = format("%d.%03d", retryMsecs / 1000, retryMsecs % 1000);
           
        bool exit = false;
        while(!exit)
        {
            // Drain the mailbox: keep receiving until nothing arrives within 1 ms.
            while (receiveTimeout(dur!"msecs"(1)
                , (Tid sender, bool v) 
                {
                    exit = v; 
                    exitTid = sender;
                }
                , (string com, shared(IConnection) conn, TickDuration time) 
                { 
                    if(com == "add")
                    {                 
                        list.insert(TimedConnListElem(conn, time));
                    }
                }
                , (Tid sender, string com) 
                {
                    if(com == "length")
                    {
                        sender.send(thisTid, list[].walkLength);
                    }
                }
                , (Variant v) { assert(false, "Unhandled message!"); }
            )) {}
               
            // Entries that must stay queued are collected here; the rest are handed over.
            TimedConnList nextList;
            foreach(elem; list)
            {
                auto conn = elem.conn;
                auto time = elem.duration;

                if(TickDuration.currSystemTick > time)
                {
                    try
                    {
                        // Hand the connection over only when reconnect() did not throw.
                        scope(success)
                        {
                            ids.connectingCheckerId.send("add", conn);
                        }
                           
                        conn.reconnect();      
                    } catch(ConnectException e)
                    {
                        logger.logDebug("Connection to server ",e.server," is still failing! Will retry over "
                            , retryDelayText, " seconds.");

                        // `elem` is a by-value copy, so the new deadline only affects nextList.
                        elem.duration = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                        nextList.insert(elem);
                    }
                } else
                {
                    // Not due yet: keep it for a later pass.
                    nextList.insert(elem);
                } 
            }
            list.clear;  
            list = nextList;
        }
           
        // NOTE(review): this scope(exit) is registered only once execution reaches this
        // point, so the queued connections are not disconnected here if an exception
        // escaped the loop above. It runs after the two statements below.
        scope(exit)
        {
            foreach(elem; list)
            {
                elem.conn.disconnect();
            } 
        }
        
        exitTid.send(true);
        logger.logDebug("Closed connections thread exited!");
    } catch (Throwable th)
    {
        logger.logError("AsyncPool: closed connections thread died!");
        logger.logError(text(th));
    }
}