Skip to content
Snippets Groups Projects
Commit 691dda11 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed polling in dataTransferAPI

parent d369ecf0
No related branches found
No related tags found
No related merge requests found
......@@ -54,6 +54,7 @@ class dataTransfer():
self.signalSocket = None
self.dataSocket = None
self.requestSocket = None
self.poller = zmq.Poller()
self.targets = None
......@@ -169,7 +170,6 @@ class dataTransfer():
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
......
......@@ -64,7 +64,7 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
log.debug("fileModTime(%s) = %s" % (sourceFile, str(fileModTime)))
except:
log.error("Unable to create metadata dictionary.", exc_info=True)
log.error("Unable to create metadata dictionary.")
raise
try:
......@@ -83,7 +83,7 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
log.debug("metadata = " + str(metadata))
except:
log.error("Unable to assemble multi-part message.", exc_info=True)
log.error("Unable to assemble multi-part message.")
raise
return sourceFile, targetFile, metadata
......@@ -129,13 +129,14 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
except:
log.error("Unable to pack multipart-message for file " + str(sourceFile), exc_info=True)
#send message to data targets
try:
__sendToTargets(log, targets_data, sourceFile, targetFile, openConnections, None, chunkPayload, context)
log.debug("Passing multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")...done.")
if targets_data:
#send message to data targets
try:
__sendToTargets(log, targets_data, sourceFile, targetFile, openConnections, None, chunkPayload, context)
log.debug("Passing multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")...done.")
except:
log.error("Unable to send multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")", exc_info=True)
except:
log.error("Unable to send multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")", exc_info=True)
chunkNumber += 1
......@@ -190,12 +191,13 @@ def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConn
return
#send message to metadata targets
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context)
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context)
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
elif prop["removeFlag"]:
# remove file
......
......@@ -83,12 +83,12 @@ class worker(multiprocessing.Process):
if __name__ == "__main__":
signalHost = "zitpcx19282.desy.de"
# signalHost = "zitpcx19282.desy.de"
# signalHost = "lsdma-lab04.desy.de"
# signalHost = "asap3-bl-prx07.desy.de"
signalHost = "asap3-bl-prx07.desy.de"
# targets = [["asap3-bl-prx07.desy.de", "50101", 1], ["asap3-bl-prx07.desy.de", "50102", 1], ["asap3-bl-prx07.desy.de", "50103", 1]]
targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1]]
targets = [["asap3-bl-prx07.desy.de", "50101", 1], ["asap3-bl-prx07.desy.de", "50102", 1], ["asap3-bl-prx07.desy.de", "50103", 1]]
# targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1]]
# targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1], ["lsdma-lab04.desy.de", "50104", 1]]
# transferType = "queryNext"
......@@ -96,8 +96,8 @@ if __name__ == "__main__":
# transferType = "streamMetadata"
transferType = "queryMetadata"
basePath = BASE_PATH + os.sep + "data" + os.sep + "target"
# basePath = "/asap3/petra3/gpfs/p00/2016/commissioning/c20160205_000_smbtest/"
# basePath = BASE_PATH + os.sep + "data" + os.sep + "target"
basePath = "/asap3/petra3/gpfs/p00/2016/commissioning/c20160205_000_smbtest/"
w1 = multiprocessing.Process(target=worker, args=(0, transferType, basePath, signalHost, "50101"))
w2 = multiprocessing.Process(target=worker, args=(1, transferType, basePath, signalHost, "50102"))
......
......@@ -14,12 +14,11 @@ del BASE_PATH
from dataTransferAPI import dataTransfer
#signalHost = "zitpcx19282.desy.de"
signalHost = "zitpcx22614w.desy.de"
#targets = ["zitpcx19282.desy.de", "50101", 0]
targets = ["zitpcx22614w.desy.de", "50101", 0]
basePath = "/home/kuhnm/Arbeit/zeromq-data-transfer/data/target"
#basePath = "/space/projects/zeromq-data-transfer/data/target"
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614w.desy.de"
targets = ["zitpcx19282.desy.de", "50101", 0]
#targets = ["zitpcx22614w.desy.de", "50101", 0]
basePath = BASE_PATH + os.sep + "data" + os.sep + "target"
print
print "==== TEST: Query for the newest filename ===="
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment