Skip to content
Snippets Groups Projects
Commit 25299638 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Modified watchfolder -> watcher_lsyncd

parent 9d2c6984
No related branches found
No related tags found
No related merge requests found
import os
import platform
def isWindows():
returnValue = False
windowsName = "Windows"
platformName = platform.system()
if platformName == windowsName:
returnValue = True
# osName = os.name
# supportedWindowsNames = ["nt"]
# if osName in supportedWindowsNames:
# returnValue = True
return returnValue
def isLinux():
returnValue = False
linuxName = "Linux"
platformName = platform.system()
if platformName == linuxName:
returnValue = True
return returnValue
def isPosix():
osName = os.name
supportedPosixNames = ["posix"]
returnValue = False
if osName in supportedPosixNames:
returnValue = True
return returnValue
def isSupported():
supportedWindowsReleases = ["7"]
osRelease = platform.release()
supportValue = False
#check windows
if isWindows():
supportValue = True
# if osRelease in supportedWindowsReleases:
# supportValue = True
#check linux
if isLinux():
supportValue = True
return supportValue
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>'
import time
import argparse
import zmq
import os
import logging
import sys
import json
import helperScript
# class MyHandler(PatternMatchingEventHandler):
class DirectoryWatcherHandler():
patterns = ["*"]
zmqContext = None
messageSocket = None # strings only, control plane
fileEventServerIp = None
fileEventServerPort = None
watchFolder = None
def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort):
logging.debug("DirectoryWatcherHandler: __init__()")
# logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext)))
logging.info("registering zmq global context")
self.globalZmqContext = zmqContext
self.watchFolder = os.path.normpath(watchFolder)
self.fileEventServerIp = fileEventServerIp
self.fileEventServerPort = fileEventServerPort
#create zmq sockets
self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort)
def getZmqSocket_Push(self, context):
pattern = zmq.PUSH
assert isinstance(context, zmq.sugar.context.Context)
socket = context.socket(pattern)
return socket
def createPushSocket(self, context, fileEventServerPort):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Push(context)
zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort)
logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
socket.connect(zmqSocketStr)
return socket
def passFileToZeromq(self, filepath):
"""
:param rootDirectorty: where to start traversing. including subdirs.
:return:
"""
try:
self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket)
except Exception, e:
logging.error("Unable to process file '" + str(filepath) + "'")
logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e))
def sendFilesystemEventToMessagePipe(self, filepath, targetSocket):
'''
Taking the filename, creating a buffer and then
sending the data as multipart message to the targetSocket.
For testing currently the multipart message consists of only one message.
:param filepath: Pointing to the data which is going to be send
:param targetSocket: Where to send the data to
:return:
'''
#extract relative pathname and filename for the file.
try:
# why?
# The receiver might need to save the file at a different
# path than it was created in at the senders side.
#
# example:
# source_filepath = /tmp/inotify/source/dir3/2.txt
# source_watchFolder = /tmp/inotify/source
# relative basepath = dir3/2.txt
# relative parent = dir3
# target_root = /storage/raw/
# target_filepath = /storage/raw/dir3/2.txt
logging.debug("Building relative path names...")
filepathNormalised = os.path.normpath(filepath)
(parentDir,filename) = os.path.split(filepathNormalised)
commonPrefix = os.path.commonprefix([self.watchFolder,filepathNormalised])
relativeBasepath = os.path.relpath(filepathNormalised, commonPrefix)
(relativeParent, blub) = os.path.split(relativeBasepath)
logging.debug("Common prefix : " + str(commonPrefix))
logging.debug("Relative basepath : " + str(relativeBasepath))
logging.debug("Relative parent : " + str(relativeParent))
logging.debug("Building relative path names...done.")
except Exception, e:
errorMessage = "Unable to generate relative path names."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("Building relative path names...failed.")
raise Exception(e)
#build message dict
try:
logging.debug("Building message dict...")
messageDict = { "filename" : filename,
"sourcePath" : parentDir,
"relativeParent": relativeParent}
messageDictJson = json.dumps(messageDict) #sets correct escape characters
logging.debug("Building message dict...done.")
except Exception, e:
errorMessage = "Unable to assemble message dict."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("Building message dict...failed.")
raise Exception(e)
#send message
try:
logging.info("Sending message...")
logging.debug(str(messageDictJson))
targetSocket.send(messageDictJson)
logging.info("Sending message...done.")
except Exception, e:
logging.error("Sending message...failed.")
logging.debug("Error was: " + str(e))
raise Exception(e)
def getDefaultConfig_logfilePath():
defaultConfig = getDefaultConfig()
if helperScript.isWindows():
osName = "Windows"
elif helperScript.isLinux():
osName = "Linux"
else:
return ""
returnValue = defaultConfig["logfilePath"][osName]
return returnValue
def getDefaultConfig_logfileName():
defaultConfig = getDefaultConfig()
if helperScript.isWindows():
osName = "Windows"
elif helperScript.isLinux():
osName = "Linux"
else:
return ""
returnValue = defaultConfig["logfileName"][osName]
return returnValue
def getDefaultConfig_pushServerPort():
defaultConfig = getDefaultConfig()
if helperScript.isWindows():
osName = "Windows"
elif helperScript.isLinux():
osName = "Linux"
else:
return ""
returnValue = defaultConfig["pushServerPort"][osName]
return returnValue
def getDefaultConfig_pushServerIp():
defaultConfig = getDefaultConfig()
if helperScript.isWindows():
osName = "Windows"
elif helperScript.isLinux():
osName = "Linux"
else:
return ""
returnValue = defaultConfig["pushServerIp"][osName]
return returnValue
def getDefaultConfig():
defaultConfigDict = {
"logfilePath" : { "Windows" : "C:\\",
"Linux" : "/tmp/log/"
},
"logfileName" : { "Windows" : "watchFolder.log",
"Linux" : "watchFolder.log"
},
"pushServerPort" : { "Windows": "6060",
"Linux" : "6060"
},
"pushServerIp" : { "Windows": "127.0.0.1",
"Linux" : "127.0.0.1"
},
}
return defaultConfigDict
def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--watchFolder" , type=str, help="folder you want to monitor for changes")
parser.add_argument("--staticNotification",
help="disables new file-events. just sends a list of currently available files within the defined 'watchFolder'.",
action="store_true")
parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created", default=getDefaultConfig_logfilePath())
parser.add_argument("--logfileName" , type=str, help="filename used for logging", default=getDefaultConfig_logfileName())
parser.add_argument("--pushServerIp", type=str, help="zqm endpoint (IP-address) to send file events to", default=getDefaultConfig_pushServerIp())
parser.add_argument("--pushServerPort", type=str, help="zqm endpoint (port) to send file events to", default=getDefaultConfig_pushServerPort())
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
# TODO: check watchFolder-directory for existance
watchFolder = str(arguments.watchFolder)
assert isinstance(type(watchFolder), type(str))
#exit with error if now watchFolder path was provided
if (watchFolder == None) or (watchFolder == "") or (watchFolder == "None"):
print """You need to set the following option:
--watchFolder {FOLDER}
"""
sys.exit(1)
#error if logfile cannot be written
try:
fullPath = os.path.join(arguments.logfilePath, arguments.logfileName)
logFile = open(fullPath, "a")
except:
print "Unable to create the logfile """ + str(fullPath)
print """Please specify a new target by setting the following arguments:
--logfileName
--logfilePath
"""
sys.exit(1)
#check logfile-path for existance
checkLogfileFolder(arguments.logfilePath)
return arguments
def checkWatchFolder(watchFolderPath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(watchFolderPath):
logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath))
sys.exit(1)
def checkLogfileFolder(logfilePath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(logfilePath):
logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath))
sys.exit(1)
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
if __name__ == '__main__':
arguments = argumentParsing()
watchFolder = arguments.watchFolder
verbose = arguments.verbose
logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName)
fileEventServerIp = str(arguments.pushServerIp)
fileEventServerPort = str(arguments.pushServerPort)
#abort if watch-folder does not exist
checkWatchFolder(watchFolder)
#enable logging
initLogging(logfileFilePath, verbose)
#create zmq context
global zmqContext
zmqContext = zmq.Context()
#run only once, skipping file events
#just get a list of all files in watchDir and pass to zeromq
directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort)
pipe_path = "/tmp/zeromqllpipe"
if not os.path.exists(pipe_path):
os.mkfifo(pipe_path)
# Open the fifo. We need to open in non-blocking mode or it will stalls until
# someone opens it for writting
pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
#wait for new files
with os.fdopen(pipe_fd) as pipe:
while True:
message = pipe.read()
if message:
# print("Received: '%s'" % message)
pathnames = message.splitlines()
for filepath in pathnames:
directoryWatcher.passFileToZeromq(filepath)
time.sleep(5)
zmqContext.destroy()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment