Skip to content
Snippets Groups Projects
Commit 29a0443a authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed multiple streams for queryNext

parent 0f765851
No related branches found
No related tags found
No related merge requests found
......@@ -221,10 +221,10 @@ class dataTransfer():
port = str(dataPort)
elif type(self.dataPort) != list:
port = self.dataPort
ip = "0.0.0.0" #TODO use IP of hostname?
else:
raise Exception("Multipe possible ports. Please choose which one to use.")
ip = "0.0.0.0" #TODO use IP of hostname?
signal = None
if self.connectionType in ["priorityStream", "stream"]:
......
......@@ -257,8 +257,17 @@ class FileMover():
else:
version, signal, host, port = incomingMessage
host = host.split(',')
port = port.split(',')
if host.startswith("["):
# remove "['" and "']" at the beginning and the end
host = host[2:-2].split("', '")
else:
host = [host]
if port.startswith("["):
port = port[2:-2].split("', '")
else:
port = [port]
if version:
if helperScript.checkVersion(version, self.log):
......@@ -273,7 +282,7 @@ class FileMover():
# Checking signal sending host
self.log.debug("Check if signal sending host is in WhiteList...")
if helperScript.checkHost(host, self.receiverWhiteList, self.log):
self.log.debug("One of the hosts is allowed to connect.")
self.log.debug("Hosts are allowed to connect.")
self.log.debug("hosts: " + str(host))
else:
self.log.debug("One of the hosts is not allowed to connect.")
......@@ -282,12 +291,13 @@ class FileMover():
return False, None, None, None
if signal in ["START_QUERY_NEXT", "STOP_QUERY_NEXT"]:
if len(host) != self.parallelDataStreams:
if type(host) == list and len(host) != self.parallelDataStreams:
self.log.debug("Not enough hosts specified.")
self.sendResponse("INCORRECT_NUMBER_OF_HOSTS")
return False, None, None, None
if len(port) != self.parallelDataStreams:
if type(port) == list and len(port) != self.parallelDataStreams:
self.log.debug("Not enough ports specified.")
self.sendResponse("INCORRECT_NUMBER_OF_PORTS")
return False, None, None, None
......@@ -348,7 +358,6 @@ class FileMover():
elif signal == "STOP_QUERY_NEXT" or signal == "STOP_REALTIME_ANALYSIS":
self.log.info("Received signal from host " + str(host) + " to disable querying for data")
print self.openConnections["queryNext"]
if [host, port] in self.openConnections["queryNext"]:
self.openConnections["queryNext"].remove([host, port])
# send signal to workerProcesses and back to receiver
......
......@@ -228,8 +228,8 @@ class LiveViewCommunicator:
else:
version, signal, host, port = message
host = host.split(',')
port = port.split(',')
host = host[2:-2].split("', '")
port = port[2:-2].split("', '")
if version:
if helperScript.checkVersion(version, self.log):
......@@ -244,7 +244,7 @@ class LiveViewCommunicator:
# Checking signal sending host
self.log.debug("Check if hosts is in WhiteList...")
if helperScript.checkHost(host, self.receiverWhiteList, self.log):
self.log.debug("One of the hosts is allowed to connect.")
self.log.debug("Hosts are allowed to connect.")
self.log.debug("hosts: " + str(host))
else:
self.log.debug("One of the hosts is not allowed to connect.")
......
......@@ -15,9 +15,8 @@ from dataTransferAPI import dataTransfer
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
#isignalHost = "zitpcx22614.desy.de"
dataPort = "50201"
#dataPort = ["50201", "50202"]
print
print "==== TEST: Query for the newest filename ===="
......
import os
import sys
import time
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
from dataTransferAPI import dataTransfer
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helperScript
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfileFullPath = os.path.join(logfilePath, "testAPI.log")
helperScript.initLogging(logfileFullPath, True, "DEBUG")
del BASE_PATH
signalHost = "zitpcx19282.desy.de"
#isignalHost = "zitpcx22614.desy.de"
dataPort = ["50205", "50206"]
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer(signalHost, useLog = True)
query.initiate("queryNext", dataPort)
query.start(50205)
while True:
try:
[metadata, data] = query.get()
except:
break
print
print "metadata"
print metadata
print "data", str(data)[:10]
print
time.sleep(0.1)
query.stop()
print
print "==== TEST END: Query for the newest filename ===="
print
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment