From fa000fdb3fd40dc024230178ce453263145cbb32 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Wed, 15 Jul 2015 09:01:19 +0200
Subject: [PATCH] Removed cleaner

---
 ZeroMQTunnel/fileMover.py | 196 +-------------------------------------
 1 file changed, 3 insertions(+), 193 deletions(-)

diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py
index 87e0dd5a..b0d6b877 100644
--- a/ZeroMQTunnel/fileMover.py
+++ b/ZeroMQTunnel/fileMover.py
@@ -1,6 +1,6 @@
 from __builtin__ import open, type

-__author__ = 'Marco Strutz <marco.strutz@desy.de>'
+__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>'


 import time
@@ -396,8 +396,6 @@ class FileMover():
     tcpPort_messageStream = "6060"
     dataStreamIp          = "127.0.0.1"    # ip of dataStream-socket to push new files to
     dataStreamPort        = "6061"         # port number of dataStream-socket to push new files to
-    zmqCleanerIp          = "127.0.0.1"    # zmq pull endpoint, responsable to delete files
-    zmqCleanerPort        = "6062"         # zmq pull endpoint, responsable to delete files
     fileWaitTimeInMs      = None
     fileMaxWaitTimeInMs   = None

@@ -420,7 +418,7 @@ class FileMover():

     def __init__(self, bindingIpForSocket, bindingPortForSocket,
                  dataStreamIp, dataStreamPort, parallelDataStreams,
-                 logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort,
+                 logfileFullPath, chunkSize,
                  fileWaitTimeInMs, fileMaxWaitTimeInMs):

         logging.info("registering zmq global context")
@@ -435,8 +433,6 @@ class FileMover():
         self.parallelDataStreams = parallelDataStreams
         self.logfileFullPath     = logfileFullPath
         self.chunkSize           = chunkSize
-        self.zmqCleanerIp        = zmqCleanerIp
-        self.zmqCleanerPort      = zmqCleanerPort
         self.fileWaitTimeInMs    = fileWaitTimeInMs
         self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs

@@ -483,8 +479,6 @@ class FileMover():
                                                  self.dataStreamPort,
                                                  logfileFullPath,
                                                  self.chunkSize,
-                                                 zmqCleanerIp,
-                                                 zmqCleanerPort,
                                                  fileWaitTimeInMs,
                                                  fileMaxWaitTimeInMs))
             workerThreadList.append(newWorkerThread)
@@ -510,7 +504,7 @@ class FileMover():
                 logging.error("Failed to receive new fileEvent-message.")
                 logging.error(sys.exc_info())

-                #TODO might using a error-count and threashold when to stop receiving, e.g. after 100 misses?
+                #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses?
                 # continueReceiving = False

@@ -593,180 +587,6 @@ class FileMover():



-class Cleaner():
-    """
-    * received cleaning jobs via zeromq,
-      such as removing a file
-    * Does regular checks on the watched directory,
-      such as
-      - deleting files which have been successfully send
-        to target but still remain in the watched directory
-      - poll the watched directory and reissue new files
-        to fileMover which have not been detected yet
-    """
-    logfileFullPath      = None
-    bindingPortForSocket = None
-    bindingIpForSocket   = None
-    zmqContextForCleaner = None
-
-    def __init__(self, logfilePath, bindingIp="127.0.0.1", bindingPort="6062", verbose=False):
-        self.bindingPortForSocket = bindingPort
-        self.bindingIpForSocket   = bindingIp
-        self.initLogging(logfilePath, verbose)
-        log = self.getLogger()
-        try:
-            self.process()
-        except KeyboardInterrupt:
-            log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
-            self.zmqContextForCleaner.destroy()
-        else:
-            trace = traceback.format_exc()
-            log.error("Stopping cleanerProcess due to unknown error condition.")
-            log.debug("Error was: " + str(trace))
-
-
-    def getLogger(self):
-        logger = logging.getLogger("cleaner")
-        return logger
-
-    def initLogging(self, logfilePath, verbose):
-        #@see https://docs.python.org/2/howto/logging-cookbook.html
-
-        logfilePathFull = os.path.join(logfilePath, "cleaner.log")
-        logger = logging.getLogger("cleaner")
-
-        #more detailed logging if verbose-option has been set
-        loggingLevel = logging.INFO
-        if verbose:
-            loggingLevel = logging.DEBUG
-
-
-        #log everything to file
-        fileHandler = logging.FileHandler(filename=logfilePathFull,
-                                          mode="a")
-        fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S',
-                                              fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s')
-        fileHandler.setFormatter(fileHandlerFormat)
-        fileHandler.setLevel(loggingLevel)
-        logger.addHandler(fileHandler)
-
-
-
-    def process(self):
-        processingJobs = True
-        log = self.getLogger()
-
-        #create zmq context
-        zmqContextForCleaner = zmq.Context()
-        self.zmqContextForCleaner = zmqContextForCleaner
-
-        #bind to local port
-        zmqJobSocket = zmqContextForCleaner.socket(zmq.PULL)
-        zmqJobSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket)
-
-        #processing messaging
-        while processingJobs:
-            #waiting for new jobs
-            workload = zmqJobSocket.recv()
-
-            #transform to dictionary
-            try:
-                workloadDict = json.loads(str(workload))
-            except:
-                errorMessage = "invalid job received. skipping job"
-                log.error(errorMessage)
-                log.debug("workload=" + str(workload))
-                continue
-
-            #extract fileEvent metadata
-            try:
-                #TODO validate fileEventMessageDict dict
-                filename       = workloadDict["filename"]
-                sourcePath     = workloadDict["sourcePath"]
-                relativeParent = workloadDict["relativeParent"]
-                # filesize     = workloadDict["filesize"]
-            except Exception, e:
-                errorMessage = "Invalid fileEvent message received."
-                log.error(errorMessage)
-                log.debug("Error was: " + str(e))
-                log.debug("workloadDict=" + str(workloadDict))
-                #skip all further instructions and continue with next iteration
-                continue
-
-            #removing source file
-            sourceFilepath = None
-            try:
-                logging.debug("removing source file...")
-                #generate target filepath
-                sourceFilepath = os.path.join(sourcePath,filename)
-                self.removeFile(sourceFilepath)
-
-                # #show filesystem statistics
-                # try:
-                #     self.showFilesystemStatistics(sourcePath)
-                # except Exception, f:
-                #     logging.warning("Unable to get filesystem statistics")
-                #     logging.debug("Error was: " + str(f))
-                log.debug("file removed: " + str(sourceFilepath))
-                log.debug("removing source file...success.")
-
-            except Exception, e:
-                errorMessage = "Unable to remove source file."
-                log.error(errorMessage)
-                trace = traceback.format_exc()
-                log.error("Error was: " + str(trace))
-                log.debug("sourceFilepath="+str(sourceFilepath))
-                log.debug("removing source file...failed.")
-                #skip all further instructions and continue with next iteration
-                continue
-
-
-
-    def removeFile(self, filepath):
-        log = self.getLogger()
-        maxAttemptsToRemoveFile     = 2
-        waitTimeBetweenAttemptsInMs = 500
-
-        iterationCount = 0
-        log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...")
-        fileWasRemoved = False
-
-        while iterationCount <= maxAttemptsToRemoveFile and not fileWasRemoved:
-            iterationCount+=1
-            try:
-                os.remove(filepath)
-                fileWasRemoved = True
-                log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.")
-            except Exception, e:
-                trace = traceback.format_exc()
-                warningMessage = "Unable to remove file {FILE}.".format(FILE=str(filepath))
-                log.warning(warningMessage)
-                log.debug("trace=" + str(trace))
-                log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
-
-
-        if not fileWasRemoved:
-            log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.")
-            raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount),
-                                                                                                                        FILE=filepath))
-
-    def getNewFiles(self):
-        """
-        returns a list with all files currently available
-        in the watched directory
-        """
-        raise NotImplementedError
-
-
-    def sendNewFileEventToFileMover(self):
-        """
-        sends a new-file-event to the fileMover-messageQueue
-        """
-        raise NotImplementedError
-
-
 def argumentParsing():
     parser = argparse.ArgumentParser()
     parser.add_argument("--logfilePath"  , type=str, help="path where logfile will be created", default="/tmp/log/")
@@ -775,8 +595,6 @@ def argumentParsing():
     parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060")
     parser.add_argument("--dataStreamIp"  , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1")
     parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061")
-    parser.add_argument("--zmqCleanerIp"  , type=str, help="zmq-pull-socket which deletes given files", default="127.0.0.1")
-    parser.add_argument("--zmqCleanerPort", type=str, help="zmq-pull-socket which deletes given files", default="6063")
     parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1")
     parser.add_argument("--chunkSize", type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE)
     parser.add_argument("--verbose"  , help="more verbose output", action="store_true")
@@ -858,8 +676,6 @@ if __name__ == '__main__':
     logfilePath         = str(arguments.logfilePath)
     logfileName         = str(arguments.logfileName)
     parallelDataStreams = str(arguments.parallelDataStreams)
-    zmqCleanerIp        = str(arguments.zmqCleanerIp)
-    zmqCleanerPort      = str(arguments.zmqCleanerPort)
     chunkSize           = arguments.chunkSize
     verbose             = arguments.verbose
     logfileFullPath     = os.path.join(logfilePath, logfileName)
@@ -871,16 +687,10 @@ if __name__ == '__main__':
     initLogging(logfileFullPath, verbose)


-    cleanerThread = Process(target=Cleaner, args=(logfilePath, zmqCleanerIp, zmqCleanerPort))
-    logging.debug("cleaner thread started")
-    cleanerThread.start()
-
-
     #start new fileMover
     # try:
     fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
                           logfileFullPath, chunkSize,
-                          zmqCleanerIp, zmqCleanerPort,
                           fileWaitTimeInMs, fileMaxWaitTimeInMs)
     fileMover.process()
     # except KeyboardInterrupt, ke:
--
GitLab