Skip to content
Snippets Groups Projects
Commit 73fb279f authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed API for queryNext

parent 3cf779fc
No related branches found
No related tags found
No related merge requests found
......@@ -225,7 +225,7 @@ class dataTransfer():
return message
def start(self, dataPort = False, requestHost = None):
def start(self, dataSocket = False, requestHost = None):
# if not self.connectionType:
# raise Exception("No connection specified. Please initiate a connection first.")
......@@ -239,11 +239,15 @@ class dataTransfer():
ip = "0.0.0.0" #TODO use IP of hostname?
if dataPort:
if type(dataPort) == list:
socketId = dataPort[0] + ":" + dataPort[1]
host = ""
port = ""
if dataSocket:
if type(dataSocket) == list:
socketId = dataSocket[0] + ":" + dataSocket[1]
host = dataSocket[0]
else:
socketId = ip + ":" + str(dataPort)
socketId = ip + ":" + str(dataSocket)
host = socket.gethostname()
elif len(self.targets) == 1:
host, port = self.targets[0][0].split(":")
# ipFromHost = socket.gethostbyaddr(host)[2]
......@@ -268,14 +272,14 @@ class dataTransfer():
self.requestSocket = self.context.socket(zmq.PUSH)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + socketId
connectionStr = "tcp://" + self.signalHost + ":" + self.requestPort
try:
self.requestSocket.connect(connectionStr)
self.log.info("Socket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'", exc_info=True)
self.queryNextStarted = socketId
self.queryNextStarted = host + ":" + port
else:
self.streamStarted = socketId
......@@ -336,6 +340,7 @@ class dataTransfer():
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
metadata = None
#TODO validate multipartMessage (like correct dict-values for metadata)
......
......@@ -256,7 +256,7 @@ class checkModTime(threading.Thread):
self.log.debug("checkLastModified-" + str(threadName) + " eventMessageList" + str(eventMessageList))
eventMessageList.append(eventMessage)
eventListToObserve.remove(filepath)
self.log.debug("checkLastModified-" + str(threadName) + " eventMessageLi (from modify)st" + str(eventMessageList))
self.log.debug("checkLastModified-" + str(threadName) + " eventMessageList" + str(eventMessageList))
self.lock.release()
else:
self.log.debug("File was last modified " + str(timeCurrent - timeLastModified) + \
......
import os
import sys
import time
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
del BASE_PATH
from dataTransferAPI import dataTransfer
signalIp = "zitpcx19282.desy.de"
#signalIp = "zitpcx22614.desy.de"
dataPort = "50200"
print
print "==== TEST: OnDA ===="
print
query = dataTransfer( signalIp, dataPort )
query.start("OnDA")
while True:
try:
[metadata, data] = query.get()
except:
break
print
print "metadata"
print metadata
print "data", str(data)[:10]
print
time.sleep(0.1)
query.stop()
print
print "==== TEST END: OnDA ===="
print
......@@ -16,7 +16,7 @@ from dataTransferAPI import dataTransfer
signalHost = "zitpcx19282.desy.de"
#isignalHost = "zitpcx22614.desy.de"
dataPort = "50201"
targets = ["zitpcx19282.desy.de", "50101", 0]
print
print "==== TEST: Query for the newest filename ===="
......@@ -24,7 +24,7 @@ print
query = dataTransfer("queryNext", signalHost)
query.initiate(dataPort)
query.initiate(targets)
query.start()
......@@ -35,11 +35,10 @@ while True:
break
print
print "metadata"
print metadata
print "metadata", metadata["filename"]
print "data", str(data)[:10]
print
time.sleep(0.1)
# time.sleep(0.1)
query.stop()
......
import os
import sys
import time
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
from dataTransferAPI import dataTransfer
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helperScript
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfileFullPath = os.path.join(logfilePath, "testAPI.log")
helperScript.initLogging(logfileFullPath, True, "DEBUG")
del BASE_PATH
signalHost = "zitpcx19282.desy.de"
#isignalHost = "zitpcx22614.desy.de"
dataPort = ["50205", "50206"]
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer("queryNext", signalHost, useLog = True)
query.initiate(dataPort)
query.start(50205)
while True:
try:
[metadata, data] = query.get()
except:
break
print
print "metadata"
print metadata
print "data", str(data)[:10]
print
time.sleep(0.1)
query.stop()
print
print "==== TEST END: Query for the newest filename ===="
print
import os
import sys
import time
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
from dataTransferAPI import dataTransfer
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helperScript
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfileFullPath = os.path.join(logfilePath, "testAPI2.log")
helperScript.initLogging(logfileFullPath, True, "DEBUG")
del BASE_PATH
signalHost = "zitpcx19282.desy.de"
#isignalHost = "zitpcx22614.desy.de"
#dataPort = ["50205", "50206"]
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer("queryNext", signalHost, useLog = True)
query.start(50206)
while True:
try:
[metadata, data] = query.get()
except:
break
print
print "metadata"
print metadata
print "data", str(data)[:10]
print
time.sleep(0.1)
query.stop()
print
print "==== TEST END: Query for the newest filename ===="
print
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment