From 25299638ab383408df7c7d331af1ed5135545935 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Tue, 14 Jul 2015 16:18:58 +0200 Subject: [PATCH] Modified watchfolder -> watcher_lsyncd --- ZeroMQTunnel/helperScript.py | 60 ++++++ ZeroMQTunnel/watcher_lsyncd.py | 371 +++++++++++++++++++++++++++++++++ 2 files changed, 431 insertions(+) create mode 100644 ZeroMQTunnel/helperScript.py create mode 100644 ZeroMQTunnel/watcher_lsyncd.py diff --git a/ZeroMQTunnel/helperScript.py b/ZeroMQTunnel/helperScript.py new file mode 100644 index 00000000..631981f5 --- /dev/null +++ b/ZeroMQTunnel/helperScript.py @@ -0,0 +1,60 @@ +import os +import platform + + + +def isWindows(): + returnValue = False + windowsName = "Windows" + platformName = platform.system() + + if platformName == windowsName: + returnValue = True + # osName = os.name + # supportedWindowsNames = ["nt"] + # if osName in supportedWindowsNames: + # returnValue = True + + return returnValue + + +def isLinux(): + returnValue = False + linuxName = "Linux" + platformName = platform.system() + + if platformName == linuxName: + returnValue = True + + return returnValue + + + +def isPosix(): + osName = os.name + supportedPosixNames = ["posix"] + returnValue = False + + if osName in supportedPosixNames: + returnValue = True + + return returnValue + + + +def isSupported(): + supportedWindowsReleases = ["7"] + osRelease = platform.release() + supportValue = False + + #check windows + if isWindows(): + supportValue = True + # if osRelease in supportedWindowsReleases: + # supportValue = True + + #check linux + if isLinux(): + supportValue = True + + return supportValue diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py new file mode 100644 index 00000000..9e3432c6 --- /dev/null +++ b/ZeroMQTunnel/watcher_lsyncd.py @@ -0,0 +1,371 @@ +__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>' + + +import time +import argparse +import zmq +import os +import logging +import sys +import json +import helperScript + + + +# class MyHandler(PatternMatchingEventHandler): +class DirectoryWatcherHandler(): + patterns = ["*"] + zmqContext = None + messageSocket = None # strings only, control plane + fileEventServerIp = None + fileEventServerPort = None + watchFolder = None + + + def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort): + logging.debug("DirectoryWatcherHandler: __init__()") + # logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext))) + logging.info("registering zmq global context") + self.globalZmqContext = zmqContext + self.watchFolder = os.path.normpath(watchFolder) + self.fileEventServerIp = fileEventServerIp + self.fileEventServerPort = fileEventServerPort + + #create zmq sockets + self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort) + + + def getZmqSocket_Push(self, context): + pattern = zmq.PUSH + assert isinstance(context, zmq.sugar.context.Context) + socket = context.socket(pattern) + + return socket + + + def createPushSocket(self, context, fileEventServerPort): + + assert isinstance(context, zmq.sugar.context.Context) + + socket = self.getZmqSocket_Push(context) + + zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort) + logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr)) + socket.connect(zmqSocketStr) + + return socket + + + def passFileToZeromq(self, filepath): + """ + :param rootDirectorty: where to start traversing. including subdirs. + :return: + """ + + try: + self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket) + except Exception, e: + logging.error("Unable to process file '" + str(filepath) + "'") + logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e)) + + + def sendFilesystemEventToMessagePipe(self, filepath, targetSocket): + ''' + Taking the filename, creating a buffer and then + sending the data as multipart message to the targetSocket. + + For testing currently the multipart message consists of only one message. + + :param filepath: Pointing to the data which is going to be send + :param targetSocket: Where to send the data to + :return: + ''' + + + #extract relative pathname and filename for the file. + try: + # why? + # The receiver might need to save the file at a different + # path than it was created in at the senders side. + # + # example: + # source_filepath = /tmp/inotify/source/dir3/2.txt + # source_watchFolder = /tmp/inotify/source + # relative basepath = dir3/2.txt + # relative parent = dir3 + # target_root = /storage/raw/ + # target_filepath = /storage/raw/dir3/2.txt + logging.debug("Building relative path names...") + filepathNormalised = os.path.normpath(filepath) + (parentDir,filename) = os.path.split(filepathNormalised) + commonPrefix = os.path.commonprefix([self.watchFolder,filepathNormalised]) + relativeBasepath = os.path.relpath(filepathNormalised, commonPrefix) + (relativeParent, blub) = os.path.split(relativeBasepath) + logging.debug("Common prefix : " + str(commonPrefix)) + logging.debug("Relative basepath : " + str(relativeBasepath)) + logging.debug("Relative parent : " + str(relativeParent)) + logging.debug("Building relative path names...done.") + except Exception, e: + errorMessage = "Unable to generate relative path names." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.debug("Building relative path names...failed.") + raise Exception(e) + + + #build message dict + try: + logging.debug("Building message dict...") + messageDict = { "filename" : filename, + "sourcePath" : parentDir, + "relativeParent": relativeParent} + + messageDictJson = json.dumps(messageDict) #sets correct escape characters + logging.debug("Building message dict...done.") + except Exception, e: + errorMessage = "Unable to assemble message dict." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.debug("Building message dict...failed.") + raise Exception(e) + + + #send message + try: + logging.info("Sending message...") + logging.debug(str(messageDictJson)) + targetSocket.send(messageDictJson) + logging.info("Sending message...done.") + except Exception, e: + logging.error("Sending message...failed.") + logging.debug("Error was: " + str(e)) + raise Exception(e) + + + +def getDefaultConfig_logfilePath(): + defaultConfig = getDefaultConfig() + + if helperScript.isWindows(): + osName = "Windows" + elif helperScript.isLinux(): + osName = "Linux" + else: + return "" + + returnValue = defaultConfig["logfilePath"][osName] + return returnValue + + +def getDefaultConfig_logfileName(): + defaultConfig = getDefaultConfig() + + if helperScript.isWindows(): + osName = "Windows" + elif helperScript.isLinux(): + osName = "Linux" + else: + return "" + + returnValue = defaultConfig["logfileName"][osName] + return returnValue + + +def getDefaultConfig_pushServerPort(): + defaultConfig = getDefaultConfig() + + if helperScript.isWindows(): + osName = "Windows" + elif helperScript.isLinux(): + osName = "Linux" + else: + return "" + + returnValue = defaultConfig["pushServerPort"][osName] + return returnValue + + +def getDefaultConfig_pushServerIp(): + defaultConfig = getDefaultConfig() + + if helperScript.isWindows(): + osName = "Windows" + elif helperScript.isLinux(): + osName = "Linux" + else: + return "" + + returnValue = defaultConfig["pushServerIp"][osName] + return returnValue + + +def getDefaultConfig(): + defaultConfigDict = { + "logfilePath" : { "Windows" : "C:\\", + "Linux" : "/tmp/log/" + }, + "logfileName" : { "Windows" : "watchFolder.log", + "Linux" : "watchFolder.log" + }, + "pushServerPort" : { "Windows": "6060", + "Linux" : "6060" + }, + "pushServerIp" : { "Windows": "127.0.0.1", + "Linux" : "127.0.0.1" + }, + } + return defaultConfigDict + + + +def argumentParsing(): + + + parser = argparse.ArgumentParser() + parser.add_argument("--watchFolder" , type=str, help="folder you want to monitor for changes") + parser.add_argument("--staticNotification", + help="disables new file-events. just sends a list of currently available files within the defined 'watchFolder'.", + action="store_true") + parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created", default=getDefaultConfig_logfilePath()) + parser.add_argument("--logfileName" , type=str, help="filename used for logging", default=getDefaultConfig_logfileName()) + parser.add_argument("--pushServerIp", type=str, help="zqm endpoint (IP-address) to send file events to", default=getDefaultConfig_pushServerIp()) + parser.add_argument("--pushServerPort", type=str, help="zqm endpoint (port) to send file events to", default=getDefaultConfig_pushServerPort()) + parser.add_argument("--verbose" , help="more verbose output", action="store_true") + + arguments = parser.parse_args() + + # TODO: check watchFolder-directory for existance + + watchFolder = str(arguments.watchFolder) + assert isinstance(type(watchFolder), type(str)) + + + #exit with error if now watchFolder path was provided + if (watchFolder == None) or (watchFolder == "") or (watchFolder == "None"): + print """You need to set the following option: +--watchFolder {FOLDER} +""" + sys.exit(1) + + #error if logfile cannot be written + try: + fullPath = os.path.join(arguments.logfilePath, arguments.logfileName) + logFile = open(fullPath, "a") + except: + print "Unable to create the logfile """ + str(fullPath) + print """Please specify a new target by setting the following arguments: +--logfileName +--logfilePath +""" + sys.exit(1) + + #check logfile-path for existance + checkLogfileFolder(arguments.logfilePath) + + + return arguments + + + + +def checkWatchFolder(watchFolderPath): + """ + abort if watch-folder does not exist + + :return: + """ + + #check folder path for existance. exits if it does not exist + if not os.path.exists(watchFolderPath): + logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) + sys.exit(1) + + + +def checkLogfileFolder(logfilePath): + """ + abort if watch-folder does not exist + + :return: + """ + + #check folder path for existance. exits if it does not exist + if not os.path.exists(logfilePath): + logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) + sys.exit(1) + + + +def initLogging(filenameFullPath, verbose): + #@see https://docs.python.org/2/howto/logging-cookbook.html + + #more detailed logging if verbose-option has been set + loggingLevel = logging.INFO + if verbose: + loggingLevel = logging.DEBUG + + #log everything to file + logging.basicConfig(level=loggingLevel, + format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d_%H:%M:%S', + filename=filenameFullPath, + filemode="a") + + #log info to stdout, display messages with different format than the file output + console = logging.StreamHandler() + console.setLevel(logging.WARNING) + formatter = logging.Formatter("%(asctime)s > %(message)s") + console.setFormatter(formatter) + logging.getLogger("").addHandler(console) + + + +if __name__ == '__main__': + arguments = argumentParsing() + watchFolder = arguments.watchFolder + verbose = arguments.verbose + logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName) + + fileEventServerIp = str(arguments.pushServerIp) + fileEventServerPort = str(arguments.pushServerPort) + + #abort if watch-folder does not exist + checkWatchFolder(watchFolder) + + + #enable logging + initLogging(logfileFilePath, verbose) + + + #create zmq context + global zmqContext + zmqContext = zmq.Context() + + + #run only once, skipping file events + #just get a list of all files in watchDir and pass to zeromq + directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort) + + pipe_path = "/tmp/zeromqllpipe" + if not os.path.exists(pipe_path): + os.mkfifo(pipe_path) + + # Open the fifo. We need to open in non-blocking mode or it will stalls until + # someone opens it for writting + pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) + + + #wait for new files + with os.fdopen(pipe_fd) as pipe: + while True: + message = pipe.read() + if message: +# print("Received: '%s'" % message) + pathnames = message.splitlines() + for filepath in pathnames: + directoryWatcher.passFileToZeromq(filepath) + time.sleep(5) + + + zmqContext.destroy() + -- GitLab