Skip to content
Snippets Groups Projects
watcher_lsyncd.py 12.2 KiB
Newer Older
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>'


import time
import argparse
import zmq
import os
import logging
import sys
import json
import helperScript



# class MyHandler(PatternMatchingEventHandler):
class DirectoryWatcherHandler():
    """
    Forward file paths to a ZeroMQ PUSH endpoint as small JSON messages.

    Each message carries the file name, its (normalised) parent directory on
    the sending side and that directory relative to ``watchFolder``, so the
    receiver can rebuild the same sub-directory layout under its own root.
    """
    patterns            = ["*"]
    zmqContext          = None
    messageSocket       = None    # strings only, control plane
    fileEventServerIp   = None
    fileEventServerPort = None
    watchFolder         = None


    def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort):
        """
        Store the connection settings and connect the PUSH socket.

        :param zmqContext:          zmq context the PUSH socket is created from
        :param fileEventServerIp:   IP/host of the file-event server
        :param watchFolder:         root folder; relative paths are computed against it
        :param fileEventServerPort: port of the file-event server
        """
        logging.debug("DirectoryWatcherHandler: __init__()")
        logging.info("registering zmq global context")
        self.globalZmqContext    = zmqContext
        self.watchFolder         = os.path.normpath(watchFolder)
        self.fileEventServerIp   = fileEventServerIp
        self.fileEventServerPort = fileEventServerPort

        #create zmq sockets
        self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort)


    def getZmqSocket_Push(self, context):
        """
        Create (but do not connect) a PUSH socket from ``context``.

        :param context: zmq context
        :return: new, unconnected zmq PUSH socket
        """
        pattern = zmq.PUSH
        assert isinstance(context, zmq.sugar.context.Context)
        socket = context.socket(pattern)

        return socket


    def createPushSocket(self, context, fileEventServerPort):
        """
        Create a PUSH socket and connect it to tcp://<fileEventServerIp>:<port>.

        :param context:             zmq context
        :param fileEventServerPort: port to connect to (host is self.fileEventServerIp)
        :return: connected zmq PUSH socket
        """
        assert isinstance(context, zmq.sugar.context.Context)

        socket = self.getZmqSocket_Push(context)

        zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort)
        logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
        socket.connect(zmqSocketStr)

        return socket


    def passFileToZeromq(self, filepath):
        """
        Send a file event for ``filepath`` over ``self.messageSocket``.

        Any error is logged and swallowed so that one bad path does not stop
        the caller's processing loop.

        :param filepath: path of the file to report
        :return: None
        """
        try:
            self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket)
        except Exception as e:
            logging.error("Unable to process file '" + str(filepath) + "'")
            logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e))


    def sendFilesystemEventToMessagePipe(self, filepath, targetSocket):
        '''
        Build a JSON description of ``filepath`` and send it as one message.

        The message is a JSON object with the keys "filename", "sourcePath"
        (normalised parent directory on this side) and "relativeParent"
        (that directory relative to ``self.watchFolder``; "" for files
        directly inside the watch folder).

        :param filepath:     Pointing to the file that is going to be reported
        :param targetSocket: Object with a ``send(str)`` method (the zmq socket)
        :return: None
        :raises: whatever path handling, JSON encoding or
                 ``targetSocket.send`` raised, re-raised unchanged after logging
        '''

        #extract relative pathname and filename for the file.
        try:
            # why?
            # The receiver might need to save the file at a different
            # path than it was created in at the senders side.
            #
            # example:
            # source_filepath    = /tmp/inotify/source/dir3/2.txt
            # source_watchFolder = /tmp/inotify/source
            # relative basepath  = dir3/2.txt
            # relative parent    = dir3
            # target_root        = /storage/raw/
            # target_filepath    = /storage/raw/dir3/2.txt
            logging.debug("Building relative path names...")
            filepathNormalised   = os.path.normpath(filepath)
            (parentDir,filename) = os.path.split(filepathNormalised)
            # Relate the file to the watch folder itself. os.path.commonprefix()
            # compares character by character (not per path component) and
            # returns "" when the strings share no prefix, which made relpath()
            # raise ValueError, so it is deliberately not used here.
            relativeBasepath     = os.path.relpath(filepathNormalised, self.watchFolder)
            relativeParent       = os.path.dirname(relativeBasepath)
            logging.debug("Watch folder      : " + str(self.watchFolder))
            logging.debug("Relative basepath : " + str(relativeBasepath))
            logging.debug("Relative parent   : " + str(relativeParent))
            logging.debug("Building relative path names...done.")
        except Exception as e:
            errorMessage = "Unable to generate relative path names."
            logging.error(errorMessage)
            logging.debug("Error was: " + str(e))
            logging.debug("Building relative path names...failed.")
            # bare raise keeps the original exception type and traceback
            raise


        #build message dict
        try:
            logging.debug("Building message dict...")
            messageDict = { "filename"      : filename,
                            "sourcePath"    : parentDir,
                            "relativeParent": relativeParent}

            messageDictJson = json.dumps(messageDict)  #sets correct escape characters
            logging.debug("Building message dict...done.")
        except Exception as e:
            errorMessage = "Unable to assemble message dict."
            logging.error(errorMessage)
            logging.debug("Error was: " + str(e))
            logging.debug("Building message dict...failed.")
            raise


        #send message
        try:
            logging.info("Sending message...")
            logging.debug(str(messageDictJson))
            targetSocket.send(messageDictJson)
            logging.info("Sending message...done.")
        except Exception as e:
            logging.error("Sending message...failed.")
            logging.debug("Error was: " + str(e))
            raise



def _getDefaultConfigForCurrentOs(key):
    """
    Return the default value of ``key`` for the operating system in use.

    The value is taken from ``getDefaultConfig()[key]`` under "Windows" or
    "Linux", as decided by ``helperScript.isWindows()`` / ``isLinux()``.
    Any other OS has no default and gets "".

    :param key: top-level key of the dict returned by getDefaultConfig()
    :return: the default for this OS, or "" on an unsupported OS
    """
    if helperScript.isWindows():
        osName = "Windows"
    elif helperScript.isLinux():
        osName = "Linux"
    else:
        return ""

    return getDefaultConfig()[key][osName]


def getDefaultConfig_logfilePath():
    """Default logfile directory for this OS ("" if the OS is unsupported)."""
    return _getDefaultConfigForCurrentOs("logfilePath")


def getDefaultConfig_logfileName():
    """Default logfile name for this OS ("" if the OS is unsupported)."""
    return _getDefaultConfigForCurrentOs("logfileName")


def getDefaultConfig_pushServerPort():
    """Default push-server port (as a string) for this OS ("" if unsupported)."""
    return _getDefaultConfigForCurrentOs("pushServerPort")


def getDefaultConfig_pushServerIp():
    """Default push-server IP for this OS ("" if the OS is unsupported)."""
    return _getDefaultConfigForCurrentOs("pushServerIp")


def getDefaultConfig():
    """
    Return the built-in default settings.

    The result maps each setting name to a dict with one value per supported
    OS ("Windows" and "Linux"). A new dict is built on every call.
    """
    def perOs(windowsValue, linuxValue):
        # one entry per supported operating system
        return {"Windows": windowsValue, "Linux": linuxValue}

    return dict(
        logfilePath=perOs("C:\\", "/tmp/log/"),
        logfileName=perOs("watchFolder.log", "watchFolder.log"),
        pushServerPort=perOs("6060", "6060"),
        pushServerIp=perOs("127.0.0.1", "127.0.0.1"),
    )



def argumentParsing():
    """
    Parse and validate the command-line arguments.

    The process exits with status 1 when ``--watchFolder`` is missing or
    empty, when the logfile cannot be opened for appending, or when the
    logfile directory does not exist.

    :return: the argparse namespace with all parsed options
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--watchFolder" , type=str, help="folder you want to monitor for changes")
    parser.add_argument("--staticNotification",
                        help="disables new file-events. just sends a list of currently available files within the defined 'watchFolder'.",
                        action="store_true")
    parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created", default=getDefaultConfig_logfilePath())
    parser.add_argument("--logfileName" , type=str, help="filename used for logging", default=getDefaultConfig_logfileName())
    parser.add_argument("--pushServerIp", type=str, help="zqm endpoint (IP-address) to send file events to", default=getDefaultConfig_pushServerIp())
    parser.add_argument("--pushServerPort", type=str, help="zqm endpoint (port) to send file events to", default=getDefaultConfig_pushServerPort())
    parser.add_argument("--verbose"     ,           help="more verbose output", action="store_true")

    arguments = parser.parse_args()

    # NOTE: existence of the watch folder is not checked here; the __main__
    # block calls checkWatchFolder() afterwards.

    #exit with error if no watchFolder path was provided (None or "")
    if not arguments.watchFolder:
        print("""You need to set the following option:
--watchFolder {FOLDER}
""")
        sys.exit(1)

    #error if logfile cannot be written
    fullPath = os.path.join(arguments.logfilePath, arguments.logfileName)
    try:
        # Opening in append mode creates the file if needed and proves it is
        # writable; the handle is closed right away instead of being leaked.
        with open(fullPath, "a"):
            pass
    except (IOError, OSError):
        print("Unable to create the logfile " + str(fullPath))
        print("""Please specify a new target by setting the following arguments:
--logfileName
--logfilePath
""")
        sys.exit(1)

    #check logfile-path for existence
    checkLogfileFolder(arguments.logfilePath)

    return arguments




def checkWatchFolder(watchFolderPath):
    """
    Make sure the watch folder exists; otherwise log an error and exit(1).

    :param watchFolderPath: path given via --watchFolder
    :return: None when the path exists (the process exits otherwise)
    """
    folderExists = os.path.exists(watchFolderPath)
    if folderExists:
        return

    logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath))
    sys.exit(1)



def checkLogfileFolder(logfilePath):
    """
    Abort if the logfile folder does not exist.

    The problem is reported via logging.error and the process exits with
    status 1.

    :param logfilePath: directory the logfile is supposed to be written to
    :return: None when the folder exists
    """

    #check folder path for existence. exits if it does not exist
    if not os.path.exists(logfilePath):
        logging.error("LogfileFolder '%s' does not exist. Abort." % str(logfilePath))
        sys.exit(1)



def initLogging(filenameFullPath, verbose):
    """
    Configure the root logger: detailed output to a file, warnings to the console.

    @see https://docs.python.org/2/howto/logging-cookbook.html

    :param filenameFullPath: logfile that messages are appended to
    :param verbose:          if true the file gets DEBUG messages, otherwise INFO and above
    """
    fileFormat = ('[%(asctime)s] [PID %(process)d] [%(filename)s] '
                  '[%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s')

    # File output via basicConfig (which, per the logging docs, does nothing
    # if the root logger already has handlers).
    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO,
                        format=fileFormat,
                        datefmt='%Y-%m-%d_%H:%M:%S',
                        filename=filenameFullPath,
                        filemode="a")

    # Console output: a default StreamHandler (stderr) that only shows
    # WARNING and above, in a shorter format than the file.
    consoleHandler = logging.StreamHandler()
    consoleHandler.setLevel(logging.WARNING)
    consoleHandler.setFormatter(logging.Formatter("%(asctime)s >  %(message)s"))
    logging.getLogger("").addHandler(consoleHandler)



if __name__ == '__main__':
    import errno

    arguments   = argumentParsing()
    watchFolder = arguments.watchFolder
    verbose     = arguments.verbose
    logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName)

    fileEventServerIp   = str(arguments.pushServerIp)
    fileEventServerPort = str(arguments.pushServerPort)

    #abort if watch-folder does not exist
    checkWatchFolder(watchFolder)

    #enable logging
    initLogging(logfileFilePath, verbose)

    #create zmq context (plain module-level name; "global" is a no-op here)
    zmqContext = zmq.Context()

    # Forwards every path read from the pipe to the file-event server.
    directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort)

    # Named pipe the paths are read from, one path per line.
    # NOTE(review): presumably written by lsyncd (see file name) - confirm.
    pipe_path = "/tmp/zeromqllpipe"
    if not os.path.exists(pipe_path):
        os.mkfifo(pipe_path)

    # Open the fifo in non-blocking mode; a blocking open would stall until
    # someone opens it for writing.
    pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    try:
        #poll the pipe for new files
        with os.fdopen(pipe_fd) as pipe:
            while True:
                try:
                    message = pipe.read()
                except IOError as e:
                    # In non-blocking mode, reading while a writer holds the
                    # fifo open without having written anything fails with
                    # EAGAIN. That only means "no data yet".
                    if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                        raise
                    message = None

                if message:
                    for filepath in message.splitlines():
                        # do not forward blank lines as (empty) paths
                        if filepath.strip():
                            directoryWatcher.passFileToZeromq(filepath)
                time.sleep(5)
    except KeyboardInterrupt:
        logging.info("Interrupted, shutting down.")
    finally:
        # Always release the context and its sockets; after the endless loop
        # this call was previously unreachable.
        zmqContext.destroy()