Skip to content
Snippets Groups Projects
Commit 9201cd9e authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added queryNewest to API

parent 3abaa7bd
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,7 @@ class dataTransferQuery():
log = None
streamStarted = False
queryStarted = False
queryNewestStarted = False
queryMetadataStarted = False
socketResponseTimeout = None
......@@ -99,18 +99,18 @@ class dataTransferQuery():
##
def initConnection(self, connectionType):
supportedConnections = ["stream", "query", "queryMetadata"]
supportedConnections = ["stream", "queryNewest", "queryMetadata"]
if connectionType not in supportedConnections:
self.log.info("Chosen type of connection is not supported.")
return 10
alreadyConnected = self.streamStarted or self.queryStarted or self.queryMetadataStarted
alreadyConnected = self.streamStarted or self.queryNewestStarted or self.queryMetadataStarted
signal = None
if connectionType == "stream" and not alreadyConnected:
signal = "START_LIVE_VIEWER"
elif connectionType == "query" and not alreadyConnected:
elif connectionType == "queryNewest" and not alreadyConnected:
signal = "START_REALTIME_ANALYSIS"
elif connectionType == "queryMetadata" and not alreadyConnected:
signal = "START_DISPLAYER"
......@@ -161,13 +161,45 @@ class dataTransferQuery():
self.log.info("Received confirmation ...start receiving files")
if connectionType == "stream":
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.streamStarted = True
elif connectionType == "query":
elif connectionType == "queryNewest":
self.dataSocket = self.context.socket(zmq.REQ)
self.queryStarted = True
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryNewestStarted = True
elif connectionType == "queryMetadata":
self.dataSocket = self.context.socket(zmq.REQ)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.queryMetadataStarted = True
# if there was no response or the response was of the wrong format, the receiver should be shut down
......@@ -176,14 +208,6 @@ class dataTransferQuery():
sys.exit(1)
return 15
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
try:
self.dataSocket.bind(connectionStr)
self.log.info("dataSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
return 0
......@@ -197,13 +221,15 @@ class dataTransferQuery():
# the next file
# (if connection type "stream" was choosen)
# the newest file
# (if connection type "query" was choosen)
# (if connection type "queryNewest" was choosen)
# the path of the newest file
# (if connection type "queryMetadata" was choosen)
#
##
def getData(self):
if self.streamStarted:
#run loop, and wait for incoming messages
self.log.debug("Waiting for new messages...")
try:
......@@ -216,15 +242,12 @@ class dataTransferQuery():
self.log.debug("Error was: " + str(e))
return None
elif self.queryStarted:
pass
elif self.queryMetadataStarted:
elif self.queryNewestStarted or self.queryMetadataStarted:
sendMessage = "NEXT_FILE"
self.log.info("Asking for next file with message " + str(sendMessage))
try:
self.dataSocket.send (sendMessage)
self.dataSocket.send(sendMessage)
except Exception as e:
self.log.info("Could not send request to dataSocket")
self.log.info("Error was: " + str(e))
......@@ -232,7 +255,10 @@ class dataTransferQuery():
try:
# Get the reply.
message = self.dataSocket.recv()
if self.queryNewestStarted:
message = self.dataSocket.recv_multipart()
else:
message = self.dataSocket.recv()
except Exception as e:
message = ""
self.log.info("Could not receive answer to request")
......@@ -240,6 +266,7 @@ class dataTransferQuery():
return None
return message
else:
self.log.info("Could not communicate, no connection was initialized.")
return None
......@@ -259,7 +286,6 @@ class dataTransferQuery():
self.log.error("An empty config was transferred for the multipart-message.")
#TODO validate multipartMessage (like correct dict-values for metadata)
# self.log.debug("multipartMessage.metadata = " + str(metadata))
#extraction metadata from multipart-message
try:
......@@ -275,9 +301,7 @@ class dataTransferQuery():
self.log.debug("Error was:" + str(e))
payload = None
# self.log.debug("multipartMessage.data = " + str(payload)[:20])
return metadata, payload
return [metadata, payload]
##
......@@ -289,7 +313,7 @@ class dataTransferQuery():
if self.dataSocket:
if self.streamStarted:
signal = "STOP_LIVE_VIEWER"
elif self.queryStarted:
elif self.queryNewestStarted:
signal = "STOP_REALTIME_ANALYSIS"
elif self.queryMetadataStarted:
signal = "STOP_DISPLAYER"
......
import os
import sys
import time
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
#if not SHARED_PATH in sys.path:
# sys.path.append ( SHARED_PATH )
#del SHARED_PATH
from dataTransferAPI import dataTransferQuery
#import helperScript
#enable logging
#logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
#logfileFullPath = os.path.join(logfilePath, "testAPI.log")
#helperScript.initLogging(logfileFullPath, True, "DEBUG")
signalIp = "zitpcx19282.desy.de"
signalPort = "50000"
dataPort = "50200"
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransferQuery( signalPort, signalIp, dataPort )
query.initConnection("queryNewest")
while True:
#for i in range(5):
try:
[metadata, data] = query.getData()
except:
break
print
print "metadata"
print metadata
print "data", str(data)[:10]
print
time.sleep(0.5)
query.stop()
print
print "==== TEST END: Query for the newest filename ===="
print
......@@ -27,14 +27,13 @@ from dataTransferAPI import dataTransferQuery
signalIp = "zitpcx19282.desy.de"
signalPort = "50021"
signalPort = "50000"
dataPort = "50022"
print
print "==== TEST: Stream all files ===="
print
signalPort = "50000"
query = dataTransferQuery( signalPort, signalIp, dataPort )
......@@ -44,7 +43,7 @@ query.initConnection("stream")
while True:
#for i in range(5):
try:
metadata, data = query.getData()
[metadata, data] = query.getData()
except:
break
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment