Skip to content
Snippets Groups Projects
TaskProvider.py 11.9 KiB
Newer Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import socket
import zmq
import os
import logging
import sys
import trace
from logutils.queue import QueueHandler
try:
    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except:
    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"

if not SHARED_PATH in sys.path:
    sys.path.append ( SHARED_PATH )
del SHARED_PATH

EVENTDETECTOR_PATH = BASE_PATH + os.sep + "src" + os.sep + "sender" + os.sep + "eventDetectors"
if not EVENTDETECTOR_PATH in sys.path:
    sys.path.append ( EVENTDETECTOR_PATH )
del EVENTDETECTOR_PATH

import helpers


#  --------------------------  class: TaskProvider  --------------------------------------
class TaskProvider():
    def __init__ (self, eventDetectorConfig, requestFwPort, routerPort, logQueue, context = None):
        global BASE_PATH
        #eventDetectorConfig = {
        #        eventDetectorType   : ... ,
        #        monDir       : ... ,
        #        monEventType : ... ,
        #        monSubdirs   : ... ,
        #        monSuffixes  : ... ,
        self.log                = self.getLogger(logQueue)
        self.log.debug("TaskProvider started (PID " + str(os.getpid()) + ").")
        self.dataDetectorModule = None
        self.eventDetector      = None
        self.config             = eventDetectorConfig
        self.log.debug("Configuration for event detector: " + str(self.config))
        eventDetectorModule     = self.config["eventDetectorType"]

        self.localhost          = "127.0.0.1"
        self.extIp              = "0.0.0.0"
        self.requestFwPort      = requestFwPort
        self.routerPort         = routerPort
        self.requestFwSocket    = None
        self.routerSocket       = None

        self.log.debug("Registering ZMQ context")
        # remember if the context was created outside this class or not
        if context:
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False


        self.log.info("Loading eventDetector: " + eventDetectorModule)
        self.eventDetectorModule = __import__(eventDetectorModule)
        self.eventDetector = self.eventDetectorModule.EventDetector(self.config, logQueue)
        self.createSockets()

        try:
            self.run()
        except KeyboardInterrupt:
            self.log.debug("Keyboard interruption detected. Shuting down")
        except:
            self.log.info("Stopping TaskProvider due to unknown error condition.", exc_info=True)
    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("TaskProvider")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


    def createSockets (self):
        # socket to get requests
        self.requestFwSocket = self.context.socket(zmq.REQ)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
        try:
            self.requestFwSocket.connect(connectionStr)
            self.log.info("Start requestFwSocket (connect): '" + str(connectionStr) + "'")
        except:
            self.log.error("Failed to start requestFwSocket (connect): '" + connectionStr + "'", exc_info=True)

        # socket to disribute the events to the worker
        self.routerSocket = self.context.socket(zmq.PUSH)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
            self.routerSocket.bind(connectionStr)
            self.log.info("Start to router socket (bind): '" + str(connectionStr) + "'")
        except:
            self.log.error("Failed to start router Socket (bind): '" + connectionStr + "'", exc_info=True)


    def run (self):
        while True:
            try:
                # the event for a file /tmp/test/source/local/file1.tif is of the form:
                # {
                #   "sourcePath" : "/tmp/test/source/"
                #   "relativePath": "local"
                #   "filename"   : "file1.tif"
                # }
                workloadList = self.eventDetector.getNewEvent()
            except KeyboardInterrupt:
                break
            except:
                self.log.error("Invalid fileEvent message received.", exc_info=True)
                #skip all further instructions and continue with next iteration
                continue

            #TODO validate workload dict
            for workload in workloadList:
                # get requests for this event
                try:
                    self.log.debug("Get requests...")
                    self.requestFwSocket.send("")
                    requests = cPickle.loads(self.requestFwSocket.recv())
                    self.log.debug("Get requests... done.")
                    self.log.debug("Requests: " + str(requests))
                except:
                    self.log.error("Get Requests... failed.", exc_info=True)


                # build message dict
                try:
                    self.log.debug("Building message dict...")
                    messageDict = cPickle.dumps(workload)  #sets correct escape characters
                    self.log.debug("Building message dict...done.")
                except:
                    self.log.error("Unable to assemble message dict.", exc_info=True)
                    continue

                # send the file to the fileMover
                try:
                    self.log.debug("Sending message...")
                    message = [messageDict]
                    if requests != ["None"]:
                        message.append(cPickle.dumps(requests))
                    self.log.debug(str(message))
                    self.routerSocket.send_multipart(message)
                    self.log.debug("Sending message...done.")
                except:
                    self.log.error("Sending message...failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
    def stop (self):
        if self.routerSocket:
            self.routerSocket.close(0)
            self.routerSocket = None
        if self.requestFwSocket:
            self.requestFwSocket.close(0)
            self.requestFwSocket = None
        if not self.extContext and self.context:
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.context.destroy(0)
            self.context = None


Manuela Kuhn's avatar
Manuela Kuhn committed
    def __exit__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
    def __del__ (self):
# cannot be defined in "if __name__ == '__main__'" because then it is unbound
# see https://docs.python.org/2/library/multiprocessing.html#windows
class requestResponder():
    def __init__ (self, requestFwPort, logQueue, context = None):
        # Send all logs to the main process
        self.log = self.getLogger(logQueue)
        self.context         = context or zmq.Context.instance()
        self.requestFwSocket = self.context.socket(zmq.REP)
        connectionStr   = "tcp://127.0.0.1:" + requestFwPort
        self.requestFwSocket.bind(connectionStr)
        self.log.info("[requestResponder] requestFwSocket started (bind) for '" + connectionStr + "'")
        self.run()
    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("requestResponder")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger
    def run (self):
        hostname = socket.gethostname()
        self.log.info("[requestResponder] Start run")
        openRequests = [[hostname + ':6003', 1], [hostname + ':6004', 0]]
        while True:
            request = self.requestFwSocket.recv()
            self.log.debug("[requestResponder] Received request: " + str(request) )
            self.requestFwSocket.send(cPickle.dumps(openRequests))
            self.log.debug("[requestResponder] Answer: " + str(openRequests) )
    def __exit__(self):
        self.requestFwSocket.close(0)
        self.context.destroy()
if __name__ == '__main__':
    from multiprocessing import Process, freeze_support, Queue
    import time
    from shutil import copyfile
    from subprocess import call
    freeze_support()    #see https://docs.python.org/2/library/multiprocessing.html#windows
Manuela Kuhn's avatar
Manuela Kuhn committed
    logfile = BASE_PATH + os.sep + "logs" + os.sep + "taskProvider.log"
    logsize = 10485760
#    eventDetectorConfig = {
#            "eventDetectorType"   : "inotifyx",
Manuela Kuhn's avatar
Manuela Kuhn committed
#            "monDir"              : BASE_PATH + os.sep + "data" + os.sep + "source",
#            "monEventType"        : "IN_CLOSE_WRITE",
#            "monSubdirs"          : ["commissioning", "current", "local"],
#            "monSuffixes"         : [".tif", ".cbf"]
#            }

    eventDetectorConfig = {
            "eventDetectorType" : "watchdog",
Manuela Kuhn's avatar
Manuela Kuhn committed
            "monDir"            : BASE_PATH + os.sep + "data" + os.sep + "source",
            "monEventType"      : "IN_CLOSE_WRITE",
            "monSubdirs"        : ["commissioning", "current", "local"],
            "monSuffixes"       : [".tif", ".cbf"],
            "timeTillClosed"    : 1 #s
    requestFwPort = "6001"
    routerPort    = "7000"
    logQueue = Queue(-1)
    # Get the log Configuration for the lisener
    h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose=True, onScreenLogLevel="debug")

    # Start queue listener using the stream handler above
    logQueueListener    = helpers.CustomQueueListener(logQueue, h1, h2)
    logQueueListener.start()

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    qh = QueueHandler(logQueue)
    root.addHandler(qh)


    taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, requestFwPort, routerPort, logQueue) )
    taskProviderPr.start()
    requestResponderPr = Process ( target = requestResponder, args = ( requestFwPort, logQueue) )
    requestResponderPr.start()

Manuela Kuhn's avatar
Manuela Kuhn committed
    context       = zmq.Context.instance()
Manuela Kuhn's avatar
Manuela Kuhn committed
    routerSocket  = context.socket(zmq.PULL)
    connectionStr = "tcp://localhost:" + routerPort
    routerSocket.connect(connectionStr)
    logging.info("=== routerSocket connected to " + connectionStr)
Manuela Kuhn's avatar
Manuela Kuhn committed
    sourceFile = BASE_PATH + os.sep + "test_file.cbf"
    targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
        while i <= 105:
            time.sleep(0.5)
            targetFile = targetFileBase + str(i) + ".cbf"
            logging.debug("copy to " + targetFile)
            copyfile(sourceFile, targetFile)
#            call(["cp", sourceFile, targetFile])
            workload = routerSocket.recv_multipart()
            logging.info("=== next workload " + str(workload))
    except KeyboardInterrupt:
        pass
    finally:

        requestResponderPr.terminate()
        taskProviderPr.terminate()

        routerSocket.close(0)
        context.destroy()

        for number in range(100, i):
            targetFile = targetFileBase + str(number) + ".cbf"
            logging.debug("remove " + targetFile)
            os.remove(targetFile)

        logQueue.put_nowait(None)
        logQueueListener.stop()