Skip to content
Snippets Groups Projects
Commit f0e111b5 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Integrated http option into DataManager

parent b98bc1e4
No related branches found
No related tags found
No related merge requests found
#########################################
#### Logging configuration ####
#########################################
# Path where the logfile will be created
logfilePath = /space/projects/zeromq-data-transfer/logs
#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
#logfilePath = /home/p11user/zeromq-data-transfer/logs
#logfilePath = /home/p11user/live-viewer/logs
# Filename used for logging
logfileName = dataManager.log
# File size before rollover in B (linux only)
logfileSize = 10485760 ; #10 MB
#########################################
#### SignalHandler Configuration ####
#########################################
# Port number to receive signals from
comPort = 50000
# List of hosts allowed to connect
whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "lsdma-lab04", "haspp11eval01", "it-hpc-cxi03"]
#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"]
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
requestFwPort = 50002
#########################################
#### EventDetector Configuration ####
#########################################
# Type of event detector to use (options are: InotifyxDetector, WatchdogDetector, ZmqDetector, HttpGetDetector)
#eventDetectorType = InotifyxDetector
#eventDetectorType = WatchdogDetector
eventDetectorType = ZmqDetector
#eventDetectorType = HttpDetector
# Directory to be monitored for changes
# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredDir = /space/projects/zeromq-data-transfer/data/source
#monitoredDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source
#monitoredDir = /rd
# Target to move the files into
localTarget = /space/projects/zeromq-data-transfer/data/target
#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
#localTarget = /gpfs
# Type of event detector to use (options are: InotifyxDetector, WatchdogDetector, ZmqDetector)
eventDetectorType = InotifyxDetector
#eventDetectorType = WatchdogDetector
#eventDetectorType = ZmqDetector
# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredEventType = IN_CLOSE_WRITE
#monitoredEventType = IN_MOVED_TO
# Subdirectories of watchDir to be monitored
# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredSubdirs = ["commissioning", "current", "local"]
# The formats to be monitored, files in another format will be neglected
# (only needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredFormats = [".tif", ".cbf"]
# Time (in seconds) since last modification after which a file will be seen as closed
# (only needed if eventDetector is WatchdogDetector)
timeTillClosed = 2
# ZMQ port to get events from (only needed if eventDetectorType is ZmqDetector)
eventPort = 50003
# Supply a scan prefix. Otherwise the prefix is read from the tango server.
# (only needed if eventDetectorType is HttpDetector)
prefix = None
# Tango device proxy for the detector
# (only needed if eventDetectorType is HttpDetector)
detectorDevice = "haspp10lab:10000/p10/eigerdectris/lab.01"
# Tango device proxy for the filewriter
# (only needed if eventDetectorType is HttpDetector)
filewriterDevice = "haspp10lab:10000/p10/eigerfilewriter/lab.01"
#########################################
#### DataFetcher Configuration ####
#########################################
# Module with methods specifying how to get the data (options are "getFromFile", "getFromZmq")
dataFetcherType = getFromFile
#dataFetcherType = getFromZmq
# If "getFromZmq" is specified as dataFetcherType it needs a port to listen to
dataFetcherPort = 50010
#dataFetcherType = getFromHttp
# If "getFromZmq" is specified as dataFetcherType it needs a port to listen to
dataFetcherPort = 50010
# List of hosts allowed to connect
whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "lsdma-lab04", "haspp11eval01", "it-hpc-cxi03"]
#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"]
# Number of parallel data streams
# if this number is modified, the port numbers also have to be adjusted
......@@ -44,36 +102,17 @@ fixedStreamHost = zitpcx19282
# Fixed Port to send the data to with highest priority
fixedStreamPort = 50100
# Port number to receive signals from
comPort = 50000
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get events from (only needed if eventDetectorType is zmq)
eventPort = 50003
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50004
# ZMQ-pull-socket port which deletes/moves given files
cleanerPort = 50005
# Chunk size of file-parts getting send via zmq
chunkSize = 10485760 ; # = 1024*1024*10
#chunkSize = 1073741824 ; # = 1024*1024*1024
# Path where the logfile will be created
logfilePath = /space/projects/zeromq-data-transfer/logs
#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
#logfilePath = /home/p11user/zeromq-data-transfer/logs
#logfilePath = /home/p11user/live-viewer/logs
# Filename used for logging
logfileName = dataManager.log
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50004
# File size before rollover in B (linux only)
logfileSize = 10485760 ; #10 MB
# Target to move the files into
localTarget = /space/projects/zeromq-data-transfer/data/target
#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
#localTarget = /gpfs
# ZMQ-pull-socket port which deletes/moves given files
cleanerPort = 50005
......@@ -38,41 +38,14 @@ def argumentParsing():
config = ConfigParser.RawConfigParser()
config.readfp(helpers.FakeSecHead(open(configFile)))
logfilePath = config.get('asection', 'logfilePath')
logfileName = config.get('asection', 'logfileName')
logfileSize = config.get('asection', 'logfileSize') #100*1048576
comPort = config.get('asection', 'comPort')
whitelist = json.loads(config.get('asection', 'whitelist'))
requestPort = config.get('asection', 'requestPort')
requestFwPort = config.get('asection', 'requestFwPort')
eventDetectorType = config.get('asection', 'eventDetectorType')
monitoredDir = config.get('asection', 'monitoredDir')
monitoredEventType = config.get('asection', 'monitoredEventType')
monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs'))
monitoredFormats = json.loads(config.get('asection', 'monitoredFormats'))
timeTillClosed = int(config.get('asection', 'timeTillClosed'))
dataFetcherType = config.get('asection', 'dataFetcherType')
dataFetcherPort = config.get('asection', 'dataFetcherPort')
useDataStream = config.getboolean('asection', 'useDataStream')
fixedStreamHost = config.get('asection', 'fixedStreamHost')
fixedStreamPort = config.get('asection', 'fixedStreamPort')
numberOfStreams = config.get('asection', 'numberOfStreams')
chunkSize = int(config.get('asection', 'chunkSize'))
eventPort = config.get('asection', 'eventPort')
routerPort = config.get('asection', 'routerPort')
parser = argparse.ArgumentParser()
localTarget = config.get('asection', 'localTarget')
cleanerPort = config.get('asection', 'cleanerPort')
# Logging
logfilePath = config.get('asection', 'logfilePath')
logfileName = config.get('asection', 'logfileName')
logfileSize = config.get('asection', 'logfileSize')
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type = str,
help = "Path where the logfile will be created (default=" + str(logfilePath) + ")",
default = logfilePath )
......@@ -88,6 +61,14 @@ def argumentParsing():
help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)",
default = False )
# SignalHandler config
comPort = config.get('asection', 'comPort')
whitelist = json.loads(config.get('asection', 'whitelist'))
requestPort = config.get('asection', 'requestPort')
requestFwPort = config.get('asection', 'requestFwPort')
parser.add_argument("--comPort" , type = str,
help = "Port number to receive signals (default=" + str(comPort) + ")",
default = comPort )
......@@ -102,33 +83,90 @@ def argumentParsing():
help = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")",
default = requestFwPort )
# EventDetector config
eventDetectorType = config.get('asection', 'eventDetectorType')
# for InotifyxDetector and WatchdogDetector:
monitoredDir = config.get('asection', 'monitoredDir')
monitoredEventType = config.get('asection', 'monitoredEventType')
monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs'))
monitoredFormats = json.loads(config.get('asection', 'monitoredFormats'))
# for WatchdogDetector:
timeTillClosed = int(config.get('asection', 'timeTillClosed'))
# for HttpGetDetector:
prefix = config.get('asection', 'prefix')
detectorDevice = config.get('asection', 'detectorDevice')
filewriterDevice = config.get('asection', 'filewriterDevice')
parser.add_argument("--eventDetectorType" , type = str,
help = "Type of event detector to use (default=" + str(eventDetectorType) + ")",
default = eventDetectorType )
parser.add_argument("--monitoredDir" , type = str,
help = "Directory to be monitor for changes; inside this directory only the specified \
subdirectories are monitred (default=" + str(monitoredDir) + ")",
subdirectories are monitred (only needed if eventDetector is InotifyxDetector \
or WatchdogDetector; default=" + str(monitoredDir) + ")",
default = monitoredDir )
parser.add_argument("--monitoredEventType", type = str,
help = "Event type of files to be monitored (default=" + str(monitoredEventType) + ")",
help = "Event type of files to be monitored (only needed if eventDetector is InotifyxDetector \
or WatchdogDetector; default=" + str(monitoredEventType) + ")",
default = monitoredEventType )
parser.add_argument("--monitoredSubdirs" , type = str,
help = "Subdirectories of 'monitoredDirs' to be monitored (default=" + str(monitoredSubdirs) + ")",
help = "Subdirectories of 'monitoredDirs' to be monitored (only needed if eventDetector is \
InotifyxDetector or WatchdogDetector; default=" + str(monitoredSubdirs) + ")",
default = monitoredSubdirs )
parser.add_argument("--monitoredFormats" , type = str,
help = "The formats to be monitored, files in an other format will be be neglected \
(default=" + str(monitoredFormats) + ")",
(only needed if eventDetector is InotifyxDetector or WatchdogDetector; \
default=" + str(monitoredFormats) + ")",
default = monitoredFormats )
parser.add_argument("--timeTillClosed" , type = str,
help = "Time (in seconds) since last modification after which a file will be seen as closed \
(default=" + str(timeTillClosed) + ")",
(only needed if eventDetectorType is WatchdogDetector; default=" + str(timeTillClosed) + ")",
default = timeTillClosed )
parser.add_argument("--eventPort" , type = str,
help = "ZMQ port to get events from \
(only needed if eventDetectorType is ZmqDetector; default=" + str(eventPort) + ")",
default = eventPort )
parser.add_argument("--prefix" , type = str,
help = "Supply a scan prefix. Otherwise the prefix is read from the tango server \
(only needed if eventDetectorType is HttpDetector; default=" + str(prefix) + ")",
default = prefix )
parser.add_argument("--detectorDevice" , type = str,
help = "Tango device proxy for the detector \
(only needed if eventDetectorType is HttpDetector; default=" + str(detectorDevice) + ")",
default = detectorDevice )
parser.add_argument("--filewriterDevice" , type = str,
help = "Tango device proxy for the filewriter \
(only needed if eventDetectorType is HttpDetector; default=" + str(filewriterDevice) + ")",
default = filewriterDevice )
# DataFetcher config
dataFetcherType = config.get('asection', 'dataFetcherType')
# for getFromZMQ:
dataFetcherPort = config.get('asection', 'dataFetcherPort')
useDataStream = config.getboolean('asection', 'useDataStream')
fixedStreamHost = config.get('asection', 'fixedStreamHost')
fixedStreamPort = config.get('asection', 'fixedStreamPort')
numberOfStreams = config.get('asection', 'numberOfStreams')
chunkSize = int(config.get('asection', 'chunkSize'))
eventPort = config.get('asection', 'eventPort')
routerPort = config.get('asection', 'routerPort')
localTarget = config.get('asection', 'localTarget')
cleanerPort = config.get('asection', 'cleanerPort')
parser.add_argument("--dataFetcherType" , type = str,
help = "Module with methods specifying how to get the data (default=" + str(dataFetcherType) + ")",
default = dataFetcherType )
parser.add_argument("--dataFetcherPort" , type = str,
help = "If 'getFromZmq is specified as dataFetcherType is needs a port to listen to \
help = "If 'getFromZmq is specified as dataFetcherType it needs a port to listen to \
(default=" + str(dataFetcherType) + ")",
default = dataFetcherPort )
......@@ -138,11 +176,11 @@ def argumentParsing():
default = useDataStream )
parser.add_argument("--fixedStreamHost" , type = str,
help = "Fixed host to send the data to with highest priority \
(only active is useDataStream is set; default=" + str(fixedStreamHost) + ")",
(only active if useDataStream is set; default=" + str(fixedStreamHost) + ")",
default = fixedStreamHost )
parser.add_argument("--fixedStreamPort" , type = str,
help = "Fixed port to send the data to with highest priority \
(only active is useDataStream is set; default=" + str(fixedStreamPort) + ")",
(only active if useDataStream is set; default=" + str(fixedStreamPort) + ")",
default = fixedStreamPort )
parser.add_argument("--numberOfStreams" , type = int,
help = "Number of parallel data streams (default=" + str(numberOfStreams) + ")",
......@@ -151,10 +189,6 @@ def argumentParsing():
help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")",
default = chunkSize )
parser.add_argument("--eventPort" , type = str,
help = "ZMQ port to get events from (only needed if eventDetectorType is zmq; \
default=" + str(eventPort) + ")",
default = eventPort )
parser.add_argument("--routerPort" , type = str,
help = "ZMQ-router port which coordinates the load-balancing \
to the worker-processes (default=" + str(routerPort) + ")",
......@@ -170,9 +204,10 @@ def argumentParsing():
arguments = parser.parse_args()
logfilePath = str(arguments.logfilePath)
logfileName = str(arguments.logfileName)
logfileFullPath = os.path.join(logfilePath, logfileName)
# Check given arguments
logfilePath = arguments.logfilePath
logfileName = arguments.logfileName
verbose = arguments.verbose
onScreen = arguments.onScreen
......@@ -248,6 +283,7 @@ class DataManager():
self.requestPort = arguments.requestPort
self.requestFwPort = arguments.requestFwPort
# Assemble configuration for eventDetector
self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType)
if arguments.eventDetectorType == "InotifyxDetector":
self.eventDetectorConfig = {
......@@ -271,10 +307,18 @@ class DataManager():
"eventDetectorType" : arguments.eventDetectorType,
"eventPort" : arguments.eventPort,
"numberOfStreams" : arguments.numberOfStreams,
"context" : None,
"context" : None
}
elif arguments.eventDetectorType == "HttpDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"prefix" : arguments.prefix,
"detectorDevice" : arguments.detectorDevice,
"filewriterDevice" : arguments.filewriterDevice
}
# Assemble configuration for dataFetcher
self.log.debug("Configured Type of dataFetcher: " + arguments.dataFetcherType)
if arguments.dataFetcherType == "getFromFile":
self.dataFetcherProp = {
......@@ -288,6 +332,11 @@ class DataManager():
"extIp" : "0.0.0.0",
"port" : arguments.dataFetcherPort,
}
elif arguments.dataFetcherType == "getFromHttp":
self.dataFetcherProp = {
"type" : arguments.dataFetcherType,
"session" : None
}
if arguments.useDataStream:
......
......@@ -117,6 +117,7 @@ def sendData (log, targets, sourceFile, metadata, openConnections, context, prop
chunkPayloadMetadata = metadata.copy()
chunkPayloadMetadata["chunkNumber"] = chunkNumber
chunkPayloadMetadataJson = cPickle.dumps(chunkPayloadMetadata)
chunkPayload = []
chunkPayload.append(chunkPayloadMetadataJson)
chunkPayload.append(fileContent)
......
......@@ -164,7 +164,7 @@ def sendData (log, targets, sourceFile, metadata, openConnections, context, prop
log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)
def finishDataHandling (log, sourceFile, targetFile, removeFlag = False):
def finishDataHandling (log, sourceFile, targetFile, prop):
pass
......@@ -250,7 +250,7 @@ if __name__ == '__main__':
openConnections = dict()
dataFetcherProp = {
"type" : "getFromQueue",
"type" : "getFromZmq",
"context" : context,
"extIp" : extIp,
"port" : dataFwPort
......@@ -276,6 +276,7 @@ if __name__ == '__main__':
except KeyboardInterrupt:
pass
finally:
dataFwSocket.close(0)
receivingSocket.close(0)
receivingSocket2.close(0)
clean(dataFetcherProp)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment