Skip to content
Snippets Groups Projects
Commit 23bc4586 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added different logger for each class in receiver

parent d7e2d543
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,6 @@ from stat import S_ISREG, ST_MTIME, ST_MODE
import threading
class FileReceiver:
zmqContext = None
liveViewerZmqContext = None
......@@ -30,6 +29,8 @@ class FileReceiver:
maxRingBufferSize = 200
timeToWaitForRingBuffer = 2
log = None
# sockets
zmqSocket = None
exchangeSocket = None
......@@ -47,20 +48,23 @@ class FileReceiver:
self.zmqContext = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
# create pull socket
self.zmqSocket = self.zmqContext.socket(zmq.PULL)
connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort
self.zmqSocket.bind(connectionStrZmqSocket)
logging.debug("zmqSocket started for '" + connectionStrZmqSocket + "'")
self.log.debug("zmqSocket started for '" + connectionStrZmqSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
self.exchangeSocket.bind(connectionStrExchangeSocket)
logging.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'")
self.log.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'")
# thread to communicate with live viewer
self.liveViewerThread = threading.Thread(target=liveViewer)
self.liveViewerThread = threading.Thread(target=LiveViewer)
self.liveViewerThread.start()
# initialize ring buffer
......@@ -78,19 +82,24 @@ class FileReceiver:
ringBuffer = sorted(ringBuffer, reverse=True)
try:
logging.info("Start receiving new files")
self.log.info("Start receiving new files")
self.startReceiving()
logging.info("Stopped receiving.")
self.log.info("Stopped receiving.")
except Exception, e:
logging.error("Unknown error while receiving files. Need to abort.")
logging.debug("Error was: " + str(e))
self.log.error("Unknown error while receiving files. Need to abort.")
self.log.debug("Error was: " + str(e))
except:
trace = traceback.format_exc()
logging.info("Unkown error state. Shutting down...")
logging.debug("Error was: " + str(trace))
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(trace))
self.zmqContext.destroy()
logging.info("Quitting.")
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("fileReceiver")
return logger
def addFileToRingBuffer(self, filename, fileModTime):
......@@ -117,43 +126,40 @@ class FileReceiver:
#TODO is string conversion needed here?
payloadMetadata = str(multipartMessage[0])
except:
logging.error("an empty config was transferred for multipartMessage")
self.log.error("an empty config was transferred for multipartMessage")
#TODO validate multipartMessage (like correct dict-values for metadata)
logging.debug("multipartMessage.metadata = " + str(payloadMetadata))
self.log.debug("multipartMessage.metadata = " + str(payloadMetadata))
#extraction metadata from multipart-message
payloadMetadataDict = json.loads(payloadMetadata)
#append to file
try:
logging.debug("append to file based on multipart-message...")
self.log.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
logging.debug("append to file based on multipart-message...success.")
self.log.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("append to file based on multipart-message...failed.")
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
self.log.debug("append to file based on multipart-message...failed.")
except:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
logging.error(errorMessage)
logging.debug("append to file based on multipart-message...failed.")
self.log.error(errorMessage)
self.log.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] :
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
self.log.debug("last file-chunk received. stop appending.")
break
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# add to ring buffer
logging.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime))
self.log.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime))
self.addFileToRingBuffer(str(filename), fileModTime)
......@@ -163,26 +169,26 @@ class FileReceiver:
continueStreaming = True
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
logging.debug("Waiting for new messages...")
self.log.debug("Waiting for new messages...")
while continueReceiving:
try:
self.combineMessage(self.zmqSocket)
loopCounter+=1
except KeyboardInterrupt:
logging.debug("Keyboard interrupt detected. Stop receiving.")
self.log.debug("Keyboard interrupt detected. Stop receiving.")
break
except:
logging.error("receive message...failed.")
logging.error(sys.exc_info())
self.log.error("receive message...failed.")
self.log.error(sys.exc_info())
continueReceiving = False
logging.info("shutting down receiver...")
self.log.info("shutting down receiver...")
try:
self.stopReceiving(self.zmqSocket, self.zmqContext)
logging.debug("shutting down receiver...done.")
self.log.debug("shutting down receiver...done.")
except:
logging.error(sys.exc_info())
logging.error("shutting down receiver...failed.")
self.log.error(sys.exc_info())
self.log.error("shutting down receiver...failed.")
def generateTargetFilepath(self,configDict):
......@@ -226,13 +232,13 @@ class FileReceiver:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
except:
logging.warning("an empty file was received within the multipart-message")
self.log.warning("an empty file was received within the multipart-message")
payload = None
#generate target filepath
targetFilepath = self.generateTargetFilepath(configDict)
logging.debug("new file is going to be created at: " + targetFilepath)
self.log.debug("new file is going to be created at: " + targetFilepath)
#append payload to file
......@@ -246,18 +252,18 @@ class FileReceiver:
targetPath = self.generateTargetPath(configDict)
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
logging.info("New target directory created: " + str(targetPath))
self.log.info("New target directory created: " + str(targetPath))
except Exception, f:
errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
logging.error(errorMessage)
logging.debug("Error was: " + str(f))
logging.debug("targetPath="+str(targetPath))
self.log.error(errorMessage)
self.log.debug("Error was: " + str(f))
self.log.debug("targetPath="+str(targetPath))
raise Exception(errorMessage)
except Exception, e:
logging.error("failed to append payload to file: '" + targetFilepath + "'")
logging.debug("Error was: " + str(e))
logging.debug("ErrorTyp: " + str(type(e)))
logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
self.log.error("failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
self.log.debug("ErrorTyp: " + str(type(e)))
self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
try:
if payload != None:
......@@ -266,47 +272,49 @@ class FileReceiver:
newFile.close()
except Exception, e:
errorMessage = "unable to append data to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(errorMessage)
def stopReceiving(self, zmqSocket, zmqContext):
logging.debug("stopReceiving...")
self.log.debug("stopReceiving...")
try:
zmqSocket.close()
logging.debug("closing zmqSocket...done.")
self.log.debug("closing zmqSocket...done.")
except:
logging.error("closing zmqSocket...failed.")
logging.error(sys.exc_info())
self.log.error("closing zmqSocket...failed.")
self.log.error(sys.exc_info())
logging.debug("sending exit signal to thread...")
self.log.debug("sending exit signal to thread...")
self.exchangeSocket.send("Exit")
time.sleep(0.1)
self.exchangeSocket.close()
logging.debug("sending exit signal to thread...done")
self.log.debug("sending exit signal to thread...done")
try:
zmqContext.destroy()
logging.debug("closing zmqContext...done.")
self.log.debug("closing zmqContext...done.")
except:
logging.error("closing zmqContext...failed.")
logging.error(sys.exc_info())
self.log.error("closing zmqContext...failed.")
self.log.error(sys.exc_info())
class liveViewer():
zmqContext = None
liveViewerIp = None
liveViewerPort = None
exchangeIp = "127.0.0.1"
exchangePort = "6072"
class LiveViewer():
zmqContext = None
liveViewerIp = None
liveViewerPort = None
exchangeIp = "127.0.0.1"
exchangePort = "6072"
log = None
# sockets
liveViewerSocket = None
exchangeSocket = None
liveViewerSocket = None
exchangeSocket = None
poller = None
poller = None
def __init__(self, liveViewerIp = "127.0.0.1", liveViewerPort = "6071", context = None):
......@@ -318,17 +326,20 @@ class liveViewer():
self.zmqContext = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
# create socket for live viewer
self.liveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrLiveViewerSocket = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort
self.liveViewerSocket.bind(connectionStrLiveViewerSocket)
logging.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'")
self.log.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'")
# create socket for message exchange
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort
self.exchangeSocket.connect(connectionStrExchangeSocket)
logging.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
self.poller = zmq.Poller()
self.poller.register(self.liveViewerSocket, zmq.POLLIN)
......@@ -337,6 +348,11 @@ class liveViewer():
self.sendFileToLiveViewer()
def getLogger(self):
logger = logging.getLogger("liveViewer")
return logger
def sendFileToLiveViewer(self):
should_continue = True
......@@ -344,15 +360,15 @@ class liveViewer():
socks = dict(self.poller.poll())
if self.exchangeSocket in socks and socks[self.exchangeSocket] == zmq.POLLIN:
message = self.exchangeSocket.recv()
logging.debug("Recieved control command: %s" % message )
self.log.debug("Recieved control command: %s" % message )
if message == "Exit":
logging.debug("Recieved exit command, liveViewer thread will stop recieving messages")
self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages")
should_continue = False
break
if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN:
message = self.liveViewerSocket.recv()
logging.debug("Call for next file... ")
self.log.debug("Call for next file... ")
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
answer = self.ringBuffer[0][1]
......@@ -365,7 +381,7 @@ class liveViewer():
except zmq.error.ContextTerminated:
break
logging.debug("LiveViewerThread: closing socket")
self.log.debug("LiveViewerThread: closing socket")
self.liveViewerSocket.close()
self.exchangeSocket.close()
......@@ -412,6 +428,7 @@ def initLogging(filenameFullPath, verbose):
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment