1 // Written in D programming language
2 /**
*    Part of the asynchronous connection pool implementation.
4 *    
5 *    Copyright: © 2014 DSoftOut
6 *    License: Subject to the terms of the MIT license, as written in the included LICENSE file.
7 *    Authors: NCrashed <ncrashed@gmail.com>
8 */
9 module pgator.db.async.workers.handler;
10 
11 import std.concurrency;
12 import dlogg.log;
13 import core.time;
14 
/**
*   Helper struct that holds the ids of all workers. It works around the
*   immutability restrictions on Tids and allows the ids to be sent and
*   received as a batch.
*/
shared struct ThreadIds
{
    /// Id of the worker for closed connections
    immutable Tid mClosedCheckerId;
    /// Id of the worker for free connections
    immutable Tid mFreeCheckerId;
    /// Id of the worker for connections in the connecting state
    immutable Tid mConnectingCheckerId;
    /// Id of the worker for querying
    immutable Tid mQueringCheckerId;

    /// Returns: the closed connections worker id with the immutable qualifier cast away
    Tid closedCheckerId()
    {
        return cast()mClosedCheckerId;
    }

    /// Returns: the free connections worker id with the immutable qualifier cast away
    Tid freeCheckerId()
    {
        return cast()mFreeCheckerId;
    }

    /// Returns: the connecting worker id with the immutable qualifier cast away
    Tid connectingCheckerId()
    {
        return cast()mConnectingCheckerId;
    }

    /// Returns: the querying worker id with the immutable qualifier cast away
    Tid queringCheckerId()
    {
        return cast()mQueringCheckerId;
    }

    /**
    *   Builds the handler from the ids of all four workers.
    *
    *   Params:
    *       closedCheckerId     = id of the worker for closed connections
    *       freeCheckerId       = id of the worker for free connections
    *       connectingCheckerId = id of the worker for connecting connections
    *       queringCheckerId    = id of the worker for querying
    */
    this(Tid closedCheckerId, Tid freeCheckerId, Tid connectingCheckerId, Tid queringCheckerId)
    {
        // The parameters shadow the getters of the same name, so the
        // fields are addressed explicitly through `this`.
        this.mClosedCheckerId     = cast(immutable)closedCheckerId;
        this.mFreeCheckerId       = cast(immutable)freeCheckerId;
        this.mConnectingCheckerId = cast(immutable)connectingCheckerId;
        this.mQueringCheckerId    = cast(immutable)queringCheckerId;
    }

    /**
    *   Collects the four worker ids in the fixed order used everywhere in
    *   this struct: closed, free, connecting, quering.
    */
    private Tid[4] workerIds()
    {
        return [closedCheckerId(), freeCheckerId(), connectingCheckerId(), queringCheckerId()];
    }

    /**
    *   Sends the content of the handler to every worker it holds. A worker
    *   can rebuild its local copy of the handler with the $(B receive) method.
    */
    void sendTids()
    {
        Tid[4] recipients = workerIds();
        foreach(recipient; recipients)
        {
            sendTo(recipient);
        }
    }

    /**
    *   Sends the four worker ids to the $(B dist) thread, one message per
    *   id, in the order closed, free, connecting, quering. The receiving
    *   thread can rebuild the handler with the $(B receive) method.
    *
    *   Params:
    *       dist = thread that should get the ids
    */
    private void sendTo(Tid dist)
    {
        Tid[4] payload = workerIds();
        foreach(id; payload)
        {
            dist.send(id);
        }
    }

    /**
    *   Rebuilds a handler from the mailbox of the current thread. It is the
    *   counterpart of $(B sendTids) and is meant to be called by workers.
    *
    *   Four consecutive $(B Tid) messages are taken with $(B receiveOnly) and
    *   interpreted as the closed, free, connecting and quering worker ids.
    *
    *   NOTE(review): $(B receiveOnly) does not accept any other message
    *   type, so the four ids are expected to arrive back to back.
    *
    *   Returns: handler constructed from the received ids
    */
    static shared(ThreadIds) receive()
    {
        Tid[4] ids;
        foreach(ref id; ids)
        {
            id = receiveOnly!Tid();
        }
        return shared ThreadIds(ids[0], ids[1], ids[2], ids[3]);
    }

    /**
    *   Asks all workers to quit, one after another.
    *
    *   Each worker is sent the id of the current thread together with
    *   $(B true); then the current thread waits up to one second for a
    *   $(B bool) message. If none arrives in time, a debug message naming
    *   the worker is logged.
    *
    *   NOTE(review): any $(B bool) message is accepted as the answer; the
    *   sender of the message is not checked.
    *
    *   Params:
    *       logger = logger that gets a debug entry for each worker that
    *                did not answer in time
    */
    void finalize(shared ILogger logger)
    {
        Tid[4] workers = workerIds();
        // Labels are kept in the same order as workerIds().
        string[4] labels = [
            "Closed connections",
            "Free connections",
            "Connecting connections",
            "Quering connections"
        ];

        foreach(i, worker; workers)
        {
            worker.send(thisTid, true);

            immutable answered = receiveTimeout(1.seconds, (bool ack) {});
            if(answered)
            {
                continue;
            }
            logger.logDebug(labels[i], " thread refused to terminated safely!");
        }
    }
}