Skip to content
Snippets Groups Projects
Commit 0054b216 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

First version of TaskHandler

parent 3b12a8f0
No related branches found
No related tags found
No related merge requests found
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import argparse
import zmq
import os
import logging
import sys
import json
import trace
#
# -------------------------- class: TaskHandler --------------------------------------
#
class TaskHandler():
def __init__ (self, eventDetectorConfig, requestFwPort, distrPort, context = None):
#eventDetectorConfig = {
# configType: ... ,
# watchDir : ... ,
# monEventType : ... ,
# monDefaultSubdirs : ... ,
# monSuffixes : ... ,
#}
self.log = self.getLogger()
self.log.debug("TaskHandler: __init__()")
self.eventDetector = None
self.watchDir = None
self.monEventType = "IN_CLOSE_WRITE"
self.monDefaultSubdirs = ["commissioning", "current", "local"]
self.monSuffixes = [".tif", ".cbf"]
self.config = eventDetectorConfig
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.requestFwPort = requestFwPort
self.distrPort = distrPort
self.requestFwSocket = None
self.distrSocket = None
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
if self.config["configType"] == "inotify":
from InotifyDetector import InotifyDetector as EventDetector
self.watchDir = os.path.normpath(watchDir)
self.monEventType = self.config["monEventType"] or None
self.log.info ("Monitored event type is: " + str(self.monEventType))
self.monDefaultSubdirs = self.config["monDefaultSubdirs"] or None
self.monSuffixes = self.config["monSuffixes"] or None
self.log.info ("Monitored suffixes are: " + str(self.monSuffixes))
monDirs = [self.config["self.watchDir"]]
#TODO forward self.config instead of seperate variables
self.eventDetector = EventDetector(monDirs, self.monEventType, self.monDefaultSubdirs, self.monSuffixes)
else:
self.log.error("Type of event detector is not supported: " + str( self.config["configType"] ))
return -1
self.createSockets()
try:
self.run()
except KeyboardInterrupt:
self.log.debug("Keyboard interruption detected. Shuting down")
except:
trace = traceback.format_exc()
self.log.info("Stopping TaskHandler due to unknown error condition.")
self.log.debug("Error was: " + str(trace))
def getLogger (self):
logger = logging.getLogger("TaskHandler")
return logger
def createSockets (self):
# socket to get requests
self.requestFwSocket = self.context.socket(zmq.REQ)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
try:
self.requestFwSocket.connect(connectionStr)
self.log.debug("Connecting to requestFwSocket (connect): " + str(connectionStr))
except Exception as e:
self.log.error("Failed to start requestFwSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# socket to disribute the events to the worker
self.distrSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
try:
self.distrSocket.bind(connectionStr)
self.log.debug("Connecting to distributing socket (bind): " + str(connectionStr))
except Exception as e:
self.log.error("Failed to start distributing Socket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def run (self):
while True:
try:
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
# "sourcePath" : "/tmp/test/source/"
# "relativePath": "local"
# "filename" : "file1.tif"
# }
workloadList = self.eventDetector.getNewEvent()
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
#TODO validate workload dict
for workload in workloadList:
# get requests for this event
try:
self.log.debug("Get requests...")
self.requestFwSocket.send("")
requests = self.requestFwSocket.recv_multipart()
self.log.debug("Get requests... done.")
self.log.debug("Requests: " + str(requests))
except:
self.log.error("Get Requests... failed.")
trace = traceback.format_exc()
self.log.debug("Error was: " + str(trace))
# build message dict
try:
self.log.debug("Building message dict...")
messageDict = json.dumps(workload) #sets correct escape characters
self.log.debug("Building message dict...done.")
except Exception, e:
self.log.error("Unable to assemble message dict.")
self.log.debug("Error was: " + stri(e))
continue
# send the file to the fileMover
try:
self.log.debug("Sending message...")
self.log.debug(str(messageDict))
self.distrSocket.send_multipart([messageDict, requests])
self.log.debug("Sending message...done.")
except Exception, e:
self.log.error("Sending message...failed.")
self.log.debug("Error was: " + str(e))
def stop(self):
if self.distrSocket:
self.distrSocket.close(0)
self.distrSocket = None
if self.requestFwSocket:
self.requestFwSocket.close(0)
self.requestFwSocket = None
if not self.extContext and if self.context:
self.context.destroy()
self.context = None
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
if __name__ == '__main__':
def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--watchDir" , type=str, help="directory you want to monitor for changes")
parser.add_argument("--staticNotification",
help="disables new file-events. just sends a list of currently available files within the defined 'watchDir'.",
action="store_true")
parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created" , default="/tmp/log/")
parser.add_argument("--logfileName" , type=str, help="filename used for logging" , default="watchDir.log")
parser.add_argument("--fileEventIp" , type=str, help="zqm endpoint (IP-address) to send file events to", default="127.0.0.1")
parser.add_argument("--fileEventPort", type=str, help="zqm endpoint (port) to send file events to" , default="6060")
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
# TODO: check watchDir-directory for existance
watchDir = str(arguments.watchDir)
assert isinstance(type(watchDir), type(str))
#exit with error if no watchDir path was provided
if watchDir in [ None, "", "None" ]:
print """You need to set the following option:
--watchDir {DIRECTORY}
"""
sys.exit(1)
#abort if watchDir does not exist
helperScript.checkDirExistance(watchDir)
#error if logfile cannot be written
try:
fullPath = os.path.join(arguments.logfilePath, arguments.logfileName)
logFile = open(fullPath, "a")
except:
print "Unable to create the logfile """ + str(fullPath)
print """Please specify a new target by setting the following arguments:
--logfileName
--logfilePath
"""
sys.exit(1)
#check logfile-path for existance
helperScript.checkDirExistance(arguments.logfilePath)
return arguments
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SRC_PATH = BASE_PATH + os.sep + "src"
sys.path.append ( SRC_PATH )
import shared.helperScript as helperScript
arguments = argumentParsing()
watchDir = arguments.watchDir
verbose = arguments.verbose
logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName)
fileEventIp = str(arguments.fileEventIp)
fileEventPort = str(arguments.fileEventPort)
#enable logging
helperScript.initLogging(logfileFilePath, verbose)
#run only once, skipping file events
#just get a list of all files in watchDir and pass to zeromq
directoryWatcher = DirectoryWatcher(fileEventIp, watchDir, fileEventPort)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment