diff --git a/conf/dataManager.conf b/conf/dataManager.conf index 5fa5c83aafd33da070a498c47ec5df87fe0f4ae4..5e1dc2d08ad99b13bc3887fc5031fc6bb015590a 100644 --- a/conf/dataManager.conf +++ b/conf/dataManager.conf @@ -22,6 +22,12 @@ monitoredFormats = [".tif", ".cbf"] # Time (in seconds) since last modification after which a file will be seen as closed timeTillClosed = 2 +# Module with methods specifying how to get the data (options are "getFromFile", "getFromZmq") +dataFetcherType = getFromFile +# If "getFromZmq" is specified as dataFetcherType is needs a port to listen to +dataFetcherPort = 50010 + + # List of hosts allowed to connect whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "haspp11eval01", "it-hpc-cxi03"] #whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"] diff --git a/src/sender/DataDispatcher.py b/src/sender/DataDispatcher.py index 4e2763ba3dd0849854882df5c86150efe08298a0..50afc5a5da8d42803c7e83499770674b754fbf1c 100644 --- a/src/sender/DataDispatcher.py +++ b/src/sender/DataDispatcher.py @@ -36,14 +36,13 @@ class DataDispatcher(): def __init__(self, id, routerPort, chunkSize, fixedStreamId, dataFetcherProp, logQueue, localTarget = None, context = None): - supportedDataFetchers = ["getFromFile"] - dataFetcherProp = { - "type" : "getFromFile", - "removeFlag" : False - } +# dataFetcherProp = { +# "type" : "getFromFile", +# "removeFlag" : False +# } # dataFetcherProp = { -# "type" : "getFromQueue", +# "type" : "getFromZmq", # "context" : context, # "extIp" : "0.0.0.0", # "port" : "50010" @@ -66,6 +65,8 @@ class DataDispatcher(): self.localTarget = localTarget self.dataFetcherProp = dataFetcherProp + self.log.debug("Configuration for data fetcher: " + str(self.dataFetcherProp)) + dataFetcher = self.dataFetcherProp["type"] # dict with informations of all open sockets to which a data stream is opened (host, port,...) @@ -83,13 +84,9 @@ class DataDispatcher(): self.__createSockets() - - if dataFetcher in supportedDataFetchers: - self.log.info("Loading data Fetcher: " + dataFetcher) - self.dataFetcher = __import__(dataFetcher) - self.dataFetcher.setup(dataFetcherProp) - else: - raise Exception("DataFetcher type " + dataFetcher + " not supported") + self.log.info("Loading data Fetcher: " + dataFetcher) + self.dataFetcher = __import__(dataFetcher) + self.dataFetcher.setup(dataFetcherProp) # Process.__init__(self) @@ -287,7 +284,7 @@ if __name__ == '__main__': } # dataFetcherProp = { -# "type" : "getFromQueue", +# "type" : "getFromZmq", # "context" : context, # "extIp" : "0.0.0.0", # "port" : "50010" diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py index c8900a0f3ae39e7b99066c0db8f905955420feea..4185513a4db24c79eaffc35d211e299882a344a6 100644 --- a/src/sender/DataManager.py +++ b/src/sender/DataManager.py @@ -55,6 +55,9 @@ def argumentParsing(): monitoredFormats = json.loads(config.get('asection', 'monitoredFormats')) timeTillClosed = int(config.get('asection', 'timeTillClosed')) + dataFetcherType = config.get('asection', 'dataFetcherType') + dataFetcherPort = config.get('asection', 'dataFetcherPort') + useDataStream = config.getboolean('asection', 'useDataStream') fixedStreamHost = config.get('asection', 'fixedStreamHost') fixedStreamPort = config.get('asection', 'fixedStreamPort') @@ -121,6 +124,14 @@ def argumentParsing(): (default=" + str(timeTillClosed) + ")", default = timeTillClosed ) + parser.add_argument("--dataFetcherType" , type = str, + help = "Module with methods specifying how to get the data (default=" + str(dataFetcherType) + ")", + default = dataFetcherType ) + parser.add_argument("--dataFetcherPort" , type = str, + help = "If 'getFromZmq is specified as dataFetcherType is needs a port to listen to \ + (default=" + str(dataFetcherType) + ")", + default = dataFetcherPort ) + parser.add_argument("--useDataStream" , type = str, help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \ into the localTarget) (default=" + str(useDataStream) + ")", @@ -237,6 +248,7 @@ class DataManager(): self.requestPort = arguments.requestPort self.requestFwPort = arguments.requestFwPort + self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType) if arguments.eventDetectorType == "inotifyx": self.eventDetectorConfig = { "eventDetectorType" : arguments.eventDetectorType, @@ -263,18 +275,19 @@ class DataManager(): } - self.dataFetcherProp = { - "type" : "getFromFile", - "removeFlag" : False - } - -# self.dataFetcherProp = { -# "type" : "getFromQueue", # TODO get from config-file -# "context" : None, -# "extIp" : "0.0.0.0", -# "port" : "50010" # TODO get from config-file -# } - + self.log.debug("Configured Type of dataFetcher: " + arguments.dataFetcherType) + if arguments.dataFetcherType == "getFromFile": + self.dataFetcherProp = { + "type" : arguments.dataFetcherType, + "removeFlag" : False + } + elif arguments.dataFetcherType == "getFromZmq": + self.dataFetcherProp = { + "type" : arguments.dataFetcherType, + "context" : None, + "extIp" : "0.0.0.0", + "port" : arguments.dataFetcherPort, + } if arguments.useDataStream: diff --git a/src/sender/TaskProvider.py b/src/sender/TaskProvider.py index 318fdba903ffc9c0e255c9948f87ddcc1f4b71a1..d95318a821d16a6da5bc517ebb44553589be019d 100644 --- a/src/sender/TaskProvider.py +++ b/src/sender/TaskProvider.py @@ -44,7 +44,7 @@ class TaskProvider(): self.eventDetector = None self.config = eventDetectorConfig - self.log.debug("Configuration for Event detector: " + str(self.config)) + self.log.debug("Configuration for event detector: " + str(self.config)) self.localhost = "127.0.0.1" self.extIp = "0.0.0.0" diff --git a/src/sender/dataFetchers/getFromFile.py b/src/sender/dataFetchers/getFromFile.py index a1aeecc88a7197505710cd7f1a6176d169968f58..d8bc2337fa4f96e0e7ef041e90f78b8285290b25 100644 --- a/src/sender/dataFetchers/getFromFile.py +++ b/src/sender/dataFetchers/getFromFile.py @@ -10,7 +10,9 @@ import shutil def setup (dataFetcherProp): - return dict() + #TODO + # check if dataFetcherProp has correct format + return def getMetadata (log, metadata, chunkSize, localTarget = None): diff --git a/src/sender/dataFetchers/getFromQueue.py b/src/sender/dataFetchers/getFromZmq.py similarity index 99% rename from src/sender/dataFetchers/getFromQueue.py rename to src/sender/dataFetchers/getFromZmq.py index bd0a4f9484ed72c909afdf96e68cf0b29c094afc..697d88fccb944c6052ef1aaf3bdab9513c1dd1d5 100644 --- a/src/sender/dataFetchers/getFromQueue.py +++ b/src/sender/dataFetchers/getFromZmq.py @@ -11,6 +11,9 @@ import shutil def setup(log, prop): + #TODO + # check if prop has correct format + # Create zmq socket socket = prop["context"].socket(zmq.PULL) connectionStr = "tcp://{ip}:{port}".format( ip=prop["extIp"], port=prop["port"] )