From f0e111b5dea9e694e12aabfac45b2e4c9d8b9c74 Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Wed, 30 Mar 2016 16:21:41 +0200
Subject: [PATCH] Integrated http option into DataManger

---
 conf/dataManager.conf                  | 119 +++++++++++++-------
 src/sender/DataManager.py              | 145 +++++++++++++++++--------
 src/sender/dataFetchers/getFromFile.py |   1 +
 src/sender/dataFetchers/getFromZmq.py  |   5 +-
 4 files changed, 180 insertions(+), 90 deletions(-)

diff --git a/conf/dataManager.conf b/conf/dataManager.conf
index 7a1177ec..7570c561 100644
--- a/conf/dataManager.conf
+++ b/conf/dataManager.conf
@@ -1,37 +1,95 @@
+#########################################
+####      Logging configuration      ####
+#########################################
+
+# Path where the logfile will be created
+logfilePath        = /space/projects/zeromq-data-transfer/logs
+#logfilePath        = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
+#logfilePath        = /home/p11user/zeromq-data-transfer/logs
+#logfilePath        = /home/p11user/live-viewer/logs
+
+# Filename used for logging
+logfileName        = dataManager.log
+
+# File size before rollover in B (linux only)
+logfileSize        = 10485760 ; #10 MB
+
+
+#########################################
+####   SignalHandler Configuration   ####
+#########################################
+
+# Port number to receive signals from
+comPort            = 50000
+
+# List of hosts allowed to connect
+whitelist          = ["localhost", "zitpcx19282", "zitpcx22614", "lsdma-lab04", "haspp11eval01", "it-hpc-cxi03"]
+#whitelist          = ["localhost", "haspp11eval01", "it-hpc-cxi03"]
+
+# ZMQ port to get new requests
+requestPort        = 50001
+# ZMQ port to forward requests
+requestFwPort      = 50002
+
+
+#########################################
+####   EventDetector Configuration   ####
+#########################################
+
+# Type of event detector to use (options are: InotifyxDetector, WatchdogDetector, ZmqDetector, HttpGetDetector)
+#eventDetectorType  = InotifyxDetector
+#eventDetectorType  = WatchdogDetector
+eventDetectorType  = ZmqDetector
+#eventDetectorType  = HttpDetector
+
 # Directory to be monitor for changes
 # Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
+# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
 monitoredDir       = /space/projects/zeromq-data-transfer/data/source
 #monitoredDir       = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source
 #monitoredDir       = /rd
-# Target to move the files into
-localTarget        = /space/projects/zeromq-data-transfer/data/target
-#localTarget       = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
-#localTarget       = /gpfs
 
-# Type of event detector to use (options are: inotifyx, watchdog, zmq)
-eventDetectorType  = InotifyxDetector
-#eventDetectorType  = WatchdogDetector
-#eventDetectorType  = ZmqDetector
 # Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
+# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
 monitoredEventType = IN_CLOSE_WRITE
 #monitoredEventType  = IN_MOVED_TO
 # Subdirectories of watchDir to be monitored
+# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
 monitoredSubdirs   = ["commissioning", "current", "local"]
 # The formats to be monitored, files in an other format will be be neglected
+# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
 monitoredFormats   = [".tif", ".cbf"]
+
 # Time (in seconds) since last modification after which a file will be seen as closed
+# (only needed if eventDetector is WatchdogDetector)
 timeTillClosed     = 2
 
+# ZMQ port to get events from (only needed if eventDetectorType is ZmqDetector)
+eventPort          = 50003
+
+# Supply a scan prefix. Otherwise the prefix is read from the tango server.
+# (only needed if eventDetectorType is HttpDetector)
+prefix             = None
+# Tango device proxy for the detector
+# (only needed if eventDetectorType is HttpDetector)
+detectorDevice     = "haspp10lab:10000/p10/eigerdectris/lab.01"
+# Tango device proxy for the filewriter
+# (only needed if eventDetectorType is HttpDetector)
+filewriterDevice   = "haspp10lab:10000/p10/eigerfilewriter/lab.01"
+
+
+#########################################
+####    DataFetcher Configuration    ####
+#########################################
+
 # Module with methods specifying how to get the data (options are "getFromFile", "getFromZmq")
 dataFetcherType    = getFromFile
 #dataFetcherType    = getFromZmq
-# If "getFromZmq" is specified as dataFetcherType is needs a port to listen to
-dataFetcherPort    = 50010
+#dataFetcherType    = getFromHttp
 
+# If "getFromZmq" is specified as dataFetcherType it needs a port to listen to
+dataFetcherPort    = 50010
 
-# List of hosts allowed to connect
-whitelist          = ["localhost", "zitpcx19282", "zitpcx22614", "lsdma-lab04", "haspp11eval01", "it-hpc-cxi03"]
-#whitelist          = ["localhost", "haspp11eval01", "it-hpc-cxi03"]
 
 # Number of parallel data streams
 # if this number is modifified, the port numbers also have to be adjusted
@@ -44,36 +102,17 @@ fixedStreamHost    = zitpcx19282
 # Fixed Port to send the data to with highest priority
 fixedStreamPort    = 50100
 
-# Port number to receive signals from
-comPort            = 50000
-
-# ZMQ port to get new requests
-requestPort        = 50001
-# ZMQ port to forward requests
-requestFwPort      = 50002
-
-# ZMQ port to get events from (only needed if eventDetectorType is zmq)
-eventPort          = 50003
-
-# ZMQ-router port which coordinates the load-balancing to the worker-processes
-routerPort         = 50004
-
-# ZMQ-pull-socket port which deletes/moves given files
-cleanerPort        = 50005
-
 # Chunk size of file-parts getting send via zmq
 chunkSize          = 10485760 ; # = 1024*1024*10
 #chunkSize          = 1073741824 ; # = 1024*1024*1024
 
-# Path where the logfile will be created
-logfilePath        = /space/projects/zeromq-data-transfer/logs
-#logfilePath        = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
-#logfilePath        = /home/p11user/zeromq-data-transfer/logs
-#logfilePath        = /home/p11user/live-viewer/logs
-
-# Filename used for logging
-logfileName        = dataManager.log
+# ZMQ-router port which coordinates the load-balancing to the worker-processes
+routerPort         = 50004
 
-# File size before rollover in B (linux only)
-logfileSize        = 10485760 ; #10 MB
+# Target to move the files into
+localTarget        = /space/projects/zeromq-data-transfer/data/target
+#localTarget       = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
+#localTarget       = /gpfs
 
+# ZMQ-pull-socket port which deletes/moves given files
+cleanerPort        = 50005
diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py
index f3b164b0..9bd3ab8b 100644
--- a/src/sender/DataManager.py
+++ b/src/sender/DataManager.py
@@ -38,41 +38,14 @@ def argumentParsing():
     config = ConfigParser.RawConfigParser()
     config.readfp(helpers.FakeSecHead(open(configFile)))
 
-    logfilePath        = config.get('asection', 'logfilePath')
-    logfileName        = config.get('asection', 'logfileName')
-    logfileSize        = config.get('asection', 'logfileSize') #100*1048576
-
-    comPort            = config.get('asection', 'comPort')
-    whitelist          = json.loads(config.get('asection', 'whitelist'))
-
-    requestPort        = config.get('asection', 'requestPort')
-    requestFwPort      = config.get('asection', 'requestFwPort')
-
-    eventDetectorType  = config.get('asection', 'eventDetectorType')
-    monitoredDir       = config.get('asection', 'monitoredDir')
-    monitoredEventType = config.get('asection', 'monitoredEventType')
-    monitoredSubdirs   = json.loads(config.get('asection', 'monitoredSubdirs'))
-    monitoredFormats   = json.loads(config.get('asection', 'monitoredFormats'))
-    timeTillClosed     = int(config.get('asection', 'timeTillClosed'))
-
-    dataFetcherType    = config.get('asection', 'dataFetcherType')
-    dataFetcherPort    = config.get('asection', 'dataFetcherPort')
-
-    useDataStream      = config.getboolean('asection', 'useDataStream')
-    fixedStreamHost    = config.get('asection', 'fixedStreamHost')
-    fixedStreamPort    = config.get('asection', 'fixedStreamPort')
-
-    numberOfStreams    = config.get('asection', 'numberOfStreams')
-    chunkSize          = int(config.get('asection', 'chunkSize'))
-
-    eventPort          = config.get('asection', 'eventPort')
-    routerPort         = config.get('asection', 'routerPort')
+    parser = argparse.ArgumentParser()
 
-    localTarget        = config.get('asection', 'localTarget')
-    cleanerPort        = config.get('asection', 'cleanerPort')
+    # Logging
 
+    logfilePath        = config.get('asection', 'logfilePath')
+    logfileName        = config.get('asection', 'logfileName')
+    logfileSize        = config.get('asection', 'logfileSize')
 
-    parser = argparse.ArgumentParser()
     parser.add_argument("--logfilePath"       , type    = str,
                                                 help    = "Path where the logfile will be created (default=" + str(logfilePath) + ")",
                                                 default = logfilePath )
@@ -88,6 +61,14 @@ def argumentParsing():
                                                 help    = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)",
                                                 default = False )
 
+    # SignalHandler config
+
+    comPort            = config.get('asection', 'comPort')
+    whitelist          = json.loads(config.get('asection', 'whitelist'))
+
+    requestPort        = config.get('asection', 'requestPort')
+    requestFwPort      = config.get('asection', 'requestFwPort')
+
     parser.add_argument("--comPort"           , type    = str,
                                                 help    = "Port number to receive signals (default=" + str(comPort) + ")",
                                                 default = comPort )
@@ -102,33 +83,90 @@ def argumentParsing():
                                                 help    = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")",
                                                 default = requestFwPort )
 
+    # EventDetector config
+
+    eventDetectorType  = config.get('asection', 'eventDetectorType')
+    # for InotifyxDetector and WatchdogDetector:
+    monitoredDir       = config.get('asection', 'monitoredDir')
+    monitoredEventType = config.get('asection', 'monitoredEventType')
+    monitoredSubdirs   = json.loads(config.get('asection', 'monitoredSubdirs'))
+    monitoredFormats   = json.loads(config.get('asection', 'monitoredFormats'))
+    # for WatchdogDetector:
+    timeTillClosed     = int(config.get('asection', 'timeTillClosed'))
+    # for HttpGetDetector:
+    prefix             = config.get('asection', 'prefix')
+    detectorDevice     = config.get('asection', 'detectorDevice')
+    filewriterDevice   = config.get('asection', 'filewriterDevice')
+
     parser.add_argument("--eventDetectorType" , type    = str,
                                                 help    = "Type of event detector to use (default=" + str(eventDetectorType) + ")",
                                                 default = eventDetectorType )
     parser.add_argument("--monitoredDir"      , type    = str,
                                                 help    = "Directory to be monitor for changes; inside this directory only the specified \
-                                                            subdirectories are monitred (default=" + str(monitoredDir) + ")",
+                                                           subdirectories are monitred (only needed if eventDetector is InotifyxDetector \
+                                                           or WatchdogDetector; default=" + str(monitoredDir) + ")",
                                                 default = monitoredDir )
     parser.add_argument("--monitoredEventType", type    = str,
-                                                help    = "Event type of files to be monitored (default=" + str(monitoredEventType) + ")",
+                                                help    = "Event type of files to be monitored (only needed if eventDetector is InotifyxDetector \
+                                                           or WatchdogDetector; default=" + str(monitoredEventType) + ")",
                                                 default = monitoredEventType )
     parser.add_argument("--monitoredSubdirs"  , type    = str,
-                                                help    = "Subdirectories of 'monitoredDirs' to be monitored (default=" + str(monitoredSubdirs) + ")",
+                                                help    = "Subdirectories of 'monitoredDirs' to be monitored (only needed if eventDetector is \
+                                                           InotifyxDetector or WatchdogDetector; default=" + str(monitoredSubdirs) + ")",
                                                 default = monitoredSubdirs )
     parser.add_argument("--monitoredFormats"  , type    = str,
                                                 help    = "The formats to be monitored, files in an other format will be be neglected \
-                                                           (default=" + str(monitoredFormats) + ")",
+                                                           (only needed if eventDetector is InotifyxDetector or WatchdogDetector; \
+                                                           default=" + str(monitoredFormats) + ")",
                                                 default = monitoredFormats )
     parser.add_argument("--timeTillClosed"    , type    = str,
                                                 help    = "Time (in seconds) since last modification after which a file will be seen as closed \
-                                                           (default=" + str(timeTillClosed) + ")",
+                                                           (only needed if eventDetectorType is WatchdogDetector; default=" + str(timeTillClosed) + ")",
                                                 default = timeTillClosed )
 
+    parser.add_argument("--eventPort"         , type    = str,
+                                                help    = "ZMQ port to get events from \
+                                                           (only needed if eventDetectorType is ZmqDetector; default=" + str(eventPort) + ")",
+                                                default = eventPort )
+
+    parser.add_argument("--prefix"            , type    = str,
+                                                help    = "Supply a scan prefix. Otherwise the prefix is read from the tango server \
+                                                           (only needed if eventDetectorType is HttpDetector; default=" + str(prefix) + ")",
+                                                default = prefix )
+    parser.add_argument("--detectorDevice"    , type    = str,
+                                                help    = "Tango device proxy for the detector \
+                                                           (only needed if eventDetectorType is HttpDetector; default=" + str(detectorDevice) + ")",
+                                                default = detectorDevice )
+    parser.add_argument("--filewriterDevice"  , type    = str,
+                                                help    = "Tango device proxy for the filewriter \
+                                                           (only needed if eventDetectorType is HttpDetector; default=" + str(filewriterDevice) + ")",
+                                                default = filewriterDevice )
+
+    # DataFetcher config
+
+    dataFetcherType    = config.get('asection', 'dataFetcherType')
+
+    # for getFromZMQ:
+    dataFetcherPort    = config.get('asection', 'dataFetcherPort')
+
+    useDataStream      = config.getboolean('asection', 'useDataStream')
+    fixedStreamHost    = config.get('asection', 'fixedStreamHost')
+    fixedStreamPort    = config.get('asection', 'fixedStreamPort')
+
+    numberOfStreams    = config.get('asection', 'numberOfStreams')
+    chunkSize          = int(config.get('asection', 'chunkSize'))
+
+    eventPort          = config.get('asection', 'eventPort')
+    routerPort         = config.get('asection', 'routerPort')
+
+    localTarget        = config.get('asection', 'localTarget')
+    cleanerPort        = config.get('asection', 'cleanerPort')
+
     parser.add_argument("--dataFetcherType"   , type    = str,
                                                 help    = "Module with methods specifying how to get the data (default=" + str(dataFetcherType) + ")",
                                                 default = dataFetcherType )
     parser.add_argument("--dataFetcherPort"   , type    = str,
-                                                help    = "If 'getFromZmq is specified as dataFetcherType is needs a port to listen to \
+                                                help    = "If 'getFromZmq is specified as dataFetcherType it needs a port to listen to \
                                                            (default=" + str(dataFetcherType) + ")",
                                                 default = dataFetcherPort )
 
@@ -138,11 +176,11 @@ def argumentParsing():
                                                 default = useDataStream )
     parser.add_argument("--fixedStreamHost"   , type    = str,
                                                 help    = "Fixed host to send the data to with highest priority \
-                                                        (only active is useDataStream is set; default=" + str(fixedStreamHost) + ")",
+                                                           (only active if useDataStream is set; default=" + str(fixedStreamHost) + ")",
                                                 default = fixedStreamHost )
     parser.add_argument("--fixedStreamPort"   , type    = str,
                                                 help    = "Fixed port to send the data to with highest priority \
-                                                        (only active is useDataStream is set; default=" + str(fixedStreamPort) + ")",
+                                                           (only active if useDataStream is set; default=" + str(fixedStreamPort) + ")",
                                                 default = fixedStreamPort )
     parser.add_argument("--numberOfStreams"   , type    = int,
                                                 help    = "Number of parallel data streams (default=" + str(numberOfStreams) + ")",
@@ -151,10 +189,6 @@ def argumentParsing():
                                                 help    = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")",
                                                 default = chunkSize )
 
-    parser.add_argument("--eventPort"         , type    = str,
-                                                help    = "ZMQ port to get events from (only needed if eventDetectorType is zmq; \
-                                                           default=" + str(eventPort) + ")",
-                                                default = eventPort )
     parser.add_argument("--routerPort"        , type    = str,
                                                 help    = "ZMQ-router port which coordinates the load-balancing \
                                                            to the worker-processes (default=" + str(routerPort) + ")",
@@ -170,9 +204,10 @@ def argumentParsing():
 
     arguments         = parser.parse_args()
 
-    logfilePath       = str(arguments.logfilePath)
-    logfileName       = str(arguments.logfileName)
-    logfileFullPath   = os.path.join(logfilePath, logfileName)
+    # Check given arguments
+
+    logfilePath       = arguments.logfilePath
+    logfileName       = arguments.logfileName
     verbose           = arguments.verbose
     onScreen          = arguments.onScreen
 
@@ -248,6 +283,7 @@ class DataManager():
         self.requestPort      = arguments.requestPort
         self.requestFwPort    = arguments.requestFwPort
 
+        # Assemble configuration for eventDetector
         self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType)
         if arguments.eventDetectorType == "InotifyxDetector":
             self.eventDetectorConfig = {
@@ -271,10 +307,18 @@ class DataManager():
                     "eventDetectorType" : arguments.eventDetectorType,
                     "eventPort"         : arguments.eventPort,
                     "numberOfStreams"   : arguments.numberOfStreams,
-                    "context"           : None,
+                    "context"           : None
+                    }
+        elif arguments.eventDetectorType == "HttpDetector":
+            self.eventDetectorConfig = {
+                    "eventDetectorType" : arguments.eventDetectorType,
+                    "prefix"            : arguments.prefix,
+                    "detectorDevice"    : arguments.detectorDevice,
+                    "filewriterDevice"  : arguments.filewriterDevice
                     }
 
 
+        # Assemble configuration for dataFetcher
         self.log.debug("Configured Type of dataFetcher: " + arguments.dataFetcherType)
         if arguments.dataFetcherType == "getFromFile":
             self.dataFetcherProp = {
@@ -288,6 +332,11 @@ class DataManager():
                     "extIp"       : "0.0.0.0",
                     "port"        : arguments.dataFetcherPort,
                     }
+        elif arguments.dataFetcherType == "getFromHttp":
+            self.dataFetcherProp = {
+                    "type"        : arguments.dataFetcherType,
+                    "session"     : None
+                    }
 
 
         if arguments.useDataStream:
diff --git a/src/sender/dataFetchers/getFromFile.py b/src/sender/dataFetchers/getFromFile.py
index 7167a188..587fc978 100644
--- a/src/sender/dataFetchers/getFromFile.py
+++ b/src/sender/dataFetchers/getFromFile.py
@@ -117,6 +117,7 @@ def sendData (log, targets, sourceFile, metadata, openConnections, context, prop
             chunkPayloadMetadata = metadata.copy()
             chunkPayloadMetadata["chunkNumber"] = chunkNumber
             chunkPayloadMetadataJson = cPickle.dumps(chunkPayloadMetadata)
+
             chunkPayload = []
             chunkPayload.append(chunkPayloadMetadataJson)
             chunkPayload.append(fileContent)
diff --git a/src/sender/dataFetchers/getFromZmq.py b/src/sender/dataFetchers/getFromZmq.py
index 4fd623c8..e43b3a54 100644
--- a/src/sender/dataFetchers/getFromZmq.py
+++ b/src/sender/dataFetchers/getFromZmq.py
@@ -164,7 +164,7 @@ def sendData (log, targets, sourceFile, metadata, openConnections, context, prop
         log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)
 
 
-def finishDataHandling (log, sourceFile, targetFile, removeFlag = False):
+def finishDataHandling (log, sourceFile, targetFile, prop):
     pass
 
 
@@ -250,7 +250,7 @@ if __name__ == '__main__':
     openConnections = dict()
 
     dataFetcherProp = {
-            "type"       : "getFromQueue",
+            "type"       : "getFromZmq",
             "context"    : context,
             "extIp"      : extIp,
             "port"       : dataFwPort
@@ -276,6 +276,7 @@ if __name__ == '__main__':
     except KeyboardInterrupt:
         pass
     finally:
+        dataFwSocket.close(0)
         receivingSocket.close(0)
         receivingSocket2.close(0)
         clean(dataFetcherProp)
-- 
GitLab