From 888b92e39bf99c0c5dfee1b829d11f8352b8aed4 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Tue, 28 Jul 2015 12:44:02 +0200
Subject: [PATCH] Close sockets properly in fileMover

---
 ZeroMQTunnel/fileMover.py | 313 +++++++++++++++++---------------------
 1 file changed, 141 insertions(+), 172 deletions(-)

diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py
index 842aef9c..04fc44eb 100644
--- a/ZeroMQTunnel/fileMover.py
+++ b/ZeroMQTunnel/fileMover.py
@@ -31,12 +31,15 @@ class WorkerProcess():
     zmqCleanerIp         = None              # responsable to delete files
     zmqCleanerPort       = None              # responsable to delete files
     zmqDataStreamSocket  = None
+    routerSocket         = None
+    cleanerSocket        = None
 
     # to get the logging only handling this class
     log                   = None
 
     def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, zmqCleanerIp, zmqCleanerPort,
-                 fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0):
+                 fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0,
+                 context = None):
         self.id                   = id
         self.dataStreamIp         = dataStreamIp
         self.dataStreamPort       = dataStreamPort
@@ -46,20 +49,50 @@ class WorkerProcess():
         self.fileWaitTime_inMs    = fileWaitTimeInMs
         self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs
 
+        # use the caller-supplied zmq context, or create a new one for this worker
+        self.zmqContextForWorker = context or zmq.Context()
+
         self.log = self.getLogger()
 
+
+        dataStreamIp   = self.dataStreamIp
+        dataStreamPort = self.dataStreamPort
+
+        self.log.debug("new workerThread started. id=" + str(self.id))
+
+        # initialize sockets
+        self.zmqDataStreamSocket      = self.zmqContextForWorker.socket(zmq.PUSH)
+        connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=self.dataStreamPort)
+        self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
+        self.log.debug("zmqDataStreamSocket started for '" + connectionStrDataStreamSocket + "'")
+
+        self.routerSocket             = self.zmqContextForWorker.socket(zmq.REQ)
+        self.routerSocket.identity    = u"worker-{ID}".format(ID=self.id).encode("ascii")
+        connectionStrRouterSocket     = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000")
+        self.routerSocket.connect(connectionStrRouterSocket)
+        self.log.debug("routerSocket started for '" + connectionStrRouterSocket + "'")
+
+        #init Cleaner message-pipe
+        self.cleanerSocket            = self.zmqContextForWorker.socket(zmq.PUSH)
+        connectionStrCleanerSocket    = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
+        self.cleanerSocket.connect(connectionStrCleanerSocket)
+
         try:
             self.process()
         except KeyboardInterrupt:
             # trace = traceback.format_exc()
             self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
-#            self.zmqDataStreamSocket.close()
-#            self.zmqContextForWorker.destroy()
         else:
             trace = traceback.format_exc()
             self.log.error("Stopping workerProcess due to unknown error condition.")
             self.log.debug("Error was: " + str(trace))
 
+        self.log.info("Closing sockets")
+        self.zmqDataStreamSocket.close()
+        self.routerSocket.close()
+        self.cleanerSocket.close()
+        self.zmqContextForWorker.destroy()
+
 
     def process(self):
         """
@@ -82,44 +115,21 @@ class WorkerProcess():
           a separate data-messagePipe. Afterwards the original file
           will be removed.
         """
-        id             = self.id
-        dataStreamIp   = self.dataStreamIp
-        dataStreamPort = self.dataStreamPort
-
-        self.log.debug("new workerThread started. id=" + str(id))
 
-        #initialize router
-        zmqContextForWorker = zmq.Context()
-        self.zmqContextForWorker = zmqContextForWorker
-
-        self.zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH)
-        connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=dataStreamPort)
-        self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
-
-        routerSocket = zmqContextForWorker.socket(zmq.REQ)
-        routerSocket.identity = u"worker-{ID}".format(ID=id).encode("ascii")
-        connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000")
-        routerSocket.connect(connectionStrRouterSocket)
         processingJobs = True
         jobCount = 0
 
-        #init Cleaner message-pipe
-        cleanerSocket = zmqContextForWorker.socket(zmq.PUSH)
-        connectionStringCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
-        cleanerSocket.connect(connectionStringCleanerSocket)
-
-
         while processingJobs:
             #sending a "ready"-signal to the router.
             #the reply will contain the actual job/task.
-            self.log.debug("worker-"+str(id)+": sending ready signal")
+            self.log.debug("worker-"+str(self.id)+": sending ready signal")
 
-            routerSocket.send(b"READY")
+            self.routerSocket.send(b"READY")
 
             # Get workload from router, until finished
-            self.log.debug("worker-"+str(id)+": waiting for new job")
-            workload = routerSocket.recv()
-            self.log.debug("worker-"+str(id)+": new job received")
+            self.log.debug("worker-"+str(self.id)+": waiting for new job")
+            workload = self.routerSocket.recv()
+            self.log.debug("worker-"+str(self.id)+": new job received")
             finished = workload == b"END"
             if finished:
                 processingJobs = False
@@ -170,7 +180,7 @@ class WorkerProcess():
             try:
                 #sending to pipe
                 self.log.debug("send file-event to cleaner-pipe...")
-                cleanerSocket.send(workload)
+                self.cleanerSocket.send(workload)
                 self.log.debug("send file-event to cleaner-pipe...success.")
 
                 #TODO: remember workload. append to list?
@@ -286,6 +296,7 @@ class WorkerProcess():
 
                     #as chunk is empty decrease chunck-counter
                     chunkNumber -= 1
+                    break
 
                 #assemble metadata for zmq-message
                 chunkPayloadMetadata = payloadMetadata.copy()
@@ -310,7 +321,6 @@ class WorkerProcess():
             raise Exception(e)
 
 
-
     def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize):
         try:
             # chunksize = 16777216 #16MB
@@ -381,12 +391,7 @@ class WorkerProcess():
 #  --------------------------  class: FileMover  --------------------------------------
 #
 class FileMover():
-    patterns              = ["*"]
-    fileList_newFiles     = list()
-    fileCount_newFiles    = 0
     zmqContext            = None
-    messageSocket         = None         # to receiver fileMove-jobs as json-encoded dictionary
-    dataSocket            = None         # to send fileObject as multipart message
     bindingIpForSocket    = None
     zqmFileEventServerIp  = "127.0.0.1"  # serverIp for incoming messages
     tcpPort_messageStream = "6060"
@@ -396,20 +401,25 @@ class FileMover():
     zmqCleanerPort        = "6062"       # zmq pull endpoint, responsable to delete files
     fileWaitTimeInMs      = None
     fileMaxWaitTimeInMs   = None
-    pipe_name             = "/tmp/zeromqllpipe_resp"
-
-    currentZmqDataStreamSocketListIndex = None # Index-Number of a socket used to send datafiles to
+    parallelDataStreams   = None
     chunkSize             = None
 
+    # sockets
+    messageSocket         = None         # to receiver fileMove-jobs as json-encoded dictionary
+    routerSocket          = None
+
     # to get the logging only handling this class
     log                   = None
 
 
-    def __init__(self, context, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
+    def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
                  chunkSize, zmqCleanerIp, zmqCleanerPort,
-                 fileWaitTimeInMs, fileMaxWaitTimeInMs):
+                 fileWaitTimeInMs, fileMaxWaitTimeInMs,
+                 context = None):
+
+        assert isinstance(context, zmq.sugar.context.Context)
 
-        self.zmqContext            = context
+        self.zmqContext            = context or zmq.Context()
         self.bindingIpForSocket    = bindingIpForSocket
         self.tcpPort_messageStream = bindingPortForSocket
         self.dataStreamIp          = dataStreamIp
@@ -422,13 +432,22 @@ class FileMover():
         self.fileMaxWaitTimeInMs   = fileMaxWaitTimeInMs
 
 
-        #create zmq sockets. one for incoming file events, one for passing fileObjects to
-        self.messageSocket = self.getZmqSocket_Pull(self.zmqContext)
-        self.dataSocket    = self.getZmqSocket_Push(self.zmqContext)
-
         self.log = self.getLogger()
         self.log.debug("Init")
 
