-
Manuela Kuhn authoredManuela Kuhn authored
sender.py 5.33 KiB
from __builtin__ import open, type
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import time
import argparse
import zmq
import os
import logging
import sys
from multiprocessing import Process, freeze_support
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))
ZEROMQ_PATH = BASE_PATH + os.sep + "src" + os.sep + "ZeroMQTunnel"
CONFIG_PATH = BASE_PATH + os.sep + "conf"
sys.path.append ( ZEROMQ_PATH )
sys.path.append ( CONFIG_PATH )
import helperScript
from watcher import DirectoryWatcher
from Cleaner import Cleaner
from config import defaultConfigSender
class Sender():
logfilePath = None
logfileName = None
logfileFullPath = None
verbose = None
watchFolder = None
monitoredSubfolders = None
monitoredSuffixes = None
fileEventIp = None
fileEventPort = None
dataStreamIp = None
dataStreamPort = None
cleanerTargetPath = None
zmqCleanerIp = None
zmqCleanerPort = None
cleanerComPort = None
receiverComPort = None
receiverWhiteList = None
parallelDataStreams = None
chunkSize = None
zmqContext = None
def __init__(self, verbose = True):
defConf = defaultConfigSender()
self.logfilePath = defConf.logfilePath
self.logfileName = defConf.logfileName
self.logfileFullPath = os.path.join(self.logfilePath, self.logfileName)
self.verbose = verbose
self.watchFolder = defConf.watchFolder
self.monitoredSubfolders = defConf.monitoredSubfolders
self.monitoredFormats = defConf.monitoredFormats
self.fileEventIp = defConf.fileEventIp
self.fileEventPort = defConf.fileEventPort
self.dataStreamIp = defConf.dataStreamIp
self.dataStreamPort = defConf.dataStreamPort
self.cleanerTargetPath = defConf.cleanerTargetPath
self.cleanerIp = defConf.cleanerIp
self.cleanerPort = defConf.cleanerPort
self.receiverComPort = defConf.receiverComPort
self.ondaIps = defConf.ondaIps
self.ondaPorts = defConf.ondaPorts
self.receiverWhiteList = defConf.receiverWhiteList
self.parallelDataStreams = defConf.parallelDataStreams
self.chunkSize = defConf.chunkSize
#enable logging
helperScript.initLogging(self.logfileFullPath, self.verbose)
#create zmq context
# there should be only one context in one process
self.zmqContext = zmq.Context.instance()
logging.info("registering zmq global context")
self.run()
def run(self):
logging.debug("start watcher process...")
watcherProcess = Process(target=DirectoryWatcher, args=(self.fileEventIp, self.watchFolder, self.fileEventPort, self.monitoredSubfolders, self.monitoredFormats, self.zmqContext))
logging.debug("watcher process registered")
watcherProcess.start()
logging.debug("start watcher process...done")
logging.debug("start cleaner process...")
cleanerProcess = Process(target=Cleaner, args=(self.cleanerTargetPath, self.cleanerIp, self.cleanerPort, self.zmqContext))
logging.debug("cleaner process registered")
cleanerProcess.start()
logging.debug("start cleaner process...done")
# due to a logging problem with fabio the import statement (using fabio) have to be placed after the logging is initialized
from FileMover import FileMover
#start new fileMover
fileMover = FileMover(self.fileEventIp, self.fileEventPort, self.dataStreamIp, self.dataStreamPort,
self.receiverComPort, self.receiverWhiteList,
self.parallelDataStreams, self.chunkSize,
self.cleanerIp, self.cleanerPort,
self.ondaIps, self.ondaPorts,
self.zmqContext)
try:
fileMover.process()
except KeyboardInterrupt:
logging.info("Keyboard interruption detected. Shutting down")
# except Exception, e:
# print "unknown exception detected."
finally:
logging.debug("shutting down zeromq...")
try:
fileMover.stop()
logging.debug("shutting down zeromq...done.")
except:
logging.error(sys.exc_info())
logging.error("shutting down zeromq...failed.")
# give the other processes time to close the sockets
time.sleep(0.1)
try:
logging.debug("closing zmqContext...")
self.zmqContext.destroy()
logging.debug("closing zmqContext...done.")
except:
logging.debug("closing zmqContext...failed.")
logging.error(sys.exc_info())
if __name__ == '__main__':
freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", action="store_true", help="more verbose output")
arguments = parser.parse_args()
sender = Sender(arguments.verbose)