diff --git a/conf/dataManager.conf b/conf/dataManager.conf index 9064a30aeb5ab7c36147be044d42aded5700ac11..17cf6b714bfc43ef5d26a3b2e7f0d570f1d7bb3b 100644 --- a/conf/dataManager.conf +++ b/conf/dataManager.conf @@ -1,68 +1,68 @@ # Directory to be monitor for changes # Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored -monitoredDir = /space/projects/zeromq-data-transfer/data/source -#monitoredDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source -#monitoredDir = /rd +monitoredDir = /space/projects/zeromq-data-transfer/data/source +#monitoredDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source +#monitoredDir = /rd # Target to move the files into -localTarget = /space/projects/zeromq-data-transfer/data/target -#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target -#localTarget = /gpfs +localTarget = /space/projects/zeromq-data-transfer/data/target +#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target +#localTarget = /gpfs # Type of event detector to use (options are: inotifyx, watchdog) -#eventDetectorType = inotifyx -eventDetectorType = watchdog +#eventDetectorType = inotifyx +eventDetectorType = watchdog # Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) -monitoredEventType = IN_CLOSE_WRITE +monitoredEventType = IN_CLOSE_WRITE #monitoredEventType = IN_MOVED_TO # Subdirectories of watchDir to be monitored -monitoredSubdirs = ["commissioning", "current", "local"] +monitoredSubdirs = ["commissioning", "current", "local"] # The formats to be monitored, files in an other format will be be neglected -monitoredFormats = [".tif", ".cbf"] +monitoredFormats = [".tif", ".cbf"] # Time (in seconds) since last modification after which a file will be seen as closed -timeTillClosed = 2 +timeTillClosed = 2 # List of hosts allowed to connect -whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "haspp11eval01", "it-hpc-cxi03"] -#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"] +whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "haspp11eval01", "it-hpc-cxi03"] +#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"] # Number of parallel data streams # if this number is modifified, the port numbers also have to be adjusted -parallelDataStreams = 1 +numberOfStreams = 1 # Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget) -useDataStream = True +useDataStream = True # Fixed host to send the data to with highest priority -fixedStreamHost = zitpcx19282 +fixedStreamHost = zitpcx19282 # Fixed Port to send the data to with highest priority -fixedStreamPort = 50100 +fixedStreamPort = 50100 # Port number to receive signals from -comPort = 50000 +comPort = 50000 # ZMQ port to forward requests -requestFwPort = 50002 +requestFwPort = 50002 # ZMQ port to get new requests -requestPort = 50001 +requestPort = 50001 # ZMQ-router port which coordinates the load-balancing to the worker-processes -routerPort = 50003 +routerPort = 50003 # ZMQ-pull-socket port which deletes/moves given files -cleanerPort = 50004 +cleanerPort = 50004 # Chunk size of file-parts getting send via zmq -chunkSize = 10485760 ; # = 1024*1024*10 -#chunkSize = 1073741824 ; # = 1024*1024*1024 +chunkSize = 10485760 ; # = 1024*1024*10 +#chunkSize = 1073741824 ; # = 1024*1024*1024 # Path where the logfile will be created -logfilePath = /space/projects/zeromq-data-transfer/logs -#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs -#logfilePath = /home/p11user/zeromq-data-transfer/logs -#logfilePath = /home/p11user/live-viewer/logs +logfilePath = /space/projects/zeromq-data-transfer/logs +#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs +#logfilePath = /home/p11user/zeromq-data-transfer/logs +#logfilePath = /home/p11user/live-viewer/logs # Filename used for logging -logfileName = dataManager.log +logfileName = dataManager.log # File size before rollover in B (linux only) -logfileSize = 10485760 ; #100 MB +logfileSize = 10485760 ; #10 MB diff --git a/conf/dataManager_windows.conf b/conf/dataManager_windows.conf old mode 100755 new mode 100644 index 2fcd1c625152ed1c25469b0b40a82ea25320c3e5..3a5c3a542070928e65da4ecf6306d58be3851eb8 --- a/conf/dataManager_windows.conf +++ b/conf/dataManager_windows.conf @@ -1,57 +1,59 @@ -# Directory to be monitor for changes -# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored -monitoredDir = D:\zeromq-data-transfer\data\source -# Target to move the files into -localTarget = D:\zeromq-data-transfer\data\target - -# Type of event detector to use (options are: inotifyx, watchdog) -eventDetectorType = watchdog -# Event type of files to be monitored (options are: "IN_CLOSE_WRITE", "IN_MOVED_TO", ...) -monitoredEventType = IN_CLOSE_WRITE -# Subdirectories of watchDir to be monitored -monitoredSubdirs = ["commissioning", "current", "local"] -# The formats to be monitored, files in an other format will be be neglected -monitoredFormats = [".tif", ".cbf"] -# Time (in seconds) since last modification after which a file will be seen as closed -timeTillClosed = 2 - -# List of hosts allowed to connect -whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "zitpcx17858"] -#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"] - -# Number of parallel data streams -# if this number is modifified, the port numbers also have to be adjusted -parallelDataStreams = 1 - -# Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget) -useDataStream = True -# Fixed host to send the data to with highest priority -fixedStreamHost = zitpcx19282 -#fixedStreamHost = zitpcx17858 -# Fixed Port to send the data to with highest priority -fixedStreamPort = 50100 - -# Port number to receive signals from -comPort = 50000 - -# ZMQ port to forward requests -requestFwPort = 50002 -# ZMQ port to get new requests -requestPort = 50001 - -# ZMQ-router port which coordinates the load-balancing to the worker-processes -routerPort = 50003 - -# ZMQ-pull-socket port which deletes/moves given files -cleanerPort = 50004 - -# Chunk size of file-parts getting send via zmq -chunkSize = 10485760 ; # = 1024*1024*10 -#chunkSize = 1073741824 ; # = 1024*1024*1024 - -# Path where the logfile will be created -logfilePath = D:\zeromq-data-transfer\logs - -# Filename used for logging -logfileName = dataManager.log - +# Directory to be monitor for changes +# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored +monitoredDir = D:\zeromq-data-transfer\data\source +# Target to move the files into +localTarget = D:\zeromq-data-transfer\data\target + +# Type of event detector to use (options are: inotifyx, watchdog) +#eventDetectorType = inotifyx +eventDetectorType = watchdog +# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) +monitoredEventType = IN_CLOSE_WRITE +# Subdirectories of watchDir to be monitored +monitoredSubdirs = ["commissioning", "current", "local"] +# The formats to be monitored, files in an other format will be be neglected +monitoredFormats = [".tif", ".cbf"] +# Time (in seconds) since last modification after which a file will be seen as closed +timeTillClosed = 2 + +# List of hosts allowed to connect +whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "zitpcx17858"] + +# Number of parallel data streams +# if this number is modifified, the port numbers also have to be adjusted +numberOfStreams = 1 + +# Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget) +useDataStream = True +# Fixed host to send the data to with highest priority +fixedStreamHost = zitpcx19282 +# Fixed Port to send the data to with highest priority +fixedStreamPort = 50100 + +# Port number to receive signals from +comPort = 50000 + +# ZMQ port to forward requests +requestFwPort = 50002 +# ZMQ port to get new requests +requestPort = 50001 + +# ZMQ-router port which coordinates the load-balancing to the worker-processes +routerPort = 50003 + +# ZMQ-pull-socket port which deletes/moves given files +cleanerPort = 50004 + +# Chunk size of file-parts getting send via zmq +chunkSize = 10485760 ; # = 1024*1024*10 +#chunkSize = 1073741824 ; # = 1024*1024*1024 + +# Path where the logfile will be created +logfilePath = D:\zeromq-data-transfer\logs + +# Filename used for logging +logfileName = dataManager.log + +# File size before rollover in B (linux only) +logfileSize = 10485760 ; #10 MB + diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py index 988c1f726d2a47eb5a2de7ed21bfc8cdbfe4bd20..7a9d2a256a30719b25f8bec54ea390f7610ec773 100644 --- a/src/sender/DataManager.py +++ b/src/sender/DataManager.py @@ -38,136 +38,136 @@ def argumentParsing(): config = ConfigParser.RawConfigParser() config.readfp(helpers.FakeSecHead(open(configFile))) - logfilePath = config.get('asection', 'logfilePath') - logfileName = config.get('asection', 'logfileName') - logfileSize = config.get('asection', 'logfileSize') #100*1048576 + logfilePath = config.get('asection', 'logfilePath') + logfileName = config.get('asection', 'logfileName') + logfileSize = config.get('asection', 'logfileSize') #100*1048576 - comPort = config.get('asection', 'comPort') - whitelist = json.loads(config.get('asection', 'whitelist')) + comPort = config.get('asection', 'comPort') + whitelist = json.loads(config.get('asection', 'whitelist')) - requestPort = config.get('asection', 'requestPort') - requestFwPort = config.get('asection', 'requestFwPort') + requestPort = config.get('asection', 'requestPort') + requestFwPort = config.get('asection', 'requestFwPort') - eventDetectorType = config.get('asection', 'eventDetectorType') - monitoredDir = config.get('asection', 'monitoredDir') - monitoredEventType = config.get('asection', 'monitoredEventType') - monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs')) - monitoredFormats = json.loads(config.get('asection', 'monitoredFormats')) - timeTillClosed = int(config.get('asection', 'timeTillClosed')) + eventDetectorType = config.get('asection', 'eventDetectorType') + monitoredDir = config.get('asection', 'monitoredDir') + monitoredEventType = config.get('asection', 'monitoredEventType') + monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs')) + monitoredFormats = json.loads(config.get('asection', 'monitoredFormats')) + timeTillClosed = int(config.get('asection', 'timeTillClosed')) - useDataStream = config.getboolean('asection', 'useDataStream') - fixedStreamHost = config.get('asection', 'fixedStreamHost') - fixedStreamPort = config.get('asection', 'fixedStreamPort') + useDataStream = config.getboolean('asection', 'useDataStream') + fixedStreamHost = config.get('asection', 'fixedStreamHost') + fixedStreamPort = config.get('asection', 'fixedStreamPort') - parallelDataStreams = config.get('asection', 'parallelDataStreams') - chunkSize = int(config.get('asection', 'chunkSize')) + numberOfStreams = config.get('asection', 'numberOfStreams') + chunkSize = int(config.get('asection', 'chunkSize')) - routerPort = config.get('asection', 'routerPort') + routerPort = config.get('asection', 'routerPort') - localTarget = config.get('asection', 'localTarget') - cleanerPort = config.get('asection', 'cleanerPort') + localTarget = config.get('asection', 'localTarget') + cleanerPort = config.get('asection', 'cleanerPort') parser = argparse.ArgumentParser() - parser.add_argument("--logfilePath" , type = str, - help = "Path where the logfile will be created (default=" + str(logfilePath) + ")", - default = logfilePath ) - parser.add_argument("--logfileName" , type = str, - help = "Filename used for logging (default=" + str(logfileName) + ")", - default = logfileName ) - parser.add_argument("--logfileSize" , type = int, - help = "File size before rollover in B (linux only; (default=" + str(logfileSize) + ")", - default = logfileSize ) - parser.add_argument("--verbose" , help = "More verbose output", - action = "store_true") - parser.add_argument("--onScreen" , type = str, - help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)", - default = False ) - - parser.add_argument("--comPort" , type = str, - help = "Port number to receive signals (default=" + str(comPort) + ")", - default = comPort ) - parser.add_argument("--whitelist" , type = str, - help = "List of hosts allowed to connect (default=" + str(whitelist) + ")", - default = whitelist ) - - parser.add_argument("--requestPort" , type = str, - help = "ZMQ port to get new requests (default=" + str(requestPort) + ")", - default = requestPort ) - parser.add_argument("--requestFwPort" , type = str, - help = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")", - default = requestFwPort ) - - parser.add_argument("--eventDetectorType" , type = str, - help = "Type of event detector to use (default=" + str(eventDetectorType) + ")", - default = eventDetectorType ) - parser.add_argument("--monitoredDir" , type = str, - help = "Directory to be monitor for changes; inside this directory only the specified \ + parser.add_argument("--logfilePath" , type = str, + help = "Path where the logfile will be created (default=" + str(logfilePath) + ")", + default = logfilePath ) + parser.add_argument("--logfileName" , type = str, + help = "Filename used for logging (default=" + str(logfileName) + ")", + default = logfileName ) + parser.add_argument("--logfileSize" , type = int, + help = "File size before rollover in B (linux only; (default=" + str(logfileSize) + ")", + default = logfileSize ) + parser.add_argument("--verbose" , help = "More verbose output", + action = "store_true") + parser.add_argument("--onScreen" , type = str, + help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)", + default = False ) + + parser.add_argument("--comPort" , type = str, + help = "Port number to receive signals (default=" + str(comPort) + ")", + default = comPort ) + parser.add_argument("--whitelist" , type = str, + help = "List of hosts allowed to connect (default=" + str(whitelist) + ")", + default = whitelist ) + + parser.add_argument("--requestPort" , type = str, + help = "ZMQ port to get new requests (default=" + str(requestPort) + ")", + default = requestPort ) + parser.add_argument("--requestFwPort" , type = str, + help = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")", + default = requestFwPort ) + + parser.add_argument("--eventDetectorType" , type = str, + help = "Type of event detector to use (default=" + str(eventDetectorType) + ")", + default = eventDetectorType ) + parser.add_argument("--monitoredDir" , type = str, + help = "Directory to be monitor for changes; inside this directory only the specified \ subdirectories are monitred (default=" + str(monitoredDir) + ")", - default = monitoredDir ) - parser.add_argument("--monitoredEventType" , type = str, - help = "Event type of files to be monitored (default=" + str(monitoredEventType) + ")", - default = monitoredEventType ) - parser.add_argument("--monitoredSubdirs" , type = str, - help = "Subdirectories of 'monitoredDirs' to be monitored (default=" + str(monitoredSubdirs) + ")", - default = monitoredSubdirs ) - parser.add_argument("--monitoredFormats" , type = str, - help = "The formats to be monitored, files in an other format will be be neglected \ - (default=" + str(monitoredFormats) + ")", - default = monitoredFormats ) - parser.add_argument("--timeTillClosed" , type = str, - help = "Time (in seconds) since last modification after which a file will be seen as closed \ - (default=" + str(timeTillClosed) + ")", - default = timeTillClosed ) - - parser.add_argument("--useDataStream" , type = str, - help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \ - into the localTarget) (default=" + str(useDataStream) + ")", - default = useDataStream ) - parser.add_argument("--fixedStreamHost" , type = str, - help = "Fixed host to send the data to with highest priority \ - (only active is useDataStream is set; default=" + str(fixedStreamHost) + ")", - default = fixedStreamHost ) - parser.add_argument("--fixedStreamPort" , type = str, - help = "Fixed port to send the data to with highest priority \ - (only active is useDataStream is set; default=" + str(fixedStreamPort) + ")", - default = fixedStreamPort ) - parser.add_argument("--parallelDataStreams", type = int, - help = "Number of parallel data streams (default=" + str(parallelDataStreams) + ")", - default = parallelDataStreams ) - parser.add_argument("--chunkSize" , type = int, - help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")", - default = chunkSize ) - - parser.add_argument("--routerPort" , type = str, - help = "ZMQ-router port which coordinates the load-balancing \ - to the worker-processes (default=" + str(routerPort) + ")", - default = routerPort ) - - parser.add_argument("--localTarget" , type = str, - help = "Target to move the files into (default=" + str(localTarget) + ")", - default = localTarget ) - parser.add_argument("--cleanerPort" , type = str, - help = "ZMQ-pull-socket port which deletes/moves given file \ - (default=" + str(cleanerPort) + ")", - default = cleanerPort ) - - arguments = parser.parse_args() - - logfilePath = str(arguments.logfilePath) - logfileName = str(arguments.logfileName) - logfileFullPath = os.path.join(logfilePath, logfileName) - verbose = arguments.verbose - onScreen = arguments.onScreen - - eventDetectorType = arguments.eventDetectorType.lower() - supportedEDTypes = ["inotifyx", "watchdog"] - monitoredDir = str(arguments.monitoredDir) - monitoredSubdirs = arguments.monitoredSubdirs - localTarget = str(arguments.localTarget) - - useDataStream = arguments.useDataStream - parallelDataStreams = arguments.parallelDataStreams + default = monitoredDir ) + parser.add_argument("--monitoredEventType", type = str, + help = "Event type of files to be monitored (default=" + str(monitoredEventType) + ")", + default = monitoredEventType ) + parser.add_argument("--monitoredSubdirs" , type = str, + help = "Subdirectories of 'monitoredDirs' to be monitored (default=" + str(monitoredSubdirs) + ")", + default = monitoredSubdirs ) + parser.add_argument("--monitoredFormats" , type = str, + help = "The formats to be monitored, files in an other format will be be neglected \ + (default=" + str(monitoredFormats) + ")", + default = monitoredFormats ) + parser.add_argument("--timeTillClosed" , type = str, + help = "Time (in seconds) since last modification after which a file will be seen as closed \ + (default=" + str(timeTillClosed) + ")", + default = timeTillClosed ) + + parser.add_argument("--useDataStream" , type = str, + help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \ + into the localTarget) (default=" + str(useDataStream) + ")", + default = useDataStream ) + parser.add_argument("--fixedStreamHost" , type = str, + help = "Fixed host to send the data to with highest priority \ + (only active is useDataStream is set; default=" + str(fixedStreamHost) + ")", + default = fixedStreamHost ) + parser.add_argument("--fixedStreamPort" , type = str, + help = "Fixed port to send the data to with highest priority \ + (only active is useDataStream is set; default=" + str(fixedStreamPort) + ")", + default = fixedStreamPort ) + parser.add_argument("--numberOfStreams" , type = int, + help = "Number of parallel data streams (default=" + str(numberOfStreams) + ")", + default = numberOfStreams ) + parser.add_argument("--chunkSize" , type = int, + help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")", + default = chunkSize ) + + parser.add_argument("--routerPort" , type = str, + help = "ZMQ-router port which coordinates the load-balancing \ + to the worker-processes (default=" + str(routerPort) + ")", + default = routerPort ) + + parser.add_argument("--localTarget" , type = str, + help = "Target to move the files into (default=" + str(localTarget) + ")", + default = localTarget ) + parser.add_argument("--cleanerPort" , type = str, + help = "ZMQ-pull-socket port which deletes/moves given file \ + (default=" + str(cleanerPort) + ")", + default = cleanerPort ) + + arguments = parser.parse_args() + + logfilePath = str(arguments.logfilePath) + logfileName = str(arguments.logfileName) + logfileFullPath = os.path.join(logfilePath, logfileName) + verbose = arguments.verbose + onScreen = arguments.onScreen + + eventDetectorType = arguments.eventDetectorType.lower() + supportedEDTypes = ["inotifyx", "watchdog"] + monitoredDir = str(arguments.monitoredDir) + monitoredSubdirs = arguments.monitoredSubdirs + localTarget = str(arguments.localTarget) + + useDataStream = arguments.useDataStream + numberOfStreams = arguments.numberOfStreams # check if logfile is writable helpers.checkLogFileWritable(logfilePath, logfileName) @@ -189,14 +189,14 @@ class DataManager(): def __init__(self, logQueue = None): arguments = argumentParsing() - logfilePath = arguments.logfilePath - logfileName = arguments.logfileName - logfile = os.path.join(logfilePath, logfileName) - logsize = arguments.logfileSize - verbose = arguments.verbose - onScreen = arguments.onScreen + logfilePath = arguments.logfilePath + logfileName = arguments.logfileName + logfile = os.path.join(logfilePath, logfileName) + logsize = arguments.logfileSize + verbose = arguments.verbose + onScreen = arguments.onScreen - self.extLogQueue = False + self.extLogQueue = False if logQueue: self.logQueue = logQueue @@ -222,11 +222,11 @@ class DataManager(): # Create log and set handler to queue handle self.log = self.getLogger(self.logQueue) - self.comPort = arguments.comPort - self.whitelist = arguments.whitelist + self.comPort = arguments.comPort + self.whitelist = arguments.whitelist - self.requestPort = arguments.requestPort - self.requestFwPort = arguments.requestFwPort + self.requestPort = arguments.requestPort + self.requestFwPort = arguments.requestFwPort if arguments.eventDetectorType == "inotifyx": self.eventDetectorConfig = { @@ -245,19 +245,26 @@ class DataManager(): "monSuffixes" : arguments.monitoredFormats, "timeTillClosed" : arguments.timeTillClosed } + elif arguments.eventDetectorType == "lambda": + self.eventDetectorConfig = { + "eventPort" : "6001", #arguments.eventPort, + "numberOfStreams" : arguments.numberOfStreams, + "context" : None, + } + if arguments.useDataStream: - self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort ) + self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort ) else: - self.fixedStreamId = None + self.fixedStreamId = None - self.parallelDataStreams = arguments.parallelDataStreams - self.chunkSize = arguments.chunkSize + self.numberOfStreams = arguments.numberOfStreams + self.chunkSize = arguments.chunkSize - self.routerPort = arguments.routerPort + self.routerPort = arguments.routerPort - self.localTarget = arguments.localTarget - self.cleanerPort = arguments.cleanerPort + self.localTarget = arguments.localTarget + self.cleanerPort = arguments.cleanerPort self.signalHandlerPr = None self.taskProviderPr = None @@ -298,7 +305,7 @@ class DataManager(): self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.requestFwPort, self.routerPort, self.logQueue) ) self.taskProviderPr.start() - for id in range(self.parallelDataStreams): + for id in range(self.numberOfStreams): pr = Process ( target = DataDispatcher, args = ( id, self.routerPort, self.chunkSize, self.fixedStreamId, self.logQueue, self.localTarget) ) pr.start() self.dataDispatcherPr.append(pr)