From 282bbae418b200c1b4c524ae0a99413d7baa125b Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Fri, 24 Jul 2015 13:36:28 +0200
Subject: [PATCH] Cleaner via tcp socket

---
 ZeroMQTunnel/fileMover.py | 34 +++++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py
index ce1cd744..4e3933dd 100644
--- a/ZeroMQTunnel/fileMover.py
+++ b/ZeroMQTunnel/fileMover.py
@@ -29,16 +29,19 @@ class WorkerProcess():
     zmqMessageChunkSize  = None
     fileWaitTime_inMs    = None
     fileMaxWaitTime_InMs = None
-    cleaner_url          = "inproc://workers"
+    zmqCleanerIp        = None              # responsable to delete files
+    zmqCleanerPort      = None              # responsable to delete files
     zmqDataStreamSocket  = None
 
-    def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize,
+    def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort,
                  fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0):
         self.id                   = id
         self.dataStreamIp         = dataStreamIp
         self.dataStreamPort       = dataStreamPort
         self.logfileFullPath      = logfileFullPath
         self.zmqMessageChunkSize  = chunkSize
+        self.zmqCleanerIp         = zmqCleanerIp
+        self.zmqCleanerPort       = zmqCleanerPort
         self.fileWaitTime_inMs    = fileWaitTimeInMs
         self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs
 
@@ -49,8 +52,8 @@ class WorkerProcess():
         except KeyboardInterrupt:
             # trace = traceback.format_exc()
             logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
-            self.zmqDataStreamSocket.close()
-            self.zmqContextForWorker.destroy()
+#            self.zmqDataStreamSocket.close()
+#            self.zmqContextForWorker.destroy()
         else:
             trace = traceback.format_exc()
             logging.error("Stopping workerProcess due to unknown error condition.")
@@ -101,7 +104,7 @@ class WorkerProcess():
 
         #init Cleaner message-pipe
         cleanerSocket = zmqContextForWorker.socket(zmq.PUSH)
-        connectionStringCleanerSocket = self.cleaner_url
+        connectionStringCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
         cleanerSocket.connect(connectionStringCleanerSocket)
 
 
@@ -404,6 +407,8 @@ class FileMover():
     tcpPort_messageStream = "6060"
     dataStreamIp          = "127.0.0.1"  # ip of dataStream-socket to push new files to
     dataStreamPort        = "6061"       # port number of dataStream-socket to push new files to
+    zmqCleanerIp          = "127.0.0.1"  # zmq pull endpoint, responsable to delete files
+    zmqCleanerPort        = "6062"       # zmq pull endpoint, responsable to delete files
     fileWaitTimeInMs      = None
     fileMaxWaitTimeInMs   = None
     pipe_name             = "/tmp/zeromqllpipe_resp"
@@ -418,6 +423,7 @@ class FileMover():
             self.startReceiving()
         except zmq.error.ZMQError as e:
             logging.error("ZMQError: "+ str(e))
+            log.debug("Shutting down workerProcess.")
         except KeyboardInterrupt:
             logging.debug("KeyboardInterrupt detected. Shutting down fileMover.")
             logging.info("Shutting down fileMover as KeyboardInterrupt was detected.")
@@ -430,7 +436,7 @@ class FileMover():
 
 
     def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
-                 logfileFullPath, chunkSize,
+                 logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort,
                  fileWaitTimeInMs, fileMaxWaitTimeInMs):
         logging.info("registering zmq global context")
 
@@ -445,6 +451,8 @@ class FileMover():
         self.parallelDataStreams   = parallelDataStreams
         self.logfileFullPath       = logfileFullPath
         self.chunkSize             = chunkSize
+        self.zmqCleanerIp          = zmqCleanerIp
+        self.zmqCleanerPort        = zmqCleanerPort
         self.fileWaitTimeInMs      = fileWaitTimeInMs
         self.fileMaxWaitTimeInMs   = fileMaxWaitTimeInMs
 
@@ -493,6 +501,8 @@ class FileMover():
                                                                   self.dataStreamPort,
                                                                   logfileFullPath,
                                                                   self.chunkSize,
+                                                                  zmqCleanerIp,
+                                                                  zmqCleanerPort,
                                                                   fileWaitTimeInMs,
                                                                   fileMaxWaitTimeInMs))
             workerThreadList.append(newWorkerThread)
@@ -638,7 +648,8 @@ class Cleaner():
         try:
             self.process()
         except zmq.error.ZMQError:
-            log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
+            logging.error("ZMQError: "+ str(e))
+            log.debug("Shutting down workerProcess.")
             self.zmqContextForCleaner.destroy()
         except KeyboardInterrupt:
             log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
@@ -828,6 +839,8 @@ def argumentParsing():
     parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060")
     parser.add_argument("--dataStreamIp"  , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1")
     parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061")
+    parser.add_argument("--zmqCleanerIp"  , type=str, help="zmq-pull-socket which deletes given files", default="127.0.0.1")
+    parser.add_argument("--zmqCleanerPort", type=str, help="zmq-pull-socket which deletes given files", default="6063")
     parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1")
     parser.add_argument("--chunkSize",      type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE)
     parser.add_argument("--verbose"       ,           help="more verbose output", action="store_true")
@@ -909,6 +922,8 @@ if __name__ == '__main__':
     logfilePath          = str(arguments.logfilePath)
     logfileName          = str(arguments.logfileName)
     parallelDataStreams  = str(arguments.parallelDataStreams)
+    zmqCleanerIp         = str(arguments.zmqCleanerIp)
+    zmqCleanerPort       = str(arguments.zmqCleanerPort)
     chunkSize            = arguments.chunkSize
     verbose              = arguments.verbose
     logfileFullPath      = os.path.join(logfilePath, logfileName)
@@ -916,6 +931,10 @@ if __name__ == '__main__':
     fileMaxWaitTimeInMs  = float(arguments.fileMaxWaitTimeInMs)
 
 
+    cleanerThread = Process(target=Cleaner, args=(logfilePath, zmqCleanerIp, zmqCleanerPort))
+    logging.debug("cleaner thread started")
+    cleanerThread.start()
+
     #enable logging
     initLogging(logfileFullPath, verbose)
 
@@ -924,6 +943,7 @@ if __name__ == '__main__':
     # try:
     fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
                           parallelDataStreams, logfileFullPath, chunkSize,
+                          zmqCleanerIp, zmqCleanerPort,
                           fileWaitTimeInMs, fileMaxWaitTimeInMs)
     fileMover.process()
     # except KeyboardInterrupt, ke:
-- 
GitLab