From 23bc45863e1e78e788f0da2cf5d41d90904f20d3 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 29 Jul 2015 16:14:39 +0200 Subject: [PATCH] Added different logger for each class in receiver --- ZeroMQTunnel/receiver.py | 157 ++++++++++++++++++++++----------------- 1 file changed, 87 insertions(+), 70 deletions(-) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index f8f0a822..54154f7e 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -15,7 +15,6 @@ from stat import S_ISREG, ST_MTIME, ST_MODE import threading - class FileReceiver: zmqContext = None liveViewerZmqContext = None @@ -30,6 +29,8 @@ class FileReceiver: maxRingBufferSize = 200 timeToWaitForRingBuffer = 2 + log = None + # sockets zmqSocket = None exchangeSocket = None @@ -47,20 +48,23 @@ class FileReceiver: self.zmqContext = context or zmq.Context() + self.log = self.getLogger() + self.log.debug("Init") + # create pull socket self.zmqSocket = self.zmqContext.socket(zmq.PULL) connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort self.zmqSocket.bind(connectionStrZmqSocket) - logging.debug("zmqSocket started for '" + connectionStrZmqSocket + "'") + self.log.debug("zmqSocket started for '" + connectionStrZmqSocket + "'") self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort self.exchangeSocket.bind(connectionStrExchangeSocket) - logging.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'") + self.log.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'") # thread to communicate with live viewer - self.liveViewerThread = threading.Thread(target=liveViewer) + self.liveViewerThread = threading.Thread(target=LiveViewer) self.liveViewerThread.start() # initialize ring buffer @@ -78,19 +82,24 @@ class FileReceiver: ringBuffer = sorted(ringBuffer, reverse=True) try: - logging.info("Start receiving new files") + self.log.info("Start receiving new files") self.startReceiving() - logging.info("Stopped receiving.") + self.log.info("Stopped receiving.") except Exception, e: - logging.error("Unknown error while receiving files. Need to abort.") - logging.debug("Error was: " + str(e)) + self.log.error("Unknown error while receiving files. Need to abort.") + self.log.debug("Error was: " + str(e)) except: trace = traceback.format_exc() - logging.info("Unkown error state. Shutting down...") - logging.debug("Error was: " + str(trace)) + self.log.info("Unkown error state. Shutting down...") + self.log.debug("Error was: " + str(trace)) self.zmqContext.destroy() - logging.info("Quitting.") + self.log.info("Quitting.") + + + def getLogger(self): + logger = logging.getLogger("fileReceiver") + return logger def addFileToRingBuffer(self, filename, fileModTime): @@ -117,43 +126,40 @@ class FileReceiver: #TODO is string conversion needed here? payloadMetadata = str(multipartMessage[0]) except: - logging.error("an empty config was transferred for multipartMessage") + self.log.error("an empty config was transferred for multipartMessage") #TODO validate multipartMessage (like correct dict-values for metadata) - logging.debug("multipartMessage.metadata = " + str(payloadMetadata)) + self.log.debug("multipartMessage.metadata = " + str(payloadMetadata)) #extraction metadata from multipart-message payloadMetadataDict = json.loads(payloadMetadata) #append to file try: - logging.debug("append to file based on multipart-message...") + self.log.debug("append to file based on multipart-message...") #TODO: save message to file using a thread (avoids blocking) #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage) - logging.debug("append to file based on multipart-message...success.") + self.log.debug("append to file based on multipart-message...success.") except Exception, e: errorMessage = "Unable to append multipart-content to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("append to file based on multipart-message...failed.") + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("append to file based on multipart-message...failed.") except: errorMessage = "Unable to append multipart-content to file. Unknown Error." - logging.error(errorMessage) - logging.debug("append to file based on multipart-message...failed.") + self.log.error(errorMessage) + self.log.debug("append to file based on multipart-message...failed.") if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] : #indicated end of file. closing file and leave loop - logging.debug("last file-chunk received. stop appending.") + self.log.debug("last file-chunk received. stop appending.") break filename = self.generateTargetFilepath(payloadMetadataDict) fileModTime = payloadMetadataDict["fileModificationTime"] - logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) - - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) + self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) # add to ring buffer - logging.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime)) + self.log.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime)) self.addFileToRingBuffer(str(filename), fileModTime) @@ -163,26 +169,26 @@ class FileReceiver: continueStreaming = True loopCounter = 0 #counter of total received messages continueReceiving = True #receiving will stop if value gets False - logging.debug("Waiting for new messages...") + self.log.debug("Waiting for new messages...") while continueReceiving: try: self.combineMessage(self.zmqSocket) loopCounter+=1 except KeyboardInterrupt: - logging.debug("Keyboard interrupt detected. Stop receiving.") + self.log.debug("Keyboard interrupt detected. Stop receiving.") break except: - logging.error("receive message...failed.") - logging.error(sys.exc_info()) + self.log.error("receive message...failed.") + self.log.error(sys.exc_info()) continueReceiving = False - logging.info("shutting down receiver...") + self.log.info("shutting down receiver...") try: self.stopReceiving(self.zmqSocket, self.zmqContext) - logging.debug("shutting down receiver...done.") + self.log.debug("shutting down receiver...done.") except: - logging.error(sys.exc_info()) - logging.error("shutting down receiver...failed.") + self.log.error(sys.exc_info()) + self.log.error("shutting down receiver...failed.") def generateTargetFilepath(self,configDict): @@ -226,13 +232,13 @@ class FileReceiver: chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata payload = multipartMessage[1:] except: - logging.warning("an empty file was received within the multipart-message") + self.log.warning("an empty file was received within the multipart-message") payload = None #generate target filepath targetFilepath = self.generateTargetFilepath(configDict) - logging.debug("new file is going to be created at: " + targetFilepath) + self.log.debug("new file is going to be created at: " + targetFilepath) #append payload to file @@ -246,18 +252,18 @@ class FileReceiver: targetPath = self.generateTargetPath(configDict) os.makedirs(targetPath) newFile = open(targetFilepath, "w") - logging.info("New target directory created: " + str(targetPath)) + self.log.info("New target directory created: " + str(targetPath)) except Exception, f: errorMessage = "unable to save payload to file: '" + targetFilepath + "'" - logging.error(errorMessage) - logging.debug("Error was: " + str(f)) - logging.debug("targetPath="+str(targetPath)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(f)) + self.log.debug("targetPath="+str(targetPath)) raise Exception(errorMessage) except Exception, e: - logging.error("failed to append payload to file: '" + targetFilepath + "'") - logging.debug("Error was: " + str(e)) - logging.debug("ErrorTyp: " + str(type(e))) - logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST)) + self.log.error("failed to append payload to file: '" + targetFilepath + "'") + self.log.debug("Error was: " + str(e)) + self.log.debug("ErrorTyp: " + str(type(e))) + self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST)) #only write data if a payload exist try: if payload != None: @@ -266,47 +272,49 @@ class FileReceiver: newFile.close() except Exception, e: errorMessage = "unable to append data to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(errorMessage) def stopReceiving(self, zmqSocket, zmqContext): - logging.debug("stopReceiving...") + self.log.debug("stopReceiving...") try: zmqSocket.close() - logging.debug("closing zmqSocket...done.") + self.log.debug("closing zmqSocket...done.") except: - logging.error("closing zmqSocket...failed.") - logging.error(sys.exc_info()) + self.log.error("closing zmqSocket...failed.") + self.log.error(sys.exc_info()) - logging.debug("sending exit signal to thread...") + self.log.debug("sending exit signal to thread...") self.exchangeSocket.send("Exit") time.sleep(0.1) self.exchangeSocket.close() - logging.debug("sending exit signal to thread...done") + self.log.debug("sending exit signal to thread...done") try: zmqContext.destroy() - logging.debug("closing zmqContext...done.") + self.log.debug("closing zmqContext...done.") except: - logging.error("closing zmqContext...failed.") - logging.error(sys.exc_info()) + self.log.error("closing zmqContext...failed.") + self.log.error(sys.exc_info()) -class liveViewer(): - zmqContext = None - liveViewerIp = None - liveViewerPort = None - exchangeIp = "127.0.0.1" - exchangePort = "6072" +class LiveViewer(): + zmqContext = None + liveViewerIp = None + liveViewerPort = None + exchangeIp = "127.0.0.1" + exchangePort = "6072" + + log = None # sockets - liveViewerSocket = None - exchangeSocket = None + liveViewerSocket = None + exchangeSocket = None - poller = None + poller = None def __init__(self, liveViewerIp = "127.0.0.1", liveViewerPort = "6071", context = None): @@ -318,17 +326,20 @@ class liveViewer(): self.zmqContext = context or zmq.Context() + self.log = self.getLogger() + self.log.debug("Init") + # create socket for live viewer self.liveViewerSocket = self.zmqContext.socket(zmq.REP) connectionStrLiveViewerSocket = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort self.liveViewerSocket.bind(connectionStrLiveViewerSocket) - logging.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'") + self.log.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'") # create socket for message exchange self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort self.exchangeSocket.connect(connectionStrExchangeSocket) - logging.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") + self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") self.poller = zmq.Poller() self.poller.register(self.liveViewerSocket, zmq.POLLIN) @@ -337,6 +348,11 @@ class liveViewer(): self.sendFileToLiveViewer() + def getLogger(self): + logger = logging.getLogger("liveViewer") + return logger + + def sendFileToLiveViewer(self): should_continue = True @@ -344,15 +360,15 @@ class liveViewer(): socks = dict(self.poller.poll()) if self.exchangeSocket in socks and socks[self.exchangeSocket] == zmq.POLLIN: message = self.exchangeSocket.recv() - logging.debug("Recieved control command: %s" % message ) + self.log.debug("Recieved control command: %s" % message ) if message == "Exit": - logging.debug("Recieved exit command, liveViewer thread will stop recieving messages") + self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages") should_continue = False break if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN: message = self.liveViewerSocket.recv() - logging.debug("Call for next file... ") + self.log.debug("Call for next file... ") # send first element in ring buffer to live viewer (the path of this file is the second entry) if self.ringBuffer: answer = self.ringBuffer[0][1] @@ -365,7 +381,7 @@ class liveViewer(): except zmq.error.ContextTerminated: break - logging.debug("LiveViewerThread: closing socket") + self.log.debug("LiveViewerThread: closing socket") self.liveViewerSocket.close() self.exchangeSocket.close() @@ -412,6 +428,7 @@ def initLogging(filenameFullPath, verbose): console.setLevel(logging.WARNING) formatter = logging.Formatter("%(asctime)s > %(message)s") console.setFormatter(formatter) + logging.getLogger("").addHandler(console) -- GitLab