Commit 6d8e20c6 authored by Manuela Kuhn
Browse files

Added cleanup thread to InotifyxDetector

parent fe92da8c
......@@ -168,11 +168,10 @@ class TaskProvider():
break
else:
self.log.error("Invalid fileEvent message received.", exc_info=True)
continue
workloadList = []
except:
self.log.error("Invalid fileEvent message received.", exc_info=True)
#skip all further instructions and continue with next iteration
continue
workloadList = []
#TODO validate workload dict
for workload in workloadList:
......
......@@ -7,6 +7,9 @@ from inotifyx import binding
from inotifyx.distinfo import version as __version__
import sys
import collections
import threading
import time
import copy
try:
......@@ -22,7 +25,8 @@ del SHARED_PATH
from logutils.queue import QueueHandler
constants = {}
constants = {}
fileEventList = []
for name in dir(binding):
if name.startswith('IN_'):
......@@ -87,10 +91,142 @@ class InotifyEvent (object):
return '0'
def getEventMessage (path, filename, paths):
    """Build the event message for a file located below a source path.

    Starting at `path`, the directory tree is walked upwards one component at
    a time until a directory contained in `paths` is reached. This directory
    becomes the "sourcePath", the components passed on the way form the
    "relativePath".

    e.g. the event for a file /tmp/test/source/local/file1.tif with
    paths = ["/tmp/test/source"] is of the form:
        {
            "sourcePath"  : "/tmp/test/source"
            "relativePath": "local"
            "filename"    : "file1.tif"
        }

    Args:
        path: directory the file is located in.
        filename: name of the file; copied into the message unchanged.
        paths: container of source directories. Membership is tested with
            `in`, i.e. an entry has to match an ancestor of `path` literally.

    Returns:
        dict with the keys "sourcePath", "relativePath" and "filename".
        "relativePath" is "." if `path` itself is contained in `paths`
        (result of os.path.normpath on an empty string).

    Raises:
        ValueError: if neither `path` nor any of its ancestors is contained
            in `paths`.
    """
    parentDir = path
    # components between the source path and `path`, collected from the right
    relDirs = []

    # traverse the relative path till the original path is reached
    # e.g.
    # paths = ["/tmp/test/source"]
    # path = /tmp/test/source/local/testdir
    # first iteration:  parentDir = /tmp/test/source/local, relDirs = [testdir]
    # second iteration: parentDir = /tmp/test/source, relDirs = [testdir, local]
    while parentDir not in paths:
        (head, relDir) = os.path.split(parentDir)

        # os.path.split returns its input unchanged as head once the top of
        # the path ("/" or "") is reached; without this check the loop would
        # never terminate for a path outside of all source paths
        if head == parentDir:
            raise ValueError("Path '" + str(path) + "' is not located below any of the paths " + str(paths))

        relDirs.append(relDir)
        parentDir = head

    # the components were collected from the right, so reverse them;
    # normpath removes a trailing separator and turns "" into "."
    relativePath = os.path.normpath(os.sep.join(reversed(relDirs)))

    return {
        "sourcePath"  : parentDir,
        "relativePath": relativePath,
        "filename"    : filename
    }
