Skip to content
Snippets Groups Projects
LambdaDetector.py 5.16 KiB
Newer Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'


import os
import zmq
import logging
from logutils.queue import QueueHandler


class LambdaDetector():

    def __init__(self, config, logQueue):

        self.log = self.getLogger(logQueue)

        # check format of config
        checkPassed = True
        if ( not config.has_key("context") or
                not config.has_key("eventPort") ):
            self.log.error ("Configuration of wrong format")
            self.log.debug ("config="+ str(config))
            checkPassed = False


        if checkPassed:
            self.eventPort   = config["eventPort"]
            self.extIp       = "0.0.0.0"
            self.eventSocket = None

            self.log.debug("Registering ZMQ context")
            # remember if the context was created outside this class or not
            if config["context"]:
                self.context    = config["context"]
                self.extContext = True
            else:
                self.context    = zmq.Context()
                self.extContext = False

            self.createSockets()


    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("lambdaDetector")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


    def createSockets(self):
        # create zmq socket to get events
        self.eventSocket = self.context.socket(zmq.PULL)
        connectionStr  = "tcp://{ip}:{port}".format(ip=self.extIp, port=self.eventPort)
        try:
            self.eventSocket.bind(connectionStr)
            self.log.info("Start eventSocket (bind): '" + connectionStr + "'")
        except:
            self.log.error("Failed to start eventSocket (bind): '" + connectionStr + "'", exc_info=True)


    def getNewEvent(self):

        eventMessageList = None
        eventMessage = {}

        eventMessage = self.eventSocket.recv()
        self.log.debug("eventMessage: " + str(eventMessage))

        return eventMessageList



    def stop(self):
        #close ZMQ
        if self.eventSocket:
            self.eventSocket.close(0)
            self.eventSocket = None

        # if the context was created inside this class,
        # it has to be destroyed also within the class
        if not self.externalContext and self.context:
            try:
                self.log.info("Closing ZMQ context...")
                self.context.destroy()
                self.context = None
                self.log.info("Closing ZMQ context...done.")
            except:
                self.log.error("Closing ZMQ context...failed.", exc_info=True)


    def __exit__(self):
        self.stop()


if __name__ == '__main__':
    import sys
    import time
    from subprocess import call
    from multiprocessing import Queue

    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
    SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"
    print "SHARED", SHARED_PATH

    if not SHARED_PATH in sys.path:
        sys.path.append ( SHARED_PATH )
    del SHARED_PATH

    import helpers

    logfile  = BASE_PATH + os.sep + "logs" + os.sep + "lambdaDetector.log"

    logQueue = Queue(-1)

    # Get the log Configuration for the lisener
    h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")

    # Start queue listener using the stream handler above
    logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
    logQueueListener.start()

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    qh = QueueHandler(logQueue)
    root.addHandler(qh)


    eventPort = "6001"
    config = {
            "eventDetectorType" : "lambda",
            "eventPort"         : eventPort,
            "context"           : None,
            }


    eventDetector = LambdaDetector(config, logQueue)

    sourceFile = BASE_PATH + os.sep + "test_file.cbf"
    targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep


    context        = zmq.Context.instance()

    # create zmq socket to send events
    eventSocket    = context.socket(zmq.PUSH)
    connectionStr  = "tcp://localhost:{port}".format(port=eventPort)
    eventSocket.connect(connectionStr)
    logging.info("Start eventSocket (connect): '" + connectionStr + "'")


    i = 100
    while i <= 105:
        try:
            logging.debug("generate event")
            targetFile = targetFileBase + str(i) + ".cbf"
            eventSocket.send(targetFile)
            i += 1

            eventList = eventDetector.getNewEvent()
            if eventList:
                print "eventList:", eventList

            time.sleep(1)
        except KeyboardInterrupt:
            break

    logQueue.put_nowait(None)
    logQueueListener.stop()