From 5ee8f9f3809f809caeb8b321bf44c64ca58301ec Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 16 Mar 2016 11:07:19 +0100 Subject: [PATCH] Renamed LambdaDetector to ZmqDetector --- conf/dataManager.conf | 14 +++++++++----- conf/dataManager_windows.conf | 14 +++++++++----- src/sender/DataManager.py | 14 ++++++++++---- .../{LambdaDetector.py => ZmqDetector.py} | 10 +++++----- 4 files changed, 33 insertions(+), 19 deletions(-) rename src/sender/eventDetectors/{LambdaDetector.py => ZmqDetector.py} (95%) diff --git a/conf/dataManager.conf b/conf/dataManager.conf index 17cf6b71..5fa5c83a 100644 --- a/conf/dataManager.conf +++ b/conf/dataManager.conf @@ -8,9 +8,10 @@ localTarget = /space/projects/zeromq-data-transfer/data/target #localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target #localTarget = /gpfs -# Type of event detector to use (options are: inotifyx, watchdog) +# Type of event detector to use (options are: inotifyx, watchdog, zmq) #eventDetectorType = inotifyx eventDetectorType = watchdog +#eventDetectorType = zmq # Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) monitoredEventType = IN_CLOSE_WRITE #monitoredEventType = IN_MOVED_TO @@ -39,16 +40,19 @@ fixedStreamPort = 50100 # Port number to receive signals from comPort = 50000 -# ZMQ port to forward requests -requestFwPort = 50002 # ZMQ port to get new requests requestPort = 50001 +# ZMQ port to forward requests +requestFwPort = 50002 + +# ZMQ port to get events from (only needed if eventDetectorType is zmq) +eventPort = 50003 # ZMQ-router port which coordinates the load-balancing to the worker-processes -routerPort = 50003 +routerPort = 50004 # ZMQ-pull-socket port which deletes/moves given files -cleanerPort = 50004 +cleanerPort = 50005 # Chunk size of file-parts getting send via zmq chunkSize = 10485760 ; # = 1024*1024*10 diff --git a/conf/dataManager_windows.conf b/conf/dataManager_windows.conf index 3a5c3a54..e1898686 100644 --- a/conf/dataManager_windows.conf +++ b/conf/dataManager_windows.conf @@ -4,9 +4,10 @@ monitoredDir = D:\zeromq-data-transfer\data\source # Target to move the files into localTarget = D:\zeromq-data-transfer\data\target -# Type of event detector to use (options are: inotifyx, watchdog) +# Type of event detector to use (options are: inotifyx, watchdog, zmq) #eventDetectorType = inotifyx eventDetectorType = watchdog +#eventDetectorType = zmq # Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) monitoredEventType = IN_CLOSE_WRITE # Subdirectories of watchDir to be monitored @@ -33,16 +34,19 @@ fixedStreamPort = 50100 # Port number to receive signals from comPort = 50000 -# ZMQ port to forward requests -requestFwPort = 50002 # ZMQ port to get new requests requestPort = 50001 +# ZMQ port to forward requests +requestFwPort = 50002 + +# ZMQ port to get events from (only needed if eventDetectorType is zmq) +eventPort = 50003 # ZMQ-router port which coordinates the load-balancing to the worker-processes -routerPort = 50003 +routerPort = 50004 # ZMQ-pull-socket port which deletes/moves given files -cleanerPort = 50004 +cleanerPort = 50005 # Chunk size of file-parts getting send via zmq chunkSize = 10485760 ; # = 1024*1024*10 diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py index 7a9d2a25..e8d5d977 100644 --- a/src/sender/DataManager.py +++ b/src/sender/DataManager.py @@ -62,6 +62,7 @@ def argumentParsing(): numberOfStreams = config.get('asection', 'numberOfStreams') chunkSize = int(config.get('asection', 'chunkSize')) + eventPort = config.get('asection', 'eventPort') routerPort = config.get('asection', 'routerPort') localTarget = config.get('asection', 'localTarget') @@ -139,6 +140,10 @@ def argumentParsing(): help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")", default = chunkSize ) + parser.add_argument("--eventPort" , type = str, + help = "ZMQ port to get events from (only needed if eventDetectorType is zmq; \ + default=" + str(eventPort) + ")", + default = routerPort ) parser.add_argument("--routerPort" , type = str, help = "ZMQ-router port which coordinates the load-balancing \ to the worker-processes (default=" + str(routerPort) + ")", @@ -161,7 +166,7 @@ def argumentParsing(): onScreen = arguments.onScreen eventDetectorType = arguments.eventDetectorType.lower() - supportedEDTypes = ["inotifyx", "watchdog"] + supportedEDTypes = ["inotifyx", "watchdog", "zmq"] monitoredDir = str(arguments.monitoredDir) monitoredSubdirs = arguments.monitoredSubdirs localTarget = str(arguments.localTarget) @@ -245,11 +250,12 @@ class DataManager(): "monSuffixes" : arguments.monitoredFormats, "timeTillClosed" : arguments.timeTillClosed } - elif arguments.eventDetectorType == "lambda": + elif arguments.eventDetectorType == "zmq": self.eventDetectorConfig = { - "eventPort" : "6001", #arguments.eventPort, + "eventDetectorType" : arguments.eventDetectorType, + "eventPort" : arguments.eventPort, "numberOfStreams" : arguments.numberOfStreams, - "context" : None, + "context" : None, #TODO } diff --git a/src/sender/eventDetectors/LambdaDetector.py b/src/sender/eventDetectors/ZmqDetector.py similarity index 95% rename from src/sender/eventDetectors/LambdaDetector.py rename to src/sender/eventDetectors/ZmqDetector.py index d78c9795..fd65e96f 100644 --- a/src/sender/eventDetectors/LambdaDetector.py +++ b/src/sender/eventDetectors/ZmqDetector.py @@ -8,7 +8,7 @@ import cPickle from logutils.queue import QueueHandler -class LambdaDetector(): +class ZmqDetector(): def __init__(self, config, logQueue): @@ -50,7 +50,7 @@ class LambdaDetector(): def getLogger (self, queue): # Create log and set handler to queue handle h = QueueHandler(queue) # Just the one handler needed - logger = logging.getLogger("lambdaDetector") + logger = logging.getLogger("zmqDetector") logger.propagate = False logger.addHandler(h) logger.setLevel(logging.DEBUG) @@ -122,7 +122,7 @@ if __name__ == '__main__': import helpers - logfile = BASE_PATH + os.sep + "logs" + os.sep + "lambdaDetector.log" + logfile = BASE_PATH + os.sep + "logs" + os.sep + "zmqDetector.log" logsize = 10485760 logQueue = Queue(-1) @@ -144,14 +144,14 @@ if __name__ == '__main__': eventPort = "6001" numberOfStreams = 4 config = { - "eventDetectorType" : "lambda", + "eventDetectorType" : "zmq", "eventPort" : eventPort, "numberOfStreams" : numberOfStreams, "context" : None, } - eventDetector = LambdaDetector(config, logQueue) + eventDetector = ZmqDetector(config, logQueue) sourceFile = BASE_PATH + os.sep + "test_file.cbf" targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep -- GitLab