Skip to content
Snippets Groups Projects
Commit c1bab564 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added option for timeout + send cancel signal if data receiving takes too long

parent 766ea89d
No related branches found
No related tags found
No related merge requests found
......@@ -272,6 +272,7 @@ class dataTransfer():
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
self.poller.register(self.dataSocket, zmq.POLLIN)
if self.connectionType in ["queryNext", "queryMetadata"]:
......@@ -294,71 +295,74 @@ class dataTransfer():
# Receives or queries for new files depending on the connection initialized
#
# returns either
# the next file
# (if connection type "stream" was choosen)
# the newest file
# (if connection type "queryNext" was choosen)
# (if connection type "queryNext" or "stream" was choosen)
# the path of the newest file
# (if connection type "queryMetadata" was choosen)
# (if connection type "queryMetadata" or "streamMetadata" was choosen)
#
##
def get (self):
def get (self, timeout=None):
if not self.streamStarted and not self.queryNextStarted:
self.log.info("Could not communicate, no connection was initialized.")
return None, None
if self.queryNextStarted :
sendMessage = ["NEXT", self.queryNextStarted]
# self.log.debug("Asking for next file with message " + str(sendMessage))
try:
self.requestSocket.send_multipart(sendMessage)
except Exception as e:
self.log.error("Could not send request to requestSocket", exc_info=True)
return None, None
# receive data
try:
return self.__getMultipartMessage()
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stopping to receive.")
raise
socks = dict(self.poller.poll(timeout))
except:
self.log.error("Unknown error while receiving files. Need to abort.", exc_info=True)
return None, None
self.log.error("Could not poll for new message", exc_info=True)
raise
# if there was a response
if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
def __getMultipartMessage (self):
try:
multipartMessage = self.dataSocket.recv_multipart()
except:
self.log.error("Receiving files..failed.")
return [None, None]
#save all chunks to file
try:
multipartMessage = self.dataSocket.recv_multipart()
except:
self.log.error("Receiving files..failed.")
return [None, None]
if len(multipartMessage) < 2:
self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
self.log.debug("multipartMessage=" + str(mutipartMessage))
return [None, None]
# extract multipart message
try:
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
metadata = None
if len(multipartMessage) < 2:
self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
self.log.debug("multipartMessage=" + str(mutipartMessage))
#TODO validate multipartMessage (like correct dict-values for metadata)
#extract multipart message
try:
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
metadata = None
try:
payload = multipartMessage[1:]
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
#TODO validate multipartMessage (like correct dict-values for metadata)
return [metadata, payload]
else:
self.log.warning("Could not receive data in the given time.")
try:
payload = multipartMessage[1:]
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
if self.queryNextStarted :
try:
self.requestSocket.send_multipart(["CANCEL", self.queryNextStarted])
except Exception as e:
self.log.error("Could not cancel the next query", exc_info=True)
return [metadata, payload]
return [None, None]
def store (self, targetBasePath, dataObject):
......
......@@ -219,17 +219,32 @@ class SignalHandler():
incomingMessage = self.requestSocket.recv_multipart()
self.log.debug("Received request: " + str(incomingMessage) )
for index in range(len(self.allowedQueries)):
for i in range(len(self.allowedQueries[index])):
if incomingMessage[0] == "NEXT":
if ".desy.de:" in incomingMessage[1]:
incomingMessage[1] = incomingMessage[1].replace(".desy.de:", ":")
if ".desy.de:" in incomingMessage[1]:
incomingMessage[1] = incomingMessage[1].replace(".desy.de:", ":")
incomingSocketId = incomingMessage[1]
incomingSocketId = incomingMessage[1]
for index in range(len(self.allowedQueries)):
for i in range(len(self.allowedQueries[index])):
if incomingSocketId == self.allowedQueries[index][i][0]:
self.openRequVari[index].append(self.allowedQueries[index][i])
self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) )
elif incomingMessage[0] == "CANCEL":
if ".desy.de:" in incomingMessage[1]:
incomingMessage[1] = incomingMessage[1].replace(".desy.de:", ":")
incomingSocketId = incomingMessage[1]
self.openRequVari = [ [ b for b in self.openRequVari[a] if incomingSocketId != b[0] ] for a in range(len(self.openRequVari)) ]
self.log.debug("Remove all occurences from " + str(incomingSocketId) + " from variable request list.")
else:
self.log.debug("Request not supported.")
if incomingSocketId == self.allowedQueries[index][i][0]:
self.openRequVari[index].append(self.allowedQueries[index][i])
self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) )
if self.controlSocket in socks and socks[self.controlSocket] == zmq.POLLIN:
......
......@@ -36,6 +36,7 @@ class LiveView(QThread):
zmqQuery = None
# zmqSignalIp = "haspp11eval01.desy.de"
zmqSignalIp = "zitpcx19282.desy.de"
# zmqSignalIp = "zitpcx22614w.desy.de"
zmqDataPort = "50022"
basePath = BASE_PATH + os.sep + "data" + os.sep + "target"
......@@ -52,6 +53,8 @@ class LiveView(QThread):
self.zmqQuery = dataTransfer( "queryNext", self.zmqSignalIp )
self.zmqQuery.initiate([socket.gethostname(), self.zmqDataPort, "1"])
self.zmqQuery.start(self.zmqDataPort)
# self.zmqQuery.initiate(["zitpcx22164w", self.zmqDataPort, "1"])
# self.zmqQuery.start(["zitpcx22614w", self.zmqDataPort])
self.mutex = QMutex()
......@@ -91,7 +94,7 @@ class LiveView(QThread):
self.mutex.lock()
# get latest file from reveiver
[metadata, data] = self.zmqQuery.get()
[metadata, data] = self.zmqQuery.get(1)
receivedFile = self.zmqQuery.generateTargetFilepath(self.basePath, metadata)
print "Next file: ", receivedFile
......@@ -174,16 +177,16 @@ if __name__ == '__main__':
print "LiveViewer stop"
lv.stop()
time.sleep(1)
# time.sleep(1)
#
# try:
# print "LiveViewer start"
# lv.start()
try:
print "LiveViewer start"
lv.start()
time.sleep(2)
finally:
print "LiveViewer stop"
lv.stop()
# time.sleep(2)
# finally:
# print "LiveViewer stop"
# lv.stop()
del LiveView
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment