From f6379e886227e211cc0ca2091f016e047d747341 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Fri, 17 Jul 2015 14:30:46 +0200 Subject: [PATCH] Use original modification time in ring buffer --- ZeroMQTunnel/fileMover.py | 27 +++++--- ZeroMQTunnel/receiver.py | 128 +++++++++++++++++++++++++------------- 2 files changed, 102 insertions(+), 53 deletions(-) diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index b6b84531..eda4cec7 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -158,10 +158,12 @@ class WorkerProcess(): waitTime = 2000.0 return waitTime + def getFileMaxWaitTimeInMs(self): maxWaitTime = 10000.0 return maxWaitTime + def passFileToDataStream(self, zmqDataStreamSocket, filename, sourcePath, relativeParent): """filesizeRequested == filesize submitted by file-event. In theory it can differ to real file size""" @@ -202,9 +204,11 @@ class WorkerProcess(): #for quick testing set filesize of file as chunksize logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...") - filesize = os.path.getsize(sourceFilePathFull) - chunksize = filesize #can be used later on to split multipart message + filesize = os.path.getsize(sourceFilePathFull) + fileModificationTime = os.stat(sourceFilePathFull) + chunksize = filesize #can be used later on to split multipart message logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) + logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) except Exception, e: errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'." @@ -225,7 +229,7 @@ class WorkerProcess(): #build payload for message-pipe by putting source-file into a message try: - payloadMetadata = self.buildPayloadMetadata(filename, filesize, sourcePath, relativeParent) + payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent) except Exception, e: errorMessage = "Unable to assemble multi-part message." logging.error(errorMessage) @@ -291,11 +295,12 @@ class WorkerProcess(): - def buildPayloadMetadata(self, filename, filesize, sourcePath, relativeParent): + def buildPayloadMetadata(self, filename, filesize, fileModificationTime, sourcePath, relativeParent): """ builds metadata for zmq-multipart-message. should be used as first element for payload. :param filename: :param filesize: + :param fileModificationTime: :param sourcePath: :param relativeParent: :return: @@ -304,11 +309,12 @@ class WorkerProcess(): #add metadata to multipart logging.debug("create metadata for source file...") metadataDict = { - "filename" : filename, - "filesize" : filesize, - "sourcePath" : sourcePath, - "relativeParent" : relativeParent, - "chunkSize" : self.getChunkSize()} + "filename" : filename, + "filesize" : filesize, + "fileModificationTime" : fileModificationTime, + "sourcePath" : sourcePath, + "relativeParent" : relativeParent, + "chunkSize" : self.getChunkSize()} logging.debug("metadataDict = " + str(metadataDict)) @@ -423,9 +429,11 @@ class FileMover(): def getFileWaitTimeInMs(self): return self.fileWaitTimeInMs + def getFileMaxWaitTimeInMs(self): return self.fileMaxWaitTimeInMs + def startReceiving(self): #create socket zmqContext = self.zmqContext @@ -532,6 +540,7 @@ class FileMover(): def processFileEvent(self, fileEventMessage, routerSocket): self.routeFileEventToWorkerThread(fileEventMessage, routerSocket) + def stopReceiving(self, zmqSocket, msgContext): try: logging.debug("closing zmqSocket...") diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index 869778fd..fe7802a0 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -21,18 +21,22 @@ class FileReceiver: bindingPortForDataStream = None zqmDataStreamIp = None bindingIpForDataStream = None - zqmFileserverIp = None + zmqFileserverIp = None maxRingBufferSize = 100 timeToWaitForRingBuffer = 2 ringBuffer = [] + alublaZmqContext = None - def __init__(self, outputDir, bindingPortForDataStream, zqmFileserverIp): + def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIP): self.outputDir = outputDir self.bindingPortForDataStream = bindingPortForDataStream - self.zqmFileserverIp = zqmFileserverIp + self.zmqFileserverIp = zmqFileserverIp + self.zqmLiveViewerIp = zqmLiveViewerIp + self.bindingPortForLiveViewer = bindingPortForLiveViewer # initialize ring buffer # get all entries in the directory + # TODO empty target dir -> ringBuffer = [] ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) # get the corresponding stats ringBuffer = ((os.stat(path), path) for path in ringBuffer) @@ -94,12 +98,41 @@ class FileReceiver: assert isinstance(context, zmq.sugar.context.Context) socket = self.getZmqSocket_Pull(context) - logging.info("binding to data socket: tcp://" + self.zqmFileserverIp + ":%s" % self.bindingPortForDataStream) - socket.bind('tcp://' + self.zqmFileserverIp + ':%s' % self.bindingPortForDataStream) + logging.info("binding to data socket: tcp://" + self.zmqFileserverIp + ":%s" % self.bindingPortForDataStream) + socket.bind('tcp://' + self.zmqFileserverIp + ':%s' % self.bindingPortForDataStream) return socket + def createPushSocket(self, context): + assert isinstance(context, zmq.sugar.context.Context) + socket = self.getZmqSocket_Pull(context) + + logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.bindingPortForLiveViewer) + socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.bindingPortForLiveViewer) + + return socket + + + def addFileToRingBuffer(self, filename, fileModTime): + # prepend file to ring buffer and restore order + self.ringBuffer[:0] = [[fileModTime, filename]] + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + + # if the maximal size is exceeded: remove the oldest files + if len(self.ringBuffer) > self.maxRingBufferSize: + for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: + if int(time.time()) - mod_time > self.timeToWaitForRingBuffer: + os.remove(path) + self.ringBuffer.remove([mod_time, path]) + + + # Albula is the live viewer used at the beamlines + def sendFileToAlbula(self, zmqLiveViewerSocket): + #send first element in ring buffer to albula + pass + + def startReceiving(self): #create pull socket try: @@ -114,6 +147,18 @@ class FileReceiver: logging.info("creating local pullSocket for incoming files...failed.") raise Exception(e) + #create pull socket + try: + logging.info("creating local pushSocket for outgoing files...") + zmqContext = self.getZmqContext() + zmqLiveViewerSocket = self.createPushSocket(zmqContext) + logging.info("creating local pushSocket for outgoing files...done.") + except Exception, e: + errorMessage = "Unable to create zeromq context." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.info("creating local pushSocket for outgoing files...failed.") + raise Exception(e) #run loop, and wait for incoming messages continueStreaming = True @@ -135,7 +180,7 @@ class FileReceiver: logging.debug("append to file based on multipart-message...") #TODO: save message to file using a thread (avoids blocking) #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened - self.appendChunksToFileFromMultipartMessage(multipartMessage) + self.appendChunksToFileFromMultipartMessage(multipartMessage, zmqLiveViewerSocket) logging.debug("append to file based on multipart-message...success.") except Exception, e: errorMessage = "Unable to append multipart-content to file." @@ -150,10 +195,11 @@ class FileReceiver: #indicated end of file. closing file and leave loop logging.debug("last file-chunk received. stop appending.") break - payloadMetadata = str(multipartMessage[0]) + payloadMetadata = str(multipartMessage[0]) payloadMetadataDict = json.loads(payloadMetadata) - filename = self.generateTargetFilepath(payloadMetadataDict) - logging.info("New file received and saved: " + str(filename)) + filename = self.generateTargetFilepath(payloadMetadataDict) + fileModTime = payloadMetadataDict + logging.info("New file with modification time " + fileModTime + " received and saved: " + str(filename)) # logging.debug("message-type : " + str(type(multipartMessage))) # logging.debug("message-length: " + str(len(multipartMessage))) @@ -167,10 +213,18 @@ class FileReceiver: continueReceiving = False + # add to ring buffer + self.addFileToRingBuffer(filename, fileModTime) + + + # send to albula + self.sendNewestFileToAlbula(zmqLiveViewerSocket) + + logging.info("shutting down receiver...") try: logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocket, zmqContext) + self.stopReceiving(zmqSocket, zmpLiveViewer, zmqContext) logging.debug("shutting down zeromq...done.") except: logging.error(sys.exc_info()) @@ -212,25 +266,7 @@ class FileReceiver: return targetPath - def addToRingBuffer(self, targetFilepath): - # prepend file to ring buffer (buffer is sorted) - self.ringBuffer[:0] = [[os.stat(targetFilepath)[ST_MTIME], targetFilepath]] - - # if the maximal size is exceeded: remove the oldest files - if len(self.ringBuffer) > self.maxRingBufferSize: - for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: - if int(time.time()) - mod_time > self.timeToWaitForRingBuffer: - os.remove(path) - self.ringBuffer.remove([mod_time, path]) - - # Albula is the live viewer used at the beamlines - def sendFileToAlbula(self, targetFilepath): -# subframe.loadFile(targetFilepath) - - pass - - - def appendChunksToFileFromMultipartMessage(self, multipartMessage): + def appendChunksToFileFromMultipartMessage(self, multipartMessage, zmqLiveViewerSocket): #extract multipart message try: @@ -292,16 +328,8 @@ class FileReceiver: raise Exception(errorMessage) - # send to albula - self.sendFileToAlbula(filename) - - - # add to ring buffer - self.addToRingBuffer(targetFilepath) - - - def stopReceiving(self, zmqSocket, msgContext): + def stopReceiving(self, zmqSocket, zmqLiveViwerSocket, msgContext): try: logging.debug("closing zmqSocket...") zmqSocket.close() @@ -310,6 +338,14 @@ class FileReceiver: logging.error("closing zmqSocket...failed.") logging.error(sys.exc_info()) + try: + logging.debug("closing zmqLiveViwerSocket...") + zmqLiveViewerSocket.close() + logging.debug("closing zmqLiveViwerSocket...done.") + except: + logging.error("closing zmqLiveViewerSocket...failed.") + logging.error(sys.exc_info()) + try: logging.debug("closing zmqContext...") msgContext.destroy() @@ -327,8 +363,10 @@ def argumentParsing(): parser = argparse.ArgumentParser() parser.add_argument("--outputDir" , type=str, help="where incoming data will be stored to", default="/tmp/watchdog/data_mirror/") parser.add_argument("--tcpPortDataStream" , type=int, help="tcp port of data pipe", default=6061) + parser.add_argument("--tcpPortLiveViewer" , type=int, help="tcp port of live viewer", default=6071) parser.add_argument("--logfile" , type=str, help="file used for logging", default="/tmp/watchdog/fileReceiver.log") parser.add_argument("--bindingIpForDataStream", type=str, help="local ip to bind dataStream to", default="127.0.0.1") + parser.add_argument("--bindingIpForLiveViewer", type=str, help="local ip to bind LiveViewer to", default="127.0.0.1") parser.add_argument("--verbose" , help="more verbose output", action="store_true") arguments = parser.parse_args() @@ -366,13 +404,15 @@ if __name__ == "__main__": #argument parsing - arguments = argumentParsing() - outputDir = arguments.outputDir - verbose = arguments.verbose + arguments = argumentParsing() + outputDir = arguments.outputDir + verbose = arguments.verbose zqmDataStreamPort = str(arguments.tcpPortDataStream) + zmqLiveViewerPort = str(arguments.tcpPortLiveViewer) zqmDataStreamIp = str(arguments.bindingIpForDataStream) - logFile = arguments.logfile - logfileFilePath = arguments.logfile + zmqLiveViewerIp = str(arguments.binginIpForLiveViewer) + logFile = arguments.logfile + logfileFilePath = arguments.logfile #enable logging @@ -380,4 +420,4 @@ if __name__ == "__main__": #start file receiver - myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp) + myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, bindingPortForLiveViewer, zmqLiveViewerIp): -- GitLab