# FileReceiver.py
# Authored by Manuela Kuhn
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import time
import zmq
import sys
import json
import logging
import errno
import os
import traceback
import threading
import socket # needed to get hostname
from Coordinator import Coordinator
#
# -------------------------- class: FileReceiver --------------------------------------
#
class FileReceiver:
    """Receive files from a sender over ZeroMQ and store them below ``outputDir``.

    Creating an instance does all the work: it starts a ``Coordinator`` in a
    background thread, connects the sockets, sends the ``START_LIVE_VIEWER``
    signal to the sender and, once the sender confirmed it, blocks in the
    receive loop until receiving stops.
    """

    zmqContext            = None
    outputDir             = None
    dataStreamIp          = None
    dataStreamPorts       = []             # default only; __init__ stores the caller's ports on the instance
    liveViewerIp          = None
    liveViewerPort        = None
    exchangeIp            = "127.0.0.1"
    exchangePort          = "6072"
    senderComIp           = None           # ip of the socket used to exchange signals with the sender
    senderComPort         = None           # port of the socket used to exchange signals with the sender
    socketResponseTimeout = None           # time in milliseconds to wait for the sender to answer to a signal
    log                   = None

    # sockets
    dataStreamSockets     = []             # default only; every instance gets its own list in __init__
    exchangeSocket        = None           # socket to communicate with Coordinator class
    senderComSocket       = None           # socket to communicate with sender

    hostname              = socket.gethostname()

    def __init__(self, outputDir, dataStreamIp, dataStreamPorts, liveViewerPort, liveViewerIp, senderComPort,
                 maxRingBuffersize, senderResponseTimeout = 1000, context = None):
        """Set up the receiver, signal the sender and receive files until stopped.

        outputDir             -- directory the received files are written below
        dataStreamIp          -- ip the data stream sockets connect to; also used
                                 for the signal socket to the sender
        dataStreamPorts       -- iterable of ports, one PULL socket is created per port
        liveViewerPort/-Ip    -- passed through to the Coordinator
        senderComPort         -- port of the signal socket to the sender
        maxRingBuffersize     -- passed through to the Coordinator
        senderResponseTimeout -- milliseconds to wait for the sender's answer
        context               -- existing zmq context to reuse; a new one is
                                 created when omitted

        Calls sys.exit(1) if reading the sender's answer fails.
        """
        self.outputDir             = os.path.normpath(outputDir)
        self.dataStreamIp          = dataStreamIp
        self.dataStreamPorts       = dataStreamPorts
        self.liveViewerIp          = liveViewerIp
        self.liveViewerPort        = liveViewerPort
        self.senderComIp           = dataStreamIp        # same ip as the data stream ip
        self.senderComPort         = senderComPort
        self.socketResponseTimeout = senderResponseTimeout

        # per-instance list: appending to the class-level default would share
        # the sockets between all FileReceiver instances
        self.dataStreamSockets     = []

        self.zmqContext = context or zmq.Context()

        self.log = self.getLogger()
        self.log.debug("Init")

        # start the Coordinator in its own thread
        self.receiverThread = threading.Thread(target=Coordinator,
                                               args=(self.outputDir, self.liveViewerPort,
                                                     self.liveViewerIp, maxRingBuffersize))
        self.receiverThread.start()

        # create sockets
        self.createSockets()

        # Starting live viewer
        message = "START_LIVE_VIEWER," + str(self.hostname)
        self.log.info("Sending start signal to sender...")
        self.log.debug("Sending start signal to sender, message: " + message)
        print("sending message  " + message)
        self.senderComSocket.send(str(message))

        senderMessage = None

        # wait for response of sender till timeout is reached
        socks = dict(self.poller.poll(self.socketResponseTimeout))

        # if there was a response
        if socks.get(self.senderComSocket) == zmq.POLLIN:
            try:
                senderMessage = self.senderComSocket.recv()
                print("answer to start live viewer:  " + str(senderMessage))
                self.log.debug("Received message from sender: " + str(senderMessage))
            except KeyboardInterrupt:
                self.log.error("KeyboardInterrupt: No message received from sender")
                self.stopReceiving(sendToSender = False)
                sys.exit(1)
            except Exception as e:
                self.log.error("No message received from sender")
                self.log.debug("Error was: " + str(e))
                self.stopReceiving(sendToSender = False)
                sys.exit(1)

        # if the response was correct: start data retrieving
        if senderMessage == "START_LIVE_VIEWER":
            self.log.info("Received confirmation from sender...start receiving files")
            try:
                self.log.info("Start receiving new files")
                self.startReceiving()
                self.log.info("Stopped receiving.")
            except Exception as e:
                self.log.error("Unknown error while receiving files. Need to abort.")
                self.log.debug("Error was: " + str(e))
            except BaseException:
                # everything that is not an Exception (e.g. KeyboardInterrupt)
                trace = traceback.format_exc()
                self.log.info("Unkown error state. Shutting down...")
                self.log.debug("Error was: " + str(trace))
                self.zmqContext.destroy()
        # if there was no response or the response was of the wrong format, the receiver should be shut down
        else:
            print("Sending start signal to sender...failed.")
            self.log.info("Sending start signal to sender...failed.")
            self.stopReceiving(sendToSender = False)

        self.log.info("Quitting.")

    def getLogger(self):
        """Return the logger named "fileReceiver"."""
        return logging.getLogger("fileReceiver")

    def createSockets(self):
        """Create and connect the signal, exchange and data stream sockets.

        Also sets up ``self.poller`` (signal socket) and ``self.dataPoller``
        (all data stream sockets).
        """
        # create socket to exchange signals with Sender
        self.senderComSocket = self.zmqContext.socket(zmq.REQ)
        connectionStr = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
        self.senderComSocket.connect(connectionStr)
        self.log.info("senderComSocket started (connect) for '" + connectionStr + "'")

        # using a Poller to implement the senderComSocket timeout (in older ZMQ version there is no option RCVTIMEO)
        self.poller = zmq.Poller()
        self.poller.register(self.senderComSocket, zmq.POLLIN)

        # create socket to communicate with Coordinator
        self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
        connectionStr = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort)
        self.exchangeSocket.connect(connectionStr)
        self.log.debug("exchangeSocket started (connect) for '" + connectionStr + "'")

        # create poller to differentiate between the different data stream port
        self.dataPoller = zmq.Poller()

        # start from a fresh per-instance list so that no sockets of another
        # instance (or of an earlier call) are polled or closed by this one
        self.dataStreamSockets = []

        # create sockets to retrieve data from Sender
        for dataStreamPort in self.dataStreamPorts:
            dataStreamSocket = self.zmqContext.socket(zmq.PULL)
            connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=dataStreamPort)
            dataStreamSocket.connect(connectionStr)
            self.log.info("dataStreamSocket started (connect) for '" + connectionStr + "'")

            self.dataStreamSockets.append(dataStreamSocket)
            self.dataPoller.register(dataStreamSocket, zmq.POLLIN)

    def startReceiving(self):
        """Poll the data stream sockets until interrupted or an error occurs, then shut down."""
        self.log.debug("Waiting for new messages...")
        while True:
            try:
                self.pollDifferentSockets()
            except KeyboardInterrupt:
                self.log.debug("Keyboard interrupt detected. Stop receiving.")
                break
            except Exception:
                self.log.error("receive message...failed.")
                self.log.error(sys.exc_info())
                break

        self.log.info("shutting down receiver...")
        try:
            self.stopReceiving()
            self.log.debug("shutting down receiver...done.")
        except Exception:
            self.log.error(sys.exc_info())
            self.log.error("shutting down receiver...failed.")

    def pollDifferentSockets(self):
        """Block until data arrives and receive one file from every readable data stream socket."""
        socks = dict(self.dataPoller.poll())

        for dataStreamSocket in self.dataStreamSockets:
            if socks.get(dataStreamSocket) == zmq.POLLIN:
                self.combineMessage(dataStreamSocket)

    def combineMessage(self, dataStreamSocket):
        """Receive all chunks of one file from dataStreamSocket and report the file to the Coordinator.

        Every multipart message consists of a JSON metadata part followed by
        the payload. A payload shorter than the metadata's "chunkSize" marks
        the last chunk of the file. A message whose metadata cannot be parsed
        is dropped and nothing is reported.
        """
        while True:
            multipartMessage = dataStreamSocket.recv_multipart()

            # extract the metadata from the multipart message
            try:
                payloadMetadata = multipartMessage[0]
                self.log.debug("multipartMessage.metadata = " + str(payloadMetadata))
                payloadMetadataDict = json.loads(payloadMetadata)
            except (IndexError, TypeError, ValueError) as e:
                self.log.error("an empty config was transferred for multipartMessage")
                self.log.debug("Error was: " + str(e))
                return

            # append to file
            try:
                self.log.debug("append to file based on multipart-message...")
                #TODO: save message to file using a thread (avoids blocking)
                #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
                self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
                self.log.debug("append to file based on multipart-message...success.")
            except KeyboardInterrupt:
                self.log.info("KeyboardInterrupt detected. Unable to append multipart-content to file.")
                # let the receive loop see the interrupt instead of reporting
                # an incomplete file as saved
                raise
            except Exception as e:
                self.log.error("Unable to append multipart-content to file.")
                self.log.debug("Error was: " + str(e))
                self.log.debug("append to file based on multipart-message...failed.")

            # a message without payload part counts as a zero-length chunk
            payloadLength = len(multipartMessage[1]) if len(multipartMessage) > 1 else 0
            if payloadLength < payloadMetadataDict["chunkSize"]:
                # indicates end of file; leave loop
                self.log.debug("last file-chunk received. stop appending.")
                break

        filename    = self.generateTargetFilepath(payloadMetadataDict)
        fileModTime = payloadMetadataDict["fileModificationTime"]
        self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))

        # send the file to the coordinator to add it to the ring buffer
        # NOTE(review): there is no separator between "AddFile" and the path;
        # kept unchanged because the Coordinator's parser is not visible here.
        message = "AddFile" + str(filename) + ", " + str(fileModTime)
        self.log.debug("Send file to coordinator: " + message)
        self.exchangeSocket.send(message)

    def generateTargetFilepath(self, configDict):
        """
        generates full path where target file will saved to.
        """
        return os.path.join(self.generateTargetPath(configDict), configDict["filename"])

    def generateTargetPath(self, configDict):
        """
        generates path where target file will saved to.

        An empty or missing relative path yields the output directory itself.
        """
        targetRelativePath = configDict["relativePath"]

        if not targetRelativePath:
            return self.outputDir

        # if the relative path starts with a slash path.join will consider it as absolute path
        # NOTE(review): ".." components are not rejected here, so the metadata
        # can point outside of outputDir -- confirm the sender is trusted.
        return os.path.normpath(os.path.join(self.outputDir, targetRelativePath.lstrip("/")))

    def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage):
        """Append the payload parts of multipartMessage to the target file described by configDict.

        The first part of the message is the metadata and is skipped. The
        target directory is created if it does not exist yet.

        Raises Exception if the file cannot be opened or written.
        """
        try:
            payload = multipartMessage[1:]    # the first element keeps the dictionary/metadata
        except TypeError:
            self.log.warning("an empty file was received within the multipart-message")
            payload = None

        # generate target filepath
        targetFilepath = self.generateTargetFilepath(configDict)
        self.log.debug("new file is going to be created at: " + targetFilepath)

        newFile = self._openTargetFile(configDict, targetFilepath)

        # only write data if a payload exist; the file is closed in any case
        try:
            with newFile:
                for chunk in payload or ():
                    newFile.write(chunk)
        except Exception as e:
            errorMessage = "unable to append data to file."
            self.log.error(errorMessage)
            self.log.debug("Error was: " + str(e))
            raise Exception(errorMessage)

    def _openTargetFile(self, configDict, targetFilepath):
        """Open targetFilepath for binary appending, creating its directory first if it is missing."""
        try:
            return open(targetFilepath, "ab")
        except (IOError, OSError) as e:
            # errno.ENOENT == "No such file or directory"
            if e.errno != errno.ENOENT:
                errorMessage = "failed to append payload to file: '" + targetFilepath + "'"
                self.log.error(errorMessage)
                self.log.debug("Error was: " + str(e))
                raise Exception(errorMessage)

        # the target directory does not exist yet: create it and try again
        targetPath = None
        try:
            targetPath = self.generateTargetPath(configDict)
            try:
                os.makedirs(targetPath)
            except OSError as e:
                # somebody else may have created the directory in the meantime
                if e.errno != errno.EEXIST:
                    raise
            newFile = open(targetFilepath, "ab")
            self.log.info("New target directory created: " + str(targetPath))
            return newFile
        except Exception as f:
            errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
            self.log.error(errorMessage)
            self.log.debug("Error was: " + str(f))
            self.log.debug("targetPath=" + str(targetPath))
            raise Exception(errorMessage)

    def stopReceiving(self, sendToSender = True):
        """Close all sockets and destroy the zmq context.

        The Coordinator is sent "Exit"; if sendToSender is true the sender is
        additionally sent the STOP_LIVE_VIEWER signal. Both signals are best
        effort: a failure is logged and the cleanup continues.
        """
        self.log.debug("stopReceiving...")
        for dataStreamSocket in self.dataStreamSockets:
            try:
                dataStreamSocket.close(0)
                self.log.debug("closing dataStreamSocket...done.")
            except Exception:
                self.log.error("closing dataStreamSocket...failed.")
                self.log.error(sys.exc_info())

        self.log.debug("sending exit signal to coordinator...")
        try:
            self.exchangeSocket.send("Exit")
        except Exception as e:
            self.log.error("sending exit signal to coordinator...failed.")
            self.log.debug("Error was: " + str(e))

        if sendToSender:
            self._sendStopSignal()

        # give the signal time to arrive
        time.sleep(0.1)

        self.log.debug("closing signal communication sockets...")
        self.exchangeSocket.close(0)
        self.senderComSocket.close(0)
        self.log.debug("closing signal communication sockets...done")

        try:
            self.zmqContext.destroy()
            self.log.debug("closing zmqContext...done.")
        except Exception:
            self.log.error("closing zmqContext...failed.")
            self.log.error(sys.exc_info())

    def _sendStopSignal(self):
        """Send STOP_LIVE_VIEWER to the sender and log its answer; failures are only logged."""
        self.log.debug("sending stop signal to sender...")
        message = "STOP_LIVE_VIEWER," + str(self.hostname)
        print("sending message  " + message)
        try:
            self.senderComSocket.send(str(message), zmq.NOBLOCK)

            socks = dict(self.poller.poll(self.socketResponseTimeout))
            if socks.get(self.senderComSocket) != zmq.POLLIN:
                return

            senderMessage = self.senderComSocket.recv()
            print("answer to stop live viewer:  " + str(senderMessage))
            self.log.debug("Received message from sender: " + str(senderMessage))

            if senderMessage == "STOP_LIVE_VIEWER":
                self.log.info("Received confirmation from sender...")
            else:
                self.log.error("Received confirmation from sender...failed")
        except KeyboardInterrupt:
            self.log.error("KeyboardInterrupt: No message received from sender")
        except Exception as e:
            self.log.error("sending stop signal to sender...failed.")
            self.log.debug("Error was: " + str(e))