-
Manuela Kuhn authoredManuela Kuhn authored
dataTransferAPI.py 12.91 KiB
# API to communicate with a data transfer unit
from __future__ import print_function
import zmq
import socket
import logging
import sys
import json
class dataTransferQuery():
context = None
externalContext = True
signalIp = None
signalPort = None
dataIp = None
hostname = None
dataPort = None
signalSocket = None
dataSocket = None
log = None
streamStarted = False
queryNewestStarted = False
queryMetadataStarted = False
socketResponseTimeout = None
def __init__(self, signalPort, signalIp, dataPort, dataIp = "0.0.0.0", useLog = False, context = None):
if useLog:
self.log = logging.getLogger("dataTransferAPI")
else:
class loggingFunction:
def __init__(self):
self.debug = lambda x: print(x)
self.info = lambda x: print(x)
self.warning = lambda x: print(x)
self.error = lambda x: print(x)
self.critical = lambda x: print(x)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.externalContext = True
else:
self.context = zmq.Context()
self.externalContext = False
self.signalIp = signalIp
self.signalPort = signalPort
self.dataIp = dataIp
self.hostname = socket.gethostname()
self.dataPort = dataPort
self.socketResponseTimeout = 1000
# To send a notification that a Displayer is up and running, a communication socket is needed
# create socket to exchange signals with Sender
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.socketResponseTimeout
connectionStr = "tcp://" + str(self.signalIp) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
##
#
# Initailizes the data transfer socket and
# signals the sender which kind of connection should be used
#
# Returns 0 if the connection could be initializes without errors
# Error Codes are:
# 10 if the connection type is not supported
# 11 if there is already one connection running
# 12 Could not send signal
# 13 Could not poll new for new message
# 14 Could not receive answer to signal
# 15 if the response was not correct
#
##
def initConnection(self, connectionType):
supportedConnections = ["stream", "queryNewest", "queryMetadata"]
if connectionType not in supportedConnections:
self.log.info("Chosen type of connection is not supported.")
return 10
alreadyConnected = self.streamStarted or self.queryNewestStarted or self.queryMetadataStarted
signal = None
if connectionType == "stream" and not alreadyConnected:
signal = "START_LIVE_VIEWER"
elif connectionType == "queryNewest" and not alreadyConnected:
signal = "START_REALTIME_ANALYSIS"
elif connectionType == "queryMetadata" and not alreadyConnected:
signal = "START_DISPLAYER"
else:
self.log.info("Other connection type already runnging.")
self.log.info("More than one connection type is currently not supported.")
return 11
# Send the signal that the communication infrastructure should be established
self.log.info("Sending Start Signal")
sendMessage = str(signal) + "," + str(self.hostname) + "," + str(self.dataPort)
try:
self.signalSocket.send(sendMessage)
except Exception as e:
self.log.info("Could not send signal")
self.log.info("Error was: " + str(e))
return 12
message = None
try:
socks = dict(self.poller.poll(self.socketResponseTimeout))
except Exception as e:
self.log.info("Could not poll for new message")
self.log.info("Error was: " + str(e))
return 13
# if there was a response
if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
try:
# Get the reply.
message = self.signalSocket.recv()
self.log.info("Received answer to signal: " + str(message) )
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received")
self.stop()
return 14
except Exception as e:
self.log.info("Could not receive answer to signal")
self.log.debug("Error was: " + str(e))
self.stop()
return 14
# if the response was correct
if message and message.startswith(signal):
self.log.info("Received confirmation ...start receiving files")
if connectionType == "stream":
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.streamStarted = True
elif connectionType == "queryNewest":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryNewestStarted = True
elif connectionType == "queryMetadata":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryMetadataStarted = True
# if there was no response or the response was of the wrong format, the receiver should be shut down
else:
self.log.error("Sending start signal ...failed.")
sys.exit(1)
return 15
return 0
##
#
# Receives or queries for new files depending on the connection initialized
#
# returns either
# the next file
# (if connection type "stream" was choosen)
# the newest file
# (if connection type "queryNewest" was choosen)
# the path of the newest file
# (if connection type "queryMetadata" was choosen)
#
##
def getData(self):
if self.streamStarted:
#run loop, and wait for incoming messages
self.log.debug("Waiting for new messages...")
try:
return self.__getMultipartMessage()
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
return None
except Exception, e:
self.log.error("Unknown error while receiving files. Need to abort.")
self.log.debug("Error was: " + str(e))
return None
elif self.queryNewestStarted or self.queryMetadataStarted:
sendMessage = "NEXT_FILE"
self.log.info("Asking for next file with message " + str(sendMessage))
try:
self.dataSocket.send(sendMessage)
except Exception as e:
self.log.info("Could not send request to dataSocket")
self.log.info("Error was: " + str(e))
return None
try:
# Get the reply.
if self.queryNewestStarted:
message = self.dataSocket.recv_multipart()
else:
message = self.dataSocket.recv()
except Exception as e:
message = ""
self.log.info("Could not receive answer to request")
self.log.info("Error was: " + str(e))
return None
return message
else:
self.log.info("Could not communicate, no connection was initialized.")
return None
def __getMultipartMessage(self):
receivingMessages = True
#save all chunks to file
multipartMessage = self.dataSocket.recv_multipart()
#extract multipart message
try:
#TODO is string conversion needed here?
metadata = str(multipartMessage[0])
except:
self.log.error("An empty config was transferred for the multipart-message.")
#TODO validate multipartMessage (like correct dict-values for metadata)
#extraction metadata from multipart-message
try:
metadataDict = json.loads(metadata)
except Exception as e:
self.log.error("Could not extract metadata from the multipart-message.")
self.log.debug("Error was:" + str(e))
try:
payload = multipartMessage[1:]
except Exception as e:
self.log.warning("An empty file was received within the multipart-message")
self.log.debug("Error was:" + str(e))
payload = None
return [metadata, payload]
##
#
# Send signal that the displayer is quitting, close ZMQ connections, destoying context
#
##
def stop(self):
if self.dataSocket:
if self.streamStarted:
signal = "STOP_LIVE_VIEWER"
elif self.queryNewestStarted:
signal = "STOP_REALTIME_ANALYSIS"
elif self.queryMetadataStarted:
signal = "STOP_DISPLAYER"
self.log.info("Sending Stop Signal")
sendMessage = str(signal) + "," + str(self.hostname) + "," + str(self.dataPort)
try:
self.signalSocket.send (sendMessage)
# Get the reply.
message = self.signalSocket.recv()
self.log.info("Recieved signal: " + message)
except Exception as e:
self.log.info("Could not communicate")
self.log.info("Error was: " + str(e))
try:
if self.signalSocket:
self.log.info("closing signalSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except Exception as e:
self.log.info("closing ZMQ Sockets...failed.")
self.log.info("Error was: " + str(e))
if not self.externalContext:
try:
if self.context:
self.log.info("closing zmqContext...")
self.context.destroy()
self.context = None
self.log.info("closing zmqContext...done.")
except Exception as e:
self.log.info("closing zmqContext...failed.")
self.log.info("Error was: " + str(e))
def __exit__(self):
self.stop()
def __del__(self):
self.stop()