+        # create and bind the zmq PULL socket for incoming file events
+        self.messageSocket         = self.zmqContext.socket(zmq.PULL)
+        connectionStrMessageSocket = "tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream
+        self.messageSocket.bind(connectionStrMessageSocket)
+        self.log.debug("messageSocket started for '" + connectionStrMessageSocket + "'")
+
+        #setting up router for load-balancing worker-threads.
+        #each worker-thread will handle a file event
+        self.routerSocket         = self.zmqContext.socket(zmq.ROUTER)
+        connectionStrRouterSocket = "tcp://127.0.0.1:50000"
+        self.routerSocket.bind(connectionStrRouterSocket)
+        self.log.debug("routerSocket started for '" + connectionStrRouterSocket + "'")
+
 
     def process(self):
         try:
@@ -436,16 +455,13 @@ class FileMover():
         except zmq.error.ZMQError as e:
             self.log.error("ZMQError: "+ str(e))
             self.log.debug("Shutting down workerProcess.")
-        except KeyboardInterrupt:
-            self.log.debug("KeyboardInterrupt detected. Shutting down fileMover.")
-            self.log.info("Shutting down fileMover as KeyboardInterrupt was detected.")
-            self.zmqContext.destroy()
         except:
             trace = traceback.format_exc()
             self.log.info("Stopping fileMover due to unknown error condition.")
             self.log.debug("Error was: " + str(trace))
 
 
+
     def getLogger(self):
         logger = logging.getLogger("fileMover")
         return logger
@@ -460,9 +476,6 @@ class FileMover():
 
 
     def startReceiving(self):
-        #create socket
-        zmqContext = self.zmqContext
-        zmqSocketForNewFileEvents  = self.createPullSocket()
         self.log.debug("new message-socket crated for: new file events.")
         parallelDataStreams = int(self.parallelDataStreams)
 
@@ -470,21 +483,13 @@ class FileMover():
 
         incomingMessageCounter = 0
 
-
-        #setting up router for load-balancing worker-threads.
-        #each worker-thread will handle a file event
-        routerSocket = self.zmqContext.socket(zmq.ROUTER)
-        routerSocket.bind("tcp://127.0.0.1:50000")
-        self.log.debug("routerSocket started for 'tcp://127.0.0.1:50000'")
-
-
         #start worker-threads. each will have its own PushSocket.
         workerThreadList      = list()
         numberOfWorkerThreads = parallelDataStreams
         fileWaitTimeInMs      = self.getFileWaitTimeInMs()
         fileMaxWaitTimeInMs   = self.getFileMaxWaitTimeInMs()
         for threadNumber in range(numberOfWorkerThreads):
-            self.log.debug("instantiate new workerProcess (nr " + str(threadNumber))
+            self.log.debug("instantiate new workerProcess (nr " + str(threadNumber) + " )")
             newWorkerThread = Process(target=WorkerProcess, args=(threadNumber,
                                                                   self.dataStreamIp,
                                                                   self.dataStreamPort,
@@ -501,113 +506,51 @@ class FileMover():
         #run loop, and wait for incoming messages
         continueReceiving = True
         self.log.debug("waiting for new fileEvent-messages")
-        while continueReceiving:
-            try:
-                incomingMessage = zmqSocketForNewFileEvents.recv()
-                self.log.debug("new fileEvent-message received.")
-                self.log.debug("message content: " + str(incomingMessage))
-                incomingMessageCounter += 1
-
-                self.log.debug("processFileEvent...")
-                self.processFileEvent(incomingMessage, routerSocket)  #TODO refactor as separate process to emphasize unblocking
-                self.log.debug("processFileEvent...done")
-            except Exception, e:
-                self.log.error("Failed to receive new fileEvent-message.")
-                self.log.error(sys.exc_info())
-
-                #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses?
-                # continueReceiving = False
-
-
-        print "shutting down fileEvent-receiver..."
         try:
-            self.log.debug("shutting down zeromq...")
-            self.stopReceiving(zmqSocketForNewFileEvents, zmqContext)
-            self.log.debug("shutting down zeromq...done.")
-        except:
-            self.log.error(sys.exc_info())
-            self.log.error("shutting down zeromq...failed.")
+            while continueReceiving:
+                try:
+                    incomingMessage = self.messageSocket.recv()
+                    self.log.debug("new fileEvent-message received.")
+                    self.log.debug("message content: " + str(incomingMessage))
+                    incomingMessageCounter += 1
+
+                    self.log.debug("processFileEvent..." + str(incomingMessageCounter))
+                    self.processFileEvent(incomingMessage)  #TODO refactor as separate process to emphasize unblocking
+                    self.log.debug("processFileEvent...done")
+                except Exception, e:
+                    self.log.error("Failed to receive new fileEvent-message.")
+                    self.log.error(sys.exc_info())
+
+                    #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses?
+                    # continueReceiving = False
+        except KeyboardInterrupt:
+            self.log.info("Keyboard interuption detected. Stop receiving")
 
 
 
-    def routeFileEventToWorkerThread(self, fileEventMessage, routerSocket):
+    def processFileEvent(self, fileEventMessage):
         # LRU worker is next waiting in the queue
         self.log.debug("waiting for available workerThread.")
 
         # address == "worker-0"
         # empty   == ""
         # ready   == "READY"
-        address, empty, ready = routerSocket.recv_multipart()
+        address, empty, ready = self.routerSocket.recv_multipart()
         self.log.debug("available workerThread detected.")
 
         self.log.debug("passing job to workerThread...")
-        routerSocket.send_multipart([
+        self.routerSocket.send_multipart([
                                      address,
                                      b'',
                                      fileEventMessage,
                                     ])
 
-        # inform lsyncd wrapper that the file can be moved
-
-#        if not os.path.exists(self.pipe_name):
-#                os.mkfifo(self.pipe_name)
-
-
-#        messageToPipe = json.loads ( fileEventMessage )
-#        messageToPipe = messageToPipe["sourcePath"] + os.sep + messageToPipe["filename"]
-#        my_cmd = 'echo "' + messageToPipe + '"  > ' + self.pipe_name
-#        p = subprocess.Popen ( my_cmd, shell=True )
-#        p.communicate()
-
         self.log.debug("passing job to workerThread...done.")
 
 
-    def processFileEvent(self, fileEventMessage, routerSocket):
-        self.routeFileEventToWorkerThread(fileEventMessage, routerSocket)
-
-
-    def stopReceiving(self, zmqSocket, msgContext):
-        try:
-            self.log.debug("closing zmqSocket...")
-            zmqSocket.close()
-            self.log.debug("closing zmqSocket...done.")
-        except:
-            self.log.debug("closing zmqSocket...failed.")
-            self.log.error(sys.exc_info())
-
-        try:
-            self.log.debug("closing zmqContext...")
-            msgContext.destroy()
-            self.log.debug("closing zmqContext...done.")
-        except:
-            self.log.debug("closing zmqContext...failed.")
-            self.log.error(sys.exc_info())
-
-
-    def getZmqSocket_Pull(self, context):
-        pattern_pull = zmq.PULL
-        assert isinstance(context, zmq.sugar.context.Context)
-        socket = context.socket(pattern_pull)
-
-        return socket
-
-
-    def getZmqSocket_Push(self, context):
-        pattern = zmq.PUSH
-        assert isinstance(context, zmq.sugar.context.Context)
-        socket = context.socket(pattern)
-
-        return socket
-
-
-
-    def createPullSocket(self):
-        #get default message-socket
-        socket = self.messageSocket
-
-        self.log.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream)
-        socket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.tcpPort_messageStream)
-        return socket
+    def stop(self):
+        self.messageSocket.close()
+        self.routerSocket.close()
 
 
 
@@ -625,50 +568,53 @@ class Cleaner():
     bindingPortForSocket = None
     bindingIpForSocket   = None
     zmqContextForCleaner = None
+    zmqCleanerSocket     = None
+
     # to get the logging only handling this class
     log                  = None
 
-    def __init__(self, context, bindingIp="127.0.0.1", bindingPort="6062", verbose=False):
+    def __init__(self, bindingIp="127.0.0.1", bindingPort="6062", context = None, verbose=False):
         self.bindingPortForSocket = bindingPort
         self.bindingIpForSocket   = bindingIp
-        self.zmqContextForCleaner = context
+        self.zmqContextForCleaner = context or zmq.Context()
 
         self.log = self.getLogger()
         self.log.debug("Init")
 
+        #bind to local port
+        self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL)
+        self.zmqCleanerSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket)
+
         try:
             self.process()
         except zmq.error.ZMQError:
             self.log.error("ZMQError: "+ str(e))
-            self.log.debug("Shutting down workerProcess.")
-            self.zmqContextForCleaner.destroy()
+            self.log.debug("Shutting down cleaner.")
         except KeyboardInterrupt:
-            self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
-            self.zmqContextForCleaner.destroy()
+            self.log.info("KeyboardInterrupt detected. Shutting down cleaner.")
         except:
             trace = traceback.format_exc()
             self.log.error("Stopping cleanerProcess due to unknown error condition.")
             self.log.debug("Error was: " + str(trace))
 
+        self.zmqCleanerSocket.close()
+        self.zmqContextForCleaner.destroy()
+
 
     def getLogger(self):
         logger = logging.getLogger("cleaner")
         return logger
 
 
-
     def process(self):
         processingJobs = True
 
-        #bind to local port
-        zmqJobSocket = self.zmqContextForCleaner.socket(zmq.PULL)
-        zmqJobSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket)
-
         #processing messaging
         while processingJobs:
             #waiting for new jobs
+            self.log.debug("Waiting for new jobs")
             try:
-                workload = zmqJobSocket.recv()
+                workload = self.zmqCleanerSocket.recv()
             except Exception as e:
                 self.log.error("Error in receiving job: " + str(e))
 
@@ -720,7 +666,7 @@ class Cleaner():
                 self.log.debug("removing source file...success.")
 
             except Exception, e:
-                errorMessage = "Unable to remove source file."
+                errorMessage = "Unable to remove source file: " + str (sourcePath)
                 self.log.error(errorMessage)
                 trace = traceback.format_exc()
                 self.log.error("Error was: " + str(trace))
@@ -750,9 +696,15 @@ class Cleaner():
                     except OSError:
                         pass
                 # moving the file
-                shutil.move(source + os.sep + filename, target + os.sep + filename)
+                sourceFile = source + os.sep + filename
+                targetFile = target + os.sep + filename
+                self.log.debug("sourceFile: " + str(sourceFile))
+                self.log.debug("targetFile: " + str(targetFile))
+                shutil.move(sourceFile, targetFile)
                 fileWasMoved = True
                 self.log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
+            except IOError:
+                self.log.debug ("IOError: " + str(filename))
             except Exception, e:
                 trace = traceback.format_exc()
                 warningMessage = "Unable to move file {FILE}.".format(FILE=str(source) + str(filename))
@@ -764,7 +716,7 @@ class Cleaner():
         if not fileWasMoved:
             self.log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
             raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount),
-                                                                                                                            FILE=filepath))
+                                                                                                                            FILE=filename))
 
 
     def removeFile(self, filepath):
@@ -923,25 +875,42 @@ if __name__ == '__main__':
 
     #create zmq context
     # there should be only one context in one process
-    zmqContext = zmq.Context()
+    zmqContext = zmq.Context.instance()
     logging.info("registering zmq global context")
 
 
-    cleanerThread = Process(target=Cleaner, args=(zmqContext, zmqCleanerIp, zmqCleanerPort))
+    cleanerThread = Process(target=Cleaner, args=(zmqCleanerIp, zmqCleanerPort, zmqContext))
     logging.debug("cleaner thread started")
     cleanerThread.start()
 
     #start new fileMover
     # try:
-    fileMover = FileMover(zmqContext, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
+    fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
                           parallelDataStreams, chunkSize,
                           zmqCleanerIp, zmqCleanerPort,
-                          fileWaitTimeInMs, fileMaxWaitTimeInMs)
-    fileMover.process()
-    # except KeyboardInterrupt, ke:
-    #     print "keyboardInterrupt detected."
+                          fileWaitTimeInMs, fileMaxWaitTimeInMs,
+                          zmqContext)
+    try:
+        fileMover.process()
+    except KeyboardInterrupt:
+        logging.info("Keyboard interruption detected. Shutting down")
     # except Exception, e:
     #     print "unknown exception detected."
 
 
+    logging.debug("shutting down zeromq...")
+    try:
+        fileMover.stop()
+        logging.debug("shutting down zeromq...done.")
+    except:
+        logging.error(sys.exc_info())
+        logging.error("shutting down zeromq...failed.")
+
+    try:
+        logging.debug("closing zmqContext...")
+        zmqContext.destroy()
+        logging.debug("closing zmqContext...done.")
+    except:
+        logging.debug("closing zmqContext...failed.")
+        logging.error(sys.exc_info())
 
-- 
GitLab