1 // Written in D programming language
2 /**
*    Part of the asynchronous connection pool implementation.
4 *    
5 *    Copyright: © 2014 DSoftOut
6 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *    Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.async.workers.connecting;
10 
11 import dlogg.log;
12 import pgator.db.connection;
13 import pgator.db.async.workers.handler;
14 import std.algorithm;
15 import std.container;
16 import std.concurrency;
17 import std.datetime;
18 import std.range;
19 import core.thread;
20 //import util;
21 
22 
23 import std.stdio;
24 
/// Doubly-linked list of shared connections tracked by the connecting worker.
private alias ConnectionList = DList!(shared IConnection);
26 
/**
*    Body of the worker thread that watches connections which are still being
*    established.
*
*    The thread first receives the ids of its sibling workers via
*    $(D ThreadIds.receive()) and then loops over two phases until it is asked
*    to exit:
*
*    1. Drain the mailbox. Handled messages:
*       $(UL
*           $(LI $(D (Tid sender, bool v)) - sets the exit flag to $(D v) and
*                remembers $(D sender) as the thread to notify on exit;)
*           $(LI $(D ("add", shared IConnection)) - starts tracking the connection;)
*           $(LI $(D (Tid sender, "length")) - replies to $(D sender) with
*                $(D (thisTid, count)) where count is the number of tracked connections;)
*           $(LI anything else triggers $(D assert(false)).)
*       )
*    2. Poll every tracked connection once:
*       $(UL
*           $(LI $(D Pending) - the connection stays tracked;)
*           $(LI $(D Error) - the failure is logged and the connection is sent to
*                $(D ids.closedCheckerId) together with the tick at which it should be retried;)
*           $(LI $(D Finished) - the connection is sent to $(D ids.freeCheckerId).)
*       )
*
*    On leaving the function (normally or because of an exception) every
*    connection still tracked by this thread is disconnected.
*
*    Params:
*        logger        = logger used for error and debug output
*        reconnectTime = delay added to the current system tick to compute the
*                        retry time of a failed connection
*/
void connectingChecker(shared ILogger logger, Duration reconnectTime)
{
    import std.string : format;

    try
    {
        // A mailbox size of 0 means "no limit" in std.concurrency.
        setMaxMailboxSize(thisTid, 0, OnCrowding.block);
        Thread.getThis.isDaemon = true;

        // Invariant: `list` holds exactly the connections this thread still owns.
        ConnectionList list;

        // Registered before any work is done so the owned connections are closed
        // even when an exception escapes the main loop. On the normal path it
        // still runs last, after the exit notification below.
        scope(exit)
        {
            foreach(conn; list[])
            {
                conn.disconnect();
            }
        }

        auto ids = ThreadIds.receive();
        Tid exitTid;

        bool exit = false;
        while(!exit)
        {
            // Phase 1: drain the mailbox; the loop ends once no message arrives
            // within the 1 ms timeout.
            while(receiveTimeout(dur!"msecs"(1)
                , (Tid sender, bool v)
                {
                    exit = v;
                    exitTid = sender;
                }
                , (string com, shared IConnection conn)
                {
                    if(com == "add")
                    {
                        list.insert(conn);
                    }
                }
                , (Tid sender, string com)
                {
                    if(com == "length")
                    {
                        sender.send(thisTid, list[].walkLength);
                    }
                }
                , (Variant v) { assert(false, "Unhandled message!"); }
            )) {}

            // Phase 2: poll each tracked connection exactly once. Connections are
            // taken from the front and the ones still pending are re-queued at the
            // back, so `list` never contains a connection that was already handed
            // over to another worker. The bound is evaluated once, before the loop.
            foreach(i; 0 .. list[].walkLength)
            {
                auto conn = list.front;

                final switch(conn.pollConnectionStatus())
                {
                    case ConnectionStatus.Pending:
                    {
                        list.insertBack(conn);
                        break;
                    }
                    case ConnectionStatus.Error:
                    {
                        // NOTE(review): if pollConnectionException() returns without
                        // throwing, the connection is dropped without being forwarded
                        // or disconnected - confirm that it always throws in this state.
                        try conn.pollConnectionException();
                        catch(ConnectException e)
                        {
                            logger.logError(e.msg);

                            // Sub-second part of the delay in milliseconds; the API
                            // for it differs between compiler versions.
                            static if (__VERSION__ < 2066) {
                                immutable fracMsecs = reconnectTime.fracSec.msecs;
                            } else {
                                immutable fracMsecs = reconnectTime.split!("seconds", "msecs").msecs;
                            }

                            // Zero-pad the fraction so that 1 s 5 ms reads "1.005", not "1.5".
                            logger.logDebug("Will retry to connect to ", e.server, " over "
                                   , reconnectTime.total!"seconds", ".", format("%03d", fracMsecs), " seconds.");

                            TickDuration whenRetry = TickDuration.currSystemTick + cast(TickDuration)reconnectTime;
                            ids.closedCheckerId.send("add", conn, whenRetry);
                        }
                        break;
                    }
                    case ConnectionStatus.Finished:
                    {
                        ids.freeCheckerId.send("add", conn);
                        break;
                    }
                }

                // The connection at the front has been fully processed (re-queued
                // or handed over); drop this thread's front reference to it.
                list.removeFront();
            }
        }

        exitTid.send(true);
        logger.logDebug("Connecting thread exited!");
    } catch (Throwable th)
    {
        logger.logError("AsyncPool: connecting thread died!");
        logger.logError(text(th));
    }
}