Skip to content
Snippets Groups Projects
Commit 986dc820 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Condensed code in EventDetector

parent 01e55352
No related branches found
No related tags found
No related merge requests found
......@@ -47,20 +47,15 @@ class DirectoryWatcher():
self.fileEventIp = fileEventIp
self.fileEventPort = fileEventPort
if monitoredEventType:
self.monitoredEventType = monitoredEventType
self.monitoredEventType = monitoredEventType or None
self.log.info ("Monitored event type is: " + str( monitoredEventType ))
if monitoredDefaultSubdirs:
self.monitoredDefaultSubdirs = monitoredDefaultSubdirs
if monitoredSuffixes:
self.monitoredSuffixes = monitoredSuffixes
self.monitoredDefaultSubdirs = monitoredDefaultSubdirs or None
self.monitoredSuffixes = monitoredSuffixes or None
self.log.info ("Monitored suffixes are: " + str( monitoredSuffixes ))
# monitoredDirs = self.getDirectoryStructure()
monitoredDirs = [self.watchDir]
self.eventDetector = EventDetector(monitoredDirs, self.monitoredEventType, self.monitoredDefaultSubdirs, self.monitoredSuffixes)
......@@ -68,7 +63,13 @@ class DirectoryWatcher():
self.createSockets()
self.process()
try:
self.process()
except KeyboardInterrupt:
self.log.debug("Keyboard interruption detected. Shuting down")
# finally:
# self.eventDetector.stop()
# self.stop()
def getLogger(self):
......@@ -88,102 +89,49 @@ class DirectoryWatcher():
self.log.debug("Error was:" + str(e))
def getDirectoryStructure(self):
# Add the default subdirs
dirsToWalk = [self.watchDir + os.sep + directory for directory in self.monitoredDefaultSubdirs]
monitoredDirs = []
# Walk the tree
for directory in dirsToWalk:
for root, directories, files in os.walk(directory):
# Add the found directorys to the list for the inotify-watch
monitoredDirs.append(root)
self.log.info("Add directory to monitor: " + str(root))
# print "Add directory to monitor: " + str(root)
return monitoredDirs
def passFileToZeromq(self, targetSocket, sourcePath, relativePath, filename):
'''
Taking the filename, creating a buffer and then
sending the data as multipart message to the socket.
For testing currently the multipart message consists of only one message.
:param sourcePath: Pointing to the data which is going to be send
:param relativePath: Relative path leading from the origin source path (not the filepath) to the file
:param filename: The name of the file to be send
:return:
'''
#build message dict
try:
self.log.debug("Building message dict...")
messageDict = { "filename" : filename,
"sourcePath" : sourcePath,
"relativePath" : relativePath
}
messageDictJson = json.dumps(messageDict) #sets correct escape characters
self.log.debug("Building message dict...done.")
except Exception, e:
errorMessage = "Unable to assemble message dict."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
self.log.debug("Building message dict...failed.")
raise Exception(e)
#send message
try:
self.log.debug("Sending message...")
self.log.debug(str(messageDictJson))
targetSocket.send(messageDictJson)
self.log.debug("Sending message...done.")
except KeyboardInterrupt:
self.log.error("Sending message...failed because of KeyboardInterrupt.")
except Exception, e:
self.log.error("Sending message...failed.")
self.log.debug("Error was: " + str(e))
raise Exception(e)
def process(self):
try:
while True:
try:
while True:
try:
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
# "sourcePath" : "/tmp/test/source/"
# "relativePath": "local"
# "filename" : "file1.tif"
# }
workloadList = self.eventDetector.getNewEvent()
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
#TODO validate workload dict
for workload in workloadList:
sourcePath = workload["sourcePath"]
# the directories local, current, and commissioning are monitored by default
# (sourcePath,relDir) = os.path.split(sourcePath)
# relativePath = os.path.normpath(relDir + os.sep + workload["relativePath"])
relativePath = workload["relativePath"]
filename = workload["filename"]
# print "eventDetector:", sourcePath, relativePath, filename
# send the file to the fileMover
self.passFileToZeromq(self.messageSocket, sourcePath, relativePath, filename)
except KeyboardInterrupt:
self.log.debug("Keyboard interruption detected. Shuting down")
finally:
self.eventDetector.stop()
self.stop()
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
# "sourcePath" : "/tmp/test/source/"
# "relativePath": "local"
# "filename" : "file1.tif"
# }
workloadList = self.eventDetector.getNewEvent()
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
#TODO validate workload dict
for workload in workloadList:
# build message dict
try:
self.log.debug("Building message dict...")
messageDict = { "filename" : workload["filename"],
"sourcePath" : workload["sourcePath"],
"relativePath" : workload["relativePath"]
}
messageDictJson = json.dumps(messageDict) #sets correct escape characters
self.log.debug("Building message dict...done.")
except Exception, e:
self.log.error("Unable to assemble message dict.")
self.log.debug("Error was: " + stri(e))
continue
# send the file to the fileMover
try:
self.log.debug("Sending message...")
self.log.debug(str(messageDictJson))
self.messageSocket.send(messageDictJson)
self.log.debug("Sending message...done.")
except Exception, e:
self.log.error("Sending message...failed.")
self.log.debug("Error was: " + str(e))
def stop(self):
......@@ -192,6 +140,14 @@ class DirectoryWatcher():
self.zmqContext.destroy()
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
def argumentParsing():
......
......@@ -128,7 +128,7 @@ class InotifyDetector():
except Exception as e:
self.log.error("Could not register watch for path: " + str(path) )
self.log.debug("Error was " + str(e))
self.stop()
# self.stop()
def getDirectoryStructure(self):
......@@ -282,30 +282,23 @@ class InotifyDetector():
def process(self):
try:
try:
while True:
self.getNewEvent()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
try:
try:
for wd in self.wd_to_path:
for wd in self.wd_to_path:
try:
binding.rm_watch(self.fd, wd)
except Exception as e:
self.log.error("Unable to remove watch.")
except Exception as e:
self.log.error("Unable to remove watch: " + wd)
self.log.debug("Error was: " + str(e))
finally:
os.close(self.fd)
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
if __name__ == '__main__':
......
......@@ -65,7 +65,7 @@ def isSupported():
def getTransportProtocol():
is platform.system() == "Linux":
if platform.system() == "Linux":
return "ipc"
else:
return "tcp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment