From 456465b666fc053b27347e542c1d38a7f9316753 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Fri, 19 Feb 2016 14:20:57 +0100 Subject: [PATCH] Added fixed streaming option --- conf/dataManager.conf | 59 ++++++ src/DataManager.py | 384 +++++++++++++++-------------------- src/sender/DataDispatcher.py | 41 +++- src/sender/SignalHandler.py | 15 +- src/sender/TaskProvider.py | 4 +- 5 files changed, 259 insertions(+), 244 deletions(-) create mode 100644 conf/dataManager.conf diff --git a/conf/dataManager.conf b/conf/dataManager.conf new file mode 100644 index 00000000..f2456285 --- /dev/null +++ b/conf/dataManager.conf @@ -0,0 +1,59 @@ +# Directory you want to monitor for changes +# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored +monitoredDir = /space/projects/live-viewer/data/source +#monitoredDir = /home/kuhnm/Arbeit/live-viewer/data/source +#monitoredDir = /rd +# Target to move the files into +localTarget = /space/projects/live-viewer/data/target +#localTarget = /home/kuhnm/Arbeit/live-viewer/data/target +#localTarget = /gpfs + +# Event type of files to be monitored (options are: "IN_CLOSE_WRITE", "IN_MOVED_TO", ...) +monitoredEventType = IN_CLOSE_WRITE +#monitoredEventType = IN_MOVED_TO +# Subdirectories of watchDir to be monitored +monitoredSubdirs = ["commissioning", "current", "local"] +# The formats to be monitored, files in an other format will be be neglected +monitoredFormats = [".tif", ".cbf"] + +# List of hosts allowed to connect +whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "haspp11eval01", "it-hpc-cxi03"] +#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"] + +# Number of parallel data streams +# if this number is modifified, the port numbers also have to be adjusted +parallelDataStreams = 1 + +# Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget) +useDataStream = True +# Fixed host to send the data to with highest priority +fixedStreamHost = zitpcx19282 +# Fixed Port to send the data to with highest priority +fixedStreamPort = 6006 + +# Port number to receive signals from +comPort = 50000 + +# ZMQ port to forward requests +requestFwPort = 50001 +# ZMQ port to get new requests +requestPort = 50100 + +# ZMQ-router port which coordinates the load-balancing to the worker-processes +routerPort = 50002 + +# ZMQ-pull-socket port which deletes/moves given files +cleanerPort = 50003 + +# Chunk size of file-parts getting send via zmq +chunkSize = 10485760 ; # = 1024*1024*10 +#chunkSize = 1073741824 ; # = 1024*1024*1024 + +# Path where the logfile will be created +logfilePath = /space/projects/live-viewer/logs +#logfilePath = /home/kuhnm/Arbeit/live-viewer/logs +#logfilePath = /home/p11user/live-viewer/logs + +# Filename used for logging +logfileName = zmq_sender.log + diff --git a/src/DataManager.py b/src/DataManager.py index 12ca11e0..96db3296 100644 --- a/src/DataManager.py +++ b/src/DataManager.py @@ -1,9 +1,6 @@ -from __builtin__ import open, type - __author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>' -import time import argparse import zmq import os @@ -27,7 +24,7 @@ from version import __version__ def argumentParsing(): - configFile = CONFIG_PATH + os.sep + "sender.conf" + configFile = CONFIG_PATH + os.sep + "dataManager.conf" config = ConfigParser.RawConfigParser() config.readfp(helperScript.FakeSecHead(open(configFile))) @@ -35,38 +32,28 @@ def argumentParsing(): logfilePath = config.get('asection', 'logfilePath') logfileName = config.get('asection', 'logfileName') - watchDir = config.get('asection', 'watchDir') + comPort = config.get('asection', 'comPort') + whitelist = json.loads(config.get('asection', 'whitelist')) + + requestPort = config.get('asection', 'requestPort') + requestFwPort = config.get('asection', 'requestFwPort') + + monitoredDir = config.get('asection', 'monitoredDir') monitoredEventType = config.get('asection', 'monitoredEventType') monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs')) monitoredFormats = json.loads(config.get('asection', 'monitoredFormats')) - fileEventIp = config.get('asection', 'fileEventIp') - fileEventPort = config.get('asection', 'fileEventPort') useDataStream = config.getboolean('asection', 'useDataStream') - dataStreamIp = config.get('asection', 'dataStreamIp') - dataStreamPort = config.get('asection', 'dataStreamPort') - cleanerTargetPath = config.get('asection', 'cleanerTargetPath') - cleanerIp = config.get('asection', 'cleanerIp') - cleanerPort = config.get('asection', 'cleanerPort') - routerPort = config.get('asection', 'routerPort') - - receiverComIp = config.get('asection', 'receiverComIp') - receiverComPort = config.get('asection', 'receiverComPort') - ondaIps = json.loads(config.get('asection', 'ondaIps')) - ondaPorts = json.loads(config.get('asection', 'ondaPorts')) - receiverWhiteList = json.loads(config.get('asection', 'receiverWhiteList')) + fixedStreamHost = config.get('asection', 'fixedStreamHost') + fixedStreamPort = config.get('asection', 'fixedStreamPort') parallelDataStreams = config.get('asection', 'parallelDataStreams') chunkSize = int(config.get('asection', 'chunkSize')) - useRingbuffer = config.getboolean('asection', 'useRingbuffer') - cleanerExchangePort = config.get('asection', 'cleanerExchangePort') + routerPort = config.get('asection', 'routerPort') - liveViewerComPort = config.get('asection', 'liveViewerComPort') - liveViewerComIp = config.get('asection', 'liveViewerComIp') - liveViewerWhiteList = json.loads(config.get('asection', 'liveViewerWhiteList')) - maxRingBufferSize = config.get('asection', 'maxRingBufferSize') - maxQueueSize = config.get('asection', 'maxQueueSize') + localTarget = config.get('asection', 'localTarget') + cleanerPort = config.get('asection', 'cleanerPort') parser = argparse.ArgumentParser() @@ -82,77 +69,47 @@ def argumentParsing(): help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)", default = False ) - parser.add_argument("--watchDir" , type = str, - help = "Dir you want to monitor for changes; inside this directory only the specified \ - subdirectories are monitred (default=" + str(watchDir) + ")", - default = watchDir ) + parser.add_argument("--comPort" , type = str, + help = "Port number to receive signals (default=" + str(comPort) + ")", + default = comPort ) + parser.add_argument("--whitelist" , type = str, + help = "List of hosts allowed to connect (default=" + str(whitelist) + ")", + default = whitelist ) + + parser.add_argument("--requestPort" , type = str, + help = "ZMQ port to get new requests (default=" + str(requestPort) + ")", + default = requestPort ) + parser.add_argument("--requestFwPort" , type = str, + help = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")", + default = requestFwPort ) + + parser.add_argument("--monitoredDir" , type = str, + help = "Dirextory you want to monitor for changes; inside this directory only the specified \ + subdirectories are monitred (default=" + str(monitoredDir) + ")", + default = monitoredDir ) parser.add_argument("--monitoredEventType" , type = str, help = "Event type of files to be monitored (default=" + str(monitoredEventType) + ")", default = monitoredEventType ) parser.add_argument("--monitoredSubdirs" , type = str, - help = "Subdirectories of watchDirs to be monitored (default=" + str(monitoredSubdirs) + ")", + help = "Subdirectories of 'monitoredDirs' to be monitored (default=" + str(monitoredSubdirs) + ")", default = monitoredSubdirs ) parser.add_argument("--monitoredFormats" , type = str, help = "The formats to be monitored, files in an other format will be be neglected \ (default=" + str(monitoredFormats) + ")", default = monitoredFormats ) - parser.add_argument("--fileEventIp" , type = str, - help = "ZMQ endpoint (IP-address) to send file events to for the live viewer \ - (default=" + str(fileEventIp) + ")", - default = fileEventIp ) - parser.add_argument("--fileEventPort" , type = str, - help = "ZMQ endpoint (port) to send file events to for the live viewer \ - (default=" + str(fileEventPort) + ")", - default = fileEventPort ) parser.add_argument("--useDataStream" , type = str, help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \ - into the cleanerTargetPath) (default=" + str(useDataStream) + ")", + into the localTarget) (default=" + str(useDataStream) + ")", default = useDataStream ) - parser.add_argument("--dataStreamIp" , type = str, - help = "IP of dataStream-socket to push new files to \ - (default=" + str(dataStreamIp) + ")", - default = dataStreamIp ) - parser.add_argument("--dataStreamPort" , type = str, - help = "Port number of dataStream-socket to push new \ - files to (default=" + str(dataStreamPort) + ")", - default = dataStreamPort ) - parser.add_argument("--cleanerTargetPath" , type = str, - help = "Target to move the files into (default=" + str(cleanerTargetPath) + ")", - default = cleanerTargetPath ) - parser.add_argument("--cleanerIp" , type = str, - help = "ZMQ-pull-socket IP which deletes/moves given files \ - (default=" + str(cleanerIp) + ")", - default = cleanerIp ) - parser.add_argument("--cleanerPort" , type = str, - help = "ZMQ-pull-socket port which deletes/moves given file \ - (default=" + str(cleanerPort) + ")", - default = cleanerPort ) - parser.add_argument("--routerPort" , type = str, - help = "ZMQ-router port which coordinates the load-balancing \ - to the worker-processes (default=" + str(routerPort) + ")", - default = routerPort ) - - parser.add_argument("--receiverComIp" , type = str, - help = "IP receive signals from the receiver (default=" + str(receiverComIp) + ")", - default = receiverComIp ) - parser.add_argument("--receiverComPort" , type = str, - help = "Port number to receive signals from the receiver \ - (default=" + str(receiverComPort) + ")", - default = receiverComPort ) - parser.add_argument("--ondaIps" , type = str, - help = "IPs to communicate with onda/realtime analysis; there needs to be one entry \ - for each streams (default=" + str(ondaIps) + ")", - default = ondaIps ) - parser.add_argument("--ondaPorts" , type = str, - help = "Ports to communicate with onda/realtime analysis; there needs to be one entry \ - for each streams (default=" + str(ondaPorts) + ")", - default = ondaPorts ) - parser.add_argument("--receiverWhiteList" , type = str, - help = "List of hosts allowed to connect to the sender \ - (default=" + str(receiverWhiteList) + ")", - default = receiverWhiteList ) - + parser.add_argument("--fixedStreamHost" , type = str, + help = "Fixed host to send the data to with highest priority \ + (only active is useDataStream is set; default=" + str(fixedStreamHost) + ")", + default = fixedStreamHost ) + parser.add_argument("--fixedStreamPort" , type = str, + help = "Fixed port to send the data to with highest priority \ + (only active is useDataStream is set; default=" + str(fixedStreamPort) + ")", + default = fixedStreamPort ) parser.add_argument("--parallelDataStreams", type = int, help = "Number of parallel data streams (default=" + str(parallelDataStreams) + ")", default = parallelDataStreams ) @@ -160,30 +117,18 @@ def argumentParsing(): help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")", default = chunkSize ) - parser.add_argument("--useRingbuffer" , type = str, - help = "Put the data into a ringbuffer followed by a queue to delay the \ - removal of the files(default=" + str(useRingbuffer) + ")", - default = useRingbuffer ) - parser.add_argument("--cleanerExchangePort", type = str, - help = "Port number to exchange data and signals between Cleaner and \ - LiveViewCommunicator (default=" + str(cleanerExchangePort) + ")", - default = cleanerExchangePort ) - parser.add_argument("--liveViewerComIp" , type = str, - help = "IP to bind communication to LiveViewer to (default=" + str(liveViewerComIp) + ")", - default = liveViewerComIp ) - parser.add_argument("--liveViewerComPort" , type = str, - help = "Port number to communicate with live viewer (default=" + str(liveViewerComPort) + ")", - default = liveViewerComPort ) - parser.add_argument("--liveViewerWhiteList", type = str, - help = "List of hosts allowed to connect to the receiver \ - (default=" + str(liveViewerWhiteList) + ")", - default = liveViewerWhiteList ) - parser.add_argument("--maxRingBufferSize" , type = int, - help = "Size of the ring buffer for the live viewer (default=" + str(maxRingBufferSize) + ")", - default = maxRingBufferSize ) - parser.add_argument("--maxQueueSize" , type = int, - help = "Size of the queue for the live viewer (default=" + str(maxQueueSize) + ")", - default = maxQueueSize ) + parser.add_argument("--routerPort" , type = str, + help = "ZMQ-router port which coordinates the load-balancing \ + to the worker-processes (default=" + str(routerPort) + ")", + default = routerPort ) + + parser.add_argument("--localTarget" , type = str, + help = "Target to move the files into (default=" + str(localTarget) + ")", + default = localTarget ) + parser.add_argument("--cleanerPort" , type = str, + help = "ZMQ-pull-socket port which deletes/moves given file \ + (default=" + str(cleanerPort) + ")", + default = cleanerPort ) arguments = parser.parse_args() @@ -193,13 +138,10 @@ def argumentParsing(): verbose = arguments.verbose onScreen = arguments.onScreen - watchDir = str(arguments.watchDir) - + monitoredDir = str(arguments.monitoredDir) monitoredSubdirs = arguments.monitoredSubdirs - cleanerTargetPath = str(arguments.cleanerTargetPath) + localTarget = str(arguments.localTarget) - ondaIps = arguments.ondaIps - ondaPorts = arguments.ondaPorts parallelDataStreams = arguments.parallelDataStreams #enable logging @@ -207,16 +149,13 @@ def argumentParsing(): # check if directories exists helperScript.checkDirExistance(logfilePath) - helperScript.checkDirExistance(watchDir) - helperScript.checkSubDirExistance(watchDir, monitoredSubdirs) - helperScript.checkDirExistance(cleanerTargetPath) + helperScript.checkDirExistance(monitoredDir) + helperScript.checkSubDirExistance(monitoredDir, monitoredSubdirs) + helperScript.checkDirExistance(localTarget) # check if logfile is writable helperScript.checkLogFileWritable(logfilePath, logfileName) - # check if there are enough ports specified (OnDA), corresponding to the number of streams - helperScript.checkStreamConfig(ondaIps, ondaPorts, parallelDataStreams) - return arguments @@ -224,37 +163,36 @@ class Sender(): def __init__(self): arguments = argumentParsing() - self.watchDir = arguments.watchDir - self.monitoredEventType = arguments.monitoredEventType - self.monitoredSubdirs = arguments.monitoredSubdirs - self.monitoredFormats = arguments.monitoredFormats - self.fileEventIp = arguments.fileEventIp - self.fileEventPort = arguments.fileEventPort + self.comPort = arguments.comPort + self.whitelist = arguments.whitelist - self.useDataStream = arguments.useDataStream + self.requestPort = arguments.requestPort + self.requestFwPort = arguments.requestFwPort - self.dataStreamIp = arguments.dataStreamIp - self.dataStreamPort = arguments.dataStreamPort - self.cleanerTargetPath = arguments.cleanerTargetPath - self.cleanerIp = arguments.cleanerIp - self.cleanerPort = arguments.cleanerPort - self.routerPort = arguments.routerPort - self.receiverComIp = arguments.receiverComIp - self.receiverComPort = arguments.receiverComPort - self.ondaIps = arguments.ondaIps - self.ondaPorts = arguments.ondaPorts - self.receiverWhiteList = arguments.receiverWhiteList + self.eventDetectorConfig = { + "configType" : "inotifyx", + "monDir" : arguments.monitoredDir, + "monEventType" : arguments.monitoredEventType, + "monSubdirs" : arguments.monitoredSubdirs, + "monSuffixes" : arguments.monitoredFormats + } + + if arguments.useDataStream: + self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort ) + else: + self.fixedStreamId = None self.parallelDataStreams = arguments.parallelDataStreams self.chunkSize = arguments.chunkSize - self.useRingbuffer = arguments.useRingbuffer - self.cleanerExchangePort = arguments.cleanerExchangePort - self.liveViewerComPort = arguments.liveViewerComPort - self.liveViewerComIp = arguments.liveViewerComIp - self.liveViewerWhiteList = arguments.liveViewerWhiteList - self.maxRingBufferSize = arguments.maxRingBufferSize - self.maxQueueSize = arguments.maxQueueSize + self.routerPort = arguments.routerPort + + self.localTarget = arguments.localTarget + self.cleanerPort = arguments.cleanerPort + + self.signalHandlerPr = None + self.taskProviderPr = None + self.dataDispatcherPr = None logging.info("Version: " + str(__version__)) @@ -274,7 +212,6 @@ class Sender(): requestPort = "6002" routerPort = "7000" chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB - useDataStream = False eventDetectorConfig = { "configType" : "inotifyx", "monDir" : BASE_PATH + "/data/source", @@ -282,10 +219,7 @@ class Sender(): "monSubdirs" : ["commissioning", "current", "local"], "monSuffixes" : [".tif", ".cbf"] } - - self.signalHandlerPr = None - self.taskProviderPr = None - self.dataDispatcherPr = None + localTarget = BASE_PATH + "/data/target" logging.info("Start SignalHandler...") @@ -293,13 +227,16 @@ class Sender(): self.signalHandlerPr.start() logging.debug("Start SignalHandler...done") + # needed, because otherwise the requests for the first files are not forwarded properly + time.sleep(0.5) + logging.info("Start TaskProvider...") self.taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, requestFwPort, routerPort) ) self.taskProviderPr.start() logging.info("Start TaskProvider...done") logging.info("Start DataDispatcher...") - self.dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, useDataStream) ) + self.dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, self.fixedStreamId, localTarget) ) self.dataDispatcherPr.start() logging.info("Start DataDispatcher...done") @@ -320,81 +257,92 @@ class Sender(): if __name__ == '__main__': freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows - import cPickle - BASE_PATH = "/space/projects/live-viewer" + test = True + + if test: + import time + import cPickle + from shutil import copyfile + + #enable logging + helperScript.initLogging(BASE_PATH + "/logs/dataManager.log", verbose=True, onScreenLogLevel="debug") + + class Test_Receiver_Stream(): + def __init__(self, comPort, receivingPort, receivingPort2): + context = zmq.Context.instance() + + self.comSocket = context.socket(zmq.REQ) + connectionStr = "tcp://zitpcx19282:" + comPort + self.comSocket.connect(connectionStr) + logging.info("=== comSocket connected to " + connectionStr) + + self.receivingSocket = context.socket(zmq.PULL) + connectionStr = "tcp://0.0.0.0:" + receivingPort + self.receivingSocket.bind(connectionStr) + logging.info("=== receivingSocket connected to " + connectionStr) + + self.receivingSocket2 = context.socket(zmq.PULL) + connectionStr = "tcp://0.0.0.0:" + receivingPort2 + self.receivingSocket2.bind(connectionStr) + logging.info("=== receivingSocket2 connected to " + connectionStr) + + self.sendSignal("START_STREAM", receivingPort, 1) + self.sendSignal("START_STREAM", receivingPort2, 0) + + self.run() + + def sendSignal(self, signal, ports, prio = None): + logging.info("=== sendSignal : " + signal + ", " + str(ports)) + sendMessage = ["0.0.1", signal] + targets = [] + if type(ports) == list: + for port in ports: + targets.append(["zitpcx19282:" + port, prio]) + else: + targets.append(["zitpcx19282:" + ports, prio]) + targets = cPickle.dumps(targets) + sendMessage.append(targets) + self.comSocket.send_multipart(sendMessage) + receivedMessage = self.comSocket.recv() + logging.info("=== Responce : " + receivedMessage ) + + def run(self): + try: + while True: + recv_message = self.receivingSocket.recv_multipart() + logging.info("=== received: " + str(cPickle.loads(recv_message[0]))) + recv_message = self.receivingSocket2.recv_multipart() + logging.info("=== received 2: " + str(cPickle.loads(recv_message[0]))) + except KeyboardInterrupt: + pass + + def __exit__(self): + self.receivingSocket.close(0) + self.receivingSocket2.close(0) + context.destroy() + + + comPort = "6000" + receivingPort = "6005" + receivingPort2 = "6006" + + testPr = Process ( target = Test_Receiver_Stream, args = (comPort, receivingPort, receivingPort2)) + testPr.start() - #enable logging - helperScript.initLogging(BASE_PATH + "/logs/dataManager.log", verbose=True, onScreenLogLevel="debug") - - class Test_Receiver_Stream(): - def __init__(self, comPort, receivingPort, receivingPort2): - context = zmq.Context.instance() - - self.comSocket = context.socket(zmq.REQ) - connectionStr = "tcp://zitpcx19282:" + comPort - self.comSocket.connect(connectionStr) - logging.info("=== comSocket connected to " + connectionStr) - - self.receivingSocket = context.socket(zmq.PULL) - connectionStr = "tcp://0.0.0.0:" + receivingPort - self.receivingSocket.bind(connectionStr) - logging.info("=== receivingSocket connected to " + connectionStr) - - self.receivingSocket2 = context.socket(zmq.PULL) - connectionStr = "tcp://0.0.0.0:" + receivingPort2 - self.receivingSocket2.bind(connectionStr) - logging.info("=== receivingSocket2 connected to " + connectionStr) - - self.sendSignal("START_STREAM", receivingPort, 1) - self.sendSignal("START_STREAM", receivingPort2, 0) - - self.run() - - def sendSignal(self, signal, ports, prio = None): - logging.info("=== sendSignal : " + signal + ", " + str(ports)) - sendMessage = ["0.0.1", signal] - targets = [] - if type(ports) == list: - for port in ports: - targets.append(["zitpcx19282:" + port, prio]) - else: - targets.append(["zitpcx19282:" + ports, prio]) - targets = cPickle.dumps(targets) - sendMessage.append(targets) - self.comSocket.send_multipart(sendMessage) - receivedMessage = self.comSocket.recv() - logging.info("=== Responce : " + receivedMessage ) - - def run(self): - try: - while True: - recv_message = self.receivingSocket.recv_multipart() - logging.info("=== received: " + str(cPickle.loads(recv_message[0]))) - recv_message = self.receivingSocket2.recv_multipart() - logging.info("=== received 2: " + str(cPickle.loads(recv_message[0]))) - except KeyboardInterrupt: - pass - - def __exit__(self): - self.receivingSocket.close(0) - self.receivingSocket2.close(0) - context.destroy() - - - comPort = "6000" - receivingPort = "6005" - receivingPort2 = "6006" - - testPr = Process ( target = Test_Receiver_Stream, args = (comPort, receivingPort, receivingPort2)) - testPr.start() sender = Sender() try: while True: - pass + if test: + copyfile(BASE_PATH + "/test_file.cbf", BASE_PATH + "/data/source/local/raw/100.cbf") + time.sleep(1) + break + else: + pass finally: - testPr.terminate() + if test: + testPr.terminate() sender.stop() diff --git a/src/sender/DataDispatcher.py b/src/sender/DataDispatcher.py index 2a18fb33..918ef49f 100644 --- a/src/sender/DataDispatcher.py +++ b/src/sender/DataDispatcher.py @@ -8,6 +8,7 @@ import sys import logging import traceback import cPickle +import shutil #path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) SHARED_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) + os.sep + "shared" @@ -24,7 +25,7 @@ import helperScript # class DataDispatcher(): - def __init__(self, id, routerPort, chunkSize, useDataStream, context = None): + def __init__(self, id, routerPort, chunkSize, fixedStreamId, localTarget = None, context = None): self.log = self.getLogger() self.id = id @@ -38,6 +39,9 @@ class DataDispatcher(): self.routerSocket = None + self.fixedStreamId = fixedStreamId + self.localTarget = localTarget + # dict with informations of all open sockets to which a data stream is opened (host, port,...) self.openConnections = dict() @@ -99,26 +103,32 @@ class DataDispatcher(): self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job") message = self.routerSocket.recv_multipart() self.log.debug("DataDispatcher-" + str(self.id) + ": new job received") + self.log.debug("message = " + str(message)) if len(message) >= 2: workload = cPickle.loads(message[0]) targets = cPickle.loads(message[1]) - # sort the target list by thge priority + if self.fixedStreamId: + targets.insert(0,[self.fixedStreamId, 0]) + # sort the target list by the priority targets = sorted(targets, key=lambda target: target[1]) else: - workload = cPickle.loads(message) - targets = None - - finished = workload == b"EXIT" + finished = message[0] == b"EXIT" if finished: self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".") break + workload = cPickle.loads(message[0]) + if self.fixedStreamId: + targets = [[self.fixedStreamId, 0]] + else: + targets = None + # get metadata of the file try: self.log.debug("Getting file metadata") - sourceFile, metadata = self.getMetadata(workload) + sourceFile, targetFile, metadata = self.getMetadata(workload) except Exception as e: self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".") self.log.debug("Error was: " + str(e)) @@ -193,6 +203,13 @@ class DataDispatcher(): sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath) sourceFilePathFull = os.path.join(sourceFilePath, filename) + #TODO combine better with sourceFile... (for efficiency) + if self.localTarget: + targetFilePath = os.path.normpath(self.localTarget + os.sep + relativePath) + targetFilePathFull = os.path.join(targetFilePath, filename) + else: + targetFilePathFull = None + try: #for quick testing set filesize of file as chunksize self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...") @@ -228,7 +245,7 @@ class DataDispatcher(): self.log.debug("Error was: " + str(e)) raise Exception(e) - return sourceFilePathFull, metadata + return sourceFilePathFull, targetFilePathFull, metadata def sendData(self, targets, sourceFilepath, metadata): @@ -388,9 +405,12 @@ if __name__ == '__main__': receivingPort = "6005" receivingPort2 = "6006" chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB - useDataStream = False - dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, useDataStream) ) + localTarget = BASE_PATH + "/data/target" + fixedStreamId = False + fixedStreamId = "zitpcx19282:6006" + + dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, fixedStreamId, localTarget) ) dataDispatcherPr.start() context = zmq.Context.instance() @@ -419,6 +439,7 @@ if __name__ == '__main__': targets = [['zitpcx19282:6005', 1], ['zitpcx19282:6006', 0]] message = [ cPickle.dumps(metadata), cPickle.dumps(targets) ] +# message = [ cPickle.dumps(metadata)] time.sleep(1) diff --git a/src/sender/SignalHandler.py b/src/sender/SignalHandler.py index bdfe20d1..6fd50d15 100644 --- a/src/sender/SignalHandler.py +++ b/src/sender/SignalHandler.py @@ -24,8 +24,7 @@ import helperScript # class SignalHandler(): - def __init__ (self, whiteList, - comPort, signalFwPort, requestPort, + def __init__ (self, whiteList, comPort, signalFwPort, requestPort, context = None): # to get the logging only handling this class @@ -174,20 +173,12 @@ class SignalHandler(): incomingMessage = self.requestSocket.recv_multipart() self.log.debug("Received request: " + str(incomingMessage) ) -# print "allowedQueries", self.allowedQueries for index in range(len(self.allowedQueries)): - # no double requests from the same socket TODO do we want that? -# print "allowedQueries[", index, "]", self.allowedQueries[index] -# print "openRequVari", self.openRequVari -# print "incomingMessage", incomingMessage[1] for i in range(len(self.allowedQueries[index])): if incomingMessage[1] == self.allowedQueries[index][i][0]: self.openRequVari[index].append(self.allowedQueries[index][i]) self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) ) -# print "openRequVari", self.openRequVari - - def checkSignal (self, incomingMessage): @@ -379,10 +370,6 @@ if __name__ == '__main__': self.requestFwSocket.close(0) self.context.destroy() -# def __del__(self): -# self.requestFwSocket.close(0) -# self.context.destroy() - helperScript.initLogging("/space/projects/live-viewer/logs/signalHandler.log", verbose=True, onScreenLogLevel="debug") diff --git a/src/sender/TaskProvider.py b/src/sender/TaskProvider.py index 2d4f7239..2ebcc7ee 100644 --- a/src/sender/TaskProvider.py +++ b/src/sender/TaskProvider.py @@ -97,9 +97,9 @@ class TaskProvider(): connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort ) try: self.routerSocket.bind(connectionStr) - self.log.info("Start to routeributing socket (bind): '" + str(connectionStr) + "'") + self.log.info("Start to router socket (bind): '" + str(connectionStr) + "'") except Exception as e: - self.log.error("Failed to start routeributing Socket (bind): '" + connectionStr + "'") + self.log.error("Failed to start router Socket (bind): '" + connectionStr + "'") self.log.debug("Error was:" + str(e)) -- GitLab