Skip to content
Snippets Groups Projects
Commit eed46a1e authored by p11user's avatar p11user
Browse files

Changed to move_to trigger from InotifyDetector

parent 0d5b9207
No related branches found
No related tags found
No related merge requests found
# Directory you want to monitor for changes
# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
watchDir = /rd_temp
watchDir = /rd
# Target to move the files into
cleanerTargetPath = /gpfs/
cleanerTargetPath = /gpfs
# Subdirectories of watchDir to be monitored
monitoredSubdirs = ["commissioning", "current", "local"]
......
......@@ -15,6 +15,7 @@ from multiprocessing import Process, freeze_support
import subprocess
import json
import shutil
import collections
DEFAULT_CHUNK_SIZE = 1048576
......@@ -41,7 +42,9 @@ class Cleaner():
externalContext = None # if the context was created outside this class or not
zmqCleanerSocket = None
useDataStream = True # boolian to inform if the data should be send to the data stream pipe (to the storage system)
useDataStream = True # boolian to inform if the data should be send to the data stream pipe (to the storage system)
lastMovedFiles = collections.deque(maxlen = 20)
# to get the logging only handling this class
log = None
......@@ -158,6 +161,7 @@ class Cleaner():
if self.useDataStream:
self.removeFile(sourceFullPath)
else:
# self.copyFile(sourcePath, filename, targetFullPath)
self.moveFile(sourcePath, filename, targetFullPath)
# #show filesystem statistics
......@@ -183,7 +187,6 @@ class Cleaner():
iterationCount = 0
self.log.debug("Copying file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
fileWasCopied = False
while iterationCount <= maxAttemptsToCopyFile and not fileWasCopied:
......@@ -198,12 +201,13 @@ class Cleaner():
# moving the file
sourceFile = source + os.sep + filename
targetFile = target + os.sep + filename
# targetFile = "/gpfs/current/scratch_bl/test" + os.sep + filename
self.log.debug("sourceFile: " + str(sourceFile))
self.log.debug("targetFile: " + str(targetFile))
# shutil.copyfile(sourceFile, targetFile)
subprocess.call(["mv", sourceFile, targetFile])
shutil.copyfile(sourceFile, targetFile)
# subprocess.call(["mv", sourceFile, targetFile])
fileWasCopied = True
self.log.debug("Copying file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
self.log.info("Copying file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
except IOError:
self.log.debug ("IOError: " + str(filename))
except Exception, e:
......@@ -214,7 +218,7 @@ class Cleaner():
self.log.debug("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
if not fileWasCopied:
self.log.debug("Copying file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
self.log.info("Copying file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
raise Exception("maxAttemptsToCopyFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filename))
......@@ -239,13 +243,23 @@ class Cleaner():
# print 'paths:', source, target, os.sep, filename
sourceFile = source + os.sep + filename
targetFile = target + os.sep + filename
# targetFile = "/gpfs/current/scratch_bl/test" + os.sep + filename
self.log.debug("sourceFile: " + str(sourceFile))
self.log.debug("targetFile: " + str(targetFile))
shutil.move(sourceFile, targetFile)
fileWasMoved = True
self.log.info("Moving file '" + str(filename) + "' from '" + str(sourceFile) + "' to '" + str(targetFile) + "' (attempt " + str(iterationCount) + ")...success.")
except IOError:
self.log.debug ("IOError: " + str(filename))
try:
shutil.move(sourceFile, targetFile)
self.lastMovedFiles.append(filename)
fileWasMoved = True
self.log.info("Moving file '" + str(filename) + "' from '" + str(sourceFile) + "' to '" + str(targetFile) + "' (attempt " + str(iterationCount) + ")...success.")
except Exception, e:
self.log.debug ("Checking if file was already moved: " + str(filename))
self.log.debug ("Error was: " + str(e))
if filename in self.lastMovedFiles:
self.log.info("File was found in history.")
fileWasMoved = True
else:
self.log.info("File was not found in history.")
except Exception, e:
trace = traceback.format_exc()
warningMessage = "Unable to move file {FILE}.".format(FILE=str(sourceFile))
......
......@@ -85,7 +85,7 @@ class InotifyDetector():
self.paths = paths
self.log = self.getLogger()
self.fd = binding.init()
self.monitoredSuffixes = monitoredSuffixes
self.monitoredSuffixes = tuple(monitoredSuffixes)
self.monitoredSubdirs = monitoredSubdirs
self.add_watch()
......@@ -209,17 +209,19 @@ class InotifyDetector():
del self.wd_to_path[foundWatch]
continue
# only moved files are send
if not is_dir and is_moved_to:
# only closed files are send
if not is_dir and is_closed:
# print path, event.name, parts
# print event.name
# TODO check if still necessary
# checks if one of the suffixes to monitore is contained in the event.name
resultSuffix = filter(lambda x: x in event.name, self.monitoredSuffixes)
# only files with end with a suffix specified in monitoredSuffixed are monitored
if not resultSuffix:
#if not event.name.endswith(self.monitoredSuffixes):
# if not resultSuffix:
if not event.name.endswith(self.monitoredSuffixes):
self.log.debug("File ending not in monitored Suffixes: " + str(event.name))
self.log.debug("detected events were: " + str(parts))
continue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment