diff --git a/conf/dataManager.conf b/conf/dataManager.conf index 5e1dc2d08ad99b13bc3887fc5031fc6bb015590a..2d40d590ffe4fe8e9626a5c2fdf5d1e0acefb5cf 100644 --- a/conf/dataManager.conf +++ b/conf/dataManager.conf @@ -9,9 +9,9 @@ localTarget = /space/projects/zeromq-data-transfer/data/target #localTarget = /gpfs # Type of event detector to use (options are: inotifyx, watchdog, zmq) -#eventDetectorType = inotifyx -eventDetectorType = watchdog -#eventDetectorType = zmq +#eventDetectorType = InotifyxDetector +#eventDetectorType = WatchdogDetector +eventDetectorType = ZmqDetector # Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) monitoredEventType = IN_CLOSE_WRITE #monitoredEventType = IN_MOVED_TO @@ -23,7 +23,8 @@ monitoredFormats = [".tif", ".cbf"] timeTillClosed = 2 # Module with methods specifying how to get the data (options are "getFromFile", "getFromZmq") -dataFetcherType = getFromFile +#dataFetcherType = getFromFile +dataFetcherType = getFromZmq # If "getFromZmq" is specified as dataFetcherType is needs a port to listen to dataFetcherPort = 50010 diff --git a/src/sender/DataDispatcher.py b/src/sender/DataDispatcher.py index 50afc5a5da8d42803c7e83499770674b754fbf1c..accf7e289a95d26c778869fd0f470b922c484099 100644 --- a/src/sender/DataDispatcher.py +++ b/src/sender/DataDispatcher.py @@ -65,7 +65,7 @@ class DataDispatcher(): self.localTarget = localTarget self.dataFetcherProp = dataFetcherProp - self.log.debug("Configuration for data fetcher: " + str(self.dataFetcherProp)) + self.log.debug("Configuration for dataFetcher: " + str(self.dataFetcherProp)) dataFetcher = self.dataFetcherProp["type"] @@ -84,9 +84,9 @@ class DataDispatcher(): self.__createSockets() - self.log.info("Loading data Fetcher: " + dataFetcher) + self.log.info("Loading dataFetcher: " + dataFetcher) self.dataFetcher = __import__(dataFetcher) - self.dataFetcher.setup(dataFetcherProp) + self.dataFetcher.setup(self.log, dataFetcherProp) # Process.__init__(self) diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py index 4185513a4db24c79eaffc35d211e299882a344a6..a3be1fc429c09200da7f5288a04c974351b9df32 100644 --- a/src/sender/DataManager.py +++ b/src/sender/DataManager.py @@ -154,7 +154,7 @@ def argumentParsing(): parser.add_argument("--eventPort" , type = str, help = "ZMQ port to get events from (only needed if eventDetectorType is zmq; \ default=" + str(eventPort) + ")", - default = routerPort ) + default = eventPort ) parser.add_argument("--routerPort" , type = str, help = "ZMQ-router port which coordinates the load-balancing \ to the worker-processes (default=" + str(routerPort) + ")", @@ -177,8 +177,8 @@ def argumentParsing(): onScreen = arguments.onScreen eventDetectorType = arguments.eventDetectorType.lower() - supportedEDTypes = ["inotifyx", "watchdog", "zmq"] - supportedDFTypes = ["getFromFile"] + supportedEDTypes = ["inotifyxdetector", "watchdogdetector", "zmqdetector"] + supportedDFTypes = ["getfromfile", "getfromzmq"] monitoredDir = str(arguments.monitoredDir) monitoredSubdirs = arguments.monitoredSubdirs localTarget = str(arguments.localTarget) @@ -249,7 +249,7 @@ class DataManager(): self.requestFwPort = arguments.requestFwPort self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType) - if arguments.eventDetectorType == "inotifyx": + if arguments.eventDetectorType == "InotifyxDetector": self.eventDetectorConfig = { "eventDetectorType" : arguments.eventDetectorType, "monDir" : arguments.monitoredDir, @@ -257,7 +257,7 @@ class DataManager(): "monSubdirs" : arguments.monitoredSubdirs, "monSuffixes" : arguments.monitoredFormats } - elif arguments.eventDetectorType == "watchdog": + elif arguments.eventDetectorType == "WatchdogDetector": self.eventDetectorConfig = { "eventDetectorType" : arguments.eventDetectorType, "monDir" : arguments.monitoredDir, @@ -266,12 +266,12 @@ class DataManager(): "monSuffixes" : arguments.monitoredFormats, "timeTillClosed" : arguments.timeTillClosed } - elif arguments.eventDetectorType == "zmq": + elif arguments.eventDetectorType == "ZmqDetector": self.eventDetectorConfig = { "eventDetectorType" : arguments.eventDetectorType, "eventPort" : arguments.eventPort, "numberOfStreams" : arguments.numberOfStreams, - "context" : None, #TODO + "context" : None, } diff --git a/src/sender/TaskProvider.py b/src/sender/TaskProvider.py index d95318a821d16a6da5bc517ebb44553589be019d..7687b12ff9b4708c1a7c564aa3eb536f0376c296 100644 --- a/src/sender/TaskProvider.py +++ b/src/sender/TaskProvider.py @@ -19,6 +19,11 @@ if not SHARED_PATH in sys.path: sys.path.append ( SHARED_PATH ) del SHARED_PATH +EVENTDETECTOR_PATH = BASE_PATH + os.sep + "src" + os.sep + "sender" + os.sep + "eventDetectors" +if not EVENTDETECTOR_PATH in sys.path: + sys.path.append ( EVENTDETECTOR_PATH ) +del EVENTDETECTOR_PATH + import helpers @@ -38,20 +43,23 @@ class TaskProvider(): # monSuffixes : ... , #} - self.log = self.getLogger(logQueue) + self.log = self.getLogger(logQueue) self.log.debug("TaskProvider started (PID " + str(os.getpid()) + ").") - self.eventDetector = None + self.dataDetectorModule = None + self.eventDetector = None - self.config = eventDetectorConfig + self.config = eventDetectorConfig self.log.debug("Configuration for event detector: " + str(self.config)) - self.localhost = "127.0.0.1" - self.extIp = "0.0.0.0" - self.requestFwPort = requestFwPort - self.routerPort = routerPort - self.requestFwSocket = None - self.routerSocket = None + eventDetectorModule = self.config["eventDetectorType"] + + self.localhost = "127.0.0.1" + self.extIp = "0.0.0.0" + self.requestFwPort = requestFwPort + self.routerPort = routerPort + self.requestFwSocket = None + self.routerSocket = None self.log.debug("Registering ZMQ context") # remember if the context was created outside this class or not @@ -63,34 +71,10 @@ class TaskProvider(): self.extContext = False - if self.config.has_key("eventDetectorType") and self.config["eventDetectorType"] == "inotifyx": - - from eventDetectors.InotifyxDetector import InotifyxDetector as EventDetector - - # check format of config - if ( not self.config.has_key("monDir") or - not self.config.has_key("monEventType") or - not self.config.has_key("monSubdirs") or - not self.config.has_key("monSuffixes") ): - self.log.error ("Configuration of wrong format") - - elif self.config.has_key("eventDetectorType") and self.config["eventDetectorType"] == "watchdog": - - from eventDetectors.WatchdogDetector import WatchdogDetector as EventDetector - - # check format of config - if ( not self.config.has_key("monDir") or - not self.config.has_key("monEventType") or - not self.config.has_key("monSubdirs") or - not self.config.has_key("monSuffixes") or - not self.config.has_key("timeTillClosed") ): - self.log.error ("Configuration of wrong format") - - else: - self.log.error("Type of event detector is not supported: " + str( self.config["eventDetectorType"] )) - return -1 + self.log.info("Loading eventDetector: " + eventDetectorModule) + self.eventDetectorModule = __import__(eventDetectorModule) - self.eventDetector = EventDetector(self.config, logQueue) + self.eventDetector = self.eventDetectorModule.EventDetector(self.config, logQueue) self.createSockets() diff --git a/src/sender/dataFetchers/getFromFile.py b/src/sender/dataFetchers/getFromFile.py index d8bc2337fa4f96e0e7ef041e90f78b8285290b25..b31d16070041b4ba237f39d019740b7ab39386ff 100644 --- a/src/sender/dataFetchers/getFromFile.py +++ b/src/sender/dataFetchers/getFromFile.py @@ -9,7 +9,7 @@ import cPickle import shutil -def setup (dataFetcherProp): +def setup (log, dataFetcherProp): #TODO # check if dataFetcherProp has correct format return diff --git a/src/sender/dataFetchers/getFromZmq.py b/src/sender/dataFetchers/getFromZmq.py index 697d88fccb944c6052ef1aaf3bdab9513c1dd1d5..3f24af01e11e0732d568622a8975dd1a8af2c5b1 100644 --- a/src/sender/dataFetchers/getFromZmq.py +++ b/src/sender/dataFetchers/getFromZmq.py @@ -38,7 +38,7 @@ def getMetadata (log, metadata, chunkSize, localTarget = None): #TODO combine better with sourceFile... (for efficiency) if localTarget: - targetFile = os.path.join(localTarget, filename) + targetFile = os.path.join(localTarget, sourceFile) else: targetFile = None diff --git a/src/sender/eventDetectors/InotifyxDetector.py b/src/sender/eventDetectors/InotifyxDetector.py index 5682b7d4576befe1e5ffce86285de0734737d64c..8c037dbe926e13b125e4e23c04a4bc658f104df1 100644 --- a/src/sender/eventDetectors/InotifyxDetector.py +++ b/src/sender/eventDetectors/InotifyxDetector.py @@ -75,7 +75,8 @@ class InotifyEvent(object): # Modification of the inotifyx example found inside inotifyx library # Copyright (c) 2005 Manuel Amador # Copyright (c) 2009-2011 Forest Bond -class InotifyxDetector(): +#class InotifyxDetector(): +class EventDetector(): def __init__(self, config, logQueue): @@ -129,7 +130,7 @@ class InotifyxDetector(): def getLogger (self, queue): # Create log and set handler to queue handle h = QueueHandler(queue) # Just the one handler needed - logger = logging.getLogger("inotifyDetector") + logger = logging.getLogger("InotifyxDetector") logger.propagate = False logger.addHandler(h) logger.setLevel(logging.DEBUG) @@ -355,7 +356,8 @@ if __name__ == '__main__': "monSuffixes" : [".tif", ".cbf"] } - eventDetector = InotifyxDetector(config, logQueue) +# eventDetector = InotifyxDetector(config, logQueue) + eventDetector = EventDetector(config, logQueue) sourceFile = BASE_PATH + os.sep + "test_file.cbf" targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep diff --git a/src/sender/eventDetectors/WatchdogDetector.py b/src/sender/eventDetectors/WatchdogDetector.py index 0961b1c64a51f77262c6c2ac06a1153105577c5e..94ca389004b54bf976181d73c0f46d1f75ed9c56 100644 --- a/src/sender/eventDetectors/WatchdogDetector.py +++ b/src/sender/eventDetectors/WatchdogDetector.py @@ -311,7 +311,8 @@ class checkModTime(threading.Thread): self.stop() -class WatchdogDetector(): +#class WatchdogDetector(): +class EventDetector(): def __init__(self, config, logQueue): self.log = self.getLogger(logQueue) @@ -433,7 +434,8 @@ if __name__ == '__main__': sourceFile = BASE_PATH + os.sep + "test_file.cbf" targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep - eventDetector = WatchdogDetector(config, logQueue) +# eventDetector = WatchdogDetector(config, logQueue) + eventDetector = EventDetector(config, logQueue) copyFlag = False diff --git a/src/sender/eventDetectors/ZmqDetector.py b/src/sender/eventDetectors/ZmqDetector.py index fd65e96f634a8c87d339f7a9c267c2bd478b4b59..d0c7388ab0781708501cc75ffe930442d969e651 100644 --- a/src/sender/eventDetectors/ZmqDetector.py +++ b/src/sender/eventDetectors/ZmqDetector.py @@ -8,7 +8,8 @@ import cPickle from logutils.queue import QueueHandler -class ZmqDetector(): +#class ZmqDetector(): +class EventDetector(): def __init__(self, config, logQueue): @@ -50,7 +51,7 @@ class ZmqDetector(): def getLogger (self, queue): # Create log and set handler to queue handle h = QueueHandler(queue) # Just the one handler needed - logger = logging.getLogger("zmqDetector") + logger = logging.getLogger("ZmqDetector") logger.propagate = False logger.addHandler(h) logger.setLevel(logging.DEBUG) @@ -151,7 +152,8 @@ if __name__ == '__main__': } - eventDetector = ZmqDetector(config, logQueue) +# eventDetector = ZmqDetector(config, logQueue) + eventDetector = EventDetector(config, logQueue) sourceFile = BASE_PATH + os.sep + "test_file.cbf" targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep