From 1aeb92b29c6c7abb3ff3031e1f3d44023812e4ca Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Mon, 26 Oct 2015 12:12:40 +0100 Subject: [PATCH] Fixed live viewer for multiple streams --- conf/receiver.conf | 2 +- conf/receiverLiveViewer.conf | 2 +- conf/sender.conf | 2 +- src/receiver/FileReceiver.py | 2 +- src/receiver_LiveViewer.py | 25 ++-- src/receiver_LiveViewer/Coordinator.py | 44 +++--- src/receiver_LiveViewer/FileReceiver.py | 180 ++++++++++++++---------- src/sender.py | 12 +- src/sender/FileMover.py | 13 +- src/sender/WorkerProcess.py | 8 +- 10 files changed, 159 insertions(+), 131 deletions(-) diff --git a/conf/receiver.conf b/conf/receiver.conf index fd823989..c9ef5695 100644 --- a/conf/receiver.conf +++ b/conf/receiver.conf @@ -2,7 +2,7 @@ targetDir = /space/projects/live-viewer/data/target # Local IP to connect dataStream to -dataStreamIp = 131.169.185.121 # zitpcx19282.desy.de +dataStreamIp = 131.169.185.121 ;# zitpcx19282.desy.de # TCP port of data pipe dataStreamPort = 6061 diff --git a/conf/receiverLiveViewer.conf b/conf/receiverLiveViewer.conf index fc7091b4..ccd36f42 100644 --- a/conf/receiverLiveViewer.conf +++ b/conf/receiverLiveViewer.conf @@ -4,7 +4,7 @@ targetDir = /space/projects/live-viewer/data/zmq_target # Local ip to connect dataStream to dataStreamIp = 127.0.0.1 # TCP port of data pipe" -dataStreamPort = 6070 +dataStreamPorts = [6070] # Local ip to bind LiveViewer to liveViewerIp = 127.0.0.1 # TCP port of live viewer diff --git a/conf/sender.conf b/conf/sender.conf index e54abd22..12962d95 100644 --- a/conf/sender.conf +++ b/conf/sender.conf @@ -37,7 +37,7 @@ receiverComPort = 6080 # IP of liveViewer-socket to send new files to liveViewerIp = 127.0.0.1 # Port number of liveViewer-socket to send data to -liveViewerPort = 6070 +liveViewerPorts = ["6070"] # Ports and ips to communicate with onda/realtime analysis # There needs to be one entry for each workerProcess (meaning streams) ondaIps = ["127.0.0.1"] diff --git a/src/receiver/FileReceiver.py b/src/receiver/FileReceiver.py index f3c26d0c..5b585fa7 100644 --- a/src/receiver/FileReceiver.py +++ b/src/receiver/FileReceiver.py @@ -43,7 +43,7 @@ class FileReceiver: self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL) connectionStr = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort) self.zmqDataStreamSocket.bind(connectionStr) - self.log.debug("zmqDataStreamSocket started (bind) for '" + connectionStr + "'") + self.log.info("zmqDataStreamSocket started (bind) for '" + connectionStr + "'") try: self.log.info("Start receiving new files") diff --git a/src/receiver_LiveViewer.py b/src/receiver_LiveViewer.py index a23e442b..07ef7965 100644 --- a/src/receiver_LiveViewer.py +++ b/src/receiver_LiveViewer.py @@ -5,6 +5,7 @@ import sys import argparse import logging import os +import json import ConfigParser import shared.helperScript as helperScript @@ -25,7 +26,7 @@ def argumentParsing(): logfileName = config.get('asection', 'logfileName') targetDir = config.get('asection', 'targetDir') dataStreamIp = config.get('asection', 'dataStreamIp') - dataStreamPort = config.get('asection', 'dataStreamPort') + dataStreamPorts = json.loads(config.get('asection', 'dataStreamPorts')) liveViewerIp = config.get('asection', 'liveViewerIp') liveViewerPort = config.get('asection', 'liveViewerPort') senderComPort = config.get('asection', 'senderComPort') @@ -43,8 +44,8 @@ def argumentParsing(): help="where incoming data will be stored to (default=" + str(targetDir) + ")") parser.add_argument("--dataStreamIp" , type=str, default=dataStreamIp, help="ip of dataStream-socket to pull new files from (default=" + str(dataStreamIp) + ")") - parser.add_argument("--dataStreamPort" , type=str, default=dataStreamPort, - help="port number of dataStream-socket to pull new files from (default=" + str(dataStreamPort) + ")") + parser.add_argument("--dataStreamPorts" , type=str, default=dataStreamPorts, + help="port number of dataStream-socket to pull new files from; there needs to be one entry for each streams (default=" + str(dataStreamPorts) + ")") parser.add_argument("--liveViewerIp" , type=str, default=liveViewerIp, help="local ip to bind LiveViewer to (default=" + str(liveViewerIp) + ")") parser.add_argument("--liveViewerPort" , type=str, default=liveViewerPort, @@ -80,11 +81,11 @@ class ReceiverLiveViewer(): verbose = None targetDir = None - zmqDataStreamIp = None - zmqDataStreamPort = None + dataStreamIp = None + dataStreamPorts = None - zmqLiveViewerIp = None - zmqLiveViewerPort = None + liveViewerIp = None + liveViewerPort = None senderComPort = None maxRingBufferSize = None senderResponseTimeout = None @@ -98,11 +99,11 @@ class ReceiverLiveViewer(): self.verbose = arguments.verbose self.targetDir = arguments.targetDir - self.zmqDataStreamIp = arguments.dataStreamIp - self.zmqDataStreamPort = arguments.dataStreamPort + self.dataStreamIp = arguments.dataStreamIp + self.dataStreamPorts = arguments.dataStreamPorts - self.zmqLiveViewerIp = arguments.liveViewerIp - self.zmqLiveViewerPort = arguments.liveViewerPort + self.liveViewerIp = arguments.liveViewerIp + self.liveViewerPort = arguments.liveViewerPort self.senderComPort = arguments.senderComPort self.maxRingBufferSize = arguments.maxRingBufferSize self.senderResponseTimeout = arguments.senderResponseTimeout @@ -113,7 +114,7 @@ class ReceiverLiveViewer(): #start file receiver - myWorker = FileReceiver(self.targetDir, self.zmqDataStreamIp, self.zmqDataStreamPort, self.zmqLiveViewerPort, self.zmqLiveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout) + myWorker = FileReceiver(self.targetDir, self.dataStreamIp, self.dataStreamPorts, self.liveViewerPort, self.liveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout) if __name__ == "__main__": diff --git a/src/receiver_LiveViewer/Coordinator.py b/src/receiver_LiveViewer/Coordinator.py index 361c0998..97012624 100644 --- a/src/receiver_LiveViewer/Coordinator.py +++ b/src/receiver_LiveViewer/Coordinator.py @@ -13,10 +13,8 @@ class Coordinator: zmqContext = None liveViewerZmqContext = None outputDir = None - zmqDataStreamIp = None - zmqDataStreamPort = None - zmqLiveViewerIp = None - zmqLiveViewerPort = None + liveViewerIp = None + liveViewerPort = None receiverExchangeIp = "127.0.0.1" receiverExchangePort = "6072" @@ -30,15 +28,13 @@ class Coordinator: # sockets receiverExchangeSocket = None # socket to communicate with FileReceiver class - zmqliveViewerSocket = None # socket to communicate with live viewer + liveViewerSocket = None # socket to communicate with live viewer - def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, maxRingBufferSize, context = None): + def __init__(self, outputDir, liveViewerPort, liveViewerIp, maxRingBufferSize, context = None): self.outputDir = outputDir - self.zmqDataStreamIp = zmqDataStreamIp - self.zmqDataStreamPort = zmqDataStreamPort - self.zmqLiveViewerIp = zmqLiveViewerIp - self.zmqLiveViewerPort = zmqLiveViewerPort + self.liveViewerIp = liveViewerIp + self.liveViewerPort = liveViewerPort self.maxRingBufferSize = maxRingBufferSize # # TODO remove outputDir from ringBuffer? @@ -54,20 +50,20 @@ class Coordinator: self.zmqContext = context or zmq.Context() # create sockets - self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort - self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket) - self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") + self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStr = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort + self.receiverExchangeSocket.bind(connectionStr) + self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStr + "'") # create socket for live viewer - self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP) - connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort - self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket) - self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") + self.liveViewerSocket = self.zmqContext.socket(zmq.REP) + connectionStr = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort + self.liveViewerSocket.bind(connectionStr) + self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStr + "'") self.poller = zmq.Poller() self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) - self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN) + self.poller.register(self.liveViewerSocket, zmq.POLLIN) try: self.log.info("Start communication") @@ -100,7 +96,7 @@ class Coordinator: self.log.debug("Received exit command, coordinator thread will stop receiving messages") should_continue = False # TODO why sending signal to live viewer? -# self.zmqliveViewerSocket.send("Exit", zmq.NOBLOCK) +# self.liveViewerSocket.send("Exit", zmq.NOBLOCK) break elif message.startswith("AddFile"): self.log.debug("Received AddFile command") @@ -111,18 +107,18 @@ class Coordinator: self.log.debug("Add new file to ring buffer: " + str(filename) + ", " + str(fileModTime)) self.ringBuffer.add(filename, fileModTime) - if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN: - message = self.zmqliveViewerSocket.recv() + if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN: + message = self.liveViewerSocket.recv() self.log.debug("Call for next file... " + message) # send newest element in ring buffer to live viewer answer = self.ringBuffer.getNewestFile() print answer try: - self.zmqliveViewerSocket.send(answer) + self.liveViewerSocket.send(answer) except zmq.error.ContextTerminated: break self.log.debug("Closing socket") self.receiverExchangeSocket.close(0) - self.zmqliveViewerSocket.close(0) + self.liveViewerSocket.close(0) diff --git a/src/receiver_LiveViewer/FileReceiver.py b/src/receiver_LiveViewer/FileReceiver.py index cf5dcedd..32f8b342 100644 --- a/src/receiver_LiveViewer/FileReceiver.py +++ b/src/receiver_LiveViewer/FileReceiver.py @@ -20,10 +20,10 @@ from Coordinator import Coordinator class FileReceiver: zmqContext = None outputDir = None - zmqDataStreamIp = None - zmqDataStreamPort = None - zmqLiveViewerIp = None - zmqLiveViewerPort = None + dataStreamIp = None + dataStreamPorts = [] + liveViewerIp = None + liveViewerPort = None exchangeIp = "127.0.0.1" exchangePort = "6072" senderComIp = None # ip for socket to communicate with receiver @@ -33,7 +33,7 @@ class FileReceiver: log = None # sockets - zmqDataStreamSocket = None # socket to receive the data from + dataStreamSockets = [] # socket to receive the data from exchangeSocket = None # socket to communicate with Coordinator class senderComSocket = None # socket to communicate with sender @@ -41,15 +41,15 @@ class FileReceiver: # print socket.gethostbyname(socket.gethostname()) - def __init__(self, outputDir, zmqDataStreamIp, zmqDataStreamPort, zmqLiveViewerPort, zmqLiveViewerIp, senderComPort, + def __init__(self, outputDir, dataStreamIp, dataStreamPorts, liveViewerPort, liveViewerIp, senderComPort, maxRingBuffersize, senderResponseTimeout = 1000, context = None): self.outputDir = os.path.normpath(outputDir) - self.zmqDataStreamIp = zmqDataStreamIp - self.zmqDataStreamPort = zmqDataStreamPort - self.zmqLiveViewerIp = zmqLiveViewerIp - self.zmqLiveViewerPort = zmqLiveViewerPort - self.senderComIp = zmqDataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip + self.dataStreamIp = dataStreamIp + self.dataStreamPorts = dataStreamPorts + self.liveViewerIp = liveViewerIp + self.liveViewerPort = liveViewerPort + self.senderComIp = dataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip self.senderComPort = senderComPort self.socketResponseTimeout = senderResponseTimeout @@ -62,43 +62,25 @@ class FileReceiver: self.log.debug("Init") # start file receiver - self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp, maxRingBuffersize)) + self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.liveViewerPort, self.liveViewerIp, maxRingBuffersize)) self.receiverThread.start() - # create pull socket - self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL) - connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort) - print "connectionStrDataStreamSocket", connectionStrDataStreamSocket - self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket) - self.log.debug("zmqDataStreamSocket started (connect) for '" + connectionStrDataStreamSocket + "'") - - self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrExchangeSocket = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort) - self.exchangeSocket.connect(connectionStrExchangeSocket) - self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") - - self.senderComSocket = self.zmqContext.socket(zmq.REQ) - # time to wait for the sender to give a confirmation of the signal -# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout - connectionStrSenderComSocket = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort) - print "connectionStrSenderComSocket", connectionStrSenderComSocket - self.senderComSocket.connect(connectionStrSenderComSocket) - self.log.debug("senderComSocket started (connect) for '" + connectionStrSenderComSocket + "'") - - # using a Poller to implement the senderComSocket timeout because in older ZMQ version there is - self.poller = zmq.Poller() - self.poller.register(self.senderComSocket, zmq.POLLIN) + # create sockets + self.createSockets() + # Starting live viewer message = "START_LIVE_VIEWER," + str(self.hostname) self.log.info("Sending start signal to sender...") self.log.debug("Sending start signal to sender, message: " + message) print "sending message ", message self.senderComSocket.send(str(message)) -# self.senderComSocket.send("START_LIVE_VIEWER") senderMessage = None + # wait for response of sender till timeout is reached socks = dict(self.poller.poll(self.socketResponseTimeout)) + + # if there was a response if self.senderComSocket in socks and socks[self.senderComSocket] == zmq.POLLIN: try: senderMessage = self.senderComSocket.recv() @@ -106,14 +88,15 @@ class FileReceiver: self.log.debug("Received message from sender: " + str(senderMessage) ) except KeyboardInterrupt: self.log.error("KeyboardInterrupt: No message received from sender") - self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False) + self.stopReceiving(sendToSender = False) sys.exit(1) except Exception as e: self.log.error("No message received from sender") self.log.debug("Error was: " + str(e)) - self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False) + self.stopReceiving(sendToSender = False) sys.exit(1) + # if the response was correct: start data retrieving if senderMessage == "START_LIVE_VIEWER": self.log.info("Received confirmation from sender...start receiving files") try: @@ -128,10 +111,11 @@ class FileReceiver: self.log.info("Unkown error state. Shutting down...") self.log.debug("Error was: " + str(trace)) self.zmqContext.destroy() + # if there was no response or the response was of the wrong format, the receiver should be shut down else: print "Sending start signal to sender...failed." self.log.info("Sending start signal to sender...failed.") - self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False) + self.stopReceiving(sendToSender = False) self.log.info("Quitting.") @@ -142,11 +126,80 @@ class FileReceiver: return logger - def combineMessage(self, zmqDataStreamSocket): + def createSockets(self): + # create socket to exchange signals with Sender + self.senderComSocket = self.zmqContext.socket(zmq.REQ) + # time to wait for the sender to give a confirmation of the signal +# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout + connectionStr = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort) + self.senderComSocket.connect(connectionStr) + self.log.info("senderComSocket started (connect) for '" + connectionStr + "'") + + # using a Poller to implement the senderComSocket timeout (in older ZMQ version there is no option RCVTIMEO) + self.poller = zmq.Poller() + self.poller.register(self.senderComSocket, zmq.POLLIN) + + # create socket to communicate with Coordinator + self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStr = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort) + self.exchangeSocket.connect(connectionStr) + self.log.debug("exchangeSocket started (connect) for '" + connectionStr + "'") + + # create poller to differentiate between the different data stream port + self.dataPoller = zmq.Poller() + + # create sockets to retrieve data from Sender + for dataStreamPort in self.dataStreamPorts: + dataStreamSocket = self.zmqContext.socket(zmq.PULL) + connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=dataStreamPort) + dataStreamSocket.connect(connectionStr) + self.log.info("dataStreamSocket started (connect) for '" + connectionStr + "'") + + self.dataStreamSockets.append(dataStreamSocket) + + self.dataPoller.register(dataStreamSocket, zmq.POLLIN) + + + def startReceiving(self): + #run loop, and wait for incoming messages + loopCounter = 0 #counter of total received messages + continueReceiving = True #receiving will stop if value gets False + self.log.debug("Waiting for new messages...") + while continueReceiving: + try: + self.pollDifferentSockets() + loopCounter+=1 + except KeyboardInterrupt: + self.log.debug("Keyboard interrupt detected. Stop receiving.") + continueReceiving = False + break + except: + self.log.error("receive message...failed.") + self.log.error(sys.exc_info()) + continueReceiving = False + + self.log.info("shutting down receiver...") + try: + self.stopReceiving() + self.log.debug("shutting down receiver...done.") + except: + self.log.error(sys.exc_info()) + self.log.error("shutting down receiver...failed.") + + + def pollDifferentSockets(self): + socks = dict(self.dataPoller.poll()) + + for dataStreamSocket in self.dataStreamSockets: + if dataStreamSocket in socks and socks[dataStreamSocket] == zmq.POLLIN: + self.combineMessage(dataStreamSocket) + + + def combineMessage(self, dataStreamSocket): receivingMessages = True #save all chunks to file while receivingMessages: - multipartMessage = zmqDataStreamSocket.recv_multipart() + multipartMessage = dataStreamSocket.recv_multipart() #extract multipart message try: @@ -185,7 +238,6 @@ class FileReceiver: break filename = self.generateTargetFilepath(payloadMetadataDict) fileModTime = payloadMetadataDict["fileModificationTime"] - print "receiving multipart message from data pipe: ", filename self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) # send the file to the coordinator to add it to the ring buffer @@ -194,33 +246,6 @@ class FileReceiver: self.exchangeSocket.send(message) - def startReceiving(self): - #run loop, and wait for incoming messages - loopCounter = 0 #counter of total received messages - continueReceiving = True #receiving will stop if value gets False - self.log.debug("Waiting for new messages...") - while continueReceiving: - try: - self.combineMessage(self.zmqDataStreamSocket) - loopCounter+=1 - except KeyboardInterrupt: - self.log.debug("Keyboard interrupt detected. Stop receiving.") - continueReceiving = False - break - except: - self.log.error("receive message...failed.") - self.log.error(sys.exc_info()) - continueReceiving = False - - self.log.info("shutting down receiver...") - try: - self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext) - self.log.debug("shutting down receiver...done.") - except: - self.log.error(sys.exc_info()) - self.log.error("shutting down receiver...failed.") - - def generateTargetFilepath(self,configDict): """ generates full path where target file will saved to. @@ -311,15 +336,16 @@ class FileReceiver: raise Exception(errorMessage) - def stopReceiving(self, zmqDataStreamSocket, zmqContext, sendToSender = True): + def stopReceiving(self, sendToSender = True): self.log.debug("stopReceiving...") - try: - zmqDataStreamSocket.close(0) - self.log.debug("closing zmqDataStreamSocket...done.") - except: - self.log.error("closing zmqDataStreamSocket...failed.") - self.log.error(sys.exc_info()) + for dataStreamSocket in self.dataStreamSockets: + try: + dataStreamSocket.close(0) + self.log.debug("closing dataStreamSocket...done.") + except: + self.log.error("closing dataStreamSocket...failed.") + self.log.error(sys.exc_info()) self.log.debug("sending exit signal to coordinator...") self.exchangeSocket.send("Exit") @@ -356,7 +382,7 @@ class FileReceiver: self.log.debug("closing signal communication sockets...done") try: - zmqContext.destroy() + self.zmqContext.destroy() self.log.debug("closing zmqContext...done.") except: self.log.error("closing zmqContext...failed.") diff --git a/src/sender.py b/src/sender.py index 79f47e0c..5783b7ef 100644 --- a/src/sender.py +++ b/src/sender.py @@ -51,7 +51,7 @@ def argumentParsing(): receiverComIp = config.get('asection', 'receiverComIp') receiverComPort = config.get('asection', 'receiverComPort') liveViewerIp = config.get('asection', 'liveViewerIp') - liveViewerPort = config.get('asection', 'liveViewerPort') + liveViewerPorts = json.loads(config.get('asection', 'liveViewerPorts')) ondaIps = json.loads(config.get('asection', 'ondaIps')) ondaPorts = json.loads(config.get('asection', 'ondaPorts')) receiverWhiteList = json.loads(config.get('asection', 'receiverWhiteList')) @@ -97,8 +97,8 @@ def argumentParsing(): help="Port number to receive signals from the receiver (default=" + str(receiverComPort) + ")") parser.add_argument("--liveViewerIp" , type=str, default=liveViewerIp, help="IP of liveViewer-socket to send new files to (default=" + str(liveViewerIp) + ")") - parser.add_argument("--liveViewerPort" , type=str, default=liveViewerPort, - help="Port number of liveViewer-socket to send data to (default=" + str(liveViewerPort) + ")") + parser.add_argument("--liveViewerPorts" , type=str, default=liveViewerPorts, + help="Ports number of liveViewer-socket to send data to; there needs to be one entry for each streams (default=" + str(liveViewerPorts) + ")") parser.add_argument("--ondaIps" , type=str, default=ondaIps, help="IPs to communicate with onda/realtime analysis; there needs to be one entry for each streams (default=" + str(ondaIps) + ")") parser.add_argument("--ondaPorts" , type=str, default=ondaPorts, @@ -149,7 +149,7 @@ class Sender(): cleanerComPort = None receiverComPort = None liveViewerIp = None - liveViewerPort = None + liveViewerPorts = None ondaIps = None ondaPorts = None receiverWhiteList = None @@ -183,7 +183,7 @@ class Sender(): self.receiverComIp = arguments.receiverComIp self.receiverComPort = arguments.receiverComPort self.liveViewerIp = arguments.liveViewerIp - self.liveViewerPort = arguments.liveViewerPort + self.liveViewerPorts = arguments.liveViewerPorts self.ondaIps = arguments.ondaIps self.ondaPorts = arguments.ondaPorts self.receiverWhiteList = arguments.receiverWhiteList @@ -223,7 +223,7 @@ class Sender(): self.receiverComIp, self.receiverComPort, self.receiverWhiteList, self.parallelDataStreams, self.chunkSize, self.cleanerIp, self.cleanerPort, - self.liveViewerIp, self.liveViewerPort, + self.liveViewerIp, self.liveViewerPorts, self.ondaIps, self.ondaPorts, self.useDataStream, self.zmqContext) diff --git a/src/sender/FileMover.py b/src/sender/FileMover.py index 33527453..c6bf2e58 100644 --- a/src/sender/FileMover.py +++ b/src/sender/FileMover.py @@ -23,7 +23,7 @@ class FileMover(): receiverComIp = None # ip for socket to communicate with receiver receiverComPort = None # port for socket to communicate receiver liveViewer = None - liveViewerPort = None + liveViewerPorts = [] ondaIps = [] ondaPorts = [] receiverWhiteList = None @@ -46,9 +46,9 @@ class FileMover(): receiverComIp, receiverComPort, receiverWhiteList, parallelDataStreams, chunkSize, cleanerIp, cleanerPort, - liveViewerIp, liveViewerPort, + liveViewerIp, liveViewerPorts, ondaIps, ondaPorts, - useDataStream = True, + useDataStream, context = None): # assert isinstance(context, zmq.sugar.context.Context) @@ -60,10 +60,11 @@ class FileMover(): self.dataStreamPort = dataStreamPort self.cleanerIp = cleanerIp self.cleanerPort = cleanerPort - self.receiverComIp = receiverComIp # ip for socket to communicate with receiver; + self.receiverComIp = receiverComIp # ip for socket to communicate with receiver; self.receiverComPort = receiverComPort self.liveViewerIp = liveViewerIp - self.liveViewerPort = liveViewerPort + self.liveViewerPorts = liveViewerPorts # needs a list of ports because every WorkerProcess + # binds to one port (this is not possible for only one port self.ondaIps = ondaIps self.ondaPorts = ondaPorts @@ -148,7 +149,7 @@ class FileMover(): self.cleanerIp, self.cleanerPort, self.liveViewerIp, - self.liveViewerPort, + self.liveViewerPorts[processNumber], self.ondaIps[processNumber], self.ondaPorts[processNumber], self.useDataStream diff --git a/src/sender/WorkerProcess.py b/src/sender/WorkerProcess.py index e2e14dc9..33318d18 100644 --- a/src/sender/WorkerProcess.py +++ b/src/sender/WorkerProcess.py @@ -30,7 +30,7 @@ class WorkerProcess(): liveViewerSocket = None ondaComSocket = None - useDataStream = True # boolian to inform if the data should be send to the data stream pipe (to the storage system) + useDataStream = False # boolian to inform if the data should be send to the data stream pipe (to the storage system) useLiveViewer = False # boolian to inform if the receiver for the live viewer is running useRealTimeAnalysis = False # boolian to inform if the receiver for realtime-analysis is running @@ -40,7 +40,7 @@ class WorkerProcess(): log = None def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, cleanerIp, cleanerPort, liveViewerIp, liveViewerPort, ondaIp, ondaPort, - useDataStream = True, context = None): + useDataStream, context = None): self.id = id self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort @@ -52,6 +52,8 @@ class WorkerProcess(): self.ondaIp = ondaIp self.ondaPort = ondaPort + self.useDataStream = useDataStream + #initialize router if context: self.zmqContextForWorker = context @@ -407,10 +409,12 @@ class WorkerProcess(): #send data to the live viewer if socketDict.has_key("liveViewer"): socketDict["liveViewer"].send_multipart(chunkPayload, zmq.NOBLOCK) + self.log.info("Sending message part from file " + str(sourceFilePathFull) + " to LiveViewer") # send data to onda if socketDict.has_key("onda"): socketDict["onda"].send_multipart(payloadAll, zmq.NOBLOCK) + self.log.info("Sending from file " + str(sourceFilePathFull) + " to OnDA") self.requestFromOnda = False #close file -- GitLab