Skip to content
Snippets Groups Projects
Commit 42468bf3 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Separated signal sending from socket creating

parent c19908be
No related branches found
No related tags found
No related merge requests found
......@@ -14,10 +14,10 @@ class dataTransfer():
context = None
externalContext = True
signalIp = None
signalHost = None
signalPort = None
dataIp = None
hostname = None
dataHost = None
dataPort = None
signalSocket = None
......@@ -25,6 +25,7 @@ class dataTransfer():
log = None
connectionType = None
supportedConnections = ["priorityStream", "stream", "queryNext", "OnDA", "queryMetadata"]
signalPort_MetadataOnly = "50021"
......@@ -38,7 +39,8 @@ class dataTransfer():
socketResponseTimeout = None
def __init__(self, signalIp, dataPort, dataIp = "0.0.0.0", useLog = False, context = None):
def __init__(self, signalHost, useLog = False, context = None):
if useLog:
self.log = logging.getLogger("dataTransferAPI")
......@@ -65,157 +67,81 @@ class dataTransfer():
self.context = zmq.Context()
self.externalContext = False
self.signalIp = signalIp
self.dataIp = dataIp
self.hostname = socket.gethostname()
self.dataPort = dataPort
self.signalHost = signalHost
self.socketResponseTimeout = 1000
##
#
# Initailizes the signal and data transfer sockets and
# send a signal which the kind of connection to be established
#
# Returns 0 if the connection could be initializes without errors
# Error Codes are:
# 10 if the connection type is not supported
# 11 if there is already one connection running
# 12 Could not send signal
# 13 Could not poll new for new message
# 14 Could not receive answer to signal
# 15 if the response was not correct
#
##
def start(self, connectionType):
def initiate(self, connectionType, dataPort, dataHost = False):
if connectionType not in self.supportedConnections:
if connectionType in self.supportedConnections:
self.connectionType = connectionType
else:
raise Exception("Chosen type of connection is not supported.")
alreadyConnected = self.streamStarted or self.queryNextStarted or self.ondaStarted or self.queryMetadataStarted or self.prioStreamStarted
signal = None
if connectionType == "priorityStream" and not alreadyConnected:
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.prioStreamStarted = True
if dataHost:
self.dataHost = dataHost
elif type(dataPort) == list:
self.dataHost = str([socket.gethostname() for i in dataPort])
else:
self.dataHost = socket.gethostname()
if connectionType == "stream" and not alreadyConnected:
signalPort = self.signalPort_data
signal = "START_STREAM"
elif connectionType == "queryNext" and not alreadyConnected:
signalPort = self.signalPort_data
signal = "START_QUERY_NEXT"
elif connectionType == "OnDA" and not alreadyConnected:
signalPort = self.signalPort_data
signal = "START_REALTIME_ANALYSIS"
elif connectionType == "queryMetadata" and not alreadyConnected:
signalPort = self.signalPort_MetadataOnly
signal = "START_DISPLAYER"
else:
raise Exception("Other connection type already running.\
More than one connection type is currently not supported.")
self.dataPort = str(dataPort)
self.__creatSignalSocket(signalPort)
message = self.__sendSignal(signal)
if message and message == "VERSION_CONFLICT":
self.stop()
raise Exception("Versions are conflicting.")
if message and message == "NO_VALID_HOST":
self.stop()
raise Exception("Host is not allowed to connect.")
elif message and message == "NO_VALID_SIGNAL":
self.stop()
raise Exception("Connection type is not supported for this kind of sender.")
# if the response was correct
elif message and message.startswith(signal):
self.log.info("Received confirmation ...start receiving files")
if connectionType == "stream":
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket of type " + connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.streamStarted = True
elif connectionType == "queryNext":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket of type " + connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryNextStarted = True
elif connectionType == "OnDA":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("Socket of type " + connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.ondaStarted = True
elif connectionType == "queryMetadata":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket of type " + connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryMetadataStarted = True
# if there was no response or the response was of the wrong format, the receiver should be shut down
else:
raise Exception("Sending start signal ...failed.")
signal = None
# Signal exchange
if self.connectionType == "priorityStream":
# for a priority stream not signal has to be exchanged,
# this has to be configured at the sender
return
if self.connectionType == "stream":
signalPort = self.signalPort_data
signal = "START_STREAM"
elif self.connectionType == "queryNext":
signalPort = self.signalPort_data
signal = "START_QUERY_NEXT"
elif self.connectionType == "OnDA":
signalPort = self.signalPort_data
signal = "START_REALTIME_ANALYSIS"
elif self.connectionType == "queryMetadata":
signalPort = self.signalPort_MetadataOnly
signal = "START_DISPLAYER"
self.log.debug("create socket for signal exchange...")
self.__createSignalSocket(signalPort)
message = self.__sendSignal(signal)
if message and message == "VERSION_CONFLICT":
self.stop()
raise Exception("Versions are conflicting.")
elif message and message == "NO_VALID_HOST":
self.stop()
raise Exception("Host is not allowed to connect.")
elif message and message == "INCORRECT_NUMBER_OF_HOSTS":
self.stop()
raise Exception("Specified number of hosts not working with the number of streams configured in the sender.")
elif message and message == "INCORRECT_NUMBER_OF_PORTS":
self.stop()
raise Exception("Specified number of ports not working with the number of streams configured in the sender.")
elif message and message == "NO_VALID_SIGNAL":
self.stop()
raise Exception("Connection type is not supported for this kind of sender.")
# if there was no response or the response was of the wrong format, the receiver should be shut down
elif message and message.startswith(signal):
self.log.info("Received confirmation ...")
return 0
else:
raise Exception("Sending start signal ...failed.")
def __creatSignalSocket(self, signalPort):
def __createSignalSocket(self, signalPort):
# To send a notification that a Displayer is up and running, a communication socket is needed
# create socket to exchange signals with Sender
......@@ -223,7 +149,7 @@ class dataTransfer():
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.socketResponseTimeout
connectionStr = "tcp://" + str(self.signalIp) + ":" + str(signalPort)
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
......@@ -241,8 +167,8 @@ class dataTransfer():
# Send the signal that the communication infrastructure should be established
self.log.info("Sending Signal")
sendMessage = [__version__, signal, self.hostname, self.dataPort]
# sendMessage = str(signal) + "," + str(__version__) + "," + str(self.hostname) + "," + str(self.dataPort)
sendMessage = [__version__, signal, self.dataHost, self.dataPort]
# sendMessage = str(signal) + "," + str(__version__) + "," + str(self.dataHost) + "," + str(self.dataPort)
self.log.debug("Signal: " + str(sendMessage))
try:
self.signalSocket.send_multipart(sendMessage)
......@@ -280,6 +206,76 @@ class dataTransfer():
return message
def start(self, dataPort = False):
# if not self.connectionType:
# raise Exception("No connection specified. Please initiate a connection first.")
alreadyConnected = self.streamStarted or self.queryNextStarted or self.ondaStarted or self.queryMetadataStarted or self.prioStreamStarted
if alreadyConnected:
raise Exception("Connection already started.")
if dataPort:
port = str(dataPort)
elif type(self.dataPort) != list:
port = self.dataPort
ip = "0.0.0.0" #TODO use IP of hostname?
else:
raise Exception("Multipe possible ports. Please choose which one to use.")
signal = None
if self.connectionType in ["priorityStream", "stream"]:
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + ip + ":" + port
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
if self.connectionType == "priorityStream":
self.prioStreamStarted = True
else:
self.streamStarted = True
elif self.connectionType in ["queryNext", "queryMetadata"]:
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + ip + ":" + port
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
if self.connectionType == "queryNext":
self.queryNextStarted = True
else:
self.queryMetadataStarted = True
elif self.connectionType == "OnDA":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + ip + ":" + port
try:
self.dataSocket.connect(connectionStr)
self.log.info("Socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.ondaStarted = True
##
#
# Receives or queries for new files depending on the connection initialized
......
......@@ -14,17 +14,18 @@ del BASE_PATH
from dataTransferAPI import dataTransfer
signalIp = "zitpcx19282.desy.de"
#signalIp = "zitpcx22614.desy.de"
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50022"
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer( signalIp, dataPort )
query = dataTransfer(signalHost)
query.start("queryMetadata")
query.initiate("queryMetadata", dataPort)
query.start()
while True:
message = query.get()
......
......@@ -14,17 +14,19 @@ del BASE_PATH
from dataTransferAPI import dataTransfer
signalIp = "zitpcx19282.desy.de"
#signalIp = "zitpcx22614.desy.de"
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50201"
#dataPort = ["50201", "50202"]
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer( signalIp, dataPort )
query = dataTransfer(signalHost)
query.start("queryNext")
query.initiate("queryNext", dataPort)
query.start()
while True:
try:
......
......@@ -30,8 +30,8 @@ helperScript.initLogging(logfileFullPath, True, "DEBUG")
del BASE_PATH
signalIp = "zitpcx19282.desy.de"
#signalIp = "zitpcx22614.desy.de"
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50100"
print
......@@ -39,9 +39,10 @@ print "==== TEST: Stream all files and store them ===="
print
query = dataTransfer( signalIp, dataPort, useLog = True )
query = dataTransfer(signalHost, useLog = True)
query.start("stream")
query.initiate("stream", dataPort)
query.start()
while True:
......
......@@ -14,8 +14,8 @@ del BASE_PATH
from dataTransferAPI import dataTransfer
signalIp = "zitpcx19282.desy.de"
#signalIp = "zitpcx22614.desy.de"
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50100"
print
......@@ -23,9 +23,11 @@ print "==== TEST: Stream all files ===="
print
query = dataTransfer( signalIp, dataPort )
query = dataTransfer(signalHost)
query.start("stream")
query.initiate("stream", dataPort)
query.start()
while True:
......@@ -37,7 +39,7 @@ while True:
print
print "metadata"
print metadata
print "data", str(data)[:10]
# print "data", str(data)[:10]
print
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment