Skip to content
Snippets Groups Projects
Commit f6379e88 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Use original modification time in ring buffer

parent cb1aea66
No related branches found
No related tags found
No related merge requests found
......@@ -158,10 +158,12 @@ class WorkerProcess():
waitTime = 2000.0
return waitTime
def getFileMaxWaitTimeInMs(self):
maxWaitTime = 10000.0
return maxWaitTime
def passFileToDataStream(self, zmqDataStreamSocket, filename, sourcePath, relativeParent):
"""filesizeRequested == filesize submitted by file-event. In theory it can differ to real file size"""
......@@ -202,9 +204,11 @@ class WorkerProcess():
#for quick testing set filesize of file as chunksize
logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
filesize = os.path.getsize(sourceFilePathFull)
chunksize = filesize #can be used later on to split multipart message
filesize = os.path.getsize(sourceFilePathFull)
fileModificationTime = os.stat(sourceFilePathFull)
chunksize = filesize #can be used later on to split multipart message
logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
except Exception, e:
errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'."
......@@ -225,7 +229,7 @@ class WorkerProcess():
#build payload for message-pipe by putting source-file into a message
try:
payloadMetadata = self.buildPayloadMetadata(filename, filesize, sourcePath, relativeParent)
payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent)
except Exception, e:
errorMessage = "Unable to assemble multi-part message."
logging.error(errorMessage)
......@@ -291,11 +295,12 @@ class WorkerProcess():
def buildPayloadMetadata(self, filename, filesize, sourcePath, relativeParent):
def buildPayloadMetadata(self, filename, filesize, fileModificationTime, sourcePath, relativeParent):
"""
builds metadata for zmq-multipart-message. should be used as first element for payload.
:param filename:
:param filesize:
:param fileModificationTime:
:param sourcePath:
:param relativeParent:
:return:
......@@ -304,11 +309,12 @@ class WorkerProcess():
#add metadata to multipart
logging.debug("create metadata for source file...")
metadataDict = {
"filename" : filename,
"filesize" : filesize,
"sourcePath" : sourcePath,
"relativeParent" : relativeParent,
"chunkSize" : self.getChunkSize()}
"filename" : filename,
"filesize" : filesize,
"fileModificationTime" : fileModificationTime,
"sourcePath" : sourcePath,
"relativeParent" : relativeParent,
"chunkSize" : self.getChunkSize()}
logging.debug("metadataDict = " + str(metadataDict))
......@@ -423,9 +429,11 @@ class FileMover():
def getFileWaitTimeInMs(self):
return self.fileWaitTimeInMs
def getFileMaxWaitTimeInMs(self):
return self.fileMaxWaitTimeInMs
def startReceiving(self):
#create socket
zmqContext = self.zmqContext
......@@ -532,6 +540,7 @@ class FileMover():
def processFileEvent(self, fileEventMessage, routerSocket):
self.routeFileEventToWorkerThread(fileEventMessage, routerSocket)
def stopReceiving(self, zmqSocket, msgContext):
try:
logging.debug("closing zmqSocket...")
......
......@@ -21,18 +21,22 @@ class FileReceiver:
bindingPortForDataStream = None
zqmDataStreamIp = None
bindingIpForDataStream = None
zqmFileserverIp = None
zmqFileserverIp = None
maxRingBufferSize = 100
timeToWaitForRingBuffer = 2
ringBuffer = []
alublaZmqContext = None
def __init__(self, outputDir, bindingPortForDataStream, zqmFileserverIp):
def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIP):
self.outputDir = outputDir
self.bindingPortForDataStream = bindingPortForDataStream
self.zqmFileserverIp = zqmFileserverIp
self.zmqFileserverIp = zmqFileserverIp
self.zqmLiveViewerIp = zqmLiveViewerIp
self.bindingPortForLiveViewer = bindingPortForLiveViewer
# initialize ring buffer
# get all entries in the directory
# TODO empty target dir -> ringBuffer = []
ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir))
# get the corresponding stats
ringBuffer = ((os.stat(path), path) for path in ringBuffer)
......@@ -94,12 +98,41 @@ class FileReceiver:
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Pull(context)
logging.info("binding to data socket: tcp://" + self.zqmFileserverIp + ":%s" % self.bindingPortForDataStream)
socket.bind('tcp://' + self.zqmFileserverIp + ':%s' % self.bindingPortForDataStream)
logging.info("binding to data socket: tcp://" + self.zmqFileserverIp + ":%s" % self.bindingPortForDataStream)
socket.bind('tcp://' + self.zmqFileserverIp + ':%s' % self.bindingPortForDataStream)
return socket
def createPushSocket(self, context):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Pull(context)
logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.bindingPortForLiveViewer)
socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.bindingPortForLiveViewer)
return socket
def addFileToRingBuffer(self, filename, fileModTime):
# prepend file to ring buffer and restore order
self.ringBuffer[:0] = [[fileModTime, filename]]
self.ringBuffer = sorted(self.ringBuffer, reverse=True)
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if int(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
# Albula is the live viewer used at the beamlines
def sendFileToAlbula(self, zmqLiveViewerSocket):
#send first element in ring buffer to albula
pass
def startReceiving(self):
#create pull socket
try:
......@@ -114,6 +147,18 @@ class FileReceiver:
logging.info("creating local pullSocket for incoming files...failed.")
raise Exception(e)
#create pull socket
try:
logging.info("creating local pushSocket for outgoing files...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createPushSocket(zmqContext)
logging.info("creating local pushSocket for outgoing files...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating local pushSocket for outgoing files...failed.")
raise Exception(e)
#run loop, and wait for incoming messages
continueStreaming = True
......@@ -135,7 +180,7 @@ class FileReceiver:
logging.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(multipartMessage)
self.appendChunksToFileFromMultipartMessage(multipartMessage, zmqLiveViewerSocket)
logging.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
......@@ -150,10 +195,11 @@ class FileReceiver:
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
break
payloadMetadata = str(multipartMessage[0])
payloadMetadata = str(multipartMessage[0])
payloadMetadataDict = json.loads(payloadMetadata)
filename = self.generateTargetFilepath(payloadMetadataDict)
logging.info("New file received and saved: " + str(filename))
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict
logging.info("New file with modification time " + fileModTime + " received and saved: " + str(filename))
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
......@@ -167,10 +213,18 @@ class FileReceiver:
continueReceiving = False
# add to ring buffer
self.addFileToRingBuffer(filename, fileModTime)
# send to albula
self.sendNewestFileToAlbula(zmqLiveViewerSocket)
logging.info("shutting down receiver...")
try:
logging.debug("shutting down zeromq...")
self.stopReceiving(zmqSocket, zmqContext)
self.stopReceiving(zmqSocket, zmpLiveViewer, zmqContext)
logging.debug("shutting down zeromq...done.")
except:
logging.error(sys.exc_info())
......@@ -212,25 +266,7 @@ class FileReceiver:
return targetPath
def addToRingBuffer(self, targetFilepath):
# prepend file to ring buffer (buffer is sorted)
self.ringBuffer[:0] = [[os.stat(targetFilepath)[ST_MTIME], targetFilepath]]
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if int(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
# Albula is the live viewer used at the beamlines
def sendFileToAlbula(self, targetFilepath):
# subframe.loadFile(targetFilepath)
pass
def appendChunksToFileFromMultipartMessage(self, multipartMessage):
def appendChunksToFileFromMultipartMessage(self, multipartMessage, zmqLiveViewerSocket):
#extract multipart message
try:
......@@ -292,16 +328,8 @@ class FileReceiver:
raise Exception(errorMessage)
# send to albula
self.sendFileToAlbula(filename)
# add to ring buffer
self.addToRingBuffer(targetFilepath)
def stopReceiving(self, zmqSocket, msgContext):
def stopReceiving(self, zmqSocket, zmqLiveViwerSocket, msgContext):
try:
logging.debug("closing zmqSocket...")
zmqSocket.close()
......@@ -310,6 +338,14 @@ class FileReceiver:
logging.error("closing zmqSocket...failed.")
logging.error(sys.exc_info())
try:
logging.debug("closing zmqLiveViwerSocket...")
zmqLiveViewerSocket.close()
logging.debug("closing zmqLiveViwerSocket...done.")
except:
logging.error("closing zmqLiveViewerSocket...failed.")
logging.error(sys.exc_info())
try:
logging.debug("closing zmqContext...")
msgContext.destroy()
......@@ -327,8 +363,10 @@ def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--outputDir" , type=str, help="where incoming data will be stored to", default="/tmp/watchdog/data_mirror/")
parser.add_argument("--tcpPortDataStream" , type=int, help="tcp port of data pipe", default=6061)
parser.add_argument("--tcpPortLiveViewer" , type=int, help="tcp port of live viewer", default=6071)
parser.add_argument("--logfile" , type=str, help="file used for logging", default="/tmp/watchdog/fileReceiver.log")
parser.add_argument("--bindingIpForDataStream", type=str, help="local ip to bind dataStream to", default="127.0.0.1")
parser.add_argument("--bindingIpForLiveViewer", type=str, help="local ip to bind LiveViewer to", default="127.0.0.1")
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
......@@ -366,13 +404,15 @@ if __name__ == "__main__":
#argument parsing
arguments = argumentParsing()
outputDir = arguments.outputDir
verbose = arguments.verbose
arguments = argumentParsing()
outputDir = arguments.outputDir
verbose = arguments.verbose
zqmDataStreamPort = str(arguments.tcpPortDataStream)
zmqLiveViewerPort = str(arguments.tcpPortLiveViewer)
zqmDataStreamIp = str(arguments.bindingIpForDataStream)
logFile = arguments.logfile
logfileFilePath = arguments.logfile
zmqLiveViewerIp = str(arguments.binginIpForLiveViewer)
logFile = arguments.logfile
logfileFilePath = arguments.logfile
#enable logging
......@@ -380,4 +420,4 @@ if __name__ == "__main__":
#start file receiver
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp)
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, bindingPortForLiveViewer, zmqLiveViewerIp):
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment