Skip to content
Snippets Groups Projects
Commit 20e1c2b9 authored by p11user's avatar p11user
Browse files

Modifications from P11

parent 82e75596
No related branches found
No related tags found
No related merge requests found
......@@ -9,17 +9,23 @@ sys.path.append ( ZMQ_PATH )
import helperScript
#LOCAL_IP= "0.0.0.0"
LOCAL_IP= "127.0.0.1"
#LOCAL_IP= "127.0.0.1"
LOCAL_IP= "131.169.66.47"
BASE_PATH = "/home/kuhnm/Arbeit/live-viewer"
#BASE_PATH = "/space/projects/live-viewer"
BASE_PATH = "/home/p11user/live-viewer"
class defaultConfigSender():
# folder you want to monitor for changes
# inside this folder only the subdirectories "commissioning", "current" and "local" are monitored
watchFolder = BASE_PATH + "/data/source/"
# watchFolder = BASE_PATH + "/data/source/"
watchFolder = "/rd/"
watchFolder = "/rd_temp/"
# Target to move the files into
cleanerTargetPath = BASE_PATH + "/data/target/"
# cleanerTargetPath = BASE_PATH + "/data/target/"
cleanerTargetPath = "/gpfs/"
# cleanerTargetPath = "/rd/temp/"
# subfolders of watchFolders to be monitored
monitoredSubfolders = ["commissioning", "current", "local"]
......@@ -32,7 +38,7 @@ class defaultConfigSender():
# list of hosts allowed to connect to the sender
# receiverWhiteList = ["lsdma-lab04"]
# receiverWhiteList = ["zitpcx19282"]
receiverWhiteList = ["zitpcx19282", "zitpcx22614", "lsdma-lab04"]
receiverWhiteList = ["zitpcx19282", "zitpcx22614", "lsdma-lab04" , "haspp11eval01" , "it-hpc-cxi04", "it-hpc-cxi03" ]
# zmq endpoint (IP-address) to send file events to
fileEventIp = LOCAL_IP
......@@ -66,7 +72,7 @@ class defaultConfigSender():
# logfilePath = BASE_PATH + "/logs"
# path where logfile will be created
logfilePath = BASE_PATH + "/logs"
logfilePath = "/home/p11user/logs"
# filename used for logging
logfileName = "zmq_sender.log"
......
......@@ -125,6 +125,7 @@ class Cleaner():
filename = workloadDict["filename"]
sourcePath = workloadDict["sourcePath"]
relativePath = workloadDict["relativePath"]
# print "workloadDict:", workloadDict
except Exception, e:
errorMessage = "Invalid fileEvent message received."
self.log.error(errorMessage)
......@@ -234,6 +235,7 @@ class Cleaner():
except OSError:
pass
# moving the file
# print 'paths:', source, target, os.sep, filename
sourceFile = source + os.sep + filename
targetFile = target + os.sep + filename
self.log.debug("sourceFile: " + str(sourceFile))
......@@ -251,7 +253,7 @@ class Cleaner():
self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
if not fileWasMoved:
self.log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
self.log.error("Moving file '" + str(filename) + " from " + str(sourceFile) + " to " + str(targetFile) + "' (attempt " + str(iterationCount) + ")...FAILED.")
raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filename))
......
......@@ -79,12 +79,13 @@ class InotifyDetector():
log = None
def __init__(self, paths, monitoredSuffixes):
def __init__(self, paths, monitoredSubfolders, monitoredSuffixes):
self.paths = paths
self.log = self.getLogger()
self.fd = binding.init()
self.monitoredSuffixes = monitoredSuffixes
self.monitoredSuffixes = monitoredSuffixes
self.monitoredSubfolders = monitoredSubfolders
self.add_watch()
......@@ -112,8 +113,10 @@ class InotifyDetector():
def add_watch(self):
foldersToRegister=self.getDirectoryStructure()
try:
for path in self.paths:
# for path in self.paths:
for path in foldersToRegister:
wd = binding.add_watch(self.fd, path)
self.wd_to_path[wd] = path
self.log.debug("Register watch for path:" + str(path) )
......@@ -123,6 +126,23 @@ class InotifyDetector():
self.stop()
def getDirectoryStructure(self):
# Add the default subfolders
print "paths:", self.paths
foldersToWalk = [self.paths[0] + os.sep + folder for folder in self.monitoredSubfolders]
print "foldersToWalk:", foldersToWalk
monitoredFolders = []
# Walk the tree
for folder in foldersToWalk:
for root, directories, files in os.walk(folder):
# Add the found folders to the list for the inotify-watch
monitoredFolders.append(root)
self.log.info("Add folder to monitor: " + str(root))
print "Add folder to monitor: " + str(root)
return monitoredFolders
def getNewEvent(self):
eventMessageList = []
......@@ -138,22 +158,24 @@ class InotifyDetector():
if not event.name:
return []
if not event.name.endswith(self.monitoredSuffixes):
# print "not considered", event.name
return []
# print path, event.name, parts
# print event.name
is_dir = ("IN_ISDIR" in parts_array)
is_closed = ("IN_CLOSE_WRITE" in parts_array)
is_moved = ("IN_MOVE" in parts_array)
# is_closed = ("IN_CLOSE" in parts_array)
# is_closed = ("IN_CLOSE" in parts_array or "IN_CLOSE_WRITE" in parts_array)
is_created = ("IN_CREATE" in parts_array)
# if a new directory is created inside the monitored one,
# this one has to be monitored as well
if is_created and is_dir and event.name:
# print "is_created and is_dir"
# print path, event.name, parts
# if is_created and is_dir and event.name:
if is_dir and event.name:
print "is_created and is_dir"
# print path, event.name, parts
dirname = path + os.sep + event.name
if dirname in self.paths:
......@@ -163,10 +185,23 @@ class InotifyDetector():
self.wd_to_path[wd] = dirname
self.log.info("Added new directory to watch:" + str(dirname))
# if not event.name.endswith(self.monitoredSuffixes):
# print "not considered", event.name
# return []
if '.cbf' not in event.name :
print "not a cbf-file: ", event.name, parts
return []
# only closed files are send
if is_closed and not is_dir:
# if (is_moved and not is_dir) or (is_closed and not is_dir):
# print "is_closed and not is_dir"
# print path, event.name, parts
if event.name[0] == '.' :
event_name_dirty_hack = event.name.rsplit(".", 1)[0][1:]
else :
event_name_dirty_hack = event.name
parentDir = path
relativePath = ""
eventMessage = {}
......@@ -176,11 +211,15 @@ class InotifyDetector():
while True:
if parentDir not in self.paths:
(parentDir,relDir) = os.path.split(parentDir)
relativePath += os.sep + relDir
# print "debug1:", parentDir, relDir
relativePath = os.sep + relDir + relativePath
# print "debug11:", relativePath
else:
# add the local, commissional or current to the relativePath as well
(parentDir,relDir) = os.path.split(parentDir)
relativePath += os.sep + relDir
# (parentDir,relDir) = os.path.split(parentDir)
# print "debug2:", parentDir, relDir
# relativePath = os.sep + relDir + relativePath
# print "debug22:", relativePath
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
......@@ -191,8 +230,9 @@ class InotifyDetector():
eventMessage = {
"sourcePath" : parentDir,
"relativePath": relativePath,
"filename" : event.name
"filename" : event_name_dirty_hack
}
print "eventMessage:", eventMessage
eventMessageList.append(eventMessage)
break
......
......@@ -28,6 +28,8 @@ class WorkerProcess():
useLiveViewer = False # boolian to inform if the receiver for the live viewer is running
useRealTimeAnalysis = False # boolian to inform if the receiver for realtime-analysis is running
requestFromOnda = False
# to get the logging only handling this class
log = None
......@@ -230,6 +232,7 @@ class WorkerProcess():
# print "ondaWorkload", ondaWorkload
request = ondaWorkload == b"NEXT_FILE"
if request:
self.requestFromOnda = True
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
......@@ -241,9 +244,23 @@ class WorkerProcess():
self.log.debug("worker-"+str(self.id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
elif self.requestFromOnda:
#passing file to data-messagPipe
try:
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
socketListToSendData["onda"] = self.ondaComSocket
self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
except Exception, e:
self.log.error("Unable to pass new file to data-messagePipe.")
self.log.error("Error was: " + str(e))
self.log.debug("worker-"+str(self.id) + ": passing new file to data-messagePipe...failed.")
#skip all further instructions and continue with next iteration
continue
self.passFileToDataStream(filename, sourcePath, relativePath, socketListToSendData)
return_value = self.passFileToDataStream(filename, sourcePath, relativePath, socketListToSendData)
#send remove-request to message pipe
try:
......@@ -313,7 +330,8 @@ class WorkerProcess():
errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(e)
return None
# raise Exception(e)
try:
self.log.debug("opening '" + str(sourceFilePathFull) + "'...")
......@@ -323,7 +341,8 @@ class WorkerProcess():
errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(e)
return None
# raise Exception(e)
#build payload for message-pipe by putting source-file into a message
......@@ -332,7 +351,8 @@ class WorkerProcess():
except Exception, e:
self.log.error("Unable to assemble multi-part message.")
self.log.debug("Error was: " + str(e))
raise Exception(e)
return None
# raise Exception(e)
#send message
......@@ -379,6 +399,7 @@ class WorkerProcess():
# send data to onda
if socketDict.has_key("onda"):
socketDict["onda"].send_multipart(payloadAll, zmq.NOBLOCK)
self.requestFromOnda = False
#close file
fileDescriptor.close()
......@@ -395,6 +416,7 @@ class WorkerProcess():
self.log.debug("Error was: " + str(trace))
self.log.info("Passing multipart-message...failed.")
# raise Exception(e)
return
def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize):
......@@ -409,7 +431,8 @@ class WorkerProcess():
payload.append(fileContentAsByteObject)
fileContentAsByteObject = fileDescriptor.read(chunkSize)
except Exception, e:
raise Exception(str(e))
self.log("Error was: " + str(e))
# raise Exception(str(e))
......
......@@ -52,8 +52,9 @@ class DirectoryWatcher():
self.monitoredSuffixes = monitoredSuffixes
print monitoredSuffixes
monitoredFolders = self.getDirectoryStructure()
self.eventDetector = EventDetector(monitoredFolders, self.monitoredSuffixes)
# monitoredFolders = self.getDirectoryStructure()
monitoredFolders = [self.watchFolder]
self.eventDetector = EventDetector(monitoredFolders, self.monitoredDefaultSubfolders, self.monitoredSuffixes)
# assert isinstance(self.zmqContext, zmq.sugar.context.Context)
......@@ -154,10 +155,12 @@ class DirectoryWatcher():
for workload in workloadList:
sourcePath = workload["sourcePath"]
# the folders local, current, and commissioning are monitored by default
(sourcePath,relDir) = os.path.split(sourcePath)
relativePath = os.path.normpath(relDir + os.sep + workload["relativePath"])
# (sourcePath,relDir) = os.path.split(sourcePath)
# relativePath = os.path.normpath(relDir + os.sep + workload["relativePath"])
relativePath = workload["relativePath"]
filename = workload["filename"]
# print "eventDetector:", sourcePath, relativePath, filename
# send the file to the fileMover
self.passFileToZeromq(self.messageSocket, sourcePath, relativePath, filename)
except KeyboardInterrupt:
......
#/bin/sh
BASEPATH=/home/kuhnm/Arbeit
BASEPATH=/home/p11user/live-viewer
FILES=${BASEPATH}/test_data/test_015_00001.cbf
TARGET=$BASEPATH/live-viewer/data/source/local
FILES=/tmp/PhilipPBS_3_00001.cbf
#FILES=${BASEPATH}/test_015_00001.cbf
TARGET=/rd/local/sender_test
i=1
LIMIT=1000
while [ "$i" -le $LIMIT ]
......@@ -11,6 +12,6 @@ do
TARGET_FILE="$TARGET/$i.cbf"
echo $TARGET_FILE
cp $FILES "$TARGET_FILE"
sleep 0.2
# sleep 0.2
i=$(($i+1))
done
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment