Skip to content
Snippets Groups Projects
Commit 59861e28 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed watchdogDetector

parent 6981d6aa
No related branches found
No related tags found
No related merge requests found
......@@ -43,10 +43,11 @@ def argumentParsing():
monitoredEventType = config.get('asection', 'monitoredEventType')
monitoredSubdirs = json.loads(config.get('asection', 'monitoredSubdirs'))
monitoredFormats = json.loads(config.get('asection', 'monitoredFormats'))
timeTillClosed = int(config.get('asection', 'timeTillClosed'))
useDataStream = config.getboolean('asection', 'useDataStream')
fixedStreamHost = config.get('asection', 'fixedStreamHost')
fixedStreamPort = config.get('asection', 'fixedStreamPort')
fixedStreamHost = config.get('asection', 'fixedStreamHost')
fixedStreamPort = config.get('asection', 'fixedStreamPort')
parallelDataStreams = config.get('asection', 'parallelDataStreams')
chunkSize = int(config.get('asection', 'chunkSize'))
......@@ -101,6 +102,10 @@ def argumentParsing():
help = "The formats to be monitored, files in an other format will be be neglected \
(default=" + str(monitoredFormats) + ")",
default = monitoredFormats )
parser.add_argument("--timeTillClosed" , type = str,
help = "Time (in seconds) since last modification after which a file will be seen as closed \
(default=" + str(timeTillClosed) + ")",
default = timeTillClosed )
parser.add_argument("--useDataStream" , type = str,
help = "Enable ZMQ pipe into storage system (if set to false: the file is moved \
......@@ -142,7 +147,7 @@ def argumentParsing():
verbose = arguments.verbose
onScreen = arguments.onScreen
eventDetectorType = arguments.eventDetectorType
eventDetectorType = arguments.eventDetectorType.lower()
supportedEDTypes = ["inotifyx"]
monitoredDir = str(arguments.monitoredDir)
monitoredSubdirs = arguments.monitoredSubdirs
......@@ -150,20 +155,22 @@ def argumentParsing():
parallelDataStreams = arguments.parallelDataStreams
# check if logfile is writable
helperScript.checkLogFileWritable(logfilePath, logfileName)
#enable logging
helperScript.initLogging(logfileFullPath, verbose, onScreen)
# check if the eventDetectorType is supported
helperScript.checkEventDetectorType(eventDetectorType, supportedEDTypes)
# check if directories exists
helperScript.checkDirExistance(logfilePath)
helperScript.checkDirExistance(monitoredDir)
helperScript.checkSubDirExistance(monitoredDir, monitoredSubdirs)
helperScript.checkDirExistance(localTarget)
helperScript.checkEventDetectorType(eventDetectorType, supportedEDTypes)
# check if logfile is writable
helperScript.checkLogFileWritable(logfilePath, logfileName)
return arguments
......@@ -177,13 +184,23 @@ class Sender():
self.requestPort = arguments.requestPort
self.requestFwPort = arguments.requestFwPort
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"monDir" : arguments.monitoredDir,
"monEventType" : arguments.monitoredEventType,
"monSubdirs" : arguments.monitoredSubdirs,
"monSuffixes" : arguments.monitoredFormats
}
if arguments.eventDetectorType == "inotifyx":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"monDir" : arguments.monitoredDir,
"monEventType" : arguments.monitoredEventType,
"monSubdirs" : arguments.monitoredSubdirs,
"monSuffixes" : arguments.monitoredFormats
}
elif arguments.eventDetectorType == "watchdog":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"monDir" : arguments.monitoredDir,
"monEventType" : arguments.monitoredEventType,
"monSubdirs" : arguments.monitoredSubdirs,
"monSuffixes" : arguments.monitoredFormats,
"timeTillClosed" : arguments.timeTillClosed
}
if arguments.useDataStream:
self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort )
......
......@@ -164,19 +164,19 @@ def splitFilePath(filepath, paths):
"relativePath": relativePath,
"filename" : filename
}
print "eventMessage", eventMessage
return eventMessage
class checkModTime(threading.Thread):
def __init__(self, NumberOfThreads, paths):
def __init__(self, NumberOfThreads, timeTillClosed, paths):
self.log = self.getLogger()
self.log.debug("init")
#Make the Pool of workers
self.pool = ThreadPool(NumberOfThreads)
self.paths = paths
self.timeTillClosed = timeTillClosed # s
self._stop = threading.Event()
self.log.debug("threading.Thread init")
......@@ -194,10 +194,10 @@ class checkModTime(threading.Thread):
while True:
try:
# Open the urls in their own threads
self.log.debug("loop: " + str(eventListToObserve))
self.log.debug("eventMessageList: " + str(eventMessageList))
self.log.debug("List to observe: " + str(eventListToObserve))
# self.log.debug("eventMessageList: " + str(eventMessageList))
self.pool.map(self.checkLastModified, eventListToObserve)
self.log.debug("eventMessageList: " + str(eventMessageList))
# self.log.debug("eventMessageList: " + str(eventMessageList))
time.sleep(2)
except:
break
......@@ -205,6 +205,7 @@ class checkModTime(threading.Thread):
def checkLastModified(self, filepath):
global eventMessageList
global eventListToObserve
try:
# check modification time
......@@ -214,8 +215,6 @@ class checkModTime(threading.Thread):
self.log.error("Error was: " + str(e))
return
self.log.debug("modification Time: " + str(timeLastModified))
try:
# get current time
timeCurrent = time.time()
......@@ -223,14 +222,19 @@ class checkModTime(threading.Thread):
self.log.error("Unable to get current time for file: " + filepath)
self.log.error("Error was: " + str(e))
self.log("current Time: " + str(timeCurrent))
# compare ( >= limit)
if timeCurrent - timeLastModified >= timeToWait:
if timeCurrent - timeLastModified >= self.timeTillClosed:
self.log.debug("New closed file detected: " + str(filepath))
eventMessage = splitFilePath(filepath, self.paths)
self.log.debug("eventMessage: " + str(eventMessage))
# add to result list
eventMessageList.append(eventMessage)
eventListToObserve.remove(filepath)
else:
self.log.debug("File was last modified " + str(timeCurrent - timeLastModified) + \
" sec ago: " + str(filepath))
def stop(self):
......@@ -255,15 +259,16 @@ class WatchdogDetector():
self.log.debug("init")
self.config = config
self.paths = self.config["monDir"]
self.monDir = self.config["monDir"][0]
self.config = config
self.paths = self.config["monDir"]
self.monDir = self.config["monDir"][0]
self.timeTillClosed = self.config["timeTillClosed"]
self.observer = Observer()
self.observer.schedule(WatchdogEventHandler(self.config), path=self.monDir, recursive=True)
self.observer.start()
self.checkingThread = checkModTime(4, self.paths)
self.checkingThread = checkModTime(4, self.timeTillClosed, self.paths)
self.checkingThread.start()
......@@ -300,11 +305,13 @@ if __name__ == '__main__':
from subprocess import call
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SRC_PATH = BASE_PATH + os.sep + "src"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
sys.path.append ( SRC_PATH )
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import shared.helperScript as helperScript
import helperScript
logfilePath = BASE_PATH + "/logs/watchdogDetector.log"
verbose = True
......@@ -315,11 +322,12 @@ if __name__ == '__main__':
config = {
#TODO normpath to make insensitive to "/" at the end
"monDir" : [ BASE_PATH + "/data/source" ],
"monEventType" : "ON_CLOSE",
# "monEventType" : "IN_CREATE",
"monSubdirs" : ["local"],
"monSuffixes" : [".tif", ".cbf"]
"monDir" : [ BASE_PATH + "/data/source" ],
"monEventType" : "ON_CLOSE",
# "monEventType" : "IN_CREATE",
"monSubdirs" : ["local"],
"monSuffixes" : [".tif", ".cbf"],
"timeTillClosed" : 2 #s
}
sourceFile = BASE_PATH + "/test_file.cbf"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment