# Manuela Kuhn authored
# Cleaner.py 9.58 KiB
from __builtin__ import open, type
# Module authors, as a (name <email>, name <email>) tuple.
__author__ = (
    'Marco Strutz <marco.strutz@desy.de>',
    'Manuela Kuhn <manuela.kuhn@desy.de>',
)
import time
import argparse
import zmq
import os
import logging
import sys
import json
import traceback
from multiprocessing import Process, freeze_support
import subprocess
import json
import shutil
import helperScript
DEFAULT_CHUNK_SIZE = 1024 * 1024  # 1 MiB (1048576 bytes)
#
# -------------------------- class: Cleaner --------------------------------------
#
class Cleaner():
    """
    Move files announced over zeromq from a source tree into a target tree.

    * receives jobs on a zeromq PULL socket; each job is a JSON object with
      the keys "filename", "sourcePath" and "relativePath", and the file
      <sourcePath>/<relativePath>/<filename> is moved into
      <targetPath>/<relativePath>/
    * a plain "STOP" message shuts the cleaner down

    NOTE(review): earlier documentation also described periodic checks of the
    watched directory (deleting already-sent files, re-issuing undetected
    files to fileMover); no such code is present in this class.
    """
    bindingPortForSocket = None
    bindingIpForSocket = None
    zmqContextForCleaner = None
    externalContext = None    # if the context was created outside this class or not
    zmqCleanerSocket = None

    # to get the logging only handling this class
    log = None

    def __init__(self, targetPath, bindingIp="127.0.0.1", bindingPort="6062", context = None, verbose=False):
        """Bind the PULL socket and process jobs until the cleaner is stopped.

        The constructor blocks. It runs process() and returns only after
        shutdown, which happens on a "STOP" message, a KeyboardInterrupt or an
        error. The socket/context are always released via stop().

        :param targetPath: root directory that files are moved into.
        :param bindingIp: IP address the PULL socket binds to.
        :param bindingPort: port the PULL socket binds to.
        :param context: optional existing zmq context. If given, it is not
            destroyed by stop().
        :param verbose: not used by this class.
        """
        self.bindingPortForSocket = bindingPort
        self.bindingIpForSocket = bindingIp
        self.targetPath = targetPath

        if context:
            self.zmqContextForCleaner = context
            self.externalContext = True
        else:
            self.zmqContextForCleaner = zmq.Context()
            self.externalContext = False

        self.log = self.getLogger()
        self.log.debug("Init")

        # bind to local port
        self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL)
        connectionStrCleanerSocket = "tcp://" + self.bindingIpForSocket + ":%s" % self.bindingPortForSocket
        self.zmqCleanerSocket.bind(connectionStrCleanerSocket)
        self.log.debug("zmqCleanerSocket started for '" + connectionStrCleanerSocket + "'")

        try:
            self.process()
        except zmq.error.ZMQError as e:
            # `e` must be bound here; the message below uses it
            self.log.error("ZMQError: " + str(e))
            self.log.debug("Shutting down cleaner.")
        except KeyboardInterrupt:
            self.log.info("KeyboardInterrupt detected. Shutting down cleaner.")
        except Exception:
            trace = traceback.format_exc()
            self.log.error("Stopping cleanerProcess due to unknown error condition.")
            self.log.debug("Error was: " + str(trace))
        finally:
            # stop() is idempotent, so this is safe even when process()
            # already called it after receiving "STOP"
            self.stop()

    def getLogger(self):
        """Return the logger named "cleaner" used by this class."""
        logger = logging.getLogger("cleaner")
        return logger

    def process(self):
        """Receive and execute move jobs until a "STOP" message arrives.

        Malformed jobs (invalid JSON, missing keys) and failed moves are logged
        and skipped. A failure while receiving is logged and re-raised, so the
        caller can shut down instead of reprocessing a stale job.
        """
        while True:
            # waiting for new jobs
            self.log.debug("Waiting for new jobs")
            try:
                workload = self.zmqCleanerSocket.recv()
            except Exception as e:
                # Without a fresh message, `workload` would be unbound (first
                # iteration) or still hold the previous job; do not reuse it.
                self.log.error("Error in receiving job: " + str(e))
                raise

            if workload == "STOP":
                self.log.info("Stopping cleaner")
                self.stop()
                break

            # transform to dictionary
            try:
                workloadDict = json.loads(str(workload))
            except ValueError:
                # json raises ValueError (or a subclass) for undecodable input
                self.log.error("invalid job received. skipping job")
                self.log.debug("workload=" + str(workload))
                continue

            # extract fileEvent metadata
            # TODO: validate the types of the extracted values
            try:
                filename = workloadDict["filename"]
                sourcePath = workloadDict["sourcePath"]
                relativePath = workloadDict["relativePath"]
            except (KeyError, TypeError) as e:
                # KeyError: key missing; TypeError: the JSON payload was not an object
                self.log.error("Invalid fileEvent message received.")
                self.log.debug("Error was: " + str(e))
                self.log.debug("workloadDict=" + str(workloadDict))
                # skip all further instructions and continue with next iteration
                continue

            # moving source file; pre-set so the error handler never sees a
            # stale or unbound path
            sourceFullPath = None
            try:
                self.log.debug("removing source file...")
                # generate source and target paths the same way: join with
                # os.sep and normalise, so a relativePath with or without a
                # leading separator yields the same directory
                sourcePath = os.path.normpath(sourcePath + os.sep + relativePath)
                sourceFullPath = os.path.join(sourcePath, filename)
                targetFullPath = os.path.normpath(self.targetPath + os.sep + relativePath)
                self.log.debug("sourcePath: " + str(sourcePath))
                self.log.debug("filename: " + str(filename))
                self.log.debug("targetPath: " + str(targetFullPath))
                self.moveFile(sourcePath, filename, targetFullPath)
                self.log.debug("file removed: " + str(sourceFullPath))
                self.log.debug("removing source file...success.")
            except Exception:
                self.log.error("Unable to remove source file: " + str(sourceFullPath))
                trace = traceback.format_exc()
                self.log.error("Error was: " + str(trace))
                self.log.debug("sourceFilepath=" + str(sourceFullPath))
                self.log.debug("removing source file...failed.")

    def moveFile(self, source, filename, target, maxRetries=2, waitTimeBetweenAttemptsInMs=500):
        """Move <source>/<filename> into the directory <target>, retrying on failure.

        The target directory is created when missing. The move is attempted
        up to maxRetries + 1 times (3 by default), with a pause of
        waitTimeBetweenAttemptsInMs milliseconds between attempts.

        :raises Exception: when every attempt failed.
        """
        # loop-invariant paths
        sourceFile = source + os.sep + filename
        targetFile = target + os.sep + filename
        maxAttempts = max(1, maxRetries + 1)

        self.log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt 0)...")

        fileWasMoved = False
        iterationCount = 0
        for iterationCount in range(1, maxAttempts + 1):
            try:
                # check if the directory exists before moving the file;
                # OSError is ignored (e.g. a concurrent creator won the race,
                # any real problem surfaces in shutil.move below)
                if not os.path.exists(target):
                    try:
                        os.makedirs(target)
                    except OSError:
                        pass
                self.log.debug("sourceFile: " + str(sourceFile))
                self.log.debug("targetFile: " + str(targetFile))
                shutil.move(sourceFile, targetFile)
                fileWasMoved = True
                self.log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
                break
            except IOError:
                self.log.debug("IOError: " + str(filename))
            except Exception:
                trace = traceback.format_exc()
                self.log.warning("Unable to move file {FILE}.".format(FILE=str(sourceFile)))
                self.log.debug("trace=" + str(trace))
                self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
            # actually wait between attempts (not after the last one)
            if iterationCount < maxAttempts and waitTimeBetweenAttemptsInMs > 0:
                time.sleep(waitTimeBetweenAttemptsInMs / 1000.0)

        if not fileWasMoved:
            self.log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
            raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount),
                                                                                                                  FILE=filename))

    def removeFile(self, filepath, maxRetries=2, waitTimeBetweenAttemptsInMs=500):
        """Delete filepath, retrying on failure.

        Removal is attempted up to maxRetries + 1 times (3 by default), with a
        pause of waitTimeBetweenAttemptsInMs milliseconds between attempts.

        :raises Exception: when every attempt failed.
        """
        maxAttempts = max(1, maxRetries + 1)

        self.log.info("Removing file '" + str(filepath) + "' (attempt 0)...")

        fileWasRemoved = False
        iterationCount = 0
        for iterationCount in range(1, maxAttempts + 1):
            try:
                os.remove(filepath)
                fileWasRemoved = True
                self.log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.")
                break
            except Exception:
                trace = traceback.format_exc()
                self.log.warning("Unable to remove file {FILE}.".format(FILE=str(filepath)))
                self.log.debug("trace=" + str(trace))
                self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
            # actually wait between attempts (not after the last one)
            if iterationCount < maxAttempts and waitTimeBetweenAttemptsInMs > 0:
                time.sleep(waitTimeBetweenAttemptsInMs / 1000.0)

        if not fileWasRemoved:
            self.log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.")
            raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount),
                                                                                                                    FILE=filepath))

    def stop(self):
        """Close the socket and destroy the context if this class created it.

        Idempotent: each resource is released once and then reset to None.
        This matters because process() calls stop() on "STOP" and __init__
        calls it again on the way out.
        """
        if self.zmqCleanerSocket is not None:
            self.log.debug("Closing socket")
            self.zmqCleanerSocket.close(0)
            self.zmqCleanerSocket = None
        if not self.externalContext and self.zmqContextForCleaner is not None:
            self.log.debug("Destroying context")
            self.zmqContextForCleaner.destroy()
            self.zmqContextForCleaner = None