From 5ca76b26905ce67433e257d359339d05a8890d20 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Thu, 7 Apr 2016 16:24:31 +0200
Subject: [PATCH] Added control channel and fixed some termination bugs

---
 conf/dataManager.conf                         |   5 +-
 src/sender/DataDispatcher.py                  | 211 +++++++++++-------
 src/sender/DataManager.py                     |  89 +++++---
 src/sender/SignalHandler.py                   |   7 +-
 src/sender/TaskProvider.py                    |  25 ++-
 src/sender/eventDetectors/InotifyxDetector.py |   4 +-
 src/sender/eventDetectors/WatchdogDetector.py |   5 +-
 src/shared/helpers.py                         |   4 +
 8 files changed, 216 insertions(+), 134 deletions(-)

diff --git a/conf/dataManager.conf b/conf/dataManager.conf
index 6bebbd97..af79aa6f 100644
--- a/conf/dataManager.conf
+++ b/conf/dataManager.conf
@@ -31,6 +31,8 @@ requestPort        = 50001
 # ZMQ port to forward requests
 requestFwPort      = 50002
 
+# ZMQ port to disribute control signals
+controlPort        = 50005
 
 #########################################
 ####   EventDetector Configuration   ####
@@ -115,6 +117,3 @@ routerPort         = 50004
 localTarget        = /space/projects/zeromq-data-transfer/data/target
 #localTarget       = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
 #localTarget       = /gpfs
-
-# ZMQ-pull-socket port which deletes/moves given files
-cleanerPort        = 50005
diff --git a/src/sender/DataDispatcher.py b/src/sender/DataDispatcher.py
index 537b5b53..437e9aa4 100644
--- a/src/sender/DataDispatcher.py
+++ b/src/sender/DataDispatcher.py
@@ -34,7 +34,7 @@ import helpers
 class DataDispatcher():
 #class DataDispatcher(Process):
 
-    def __init__ (self, id, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
+    def __init__ (self, id, controlPort, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
                 logQueue, localTarget = None, context = None):
 
 #        dataFetcherProp = {
@@ -57,11 +57,15 @@ class DataDispatcher():
 
         self.localhost       = "127.0.0.1"
         self.extIp           = "0.0.0.0"
+        self.controlPort     = controlPort
         self.routerPort      = routerPort
         self.chunkSize       = chunkSize
 
+        self.controlSocket   = None
         self.routerSocket    = None
 
+        self.poller          = None
+
         self.fixedStreamId   = fixedStreamId
         self.localTarget     = localTarget
 
@@ -94,18 +98,35 @@ class DataDispatcher():
         try:
             self.run()
         except KeyboardInterrupt:
-            self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher-" + str(self.id) + ".")
-            self.stop()
+            pass
         except:
             self.log.error("Stopping DataDispatcher-" + str(self.id) + " due to unknown error condition.", exc_info=True)
             self.stop()
 
 
     def __createSockets (self):
+
+        # socket for control signals
+        self.controlSocket = self.context.socket(zmq.SUB)
+        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.controlPort )
+        try:
+            self.controlSocket.connect(connectionStr)
+            self.log.info("Start controlSocket (connect): '" + str(connectionStr) + "'")
+        except:
+            self.log.error("Failed to start controlSocket (connect): '" + connectionStr + "'", exc_info=True)
+
+        # socket to get new workloads from
         self.routerSocket = self.context.socket(zmq.PULL)
         connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
-        self.routerSocket.connect(connectionStr)
-        self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")
+        try:
+            self.routerSocket.connect(connectionStr)
+            self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")
+        except:
+            self.log.error("Failed to start routerSocket (connect): '" + connectionStr + "'", exc_info=True)
+
+        self.poller = zmq.Poller()
+        self.poller.register(self.controlSocket, zmq.POLLIN)
+        self.poller.register(self.routerSocket, zmq.POLLIN)
 
 
     # Send all logs to the main process
@@ -126,124 +147,144 @@ class DataDispatcher():
     def run (self):
 
         while True:
-
-            # Get workload from router, until finished
             self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
-            try:
-                message = self.routerSocket.recv_multipart()
-                self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
-                self.log.debug("message = " + str(message))
-            except KeyboardInterrupt:
-                break
-            except:
-                self.log.error("DataDispatcher-" + str(self.id) + ": waiting for new job...failed", exc_info=True)
-                continue
+            socks = dict(self.poller.poll())
 
+            if self.routerSocket in socks and socks[self.routerSocket] == zmq.POLLIN:
 
-            if len(message) >= 2:
+                try:
+                    message = self.routerSocket.recv_multipart()
+                    self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
+                    self.log.debug("message = " + str(message))
+                except:
+                    self.log.error("DataDispatcher-" + str(self.id) + ": waiting for new job...failed", exc_info=True)
+                    continue
 
-                if message[0] == b"CLOSE_SOCKETS":
 
-                    targets  = cPickle.loads(message[1])
+                if len(message) >= 2:
+
+                    if message[0] == b"CLOSE_SOCKETS":
+
+                        targets  = cPickle.loads(message[1])
+
+                        for socketId, prio in targets:
+                            if self.openConnections.has_key(socketId):
+                                self.log.info("Closing socket " + str(socketId))
+                                if self.openConnections[socketId]:
+                                    self.openConnections[socketId].close(0)
+                                del self.openConnections[socketId]
+                        continue
+                    else:
+                        workload = cPickle.loads(message[0])
+                        targets  = cPickle.loads(message[1])
+
+                        if self.fixedStreamId:
+                            targets.insert(0,[self.fixedStreamId, 0, "data"])
+
+                        # sort the target list by the priority
+                        targets = sorted(targets, key=lambda target: target[1])
 
-                    for socketId, prio in targets:
-                        if self.openConnections.has_key(socketId):
-                            self.log.info("Closing socket " + str(socketId))
-                            if self.openConnections[socketId]:
-                                self.openConnections[socketId].close(0)
-                            del self.openConnections[socketId]
-                    continue
                 else:
+                    #TODO is this needed?
                     workload = cPickle.loads(message[0])
-                    targets  = cPickle.loads(message[1])
 
-                    if self.fixedStreamId:
-                        targets.insert(0,[self.fixedStreamId, 0, "data"])
+                    # TODO move this in if len(message) >=2 statement?
+                    if workload == b"CLOSE_FILE":
+                        self.log.debug("Router requested to send signal that file was closed.")
+                        payload = [ workload, self.id ]
 
-                    # sort the target list by the priority
-                    targets = sorted(targets, key=lambda target: target[1])
+                        # socket already known
+                        if self.fixedStreamId in self.openConnections:
+                            tracker = self.openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
+                            self.log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0")
+                        else:
+                            # open socket
+                            socket        = context.socket(zmq.PUSH)
+                            connectionStr = "tcp://" + str(self.fixedStreamId)
 
-            elif message[0] == b"EXIT":
-                self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
-                break
+                            socket.connect(connectionStr)
+                            self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
 
-            else:
-                #TODO is this needed?
-                workload = cPickle.loads(message[0])
+                            # register socket
+                            self.openConnections[self.fixedStreamId] = socket
 
-                # TODO move this in if len(message) >=2 statement?
-                if workload == b"CLOSE_FILE":
-                    self.log.debug("Router requested to send signal that file was closed.")
-                    payload = [ workload, self.id ]
+                            # send data
+                            tracker = self.openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
+                            self.log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0" )
 
-                    # socket already known
-                    if self.fixedStreamId in self.openConnections:
-                        tracker = self.openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
-                        self.log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0")
-                    else:
-                        # open socket
-                        socket        = context.socket(zmq.PUSH)
-                        connectionStr = "tcp://" + str(self.fixedStreamId)
+                        # socket not known
+                        if not tracker.done:
+                            self.log.info("Close file signal has not been sent yet, waiting...")
+                            tracker.wait()
+                            self.log.info("Close file signal has not been sent yet, waiting...done")
 
-                        socket.connect(connectionStr)
-                        self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
+                        time.sleep(2)
+                        self.log.debug("Continue after sleeping.")
+                        continue
 
-                        # register socket
-                        self.openConnections[self.fixedStreamId] = socket
+                    elif self.fixedStreamId:
+                        targets = [[self.fixedStreamId, 0, "data"]]
 
-                        # send data
-                        tracker = self.openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
-                        self.log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0" )
+                    else:
+                        targets = []
 
-                    # socket not known
-                    if not tracker.done:
-                        self.log.info("Close file signal has not been sent yet, waiting...")
-                        tracker.wait()
-                        self.log.info("Close file signal has not been sent yet, waiting...done")
 
-                    time.sleep(2)
-                    self.log.debug("Continue after sleeping.")
-                    continue
+                # get metadata of the file
+                try:
+                    self.log.debug("Getting file metadata")
+                    sourceFile, targetFile, metadata = self.dataFetcher.getMetadata(self.log, workload, self.chunkSize, self.localTarget)
 
-                elif self.fixedStreamId:
-                    targets = [[self.fixedStreamId, 0, "data"]]
+                except:
+                    self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
+                    #skip all further instructions and continue with next iteration
+                    continue
 
-                else:
-                    targets = []
+                # send data
+                try:
+                    self.dataFetcher.sendData(self.log, targets, sourceFile, targetFile, metadata, self.openConnections, self.context, self.dataFetcherProp)
+                except:
+                    self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data stream...failed.", exc_info=True)
 
+                # finish data handling
+                self.dataFetcher.finishDataHandling(self.log, sourceFile, targetFile, self.dataFetcherProp)
 
-            # get metadata of the file
-            try:
-                self.log.debug("Getting file metadata")
-                sourceFile, targetFile, metadata = self.dataFetcher.getMetadata(self.log, workload, self.chunkSize, self.localTarget)
 
-            except:
-                self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
-                #skip all further instructions and continue with next iteration
-                continue
+            if self.controlSocket in socks and socks[self.controlSocket] == zmq.POLLIN:
 
-            # send data
-            try:
-                self.dataFetcher.sendData(self.log, targets, sourceFile, targetFile, metadata, self.openConnections, self.context, self.dataFetcherProp)
-            except:
-                self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data stream...failed.", exc_info=True)
+                try:
+                    message = self.controlSocket.recv_multipart()
+                    self.log.debug("DataDispatcher-" + str(self.id) + ": control signal received")
+                    self.log.debug("message = " + str(message))
+                except:
+                    self.log.error("DataDispatcher-" + str(self.id) + ": waiting for control signal...failed", exc_info=True)
+                    continue
 
-            # finish data handling
-            self.dataFetcher.finishDataHandling(self.log, sourceFile, targetFile, self.dataFetcherProp)
+                if message[0] == b"EXIT":
+                    self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
+                    break
 
 
     def stop (self):
         self.log.debug("Closing sockets for DataDispatcher-" + str(self.id))
+
         for connection in self.openConnections:
             if self.openConnections[connection]:
                 self.log.info("Closing socket " + str(connection))
                 self.openConnections[connection].close(0)
                 self.openConnections[connection] = None
+
+        if self.controlSocket:
+            self.log.info("Closing controlSocket")
+            self.controlSocket.close(0)
+            self.controlSocket = None
+
         if self.routerSocket:
             self.log.info("Closing routerSocket")
             self.routerSocket.close(0)
             self.routerSocket = None
+
         self.dataFetcher.clean(self.dataFetcherProp)
+
         if not self.extContext and self.context:
             self.log.debug("Destroying context")
             self.context.destroy(0)
@@ -290,7 +331,7 @@ if __name__ == '__main__':
     copyfile(sourceFile, targetFile)
     time.sleep(0.5)
 
-
+    controlPort   = "50005"
     routerPort    = "7000"
     receivingPort = "6005"
     receivingPort2 = "6006"
@@ -317,7 +358,7 @@ if __name__ == '__main__':
     context       = zmq.Context.instance()
 
 #    dataDispatcherPr = DataDispatcher( "0/1", routerPort, chunkSize, fixedStreamId, logQueue, localTarget, context)
-    dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
+    dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, controlPort, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
                                                                   logQueue, localTarget, context) )
     dataDispatcherPr.start()
 
diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py
index 8abad26a..983adb18 100644
--- a/src/sender/DataManager.py
+++ b/src/sender/DataManager.py
@@ -11,6 +11,7 @@ import time
 import cPickle
 from multiprocessing import Process, freeze_support, Queue
 import ConfigParser
+import threading
 
 from SignalHandler import SignalHandler
 from TaskProvider import TaskProvider
@@ -31,7 +32,6 @@ from logutils.queue import QueueHandler
 import helpers
 from version import __version__
 
-
 def argumentParsing():
     configFile = CONFIG_PATH + os.sep + "dataManager.conf"
 
@@ -63,12 +63,16 @@ def argumentParsing():
 
     # SignalHandler config
 
+    controlPort        = config.get('asection', 'controlPort')
     comPort            = config.get('asection', 'comPort')
     whitelist          = json.loads(config.get('asection', 'whitelist'))
 
     requestPort        = config.get('asection', 'requestPort')
     requestFwPort      = config.get('asection', 'requestFwPort')
 
+    parser.add_argument("--controlPort"       , type    = str,
+                                                help    = "Port number to distribute control signals (default=" + str(controlPort) + ")",
+                                                default = controlPort )
     parser.add_argument("--comPort"           , type    = str,
                                                 help    = "Port number to receive signals (default=" + str(comPort) + ")",
                                                 default = comPort )
@@ -162,7 +166,6 @@ def argumentParsing():
     routerPort         = config.get('asection', 'routerPort')
 
     localTarget        = config.get('asection', 'localTarget')
-    cleanerPort        = config.get('asection', 'cleanerPort')
 
     parser.add_argument("--dataFetcherType"   , type    = str,
                                                 help    = "Module with methods specifying how to get the data (default=" + str(dataFetcherType) + ")",
@@ -199,10 +202,6 @@ def argumentParsing():
     parser.add_argument("--localTarget"       , type    = str,
                                                 help    = "Target to move the files into (default=" + str(localTarget) + ")",
                                                 default = localTarget )
-    parser.add_argument("--cleanerPort"       , type    = str,
-                                                help    = "ZMQ-pull-socket port which deletes/moves given file \
-                                                           (default=" + str(cleanerPort) + ")",
-                                                default = cleanerPort )
 
     arguments         = parser.parse_args()
 
@@ -266,7 +265,7 @@ class DataManager():
             if onScreen:
                 h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)
 
-                # Start queue listener using the stream handler above
+                # Start queue listener using the stream handler abovehelper.globalObject.
                 self.logQueueListener = helpers.CustomQueueListener(self.logQueue, h1, h2)
             else:
                 h1 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)
@@ -279,12 +278,20 @@ class DataManager():
         # Create log and set handler to queue handle
         self.log = self.getLogger(self.logQueue)
 
+        self.log.info("DataManager started (PID " + str(os.getpid()) + ").")
+
+
+        self.controlPort      = arguments.controlPort
+
         self.comPort          = arguments.comPort
         self.whitelist        = arguments.whitelist
 
         self.requestPort      = arguments.requestPort
         self.requestFwPort    = arguments.requestFwPort
 
+        self.localhost        = "localhost"
+        self.extHost          = "0.0.0.0"
+
         if arguments.useDataStream:
             self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort )
         else:
@@ -296,9 +303,8 @@ class DataManager():
         self.routerPort       = arguments.routerPort
 
         self.localTarget      = arguments.localTarget
-        self.cleanerPort      = arguments.cleanerPort
 
-        # Assemble configuration for eventDetector
+        # Assemble configuration for eventDetectorhelper.globalObject.
         self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType)
         if arguments.eventDetectorType == "InotifyxDetector":
             self.eventDetectorConfig = {
@@ -353,7 +359,7 @@ class DataManager():
                     "localTarget" : self.localTarget,
                     "session"     : None,
                     "storeFlag"   : True,  #TODO add to config
-                    "removeFlag"  : True  #TODO add to config
+                    "removeFlag"  : False  #TODO add to config
                     }
 
 
@@ -365,9 +371,12 @@ class DataManager():
 
         #create zmq context
         # there should be only one context in one process
-        self.zmqContext = zmq.Context.instance()
+#        self.context = zmq.Context.instance()
+        self.context = zmq.Context()
         self.log.debug("Registering global ZMQ context")
 
+        self.createSockets()
+
         self.run()
 
     # Send all logs to the main process
@@ -384,6 +393,16 @@ class DataManager():
 
         return logger
 
+    def createSockets(self):
+
+        # socket for control signals
+        helpers.globalObjects.controlSocket = self.context.socket(zmq.PUB)
+        connectionStr  = "tcp://{ip}:{port}".format( ip=self.extHost, port=self.controlPort )
+        try:
+            helpers.globalObjects.controlSocket.bind(connectionStr)
+            self.log.info("Start controlSocket (bind): '" + str(connectionStr) + "'")
+        except:
+            self.log.error("Failed to start controlSocket (bind): '" + connectionStr + "'", exc_info=True)
 
 
     def run (self):
@@ -393,41 +412,43 @@ class DataManager():
         # needed, because otherwise the requests for the first files are not forwarded properly
         time.sleep(0.5)
 
-        self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.requestFwPort, self.routerPort, self.logQueue) )
+        self.taskProviderPr = threading.Thread ( target = TaskProvider, args = (self.eventDetectorConfig, self.controlPort, self.requestFwPort, self.routerPort, self.logQueue, self.context) )
+#        self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.controlPort, self.requestFwPort, self.routerPort, self.logQueue) )
         self.taskProviderPr.start()
 
         for i in range(self.numberOfStreams):
             id = str(i) + "/" + str(self.numberOfStreams)
-            pr = Process ( target = DataDispatcher, args = ( id, self.routerPort, self.chunkSize, self.fixedStreamId, self.dataFetcherProp,
+            pr = Process ( target = DataDispatcher, args = (id, self.controlPort, self.routerPort, self.chunkSize, self.fixedStreamId, self.dataFetcherProp,
                                                             self.logQueue, self.localTarget) )
             pr.start()
             self.dataDispatcherPr.append(pr)
 
 
     def stop (self):
-        if self.signalHandlerPr:
-            self.log.info("terminate SignalHandler...")
-            self.signalHandlerPr.terminate()
-            self.signalHandlerPr = None
-            self.log.info("terminate SignalHandler...done")
-
-        if self.taskProviderPr:
-            self.log.info("terminate TaskProvider...")
-            self.taskProviderPr.terminate()
-            self.taskProviderPr = None
-            self.log.info("terminate TaskProvider...done")
-
-        for pr in self.dataDispatcherPr:
-            id = str(self.dataDispatcherPr.index(pr)) + "/" + str(self.numberOfStreams)
-            self.log.info("terminate DataDispatcher-" + str(id) + "...")
-            pr.terminate()
-            pr = None
-            self.log.info("terminate DataDispatcher-" + str(id) + "...done")
-
-        if self.dataDispatcherPr == [ None for i in self.dataDispatcherPr ]:
-            self.dataDispatcher = []
+
+        if helpers.globalObjects.controlSocket:
+            self.log.info("Sending 'Exit' signal")
+            helpers.globalObjects.controlSocket.send_multipart(["Exit"])
+
+        if helpers.globalObjects.controlFlag:
+            helpers.globalObjects.controlFlag = False
+
+        # waiting till the other processes are finished
+        time.sleep(1)
+
+        if helpers.globalObjects.controlSocket:
+            self.log.info("Closing controlSocket")
+            helpers.globalObjects.controlSocket.close(0)
+            helpers.globalObjects.controlSocket = None
+
+        if self.context:
+            self.log.debug("Destroying context")
+            self.context.destroy(0)
+            self.context = None
+            self.log.debug("Destroying context..done")
 
         if not self.extLogQueue and self.logQueueListener:
+            self.log.debug("Stopping logQueue")
             self.logQueue.put_nowait(None)
             self.logQueueListener.stop()
             self.logQueueListener = None
diff --git a/src/sender/SignalHandler.py b/src/sender/SignalHandler.py
index dae71eda..19b20dab 100644
--- a/src/sender/SignalHandler.py
+++ b/src/sender/SignalHandler.py
@@ -82,7 +82,7 @@ class SignalHandler():
         try:
             self.run()
         except KeyboardInterrupt:
-            self.stop()
+            pass
         except:
             self.log.error("Stopping signalHandler due to unknown error condition.", exc_info=True)
             self.stop()
@@ -176,10 +176,9 @@ class SignalHandler():
                             openRequests.append(tmp)
 
                     if self.forwardSignal:
-                        self.log.info("Fowarding control signal " + str(self.forwardSignal))
 
                         self.requestFwSocket.send_multipart([self.forwardSignal[0], cPickle.dumps(self.forwardSignal[1])])
-                        self.log.debug("Answered to request: " + str(self.forwardSignal))
+                        self.log.info("Fowarding control signal " + str(self.forwardSignal))
 
                         self.forwardSignal = []
                     else:
@@ -230,7 +229,6 @@ class SignalHandler():
                             self.openRequVari[index].append(self.allowedQueries[index][i])
                             self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) )
 
-
     def checkSignal (self, incomingMessage):
 
         if len(incomingMessage) != 3:
@@ -444,6 +442,7 @@ class SignalHandler():
 
 
     def stop (self):
+
         self.log.debug("Closing sockets")
         if self.comSocket:
             self.comSocket.close(0)
diff --git a/src/sender/TaskProvider.py b/src/sender/TaskProvider.py
index e66d311e..e93b2130 100644
--- a/src/sender/TaskProvider.py
+++ b/src/sender/TaskProvider.py
@@ -32,7 +32,7 @@ import helpers
 #
 
 class TaskProvider():
-    def __init__ (self, eventDetectorConfig, requestFwPort, routerPort, logQueue, context = None):
+    def __init__ (self, eventDetectorConfig, controlPort, requestFwPort, routerPort, logQueue, context = None):
         global BASE_PATH
 
         #eventDetectorConfig = {
@@ -56,11 +56,15 @@ class TaskProvider():
 
         self.localhost          = "127.0.0.1"
         self.extIp              = "0.0.0.0"
+        self.controlPort        = controlPort
         self.requestFwPort      = requestFwPort
         self.routerPort         = routerPort
+
         self.requestFwSocket    = None
         self.routerSocket       = None
 
+        self.poller             = None
+
         self.log.debug("Registering ZMQ context")
         # remember if the context was created outside this class or not
         if context:
@@ -81,8 +85,7 @@ class TaskProvider():
         try:
             self.run()
         except KeyboardInterrupt:
-            self.log.debug("Keyboard interruption detected. Shuting down")
-            self.stop()
+            pass
         except:
             self.log.info("Stopping TaskProvider due to unknown error condition.", exc_info=True)
             self.stop()
@@ -104,6 +107,7 @@ class TaskProvider():
 
 
     def createSockets (self):
+
         # socket to get requests
         self.requestFwSocket = self.context.socket(zmq.REQ)
         connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
@@ -124,7 +128,8 @@ class TaskProvider():
 
 
     def run (self):
-        while True:
+
+        while helpers.globalObjects.controlFlag:
             try:
                 # the event for a file /tmp/test/source/local/file1.tif is of the form:
                 # {
@@ -140,6 +145,8 @@ class TaskProvider():
                 #skip all further instructions and continue with next iteration
                 continue
 
+
+
             #TODO validate workload dict
             for workload in workloadList:
                 # get requests for this event
@@ -192,15 +199,20 @@ class TaskProvider():
                     self.log.error("Sending message...failed.", exc_info=True)
 
 
-
     def stop (self):
+        self.log.debug("Closing sockets")
         if self.routerSocket:
+            self.log.info("Closing routerSocket")
             self.routerSocket.close(0)
             self.routerSocket = None
+
         if self.requestFwSocket:
+            self.log.info("Closing requestFwSocket")
             self.requestFwSocket.close(0)
             self.requestFwSocket = None
+
         if not self.extContext and self.context:
+            self.log.debug("Destroying context")
             self.context.destroy(0)
             self.context = None
 
@@ -290,6 +302,7 @@ if __name__ == '__main__':
 
     requestFwPort = "6001"
     routerPort    = "7000"
+    controlPort   = "50005"
 
     logQueue = Queue(-1)
 
@@ -307,7 +320,7 @@ if __name__ == '__main__':
     root.addHandler(qh)
 
 
-    taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, requestFwPort, routerPort, logQueue) )
+    taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, controlPort, requestFwPort, routerPort, logQueue) )
     taskProviderPr.start()
 
     requestResponderPr = Process ( target = requestResponder, args = ( requestFwPort, logQueue) )
diff --git a/src/sender/eventDetectors/InotifyxDetector.py b/src/sender/eventDetectors/InotifyxDetector.py
index 9419abed..de79864f 100644
--- a/src/sender/eventDetectors/InotifyxDetector.py
+++ b/src/sender/eventDetectors/InotifyxDetector.py
@@ -117,6 +117,8 @@ class EventDetector():
             self.monSuffixes  = tuple(config["monSuffixes"])
             self.monSubdirs   = config["monSubdirs"]
 
+            self.timeout      = 0.5
+
             self.add_watch()
 
 
@@ -189,7 +191,7 @@ class EventDetector():
         eventMessageList = []
         eventMessage = {}
 
-        events = self.get_events(self.fd)
+        events = self.get_events(self.fd, self.timeout)
         removedWd = None
         for event in events:
 
diff --git a/src/sender/eventDetectors/WatchdogDetector.py b/src/sender/eventDetectors/WatchdogDetector.py
index 818094d6..7f651f56 100644
--- a/src/sender/eventDetectors/WatchdogDetector.py
+++ b/src/sender/eventDetectors/WatchdogDetector.py
@@ -408,6 +408,7 @@ class EventDetector():
 
 
     def stop(self):
+        self.log.info("Stopping observer Threads")
         for observer in  self.observerThreads:
             observer.stop()
             observer.join()
@@ -416,10 +417,12 @@ class EventDetector():
         self.checkingThread.stop()
         self.checkingThread.join()
 
-
     def __exit__(self):
         self.stop()
 
+    def __del__(self):
+        self.stop()
+
 
 if __name__ == '__main__':
     import sys
diff --git a/src/shared/helpers.py b/src/shared/helpers.py
index ac94b3ff..882ca3dc 100644
--- a/src/shared/helpers.py
+++ b/src/shared/helpers.py
@@ -84,6 +84,10 @@ def getTransportProtocol():
         return "tcp"
 
 
+class globalObjects (object):
+    controlSocket = None
+    controlFlag   = True
+
 # This function is needed because configParser always needs a section name
 # the used config file consists of key-value pairs only
 # source: http://stackoverflow.com/questions/2819696/parsing-properties-file-in-python/2819788#2819788
-- 
GitLab