From 3694a3d9533bcf266436677ee1a441ed1ecc35a1 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <>
Date: Fri, 24 Jul 2015 13:14:21 +0200
Subject: [PATCH] Movement of file is done by ZeroMQ

 ZeroMQTunnel/      | 265 +++++++++++++++++++++++++++++++--
 ZeroMQTunnel/       |  14 +-
 ZeroMQTunnel/ |  87 ++++++++---
 start_scripts/  |   2 +-              |  89 +++++++----
 5 files changed, 387 insertions(+), 70 deletions(-)

diff --git a/ZeroMQTunnel/ b/ZeroMQTunnel/
index 4943e9a7..ce1cd744 100644
--- a/ZeroMQTunnel/
+++ b/ZeroMQTunnel/
@@ -14,20 +14,23 @@ import traceback
 from multiprocessing import Process, freeze_support
 import subprocess
 import json
+import shutil
 #  --------------------------  class: WorkerProcess  --------------------------------------
 class WorkerProcess():
-    id                  = None
-    dataStreamIp        = None
-    dataStreamPort      = None
-    logfileFullPath     = None
-    zmqContextForWorker = None
-    zmqMessageChunkSize = None
-    fileWaitTime_inMs   = None
-    fileMaxWaitTime_InMs= None
+    id                   = None
+    dataStreamIp         = None
+    dataStreamPort       = None
+    logfileFullPath      = None
+    zmqContextForWorker  = None
+    zmqMessageChunkSize  = None
+    fileWaitTime_inMs    = None
+    fileMaxWaitTime_InMs = None
+    cleaner_url          = "inproc://workers"
+    zmqDataStreamSocket  = None
     def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize,
                  fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0):
@@ -46,6 +49,7 @@ class WorkerProcess():
         except KeyboardInterrupt:
             # trace = traceback.format_exc()
             logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
+            self.zmqDataStreamSocket.close()
             trace = traceback.format_exc()
@@ -84,9 +88,9 @@ class WorkerProcess():
         zmqContextForWorker = zmq.Context()
         self.zmqContextForWorker = zmqContextForWorker
-        zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH)
+        self.zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH)
         connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=dataStreamPort)
-        zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
+        self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
         routerSocket = zmqContextForWorker.socket(zmq.REQ)
         routerSocket.identity = u"worker-{ID}".format(ID=id).encode("ascii")
@@ -95,6 +99,11 @@ class WorkerProcess():
         processingJobs = True
         jobCount = 0
+        #init Cleaner message-pipe
+        cleanerSocket = zmqContextForWorker.socket(zmq.PUSH)
+        connectionStringCleanerSocket = self.cleaner_url
+        cleanerSocket.connect(connectionStringCleanerSocket)
         while processingJobs:
             #sending a "ready"-signal to the router.
@@ -142,7 +151,7 @@ class WorkerProcess():
             #passing file to data-messagPipe
                 logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...")
-                self.passFileToDataStream(zmqDataStreamSocket, filename, sourcePath, relativeParent)
+                self.passFileToDataStream(filename, sourcePath, relativeParent)
                 logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.")
             except Exception, e:
                 errorMessage = "Unable to pass new file to data-messagePipe."
@@ -153,6 +162,22 @@ class WorkerProcess():
+            #send remove-request to message pipe
+            try:
+                #sending to pipe
+                logging.debug("send file-event to cleaner-pipe...")
+                cleanerSocket.send(workload)
+                logging.debug("send file-event to cleaner-pipe...success.")
+                #TODO: remember workload. append to list?
+                # can be used to verify files which have been processed twice or more
+            except Exception, e:
+                errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(filename)
+                logging.error(errorMessage)
+                logging.debug("fileEventMessageDict=" + str(fileEventMessageDict))
     def getFileWaitTimeInMs(self):
         waitTime = 2000.0
@@ -164,7 +189,7 @@ class WorkerProcess():
         return maxWaitTime
-    def passFileToDataStream(self, zmqDataStreamSocket, filename, sourcePath, relativeParent):
+    def passFileToDataStream(self, filename, sourcePath, relativeParent):
         """filesizeRequested == filesize submitted by file-event. In theory it can differ to real file size"""
         # filename = "img.tiff"
@@ -264,12 +289,12 @@ class WorkerProcess():
                 #send to zmq pipe
-                zmqDataStreamSocket.send_multipart(chunkPayload)
+                self.zmqDataStreamSocket.send_multipart(chunkPayload)
             #close file
-            # zmqDataStreamSocket.send_multipart(multipartMessage)
+            # self.zmqDataStreamSocket.send_multipart(multipartMessage)
             logging.debug("Passing multipart-message...done.")
         except Exception, e:
             logging.error("Unable to send multipart-message")
@@ -391,13 +416,16 @@ class FileMover():
     def process(self):
+        except zmq.error.ZMQError as e:
+            logging.error("ZMQError: "+ str(e))
         except KeyboardInterrupt:
             logging.debug("KeyboardInterrupt detected. Shutting down fileMover.")
   "Shutting down fileMover as KeyboardInterrupt was detected.")
-        else:
-            logging.error("Unknown Error. Quitting.")
+        except:
+            trace = traceback.format_exc()
   "Stopping fileMover due to unknown error condition.")
+            logging.debug("Error was: " + str(trace))
@@ -586,6 +614,211 @@ class FileMover():
+class Cleaner():
+    """
+    * received cleaning jobs via zeromq,
+      such as removing a file
+    * Does regular checks on the watched directory,
+    such as
+      - deleting files which have been successfully send
+        to target but still remain in the watched directory
+      - poll the watched directory and reissue new files
+        to fileMover which have not been detected yet
+    """
+    logfileFullPath      = None
+    bindingPortForSocket = None
+    bindingIpForSocket   = None
+    zmqContextForCleaner = None
+    def __init__(self, logfilePath, context, bindingIp="", bindingPort="6062", verbose=False):
+        self.bindingPortForSocket = bindingPort
+        self.bindingIpForSocket   = bindingIp
+        self.initLogging(logfilePath, verbose)
+        log = self.getLogger()
+        try:
+            self.process()
+        except zmq.error.ZMQError:
+            log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
+            self.zmqContextForCleaner.destroy()
+        except KeyboardInterrupt:
+            log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
+            self.zmqContextForCleaner.destroy()
+        except:
+            trace = traceback.format_exc()
+            log.error("Stopping cleanerProcess due to unknown error condition.")
+            log.debug("Error was: " + str(trace))
+    def getLogger(self):
+        logger = logging.getLogger("cleaner")
+        return logger
+    def initLogging(self, logfilePath, verbose):
+        #@see
+        logfilePathFull = os.path.join(logfilePath, "cleaner.log")
+        logger = logging.getLogger("cleaner")
+        #more detailed logging if verbose-option has been set
+        loggingLevel = logging.INFO
+        if verbose:
+            loggingLevel = logging.DEBUG
+        #log everything to file
+        fileHandler = logging.FileHandler(filename=logfilePathFull,
+                                          mode="a")
+        fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S',
+                                              fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s')
+        fileHandler.setFormatter(fileHandlerFormat)
+        fileHandler.setLevel(loggingLevel)
+        logger.addHandler(fileHandler)
+    def process(self):
+        processingJobs = True
+        log = self.getLogger()
+        #create zmq context
+        zmqContextForCleaner = zmq.Context()
+        self.zmqContextForCleaner = zmqContextForCleaner
+        #bind to local port
+        zmqJobSocket = zmqContextForCleaner.socket(zmq.PULL)
+        zmqJobSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket)
+        #processing messaging
+        while processingJobs:
+            #waiting for new jobs
+            try:
+                workload = zmqJobSocket.recv()
+            except Exception as e:
+                logging.error("Error in receiving job: " + str(e))
+            #transform to dictionary
+            try:
+                workloadDict = json.loads(str(workload))
+            except:
+                errorMessage = "invalid job received. skipping job"
+                log.error(errorMessage)
+                log.debug("workload=" + str(workload))
+                continue
+            #extract fileEvent metadata
+            try:
+                #TODO validate fileEventMessageDict dict
+                filename       = workloadDict["filename"]
+                sourcePath     = workloadDict["sourcePath"]
+                relativeParent = workloadDict["relativeParent"]
+                targetPath     = workloadDict["targetPath"]
+                # filesize       = workloadDict["filesize"]
+            except Exception, e:
+                errorMessage   = "Invalid fileEvent message received."
+                log.error(errorMessage)
+                log.debug("Error was: " + str(e))
+                log.debug("workloadDict=" + str(workloadDict))
+                #skip all further instructions and continue with next iteration
+                continue
+            #moving source file
+            sourceFilepath = None
+            try:
+                logging.debug("removing source file...")
+                #generate target filepath
+#                sourceFilepath = os.path.join(sourcePath,filename)
+                self.moveFile(sourceFilepath)
+                self.moveFile(sourcePath, filename, target)
+                # #show filesystem statistics
+                # try:
+                #     self.showFilesystemStatistics(sourcePath)
+                # except Exception, f:
+                #     logging.warning("Unable to get filesystem statistics")
+                #     logging.debug("Error was: " + str(f))
+                log.debug("file removed: " + str(sourceFilepath))
+                log.debug("removing source file...success.")
+            except Exception, e:
+                errorMessage = "Unable to remove source file."
+                log.error(errorMessage)
+                trace = traceback.format_exc()
+                log.error("Error was: " + str(trace))
+                log.debug("sourceFilepath="+str(sourceFilepath))
+                log.debug("removing source file...failed.")
+                #skip all further instructions and continue with next iteration
+                continue
+    def moveFile(self, source, filename, target):
+        log = self.getLogger()
+        maxAttemptsToRemoveFile     = 2
+        waitTimeBetweenAttemptsInMs = 500
+        iterationCount = 0
+"Moving file '" + str(filename) + "' from '" +  str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
+        fileWasMoved = False
+        while iterationCount <= maxAttemptsToRemoveFile and not fileWasMoved:
+            iterationCount+=1
+            try:
+                # check if the directory exists before moving the file
+                if not os.path.exists(target):
+                    try:
+                        os.makedirs(target)
+                    except OSError:
+                        pass
+                # moving the file
+                os.remove(filepath)
+                shutil.move(source + os.sep + filename, target + os.sep + filename)
+                fileWasMoved = True
+                log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
+            except Exception, e:
+                trace = traceback.format_exc()
+                warningMessage = "Unable to move file {FILE}.".format(FILE=str(source) + str(filename))
+                log.warning(warningMessage)
+                log.debug("trace=" + str(trace))
+                log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
+        if not fileWasMoved:
+            log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
+            raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount),
+                                                                                                                            FILE=filepath))
+    def removeFile(self, filepath):
+        log = self.getLogger()
+        maxAttemptsToRemoveFile     = 2
+        waitTimeBetweenAttemptsInMs = 500
+        iterationCount = 0
+"Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...")
+        fileWasRemoved = False
+        while iterationCount <= maxAttemptsToRemoveFile and not fileWasRemoved:
+            iterationCount+=1
+            try:
+                os.remove(filepath)
+                fileWasRemoved = True
+                log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.")
+            except Exception, e:
+                trace = traceback.format_exc()
+                warningMessage = "Unable to remove file {FILE}.".format(FILE=str(filepath))
+                log.warning(warningMessage)
+                log.debug("trace=" + str(trace))
+                log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
+        if not fileWasRemoved:
+            log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.")
+            raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount),
+                                                                                                                            FILE=filepath))
 def argumentParsing():
     parser = argparse.ArgumentParser()
diff --git a/ZeroMQTunnel/ b/ZeroMQTunnel/
index 40f6c314..af664ff4 100644
--- a/ZeroMQTunnel/
+++ b/ZeroMQTunnel/
@@ -169,11 +169,17 @@ class FileReceiver:
             time.sleep (1)
             # send first element in ring buffer to live viewer (the path of this file is the second entry)
             if self.ringBuffer:
-                zmqLiveViewerSocket.send(self.ringBuffer[0][1])
-                print self.ringBuffer[0][1]
+                try:
+                    zmqLiveViewerSocket.send(self.ringBuffer[0][1])
+                    print self.ringBuffer[0][1]
+                except zmq.error.ContextTerminated:
+                    break
-                zmqLiveViewerSocket.send("None")
-                print self.ringBuffer
+                try:
+                    zmqLiveViewerSocket.send("None")
+                    print self.ringBuffer
+                except zmq.error.ContextTerminated:
+                    break
     def combineMessage(self, zmqSocket):
diff --git a/ZeroMQTunnel/ b/ZeroMQTunnel/
index d0ab0075..a9eb644f 100644
--- a/ZeroMQTunnel/
+++ b/ZeroMQTunnel/
@@ -56,20 +56,20 @@ class DirectoryWatcherHandler():
         return socket
-    def passFileToZeromq(self, filepath):
+    def passFileToZeromq(self, filepath, targetPath):
         :param rootDirectorty: where to start traversing. including subdirs.
-            self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket)
+            self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket, targetPath)
         except Exception, e:
             logging.error("Unable to process file '" + str(filepath) + "'")
             logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e))
-    def sendFilesystemEventToMessagePipe(self, filepath, targetSocket):
+    def sendFilesystemEventToMessagePipe(self, filepath, targetSocket, targetPath):
         Taking the filename, creating a buffer and then
         sending the data as multipart message to the targetSocket.
@@ -118,7 +118,8 @@ class DirectoryWatcherHandler():
             logging.debug("Building message dict...")
             messageDict = { "filename"      : filename,
                             "sourcePath"    : parentDir,
-                            "relativeParent": relativeParent}
+                            "relativeParent": relativeParent,
+                            "targetPath"    : targetPath}
             messageDictJson = json.dumps(messageDict)  #sets correct escape characters
             logging.debug("Building message dict...done.")
@@ -328,6 +329,8 @@ if __name__ == '__main__':
     fileEventServerIp   = str(arguments.pushServerIp)
     fileEventServerPort = str(arguments.pushServerPort)
+    communicationWithLcyncdIp   = ""
+    communicationWithLcyncdPort = "6080"
     #abort if watch-folder does not exist
@@ -346,26 +349,70 @@ if __name__ == '__main__':
     #just get a list of all files in watchDir and pass to zeromq
     directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort)
-    pipe_path = "/tmp/zeromqllpipe"
-    if not os.path.exists(pipe_path):
-        os.mkfifo(pipe_path)
-    # Open the fifo. We need to open in non-blocking mode or it will stalls until
-    # someone opens it for writting
-    pipe_fd =, os.O_RDONLY | os.O_NONBLOCK)
+#    pipe_path = "/tmp/zeromqllpipe"
+#    if not os.path.exists(pipe_path):
+#        os.mkfifo(pipe_path)
+#    # Open the fifo. We need to open in non-blocking mode or it will stalls until
+#    # someone opens it for writting
+#    pipe_fd =, os.O_RDONLY | os.O_NONBLOCK)
+#    #wait for new files
+#    with os.fdopen(pipe_fd) as pipe:
+#        while True:
+#            message =
+#            if message:
+##                print("Received: '%s'" % message)
+#                pathnames = message.splitlines()
+#                for filepath in pathnames:
+#                    directoryWatcher.passFileToZeromq(filepath)
+#            time.sleep(0.1)
+    workers = zmqContext.socket(zmq.PULL)
+    zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort
+    logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
+    workers.bind(zmqSocketStr)
+    while True:
+        #waiting for new jobs
+        try:
+            workload = workers.recv()
+        except KeyboardInterrupt:
+            break
+        #transform to dictionary
+        try:
+            workloadDict = json.loads(str(workload))
+        except:
+            errorMessage = "invalid job received. skipping job"
+            logging.error(errorMessage)
+            logging.debug("workload=" + str(workload))
+            continue
+        #extract fileEvent metadata
+        try:
+            #TODO validate fileEventMessageDict dict
+            filepath   = workloadDict["filepath"]
+            targetPath = workloadDict["targetPath"]
+  "Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath))
+        except Exception, e:
+            errorMessage   = "Invalid fileEvent message received."
+            logging.error(errorMessage)
+            logging.debug("Error was: " + str(e))
+            logging.debug("workloadDict=" + str(workloadDict))
+            #skip all further instructions and continue with next iteration
+            continue
+        # send the file to the fileMover
+        directoryWatcher.passFileToZeromq(filepath, targetPath)
-    #wait for new files
-    with os.fdopen(pipe_fd) as pipe:
-        while True:
-            message =
-            if message:
-#                print("Received: '%s'" % message)
-                pathnames = message.splitlines()
-                for filepath in pathnames:
-                    directoryWatcher.passFileToZeromq(filepath)
-            time.sleep(0.1)
+    # We never get here but clean up anyhow
+    workers.close()
diff --git a/start_scripts/ b/start_scripts/
index d38602ea..b5d82aaa 100644
--- a/start_scripts/
+++ b/start_scripts/
@@ -1,3 +1,3 @@
-python ../ZeroMQTunnel/ --watchFolder /space/projects/Live_Viewer/source/ --logfilePath /space/projects/Live_Viewer/
+python ../ZeroMQTunnel/ --watchFolder /space/projects/Live_Viewer/source/ --logfilePath /space/projects/Live_Viewer/logs
diff --git a/ b/
index 5a97429d..3bcf363c 100644
--- a/
+++ b/
@@ -2,10 +2,43 @@ import argparse
 import subprocess
 import os
 import time
+import zmq
+import json
+import logging
+def initLogging(filenameFullPath, verbose):
+    #@see
+    #more detailed logging if verbose-option has been set
+    loggingLevel = logging.INFO
+    if verbose:
+        loggingLevel = logging.DEBUG
+    #log everything to file
+    logging.basicConfig(level=loggingLevel,
+                        format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
+                        datefmt='%Y-%m-%d_%H:%M:%S',
+                        filename=filenameFullPath,
+                        filemode="a")
+    #log info to stdout, display messages with different format than the file output
+    console = logging.StreamHandler()
+    console.setLevel(logging.WARNING)
+    formatter = logging.Formatter("%(asctime)s >  %(message)s")
+    console.setFormatter(formatter)
+    logging.getLogger("").addHandler(console)
 supportedFormats = [ "tif", "cbf", "hdf5"]
 watchFolder = "/space/projects/Live_Viewer/source/"
+logfile = "/space/projects/Live_Viewer/logs/wrapper_script.log"
+verbose = True
+#enable logging
+initLogging(logfile, verbose)
 parser = argparse.ArgumentParser()
@@ -14,6 +47,7 @@ parser.add_argument("--mv_target", help = "Move target")
 arguments = parser.parse_args()
 source = os.path.normpath ( arguments.mv_source )
 target = os.path.normpath ( arguments.mv_target )
@@ -25,41 +59,38 @@ relativebasepath         = os.path.relpath ( source, commonPrefix )
 ( name, postfix ) = filename.split( "." )
 supported_file = postfix in supportedFormats
-if supported_file:
-    my_cmd = 'echo "' +  source + '"  > /tmp/zeromqllpipe'
-    p = subprocess.Popen ( my_cmd, shell=True )
-    p.communicate()
+zmqIp   = ""
+zmqPort = "6080"
-    # wait to ZeroMQ to finish
-    time.sleep(10)
-# get responce from zeromq
-#pipe_path = "/tmp/zeromqllpipe_resp"
+if supported_file:
-# Open the fifo. We need to open in non-blocking mode or it will stalls until
-# someone opens it for writting
-#pipe_fd =, os.O_RDONLY | os.O_NONBLOCK)
+       # set up ZeroMQ
+    zmqContext = zmq.Context()
-#waitForAnswer = True
+    socket = zmqContext.socket(zmq.PUSH)
+    zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort
+    logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
+    socket.connect(zmqSocketStr)
-#wait for new files
-#with os.fdopen(pipe_fd) as pipe:
-#    while waitForAnswer:
-#        message =
-#        if message:
-#            pathnames = message.splitlines()
-#            for filepath in pathnames:
-#                if filepath == source:
-#                    waitForAnswer = False
-#                    break
-#        print "sleep"
-#        time.sleep(0.1)
+    #send reply back to server
+    workload = { "filepath": source, "targetPath": target }
+    workload_json = json.dumps(workload)
+    socket.send(workload_json)
+    logging.debug( "Send message to ZMQ: " + str(workload))
+#    my_cmd = 'echo "' +  source + '"  > /tmp/zeromqllpipe'
+#    p = subprocess.Popen ( my_cmd, shell=True )
+#    p.communicate()
+    # wait to ZeroMQ to finish
+#    time.sleep(10)
+#p = subprocess.Popen ( [ 'mv', source, target ],
+#                stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
+#                universal_newlines = False )
+#out, err = p.communicate()
+    # We never get here but clean up anyhow
-p = subprocess.Popen ( [ 'mv', source, target ],
-                stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
-                universal_newlines = False )
-out, err = p.communicate()
+    socket.close()
+    zmqContext.destroy()