Skip to content
Snippets Groups Projects
Commit f6535f51 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed start/stop query

parent 9bd1dfa6
No related branches found
No related tags found
No related merge requests found
......@@ -59,7 +59,7 @@ class dataTransfer():
self.supportedConnections = ["stream", "queryNext"]
self.signalExchanged = False
self.signalExchanged = None
self.streamStarted = None
self.queryNextStarted = None
......@@ -140,7 +140,7 @@ class dataTransfer():
# if there was no response or the response was of the wrong format, the receiver should be shut down
elif message and message.startswith(signal):
self.log.info("Received confirmation ...")
self.signalExchanged = True
self.signalExchanged = signal
else:
raise Exception("Sending start signal ...failed.")
......@@ -169,6 +169,9 @@ class dataTransfer():
def __sendSignal (self, signal):
if not signal:
return
# Send the signal that the communication infrastructure should be established
self.log.info("Sending Signal")
......@@ -483,12 +486,17 @@ class dataTransfer():
#
##
def stop (self):
if self.dataSocket and self.signalExchanged:
if self.streamStarted:
if self.signalSocket and self.signalExchanged:
self.log.info("Sending close signal")
signal = None
if self.streamStarted or ( "STREAM" in self.signalExchanged):
signal = "STOP_STREAM"
elif self.queryNextStarted:
elif self.queryNextStarted or ( "QUERY_NEXT" in self.signalExchanged):
signal = "STOP_QUERY_NEXT"
self.log.debug("signal=" + str(signal))
message = self.__sendSignal(signal)
#TODO need to check correctness of signal?
......
......@@ -193,6 +193,7 @@ class SignalHandler():
incomingMessage = self.requestSocket.recv_multipart()
self.log.debug("Received request: " + str(incomingMessage) )
# self.log.debug("self.allowedQueries:" + str(self.allowedQueries))
for index in range(len(self.allowedQueries)):
for i in range(len(self.allowedQueries[index])):
......@@ -246,7 +247,7 @@ class SignalHandler():
def sendResponse (self, signal):
self.log.debug("send confirmation back to receiver: " + str(signal) )
self.log.debug("Send response back: " + str(signal))
self.comSocket.send(signal, zmq.NOBLOCK)
......@@ -303,10 +304,14 @@ class SignalHandler():
socketId = socketConf[0]
if socketId in [ i[0] for i in self.allowedQueries]:
self.log.debug("socketId: " + str(socketId))
# self.log.debug("self.allowedQueries: " + str(self.allowedQueries))
flatlist = [ i[0] for i in [j for sublist in self.allowedQueries for j in sublist]]
self.log.debug("flatlist: " + str(flatlist))
if socketId in flatlist:
connectionFound = True
self.log.info("Connection to " + str(socketId) + " is already open")
self.sendResponse("CONNECTION_ALREADY_OPEN")
elif socketId not in [ i[0] for i in tmpAllowed]:
tmpAllowed.append(socketConf)
else:
......@@ -316,10 +321,14 @@ class SignalHandler():
if not connectionFound:
# send signal back to receiver
self.sendResponse(signal)
self.allowedQueries.append(sorted(tmpAllowed))
self.allowedQueries.append(copy.deepcopy(sorted(tmpAllowed)))
del tmpAllowed
self.openRequVari.append([])
self.log.debug("Send response back: " + str(signal))
else:
# send error back to receiver
signal = "CONNECTION_ALREADY_OPEN"
self.sendResponse(signal)
return
......@@ -327,13 +336,25 @@ class SignalHandler():
self.log.info("Received signal to disable querying for data for hosts: " + str(socketIds))
connectionNotFound = False
tmpRemoveIndex = []
tmpRemoveElement = []
found = False
for socketConf in socketIds:
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
socketId = socketConf[0]
for i in range(len(self.allowedQueries)):
for j in range(len(self.allowedQueries[i])):
if socketId == self.allowedQueries[i][j][0]:
tmpRemoveIndex.append([i,j])
# for i in range(len(self.allowedQueries)):
# for j in range(len(self.allowedQueries[i])):
# if socketId == self.allowedQueries[i][j][0]:
# tmpRemoveIndex.append([i,j])
# tmpRemoveElement.append(self.allowedQueries[i][j])
# found = True
for sublist in self.allowedQueries:
for element in sublist:
if socketId == element[0]:
tmpRemoveElement.append(element)
found = True
if not found:
connectionNotFound = True
......@@ -344,17 +365,34 @@ class SignalHandler():
else:
# send signal back to receiver
self.sendResponse(signal)
self.log.debug("Send response back: " + str(signal))
for i, j in tmpRemoveIndex:
self.log.debug("Remove " + str(self.allowedQueries[i][j]) + " from allowedQueries.")
socketId = self.allowedQueries[i].pop(j)[0]
for element in tmpRemoveElement:
socketId = element[0]
self.openRequVari = [ [ b for b in self.openRequVari[a] if socketId != b[0] ] for a in range(len(self.openRequVari)) ]
self.log.debug("Remove all occurences from " + str(socketId) + " from openRequVari.")
if not self.allowedQueries[i]:
del self.allowedQueries[i]
del self.openRequVari[i]
for i in range(len(self.allowedQueries)):
sublist.remove(element)
self.log.debug("Remove " + str(socketId) + " from allowedQueries.")
if not self.allowedQueries[i]:
del self.allowedQueries[i]
del self.openRequVari[i]
# for i, j in tmpRemoveIndex:
# self.log.debug("i=" + str(i) + ", j=" + str(j))
# self.log.debug("self.allowedQueries: " + str(self.allowedQueries))
# self.log.debug("Remove " + str(self.allowedQueries[i][j]) + " from allowedQueries.")
# socketId = self.allowedQueries[i][j][0]
#
# self.openRequVari = [ [ b for b in self.openRequVari[a] if socketId != b[0] ] for a in range(len(self.openRequVari)) ]
# self.log.debug("Remove all occurences from " + str(socketId) + " from openRequVari.")
#
# if not self.allowedQueries[i]:
# del self.allowedQueries[i]
# del self.openRequVari[i]
return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment