From 95503df12fc40741976f02e010ca7159425334e7 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Wed, 29 Jul 2015 14:11:04 +0200
Subject: [PATCH] Stopping thread in receiver correctly

---
 ZeroMQTunnel/fileMover.py      |   4 +-
 ZeroMQTunnel/receiver.py       | 107 +++++++++++++++++++++------------
 ZeroMQTunnel/watcher_lsyncd.py |  25 +++-----
 wrapper_script.py              |   4 +-
 4 files changed, 79 insertions(+), 61 deletions(-)

diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py
index 1f08670f..940aa506 100644
--- a/ZeroMQTunnel/fileMover.py
+++ b/ZeroMQTunnel/fileMover.py
@@ -494,8 +494,8 @@ class FileMover():
                                                                   self.dataStreamIp,
                                                                   self.dataStreamPort,
                                                                   self.chunkSize,
-                                                                  zmqCleanerIp,
-                                                                  zmqCleanerPort,
+                                                                  self.zmqCleanerIp,
+                                                                  self.zmqCleanerPort,
                                                                   fileWaitTimeInMs,
                                                                   fileMaxWaitTimeInMs))
             workerThreadList.append(newWorkerThread)
diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py
index 28ab21d6..e9ed9e87 100644
--- a/ZeroMQTunnel/receiver.py
+++ b/ZeroMQTunnel/receiver.py
@@ -24,12 +24,15 @@ class FileReceiver:
     zmqDataStreamPort        = None
     zmqLiveViewerIp          = None
     zmqLiveViewerPort        = None
+    zmqLiveViewerExchangeIp   = "127.0.0.1"
+    zmqLiveViewerExchangePort = "6072"
     ringBuffer               = []
     maxRingBufferSize        = 200
     timeToWaitForRingBuffer  = 2
 
     # sockets
     zmqSocket                = None
+    exchangeSocket           = None
 
 
     def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None):
@@ -50,6 +53,12 @@ class FileReceiver:
         self.zmqSocket.bind(connectionStrZmqSocket)
         logging.debug("zmqSocket started for '" + connectionStrZmqSocket + "'")
 
+        self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
+        connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
+        self.exchangeSocket.bind(connectionStrExchangeSocket)
+        logging.debug("exchangeSocket started (bind)for '" + connectionStrExchangeSocket + "'")
+
+
         # thread to communicate with live viewer
         self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer)
         self.liveViewerThread.start()
@@ -100,48 +109,76 @@ class FileReceiver:
     def sendFileToLiveViewer(self):
 
         # create socket for live viewer
-        zmqLiveViewerSocket         = self.zmqContext.socket(zmq.REP)
+        zmqLiveViewerSocket              = self.zmqContext.socket(zmq.REP)
         connectionStrZmqLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort
         zmqLiveViewerSocket.bind(connectionStrZmqLiveViewerSocket)
         logging.debug("zmqLiveViewerSocket started for '" + connectionStrZmqLiveViewerSocket + "'")
 
+        exchangeSocket              = self.zmqContext.socket(zmq.PAIR)
+        connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
+        exchangeSocket.connect(connectionStrExchangeSocket)
+        logging.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
+
+        poller = zmq.Poller()
+        poller.register(zmqLiveViewerSocket, zmq.POLLIN)
+        poller.register(exchangeSocket, zmq.POLLIN)
+
+        should_continue = True
+        while should_continue:
+            socks = dict(poller.poll())
+            if exchangeSocket in socks and socks[exchangeSocket] == zmq.POLLIN:
+                message = exchangeSocket.recv()
+                logging.debug("Recieved control command: %s" % message )
+                if message == "Exit":
+                    logging.debug("Recieved exit command, liveViewer thread will stop recieving messages")
+                    should_continue = False
+                    break
 
-        # if there is a request of the live viewer:
-        while True:
-            #  Wait for next request from client
-            try:
+            if zmqLiveViewerSocket in socks and socks[zmqLiveViewerSocket] == zmq.POLLIN:
                 message = zmqLiveViewerSocket.recv()
-            except zmq.error.ContextTerminated:
-                break
-            print "Received request: ", message
-            time.sleep (1)
-            # send first element in ring buffer to live viewer (the path of this file is the second entry)
-            if self.ringBuffer:
-                try:
-                    zmqLiveViewerSocket.send(self.ringBuffer[0][1])
-                    print self.ringBuffer[0][1]
-                except zmq.error.ContextTerminated:
-                    break
-            else:
+                logging.debug("Call for next file... ")
+                # send first element in ring buffer to live viewer (the path of this file is the second entry)
+                if self.ringBuffer:
+                    answer = self.ringBuffer[0][1]
+                else:
+                    answer = "None"
+
+                print answer
                 try:
-                    zmqLiveViewerSocket.send("None")
-                    print self.ringBuffer
+                    zmqLiveViewerSocket.send(answer)
                 except zmq.error.ContextTerminated:
                     break
 
+        logging.debug("LiveViewerThread: closing socket")
         zmqLiveViewerSocket.close()
+        exchangeSocket.close()
 
 
     def combineMessage(self, zmqSocket):
+        receivingMessages = True
         #save all chunks to file
-        while True:
+        while receivingMessages:
             multipartMessage = zmqSocket.recv_multipart()
+
+            #extract multipart message
+            try:
+                #TODO is string conversion needed here?
+                payloadMetadata = str(multipartMessage[0])
+            except:
+                logging.error("an empty config was transferred for multipartMessage")
+
+            #TODO validate multipartMessage (like correct dict-values for metadata)
+            logging.debug("multipartMessage.metadata = " + str(payloadMetadata))
+
+            #extraction metadata from multipart-message
+            payloadMetadataDict = json.loads(payloadMetadata)
+
             #append to file
             try:
                 logging.debug("append to file based on multipart-message...")
                 #TODO: save message to file using a thread (avoids blocking)
                 #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
-                self.appendChunksToFileFromMultipartMessage(multipartMessage)
+                self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
                 logging.debug("append to file based on multipart-message...success.")
             except Exception, e:
                 errorMessage = "Unable to append multipart-content to file."
@@ -152,12 +189,10 @@ class FileReceiver:
                 errorMessage = "Unable to append multipart-content to file. Unknown Error."
                 logging.error(errorMessage)
                 logging.debug("append to file based on multipart-message...failed.")
-            if len(multipartMessage[1]) == 0:
+            if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] :
                 #indicated end of file. closing file and leave loop
                 logging.debug("last file-chunk received. stop appending.")
                 break
-        payloadMetadata     = str(multipartMessage[0])
-        payloadMetadataDict = json.loads(payloadMetadata)
         filename            = self.generateTargetFilepath(payloadMetadataDict)
         fileModTime         = payloadMetadataDict["fileModificationTime"]
         logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
@@ -166,6 +201,7 @@ class FileReceiver:
         # logging.debug("message-length: " + str(len(multipartMessage)))
 
         # add to ring buffer
+        logging.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime))
         self.addFileToRingBuffer(str(filename), fileModTime)
 
 
@@ -232,13 +268,8 @@ class FileReceiver:
         return targetPath
 
 
-    def appendChunksToFileFromMultipartMessage(self, multipartMessage):
+    def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage):
 
-        #extract multipart message
-        try:
-            configDictJson = multipartMessage[0]
-        except:
-            logging.error("an empty config was transferred for multipartMessage")
         try:
             chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
             payload = multipartMessage[1:]
@@ -246,12 +277,6 @@ class FileReceiver:
             logging.warning("an empty file was received within the multipart-message")
             payload = None
 
-        #TODO validate multipartMessage (like correct dict-values for metadata)
-        logging.debug("multipartMessage.metadata = " + str(configDictJson))
-
-        #extraction metadata from multipart-message
-        configDict         = json.loads(configDictJson)
-
 
         #generate target filepath
         targetFilepath = self.generateTargetFilepath(configDict)
@@ -294,7 +319,6 @@ class FileReceiver:
             raise Exception(errorMessage)
 
 
-
     def stopReceiving(self, zmqSocket, zmqContext):
 
         logging.debug("stopReceiving...")
@@ -305,6 +329,12 @@ class FileReceiver:
             logging.error("closing zmqSocket...failed.")
             logging.error(sys.exc_info())
 
+        logging.debug("sending exit signal to thread...")
+        self.exchangeSocket.send("Exit")
+        time.sleep(0.01)
+        self.exchangeSocket.close()
+        logging.debug("sending exit signal to thread...done")
+
         try:
             zmqContext.destroy()
             logging.debug("closing zmqContext...done.")
@@ -313,9 +343,6 @@ class FileReceiver:
             logging.error(sys.exc_info())
 
 
-
-
-
 def argumentParsing():
 
     parser = argparse.ArgumentParser()
diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py
index de7efeb0..d54f3729 100644
--- a/ZeroMQTunnel/watcher_lsyncd.py
+++ b/ZeroMQTunnel/watcher_lsyncd.py
@@ -12,7 +12,6 @@ import helperScript
 
 
 
-# class MyHandler(PatternMatchingEventHandler):
 class DirectoryWatcherHandler():
     patterns            = ["*"]
     zmqContext          = None
@@ -25,27 +24,19 @@ class DirectoryWatcherHandler():
     def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort):
         logging.debug("DirectoryWatcherHandler: __init__()")
         # logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext)))
-        logging.info("registering zmq global context")
-        self.globalZmqContext    = zmqContext
+        logging.info("registering zmq context")
+        self.zmqContext          = zmqContext
         self.watchFolder         = os.path.normpath(watchFolder)
         self.fileEventServerIp   = fileEventServerIp
         self.fileEventServerPort = fileEventServerPort
 
-        #create zmq sockets
-        self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort)
-
-
-    def createPushSocket(self, context, fileEventServerPort):
-
-        assert isinstance(context, zmq.sugar.context.Context)
+        assert isinstance(self.zmqContext, zmq.sugar.context.Context)
 
-        socket = context.socket(zmq.PUSH)
-
-        zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort)
+        #create zmq sockets
+        self.messageSocket = zmqContext.socket(zmq.PUSH)
+        zmqSocketStr = "tcp://" + self.fileEventServerIp + ":" + str(self.fileEventServerPort)
+        self.messageSocket.connect(zmqSocketStr)
         logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
-        socket.connect(zmqSocketStr)
-
-        return socket
 
 
     def passFileToZeromq(self, filepath, targetPath):
@@ -370,8 +361,8 @@ if __name__ == '__main__':
 
     workers = zmqContext.socket(zmq.PULL)
     zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort
-    logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
     workers.bind(zmqSocketStr)
+    logging.debug("Bind to lcyncd ZMQ socket: " + str(zmqSocketStr))
 
     try:
         while True:
diff --git a/wrapper_script.py b/wrapper_script.py
index 86d546f1..32d99388 100644
--- a/wrapper_script.py
+++ b/wrapper_script.py
@@ -64,13 +64,13 @@ zmqPort = "6080"
 
 if supported_file:
 
-       # set up ZeroMQ
+    # set up ZeroMQ
     zmqContext = zmq.Context()
 
     socket = zmqContext.socket(zmq.PUSH)
     zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort
-    logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
     socket.connect(zmqSocketStr)
+    logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
 
     #send reply back to server
     workload = { "filepath": source, "targetPath": target }
-- 
GitLab