From 9ba0529ef9d35fa12a10dea3a5110e0c3f8d31eb Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Tue, 21 Jul 2015 16:48:40 +0200 Subject: [PATCH] Added communication with live viewer infrastructure to receiver --- ZeroMQTunnel/receiver.py | 160 +++++++++++++++++++++------------------ 1 file changed, 87 insertions(+), 73 deletions(-) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index fe7802a0..5a89793e 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -12,6 +12,7 @@ import errno import os import traceback from stat import S_ISREG, ST_MTIME, ST_MODE +import threading @@ -25,13 +26,13 @@ class FileReceiver: maxRingBufferSize = 100 timeToWaitForRingBuffer = 2 ringBuffer = [] - alublaZmqContext = None + liveViewerZmqContext = None - def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIP): + def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIp): self.outputDir = outputDir self.bindingPortForDataStream = bindingPortForDataStream self.zmqFileserverIp = zmqFileserverIp - self.zqmLiveViewerIp = zqmLiveViewerIp + self.zmqLiveViewerIp = zmqLiveViewerIp self.bindingPortForLiveViewer = bindingPortForLiveViewer # initialize ring buffer @@ -53,9 +54,28 @@ class FileReceiver: global globalZmqContext self.globalZmqContext = zmq.Context() + # thread to communicate with live viewer + + #create socket for live viewer + try: + logging.info("creating socket for communication with live viewer...") + zmqContext = self.getZmqContext() + zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext) + logging.info("creating socket for communication with live viewer...done.") + except Exception, e: + errorMessage = "Unable to create zeromq context." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.info("creating socket for communication with live viewer...failed.") + raise Exception(e) + + t1 = threading.Thread(target=self.sendFileToLiveViewer, args=(zmqLiveViewerSocket,)) + t1.start() + + try: logging.info("Start receiving new files") - self.startReceiving() + self.startReceiving(zmqLiveViewerSocket) logging.info("Stopped receiving.") except Exception, e: logging.error("Unknown error while receiving files. Need to abort.") @@ -94,6 +114,14 @@ class FileReceiver: return socket + def getZmqSocket_Rep(self, context): + pattern_pull = zmq.REP + assert isinstance(context, zmq.sugar.context.Context) + socket = context.socket(pattern_pull) + + return socket + + def createPullSocket(self, context): assert isinstance(context, zmq.sugar.context.Context) socket = self.getZmqSocket_Pull(context) @@ -104,9 +132,9 @@ class FileReceiver: return socket - def createPushSocket(self, context): + def createSocketForLiveViewer(self, context): assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Pull(context) + socket = self.getZmqSocket_Rep(context) logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.bindingPortForLiveViewer) socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.bindingPortForLiveViewer) @@ -122,18 +150,61 @@ class FileReceiver: # if the maximal size is exceeded: remove the oldest files if len(self.ringBuffer) > self.maxRingBufferSize: for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: - if int(time.time()) - mod_time > self.timeToWaitForRingBuffer: + if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: os.remove(path) self.ringBuffer.remove([mod_time, path]) # Albula is the live viewer used at the beamlines - def sendFileToAlbula(self, zmqLiveViewerSocket): + def sendFileToLiveViewer(self, zmqLiveViewerSocket): #send first element in ring buffer to albula pass - def startReceiving(self): + def combineMessage(self, zmqSocket): + # multipartMessage = zmqSocket.recv_multipart() + # logging.info("New message received.") + # logging.debug("message-type : " + str(type(multipartMessage))) + # logging.debug("message-length: " + str(len(multipartMessage))) + # loopCounter+=1 + #save all chunks to file + while True: + multipartMessage = zmqSocket.recv_multipart() + #append to file + try: + logging.debug("append to file based on multipart-message...") + #TODO: save message to file using a thread (avoids blocking) + #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened + self.appendChunksToFileFromMultipartMessage(multipartMessage) + logging.debug("append to file based on multipart-message...success.") + except Exception, e: + errorMessage = "Unable to append multipart-content to file." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.debug("append to file based on multipart-message...failed.") + except: + errorMessage = "Unable to append multipart-content to file. Unknown Error." + logging.error(errorMessage) + logging.debug("append to file based on multipart-message...failed.") + if len(multipartMessage[1]) == 0: + #indicated end of file. closing file and leave loop + logging.debug("last file-chunk received. stop appending.") + break + payloadMetadata = str(multipartMessage[0]) + payloadMetadataDict = json.loads(payloadMetadata) + filename = self.generateTargetFilepath(payloadMetadataDict) + fileModTime = payloadMetadataDict["fileModificationTime"] + logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) + + # logging.debug("message-type : " + str(type(multipartMessage))) + # logging.debug("message-length: " + str(len(multipartMessage))) + + # add to ring buffer + self.addFileToRingBuffer(str(filename), fileModTime) + + + + def startReceiving(self, zmqLiveViewerSocket): #create pull socket try: logging.info("creating local pullSocket for incoming files...") @@ -147,19 +218,6 @@ class FileReceiver: logging.info("creating local pullSocket for incoming files...failed.") raise Exception(e) - #create pull socket - try: - logging.info("creating local pushSocket for outgoing files...") - zmqContext = self.getZmqContext() - zmqLiveViewerSocket = self.createPushSocket(zmqContext) - logging.info("creating local pushSocket for outgoing files...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating local pushSocket for outgoing files...failed.") - raise Exception(e) - #run loop, and wait for incoming messages continueStreaming = True loopCounter = 0 #counter of total received messages @@ -167,42 +225,7 @@ class FileReceiver: logging.debug("Waiting for new messages...") while continueReceiving: try: - # multipartMessage = zmqSocket.recv_multipart() - # logging.info("New message received.") - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) - # loopCounter+=1 - #save all chunks to file - while True: - multipartMessage = zmqSocket.recv_multipart() - #append to file - try: - logging.debug("append to file based on multipart-message...") - #TODO: save message to file using a thread (avoids blocking) - #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened - self.appendChunksToFileFromMultipartMessage(multipartMessage, zmqLiveViewerSocket) - logging.debug("append to file based on multipart-message...success.") - except Exception, e: - errorMessage = "Unable to append multipart-content to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("append to file based on multipart-message...failed.") - except: - errorMessage = "Unable to append multipart-content to file. Unknown Error." - logging.error(errorMessage) - logging.debug("append to file based on multipart-message...failed.") - if len(multipartMessage[1]) == 0: - #indicated end of file. closing file and leave loop - logging.debug("last file-chunk received. stop appending.") - break - payloadMetadata = str(multipartMessage[0]) - payloadMetadataDict = json.loads(payloadMetadata) - filename = self.generateTargetFilepath(payloadMetadataDict) - fileModTime = payloadMetadataDict - logging.info("New file with modification time " + fileModTime + " received and saved: " + str(filename)) - - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) + self.combineMessage(zmqSocket) loopCounter+=1 except KeyboardInterrupt: logging.debug("Keyboard interrupt detected. Stop receiving.") @@ -212,19 +235,10 @@ class FileReceiver: logging.error(sys.exc_info()) continueReceiving = False - - # add to ring buffer - self.addFileToRingBuffer(filename, fileModTime) - - - # send to albula - self.sendNewestFileToAlbula(zmqLiveViewerSocket) - - logging.info("shutting down receiver...") try: logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocket, zmpLiveViewer, zmqContext) + self.stopReceiving(zmqSocket, zmqLiveViewerSocket, zmqContext) logging.debug("shutting down zeromq...done.") except: logging.error(sys.exc_info()) @@ -266,7 +280,7 @@ class FileReceiver: return targetPath - def appendChunksToFileFromMultipartMessage(self, multipartMessage, zmqLiveViewerSocket): + def appendChunksToFileFromMultipartMessage(self, multipartMessage): #extract multipart message try: @@ -329,7 +343,7 @@ class FileReceiver: - def stopReceiving(self, zmqSocket, zmqLiveViwerSocket, msgContext): + def stopReceiving(self, zmqSocket, zmqLiveViewerSocket, msgContext): try: logging.debug("closing zmqSocket...") zmqSocket.close() @@ -387,7 +401,7 @@ def initLogging(filenameFullPath, verbose): #log everything to file logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', + format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d_%H:%M:%S', filename=filenameFullPath, filemode="a") @@ -410,7 +424,7 @@ if __name__ == "__main__": zqmDataStreamPort = str(arguments.tcpPortDataStream) zmqLiveViewerPort = str(arguments.tcpPortLiveViewer) zqmDataStreamIp = str(arguments.bindingIpForDataStream) - zmqLiveViewerIp = str(arguments.binginIpForLiveViewer) + zmqLiveViewerIp = str(arguments.bindingIpForLiveViewer) logFile = arguments.logfile logfileFilePath = arguments.logfile @@ -420,4 +434,4 @@ if __name__ == "__main__": #start file receiver - myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, bindingPortForLiveViewer, zmqLiveViewerIp): + myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp) -- GitLab