Skip to content
Snippets Groups Projects
Commit 5abbea2d authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Data stream/LL/onDa sockets not created by default but on request

parent 25d23974
No related branches found
No related tags found
No related merge requests found
......@@ -16,7 +16,7 @@ class WorkerProcess():
dataStreamIp = None
dataStreamPort = None
zmqContextForWorker = None
externalContext = None # if the context was created outside this class or not
externalContext = None # if the context was created outside this class or not
zmqMessageChunkSize = None
cleanerIp = None # responsable to delete/move files
cleanerPort = None # responsable to delete/move files
......@@ -27,6 +27,7 @@ class WorkerProcess():
routerSocket = None
cleanerSocket = None
dataStreamSocket = None
liveViewerSocket = None
ondaComSocket = None
......@@ -67,20 +68,11 @@ class WorkerProcess():
self.log.debug("new workerProcess started. id=" + str(self.id))
self.dataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
self.dataStreamSocket.connect(connectionStr)
self.log.info("dataStreamSocket started (connect) for '" + connectionStr + "'")
self.liveViewerSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format(ip=self.liveViewerIp, port=self.liveViewerPort)
self.liveViewerSocket.bind(connectionStr)
self.log.info("liveViewerSocket started (bind) for '" + connectionStr + "'")
self.ondaComSocket = self.zmqContextForWorker.socket(zmq.REP)
connectionStr = "tcp://{ip}:{port}".format(ip=self.ondaIp, port=self.ondaPort)
self.ondaComSocket.bind(connectionStr)
self.log.info("ondaSocket started (bind) for '" + connectionStr + "'")
if self.useDataStream:
self.dataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
self.dataStreamSocket.connect(connectionStr)
self.log.info("dataStreamSocket started (connect) for '" + connectionStr + "'")
# initialize sockets
routerIp = "127.0.0.1"
......@@ -100,8 +92,8 @@ class WorkerProcess():
# Poller to get either messages from the watcher or communication messages to stop sending data to the live viewer
self.poller = zmq.Poller()
#TODO do I need to register the routerSocket in here?
self.poller.register(self.routerSocket, zmq.POLLIN)
self.poller.register(self.ondaComSocket, zmq.POLLIN)
try:
......@@ -165,22 +157,41 @@ class WorkerProcess():
# the live viewer is turned on
startLV = workload == b"START_LIVE_VIEWER"
if startLV:
self.log.info("worker-"+str(self.id)+": Received live viewer start command...starting live viewer")
self.log.info("worker-"+str(self.id)+": Received live viewer start command...")
self.useLiveViewer = True
# create the socket to send data to the live viewer
self.liveViewerSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format(ip=self.liveViewerIp, port=self.liveViewerPort)
self.liveViewerSocket.bind(connectionStr)
self.log.info("liveViewerSocket started (bind) for '" + connectionStr + "'")
continue
# the live viewer is turned off
stopLV = workload == b"STOP_LIVE_VIEWER"
if stopLV:
self.log.info("worker-"+str(self.id)+": Received live viewer stop command...stopping live viewer")
self.log.info("worker-"+str(self.id)+": Received live viewer stop command...")
self.useLiveViewer = False
# close the socket to send data to the live viewer
self.liveViewerSocket.close(0)
self.log.info("liveViewerSocket closed")
continue
# the realtime-analysis is turned on
startRTA = workload == b"START_REALTIME_ANALYSIS"
if startRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis start command...starting realtime analysis")
self.log.info("worker-"+str(self.id)+": Received realtime-analysis start command...")
self.useRealTimeAnalysis = True
# create the socket to send data to the realtime analysis
self.ondaComSocket = self.zmqContextForWorker.socket(zmq.REP)
connectionStr = "tcp://{ip}:{port}".format(ip=self.ondaIp, port=self.ondaPort)
self.ondaComSocket.bind(connectionStr)
self.log.info("ondaSocket started (bind) for '" + connectionStr + "'")
self.poller.register(self.ondaComSocket, zmq.POLLIN)
continue
# the realtime-analysis is turned off
......@@ -188,6 +199,10 @@ class WorkerProcess():
if stopRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis stop command...stopping realtime analysis")
self.useRealTimeAnalysis = False
# close the socket to send data to the realtime analysis
self.ondaComSocket.close(0)
self.log.info("ondaComSocket closed")
continue
if self.useDataStream or self.useLiveViewer or self.useRealTimeAnalysis:
......@@ -258,17 +273,17 @@ class WorkerProcess():
#skip all further instructions and continue with next iteration
continue
elif self.requestFromOnda:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
socketListToSendData["onda"] = self.ondaComSocket
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(self.id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
socketListToSendData["onda"] = self.ondaComSocket
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(self.id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
return_value = self.passFileToDataStream(filename, sourcePath, relativePath, socketListToSendData)
......@@ -508,10 +523,12 @@ class WorkerProcess():
self.log.debug("Sending stop signal to cleaner from worker-" + str(self.id))
self.cleanerSocket.send("STOP") #no communication needed because cleaner detects KeyboardInterrupt signals
self.log.debug("Closing sockets for worker " + str(self.id))
self.dataStreamSocket.close(0)
if self.dataStreamSocket:
self.dataStreamSocket.close(0)
if self.liveViewerSocket:
self.liveViewerSocket.close(0)
self.ondaComSocket.close(0)
if self.ondaComSocket:
self.ondaComSocket.close(0)
self.routerSocket.close(0)
self.cleanerSocket.close(0)
if not self.externalContext:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment