Skip to content
Snippets Groups Projects
Commit 47770ced authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Devided between sending data and sending metadata

parent 96e69f53
No related branches found
No related tags found
No related merge requests found
......@@ -354,27 +354,14 @@ class SignalHandler():
self.nextRequNode.pop(i)
else:
self.nextRequNode[i] = self.nextRequNode[i] % len(self.openRequPerm[i])
# socketId = socketIds[0][0]
# self.log.info("Received signal: " + signal + " to host " + str(socketId[0]))
#
# if socketId in [i[0] for i in self.openRequPerm]:
# # send signal back to receiver
# self.sendResponse(signal)
# self.log.debug("Send response back: " + str(signal))
#
# for element in self.openRequPerm:
# if element[0] == socketId:
# self.openRequPerm.remove(element)
# else:
# self.log.info("No connection to close was found for " + str(socketId))
# self.log.debug("self.openReqPerm=" + str(self.openRequPerm))
# self.sendResponse("NO_OPEN_CONNECTION_FOUND")
return
###########################
## START_QUERY ##
###########################
elif signal == "START_QUERY_NEXT":
self.log.info("Received signal to enable querying for data for hosts: " + str(socketIds))
self.log.info("Received signal: " + signal + " for hosts " + str(socketIds))
connectionFound = False
tmpAllowed = []
......@@ -459,21 +446,9 @@ class SignalHandler():
del self.allowedQueries[i]
del self.openRequVari[i]
# for i, j in tmpRemoveIndex:
# self.log.debug("i=" + str(i) + ", j=" + str(j))
# self.log.debug("self.allowedQueries: " + str(self.allowedQueries))
# self.log.debug("Remove " + str(self.allowedQueries[i][j]) + " from allowedQueries.")
# socketId = self.allowedQueries[i][j][0]
#
# self.openRequVari = [ [ b for b in self.openRequVari[a] if socketId != b[0] ] for a in range(len(self.openRequVari)) ]
# self.log.debug("Remove all occurences from " + str(socketId) + " from openRequVari.")
#
# if not self.allowedQueries[i]:
# del self.allowedQueries[i]
# del self.openRequVari[i]
return
else:
self.log.info("Received signal from host " + str(host) + " unkown: " + str(signal))
self.sendResponse("NO_VALID_SIGNAL")
......
......@@ -81,10 +81,10 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
return sourceFile, targetFile, metadata
def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context, properties):
def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
if not targets:
properties["removeFlag"] = False
prop["removeFlag"] = False
return
#reading source file into memory
......@@ -117,22 +117,22 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
break
#assemble metadata for zmq-message
chunkPayloadMetadata = metadata.copy()
chunkPayloadMetadata["chunkNumber"] = chunkNumber
chunkPayloadMetadataJson = cPickle.dumps(chunkPayloadMetadata)
chunkMetadata = metadata.copy()
chunkMetadata["chunkNumber"] = chunkNumber
chunkMetadata = cPickle.dumps(chunkMetadata)
chunkPayload = []
chunkPayload.append(chunkPayloadMetadataJson)
chunkPayload.append(chunkMetadata)
chunkPayload.append(fileContent)
# sending data
__sendToTargets(log, targets, sourceFile, openConnections, chunkPayload, context, properties)
__sendToTargets(log, targets, sourceFile, targetFile, openConnections, chunkMetadata, chunkPayload, context)
#close file
fileDescriptor.close()
log.debug("Passing multipart-message for file " + str(sourceFile) + "...done.")
properties["removeFlag"] = True
prop["removeFlag"] = True
except:
log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)
......@@ -187,7 +187,7 @@ def finishDataHandling (log, sourceFile, targetFile, prop):
# self.log.error("Unable to notify Cleaner-pipe to handle file: " + str(workload), exc_info=True)
def clean (properties):
def clean (prop):
pass
......
......@@ -133,7 +133,7 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections,
#send message
try:
__sendToTargets(log, targets, sourceFile, openConnections, payload, context, properties)
__sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadataExtended, payload, context)
log.debug("Passing multipart-message for file " + str(sourceFile) + "...done.")
except:
......
......@@ -77,7 +77,7 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
return sourceFile, targetFile, metadata
def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context):
#reading source file into memory
try:
......@@ -109,7 +109,7 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
#send message
try:
__sendToTargets(log, targets, sourceFile, openConnections, payload, context, properties)
__sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadataExtended, payload, context)
log.debug("Passing multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)
......
......@@ -2,18 +2,14 @@ __author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import zmq
def __sendToTargets(log, targets, sourceFile, openConnections, payload, context, properties):
def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadata, payload, context):
for target, prio, sendType in targets:
# send data to the data stream to store it in the storage system
if prio == 0:
# socket already known
if target in openConnections:
tracker = openConnections[target].send_multipart(payload, copy=False, track=True)
log.info("Sending message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
else:
# socket not known
if target not in openConnections:
# open socket
socket = context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
......@@ -24,12 +20,19 @@ def __sendToTargets(log, targets, sourceFile, openConnections, payload, context,
# register socket
openConnections[target] = socket
# send data
# send data
if sendType == "data":
tracker = openConnections[target].send_multipart(payload, copy=False, track=True)
log.info("Sending message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
# socket not known
elif sendType == "metadata":
tracker = openConnections[target].send_multipart(metadata, copy=False, track=True)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
log.debug("metadata=" + str(metadata))
if not tracker.done:
log.info("Message part from file " + str(sourceFile) +
" has not been sent yet, waiting...")
......@@ -38,14 +41,8 @@ def __sendToTargets(log, targets, sourceFile, openConnections, payload, context,
" has not been sent yet, waiting...done")
else:
# socket already known
if target in openConnections:
# send data
openConnections[target].send_multipart(payload, zmq.NOBLOCK)
log.info("Sending message part from file " + str(sourceFile) +
" to " + target)
# socket not known
else:
if target not in openConnections:
# open socket
socket = context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
......@@ -56,8 +53,15 @@ def __sendToTargets(log, targets, sourceFile, openConnections, payload, context,
# register socket
openConnections[target] = socket
# send data
# send data
if sendType == "data":
openConnections[target].send_multipart(payload, zmq.NOBLOCK)
log.info("Sending message part from file " + str(sourceFile) +
" to " + target)
elif sendType == "metadata":
openConnections[target].send_multipart(metadata, zmq.NOBLOCK)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to " + target)
log.debug("metadata=" + str(metadata))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment