Skip to content
Snippets Groups Projects
Commit eb113023 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added options streamMetadata and queryMetadata

parent 47770ced
No related branches found
No related tags found
No related merge requests found
......@@ -57,7 +57,7 @@ class dataTransfer():
self.targets = None
self.supportedConnections = ["stream", "queryNext"]
self.supportedConnections = ["stream", "streamMetadata", "queryNext", "queryMetadata"]
self.signalExchanged = None
......@@ -85,9 +85,15 @@ class dataTransfer():
if self.connectionType == "stream":
signalPort = self.signalPort
signal = "START_STREAM"
elif self.connectionType == "streamMetadata":
signalPort = self.signalPort
signal = "START_STREAM_METADATA"
elif self.connectionType == "queryNext":
signalPort = self.signalPort
signal = "START_QUERY_NEXT"
elif self.connectionType == "queryMetadata":
signalPort = self.signalPort
signal = "START_QUERY_METADATA"
self.log.debug("Create socket for signal exchange...")
......@@ -270,7 +276,7 @@ class dataTransfer():
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
if self.connectionType == "queryNext":
if self.connectionType in ["queryNext", "queryMetadata"]:
self.requestSocket = self.context.socket(zmq.PUSH)
# An additional socket is needed to establish the data retriving mechanism
......@@ -491,7 +497,7 @@ class dataTransfer():
signal = None
if self.streamStarted or ( "STREAM" in self.signalExchanged):
signal = "STOP_STREAM"
elif self.queryNextStarted or ( "QUERY_NEXT" in self.signalExchanged):
elif self.queryNextStarted or ( "QUERY" in self.signalExchanged):
signal = "STOP_QUERY_NEXT"
......
......@@ -261,190 +261,162 @@ class SignalHandler():
self.comSocket.send(signal, zmq.NOBLOCK)
def reactToSignal (self, signal, socketIds):
def __startSignal(self, signal, sendType, socketIds, listToCheck, variList, correspList):
# React to signal
###########################
## START_STREAM ##
###########################
if signal == "START_STREAM":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
connectionFound = False
tmpAllowed = []
connectionFound = False
tmpAllowed = []
for socketConf in socketIds:
for socketConf in socketIds:
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
socketId = socketConf[0]
socketId = socketConf[0]
self.log.debug("socketId: " + str(socketId))
flatlist = [ i[0] for i in [j for sublist in self.openRequPerm for j in sublist]]
self.log.debug("flatlist: " + str(flatlist))
self.log.debug("socketId: " + str(socketId))
flatlist = [ i[0] for i in [j for sublist in listToCheck for j in sublist]]
self.log.debug("flatlist: " + str(flatlist))
self.log.debug("tmpAllowed: " + str(tmpAllowed))
self.log.debug(str([i for i in tmpAllowed]))
if socketId in flatlist:
connectionFound = True
self.log.info("Connection to " + str(socketId) + " is already open")
elif socketId not in [ i[0] for i in tmpAllowed]:
tmpSocketConf = socketConf + ["data"]
tmpAllowed.append(tmpSocketConf)
else:
#TODO send notification (double entries in START_QUERY_NEXT) back?
pass
if socketId in flatlist:
connectionFound = True
self.log.info("Connection to " + str(socketId) + " is already open")
elif socketId not in [ i[0] for i in tmpAllowed]:
tmpSocketConf = socketConf + [sendType]
tmpAllowed.append(tmpSocketConf)
else:
#TODO send notification (double entries in START_QUERY_NEXT) back?
pass
if not connectionFound:
# send signal back to receiver
self.sendResponse(signal)
listToCheck.append(copy.deepcopy(sorted(tmpAllowed)))
if correspList != None:
correspList.append(0)
del tmpAllowed
if variList != None:
variList.append([])
else:
# send error back to receiver
self.sendResponse("CONNECTION_ALREADY_OPEN")
if not connectionFound:
# send signal back to receiver
self.sendResponse(signal)
self.openRequPerm.append(copy.deepcopy(sorted(tmpAllowed)))
self.nextRequNode.append(0)
del tmpAllowed
else:
# send error back to receiver
self.sendResponse("CONNECTION_ALREADY_OPEN")
def __stopSignal(self, signal, socketIds, listToCheck, variList, correspList):
return
connectionNotFound = False
tmpRemoveIndex = []
tmpRemoveElement = []
found = False
###########################
## STOP_STREAM ##
###########################
elif signal == "STOP_STREAM":
self.log.info("Received signal: " + signal + " for host " + str(socketIds))
connectionNotFound = False
tmpRemoveIndex = []
tmpRemoveElement = []
found = False
for socketConf in socketIds:
for socketConf in socketIds:
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
socketId = socketConf[0]
socketId = socketConf[0]
for sublist in listToCheck:
for element in sublist:
if socketId == element[0]:
tmpRemoveElement.append(element)
found = True
if not found:
connectionNotFound = True
for sublist in self.openRequPerm:
for element in sublist:
if socketId == element[0]:
tmpRemoveElement.append(element)
found = True
if not found:
connectionNotFound = True
if connectionNotFound:
self.sendResponse("NO_OPEN_CONNECTION_FOUND")
self.log.info("No connection to close was found for " + str(socketConf))
else:
# send signal back to receiver
self.sendResponse(signal)
if connectionNotFound:
self.sendResponse("NO_OPEN_CONNECTION_FOUND")
self.log.info("No connection to close was found for " + str(socketConf))
else:
# send signal back to receiver
self.sendResponse(signal)
for element in tmpRemoveElement:
for element in tmpRemoveElement:
socketId = element[0]
socketId = element[0]
if variList != None:
variList = [ [ b for b in variList[a] if socketId != b[0] ] for a in range(len(variList)) ]
self.log.debug("Remove all occurences from " + str(socketId) + " from variable request list.")
for i in range(len(self.openRequPerm)):
if element in self.openRequPerm[i]:
self.openRequPerm[i].remove(element)
self.log.debug("Remove " + str(socketId) + " from openRequPerm.")
for i in range(len(listToCheck)):
if element in listToCheck[i]:
listToCheck[i].remove(element)
self.log.debug("Remove " + str(socketId) + " from pemanent request/allowed list.")
if not listToCheck[i]:
del listToCheck[i]
if variList != None:
del variList[i]
if correspList != None:
correspList.pop(i)
else:
if correspList != None:
correspList[i] = correspList[i] % len(listToCheck[i])
if not self.openRequPerm[i]:
del self.openRequPerm[i]
self.nextRequNode.pop(i)
else:
self.nextRequNode[i] = self.nextRequNode[i] % len(self.openRequPerm[i])
return
def reactToSignal (self, signal, socketIds):
###########################
## START_QUERY ##
## START_STREAM ##
###########################
elif signal == "START_QUERY_NEXT":
if signal == "START_STREAM":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
connectionFound = False
tmpAllowed = []
for socketConf in socketIds:
self.__startSignal(signal, "data", socketIds, self.openRequPerm, None, self.nextRequNode)
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
return
socketId = socketConf[0]
###########################
## START_STREAM_METADATA ##
###########################
elif signal == "START_STREAM_METADATA":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
self.log.debug("socketId: " + str(socketId))
flatlist = [ i[0] for i in [j for sublist in self.allowedQueries for j in sublist]]
self.log.debug("flatlist: " + str(flatlist))
self.__startSignal(signal, "metadata", socketIds, self.openRequPerm, None, self.nextRequNode)
if socketId in flatlist:
connectionFound = True
self.log.info("Connection to " + str(socketId) + " is already open")
elif socketId not in [ i[0] for i in tmpAllowed]:
tmpSocketConf = socketConf + ["data"]
tmpAllowed.append(tmpSocketConf)
else:
#TODO send notification (double entries in START_QUERY_NEXT) back?
pass
return
if not connectionFound:
# send signal back to receiver
self.sendResponse(signal)
self.allowedQueries.append(copy.deepcopy(sorted(tmpAllowed)))
del tmpAllowed
###########################
## STOP_STREAM ##
## STOP_STREAM_METADATA ##
###########################
elif signal == "STOP_STREAM" or signal == "STOP_STREAM_METADATA":
self.log.info("Received signal: " + signal + " for host " + str(socketIds))
self.openRequVari.append([])
else:
# send error back to receiver
self.sendResponse("CONNECTION_ALREADY_OPEN")
self.__stopSignal(signal, socketIds, self.openRequPerm, None, self.nextRequNode)
return
###########################
## STOP_QUERY ##
## START_QUERY ##
###########################
elif signal == "STOP_QUERY_NEXT":
elif signal == "START_QUERY_NEXT":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
connectionNotFound = False
tmpRemoveIndex = []
tmpRemoveElement = []
found = False
for socketConf in socketIds:
self.__startSignal(signal, "data", socketIds, self.allowedQueries, self.openRequVari, None)
if ".desy.de:" in socketConf[0]:
socketConf[0] = socketConf[0].replace(".desy.de:",":")
socketId = socketConf[0]
for sublist in self.allowedQueries:
for element in sublist:
if socketId == element[0]:
tmpRemoveElement.append(element)
found = True
if not found:
connectionNotFound = True
if connectionNotFound:
self.sendResponse("NO_OPEN_CONNECTION_FOUND")
self.log.info("No connection to close was found for " + str(socketConf))
else:
# send signal back to receiver
self.sendResponse(signal)
return
for element in tmpRemoveElement:
###########################
## START_QUERY_METADATA ##
###########################
elif signal == "START_QUERY_METADATA":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
socketId = element[0]
self.__startSignal(signal, "metadata", socketIds, self.allowedQueries, self.openRequVari, None)
self.openRequVari = [ [ b for b in self.openRequVari[a] if socketId != b[0] ] for a in range(len(self.openRequVari)) ]
self.log.debug("Remove all occurences from " + str(socketId) + " from openRequVari.")
return
for i in range(len(self.allowedQueries)):
sublist.remove(element) #TODO Bug??
self.log.debug("Remove " + str(socketId) + " from allowedQueries.")
###########################
## STOP_QUERY ##
## STOP_QUERY_METADATA ##
###########################
elif signal == "STOP_QUERY_NEXT" or signal == "STOP_QUERY_METADATA":
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
if not self.allowedQueries[i]:
del self.allowedQueries[i]
del self.openRequVari[i]
self.__stopSignal(signal, socketIds, self.allowedQueries, self.openRequVari, None)
return
......
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import zmq
import cPickle
def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadata, payload, context):
......@@ -27,7 +28,8 @@ def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metad
" to '" + target + "' with priority " + str(prio) )
elif sendType == "metadata":
tracker = openConnections[target].send_multipart(metadata, copy=False, track=True)
#cPickle.dumps(None) is 'N.'
tracker = openConnections[target].send_multipart([metadata, cPickle.dumps(None)], copy=False, track=True)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
log.debug("metadata=" + str(metadata))
......@@ -60,8 +62,8 @@ def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metad
" to " + target)
elif sendType == "metadata":
openConnections[target].send_multipart(metadata, zmq.NOBLOCK)
openConnections[target].send_multipart([metadata, cPickle.dumps(None)], zmq.NOBLOCK)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to " + target)
log.debug("metadata=" + str(metadata))
log.debug("metadata=" + str([metadata]))
......@@ -304,13 +304,11 @@ class EventDetector():
# "filename" : "file1.tif"
# }
# remove beginning
# relativePath = relativePath[1:]
# if relativePath.startswith(os.sep):
# relativePath = os.path.normpath(relativePath[1:])
# else:
# relativePath = os.path.normpath(relativePath)
# remove beginning
if relativePath.startswith(os.sep):
relativePath = os.path.normpath(relativePath[1:])
else:
relativePath = os.path.normpath(relativePath)
eventMessage = {
"sourcePath" : parentDir,
......
......@@ -55,7 +55,9 @@ class worker(multiprocessing.Process):
break
self.log.debug("worker-" + str(self.id) + ": metadata " + str(metadata["filename"]))
# print "data", str(data)[:10]
# print "metadata", str(metadata)
# print "data", str(data)[:100]
def stop(self):
......@@ -78,7 +80,10 @@ if __name__ == "__main__":
targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1]]
# targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1], ["lsdma-lab04.desy.de", "50104", 1]]
# transferType = "queryNext"
transferType = "stream"
# transferType = "streamMetadata"
# transferType = "queryMetadata"
w1 = multiprocessing.Process(target=worker, args=(0, transferType, signalHost, "50101"))
w2 = multiprocessing.Process(target=worker, args=(1, transferType, signalHost, "50102"))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment