From 5ed12fb40b69f2927ae3d0269570de973b8d0391 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Mon, 27 Jul 2015 11:24:16 +0200 Subject: [PATCH] Every Class has its only logger + fix mv + fix context bug --- ZeroMQTunnel/fileMover.py | 402 +++++++++++++++++++------------------- 1 file changed, 197 insertions(+), 205 deletions(-) diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index 4e3933dd..842aef9c 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -24,40 +24,41 @@ class WorkerProcess(): id = None dataStreamIp = None dataStreamPort = None - logfileFullPath = None zmqContextForWorker = None zmqMessageChunkSize = None fileWaitTime_inMs = None fileMaxWaitTime_InMs = None - zmqCleanerIp = None # responsable to delete files - zmqCleanerPort = None # responsable to delete files + zmqCleanerIp = None # responsable to delete files + zmqCleanerPort = None # responsable to delete files zmqDataStreamSocket = None - def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort, + # to get the logging only handling this class + log = None + + def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, zmqCleanerIp, zmqCleanerPort, fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0): self.id = id self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort - self.logfileFullPath = logfileFullPath self.zmqMessageChunkSize = chunkSize self.zmqCleanerIp = zmqCleanerIp self.zmqCleanerPort = zmqCleanerPort self.fileWaitTime_inMs = fileWaitTimeInMs self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs - self.initLogging(logfileFullPath) + self.log = self.getLogger() try: self.process() except KeyboardInterrupt: # trace = traceback.format_exc() - logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.") + self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") # self.zmqDataStreamSocket.close() # self.zmqContextForWorker.destroy() else: trace = traceback.format_exc() - logging.error("Stopping workerProcess due to unknown error condition.") - logging.debug("Error was: " + str(trace)) + self.log.error("Stopping workerProcess due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) def process(self): @@ -85,7 +86,7 @@ class WorkerProcess(): dataStreamIp = self.dataStreamIp dataStreamPort = self.dataStreamPort - logging.debug("new workerThread started. id=" + str(id)) + self.log.debug("new workerThread started. id=" + str(id)) #initialize router zmqContextForWorker = zmq.Context() @@ -111,18 +112,18 @@ class WorkerProcess(): while processingJobs: #sending a "ready"-signal to the router. #the reply will contain the actual job/task. - logging.debug("worker-"+str(id)+": sending ready signal") + self.log.debug("worker-"+str(id)+": sending ready signal") routerSocket.send(b"READY") # Get workload from router, until finished - logging.debug("worker-"+str(id)+": waiting for new job") + self.log.debug("worker-"+str(id)+": waiting for new job") workload = routerSocket.recv() - logging.debug("worker-"+str(id)+": new job received") + self.log.debug("worker-"+str(id)+": new job received") finished = workload == b"END" if finished: processingJobs = False - logging.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount) + self.log.debug("router requested to shutdown worker-thread. 
Worker processed: %d files" % jobCount) break jobCount += 1 @@ -130,11 +131,11 @@ class WorkerProcess(): fileEventMessageDict = None try: fileEventMessageDict = json.loads(str(workload)) - logging.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict))) + self.log.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict))) except Exception, e: errorMessage = "Unable to convert message into a dictionary." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) #extract fileEvent metadata @@ -145,22 +146,22 @@ class WorkerProcess(): relativeParent = fileEventMessageDict["relativeParent"] except Exception, e: errorMessage = "Invalid fileEvent message received." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict)) #skip all further instructions and continue with next iteration continue #passing file to data-messagPipe try: - logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...") + self.log.debug("worker-" + str(id) + ": passing new file to data-messagePipe...") self.passFileToDataStream(filename, sourcePath, relativeParent) - logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.") + self.log.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.") except Exception, e: errorMessage = "Unable to pass new file to data-messagePipe." - logging.error(errorMessage) - logging.error("Error was: " + str(e)) - logging.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.") + self.log.error(errorMessage) + self.log.error("Error was: " + str(e)) + self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.") #skip all further instructions and continue with next iteration continue @@ -168,18 +169,21 @@ class WorkerProcess(): #send remove-request to message pipe try: #sending to pipe - logging.debug("send file-event to cleaner-pipe...") + self.log.debug("send file-event to cleaner-pipe...") cleanerSocket.send(workload) - logging.debug("send file-event to cleaner-pipe...success.") + self.log.debug("send file-event to cleaner-pipe...success.") #TODO: remember workload. append to list? # can be used to verify files which have been processed twice or more except Exception, e: errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(filename) - logging.error(errorMessage) - logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + self.log.error(errorMessage) + self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + def getLogger(self): + logger = logging.getLogger("workerProcess") + return logger def getFileWaitTimeInMs(self): @@ -211,47 +215,47 @@ class WorkerProcess(): while fileIsStillInUse: #skip waiting periode if waiting to long for file to get closed if time.time() - timeStartWaiting >= (fileMaxWaitTimeInMs / 1000): - logging.debug("waited to long for file getting closed. aborting") + self.log.debug("waited to long for file getting closed. 
aborting") break #wait for other process to finish file access #grabs time when file was modified last statInfo = os.stat(sourceFilePathFull) fileLastModified = statInfo.st_mtime - logging.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified)) + self.log.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified)) timeNow = time.time() timeDiff = timeNow - fileLastModified - logging.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff)) + self.log.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff)) waitTimeInSeconds = fileWaitTimeInMs/1000 if timeDiff >= waitTimeInSeconds: fileIsStillInUse = False - logging.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.") + self.log.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.") else: - logging.debug("still waiting for file to get closed...") + self.log.debug("still waiting for file to get closed...") time.sleep(fileWaitTimeInMs / 1000 ) #for quick testing set filesize of file as chunksize - logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...") + self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...") filesize = os.path.getsize(sourceFilePathFull) fileModificationTime = os.stat(sourceFilePathFull).st_mtime chunksize = filesize #can be used later on to split multipart message - logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) - logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) + self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) + self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) except Exception, e: errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) try: - logging.debug("opening '" + str(sourceFilePathFull) + "'...") + self.log.debug("opening '" + str(sourceFilePathFull) + "'...") fileDescriptor = open(str(sourceFilePathFull), "rb") except Exception, e: errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) @@ -260,14 +264,14 @@ class WorkerProcess(): payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent) except Exception, e: errorMessage = "Unable to assemble multi-part message." 
- logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) #send message try: - logging.debug("Passing multipart-message...") + self.log.debug("Passing multipart-message...") chunkNumber = 0 stillChunksToRead = True while stillChunksToRead: @@ -298,11 +302,11 @@ class WorkerProcess(): fileDescriptor.close() # self.zmqDataStreamSocket.send_multipart(multipartMessage) - logging.debug("Passing multipart-message...done.") + self.log.debug("Passing multipart-message...done.") except Exception, e: - logging.error("Unable to send multipart-message") - logging.debug("Error was: " + str(e)) - logging.info("Passing multipart-message...failed.") + self.log.error("Unable to send multipart-message") + self.log.debug("Error was: " + str(e)) + self.log.info("Passing multipart-message...failed.") raise Exception(e) @@ -310,7 +314,7 @@ class WorkerProcess(): def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize): try: # chunksize = 16777216 #16MB - logging.debug("reading file '" + str(sourceFilePathFull)+ "' to memory") + self.log.debug("reading file '" + str(sourceFilePathFull)+ "' to memory") # FIXME: chunk is read-out as str. why not as bin? will probably add to much overhead to zmq-message fileContentAsByteObject = fileDescriptor.read(chunkSize) @@ -335,7 +339,7 @@ class WorkerProcess(): """ #add metadata to multipart - logging.debug("create metadata for source file...") + self.log.debug("create metadata for source file...") metadataDict = { "filename" : filename, "filesize" : filesize, @@ -344,8 +348,7 @@ class WorkerProcess(): "relativeParent" : relativeParent, "chunkSize" : self.getChunkSize()} - logging.debug("metadataDict = " + str(metadataDict)) - + self.log.debug("metadataDict = " + str(metadataDict)) return metadataDict @@ -364,31 +367,13 @@ class WorkerProcess(): # print "{number:.{digits}f}".format(number=freeUserSpaceLeft_percent, digits=0) # print int(freeUserSpaceLeft_percent) - logging.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes " + self.log.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes " + " (" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left)") #warn if disk space is running low highWaterMark = 85 if int(freeUserSpaceLeft_percent) >= int(highWaterMark): - logging.warning("Running low in disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.") - - - def initLogging(self, filenameFullPath): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #log everything to file - logging.basicConfig(level=logging.DEBUG, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.INFO) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) + self.log.warning("Running low in disk space! 
" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.") @@ -414,42 +399,22 @@ class FileMover(): pipe_name = "/tmp/zeromqllpipe_resp" currentZmqDataStreamSocketListIndex = None # Index-Number of a socket used to send datafiles to - logfileFullPath = None - chunkSize = None + chunkSize = None + # to get the logging only handling this class + log = None - def process(self): - try: - self.startReceiving() - except zmq.error.ZMQError as e: - logging.error("ZMQError: "+ str(e)) - log.debug("Shutting down workerProcess.") - except KeyboardInterrupt: - logging.debug("KeyboardInterrupt detected. Shutting down fileMover.") - logging.info("Shutting down fileMover as KeyboardInterrupt was detected.") - self.zmqContext.destroy() - except: - trace = traceback.format_exc() - logging.info("Stopping fileMover due to unknown error condition.") - logging.debug("Error was: " + str(trace)) - - - def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, - logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort, + def __init__(self, context, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, + chunkSize, zmqCleanerIp, zmqCleanerPort, fileWaitTimeInMs, fileMaxWaitTimeInMs): - logging.info("registering zmq global context") - #create zmq context - zmqContext = zmq.Context() - - self.zmqContext = zmqContext + self.zmqContext = context self.bindingIpForSocket = bindingIpForSocket self.tcpPort_messageStream = bindingPortForSocket self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort self.parallelDataStreams = parallelDataStreams - self.logfileFullPath = logfileFullPath self.chunkSize = chunkSize self.zmqCleanerIp = zmqCleanerIp self.zmqCleanerPort = zmqCleanerPort @@ -461,6 +426,30 @@ class FileMover(): self.messageSocket = self.getZmqSocket_Pull(self.zmqContext) self.dataSocket = self.getZmqSocket_Push(self.zmqContext) + self.log = self.getLogger() + self.log.debug("Init") + + + def process(self): + try: + self.startReceiving() + except zmq.error.ZMQError as e: + self.log.error("ZMQError: "+ str(e)) + self.log.debug("Shutting down workerProcess.") + except KeyboardInterrupt: + self.log.debug("KeyboardInterrupt detected. Shutting down fileMover.") + self.log.info("Shutting down fileMover as KeyboardInterrupt was detected.") + self.zmqContext.destroy() + except: + trace = traceback.format_exc() + self.log.info("Stopping fileMover due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) + + + def getLogger(self): + logger = logging.getLogger("fileMover") + return logger + def getFileWaitTimeInMs(self): return self.fileWaitTimeInMs @@ -474,10 +463,10 @@ class FileMover(): #create socket zmqContext = self.zmqContext zmqSocketForNewFileEvents = self.createPullSocket() - logging.debug("new message-socket crated for: new file events.") + self.log.debug("new message-socket crated for: new file events.") parallelDataStreams = int(self.parallelDataStreams) - logging.debug("new message-socket crated for: passing file objects.") + self.log.debug("new message-socket crated for: passing file objects.") incomingMessageCounter = 0 @@ -486,7 +475,7 @@ class FileMover(): #each worker-thread will handle a file event routerSocket = self.zmqContext.socket(zmq.ROUTER) routerSocket.bind("tcp://127.0.0.1:50000") - logging.debug("routerSocket started for 'tcp://127.0.0.1:50000'") + self.log.debug("routerSocket started for 'tcp://127.0.0.1:50000'") #start worker-threads. 
each will have its own PushSocket. @@ -495,11 +484,10 @@ class FileMover(): fileWaitTimeInMs = self.getFileWaitTimeInMs() fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs() for threadNumber in range(numberOfWorkerThreads): - logging.debug("instantiate new workerProcess (nr " + str(threadNumber)) + self.log.debug("instantiate new workerProcess (nr " + str(threadNumber)) newWorkerThread = Process(target=WorkerProcess, args=(threadNumber, self.dataStreamIp, self.dataStreamPort, - logfileFullPath, self.chunkSize, zmqCleanerIp, zmqCleanerPort, @@ -507,26 +495,25 @@ class FileMover(): fileMaxWaitTimeInMs)) workerThreadList.append(newWorkerThread) - logging.debug("start worker process nr " + str(threadNumber)) + self.log.debug("start worker process nr " + str(threadNumber)) newWorkerThread.start() #run loop, and wait for incoming messages continueReceiving = True - logging.debug("waiting for new fileEvent-messages") + self.log.debug("waiting for new fileEvent-messages") while continueReceiving: try: incomingMessage = zmqSocketForNewFileEvents.recv() - logging.debug("new fileEvent-message received.") - logging.debug("message content: " + str(incomingMessage)) + self.log.debug("new fileEvent-message received.") + self.log.debug("message content: " + str(incomingMessage)) incomingMessageCounter += 1 - logging.debug("processFileEvent...") + self.log.debug("processFileEvent...") self.processFileEvent(incomingMessage, routerSocket) #TODO refactor as separate process to emphasize unblocking - logging.debug("processFileEvent...done") + self.log.debug("processFileEvent...done") except Exception, e: - print "exception" - logging.error("Failed to receive new fileEvent-message.") - logging.error(sys.exc_info()) + self.log.error("Failed to receive new fileEvent-message.") + self.log.error(sys.exc_info()) #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses? # continueReceiving = False @@ -534,26 +521,26 @@ class FileMover(): print "shutting down fileEvent-receiver..." 
try: - logging.debug("shutting down zeromq...") + self.log.debug("shutting down zeromq...") self.stopReceiving(zmqSocketForNewFileEvents, zmqContext) - logging.debug("shutting down zeromq...done.") + self.log.debug("shutting down zeromq...done.") except: - logging.error(sys.exc_info()) - logging.error("shutting down zeromq...failed.") + self.log.error(sys.exc_info()) + self.log.error("shutting down zeromq...failed.") def routeFileEventToWorkerThread(self, fileEventMessage, routerSocket): # LRU worker is next waiting in the queue - logging.debug("waiting for available workerThread.") + self.log.debug("waiting for available workerThread.") # address == "worker-0" # empty == "" # ready == "READY" address, empty, ready = routerSocket.recv_multipart() - logging.debug("available workerThread detected.") + self.log.debug("available workerThread detected.") - logging.debug("passing job to workerThread...") + self.log.debug("passing job to workerThread...") routerSocket.send_multipart([ address, b'', @@ -572,7 +559,7 @@ class FileMover(): # p = subprocess.Popen ( my_cmd, shell=True ) # p.communicate() - logging.debug("passing job to workerThread...done.") + self.log.debug("passing job to workerThread...done.") def processFileEvent(self, fileEventMessage, routerSocket): @@ -581,20 +568,20 @@ class FileMover(): def stopReceiving(self, zmqSocket, msgContext): try: - logging.debug("closing zmqSocket...") + self.log.debug("closing zmqSocket...") zmqSocket.close() - logging.debug("closing zmqSocket...done.") + self.log.debug("closing zmqSocket...done.") except: - logging.debug("closing zmqSocket...failed.") - logging.error(sys.exc_info()) + self.log.debug("closing zmqSocket...failed.") + self.log.error(sys.exc_info()) try: - logging.debug("closing zmqContext...") + self.log.debug("closing zmqContext...") msgContext.destroy() - logging.debug("closing zmqContext...done.") + self.log.debug("closing zmqContext...done.") except: - logging.debug("closing zmqContext...failed.") - logging.error(sys.exc_info()) + self.log.debug("closing zmqContext...failed.") + self.log.error(sys.exc_info()) def getZmqSocket_Pull(self, context): @@ -618,7 +605,7 @@ class FileMover(): #get default message-socket socket = self.messageSocket - logging.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream) + self.log.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream) socket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.tcpPort_messageStream) return socket @@ -635,68 +622,46 @@ class Cleaner(): - poll the watched directory and reissue new files to fileMover which have not been detected yet """ - logfileFullPath = None bindingPortForSocket = None bindingIpForSocket = None zmqContextForCleaner = None + # to get the logging only handling this class + log = None - def __init__(self, logfilePath, context, bindingIp="127.0.0.1", bindingPort="6062", verbose=False): + def __init__(self, context, bindingIp="127.0.0.1", bindingPort="6062", verbose=False): self.bindingPortForSocket = bindingPort self.bindingIpForSocket = bindingIp - self.initLogging(logfilePath, verbose) - log = self.getLogger() + self.zmqContextForCleaner = context + + self.log = self.getLogger() + self.log.debug("Init") + try: self.process() except zmq.error.ZMQError: - logging.error("ZMQError: "+ str(e)) - log.debug("Shutting down workerProcess.") + self.log.error("ZMQError: "+ str(e)) + self.log.debug("Shutting down workerProcess.") 
self.zmqContextForCleaner.destroy() except KeyboardInterrupt: - log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") + self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") self.zmqContextForCleaner.destroy() except: trace = traceback.format_exc() - log.error("Stopping cleanerProcess due to unknown error condition.") - log.debug("Error was: " + str(trace)) + self.log.error("Stopping cleanerProcess due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) def getLogger(self): logger = logging.getLogger("cleaner") return logger - def initLogging(self, logfilePath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - logfilePathFull = os.path.join(logfilePath, "cleaner.log") - logger = logging.getLogger("cleaner") - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - - #log everything to file - fileHandler = logging.FileHandler(filename=logfilePathFull, - mode="a") - fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', - fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') - fileHandler.setFormatter(fileHandlerFormat) - fileHandler.setLevel(loggingLevel) - logger.addHandler(fileHandler) - def process(self): processingJobs = True - log = self.getLogger() - - #create zmq context - zmqContextForCleaner = zmq.Context() - self.zmqContextForCleaner = zmqContextForCleaner #bind to local port - zmqJobSocket = zmqContextForCleaner.socket(zmq.PULL) + zmqJobSocket = self.zmqContextForCleaner.socket(zmq.PULL) zmqJobSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket) #processing messaging @@ -705,7 +670,7 @@ class Cleaner(): try: workload = zmqJobSocket.recv() except Exception as e: - logging.error("Error in receiving job: " + str(e)) + self.log.error("Error in receiving job: " + str(e)) #transform to dictionary @@ -713,8 +678,8 @@ class Cleaner(): workloadDict = json.loads(str(workload)) except: errorMessage = "invalid job received. skipping job" - log.error(errorMessage) - log.debug("workload=" + str(workload)) + self.log.error(errorMessage) + self.log.debug("workload=" + str(workload)) continue #extract fileEvent metadata @@ -727,20 +692,23 @@ class Cleaner(): # filesize = workloadDict["filesize"] except Exception, e: errorMessage = "Invalid fileEvent message received." 
- log.error(errorMessage) - log.debug("Error was: " + str(e)) - log.debug("workloadDict=" + str(workloadDict)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("workloadDict=" + str(workloadDict)) #skip all further instructions and continue with next iteration continue #moving source file sourceFilepath = None try: - logging.debug("removing source file...") + self.log.debug("removing source file...") #generate target filepath -# sourceFilepath = os.path.join(sourcePath,filename) - self.moveFile(sourceFilepath) - self.moveFile(sourcePath, filename, target) + sourceFilepath = os.path.join(sourcePath,filename) +# self.removeFile(sourceFilepath) + self.log.debug ("sourcePath: " + str (sourcePath)) + self.log.debug ("filename: " + str (filename)) + self.log.debug ("targetPath: " + str (targetPath)) + self.moveFile(sourcePath, filename, targetPath) # #show filesystem statistics # try: @@ -748,29 +716,28 @@ class Cleaner(): # except Exception, f: # logging.warning("Unable to get filesystem statistics") # logging.debug("Error was: " + str(f)) - log.debug("file removed: " + str(sourceFilepath)) - log.debug("removing source file...success.") + self.log.debug("file removed: " + str(sourcePath)) + self.log.debug("removing source file...success.") except Exception, e: errorMessage = "Unable to remove source file." - log.error(errorMessage) + self.log.error(errorMessage) trace = traceback.format_exc() - log.error("Error was: " + str(trace)) - log.debug("sourceFilepath="+str(sourceFilepath)) - log.debug("removing source file...failed.") + self.log.error("Error was: " + str(trace)) + self.log.debug("sourceFilepath="+str(sourceFilepath)) + self.log.debug("removing source file...failed.") #skip all further instructions and continue with next iteration continue def moveFile(self, source, filename, target): - log = self.getLogger() maxAttemptsToRemoveFile = 2 waitTimeBetweenAttemptsInMs = 500 iterationCount = 0 - log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") fileWasMoved = False while iterationCount <= maxAttemptsToRemoveFile and not fileWasMoved: @@ -783,32 +750,30 @@ class Cleaner(): except OSError: pass # moving the file - os.remove(filepath) shutil.move(source + os.sep + filename, target + os.sep + filename) fileWasMoved = True - log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") except Exception, e: trace = traceback.format_exc() warningMessage = "Unable to move file {FILE}.".format(FILE=str(source) + str(filename)) - log.warning(warningMessage) - log.debug("trace=" + str(trace)) - log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) + self.log.warning(warningMessage) + self.log.debug("trace=" + str(trace)) + self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) if not fileWasMoved: - log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.") + self.log.error("Moving file '" + str(filename) + " from " + 
str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.") raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filepath)) def removeFile(self, filepath): - log = self.getLogger() maxAttemptsToRemoveFile = 2 waitTimeBetweenAttemptsInMs = 500 iterationCount = 0 - log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...") + self.log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...") fileWasRemoved = False while iterationCount <= maxAttemptsToRemoveFile and not fileWasRemoved: @@ -816,17 +781,17 @@ class Cleaner(): try: os.remove(filepath) fileWasRemoved = True - log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.") except Exception, e: trace = traceback.format_exc() warningMessage = "Unable to remove file {FILE}.".format(FILE=str(filepath)) - log.warning(warningMessage) - log.debug("trace=" + str(trace)) - log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) + self.log.warning(warningMessage) + self.log.debug("trace=" + str(trace)) + self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) if not fileWasRemoved: - log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.") + self.log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.") raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filepath)) @@ -902,14 +867,35 @@ def initLogging(filenameFullPath, verbose): #log info to stdout, display messages with different format than the file output console = logging.StreamHandler() - - console.setLevel(logging.WARNING) formatter = logging.Formatter("%(asctime)s > %(message)s") console.setFormatter(formatter) + logging.getLogger("").addHandler(console) +# def initLogging(self, logfilePath, verbose): +# #@see https://docs.python.org/2/howto/logging-cookbook.html +# +# logfilePathFull = os.path.join(logfilePath, "cleaner.log") +# logger = logging.getLogger("cleaner") +# +# #more detailed logging if verbose-option has been set +# loggingLevel = logging.INFO +# if verbose: +# loggingLevel = logging.DEBUG +# +# +# #log everything to file +# fileHandler = logging.FileHandler(filename=logfilePathFull, +# mode="a") +# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', +# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') +# fileHandler.setFormatter(fileHandlerFormat) +# fileHandler.setLevel(loggingLevel) +# logger.addHandler(fileHandler) + + if __name__ == '__main__': freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows arguments = argumentParsing() @@ -931,18 +917,24 @@ if __name__ == '__main__': fileMaxWaitTimeInMs = float(arguments.fileMaxWaitTimeInMs) - cleanerThread = Process(target=Cleaner, args=(logfilePath, zmqCleanerIp, zmqCleanerPort)) - logging.debug("cleaner thread started") - cleanerThread.start() - #enable logging initLogging(logfileFullPath, verbose) + #create zmq context + # there should be only one context in one process + zmqContext = zmq.Context() + logging.info("registering zmq global context") + + + 
cleanerThread = Process(target=Cleaner, args=(zmqContext, zmqCleanerIp, zmqCleanerPort)) + logging.debug("cleaner thread started") + cleanerThread.start() + #start new fileMover # try: - fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, - parallelDataStreams, logfileFullPath, chunkSize, + fileMover = FileMover(zmqContext, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, + parallelDataStreams, chunkSize, zmqCleanerIp, zmqCleanerPort, fileWaitTimeInMs, fileMaxWaitTimeInMs) fileMover.process() -- GitLab
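Note on the pattern this commit applies (a minimal sketch for orientation, not the actual fileMover.py code; the class, file names and log format below are illustrative and simplified): handlers are configured once in the parent process, each class then fetches its own named logger via logging.getLogger() instead of calling the module-level logging functions, and a single zmq.Context is created in __main__ and handed to FileMover, Cleaner and the workers so that no class creates a second context of its own.

    import logging
    import zmq

    def init_logging(logfile, verbose=False):
        # configure handlers once, in the parent process
        level = logging.DEBUG if verbose else logging.INFO
        logging.basicConfig(
            level=level,
            format="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s",
            filename=logfile,
            filemode="a")

    class Cleaner():
        def __init__(self, context):
            # reuse the context handed in by the caller; do not create a new one
            self.zmqContext = context
            # per-class named logger, as introduced by this patch
            self.log = self.getLogger()
            self.log.debug("Init")

        def getLogger(self):
            return logging.getLogger("cleaner")

    if __name__ == "__main__":
        init_logging("/tmp/fileMover.log", verbose=True)
        # one context per process, shared by all classes
        zmqContext = zmq.Context()
        Cleaner(zmqContext)

In the patch itself the same idea is carried further: initLogging() additionally attaches a console StreamHandler, the Cleaner is started in its own multiprocessing Process with the shared context passed as an argument, and FileMover receives the context through its constructor instead of building one of its own.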