Skip to content
Snippets Groups Projects
Commit 9ba0529e authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added communication with live viewer infrastructure to receiver

parent 7a03e9d7
No related branches found
No related tags found
No related merge requests found
......@@ -12,6 +12,7 @@ import errno
import os
import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE
import threading
......@@ -25,13 +26,13 @@ class FileReceiver:
maxRingBufferSize = 100
timeToWaitForRingBuffer = 2
ringBuffer = []
alublaZmqContext = None
liveViewerZmqContext = None
def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIP):
def __init__(self, outputDir, bindingPortForDataStream, zmqFileserverIp, bindingPortForLiveViewer, zmqLiveViewerIp):
self.outputDir = outputDir
self.bindingPortForDataStream = bindingPortForDataStream
self.zmqFileserverIp = zmqFileserverIp
self.zqmLiveViewerIp = zqmLiveViewerIp
self.zmqLiveViewerIp = zmqLiveViewerIp
self.bindingPortForLiveViewer = bindingPortForLiveViewer
# initialize ring buffer
......@@ -53,9 +54,28 @@ class FileReceiver:
global globalZmqContext
self.globalZmqContext = zmq.Context()
# thread to communicate with live viewer
#create socket for live viewer
try:
logging.info("creating socket for communication with live viewer...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext)
logging.info("creating socket for communication with live viewer...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating socket for communication with live viewer...failed.")
raise Exception(e)
t1 = threading.Thread(target=self.sendFileToLiveViewer, args=(zmqLiveViewerSocket,))
t1.start()
try:
logging.info("Start receiving new files")
self.startReceiving()
self.startReceiving(zmqLiveViewerSocket)
logging.info("Stopped receiving.")
except Exception, e:
logging.error("Unknown error while receiving files. Need to abort.")
......@@ -94,6 +114,14 @@ class FileReceiver:
return socket
def getZmqSocket_Rep(self, context):
pattern_pull = zmq.REP
assert isinstance(context, zmq.sugar.context.Context)
socket = context.socket(pattern_pull)
return socket
def createPullSocket(self, context):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Pull(context)
......@@ -104,9 +132,9 @@ class FileReceiver:
return socket
def createPushSocket(self, context):
def createSocketForLiveViewer(self, context):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Pull(context)
socket = self.getZmqSocket_Rep(context)
logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.bindingPortForLiveViewer)
socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.bindingPortForLiveViewer)
......@@ -122,18 +150,61 @@ class FileReceiver:
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if int(time.time()) - mod_time > self.timeToWaitForRingBuffer:
if float(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
# Albula is the live viewer used at the beamlines
def sendFileToAlbula(self, zmqLiveViewerSocket):
def sendFileToLiveViewer(self, zmqLiveViewerSocket):
#send first element in ring buffer to albula
pass
def startReceiving(self):
def combineMessage(self, zmqSocket):
# multipartMessage = zmqSocket.recv_multipart()
# logging.info("New message received.")
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
# loopCounter+=1
#save all chunks to file
while True:
multipartMessage = zmqSocket.recv_multipart()
#append to file
try:
logging.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(multipartMessage)
logging.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("append to file based on multipart-message...failed.")
except:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
logging.error(errorMessage)
logging.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) == 0:
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
break
payloadMetadata = str(multipartMessage[0])
payloadMetadataDict = json.loads(payloadMetadata)
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
# add to ring buffer
self.addFileToRingBuffer(str(filename), fileModTime)
def startReceiving(self, zmqLiveViewerSocket):
#create pull socket
try:
logging.info("creating local pullSocket for incoming files...")
......@@ -147,19 +218,6 @@ class FileReceiver:
logging.info("creating local pullSocket for incoming files...failed.")
raise Exception(e)
#create pull socket
try:
logging.info("creating local pushSocket for outgoing files...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createPushSocket(zmqContext)
logging.info("creating local pushSocket for outgoing files...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating local pushSocket for outgoing files...failed.")
raise Exception(e)
#run loop, and wait for incoming messages
continueStreaming = True
loopCounter = 0 #counter of total received messages
......@@ -167,42 +225,7 @@ class FileReceiver:
logging.debug("Waiting for new messages...")
while continueReceiving:
try:
# multipartMessage = zmqSocket.recv_multipart()
# logging.info("New message received.")
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
# loopCounter+=1
#save all chunks to file
while True:
multipartMessage = zmqSocket.recv_multipart()
#append to file
try:
logging.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(multipartMessage, zmqLiveViewerSocket)
logging.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("append to file based on multipart-message...failed.")
except:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
logging.error(errorMessage)
logging.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) == 0:
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
break
payloadMetadata = str(multipartMessage[0])
payloadMetadataDict = json.loads(payloadMetadata)
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict
logging.info("New file with modification time " + fileModTime + " received and saved: " + str(filename))
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
self.combineMessage(zmqSocket)
loopCounter+=1
except KeyboardInterrupt:
logging.debug("Keyboard interrupt detected. Stop receiving.")
......@@ -212,19 +235,10 @@ class FileReceiver:
logging.error(sys.exc_info())
continueReceiving = False
# add to ring buffer
self.addFileToRingBuffer(filename, fileModTime)
# send to albula
self.sendNewestFileToAlbula(zmqLiveViewerSocket)
logging.info("shutting down receiver...")
try:
logging.debug("shutting down zeromq...")
self.stopReceiving(zmqSocket, zmpLiveViewer, zmqContext)
self.stopReceiving(zmqSocket, zmqLiveViewerSocket, zmqContext)
logging.debug("shutting down zeromq...done.")
except:
logging.error(sys.exc_info())
......@@ -266,7 +280,7 @@ class FileReceiver:
return targetPath
def appendChunksToFileFromMultipartMessage(self, multipartMessage, zmqLiveViewerSocket):
def appendChunksToFileFromMultipartMessage(self, multipartMessage):
#extract multipart message
try:
......@@ -329,7 +343,7 @@ class FileReceiver:
def stopReceiving(self, zmqSocket, zmqLiveViwerSocket, msgContext):
def stopReceiving(self, zmqSocket, zmqLiveViewerSocket, msgContext):
try:
logging.debug("closing zmqSocket...")
zmqSocket.close()
......@@ -387,7 +401,7 @@ def initLogging(filenameFullPath, verbose):
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s',
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
......@@ -410,7 +424,7 @@ if __name__ == "__main__":
zqmDataStreamPort = str(arguments.tcpPortDataStream)
zmqLiveViewerPort = str(arguments.tcpPortLiveViewer)
zqmDataStreamIp = str(arguments.bindingIpForDataStream)
zmqLiveViewerIp = str(arguments.binginIpForLiveViewer)
zmqLiveViewerIp = str(arguments.bindingIpForLiveViewer)
logFile = arguments.logfile
logfileFilePath = arguments.logfile
......@@ -420,4 +434,4 @@ if __name__ == "__main__":
#start file receiver
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, bindingPortForLiveViewer, zmqLiveViewerIp):
myWorker = FileReceiver(outputDir, zqmDataStreamPort, zqmDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment