// Written in D programming language
/**
*   Part of asynchronous pool realization.
*
*   Copyright: © 2014 DSoftOut
*   License: Subject to the terms of the MIT license, as written in the included LICENSE file.
*   Authors: NCrashed <ncrashed@gmail.com>
*/
module pgator.db.async.workers.handler;

import std.concurrency;
import dlogg.log;
import core.time;

/**
*   Bundle of the four worker thread ids used by the pool.
*
*   The ids are stored as immutable members and exposed through getters
*   that strip the qualifiers, so the values can be used directly with
*   $(B std.concurrency) primitives. The bundle can also be transferred
*   to every worker as a batch of messages (see $(B sendTids) and
*   $(B receive)).
*/
shared struct ThreadIds
{
    /// Id of the worker that handles closed connections
    immutable Tid mClosedCheckerId;
    /// Id of the worker that handles free connections
    immutable Tid mFreeCheckerId;
    /// Id of the worker that handles connections in connecting state
    immutable Tid mConnectingCheckerId;
    /// Id of the worker that handles quering
    immutable Tid mQueringCheckerId;

    /// Returns: closed connections worker id with qualifiers cast away
    Tid closedCheckerId()
    {
        Tid unqualified = cast() mClosedCheckerId;
        return unqualified;
    }

    /// Returns: free connections worker id with qualifiers cast away
    Tid freeCheckerId()
    {
        Tid unqualified = cast() mFreeCheckerId;
        return unqualified;
    }

    /// Returns: connecting connections worker id with qualifiers cast away
    Tid connectingCheckerId()
    {
        Tid unqualified = cast() mConnectingCheckerId;
        return unqualified;
    }

    /// Returns: quering worker id with qualifiers cast away
    Tid queringCheckerId()
    {
        Tid unqualified = cast() mQueringCheckerId;
        return unqualified;
    }

    /**
    *   Builds the handler from the ids of all four workers.
    *
    *   Params:
    *       closedCheckerId     = worker for closed connections
    *       freeCheckerId       = worker for free connections
    *       connectingCheckerId = worker for connections in connecting state
    *       queringCheckerId    = worker for quering
    */
    this(Tid closedCheckerId, Tid freeCheckerId, Tid connectingCheckerId, Tid queringCheckerId)
    {
        mClosedCheckerId     = cast(immutable) closedCheckerId;
        mFreeCheckerId       = cast(immutable) freeCheckerId;
        mConnectingCheckerId = cast(immutable) connectingCheckerId;
        mQueringCheckerId    = cast(immutable) queringCheckerId;
    }

    /**
    *   Delivers the content of this handler to every worker, in the order
    *   closed, free, connecting, quering. A worker can rebuild its local
    *   handler with use of the $(B receive) method.
    */
    void sendTids()
    {
        foreach(worker; [closedCheckerId, freeCheckerId, connectingCheckerId, queringCheckerId])
        {
            sendTo(worker);
        }
    }

    /**
    *   Posts the four stored ids to the $(B dist) thread as four separate
    *   messages, in the order closed, free, connecting, quering. This is
    *   the same order in which $(B receive) reads them back.
    */
    private void sendTo(Tid dist)
    {
        foreach(id; [closedCheckerId, freeCheckerId, connectingCheckerId, queringCheckerId])
        {
            dist.send(id);
        }
    }

    /**
    *   Rebuilds a handler from the mailbox of the calling thread.
    *
    *   Reads four $(B Tid) messages with $(B receiveOnly) and treats them as
    *   the closed, free, connecting and quering worker ids respectively.
    *   It is the counterpart of $(B sendTids) on the worker side.
    */
    static shared(ThreadIds) receive()
    {
        Tid closed     = receiveOnly!Tid();
        Tid free       = receiveOnly!Tid();
        Tid connecting = receiveOnly!Tid();
        Tid quering    = receiveOnly!Tid();

        return shared ThreadIds(closed, free, connecting, quering);
    }

    /**
    *   Asks all workers to quit.
    *
    *   Each worker is sent the pair (id of the calling thread, true) and is
    *   then given one second to answer with a bool message. When no answer
    *   arrives in time, a debug record is written to $(B logger).
    */
    void finalize(shared ILogger logger)
    {
        // Sends the quit request to one worker and waits for its reply
        void askToQuit(Tid worker, string title)
        {
            worker.send(thisTid, true);

            immutable answered = receiveTimeout(1.seconds, (bool reply) {});
            if(answered)
            {
                return;
            }

            logger.logDebug(title, " thread refused to terminated safely!");
        }

        askToQuit(closedCheckerId,     "Closed connections");
        askToQuit(freeCheckerId,       "Free connections");
        askToQuit(connectingCheckerId, "Connecting connections");
        askToQuit(queringCheckerId,    "Quering connections");
    }
}