Skip to content
Snippets Groups Projects
Commit e10d87f5 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed problems with prio in SignalHandler/TaskProvider

parent a3dc9359
No related branches found
No related tags found
No related merge requests found
......@@ -142,11 +142,11 @@ class SignalHandler():
openRequests.append(tmp)
if openRequests:
self.requestFwSocket.send(str(openRequests))
self.requestFwSocket.send(pickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
else:
openRequests = ["None"]
self.requestFwSocket.send(str(openRequests))
self.requestFwSocket.send(pickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
except Exception, e:
self.log.error("Failed to receive/answer new signal requests.")
......@@ -163,8 +163,6 @@ class SignalHandler():
self.log.debug("Received signal: " + str(incomingMessage) )
checkStatus, signal, target = self.checkSignal(incomingMessage)
print "signal", signal
print "target", target
if checkStatus:
self.reactToSignal(signal, target)
......@@ -177,14 +175,18 @@ class SignalHandler():
incomingMessage = self.requestSocket.recv_multipart()
self.log.debug("Received request: " + str(incomingMessage) )
print "allowedQueries", self.allowedQueries
print " self.openRequVari", self.openRequVari
# print "allowedQueries", self.allowedQueries
for index in range(len(self.allowedQueries)):
if incomingMessage[1] in self.allowedQueries[index]:
print "allowedQueries[index]", self.allowedQueries[index]
tmp = [incomingMessage[1], self.allowedQueries[index][1]]
self.openRequVari[index].append(incomingMessage[1])
self.log.debug("Add to openRequVari: " + incomingMessage[1] )
# no double requests from the same socket TODO do we want that?
# print "allowedQueries[", index, "]", self.allowedQueries[index]
# print "openRequVari", self.openRequVari
# print "incomingMessage", incomingMessage[1]
for i in range(len(self.allowedQueries[index])):
if incomingMessage[1] == self.allowedQueries[index][i][0]:
self.openRequVari[index].append(self.allowedQueries[index][i])
self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) )
# print "openRequVari", self.openRequVari
......@@ -238,7 +240,7 @@ class SignalHandler():
if signal == "START_STREAM":
#FIXME
socketId = socketIds[0]
self.log.info("Received signal: " + signal + " to host " + str(socketId) +
self.log.info("Received signal: " + signal + " to host " + str(socketId) + \
" with priority " + str(socketId[1]))
if socketId in [i[0] for i in self.openRequPerm]:
......@@ -287,9 +289,9 @@ class SignalHandler():
if not connectionFound:
# send signal back to receiver
self.sendResponse(signal)
self.allowedQueries += sorted(tmpAllowed)
self.allowedQueries.append(sorted(tmpAllowed))
self.openRequVari += [[] for i in tmpAllowed]
self.openRequVari.append([])
self.log.debug("Send response back: " + str(signal))
return
......@@ -297,26 +299,35 @@ class SignalHandler():
elif signal == "STOP_QUERY_NEXT":
self.log.info("Received signal to disable querying for data for hosts: " + str(socketIds))
connectionNotFound = False
tmpRemove = []
tmpRemoveIndex = []
found = False
for socketConf in socketIds:
socketId = socketConf[0]
if socketId in [ i[0] for i in self.allowedQueries]:
tmpRemove.append(socketConf)
else:
for i in range(len(self.allowedQueries)):
for j in range(len(self.allowedQueries[i])):
if socketId == self.allowedQueries[i][j][0]:
tmpRemoveIndex.append([i,j])
found = True
if not found:
connectionNotFound = True
if connectionNotFound:
self.log.info("No connection to close was found for " + str(socketId))
self.sendResponse("NO_OPEN_CONNECTION_FOUND")
self.log.info("No connection to close was found for " + str(socketConf))
else:
# send signal back to receiver
self.sendResponse(signal)
self.log.debug("Send response back: " + str(signal))
for socketToRemove in tmpRemove:
indexToRemove = self.allowedQueries.index(socketToRemove)
self.allowedQueries.pop(indexToRemove)
self.openRequVari.pop(indexToRemove)
self.log.debug("Removed " + str(socketToRemove) + " from allowedQueries.")
for i, j in tmpRemoveIndex:
self.log.debug("Remove " + str(self.allowedQueries[i][j]) + " from allowedQueries.")
socketId = self.allowedQueries[i].pop(j)[0]
self.openRequVari = [ [ b for b in self.openRequVari[a] if socketId != b[0] ] for a in range(len(self.openRequVari)) ]
self.log.debug("Remove all occurences from " + str(socketId) + " from openRequVari.")
if not self.allowedQueries[i]:
del self.allowedQueries[i]
del self.openRequVari[i]
return
......@@ -361,7 +372,7 @@ if __name__ == '__main__':
while True:
self.requestFwSocket.send("")
logging.info("[getRequests] send")
requests = self.requestFwSocket.recv()
requests = pickle.loads(self.requestFwSocket.recv())
logging.info("[getRequests] Requests: " + str(requests))
time.sleep(0.25)
......@@ -447,7 +458,12 @@ if __name__ == '__main__':
sendRequest(requestSocket, "zitpcx19282:6005")
sendRequest(requestSocket, "zitpcx19282:6005")
sendRequest(requestSocket, "zitpcx19282:6006")
time.sleep(0.5)
sendRequest(requestSocket, "zitpcx19282:6005")
sendSignal(comSocket, "STOP_QUERY_NEXT", "6005", 2)
time.sleep(1)
......
......@@ -8,6 +8,7 @@ import logging
import sys
import json
import trace
import pickle
#
......@@ -130,7 +131,7 @@ class TaskProvider():
try:
self.log.debug("Get requests...")
self.requestFwSocket.send("")
requests = self.requestFwSocket.recv_multipart()
requests = pickle.loads(self.requestFwSocket.recv())
self.log.debug("Get requests... done.")
self.log.debug("Requests: " + str(requests))
except:
......@@ -154,7 +155,7 @@ class TaskProvider():
self.log.debug("Sending message...")
message = [messageDict]
if requests != ["None"]:
message += requests
message.append(pickle.dumps(requests))
self.log.debug(str(message))
self.routerSocket.send_multipart(message)
self.log.debug("Sending message...done.")
......@@ -209,12 +210,12 @@ if __name__ == '__main__':
def run (self):
logging.info("[requestResponder] Start run")
openRequests = ['zitpcx19282:6004', 'zitpcx19282:6005']
openRequests = [['zitpcx19282:6003', 1], ['zitpcx19282:6004', 0]]
while True:
request = self.requestFwSocket.recv()
logging.debug("[requestResponder] Received request: " + str(request) )
self.requestFwSocket.send_multipart(openRequests)
self.requestFwSocket.send(pickle.dumps(openRequests))
logging.debug("[requestResponder] Answer: " + str(openRequests) )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment