From fb541371b2300f5cd19eaca541d0ea7b8fa96ac1 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 15 Jul 2015 15:14:47 +0200 Subject: [PATCH] Added ring buffer to receiver and wait with lsync copy till ZMQ is finished --- ZeroMQTunnel/receiver.py | 129 ++++++++++++--------------------- ZeroMQTunnel/watcher_lsyncd.py | 2 +- 2 files changed, 49 insertions(+), 82 deletions(-) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index 4abb1756..869778fd 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -1,4 +1,4 @@ -__author__ = 'Marco Strutz <marco.strutz@desy.de>' +__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <marnuel.kuhn@desy.de>' import time @@ -11,6 +11,8 @@ import logging import errno import os import traceback +from stat import S_ISREG, ST_MTIME, ST_MODE + class FileReceiver: @@ -20,12 +22,29 @@ class FileReceiver: zqmDataStreamIp = None bindingIpForDataStream = None zqmFileserverIp = None + maxRingBufferSize = 100 + timeToWaitForRingBuffer = 2 + ringBuffer = [] def __init__(self, outputDir, bindingPortForDataStream, zqmFileserverIp): self.outputDir = outputDir self.bindingPortForDataStream = bindingPortForDataStream self.zqmFileserverIp = zqmFileserverIp + # initialize ring buffer + # get all entries in the directory + ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) + # get the corresponding stats + ringBuffer = ((os.stat(path), path) for path in ringBuffer) + + # leave only regular files, insert modification date + ringBuffer = [[stat[ST_MTIME], path] + for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])] + + # sort the ring buffer in descending order (new to old files) + ringBuffer = sorted(ringBuffer, reverse=True) + + # "global" in Python is per-module ! global globalZmqContext self.globalZmqContext = zmq.Context() @@ -147,22 +166,6 @@ class FileReceiver: logging.error(sys.exc_info()) continueReceiving = False - # #create new file object - # try: - # logging.debug("create new file based on multipart-message...") - # #TODO: save message to file using a thread (avoids blocking) - # self.saveFileFromMultipartMessage(multipartMessage) - # logging.debug("create new file based on multipart-message...success.") - # except Exception, e: - # errorMessage = "Unable to save multipart-content as file." - # logging.error(errorMessage) - # logging.debug("Error was: " + str(e)) - # logging.debug("create new file based on multipart-message...failed.") - # except: - # errorMessage = "Unable to save multipart-content as file. Unknown Error." - # logging.error(errorMessage) - # logging.debug("create new file based on multipart-message...failed.") - logging.info("shutting down receiver...") try: @@ -174,69 +177,6 @@ class FileReceiver: logging.error("shutting down zeromq...failed.") - def saveFileFromMultipartMessage(self, multipartMessage): - - #extract multipart message - try: - configDictJson = multipartMessage[0] - except: - logging.error("an empty config was transferred for multipartMessage") - try: - chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata - payload = multipartMessage[1:] - except: - logging.warning("an empty file was received within the multipart-message") - payload = None - - #TODO validate multipartMessage (like correct dict-values for metadata) - logging.debug("multipartMessage.metadata = " + str(configDictJson)) - - #extraction metadata from multipart-message - configDict = json.loads(configDictJson) - targetFilename = configDict["filename"] - targetFilesize = configDict["filesize"] - targetRelativePath = configDict["relativeParent"] - - #generate target filepath - targetPath = self.outputDir + "/" + targetRelativePath - targetFilepath = targetPath + "/" + targetFilename - logging.debug("new file is going to be created at: " + targetFilepath) - - - #write payload to file - try: - newFile = open(targetFilepath, "w") - except IOError, e: - # errno.ENOENT == "No such file or directory" - if e.errno == errno.ENOENT: - #TODO create subdirectory first, then try to open the file again - try: - os.makedirs(targetPath) - newFile = open(targetFilepath, "w") - except Exception, f: - errorMessage = "unable to save payload to file: '" + targetFilepath + "'" - logging.error(errorMessage) - logging.debug("Error was: " + str(f)) - raise Exception(errorMessage) - except Exception, e: - logging.error("failed to save payload to file: '" + targetFilepath + "'") - logging.debug("Error was: " + str(e)) - logging.debug("ErrorTyp: " + str(type(e))) - logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST)) - #only write data if a payload exist - try: - if payload != None: - for chunk in payload: - newFile.write(chunk) - newFile.close() - except Exception, e: - errorMessage = "unable to write data to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - raise Exception(errorMessage) - - - def generateTargetFilepath(self,configDict): """ generates full path where target file will saved to. @@ -272,6 +212,24 @@ class FileReceiver: return targetPath + def addToRingBuffer(self, targetFilepath): + # prepend file to ring buffer (buffer is sorted) + self.ringBuffer[:0] = [[os.stat(targetFilepath)[ST_MTIME], targetFilepath]] + + # if the maximal size is exceeded: remove the oldest files + if len(self.ringBuffer) > self.maxRingBufferSize: + for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: + if int(time.time()) - mod_time > self.timeToWaitForRingBuffer: + os.remove(path) + self.ringBuffer.remove([mod_time, path]) + + # Albula is the live viewer used at the beamlines + def sendFileToAlbula(self, targetFilepath): +# subframe.loadFile(targetFilepath) + + pass + + def appendChunksToFileFromMultipartMessage(self, multipartMessage): #extract multipart message @@ -334,6 +292,15 @@ class FileReceiver: raise Exception(errorMessage) + # send to albula + self.sendFileToAlbula(filename) + + + # add to ring buffer + self.addToRingBuffer(targetFilepath) + + + def stopReceiving(self, zmqSocket, msgContext): try: logging.debug("closing zmqSocket...") @@ -413,4 +380,4 @@ if __name__ == "__main__": #start file receiver - myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp) \ No newline at end of file + myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp) diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py index 9e3432c6..3a619e12 100644 --- a/ZeroMQTunnel/watcher_lsyncd.py +++ b/ZeroMQTunnel/watcher_lsyncd.py @@ -364,7 +364,7 @@ if __name__ == '__main__': pathnames = message.splitlines() for filepath in pathnames: directoryWatcher.passFileToZeromq(filepath) - time.sleep(5) + time.sleep(0.1) zmqContext.destroy() -- GitLab