Skip to content
Snippets Groups Projects
Commit 1bb03854 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added ring buffer to sender and enabled realtime analysis communication

parent 7eff68b1
No related branches found
No related tags found
No related merge requests found
......@@ -16,6 +16,7 @@ import subprocess
import json
import shutil
import helperScript
from RingBuffer import RingBuffer
DEFAULT_CHUNK_SIZE = 1048576
......@@ -44,7 +45,11 @@ class Cleaner():
# to get the logging only handling this class
log = None
def __init__(self, targetPath, bindingIp="127.0.0.1", bindingPort="6062", context = None, verbose=False):
useRealTimeAnalysis = True # boolian to inform if the receiver for the realtime analysis is running
maxRingBufferSize = None
ringBuffer = None
def __init__(self, targetPath, bindingIp="127.0.0.1", bindingPort="6062", maxRingBufferSize = None, context = None, verbose=False):
self.bindingPortForSocket = bindingPort
self.bindingIpForSocket = bindingIp
self.targetPath = targetPath
......@@ -56,6 +61,13 @@ class Cleaner():
self.zmqContextForCleaner = zmq.Context()
self.externalContext = False
if maxRingBufferSize:
self.maxRingBufferSize = maxRingBufferSize
# # TODO remove targetPath?
# self.ringBuffer = RingBuffer(self.maxRingBufferSize, self.targetPath)
self.ringBuffer = RingBuffer(self.maxRingBufferSize)
self.log = self.getLogger()
self.log.debug("Init")
......@@ -99,8 +111,24 @@ class Cleaner():
self.log.info("Stopping cleaner")
self.stop()
break
elif workload == "START_REALTIME_ANALYSIS":
self.useRealTimeAnalysis = True
self.log.info("Starting realtime analysis")
break
elif workload == "STOP_REALTIME_ANALYSIS":
self.useRealTimeAnalysis = False
self.log.info("Stopping realtime analysis")
break
#transform to dictionary
# transform to dictionary
# metadataDict = {
# "filename" : filename,
# "filesize" : filesize,
# "fileModificationTime" : fileModificationTime,
# "sourcePath" : sourcePath,
# "relativePath" : relativePath,
# "chunkSize" : self.getChunkSize()
# }
try:
workloadDict = json.loads(str(workload))
except:
......@@ -115,6 +143,8 @@ class Cleaner():
filename = workloadDict["filename"]
sourcePath = workloadDict["sourcePath"]
relativePath = workloadDict["relativePath"]
if self.useRealTimeAnalysis:
modTime = workloadDict["fileModificationTime"]
# filesize = workloadDict["filesize"]
except Exception, e:
errorMessage = "Invalid fileEvent message received."
......@@ -124,43 +154,111 @@ class Cleaner():
#skip all further instructions and continue with next iteration
continue
#moving source file
sourceFilepath = None
#source file
sourceFullpath = None
try:
self.log.debug("removing source file...")
#generate target filepath
sourcePath = os.path.normpath(sourcePath + os.sep + relativePath)
sourceFullPath = os.path.join(sourcePath,filename)
targetFullPath = os.path.normpath(self.targetPath + relativePath)
# self.removeFile(sourceFilepath)
self.log.debug ("sourcePath: " + str (sourcePath))
self.log.debug ("filename: " + str (filename))
self.log.debug ("targetPath: " + str (targetFullPath))
self.moveFile(sourcePath, filename, targetFullPath)
# #show filesystem statistics
# try:
# self.showFilesystemStatistics(sourcePath)
# except Exception, f:
# logging.warning("Unable to get filesystem statistics")
# logging.debug("Error was: " + str(f))
self.log.debug("file removed: " + str(sourceFullPath))
self.log.debug("removing source file...success.")
self.log.debug("sourcePath: " + str (sourcePath))
self.log.debug("filename: " + str (filename))
self.log.debug("targetPath: " + str (targetFullPath))
except Exception, e:
errorMessage = "Unable to remove source file: " + str (sourceFullPath)
self.log.error(errorMessage)
self.log.error("Unable to generate file paths")
trace = traceback.format_exc()
self.log.error("Error was: " + str(trace))
self.log.debug("sourceFilepath="+str(sourceFilepath))
self.log.debug("removing source file...failed.")
#skip all further instructions and continue with next iteration
continue
if self.useRealTimeAnalysis:
# copy file
try:
self.log.debug("Copying source file...")
self.copyFile(sourcePath, filename, targetFullPath)
self.log.debug("File copied: " + str(sourceFullPath))
self.log.debug("Copying source file...success.")
except Exception, e:
self.log.error("Unable to copy source file: " + str (sourceFullPath) )
trace = traceback.format_exc()
self.log.error("Error was: " + str(trace))
self.log.debug("sourceFullpath="+str(sourceFullpath))
self.log.debug("Copying source file...failed.")
#skip all further instructions and continue with next iteration
continue
# add file to ring buffer
self.log.debug("Add new file to ring buffer: " + str(sourceFullPath) + ", " + str(modTime))
self.ringBuffer.add(sourceFullPath, modTime)
else:
try:
self.log.debug("Moving source file...")
self.moveFile(sourcePath, filename, targetFullPath)
# self.removeFile(sourceFullpath)
# #show filesystem statistics
# try:
# self.showFilesystemStatistics(sourcePath)
# except Exception, f:
# logging.warning("Unable to get filesystem statistics")
# logging.debug("Error was: " + str(f))
self.log.debug("File moved: " + str(sourceFullPath))
self.log.debug("Moving source file...success.")
except Exception, e:
self.log.error("Unable to move source file: " + str (sourceFullPath) )
trace = traceback.format_exc()
self.log.error("Error was: " + str(trace))
self.log.debug("sourceFullpath="+str(sourceFullpath))
self.log.debug("Moving source file...failed.")
#skip all further instructions and continue with next iteration
continue
def copyFile(self, source, filename, target):
maxAttemptsToCopyFile = 2
waitTimeBetweenAttemptsInMs = 500
iterationCount = 0
self.log.info("Copying file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
fileWasCopied = False
while iterationCount <= maxAttemptsToCopyFile and not fileWasCopied:
iterationCount+=1
try:
# check if the directory exists before moving the file
if not os.path.exists(target):
try:
os.makedirs(target)
except OSError:
pass
# moving the file
sourceFile = source + os.sep + filename
targetFile = target + os.sep + filename
self.log.debug("sourceFile: " + str(sourceFile))
self.log.debug("targetFile: " + str(targetFile))
shutil.copyfile(sourceFile, targetFile)
fileWasCopied = True
self.log.debug("Copying file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
except IOError:
self.log.debug ("IOError: " + str(filename))
except Exception, e:
trace = traceback.format_exc()
warningMessage = "Unable to copy file {FILE}.".format(FILE=str(source) + str(filename))
self.log.warning(warningMessage)
self.log.debug("trace=" + str(trace))
self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
if not fileWasCopied:
self.log.error("Copying file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
raise Exception("maxAttemptsToCopyFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filename))
def moveFile(self, source, filename, target):
maxAttemptsToRemoveFile = 2
maxAttemptsToMoveFile = 2
waitTimeBetweenAttemptsInMs = 500
......@@ -168,7 +266,7 @@ class Cleaner():
self.log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
fileWasMoved = False
while iterationCount <= maxAttemptsToRemoveFile and not fileWasMoved:
while iterationCount <= maxAttemptsToMoveFile and not fileWasMoved:
iterationCount+=1
try:
# check if the directory exists before moving the file
......@@ -194,11 +292,9 @@ class Cleaner():
self.log.debug("trace=" + str(trace))
self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
if not fileWasMoved:
self.log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount),
FILE=filename))
raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filename))
def removeFile(self, filepath):
......@@ -223,11 +319,10 @@ class Cleaner():
self.log.debug("trace=" + str(trace))
self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
if not fileWasRemoved:
self.log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.")
raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount),
FILE=filepath))
raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filepath))
def stop(self):
self.log.debug("Closing socket")
......
......@@ -57,8 +57,10 @@ class RingBuffer:
def getNewestFile(self):
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
self.log.debug("Newest Event: " + str(self.ringBuffer[0][1]) )
return self.ringBuffer[0][1]
else:
self.log.debug("Newest Event: None")
return "None"
......@@ -70,6 +72,7 @@ class RingBuffer:
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
self.log.debug("Remove file from ring buffer: " + str(path) )
os.remove(path)
self.ringBuffer.remove([mod_time, path])
......
......@@ -43,7 +43,8 @@ class WorkerProcess():
routerSocket = None
cleanerSocket = None
useLiveViewer = False # boolian to inform if the receiver to show the files in the live viewer is running
useLiveViewer = False # boolian to inform if the receiver for the live viewer is running
useRealTimeAnalysis = True # boolian to inform if the receiver for realtime-analysis is running
# to get the logging only handling this class
log = None
......@@ -156,14 +157,28 @@ class WorkerProcess():
self.useLiveViewer = True
continue
# the live viewer is turned of
# the live viewer is turned off
stopLV = workload == b"STOP_LIVE_VIEWER"
if stopLV:
self.log.info("worker-"+str(self.id)+": Received live viewer stop command...stopping live viewer")
self.useLiveViewer = False
continue
if self.useLiveViewer:
# the realtime-analysis is turned on
startRTA = workload == b"START_REALTIME_ANALYSIS"
if startRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis start command...starting live viewer")
self.useRealTimeAnalysis = True
continue
# the realtime-analysis is turned off
stopRTA = workload == b"STOP_REALTIME_ANALYSIS"
if stopRTA:
self.log.info("worker-"+str(self.id)+": Received realtime-analysis stop command...stopping live viewer")
self.useRealTimeAnalysis = False
continue
if self.useLiveViewer or self.useRealTimeAnalysis:
#convert fileEventMessage back to a dictionary
fileEventMessageDict = None
try:
......@@ -188,6 +203,7 @@ class WorkerProcess():
#skip all further instructions and continue with next iteration
continue
if self.useLiveViewer:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
......@@ -203,21 +219,61 @@ class WorkerProcess():
else:
print "worker-"+str(self.id)+": no data sent"
if self.useRealTimeAnalysis:
# --> sourceFilePathFull = 'C:\\dir\img.tiff'
sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath)
sourceFilePathFull = os.path.join(sourceFilePath, filename)
#send remove-request to message pipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("workload = " + str(workload))
self.cleanerSocket.send(workload)
self.log.debug("send file-event for file to cleaner-pipe...success.")
#reading source file into memory
try:
#for quick testing set filesize of file as chunksize
self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
filesize = os.path.getsize(sourceFilePathFull)
fileModificationTime = os.stat(sourceFilePathFull).st_mtime
self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
except Exception, e:
self.log.error("Unable to get file metadata for '" + str(sourceFilePathFull) + "'." )
self.log.debug("Error was: " + str(e))
raise Exception(e)
#build payload for message-pipe by putting source-file into a message
try:
payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativePath)
payloadMetadata = json.dumps(payloadMetadata)
except Exception, e:
self.log.error("Unable to assemble multi-part message.")
self.log.debug("Error was: " + str(e))
raise Exception(e)
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
except Exception, e:
errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(workload)
self.log.error(errorMessage)
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
if self.useRealTimeAnalysis:
#send remove-request to message pipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("payloadMetadata = " + str(payloadMetadata))
self.cleanerSocket.send(payloadMetadata)
self.log.debug("send file-event for file to cleaner-pipe...success.")
except Exception, e:
self.log.error("Unable to notify Cleaner-pipe to delete file: " + str(workload))
self.log.debug("payloadMetadata=" + str(payloadMetadata))
else:
#send remove-request to message pipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("workload = " + str(workload))
self.cleanerSocket.send(workload)
self.log.debug("send file-event for file to cleaner-pipe...success.")
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
except Exception, e:
errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(workload)
self.log.error(errorMessage)
self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
def getLogger(self):
......@@ -418,8 +474,9 @@ class FileMover():
fileEventSocket = None # to receive fileMove-jobs as json-encoded dictionary
receiverComSocket = None # to exchange messages with the receiver
routerSocket = None
cleanerSocket = None # to echange if a realtime analysis receiver is online
useLiveViewer = False # boolian to inform if the receiver to show the files in the live viewer is running
useLiveViewer = False # boolian to inform if the receiver for the live viewer is running
# to get the logging only handling this class
log = None
......@@ -464,6 +521,12 @@ class FileMover():
self.receiverComSocket.bind(connectionStrReceiverComSocket)
self.log.debug("receiverComSocket started (bind) for '" + connectionStrReceiverComSocket + "'")
#init Cleaner message-pipe
self.cleanerSocket = self.zmqContext.socket(zmq.PUSH)
connectionStrCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
self.cleanerSocket.connect(connectionStrCleanerSocket)
self.log.debug("cleanerSocket started (connect) for '" + connectionStrCleanerSocket + "'")
# Poller to get either messages from the watcher or communication messages to stop sending data to the live viewer
self.poller = zmq.Poller()
self.poller.register(self.fileEventSocket, zmq.POLLIN)
......@@ -577,13 +640,25 @@ class FileMover():
self.log.info("Received live viewer stop signal from host " + str(signalHostname) + "...stopping live viewer")
print "Received live viewer stop signal from host " + signalHostname + "...stopping live viewer"
self.useLiveViewer = False
self.sendLiveViewerSignal(signal)
self.sendSignalToReceiver(signal)
continue
elif signal == "START_LIVE_VIEWER":
self.log.info("Received live viewer start signal from host " + str(signalHostname) + "...starting live viewer")
print "Received live viewer start signal from host " + str(signalHostname) + "...starting live viewer"
self.useLiveViewer = True
self.sendLiveViewerSignal(signal)
self.sendSignalToReceiver(signal)
continue
elif signal == "STOP_REALTIME_ANALYSIS":
self.log.info("Received realtime analysis stop signal from host " + str(signalHostname) + "...stopping realtime analysis")
print "Received realtime analysis stop signal from host " + signalHostname + "...stopping realtime analysis"
self.useRealTimeAnalysis = False
self.sendSignalToCleaner(signal)
continue
elif signal == "START_REALTIME_ANALYSIS":
self.log.info("Received realtime analysis start signal from host " + str(signalHostname) + "...starting realtime analysis")
print "Received realtime analysis start signal from host " + str(signalHostname) + "...starting realtime analysis"
self.useRealTimeAnalysis = True
self.sendSignalToCleaner(signal)
continue
else:
self.log.info("Received live viewer signal from host " + str(signalHostname) + " unkown: " + str(signal))
......@@ -613,10 +688,17 @@ class FileMover():
self.log.debug("passing job to workerProcess...done.")
def sendLiveViewerSignal(self, signal):
def sendSignalToCleaner(self, signal):
self.log.debug("send signal to cleaner: " + str(signal) )
self.cleanerSocket.send(signal)
self.log.debug("send confirmation back to receiver: " + str(signal) )
self.receiverComSocket.send(signal, zmq.NOBLOCK)
def sendSignalToReceiver(self, signal):
numberOfWorkerProcesses = int(self.parallelDataStreams)
for processNumber in range(numberOfWorkerProcesses):
self.log.debug("send live viewer signal " + str(signal) + " to workerProcess (nr " + str(processNumber) + " )")
self.log.debug("send signal to receiver " + str(signal) + " to workerProcess (nr " + str(processNumber) + " )")
address, empty, ready = self.routerSocket.recv_multipart()
self.log.debug("available workerProcess detected.")
......@@ -629,6 +711,7 @@ class FileMover():
b'',
signal,
])
self.log.debug("send confirmation back to receiver: " + str(signal) )
self.receiverComSocket.send(signal, zmq.NOBLOCK)
......@@ -636,6 +719,7 @@ class FileMover():
self.log.debug("Closing sockets")
self.fileEventSocket.close(0)
self.receiverComSocket.close(0)
self.cleanerSocket.close(0)
self.routerSocket.close(0)
......@@ -663,6 +747,7 @@ class Sender():
parallelDataStreams = None
chunkSize = None
zmqContext = None
def __init__(self, verbose = True):
defConf = defaultConfigSender()
......@@ -709,7 +794,7 @@ class Sender():
logging.debug("start watcher process...done")
logging.debug("start cleaner process...")
cleanerProcess = Process(target=Cleaner, args=(self.cleanerTargetPath, self.zmqCleanerIp, self.zmqCleanerPort, self.zmqContext))
cleanerProcess = Process(target=Cleaner, args=(self.cleanerTargetPath, self.zmqCleanerIp, self.zmqCleanerPort, 10, self.zmqContext))
logging.debug("cleaner process registered")
cleanerProcess.start()
logging.debug("start cleaner process...done")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment