Skip to content
Snippets Groups Projects
Commit fb541371 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added ring buffer to receiver and wait with lsync copy till ZMQ is

finished
parent e968cce9
No related branches found
No related tags found
No related merge requests found
__author__ = 'Marco Strutz <marco.strutz@desy.de>'
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <marnuel.kuhn@desy.de>'
import time
......@@ -11,6 +11,8 @@ import logging
import errno
import os
import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE
class FileReceiver:
......@@ -20,12 +22,29 @@ class FileReceiver:
zqmDataStreamIp = None
bindingIpForDataStream = None
zqmFileserverIp = None
maxRingBufferSize = 100
timeToWaitForRingBuffer = 2
ringBuffer = []
def __init__(self, outputDir, bindingPortForDataStream, zqmFileserverIp):
self.outputDir = outputDir
self.bindingPortForDataStream = bindingPortForDataStream
self.zqmFileserverIp = zqmFileserverIp
# initialize ring buffer
# get all entries in the directory
ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir))
# get the corresponding stats
ringBuffer = ((os.stat(path), path) for path in ringBuffer)
# leave only regular files, insert modification date
ringBuffer = [[stat[ST_MTIME], path]
for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])]
# sort the ring buffer in descending order (new to old files)
ringBuffer = sorted(ringBuffer, reverse=True)
# "global" in Python is per-module !
global globalZmqContext
self.globalZmqContext = zmq.Context()
......@@ -147,22 +166,6 @@ class FileReceiver:
logging.error(sys.exc_info())
continueReceiving = False
# #create new file object
# try:
# logging.debug("create new file based on multipart-message...")
# #TODO: save message to file using a thread (avoids blocking)
# self.saveFileFromMultipartMessage(multipartMessage)
# logging.debug("create new file based on multipart-message...success.")
# except Exception, e:
# errorMessage = "Unable to save multipart-content as file."
# logging.error(errorMessage)
# logging.debug("Error was: " + str(e))
# logging.debug("create new file based on multipart-message...failed.")
# except:
# errorMessage = "Unable to save multipart-content as file. Unknown Error."
# logging.error(errorMessage)
# logging.debug("create new file based on multipart-message...failed.")
logging.info("shutting down receiver...")
try:
......@@ -174,69 +177,6 @@ class FileReceiver:
logging.error("shutting down zeromq...failed.")
def saveFileFromMultipartMessage(self, multipartMessage):
#extract multipart message
try:
configDictJson = multipartMessage[0]
except:
logging.error("an empty config was transferred for multipartMessage")
try:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
except:
logging.warning("an empty file was received within the multipart-message")
payload = None
#TODO validate multipartMessage (like correct dict-values for metadata)
logging.debug("multipartMessage.metadata = " + str(configDictJson))
#extraction metadata from multipart-message
configDict = json.loads(configDictJson)
targetFilename = configDict["filename"]
targetFilesize = configDict["filesize"]
targetRelativePath = configDict["relativeParent"]
#generate target filepath
targetPath = self.outputDir + "/" + targetRelativePath
targetFilepath = targetPath + "/" + targetFilename
logging.debug("new file is going to be created at: " + targetFilepath)
#write payload to file
try:
newFile = open(targetFilepath, "w")
except IOError, e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
#TODO create subdirectory first, then try to open the file again
try:
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
except Exception, f:
errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
logging.error(errorMessage)
logging.debug("Error was: " + str(f))
raise Exception(errorMessage)
except Exception, e:
logging.error("failed to save payload to file: '" + targetFilepath + "'")
logging.debug("Error was: " + str(e))
logging.debug("ErrorTyp: " + str(type(e)))
logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
try:
if payload != None:
for chunk in payload:
newFile.write(chunk)
newFile.close()
except Exception, e:
errorMessage = "unable to write data to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
raise Exception(errorMessage)
def generateTargetFilepath(self,configDict):
"""
generates full path where target file will saved to.
......@@ -272,6 +212,24 @@ class FileReceiver:
return targetPath
def addToRingBuffer(self, targetFilepath):
# prepend file to ring buffer (buffer is sorted)
self.ringBuffer[:0] = [[os.stat(targetFilepath)[ST_MTIME], targetFilepath]]
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if int(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
# Albula is the live viewer used at the beamlines
def sendFileToAlbula(self, targetFilepath):
# subframe.loadFile(targetFilepath)
pass
def appendChunksToFileFromMultipartMessage(self, multipartMessage):
#extract multipart message
......@@ -334,6 +292,15 @@ class FileReceiver:
raise Exception(errorMessage)
# send to albula
self.sendFileToAlbula(filename)
# add to ring buffer
self.addToRingBuffer(targetFilepath)
def stopReceiving(self, zmqSocket, msgContext):
try:
logging.debug("closing zmqSocket...")
......@@ -413,4 +380,4 @@ if __name__ == "__main__":
#start file receiver
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp)
\ No newline at end of file
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp)
......@@ -364,7 +364,7 @@ if __name__ == '__main__':
pathnames = message.splitlines()
for filepath in pathnames:
directoryWatcher.passFileToZeromq(filepath)
time.sleep(5)
time.sleep(0.1)
zmqContext.destroy()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment