Skip to content
Snippets Groups Projects
sender.py 5.33 KiB
from __builtin__ import open, type

__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'


import time
import argparse
import zmq
import os
import logging
import sys
from multiprocessing import Process, freeze_support

BASE_PATH   = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))
ZEROMQ_PATH = BASE_PATH + os.sep + "src" + os.sep + "ZeroMQTunnel"
CONFIG_PATH = BASE_PATH + os.sep + "conf"

sys.path.append ( ZEROMQ_PATH )
sys.path.append ( CONFIG_PATH )

import helperScript
from watcher import DirectoryWatcher
from Cleaner import Cleaner

from config import defaultConfigSender


class Sender():
    logfilePath         = None
    logfileName         = None
    logfileFullPath     = None
    verbose             = None

    watchFolder         = None
    monitoredSubfolders = None
    monitoredSuffixes   = None
    fileEventIp         = None
    fileEventPort       = None

    dataStreamIp        = None
    dataStreamPort      = None
    cleanerTargetPath   = None
    zmqCleanerIp        = None
    zmqCleanerPort      = None
    cleanerComPort      = None
    receiverComPort     = None
    receiverWhiteList   = None

    parallelDataStreams = None
    chunkSize           = None

    zmqContext          = None

    def __init__(self, verbose = True):
        defConf                  = defaultConfigSender()

        self.logfilePath         = defConf.logfilePath
        self.logfileName         = defConf.logfileName
        self.logfileFullPath     = os.path.join(self.logfilePath, self.logfileName)
        self.verbose             = verbose

        self.watchFolder         = defConf.watchFolder
        self.monitoredSubfolders = defConf.monitoredSubfolders
        self.monitoredFormats    = defConf.monitoredFormats
        self.fileEventIp         = defConf.fileEventIp
        self.fileEventPort       = defConf.fileEventPort

        self.dataStreamIp        = defConf.dataStreamIp
        self.dataStreamPort      = defConf.dataStreamPort
        self.cleanerTargetPath   = defConf.cleanerTargetPath
        self.cleanerIp           = defConf.cleanerIp
        self.cleanerPort         = defConf.cleanerPort
        self.receiverComPort     = defConf.receiverComPort
        self.ondaIps             = defConf.ondaIps
        self.ondaPorts           = defConf.ondaPorts
        self.receiverWhiteList   = defConf.receiverWhiteList

        self.parallelDataStreams = defConf.parallelDataStreams
        self.chunkSize           = defConf.chunkSize

        #enable logging
        helperScript.initLogging(self.logfileFullPath, self.verbose)



        #create zmq context
        # there should be only one context in one process
        self.zmqContext = zmq.Context.instance()
        logging.info("registering zmq global context")

        self.run()


    def run(self):
        logging.debug("start watcher process...")
        watcherProcess = Process(target=DirectoryWatcher, args=(self.fileEventIp, self.watchFolder, self.fileEventPort, self.monitoredSubfolders, self.monitoredFormats, self.zmqContext))
        logging.debug("watcher process registered")
        watcherProcess.start()
        logging.debug("start watcher process...done")

        logging.debug("start cleaner process...")
        cleanerProcess = Process(target=Cleaner, args=(self.cleanerTargetPath, self.cleanerIp, self.cleanerPort, self.zmqContext))
        logging.debug("cleaner process registered")
        cleanerProcess.start()
        logging.debug("start cleaner process...done")


        # due to a logging problem with fabio the import statement (using fabio) have to be placed after the logging is initialized
        from FileMover import FileMover

        #start new fileMover
        fileMover = FileMover(self.fileEventIp, self.fileEventPort, self.dataStreamIp, self.dataStreamPort,
                              self.receiverComPort, self.receiverWhiteList,
                              self.parallelDataStreams, self.chunkSize,
                              self.cleanerIp, self.cleanerPort,
                              self.ondaIps, self.ondaPorts,
                              self.zmqContext)
        try:
            fileMover.process()
        except KeyboardInterrupt:
            logging.info("Keyboard interruption detected. Shutting down")
        # except Exception, e:
        #     print "unknown exception detected."
        finally:
            logging.debug("shutting down zeromq...")
            try:
                fileMover.stop()
                logging.debug("shutting down zeromq...done.")
            except:
                logging.error(sys.exc_info())
                logging.error("shutting down zeromq...failed.")

            # give the other processes time to close the sockets
            time.sleep(0.1)
            try:
                logging.debug("closing zmqContext...")
                self.zmqContext.destroy()
                logging.debug("closing zmqContext...done.")
            except:
                logging.debug("closing zmqContext...failed.")
                logging.error(sys.exc_info())





if __name__ == '__main__':
    freeze_support()    #see https://docs.python.org/2/library/multiprocessing.html#windows

    parser = argparse.ArgumentParser()
    parser.add_argument("--verbose", action="store_true", help="more verbose output")
    arguments = parser.parse_args()

    sender = Sender(arguments.verbose)