Skip to content
Snippets Groups Projects
Commit d794d6d4 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed Onda communication

parent e7ce37c6
No related branches found
No related tags found
No related merge requests found
......@@ -7,7 +7,6 @@ import os
import logging
import traceback
import json
from fabio.cbfimage import cbfimage
#
# -------------------------- class: WorkerProcess --------------------------------------
......@@ -131,136 +130,132 @@ class WorkerProcess():
print "ondaRequest", ondaRequest
while processingJobs:
print "while"
#sending a "ready"-signal to the router.
#the reply will contain the actual job/task.
self.log.debug("worker-"+str(self.id)+": sending ready signal")
self.routerSocket.send(b"READY")
socks = dict(self.poller.poll())
if self.ondaComSocket in socks and socks[self.ondaComSocket] == zmq.POLLIN:
workload = self.ondaComSocket.recv()
self.log.debug("worker-"+str(self.id)+": received new request from onda")
print "workload", workload
request = workload == b"NEXT_FILE"
if request:
ondaRequest = True
print "ondaRequest = True"
continue
if self.routerSocket in socks and socks[self.routerSocket] == zmq.POLLIN:
# Get workload from router, until finished
self.log.debug("worker-"+str(self.id)+": waiting for new job")
workload = self.routerSocket.recv()
self.log.debug("worker-"+str(self.id)+": new job received")
finished = workload == b"END"
if finished:
processingJobs = False
self.log.debug("router requested to shutdown worker-process. Worker processed: %d files" % jobCount)
break
jobCount += 1
# the live viewer is turned on
startLV = workload == b"START_LIVE_VIEWER"
if startLV:
self.log.info("worker-"+str(self.id)+": Received live viewer start command...starting live viewer")
self.useLiveViewer = True
continue
# the live viewer is turned off
stopLV = workload == b"STOP_LIVE_VIEWER"
if stopLV:
self.log.info("worker-"+str(self.id)+": Received live viewer stop command...stopping live viewer")
self.useLiveViewer = False
continue
# Get workload from router, until finished
self.log.debug("worker-"+str(self.id)+": waiting for new job")
workload = self.routerSocket.recv()
self.log.debug("worker-"+str(self.id)+": new job received")
finished = workload == b"END"
if finished:
processingJobs = False
self.log.debug("router requested to shutdown worker-process. Worker processed: %d files" % jobCount)
break
jobCount += 1
# the live viewer is turned on
startLV = workload == b"START_LIVE_VIEWER"
if startLV:
self.log.info("worker-"+str(self.id)+": Received live viewer start command...starting live viewer")
self.useLiveViewer = True
continue
# the live viewer is turned off
stopLV = workload == b"STOP_LIVE_VIEWER"
if stopLV:
self.log.info("worker-"+str(self.id)+": Received live viewer stop command...stopping live viewer")
self.useLiveViewer = False
continue
# the realtime-analysis is turned on
startRTA = workload == b"START_REALTIME_ANALYSIS"
if startRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis start command...starting live viewer")
self.useRealTimeAnalysis = True
continue
# the realtime-analysis is turned off
stopRTA = workload == b"STOP_REALTIME_ANALYSIS"
if stopRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis stop command...stopping live viewer")
self.useRealTimeAnalysis = False
continue
if self.useLiveViewer or self.useRealTimeAnalysis:
#convert fileEventMessage back to a dictionary
fileEventMessageDict = None
try:
fileEventMessageDict = json.loads(str(workload))
self.log.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict)))
except Exception, e:
errorMessage = "Unable to convert message into a dictionary."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
# the realtime-analysis is turned on
startRTA = workload == b"START_REALTIME_ANALYSIS"
if startRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis start command...starting live viewer")
self.useRealTimeAnalysis = True
continue
# the realtime-analysis is turned off
stopRTA = workload == b"STOP_REALTIME_ANALYSIS"
if stopRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis stop command...stopping live viewer")
self.useRealTimeAnalysis = False
#extract fileEvent metadata
try:
#TODO validate fileEventMessageDict dict
filename = fileEventMessageDict["filename"]
sourcePath = fileEventMessageDict["sourcePath"]
relativePath = fileEventMessageDict["relativePath"]
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
#skip all further instructions and continue with next iteration
continue
if self.useLiveViewer or self.useRealTimeAnalysis:
#convert fileEventMessage back to a dictionary
fileEventMessageDict = None
try:
fileEventMessageDict = json.loads(str(workload))
self.log.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict)))
except Exception, e:
errorMessage = "Unable to convert message into a dictionary."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
#extract fileEvent metadata
try:
#TODO validate fileEventMessageDict dict
filename = fileEventMessageDict["filename"]
sourcePath = fileEventMessageDict["sourcePath"]
relativePath = fileEventMessageDict["relativePath"]
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
#skip all further instructions and continue with next iteration
continue
if self.useLiveViewer:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
self.passFileToDataStream(filename, sourcePath, relativePath, zmqDataStreamSocket)
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
else:
print "worker-"+str(self.id)+": no data sent"
if self.useRealTimeAnalysis and ondaRequest:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
self.passFileToDataStream(filename, sourcePath, relativePath, ondaStreamSocket)
ondoRequest = False
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
#send remove-request to message pipe
if self.useLiveViewer:
#passing file to data-messagPipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("workload = " + str(workload))
self.cleanerSocket.send(workload)
self.log.debug("send file-event for file to cleaner-pipe...success.")
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
self.passFileToDataStream(filename, sourcePath, relativePath, zmqDataStreamSocket)
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(workload)
self.log.error(errorMessage)
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
else:
print "worker-"+str(self.id)+": no data sent"
if self.useRealTimeAnalysis:
print "waiting"
socks = dict(self.poller.poll(0))
print "waiting..done"
if self.ondaComSocket in socks and socks[self.ondaComSocket] == zmq.POLLIN:
ondaWorkload = self.ondaComSocket.recv()
self.log.debug("worker-"+str(self.id)+": received new request from onda")
print "ondaWorkload", ondaWorkload
request = ondaWorkload == b"NEXT_FILE"
if request:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
self.passFileToDataStream(filename, sourcePath, relativePath, self.ondaComSocket)
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
#send remove-request to message pipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("workload = " + str(workload))
self.cleanerSocket.send(workload)
self.log.debug("send file-event for file to cleaner-pipe...success.")
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
except Exception, e:
errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(workload)
self.log.error(errorMessage)
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
def getLogger(self):
......@@ -364,8 +359,9 @@ class WorkerProcess():
chunkPayload.append(chunkPayloadMetadataJson)
chunkPayload.append(fileContentAsByteObject)
#send to zmq pipe
self.streamSocket.send_multipart(chunkPayload, zmq.NOBLOCK)
print "before sending multipart"
#send to zmq pipe
streamSocket.send_multipart(chunkPayload, zmq.NOBLOCK)
#close file
fileDescriptor.close()
......
......@@ -48,8 +48,8 @@ class ReceiverRealTimeAnalysis():
self.senderDataSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
connectionStr = "tcp://{ip}:{port}".format(ip=self.senderDataIp, port=self.senderDataPort)
self.senderComSocket.connect(connectionStr)
self.log.debug("senderComSocket started (connect) for '" + connectionStr + "'")
self.senderDataSocket.connect(connectionStr)
self.log.debug("senderDataSocket started (connect) for '" + connectionStr + "'")
print "senderDataSocket started (connect) for '" + connectionStr + "'"
message = "START_REALTIME_ANALYSIS," + str(self.hostname)
......@@ -87,19 +87,21 @@ class ReceiverRealTimeAnalysis():
def askForNextFile(self):
# get latest file from reveiver
message = "NEXT_FILE,"
message = "NEXT_FILE"
while True:
try:
print "Asking for next file"
self.log.debug("Asking for next file")
self.senderDataSocket.send(message)
self.log.debug("Asking for next file...done")
print "Asking for next file...done"
time.sleep(1)
try:
# Get the reply.
received_file = self.senderDataSocket.recv()
print "Received_file", received_file[:45]
received_file = self.senderDataSocket.recv_multipart()
print "Received_file", "".join(received_file)
self.log.debug("Received_file" + str(received_file))
except zmq.error.ZMQError:
received_file = None
......@@ -109,21 +111,6 @@ class ReceiverRealTimeAnalysis():
self.log.debug("Error was: " + str(e))
break
except zmq.error.ZMQError:
self.log.debug("Sending new request failed")
self.log.debug("Try to drain queue")
try:
# Get the reply.
received_file = self.senderDataSocket.recv()
print "Received_file", received_file[:45]
self.log.debug("Received_file" + str(received_file))
except zmq.error.ZMQError:
received_file = None
self.log.warning("Unable to receive reply (seconf try): sender is currently busy")
except Exception as e:
self.log.error("Unable receive reply")
self.log.debug("Error was: " + str(e))
except Exception as e:
self.log.error("Unable to send request")
self.log.debug("Error was: " + str(e))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment