-
Manuela Kuhn authoredManuela Kuhn authored
DataManager.py 27.87 KiB
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import argparse
import zmq
import os
import logging
import sys
import json
import time
import cPickle
from multiprocessing import Process, freeze_support, Queue
import ConfigParser
from SignalHandler import SignalHandler
from TaskProvider import TaskProvider
from DataDispatcher import DataDispatcher
try:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
CONFIG_PATH = BASE_PATH + os.sep + "conf"
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
from logutils.queue import QueueHandler
import helpers
from version import __version__
def argumentParsing():
configFile = CONFIG_PATH + os.sep + "dataManager.conf"
config = ConfigParser.RawConfigParser()
config.readfp(helpers.FakeSecHead(open(configFile)))
parser = argparse.ArgumentParser()
# Logging
logfilePath = config.get('asection', 'logfilePath')
logfileName = config.get('asection', 'logfileName')
logfileSize = config.get('asection', 'logfileSize')
parser.add_argument("--logfilePath" , type = str,
help = "Path where the logfile will be created (default=" + str(logfilePath) + ")",
default = logfilePath )
parser.add_argument("--logfileName" , type = str,
help = "Filename used for logging (default=" + str(logfileName) + ")",
default = logfileName )
parser.add_argument("--logfileSize" , type = int,
help = "File size before rollover in B (linux only; (default=" + str(logfileSize) + ")",
default = logfileSize )
parser.add_argument("--verbose" , help = "More verbose output",
action = "store_true")
parser.add_argument("--onScreen" , type = str,
help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)",
default = False )
# SignalHandler config
comPort = config.get('asection', 'comPort')
whitelist = json.loads(config.get('asection', 'whitelist'))
requestPort = config.get('asection', 'requestPort')
requestFwPort = config.get('asection', 'requestFwPort')
parser.add_argument("--comPort" , type = str,
help = "Port number to receive signals (default=" + str(comPort) + ")",
default = comPort )
parser.add_argument("--whitelist" , type = str,
help = "List of hosts allowed to connect (default=" + str(whitelist) + ")",
default = whitelist )
parser.add_argument("--requestPort" , type = str,
help = "ZMQ port to get new requests (default=" + str(requestPort) + ")",
default = requestPort )
parser.add_argument("--requestFwPort" , type = str,
help = "ZMQ port to forward requests (default=" + str(requestFwPort) + ")",
default = requestFwPort )
# EventDetector config
eventDetectorType = config.get('asection', 'eventDetectorType')
# for InotifyxDetector and WatchdogDetector:
monitoredDir = config.get('asection', 'monitoredDir')
monitoredEventType = config.get('asection', 'monitoredEventType')
monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs'))
monitoredFormats = json.loads(config.get('asection', 'monitoredFormats'))
# for WatchdogDetector:
timeTillClosed = int(config.get('asection', 'timeTillClosed'))
# for ZmqDetector:
eventPort = config.get('asection', 'eventPort')
# for HttpGetDetector:
prefix = config.get('asection', 'prefix')
detectorDevice = config.get('asection', 'detectorDevice')
filewriterDevice = config.get('asection', 'filewriterDevice')
parser.add_argument("--eventDetectorType" , type = str,
help = "Type of event detector to use (default=" + str(eventDetectorType) + ")",
default = eventDetectorType )
parser.add_argument("--monitoredDir" , type = str,
help = "Directory to be monitor for changes; inside this directory only the specified \
subdirectories are monitred (only needed if eventDetector is InotifyxDetector \
or WatchdogDetector; default=" + str(monitoredDir) + ")",
default = monitoredDir )
parser.add_argument("--monitoredEventType", type = str,
help = "Event type of files to be monitored (only needed if eventDetector is InotifyxDetector \
or WatchdogDetector; default=" + str(monitoredEventType) + ")",
default = monitoredEventType )
parser.add_argument("--monitoredSubdirs" , type = str,
help = "Subdirectories of 'monitoredDirs' to be monitored (only needed if eventDetector is \
InotifyxDetector or WatchdogDetector; default=" + str(monitoredSubdirs) + ")",
default = monitoredSubdirs )
parser.add_argument("--monitoredFormats" , type = str,
help = "The formats to be monitored, files in an other format will be be neglected \
(only needed if eventDetector is InotifyxDetector or WatchdogDetector; \
default=" + str(monitoredFormats) + ")",
default = monitoredFormats )
parser.add_argument("--timeTillClosed" , type = str,
help = "Time (in seconds) since last modification after which a file will be seen as closed \
(only needed if eventDetectorType is WatchdogDetector; default=" + str(timeTillClosed) + ")",
default = timeTillClosed )
parser.add_argument("--eventPort" , type = str,
help = "ZMQ port to get events from \
(only needed if eventDetectorType is ZmqDetector; default=" + str(eventPort) + ")",
default = eventPort )
parser.add_argument("--prefix" , type = str,
help = "Supply a scan prefix. Otherwise the prefix is read from the tango server \
(only needed if eventDetectorType is HttpDetector; default=" + str(prefix) + ")",
default = prefix )
parser.add_argument("--detectorDevice" , type = str,
help = "Tango device proxy for the detector \
(only needed if eventDetectorType is HttpDetector; default=" + str(detectorDevice) + ")",
default = detectorDevice )
parser.add_argument("--filewriterDevice" , type = str,
help = "Tango device proxy for the filewriter \
(only needed if eventDetectorType is HttpDetector; default=" + str(filewriterDevice) + ")",
default = filewriterDevice )
# DataFetcher config
dataFetcherType = config.get('asection', 'dataFetcherType')
# for getFromZMQ:
dataFetcherPort = config.get('asection', 'dataFetcherPort')
useDataStream = config.getboolean('asection', 'useDataStream')
fixedStreamHost = config.get('asection', 'fixedStreamHost')
fixedStreamPort = config.get('asection', 'fixedStreamPort')
numberOfStreams = config.get('asection', 'numberOfStreams')
chunkSize = int(config.get('asection', 'chunkSize'))
eventPort = config.get('asection', 'eventPort')
routerPort = config.get('asection', 'routerPort')
localTarget = config.get('asection', 'localTarget')
cleanerPort = config.get('asection', 'cleanerPort')
parser.add_argument("--dataFetcherType" , type = str,
help = "Module with methods specifying how to get the data (default=" + str(dataFetcherType) + ")",
default = dataFetcherType )
parser.add_argument("--dataFetcherPort" , type = str,
help = "If 'getFromZmq is specified as dataFetcherType it needs a port to listen to \
(default=" + str(dataFetcherType) + ")",
default = dataFetcherPort )
parser.add_argument("--useDataStream" , type = str,
help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \
into the localTarget) (default=" + str(useDataStream) + ")",
default = useDataStream )
parser.add_argument("--fixedStreamHost" , type = str,
help = "Fixed host to send the data to with highest priority \
(only active if useDataStream is set; default=" + str(fixedStreamHost) + ")",
default = fixedStreamHost )
parser.add_argument("--fixedStreamPort" , type = str,
help = "Fixed port to send the data to with highest priority \
(only active if useDataStream is set; default=" + str(fixedStreamPort) + ")",
default = fixedStreamPort )
parser.add_argument("--numberOfStreams" , type = int,
help = "Number of parallel data streams (default=" + str(numberOfStreams) + ")",
default = numberOfStreams )
parser.add_argument("--chunkSize" , type = int,
help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")",
default = chunkSize )
parser.add_argument("--routerPort" , type = str,
help = "ZMQ-router port which coordinates the load-balancing \
to the worker-processes (default=" + str(routerPort) + ")",
default = routerPort )
parser.add_argument("--localTarget" , type = str,
help = "Target to move the files into (default=" + str(localTarget) + ")",
default = localTarget )
parser.add_argument("--cleanerPort" , type = str,
help = "ZMQ-pull-socket port which deletes/moves given file \
(default=" + str(cleanerPort) + ")",
default = cleanerPort )
arguments = parser.parse_args()
# Check given arguments
logfilePath = arguments.logfilePath
logfileName = arguments.logfileName
verbose = arguments.verbose
onScreen = arguments.onScreen
eventDetectorType = arguments.eventDetectorType.lower()
supportedEDTypes = ["inotifyxdetector", "watchdogdetector", "zmqdetector", "httpdetector"]
supportedDFTypes = ["getfromfile", "getfromzmq", "getFromHttp"]
monitoredDir = str(arguments.monitoredDir)
monitoredSubdirs = arguments.monitoredSubdirs
localTarget = str(arguments.localTarget)
useDataStream = arguments.useDataStream
numberOfStreams = arguments.numberOfStreams
# check if logfile is writable
helpers.checkLogFileWritable(logfilePath, logfileName)
# check if the eventDetectorType is supported
helpers.checkEventDetectorType(eventDetectorType, supportedEDTypes)
# check if the dataFetcherType is supported
# helpers.checkDataFetcherType(dataFetcherType, supportedDFTypes)
# check if directories exists
helpers.checkDirExistance(logfilePath)
helpers.checkDirExistance(monitoredDir)
helpers.checkSubDirExistance(monitoredDir, monitoredSubdirs)
if useDataStream:
helpers.checkDirExistance(localTarget)
return arguments
class DataManager():
def __init__ (self, logQueue = None):
arguments = argumentParsing()
logfilePath = arguments.logfilePath
logfileName = arguments.logfileName
logfile = os.path.join(logfilePath, logfileName)
logsize = arguments.logfileSize
verbose = arguments.verbose
onScreen = arguments.onScreen
self.extLogQueue = False
if logQueue:
self.logQueue = logQueue
self.extLogQueue = True
else:
# Get queue
self.logQueue = Queue(-1)
# Get the log Configuration for the lisener
if onScreen:
h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)
# Start queue listener using the stream handler above
self.logQueueListener = helpers.CustomQueueListener(self.logQueue, h1, h2)
else:
h1 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)
# Start queue listener using the stream handler above
self.logQueueListener = helpers.CustomQueueListener(self.logQueue, h1)
self.logQueueListener.start()
# Create log and set handler to queue handle
self.log = self.getLogger(self.logQueue)
self.comPort = arguments.comPort
self.whitelist = arguments.whitelist
self.requestPort = arguments.requestPort
self.requestFwPort = arguments.requestFwPort
if arguments.useDataStream:
self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort )
else:
self.fixedStreamId = None
self.numberOfStreams = arguments.numberOfStreams
self.chunkSize = arguments.chunkSize
self.routerPort = arguments.routerPort
self.localTarget = arguments.localTarget
self.cleanerPort = arguments.cleanerPort
# Assemble configuration for eventDetector
self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType)
if arguments.eventDetectorType == "InotifyxDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"monDir" : arguments.monitoredDir,
"monEventType" : arguments.monitoredEventType,
"monSubdirs" : arguments.monitoredSubdirs,
"monSuffixes" : arguments.monitoredFormats
}
elif arguments.eventDetectorType == "WatchdogDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"monDir" : arguments.monitoredDir,
"monEventType" : arguments.monitoredEventType,
"monSubdirs" : arguments.monitoredSubdirs,
"monSuffixes" : arguments.monitoredFormats,
"timeTillClosed" : arguments.timeTillClosed
}
elif arguments.eventDetectorType == "ZmqDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"eventPort" : arguments.eventPort,
"numberOfStreams" : self.numberOfStreams,
"context" : None
}
elif arguments.eventDetectorType == "HttpDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"prefix" : arguments.prefix,
"detectorDevice" : arguments.detectorDevice,
"filewriterDevice" : arguments.filewriterDevice
}
# Assemble configuration for dataFetcher
self.log.debug("Configured Type of dataFetcher: " + arguments.dataFetcherType)
if arguments.dataFetcherType == "getFromFile":
self.dataFetcherProp = {
"type" : arguments.dataFetcherType,
"removeFlag" : False
}
elif arguments.dataFetcherType == "getFromZmq":
self.dataFetcherProp = {
"type" : arguments.dataFetcherType,
"context" : None,
"extIp" : "0.0.0.0",
"port" : arguments.dataFetcherPort,
}
elif arguments.dataFetcherType == "getFromHttp":
self.dataFetcherProp = {
"type" : arguments.dataFetcherType,
"localTarget" : self.localTarget,
"session" : None,
"storeFlag" : True, #TODO add to config
"removeFlag" : False #TODO add to config
}
self.signalHandlerPr = None
self.taskProviderPr = None
self.dataDispatcherPr = []
self.log.info("Version: " + str(__version__))
#create zmq context
# there should be only one context in one process
self.zmqContext = zmq.Context.instance()
self.log.debug("Registering global ZMQ context")
self.run()
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("DataManager")
logger.propagate = False
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
return logger
def run (self):
self.signalHandlerPr = Process ( target = SignalHandler, args = (self.whitelist, self.comPort, self.requestFwPort, self.requestPort, self.logQueue) )
self.signalHandlerPr.start()
# needed, because otherwise the requests for the first files are not forwarded properly
time.sleep(0.5)
self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.requestFwPort, self.routerPort, self.logQueue) )
self.taskProviderPr.start()
for i in range(self.numberOfStreams):
id = str(i) + "/" + str(self.numberOfStreams)
pr = Process ( target = DataDispatcher, args = ( id, self.routerPort, self.chunkSize, self.fixedStreamId, self.dataFetcherProp,
self.logQueue, self.localTarget) )
pr.start()
self.dataDispatcherPr.append(pr)
def stop (self):
if self.signalHandlerPr:
self.log.info("terminate SignalHandler...")
self.signalHandlerPr.terminate()
self.signalHandlerPr = None
self.log.info("terminate SignalHandler...done")
if self.taskProviderPr:
self.log.info("terminate TaskProvider...")
self.taskProviderPr.terminate()
self.taskProviderPr = None
self.log.info("terminate TaskProvider...done")
for pr in self.dataDispatcherPr:
id = str(self.dataDispatcherPr.index(pr)) + "/" + str(self.numberOfStreams)
self.log.info("terminate DataDispatcher-" + str(id) + "...")
pr.terminate()
pr = None
self.log.info("terminate DataDispatcher-" + str(id) + "...done")
if self.dataDispatcherPr == [ None for i in self.dataDispatcherPr ]:
self.dataDispatcher = []
if not self.extLogQueue and self.logQueueListener:
self.logQueue.put_nowait(None)
self.logQueueListener.stop()
self.logQueueListener = None
def __exit__ (self):
self.stop()
def __def__ (self):
self.stop()
# cannot be defined in "if __name__ == '__main__'" because then it is unbound
# see https://docs.python.org/2/library/multiprocessing.html#windows
class Test_Receiver_Stream():
def __init__(self, comPort, fixedRecvPort, receivingPort, receivingPort2, logQueue):
self.log = self.getLogger(logQueue)
context = zmq.Context.instance()
self.comSocket = context.socket(zmq.REQ)
connectionStr = "tcp://localhost:" + comPort
self.comSocket.connect(connectionStr)
self.log.info("=== comSocket connected to " + connectionStr)
self.fixedRecvSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + fixedRecvPort
self.fixedRecvSocket.bind(connectionStr)
self.log.info("=== fixedRecvSocket connected to " + connectionStr)
self.receivingSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort
self.receivingSocket.bind(connectionStr)
self.log.info("=== receivingSocket connected to " + connectionStr)
self.receivingSocket2 = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort2
self.receivingSocket2.bind(connectionStr)
self.log.info("=== receivingSocket2 connected to " + connectionStr)
self.sendSignal("START_STREAM", receivingPort, 1)
self.sendSignal("START_STREAM", receivingPort2, 0)
self.run()
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("Test_Receiver_Stream")
logger.propagate = False
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
return logger
def sendSignal (self, signal, ports, prio = None):
self.log.info("=== sendSignal : " + signal + ", " + str(ports))
sendMessage = ["0.0.1", signal]
targets = []
if type(ports) == list:
for port in ports:
targets.append(["localhost:" + port, prio])
else:
targets.append(["localhost:" + ports, prio])
targets = cPickle.dumps(targets)
sendMessage.append(targets)
self.comSocket.send_multipart(sendMessage)
receivedMessage = self.comSocket.recv()
self.log.info("=== Responce : " + receivedMessage )
def run (self):
try:
while True:
recv_message = self.fixedRecvSocket.recv_multipart()
self.log.info("=== received fixed: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket.recv_multipart()
self.log.info("=== received: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket2.recv_multipart()
self.log.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
except KeyboardInterrupt:
pass
def __exit__ (self):
self.receivingSocket.close(0)
self.receivingSocket2.close(0)
context.destroy()
if __name__ == '__main__':
freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows
test = False
if test:
import time
from shutil import copyfile
from subprocess import call
logfile = BASE_PATH + os.sep + "logs" + os.sep + "dataManager_test.log"
logsize = 10485760
logQueue = Queue(-1)
# Get the log Configuration for the lisener
h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose=True, onScreenLogLevel="debug")
# Start queue listener using the stream handler above
logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
logQueueListener.start()
# Create log and set handler to queue handle
root = logging.getLogger()
root.setLevel(logging.DEBUG) # Log level = DEBUG
qh = QueueHandler(logQueue)
root.addHandler(qh)
comPort = "50000"
fixedRecvPort = "50100"
receivingPort = "50101"
receivingPort2 = "50102"
testPr = Process ( target = Test_Receiver_Stream, args = (comPort, fixedRecvPort, receivingPort, receivingPort2, logQueue))
testPr.start()
logging.debug("test receiver started")
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
try:
sender = DataManager(logQueue)
except:
sender = None
if sender:
time.sleep(0.5)
i = 100
try:
while i <= 105:
targetFile = targetFileBase + str(i) + ".cbf"
logging.debug("copy to " + targetFile)
copyfile(sourceFile, targetFile)
i += 1
time.sleep(1)
except Exception as e:
logging.error("Exception detected: " + str(e), exc_info=True)
finally:
time.sleep(3)
testPr.terminate()
for number in range(100, i):
targetFile = targetFileBase + str(number) + ".cbf"
try:
os.remove(targetFile)
logging.debug("remove " + targetFile)
except:
pass
sender.stop()
logQueue.put_nowait(None)
logQueueListener.stop()
else:
sender = DataManager()
try:
while True:
pass
except:
pass
finally:
sender.stop()