From d597b6888b20b74838cdca5b6f5c4fd40e3f8159 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 15 Jul 2015 15:11:47 +0200 Subject: [PATCH] Removed remains of cleaner --- ZeroMQTunnel/fileMover.py | 43 ++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index b0d6b877..b6b84531 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -12,6 +12,8 @@ import sys import json import traceback from multiprocessing import Process, freeze_support +import subprocess +import json DEFAULT_CHUNK_SIZE = 1048576 # @@ -24,20 +26,16 @@ class WorkerProcess(): logfileFullPath = None zmqContextForWorker = None zmqMessageChunkSize = None - zmqCleanerIp = None # responsable to delete files - zmqCleanerPort = None # responsable to delete files fileWaitTime_inMs = None fileMaxWaitTime_InMs= None - def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort, + def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0): self.id = id self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort self.logfileFullPath = logfileFullPath self.zmqMessageChunkSize = chunkSize - self.zmqCleanerIp = zmqCleanerIp - self.zmqCleanerPort = zmqCleanerPort self.fileWaitTime_inMs = fileWaitTimeInMs self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs @@ -98,12 +96,6 @@ class WorkerProcess(): jobCount = 0 - #init Cleaner message-pipe - cleanerSocket = zmqContextForWorker.socket(zmq.PUSH) - connectionStringCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort) - cleanerSocket.connect(connectionStringCleanerSocket) - - while processingJobs: #sending a "ready"-signal to the router. #the reply will contain the actual job/task. @@ -162,21 +154,6 @@ class WorkerProcess(): - #send remove-request to message pipe - try: - #sending to pipe - logging.debug("send file-event to cleaner-pipe...") - cleanerSocket.send(workload) - logging.debug("send file-event to cleaner-pipe...success.") - - #TODO: remember workload. append to list? - # can be used to verify files which have been processed twice or more - except Exception, e: - errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(filename) - logging.error(errorMessage) - logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) - - def getFileWaitTimeInMs(self): waitTime = 2000.0 return waitTime @@ -398,6 +375,7 @@ class FileMover(): dataStreamPort = "6061" # port number of dataStream-socket to push new files to fileWaitTimeInMs = None fileMaxWaitTimeInMs = None + pipe_name = "/tmp/zeromqllpipe_resp" currentZmqDataStreamSocketListIndex = None # Index-Number of a socket used to send datafiles to logfileFullPath = None @@ -535,6 +513,19 @@ class FileMover(): b'', fileEventMessage, ]) + + # inform lsyncd wrapper that the file can be moved + +# if not os.path.exists(self.pipe_name): +# os.mkfifo(self.pipe_name) + + +# messageToPipe = json.loads ( fileEventMessage ) +# messageToPipe = messageToPipe["sourcePath"] + os.sep + messageToPipe["filename"] +# my_cmd = 'echo "' + messageToPipe + '" > ' + self.pipe_name +# p = subprocess.Popen ( my_cmd, shell=True ) +# p.communicate() + logging.debug("passing job to workerThread...done.") -- GitLab