Skip to content
Snippets Groups Projects
Commit 0ace40c7 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Centralize redundant methods

parent 683e26ea
No related branches found
No related tags found
No related merge requests found
...@@ -15,8 +15,12 @@ from multiprocessing import Process, freeze_support ...@@ -15,8 +15,12 @@ from multiprocessing import Process, freeze_support
import subprocess import subprocess
import json import json
import shutil import shutil
import helperScript
DEFAULT_CHUNK_SIZE = 1048576 DEFAULT_CHUNK_SIZE = 1048576
# #
# -------------------------- class: WorkerProcess -------------------------------------- # -------------------------- class: WorkerProcess --------------------------------------
# #
...@@ -554,6 +558,9 @@ class FileMover(): ...@@ -554,6 +558,9 @@ class FileMover():
#
# -------------------------- class: Cleaner --------------------------------------
#
class Cleaner(): class Cleaner():
""" """
* received cleaning jobs via zeromq, * received cleaning jobs via zeromq,
...@@ -772,82 +779,6 @@ def argumentParsing(): ...@@ -772,82 +779,6 @@ def argumentParsing():
def checkFolderForExistance(watchFolderPath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(watchFolderPath):
logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath))
sys.exit(1)
def checkLogfileFolder(logfilePath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(logfilePath):
logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath))
sys.exit(1)
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
# def initLogging(self, logfilePath, verbose):
# #@see https://docs.python.org/2/howto/logging-cookbook.html
#
# logfilePathFull = os.path.join(logfilePath, "cleaner.log")
# logger = logging.getLogger("cleaner")
#
# #more detailed logging if verbose-option has been set
# loggingLevel = logging.INFO
# if verbose:
# loggingLevel = logging.DEBUG
#
#
# #log everything to file
# fileHandler = logging.FileHandler(filename=logfilePathFull,
# mode="a")
# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S',
# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s')
# fileHandler.setFormatter(fileHandlerFormat)
# fileHandler.setLevel(loggingLevel)
# logger.addHandler(fileHandler)
if __name__ == '__main__': if __name__ == '__main__':
freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows
arguments = argumentParsing() arguments = argumentParsing()
...@@ -870,7 +801,7 @@ if __name__ == '__main__': ...@@ -870,7 +801,7 @@ if __name__ == '__main__':
#enable logging #enable logging
initLogging(logfileFullPath, verbose) helperScript.initLogging(logfileFullPath, verbose)
#create zmq context #create zmq context
...@@ -884,7 +815,6 @@ if __name__ == '__main__': ...@@ -884,7 +815,6 @@ if __name__ == '__main__':
logging.debug("cleaner thread started") logging.debug("cleaner thread started")
#start new fileMover #start new fileMover
# try:
fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
parallelDataStreams, chunkSize, parallelDataStreams, chunkSize,
zmqCleanerIp, zmqCleanerPort, zmqCleanerIp, zmqCleanerPort,
......
import os import os
import platform import platform
import logging
...@@ -58,3 +59,55 @@ def isSupported(): ...@@ -58,3 +59,55 @@ def isSupported():
supportValue = True supportValue = True
return supportValue return supportValue
def checkFolderExistance(folderPath):
"""
abort if folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(folderPath):
logging.error("Folder '%s' does not exist. Abort." % str(watchFolderPath))
sys.exit(1)
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
# def initLogging(self, logfilePath, verbose):
#
# logfilePathFull = os.path.join(logfilePath, "cleaner.log")
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
# fileHandler = logging.FileHandler(filename=logfilePathFull,
# mode="a")
# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S',
# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s')
# fileHandler.setFormatter(fileHandlerFormat)
# fileHandler.setLevel(loggingLevel)
# logger.addHandler(fileHandler)
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
...@@ -13,8 +13,12 @@ import os ...@@ -13,8 +13,12 @@ import os
import traceback import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE from stat import S_ISREG, ST_MTIME, ST_MODE
import threading import threading
import helperScript
#
# -------------------------- class: FileReceiver --------------------------------------
#
class FileReceiver: class FileReceiver:
zmqContext = None zmqContext = None
outputDir = None outputDir = None
...@@ -272,6 +276,9 @@ class FileReceiver: ...@@ -272,6 +276,9 @@ class FileReceiver:
self.log.error(sys.exc_info()) self.log.error(sys.exc_info())
#
# -------------------------- class: Coordinator --------------------------------------
#
class Coordinator: class Coordinator:
zmqContext = None zmqContext = None
liveViewerZmqContext = None liveViewerZmqContext = None
...@@ -441,31 +448,6 @@ def argumentParsing(): ...@@ -441,31 +448,6 @@ def argumentParsing():
return arguments return arguments
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
if __name__ == "__main__": if __name__ == "__main__":
...@@ -482,7 +464,7 @@ if __name__ == "__main__": ...@@ -482,7 +464,7 @@ if __name__ == "__main__":
#enable logging #enable logging
initLogging(logfileFilePath, verbose) helperScript.initLogging(logfileFilePath, verbose)
#start file receiver #start file receiver
......
...@@ -23,7 +23,6 @@ class DirectoryWatcherHandler(): ...@@ -23,7 +23,6 @@ class DirectoryWatcherHandler():
def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort): def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort):
logging.debug("DirectoryWatcherHandler: __init__()") logging.debug("DirectoryWatcherHandler: __init__()")
# logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext)))
logging.info("registering zmq context") logging.info("registering zmq context")
self.zmqContext = zmqContext self.zmqContext = zmqContext
self.watchFolder = os.path.normpath(watchFolder) self.watchFolder = os.path.normpath(watchFolder)
...@@ -250,7 +249,7 @@ def argumentParsing(): ...@@ -250,7 +249,7 @@ def argumentParsing():
sys.exit(1) sys.exit(1)
#check logfile-path for existance #check logfile-path for existance
checkLogfileFolder(arguments.logfilePath) helperScript.checkFolderExistance(arguments.logfilePath)
return arguments return arguments
...@@ -258,58 +257,6 @@ def argumentParsing(): ...@@ -258,58 +257,6 @@ def argumentParsing():
def checkWatchFolder(watchFolderPath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(watchFolderPath):
logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath))
sys.exit(1)
def checkLogfileFolder(logfilePath):
"""
abort if watch-folder does not exist
:return:
"""
#check folder path for existance. exits if it does not exist
if not os.path.exists(logfilePath):
logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath))
sys.exit(1)
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
if __name__ == '__main__': if __name__ == '__main__':
arguments = argumentParsing() arguments = argumentParsing()
watchFolder = arguments.watchFolder watchFolder = arguments.watchFolder
...@@ -322,11 +269,11 @@ if __name__ == '__main__': ...@@ -322,11 +269,11 @@ if __name__ == '__main__':
communicationWithLcyncdPort = "6080" communicationWithLcyncdPort = "6080"
#abort if watch-folder does not exist #abort if watch-folder does not exist
checkWatchFolder(watchFolder) helperScript.checkFolderExistance(watchFolder)
#enable logging #enable logging
initLogging(logfileFilePath, verbose) helperScript.initLogging(logfileFilePath, verbose)
#create zmq context #create zmq context
...@@ -338,27 +285,6 @@ if __name__ == '__main__': ...@@ -338,27 +285,6 @@ if __name__ == '__main__':
directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort) directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort)
# pipe_path = "/tmp/zeromqllpipe"
# if not os.path.exists(pipe_path):
# os.mkfifo(pipe_path)
#
# # Open the fifo. We need to open in non-blocking mode or it will stalls until
# # someone opens it for writting
# pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK)
#
#
# #wait for new files
# with os.fdopen(pipe_fd) as pipe:
# while True:
# message = pipe.read()
# if message:
## print("Received: '%s'" % message)
# pathnames = message.splitlines()
# for filepath in pathnames:
# directoryWatcher.passFileToZeromq(filepath)
# time.sleep(0.1)
workers = zmqContext.socket(zmq.PULL) workers = zmqContext.socket(zmq.PULL)
zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort
workers.bind(zmqSocketStr) workers.bind(zmqSocketStr)
......
...@@ -50,7 +50,7 @@ gpfs = { ...@@ -50,7 +50,7 @@ gpfs = {
-- spawn ( -- spawn (
-- event, -- event,
-- '/usr/bin/python', -- '/usr/bin/python',
-- '/space/projects/Live_Viewer/wrapper_script.py', -- '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py',
-- '--mv_source', -- '--mv_source',
-- event.sourcePath, -- event.sourcePath,
-- '--mv_target', -- '--mv_target',
...@@ -87,7 +87,7 @@ gpfs = { ...@@ -87,7 +87,7 @@ gpfs = {
spawn ( spawn (
event, event,
'/usr/bin/python', '/usr/bin/python',
'/space/projects/Live_Viewer/wrapper_script.py', '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py',
'--mv_source', '--mv_source',
event.sourcePath, event.sourcePath,
'--mv_target', '--mv_target',
......
import argparse
import subprocess
import os
import time
import zmq
import json
import logging
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
supportedFormats = [ "tif", "cbf", "hdf5"]
watchFolder = "/space/projects/Live_Viewer/source/"
logfile = "/space/projects/Live_Viewer/logs/wrapper_script.log"
verbose = True
#enable logging
initLogging(logfile, verbose)
parser = argparse.ArgumentParser()
parser.add_argument("--mv_source", help = "Move source")
parser.add_argument("--mv_target", help = "Move target")
arguments = parser.parse_args()
source = os.path.normpath ( arguments.mv_source )
target = os.path.normpath ( arguments.mv_target )
( parentDir, filename ) = os.path.split ( source )
commonPrefix = os.path.commonprefix ( [ watchFolder, source ] )
relativebasepath = os.path.relpath ( source, commonPrefix )
( relativeParent, blub ) = os.path.split ( relativebasepath )
( name, postfix ) = filename.split( "." )
supported_file = postfix in supportedFormats
zmqIp = "127.0.0.1"
zmqPort = "6080"
if supported_file:
# set up ZeroMQ
zmqContext = zmq.Context()
socket = zmqContext.socket(zmq.PUSH)
zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort
socket.connect(zmqSocketStr)
logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
#send reply back to server
workload = { "filepath": source, "targetPath": target }
workload_json = json.dumps(workload)
try:
socket.send(workload_json)
except:
logging.debug( "Could not send message to ZMQ: " + str(workload))
logging.debug( "Send message to ZMQ: " + str(workload))
# my_cmd = 'echo "' + source + '" > /tmp/zeromqllpipe'
# p = subprocess.Popen ( my_cmd, shell=True )
# p.communicate()
# wait to ZeroMQ to finish
# time.sleep(10)
#p = subprocess.Popen ( [ 'mv', source, target ],
# stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
# universal_newlines = False )
#out, err = p.communicate()
# We never get here but clean up anyhow
socket.close()
zmqContext.destroy()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment