From 0ace40c7a4dac278d7f34524764b82d98bb36a8a Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Thu, 30 Jul 2015 11:11:38 +0200 Subject: [PATCH] Centralize redundant methods --- ZeroMQTunnel/fileMover.py | 86 +++-------------------------- ZeroMQTunnel/helperScript.py | 53 ++++++++++++++++++ ZeroMQTunnel/receiver.py | 34 +++--------- ZeroMQTunnel/watcher_lsyncd.py | 80 ++------------------------- lsyncd.conf | 4 +- wrapper_script.py | 99 ---------------------------------- 6 files changed, 74 insertions(+), 282 deletions(-) delete mode 100644 wrapper_script.py diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index c4455e35..adbc5283 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -15,8 +15,12 @@ from multiprocessing import Process, freeze_support import subprocess import json import shutil +import helperScript + DEFAULT_CHUNK_SIZE = 1048576 + + # # -------------------------- class: WorkerProcess -------------------------------------- # @@ -554,6 +558,9 @@ class FileMover(): +# +# -------------------------- class: Cleaner -------------------------------------- +# class Cleaner(): """ * received cleaning jobs via zeromq, @@ -772,82 +779,6 @@ def argumentParsing(): - -def checkFolderForExistance(watchFolderPath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(watchFolderPath): - logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) - sys.exit(1) - - - -def checkLogfileFolder(logfilePath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(logfilePath): - logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) - sys.exit(1) - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - - logging.getLogger("").addHandler(console) - - -# def initLogging(self, logfilePath, verbose): -# #@see https://docs.python.org/2/howto/logging-cookbook.html -# -# logfilePathFull = os.path.join(logfilePath, "cleaner.log") -# logger = logging.getLogger("cleaner") -# -# #more detailed logging if verbose-option has been set -# loggingLevel = logging.INFO -# if verbose: -# loggingLevel = logging.DEBUG -# -# -# #log everything to file -# fileHandler = logging.FileHandler(filename=logfilePathFull, -# mode="a") -# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', -# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') -# fileHandler.setFormatter(fileHandlerFormat) -# fileHandler.setLevel(loggingLevel) -# logger.addHandler(fileHandler) - - if __name__ == '__main__': freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows arguments = argumentParsing() @@ -870,7 +801,7 @@ if __name__ == '__main__': #enable logging - initLogging(logfileFullPath, verbose) + helperScript.initLogging(logfileFullPath, verbose) #create zmq context @@ -884,7 +815,6 @@ if __name__ == '__main__': logging.debug("cleaner thread started") #start new fileMover - # try: fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, chunkSize, zmqCleanerIp, zmqCleanerPort, diff --git a/ZeroMQTunnel/helperScript.py b/ZeroMQTunnel/helperScript.py index 631981f5..02f4c812 100644 --- a/ZeroMQTunnel/helperScript.py +++ b/ZeroMQTunnel/helperScript.py @@ -1,5 +1,6 @@ import os import platform +import logging @@ -58,3 +59,55 @@ def isSupported(): supportValue = True return supportValue + + + +def checkFolderExistance(folderPath): + """ + abort if folder does not exist + + :return: + """ + + #check folder path for existance. exits if it does not exist + if not os.path.exists(folderPath): + logging.error("Folder '%s' does not exist. Abort." % str(watchFolderPath)) + sys.exit(1) + + +def initLogging(filenameFullPath, verbose): + #@see https://docs.python.org/2/howto/logging-cookbook.html + +# def initLogging(self, logfilePath, verbose): +# +# logfilePathFull = os.path.join(logfilePath, "cleaner.log") + + #more detailed logging if verbose-option has been set + loggingLevel = logging.INFO + if verbose: + loggingLevel = logging.DEBUG + + #log everything to file + logging.basicConfig(level=loggingLevel, + format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d_%H:%M:%S', + filename=filenameFullPath, + filemode="a") + +# fileHandler = logging.FileHandler(filename=logfilePathFull, +# mode="a") +# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', +# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') +# fileHandler.setFormatter(fileHandlerFormat) +# fileHandler.setLevel(loggingLevel) +# logger.addHandler(fileHandler) + + #log info to stdout, display messages with different format than the file output + console = logging.StreamHandler() + console.setLevel(logging.WARNING) + formatter = logging.Formatter("%(asctime)s > %(message)s") + console.setFormatter(formatter) + + logging.getLogger("").addHandler(console) + + diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index e111ff5f..084c726a 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -13,8 +13,12 @@ import os import traceback from stat import S_ISREG, ST_MTIME, ST_MODE import threading +import helperScript +# +# -------------------------- class: FileReceiver -------------------------------------- +# class FileReceiver: zmqContext = None outputDir = None @@ -272,6 +276,9 @@ class FileReceiver: self.log.error(sys.exc_info()) +# +# -------------------------- class: Coordinator -------------------------------------- +# class Coordinator: zmqContext = None liveViewerZmqContext = None @@ -441,31 +448,6 @@ def argumentParsing(): return arguments -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - - logging.getLogger("").addHandler(console) - - if __name__ == "__main__": @@ -482,7 +464,7 @@ if __name__ == "__main__": #enable logging - initLogging(logfileFilePath, verbose) + helperScript.initLogging(logfileFilePath, verbose) #start file receiver diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py index d54f3729..080c2c59 100644 --- a/ZeroMQTunnel/watcher_lsyncd.py +++ b/ZeroMQTunnel/watcher_lsyncd.py @@ -23,7 +23,6 @@ class DirectoryWatcherHandler(): def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort): logging.debug("DirectoryWatcherHandler: __init__()") - # logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext))) logging.info("registering zmq context") self.zmqContext = zmqContext self.watchFolder = os.path.normpath(watchFolder) @@ -250,7 +249,7 @@ def argumentParsing(): sys.exit(1) #check logfile-path for existance - checkLogfileFolder(arguments.logfilePath) + helperScript.checkFolderExistance(arguments.logfilePath) return arguments @@ -258,58 +257,6 @@ def argumentParsing(): -def checkWatchFolder(watchFolderPath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(watchFolderPath): - logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) - sys.exit(1) - - - -def checkLogfileFolder(logfilePath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(logfilePath): - logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) - sys.exit(1) - - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - - if __name__ == '__main__': arguments = argumentParsing() watchFolder = arguments.watchFolder @@ -322,11 +269,11 @@ if __name__ == '__main__': communicationWithLcyncdPort = "6080" #abort if watch-folder does not exist - checkWatchFolder(watchFolder) + helperScript.checkFolderExistance(watchFolder) #enable logging - initLogging(logfileFilePath, verbose) + helperScript.initLogging(logfileFilePath, verbose) #create zmq context @@ -338,27 +285,6 @@ if __name__ == '__main__': directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort) -# pipe_path = "/tmp/zeromqllpipe" -# if not os.path.exists(pipe_path): -# os.mkfifo(pipe_path) -# -# # Open the fifo. We need to open in non-blocking mode or it will stalls until -# # someone opens it for writting -# pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) -# -# -# #wait for new files -# with os.fdopen(pipe_fd) as pipe: -# while True: -# message = pipe.read() -# if message: -## print("Received: '%s'" % message) -# pathnames = message.splitlines() -# for filepath in pathnames: -# directoryWatcher.passFileToZeromq(filepath) -# time.sleep(0.1) - - workers = zmqContext.socket(zmq.PULL) zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort workers.bind(zmqSocketStr) diff --git a/lsyncd.conf b/lsyncd.conf index 01c711db..41defeb2 100644 --- a/lsyncd.conf +++ b/lsyncd.conf @@ -50,7 +50,7 @@ gpfs = { -- spawn ( -- event, -- '/usr/bin/python', --- '/space/projects/Live_Viewer/wrapper_script.py', +-- '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py', -- '--mv_source', -- event.sourcePath, -- '--mv_target', @@ -87,7 +87,7 @@ gpfs = { spawn ( event, '/usr/bin/python', - '/space/projects/Live_Viewer/wrapper_script.py', + '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py', '--mv_source', event.sourcePath, '--mv_target', diff --git a/wrapper_script.py b/wrapper_script.py deleted file mode 100644 index 32d99388..00000000 --- a/wrapper_script.py +++ /dev/null @@ -1,99 +0,0 @@ -import argparse -import subprocess -import os -import time -import zmq -import json -import logging - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - - - -supportedFormats = [ "tif", "cbf", "hdf5"] -watchFolder = "/space/projects/Live_Viewer/source/" -logfile = "/space/projects/Live_Viewer/logs/wrapper_script.log" -verbose = True - -#enable logging -initLogging(logfile, verbose) - - -parser = argparse.ArgumentParser() -parser.add_argument("--mv_source", help = "Move source") -parser.add_argument("--mv_target", help = "Move target") - -arguments = parser.parse_args() - - -source = os.path.normpath ( arguments.mv_source ) -target = os.path.normpath ( arguments.mv_target ) - -( parentDir, filename ) = os.path.split ( source ) -commonPrefix = os.path.commonprefix ( [ watchFolder, source ] ) -relativebasepath = os.path.relpath ( source, commonPrefix ) -( relativeParent, blub ) = os.path.split ( relativebasepath ) - -( name, postfix ) = filename.split( "." ) -supported_file = postfix in supportedFormats - -zmqIp = "127.0.0.1" -zmqPort = "6080" - -if supported_file: - - # set up ZeroMQ - zmqContext = zmq.Context() - - socket = zmqContext.socket(zmq.PUSH) - zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort - socket.connect(zmqSocketStr) - logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr)) - - #send reply back to server - workload = { "filepath": source, "targetPath": target } - workload_json = json.dumps(workload) - try: - socket.send(workload_json) - except: - logging.debug( "Could not send message to ZMQ: " + str(workload)) - - logging.debug( "Send message to ZMQ: " + str(workload)) - -# my_cmd = 'echo "' + source + '" > /tmp/zeromqllpipe' -# p = subprocess.Popen ( my_cmd, shell=True ) -# p.communicate() - - # wait to ZeroMQ to finish -# time.sleep(10) - -#p = subprocess.Popen ( [ 'mv', source, target ], -# stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, -# universal_newlines = False ) -#out, err = p.communicate() - - # We never get here but clean up anyhow - socket.close() - zmqContext.destroy() -- GitLab