Skip to content
Snippets Groups Projects
TaskHandler.py 9.53 KiB
Newer Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'


import argparse
import zmq
import os
import logging
import sys
import json
import trace


#
#  --------------------------  class: TaskHandler  --------------------------------------
#

class TaskHandler():
    def __init__ (self, eventDetectorConfig, requestFwPort, distrPort, context = None):
        #eventDetectorConfig = {
        #        configType: ... ,
        #        watchDir : ... ,
        #        monEventType : ... ,
        #        monDefaultSubdirs : ... ,
        #        monSuffixes : ... ,
        #}

        self.log               = self.getLogger()
        self.log.debug("TaskHandler: __init__()")

        self.eventDetector     = None
        self.watchDir          = None
        self.monEventType      = "IN_CLOSE_WRITE"
        self.monDefaultSubdirs = ["commissioning", "current", "local"]
        self.monSuffixes       = [".tif", ".cbf"]

        self.config = eventDetectorConfig

        self.localhost         = "127.0.0.1"
        self.extIp             = "0.0.0.0"
        self.requestFwPort     = requestFwPort
        self.distrPort         = distrPort
        self.requestFwSocket   = None
        self.distrSocket       = None

        self.log.debug("Registering ZMQ context")
        # remember if the context was created outside this class or not
        if context:
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False


        if self.config["configType"] == "inotify":

            from InotifyDetector import InotifyDetector as EventDetector

            self.watchDir          = os.path.normpath(watchDir)

            self.monEventType      = self.config["monEventType"] or None
            self.log.info ("Monitored event type is: " + str(self.monEventType))

            self.monDefaultSubdirs = self.config["monDefaultSubdirs"] or None
            self.monSuffixes       = self.config["monSuffixes"] or None
            self.log.info ("Monitored suffixes are: " + str(self.monSuffixes))

            monDirs                = [self.config["self.watchDir"]]
            #TODO forward self.config instead of seperate variables
            self.eventDetector     = EventDetector(monDirs, self.monEventType, self.monDefaultSubdirs, self.monSuffixes)
        else:
            self.log.error("Type of event detector is not supported: " + str( self.config["configType"] ))
            return -1

        self.createSockets()

        try:
            self.run()
        except KeyboardInterrupt:
            self.log.debug("Keyboard interruption detected. Shuting down")
        except:
            trace = traceback.format_exc()
            self.log.info("Stopping TaskHandler due to unknown error condition.")
            self.log.debug("Error was: " + str(trace))


    def getLogger (self):
        logger = logging.getLogger("TaskHandler")
        return logger


    def createSockets (self):
        # socket to get requests
        self.requestFwSocket = self.context.socket(zmq.REQ)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
        try:
            self.requestFwSocket.connect(connectionStr)
            self.log.debug("Connecting to requestFwSocket (connect): " + str(connectionStr))
        except Exception as e:
            self.log.error("Failed to start requestFwSocket (connect): '" + connectionStr + "'")
            self.log.debug("Error was:" + str(e))

        # socket to disribute the events to the worker
        self.distrSocket = self.context.socket(zmq.PUSH)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
        try:
            self.distrSocket.bind(connectionStr)
            self.log.debug("Connecting to distributing socket (bind): " + str(connectionStr))
        except Exception as e:
            self.log.error("Failed to start distributing Socket (bind): '" + connectionStr + "'")
            self.log.debug("Error was:" + str(e))


    def run (self):
        while True:
            try:
                # the event for a file /tmp/test/source/local/file1.tif is of the form:
                # {
                #   "sourcePath" : "/tmp/test/source/"
                #   "relativePath": "local"
                #   "filename"   : "file1.tif"
                # }
                workloadList = self.eventDetector.getNewEvent()
            except Exception, e:
                self.log.error("Invalid fileEvent message received.")
                self.log.debug("Error was: " + str(e))
                #skip all further instructions and continue with next iteration
                continue

            #TODO validate workload dict
            for workload in workloadList:
                # get requests for this event
                try:
                    self.log.debug("Get requests...")
                    self.requestFwSocket.send("")
                    requests = self.requestFwSocket.recv_multipart()
                    self.log.debug("Get requests... done.")
                    self.log.debug("Requests: " + str(requests))
                except:
                    self.log.error("Get Requests... failed.")
                    trace = traceback.format_exc()
                    self.log.debug("Error was: " + str(trace))


                # build message dict
                try:
                    self.log.debug("Building message dict...")
                    messageDict = json.dumps(workload)  #sets correct escape characters
                    self.log.debug("Building message dict...done.")
                except Exception, e:
                    self.log.error("Unable to assemble message dict.")
                    self.log.debug("Error was: " + stri(e))
                    continue

                # send the file to the fileMover
                try:
                    self.log.debug("Sending message...")
                    self.log.debug(str(messageDict))
                    self.distrSocket.send_multipart([messageDict, requests])
                    self.log.debug("Sending message...done.")
                except Exception, e:
                    self.log.error("Sending message...failed.")
                    self.log.debug("Error was: " + str(e))



    def stop(self):
        if self.distrSocket:
            self.distrSocket.close(0)
            self.distrSocket = None
        if self.requestFwSocket:
            self.requestFwSocket.close(0)
            self.requestFwSocket = None
        if not self.extContext and if self.context:
            self.context.destroy()
            self.context = None


    def __exit__(self):
        self.stop()


    def __del__(self):
        self.stop()



if __name__ == '__main__':

    def argumentParsing():

        parser = argparse.ArgumentParser()
        parser.add_argument("--watchDir"  , type=str, help="directory you want to monitor for changes")
        parser.add_argument("--staticNotification",
                            help="disables new file-events. just sends a list of currently available files within the defined 'watchDir'.",
                            action="store_true")
        parser.add_argument("--logfilePath"  , type=str, help="path where logfile will be created"              , default="/tmp/log/")
        parser.add_argument("--logfileName"  , type=str, help="filename used for logging"                       , default="watchDir.log")
        parser.add_argument("--fileEventIp"  , type=str, help="zqm endpoint (IP-address) to send file events to", default="127.0.0.1")
        parser.add_argument("--fileEventPort", type=str, help="zqm endpoint (port) to send file events to"      , default="6060")
        parser.add_argument("--verbose"      ,           help="more verbose output", action="store_true")

        arguments = parser.parse_args()

        # TODO: check watchDir-directory for existance

        watchDir = str(arguments.watchDir)
        assert isinstance(type(watchDir), type(str))

        #exit with error if no watchDir path was provided
        if watchDir in [ None, "", "None" ]:
            print """You need to set the following option:
    --watchDir {DIRECTORY}
    """
            sys.exit(1)


        #abort if watchDir does not exist
        helperScript.checkDirExistance(watchDir)


        #error if logfile cannot be written
        try:
            fullPath = os.path.join(arguments.logfilePath, arguments.logfileName)
            logFile = open(fullPath, "a")
        except:
            print "Unable to create the logfile """ + str(fullPath)
            print """Please specify a new target by setting the following arguments:
    --logfileName
    --logfilePath
    """
            sys.exit(1)

        #check logfile-path for existance
        helperScript.checkDirExistance(arguments.logfilePath)


        return arguments




    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
    SRC_PATH  = BASE_PATH + os.sep + "src"

    sys.path.append ( SRC_PATH )

    import shared.helperScript as helperScript

    arguments       = argumentParsing()
    watchDir        = arguments.watchDir
    verbose         = arguments.verbose
    logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName)

    fileEventIp     = str(arguments.fileEventIp)
    fileEventPort   = str(arguments.fileEventPort)


    #enable logging
    helperScript.initLogging(logfileFilePath, verbose)


    #run only once, skipping file events
    #just get a list of all files in watchDir and pass to zeromq
    directoryWatcher = DirectoryWatcher(fileEventIp, watchDir, fileEventPort)