class CleanUp (threading.Thread):
    """Thread which repeatedly scans the monitored subdirectories for old files.

    Every pass walks paths[0]/<subdir> for each entry of monSubdirs and builds
    an event message for every file with a monitored suffix whose last
    modification lies at least cleanUpTime seconds in the past. The messages
    are appended to the module-level list fileEventList while holding lock.
    Between two passes the thread sleeps for actionTime seconds.

    NOTE(review): no record is kept of files which were already reported, so
    a file is reported again on every pass as long as it stays in place.
    """

    def __init__ (self, paths, monSubdirs, monSuffixes, cleanUpTime, actionTime, lock, logQueue):
        """Store the configuration and initialize the thread.

        Args:
            paths: list of source paths; only paths[0] is used as base of the
                directories to walk, the whole list is handed to
                getEventMessage.
            monSubdirs: subdirectories (relative to paths[0]) to scan.
            monSuffixes: suffix or tuple of suffixes handed to str.endswith.
            cleanUpTime: minimal age (seconds since the last modification) a
                file must have to be reported.
            actionTime: seconds to sleep between two scans.
            lock: lock guarding the module-level fileEventList.
            logQueue: queue handed to the QueueHandler of the logger.
        """
        self.log = self.getLogger(logQueue)

        self.log.debug("init")
        self.paths       = paths
        self.monSubdirs  = monSubdirs
        self.monSuffixes = monSuffixes

        self.cleanUpTime = cleanUpTime
        self.actionTime  = actionTime

        self.lock        = lock

        self.log.debug("threading.Thread init")
        threading.Thread.__init__(self)


    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        """Return the logger "CleanUp" with a QueueHandler for queue attached."""
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("CleanUp")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


    def run (self):
        """Scan the directories in an endless loop; stop on the first error."""
        global fileEventList

        dirsToWalk = [os.path.normpath(self.paths[0] + os.sep + directory)
                      for directory in self.monSubdirs]

        while True:
            try:
                result = []
                for dirname in dirsToWalk:
                    result += self.traverseDirectory(dirname)

                # The lock is only held while the shared list is modified and
                # the with-statement releases it on every exit path. Releasing
                # it in the except branch instead would raise a second error
                # whenever the failure happened while the lock was not held
                # (e.g. during the directory walk or the sleep).
                with self.lock:
                    fileEventList += result

                time.sleep(self.actionTime)
            except Exception:
                self.log.error("Stopping loop due to error", exc_info=True)
                break


    def traverseDirectory(self, dirname):
        """Collect event messages for all old enough files below dirname.

        Args:
            dirname: directory to walk recursively.

        Returns:
            list of event messages (see getEventMessage), one for each file
            whose name ends with one of the monitored suffixes and whose last
            modification is at least cleanUpTime seconds ago. Files whose
            modification time cannot be determined are logged and skipped.
        """
        eventList = []

        for root, directories, files in os.walk(dirname):
            for filename in files:
                if not filename.endswith(self.monSuffixes):
                    continue

                filepath = os.path.join(root, filename)
                self.log.debug("filepath: " + filepath)

                try:
                    timeLastModified = os.stat(filepath).st_mtime
                except Exception:
                    # best effort: e.g. the file vanished between the walk and
                    # the stat call; skip it and go on with the next one
                    self.log.error("Unable to get modification time for file: " + filepath, exc_info=True)
                    continue

                # the current time is taken per file because walking a large
                # tree may take a while
                if time.time() - timeLastModified >= self.cleanUpTime:
                    self.log.debug("New closed file detected: " + str(filepath))

                    eventMessage = getEventMessage(root, filename, self.paths)
                    self.log.debug("eventMessage: " + str(eventMessage))

                    # add to result list
                    eventList.append(eventMessage)

        return eventList
# Modification of the inotifyx example found inside inotifyx library
# Copyright (c) 2005 Manuel Amador
# Copyright (c) 2009-2011 Forest Bond
#class InotifyxDetector():
class EventDetector():
def __init__ (self, config, logQueue):
......@@ -99,8 +235,8 @@ class EventDetector():
# check format of config
if ( not config.has_key("monDir") or
not config.has_key("monEventType") or
not config.has_key("monSubdirs") or
not config.has_key("monEventType") or
not config.has_key("monSuffixes") or
not config.has_key("timeout") or
not config.has_key("historySize") ):
......@@ -118,16 +254,24 @@ class EventDetector():
#TODO why is this necessary
self.paths = [ config["monDir"] ]
self.monSubdirs = config["monSubdirs"]
self.monEventType = config["monEventType"]
self.monSuffixes = tuple(config["monSuffixes"])
self.monSubdirs = config["monSubdirs"]
self.timeout = config["timeout"]
self.history = collections.deque(maxlen=config["historySize"])
self.cleanUpTime = 5
self.cleanUpTime = 120
self.lock = threading.Lock()
self.add_watch()
self.cleanupThread = CleanUp(self.paths, self.monSubdirs, self.monSuffixes, self.cleanUpTime, self.actionTime, self.lock, logQueue)
self.cleanupThread.start()
def get_events (self, fd, *args):
'''
......@@ -188,18 +332,30 @@ class EventDetector():
monitoredDirs.append(root)
self.log.info("Add directory to monitor: " + str(root))
else:
self.log.info("Dir does not exists: " + str(directory))
self.log.info("Dir does not exist: " + str(directory))
return monitoredDirs
def getNewEvent (self):
global fileEventList
try:
self.lock.acquire()
# get missed files
eventMessageList = copy.deepcopy(fileEventList)
fileEventList = []
finally:
self.lock.release()
if eventMessageList:
self.log.info("Added missed files: " + str(eventMessageList))
eventMessageList = []
eventMessage = {}
events = self.get_events(self.fd, self.timeout)
removedWd = None
for event in events:
if not event.name:
......@@ -263,7 +419,7 @@ class EventDetector():
self.log.debug("File ending not in monitored Suffixes: " + str(filename))
self.log.debug("detected events were: " + str(parts))
continue
eventMessage = self.getEventMessage(path, filename)
eventMessage = self.getEventMessage(path, filename, self.paths)
self.log.debug("eventMessage: " + str(eventMessage))
eventMessageList.append(eventMessage)
# self.log.debug("eventMessageList: " + str(eventMessageList))
......@@ -280,6 +436,7 @@ class EventDetector():
for watch, watchPath in self.wd_to_path.iteritems():
if watchPath == dirname:
foundWatch = watch
break
binding.rm_watch(self.fd, foundWatch)
self.log.info("Removed directory from watch:" + str(dirname))
# the IN_MOVED_FROM event always appears before the IN_MOVED_TO (+ additional) events
......@@ -304,7 +461,7 @@ class EventDetector():
continue
eventMessage = self.getEventMessage(path, event.name)
eventMessage = getEventMessage(path, event.name, self.paths)
self.log.debug("eventMessage" + str(eventMessage))
eventMessageList.append(eventMessage)
......@@ -313,49 +470,6 @@ class EventDetector():
return eventMessageList
def getEventMessage (self, path, filename):
    """Build the event message for a file located below a monitored path.

    Starting at `path`, the directory tree is walked upwards one component at
    a time until a directory contained in self.paths is reached. This
    directory becomes the "sourcePath", the components passed on the way form
    the "relativePath".

    e.g. the event for a file /tmp/test/source/local/file1.tif with
    self.paths = ["/tmp/test/source"] is of the form:
        {
            "sourcePath"  : "/tmp/test/source"
            "relativePath": "local"
            "filename"    : "file1.tif"
        }

    Args:
        path: directory the file is located in.
        filename: name of the file; copied into the message unchanged.

    Returns:
        dict with the keys "sourcePath", "relativePath" and "filename".
        "relativePath" is "." if `path` itself is contained in self.paths
        (result of os.path.normpath on an empty string).

    Raises:
        ValueError: if neither `path` nor any of its ancestors is contained
            in self.paths.
    """
    parentDir = path
    # components between the source path and `path`, collected from the right
    relDirs = []

    # traverse the relative path till the original path is reached
    # e.g.
    # self.paths = ["/tmp/test/source"]
    # path = /tmp/test/source/local/testdir
    # first iteration:  parentDir = /tmp/test/source/local, relDirs = [testdir]
    # second iteration: parentDir = /tmp/test/source, relDirs = [testdir, local]
    while parentDir not in self.paths:
        (head, relDir) = os.path.split(parentDir)

        # os.path.split returns its input unchanged as head once the top of
        # the path ("/" or "") is reached; without this check the loop would
        # never terminate for a path outside of all monitored paths
        if head == parentDir:
            raise ValueError("Path '" + str(path) + "' is not located below any of the paths " + str(self.paths))

        relDirs.append(relDir)
        parentDir = head

    # the components were collected from the right, so reverse them;
    # normpath removes a trailing separator and turns "" into "."
    relativePath = os.path.normpath(os.sep.join(reversed(relDirs)))

    return {
        "sourcePath"  : parentDir,
        "relativePath": relativePath,
        "filename"    : filename
    }
def stop (self):
try:
for wd in self.wd_to_path:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment