Skip to content
Snippets Groups Projects
Commit 1aeb92b2 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed live viewer for multiple streams

parent 14a3c50e
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@
targetDir = /space/projects/live-viewer/data/target
# Local IP to connect dataStream to
dataStreamIp = 131.169.185.121 # zitpcx19282.desy.de
dataStreamIp = 131.169.185.121 ;# zitpcx19282.desy.de
# TCP port of data pipe
dataStreamPort = 6061
......
......@@ -4,7 +4,7 @@ targetDir = /space/projects/live-viewer/data/zmq_target
# Local ip to connect dataStream to
dataStreamIp = 127.0.0.1
# TCP port of data pipe"
dataStreamPort = 6070
dataStreamPorts = [6070]
# Local ip to bind LiveViewer to
liveViewerIp = 127.0.0.1
# TCP port of live viewer
......
......@@ -37,7 +37,7 @@ receiverComPort = 6080
# IP of liveViewer-socket to send new files to
liveViewerIp = 127.0.0.1
# Port number of liveViewer-socket to send data to
liveViewerPort = 6070
liveViewerPorts = ["6070"]
# Ports and ips to communicate with onda/realtime analysis
# There needs to be one entry for each workerProcess (meaning streams)
ondaIps = ["127.0.0.1"]
......
......@@ -43,7 +43,7 @@ class FileReceiver:
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
self.zmqDataStreamSocket.bind(connectionStr)
self.log.debug("zmqDataStreamSocket started (bind) for '" + connectionStr + "'")
self.log.info("zmqDataStreamSocket started (bind) for '" + connectionStr + "'")
try:
self.log.info("Start receiving new files")
......
......@@ -5,6 +5,7 @@ import sys
import argparse
import logging
import os
import json
import ConfigParser
import shared.helperScript as helperScript
......@@ -25,7 +26,7 @@ def argumentParsing():
logfileName = config.get('asection', 'logfileName')
targetDir = config.get('asection', 'targetDir')
dataStreamIp = config.get('asection', 'dataStreamIp')
dataStreamPort = config.get('asection', 'dataStreamPort')
dataStreamPorts = json.loads(config.get('asection', 'dataStreamPorts'))
liveViewerIp = config.get('asection', 'liveViewerIp')
liveViewerPort = config.get('asection', 'liveViewerPort')
senderComPort = config.get('asection', 'senderComPort')
......@@ -43,8 +44,8 @@ def argumentParsing():
help="where incoming data will be stored to (default=" + str(targetDir) + ")")
parser.add_argument("--dataStreamIp" , type=str, default=dataStreamIp,
help="ip of dataStream-socket to pull new files from (default=" + str(dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=dataStreamPort,
help="port number of dataStream-socket to pull new files from (default=" + str(dataStreamPort) + ")")
parser.add_argument("--dataStreamPorts" , type=str, default=dataStreamPorts,
help="port number of dataStream-socket to pull new files from; there needs to be one entry for each streams (default=" + str(dataStreamPorts) + ")")
parser.add_argument("--liveViewerIp" , type=str, default=liveViewerIp,
help="local ip to bind LiveViewer to (default=" + str(liveViewerIp) + ")")
parser.add_argument("--liveViewerPort" , type=str, default=liveViewerPort,
......@@ -80,11 +81,11 @@ class ReceiverLiveViewer():
verbose = None
targetDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
dataStreamIp = None
dataStreamPorts = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
liveViewerIp = None
liveViewerPort = None
senderComPort = None
maxRingBufferSize = None
senderResponseTimeout = None
......@@ -98,11 +99,11 @@ class ReceiverLiveViewer():
self.verbose = arguments.verbose
self.targetDir = arguments.targetDir
self.zmqDataStreamIp = arguments.dataStreamIp
self.zmqDataStreamPort = arguments.dataStreamPort
self.dataStreamIp = arguments.dataStreamIp
self.dataStreamPorts = arguments.dataStreamPorts
self.zmqLiveViewerIp = arguments.liveViewerIp
self.zmqLiveViewerPort = arguments.liveViewerPort
self.liveViewerIp = arguments.liveViewerIp
self.liveViewerPort = arguments.liveViewerPort
self.senderComPort = arguments.senderComPort
self.maxRingBufferSize = arguments.maxRingBufferSize
self.senderResponseTimeout = arguments.senderResponseTimeout
......@@ -113,7 +114,7 @@ class ReceiverLiveViewer():
#start file receiver
myWorker = FileReceiver(self.targetDir, self.zmqDataStreamIp, self.zmqDataStreamPort, self.zmqLiveViewerPort, self.zmqLiveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout)
myWorker = FileReceiver(self.targetDir, self.dataStreamIp, self.dataStreamPorts, self.liveViewerPort, self.liveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout)
if __name__ == "__main__":
......
......@@ -13,10 +13,8 @@ class Coordinator:
zmqContext = None
liveViewerZmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
liveViewerIp = None
liveViewerPort = None
receiverExchangeIp = "127.0.0.1"
receiverExchangePort = "6072"
......@@ -30,15 +28,13 @@ class Coordinator:
# sockets
receiverExchangeSocket = None # socket to communicate with FileReceiver class
zmqliveViewerSocket = None # socket to communicate with live viewer
liveViewerSocket = None # socket to communicate with live viewer
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, maxRingBufferSize, context = None):
def __init__(self, outputDir, liveViewerPort, liveViewerIp, maxRingBufferSize, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.liveViewerIp = liveViewerIp
self.liveViewerPort = liveViewerPort
self.maxRingBufferSize = maxRingBufferSize
# # TODO remove outputDir from ringBuffer?
......@@ -54,20 +50,20 @@ class Coordinator:
self.zmqContext = context or zmq.Context()
# create sockets
self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort
self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket)
self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'")
self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStr = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort
self.receiverExchangeSocket.bind(connectionStr)
self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStr + "'")
# create socket for live viewer
self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort
self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket)
self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'")
self.liveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStr = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort
self.liveViewerSocket.bind(connectionStr)
self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStr + "'")
self.poller = zmq.Poller()
self.poller.register(self.receiverExchangeSocket, zmq.POLLIN)
self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN)
self.poller.register(self.liveViewerSocket, zmq.POLLIN)
try:
self.log.info("Start communication")
......@@ -100,7 +96,7 @@ class Coordinator:
self.log.debug("Received exit command, coordinator thread will stop receiving messages")
should_continue = False
# TODO why sending signal to live viewer?
# self.zmqliveViewerSocket.send("Exit", zmq.NOBLOCK)
# self.liveViewerSocket.send("Exit", zmq.NOBLOCK)
break
elif message.startswith("AddFile"):
self.log.debug("Received AddFile command")
......@@ -111,18 +107,18 @@ class Coordinator:
self.log.debug("Add new file to ring buffer: " + str(filename) + ", " + str(fileModTime))
self.ringBuffer.add(filename, fileModTime)
if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN:
message = self.zmqliveViewerSocket.recv()
if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN:
message = self.liveViewerSocket.recv()
self.log.debug("Call for next file... " + message)
# send newest element in ring buffer to live viewer
answer = self.ringBuffer.getNewestFile()
print answer
try:
self.zmqliveViewerSocket.send(answer)
self.liveViewerSocket.send(answer)
except zmq.error.ContextTerminated:
break
self.log.debug("Closing socket")
self.receiverExchangeSocket.close(0)
self.zmqliveViewerSocket.close(0)
self.liveViewerSocket.close(0)
......@@ -20,10 +20,10 @@ from Coordinator import Coordinator
class FileReceiver:
zmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
dataStreamIp = None
dataStreamPorts = []
liveViewerIp = None
liveViewerPort = None
exchangeIp = "127.0.0.1"
exchangePort = "6072"
senderComIp = None # ip for socket to communicate with receiver
......@@ -33,7 +33,7 @@ class FileReceiver:
log = None
# sockets
zmqDataStreamSocket = None # socket to receive the data from
dataStreamSockets = [] # socket to receive the data from
exchangeSocket = None # socket to communicate with Coordinator class
senderComSocket = None # socket to communicate with sender
......@@ -41,15 +41,15 @@ class FileReceiver:
# print socket.gethostbyname(socket.gethostname())
def __init__(self, outputDir, zmqDataStreamIp, zmqDataStreamPort, zmqLiveViewerPort, zmqLiveViewerIp, senderComPort,
def __init__(self, outputDir, dataStreamIp, dataStreamPorts, liveViewerPort, liveViewerIp, senderComPort,
maxRingBuffersize, senderResponseTimeout = 1000, context = None):
self.outputDir = os.path.normpath(outputDir)
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.senderComIp = zmqDataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip
self.dataStreamIp = dataStreamIp
self.dataStreamPorts = dataStreamPorts
self.liveViewerIp = liveViewerIp
self.liveViewerPort = liveViewerPort
self.senderComIp = dataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip
self.senderComPort = senderComPort
self.socketResponseTimeout = senderResponseTimeout
......@@ -62,43 +62,25 @@ class FileReceiver:
self.log.debug("Init")
# start file receiver
self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp, maxRingBuffersize))
self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.liveViewerPort, self.liveViewerIp, maxRingBuffersize))
self.receiverThread.start()
# create pull socket
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
print "connectionStrDataStreamSocket", connectionStrDataStreamSocket
self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
self.log.debug("zmqDataStreamSocket started (connect) for '" + connectionStrDataStreamSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort)
self.exchangeSocket.connect(connectionStrExchangeSocket)
self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
self.senderComSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStrSenderComSocket = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
print "connectionStrSenderComSocket", connectionStrSenderComSocket
self.senderComSocket.connect(connectionStrSenderComSocket)
self.log.debug("senderComSocket started (connect) for '" + connectionStrSenderComSocket + "'")
# using a Poller to implement the senderComSocket timeout because in older ZMQ version there is
self.poller = zmq.Poller()
self.poller.register(self.senderComSocket, zmq.POLLIN)
# create sockets
self.createSockets()
# Starting live viewer
message = "START_LIVE_VIEWER," + str(self.hostname)
self.log.info("Sending start signal to sender...")
self.log.debug("Sending start signal to sender, message: " + message)
print "sending message ", message
self.senderComSocket.send(str(message))
# self.senderComSocket.send("START_LIVE_VIEWER")
senderMessage = None
# wait for response of sender till timeout is reached
socks = dict(self.poller.poll(self.socketResponseTimeout))
# if there was a response
if self.senderComSocket in socks and socks[self.senderComSocket] == zmq.POLLIN:
try:
senderMessage = self.senderComSocket.recv()
......@@ -106,14 +88,15 @@ class FileReceiver:
self.log.debug("Received message from sender: " + str(senderMessage) )
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
self.stopReceiving(sendToSender = False)
sys.exit(1)
except Exception as e:
self.log.error("No message received from sender")
self.log.debug("Error was: " + str(e))
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
self.stopReceiving(sendToSender = False)
sys.exit(1)
# if the response was correct: start data retrieving
if senderMessage == "START_LIVE_VIEWER":
self.log.info("Received confirmation from sender...start receiving files")
try:
......@@ -128,10 +111,11 @@ class FileReceiver:
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(trace))
self.zmqContext.destroy()
# if there was no response or the response was of the wrong format, the receiver should be shut down
else:
print "Sending start signal to sender...failed."
self.log.info("Sending start signal to sender...failed.")
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
self.stopReceiving(sendToSender = False)
self.log.info("Quitting.")
......@@ -142,11 +126,80 @@ class FileReceiver:
return logger
def combineMessage(self, zmqDataStreamSocket):
def createSockets(self):
# create socket to exchange signals with Sender
self.senderComSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStr = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
self.senderComSocket.connect(connectionStr)
self.log.info("senderComSocket started (connect) for '" + connectionStr + "'")
# using a Poller to implement the senderComSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.senderComSocket, zmq.POLLIN)
# create socket to communicate with Coordinator
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStr = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort)
self.exchangeSocket.connect(connectionStr)
self.log.debug("exchangeSocket started (connect) for '" + connectionStr + "'")
# create poller to differentiate between the different data stream port
self.dataPoller = zmq.Poller()
# create sockets to retrieve data from Sender
for dataStreamPort in self.dataStreamPorts:
dataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=dataStreamPort)
dataStreamSocket.connect(connectionStr)
self.log.info("dataStreamSocket started (connect) for '" + connectionStr + "'")
self.dataStreamSockets.append(dataStreamSocket)
self.dataPoller.register(dataStreamSocket, zmq.POLLIN)
def startReceiving(self):
#run loop, and wait for incoming messages
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
while continueReceiving:
try:
self.pollDifferentSockets()
loopCounter+=1
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
continueReceiving = False
break
except:
self.log.error("receive message...failed.")
self.log.error(sys.exc_info())
continueReceiving = False
self.log.info("shutting down receiver...")
try:
self.stopReceiving()
self.log.debug("shutting down receiver...done.")
except:
self.log.error(sys.exc_info())
self.log.error("shutting down receiver...failed.")
def pollDifferentSockets(self):
socks = dict(self.dataPoller.poll())
for dataStreamSocket in self.dataStreamSockets:
if dataStreamSocket in socks and socks[dataStreamSocket] == zmq.POLLIN:
self.combineMessage(dataStreamSocket)
def combineMessage(self, dataStreamSocket):
receivingMessages = True
#save all chunks to file
while receivingMessages:
multipartMessage = zmqDataStreamSocket.recv_multipart()
multipartMessage = dataStreamSocket.recv_multipart()
#extract multipart message
try:
......@@ -185,7 +238,6 @@ class FileReceiver:
break
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
print "receiving multipart message from data pipe: ", filename
self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# send the file to the coordinator to add it to the ring buffer
......@@ -194,33 +246,6 @@ class FileReceiver:
self.exchangeSocket.send(message)
def startReceiving(self):
#run loop, and wait for incoming messages
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
while continueReceiving:
try:
self.combineMessage(self.zmqDataStreamSocket)
loopCounter+=1
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
continueReceiving = False
break
except:
self.log.error("receive message...failed.")
self.log.error(sys.exc_info())
continueReceiving = False
self.log.info("shutting down receiver...")
try:
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext)
self.log.debug("shutting down receiver...done.")
except:
self.log.error(sys.exc_info())
self.log.error("shutting down receiver...failed.")
def generateTargetFilepath(self,configDict):
"""
generates full path where target file will saved to.
......@@ -311,15 +336,16 @@ class FileReceiver:
raise Exception(errorMessage)
def stopReceiving(self, zmqDataStreamSocket, zmqContext, sendToSender = True):
def stopReceiving(self, sendToSender = True):
self.log.debug("stopReceiving...")
try:
zmqDataStreamSocket.close(0)
self.log.debug("closing zmqDataStreamSocket...done.")
except:
self.log.error("closing zmqDataStreamSocket...failed.")
self.log.error(sys.exc_info())
for dataStreamSocket in self.dataStreamSockets:
try:
dataStreamSocket.close(0)
self.log.debug("closing dataStreamSocket...done.")
except:
self.log.error("closing dataStreamSocket...failed.")
self.log.error(sys.exc_info())
self.log.debug("sending exit signal to coordinator...")
self.exchangeSocket.send("Exit")
......@@ -356,7 +382,7 @@ class FileReceiver:
self.log.debug("closing signal communication sockets...done")
try:
zmqContext.destroy()
self.zmqContext.destroy()
self.log.debug("closing zmqContext...done.")
except:
self.log.error("closing zmqContext...failed.")
......
......@@ -51,7 +51,7 @@ def argumentParsing():
receiverComIp = config.get('asection', 'receiverComIp')
receiverComPort = config.get('asection', 'receiverComPort')
liveViewerIp = config.get('asection', 'liveViewerIp')
liveViewerPort = config.get('asection', 'liveViewerPort')
liveViewerPorts = json.loads(config.get('asection', 'liveViewerPorts'))
ondaIps = json.loads(config.get('asection', 'ondaIps'))
ondaPorts = json.loads(config.get('asection', 'ondaPorts'))
receiverWhiteList = json.loads(config.get('asection', 'receiverWhiteList'))
......@@ -97,8 +97,8 @@ def argumentParsing():
help="Port number to receive signals from the receiver (default=" + str(receiverComPort) + ")")
parser.add_argument("--liveViewerIp" , type=str, default=liveViewerIp,
help="IP of liveViewer-socket to send new files to (default=" + str(liveViewerIp) + ")")
parser.add_argument("--liveViewerPort" , type=str, default=liveViewerPort,
help="Port number of liveViewer-socket to send data to (default=" + str(liveViewerPort) + ")")
parser.add_argument("--liveViewerPorts" , type=str, default=liveViewerPorts,
help="Ports number of liveViewer-socket to send data to; there needs to be one entry for each streams (default=" + str(liveViewerPorts) + ")")
parser.add_argument("--ondaIps" , type=str, default=ondaIps,
help="IPs to communicate with onda/realtime analysis; there needs to be one entry for each streams (default=" + str(ondaIps) + ")")
parser.add_argument("--ondaPorts" , type=str, default=ondaPorts,
......@@ -149,7 +149,7 @@ class Sender():
cleanerComPort = None
receiverComPort = None
liveViewerIp = None
liveViewerPort = None
liveViewerPorts = None
ondaIps = None
ondaPorts = None
receiverWhiteList = None
......@@ -183,7 +183,7 @@ class Sender():
self.receiverComIp = arguments.receiverComIp
self.receiverComPort = arguments.receiverComPort
self.liveViewerIp = arguments.liveViewerIp
self.liveViewerPort = arguments.liveViewerPort
self.liveViewerPorts = arguments.liveViewerPorts
self.ondaIps = arguments.ondaIps
self.ondaPorts = arguments.ondaPorts
self.receiverWhiteList = arguments.receiverWhiteList
......@@ -223,7 +223,7 @@ class Sender():
self.receiverComIp, self.receiverComPort, self.receiverWhiteList,
self.parallelDataStreams, self.chunkSize,
self.cleanerIp, self.cleanerPort,
self.liveViewerIp, self.liveViewerPort,
self.liveViewerIp, self.liveViewerPorts,
self.ondaIps, self.ondaPorts,
self.useDataStream,
self.zmqContext)
......
......@@ -23,7 +23,7 @@ class FileMover():
receiverComIp = None # ip for socket to communicate with receiver
receiverComPort = None # port for socket to communicate receiver
liveViewer = None
liveViewerPort = None
liveViewerPorts = []
ondaIps = []
ondaPorts = []
receiverWhiteList = None
......@@ -46,9 +46,9 @@ class FileMover():
receiverComIp, receiverComPort, receiverWhiteList,
parallelDataStreams, chunkSize,
cleanerIp, cleanerPort,
liveViewerIp, liveViewerPort,
liveViewerIp, liveViewerPorts,
ondaIps, ondaPorts,
useDataStream = True,
useDataStream,
context = None):
# assert isinstance(context, zmq.sugar.context.Context)
......@@ -60,10 +60,11 @@ class FileMover():
self.dataStreamPort = dataStreamPort
self.cleanerIp = cleanerIp
self.cleanerPort = cleanerPort
self.receiverComIp = receiverComIp # ip for socket to communicate with receiver;
self.receiverComIp = receiverComIp # ip for socket to communicate with receiver;
self.receiverComPort = receiverComPort
self.liveViewerIp = liveViewerIp
self.liveViewerPort = liveViewerPort
self.liveViewerPorts = liveViewerPorts # needs a list of ports because every WorkerProcess
# binds to one port (this is not possible for only one port
self.ondaIps = ondaIps
self.ondaPorts = ondaPorts
......@@ -148,7 +149,7 @@ class FileMover():
self.cleanerIp,
self.cleanerPort,
self.liveViewerIp,
self.liveViewerPort,
self.liveViewerPorts[processNumber],
self.ondaIps[processNumber],
self.ondaPorts[processNumber],
self.useDataStream
......
......@@ -30,7 +30,7 @@ class WorkerProcess():
liveViewerSocket = None
ondaComSocket = None
useDataStream = True # boolian to inform if the data should be send to the data stream pipe (to the storage system)
useDataStream = False # boolian to inform if the data should be send to the data stream pipe (to the storage system)
useLiveViewer = False # boolian to inform if the receiver for the live viewer is running
useRealTimeAnalysis = False # boolian to inform if the receiver for realtime-analysis is running
......@@ -40,7 +40,7 @@ class WorkerProcess():
log = None
def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, cleanerIp, cleanerPort, liveViewerIp, liveViewerPort, ondaIp, ondaPort,
useDataStream = True, context = None):
useDataStream, context = None):
self.id = id
self.dataStreamIp = dataStreamIp
self.dataStreamPort = dataStreamPort
......@@ -52,6 +52,8 @@ class WorkerProcess():
self.ondaIp = ondaIp
self.ondaPort = ondaPort
self.useDataStream = useDataStream
#initialize router
if context:
self.zmqContextForWorker = context
......@@ -407,10 +409,12 @@ class WorkerProcess():
#send data to the live viewer
if socketDict.has_key("liveViewer"):
socketDict["liveViewer"].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilePathFull) + " to LiveViewer")
# send data to onda
if socketDict.has_key("onda"):
socketDict["onda"].send_multipart(payloadAll, zmq.NOBLOCK)
self.log.info("Sending from file " + str(sourceFilePathFull) + " to OnDA")
self.requestFromOnda = False
#close file
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment