1 // Written in D programming language
2 /**
*    Part of the asynchronous connection pool implementation.
4 *    
5 *    Copyright: © 2014 DSoftOut
6 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *    Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.async.workers.free;
10 
11 import dlogg.log;
12 import pgator.db.connection;
13 import pgator.db.async.workers.handler;
14 import std.container;
15 import std.concurrency;
16 import std.range;
17 import std.datetime;
18 import core.thread;
19 //import util;
20 
/// Doubly-linked list of shared connections.
private alias ConnectionList = DList!(shared IConnection);
22        
/**
*    Thread body that keeps the list of free connections and serves it to
*    other threads through message passing.
*
*    Before entering the loop the worker obtains its peer thread ids via
*    $(B ThreadIds.receive()). It then handles the following messages:
*
*    $(B (string "add", shared IConnection conn)) - hand $(B conn) to the oldest
*    waiting requester, or store it in the free list when nobody is waiting.
*
*    $(B (Tid sender, string "get")) - reply with $(B (thisTid, conn)) when the
*    free list is not empty, otherwise queue $(B sender) until a connection is added.
*
*    $(B (Tid sender, string "length")) - reply with $(B (thisTid, count)) where
*    count is the current number of free connections.
*
*    $(B (Tid sender, bool flag)) - remember $(B sender) as the thread to notify;
*    the worker leaves its loop when $(B flag) is true.
*
*    After each mailbox drain every free connection is polled. A connection
*    whose status is $(B ConnectionStatus.Error) (and whose exception is a
*    $(B ConnectException)), or which fails the periodic $(B testAlive) check,
*    is removed from the free list and sent to $(B ids.closedCheckerId) together
*    with the tick at which reconnection should be retried.
*
*    On exit all remaining free connections are disconnected and $(B true) is
*    sent to the thread that requested the exit.
*
*    Params:
*    logger          = logger used for diagnostics
*    reconnectTime   = delay added to the current tick to form the retry time
*                      passed along with a failed connection
*    aliveCheckTime  = period between $(B testAlive) sweeps over the free list
*/
void freeChecker(shared ILogger logger, Duration reconnectTime, Duration aliveCheckTime)
{
    // Function-local import: keeps the module-level import list untouched.
    import std.string : format;

    try 
    {
        // Zero means the mailbox is unbounded (see std.concurrency docs).
        setMaxMailboxSize(thisTid, 0, OnCrowding.block);
        Thread.getThis.isDaemon = true;

        DList!Tid connRequests; // requesters waiting for a connection, oldest first
        ConnectionList list;    // currently free connections
        auto ids = ThreadIds.receive();
        Tid exitTid;
        auto nextCheckTime = Clock.currSystemTick + cast(TickDuration)aliveCheckTime;

        bool exit = false;
        while(!exit)
        {
            // Drain the mailbox; the loop ends once no message arrives within 1 ms.
            while (receiveTimeout(dur!"msecs"(1)
                , (Tid sender, bool v) 
                {
                    exit = v; 
                    exitTid = sender;
                }
                , (string com, shared IConnection conn) 
                {
                    if(com == "add")
                    {
                        if(connRequests.empty) list.insert(conn);
                        else
                        {
                            auto reqTid = connRequests.front;
                            connRequests.removeFront;
                            // TODO: Check the case when the requester is already gone,
                            // then the conn is lost
                            reqTid.send(thisTid, conn);
                        }
                    }
                }
                , (Tid sender, string com) 
                {
                    if(com == "length")
                    {
                        sender.send(thisTid, list[].walkLength);
                    } 
                    else if(com == "get")
                    {
                        if(list.empty)
                        {
                            connRequests.insert(sender);
                        } 
                        else
                        {
                            sender.send(thisTid, list.front);
                            list.removeFront;
                        }
                    } 
                    else assert(false, "Invalid command!");
                }
                , (Variant v) { assert(false, "Unhandled message!"); }
            )) {}

            // Rebuild the free list keeping only the connections that pass the checks.
            ConnectionList newList;     
            bool checkAlive = Clock.currSystemTick > nextCheckTime;
            foreach(conn; list)
            {
                // Logs the retry delay and passes the connection to the closed checker.
                void processFailedConn()
                {
                    // Millisecond remainder of reconnectTime; the accessor depends
                    // on the compiler/Phobos version.
                    static if (__VERSION__ < 2066) 
                    {
                        immutable fracMsecs = reconnectTime.fracSec.msecs;
                    } 
                    else 
                    {
                        immutable fracMsecs = reconnectTime.split!("seconds", "msecs").msecs;
                    }

                    // Zero-pad the fraction, otherwise 1 s 5 ms is printed as "1.5".
                    logger.logInfo(format("Will retry to connect to server over %d.%03d seconds."
                        , reconnectTime.total!"seconds", fracMsecs));

                    TickDuration whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                    ids.closedCheckerId.send("add", conn, whenRetry);
                }

                if(conn.pollConnectionStatus == ConnectionStatus.Error)
                {
                    try conn.pollConnectionException();
                    catch(ConnectException e)
                    {
                        logger.logError(e.msg);
                        processFailedConn();
                        continue;
                    }
                }

                if(checkAlive)
                {
                    if(!conn.testAlive)
                    {
                        logger.logError("Connection test on its aliveness is failed!");
                        processFailedConn();
                        continue;
                    }
                }

                newList.insert(conn);
            }
            list.clear;
            list = newList;

            if(checkAlive)
            {
                nextCheckTime = Clock.currSystemTick + cast(TickDuration)aliveCheckTime;
            }
        }

        // Note: the compiler does not allow putting this cleanup into scope(exit).
        foreach(conn; list)
        {
            try
            {
                conn.disconnect();
            } 
            catch(Throwable e)
            {
                // Best effort: a failing connection must not prevent closing
                // the remaining ones or notifying the exit requester.
                logger.logError(format("Failed to disconnect a free connection: %s", e.msg));
            }
        }

        exitTid.send(true);
        logger.logDebug("Free connections thread exited!");
    } 
    catch (Throwable th)
    {
        logger.logError("AsyncPool: free connections thread died!");
        logger.logError(text(th));
    }
}