Skip to content
Snippets Groups Projects
ZmqDetector.py 5.94 KiB
Newer Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'


import os
import zmq
import logging
from logutils.queue import QueueHandler


#class ZmqDetector():
class EventDetector():
Manuela Kuhn's avatar
Manuela Kuhn committed
    def __init__ (self, config, logQueue):

        self.log = self.getLogger(logQueue)

        # check format of config
        checkPassed = True
        if ( not config.has_key("context") or
                not config.has_key("eventPort") or
                not config.has_key("numberOfStreams") ):
            self.log.error ("Configuration of wrong format")
            self.log.debug ("config="+ str(config))
            checkPassed = False


        if checkPassed:
            self.eventPort       = config["eventPort"]
            self.extIp           = "0.0.0.0"
            self.eventSocket     = None

            self.numberOfStreams = config["numberOfStreams"]

            self.log.debug("Registering ZMQ context")
            # remember if the context was created outside this class or not
            if config["context"]:
                self.context    = config["context"]
                self.extContext = True
            else:
                self.context    = zmq.Context()
                self.extContext = False

            self.createSockets()


    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("ZmqDetector")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


Manuela Kuhn's avatar
Manuela Kuhn committed
    def createSockets (self):
        # create zmq socket to get events
        self.eventSocket = self.context.socket(zmq.PULL)
        connectionStr  = "tcp://{ip}:{port}".format(ip=self.extIp, port=self.eventPort)
        try:
            self.eventSocket.bind(connectionStr)
            self.log.info("Start eventSocket (bind): '" + connectionStr + "'")
        except:
            self.log.error("Failed to start eventSocket (bind): '" + connectionStr + "'", exc_info=True)


Manuela Kuhn's avatar
Manuela Kuhn committed
    def getNewEvent (self):

        eventMessage = self.eventSocket.recv()

        if eventMessage == b"CLOSE_FILE":
            eventMessageList = [ eventMessage for i in range(self.numberOfStreams) ]
        else:
            eventMessageList = [ cPickle.loads(eventMessage) ]

        self.log.debug("eventMessage: " + str(eventMessageList))

        return eventMessageList



Manuela Kuhn's avatar
Manuela Kuhn committed
    def stop (self):
        #close ZMQ
        if self.eventSocket:
            self.eventSocket.close(0)
            self.eventSocket = None

        # if the context was created inside this class,
        # it has to be destroyed also within the class
        if not self.externalContext and self.context:
            try:
                self.log.info("Closing ZMQ context...")
Manuela Kuhn's avatar
Manuela Kuhn committed
                self.context.destroy(0)
                self.context = None
                self.log.info("Closing ZMQ context...done.")
            except:
                self.log.error("Closing ZMQ context...failed.", exc_info=True)


Manuela Kuhn's avatar
Manuela Kuhn committed
    def __exit__ (self):
        self.stop()


    def __del__ (self):
        self.stop()


if __name__ == '__main__':
    import sys
    import time
    from subprocess import call
    from multiprocessing import Queue

    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))))
    SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"
    print "SHARED", SHARED_PATH

    if not SHARED_PATH in sys.path:
        sys.path.append ( SHARED_PATH )
    del SHARED_PATH

    import helpers

    logfile  = BASE_PATH + os.sep + "logs" + os.sep + "zmqDetector.log"
    logsize  = 10485760

    logQueue = Queue(-1)

    # Get the log Configuration for the lisener
    h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose=True, onScreenLogLevel="debug")

    # Start queue listener using the stream handler above
    logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
    logQueueListener.start()

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    qh = QueueHandler(logQueue)
    root.addHandler(qh)


    eventPort       = "6001"
    numberOfStreams = 4
    config = {
            "eventDetectorType" : "zmq",
            "eventPort"         : eventPort,
            "numberOfStreams"   : numberOfStreams,
#    eventDetector = ZmqDetector(config, logQueue)
    eventDetector = EventDetector(config, logQueue)

    sourceFile = BASE_PATH + os.sep + "test_file.cbf"
    targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep


    context        = zmq.Context.instance()

    # create zmq socket to send events
    eventSocket    = context.socket(zmq.PUSH)
    connectionStr  = "tcp://localhost:{port}".format(port=eventPort)
    eventSocket.connect(connectionStr)
    logging.info("Start eventSocket (connect): '" + connectionStr + "'")


    i = 100
        try:
            logging.debug("generate event")
            targetFile = targetFileBase + str(i) + ".cbf"
            message = {
                    "filename" : targetFile,
                    "filePart" : 0
                    }
            eventSocket.send(cPickle.dumps(message))
            i += 1

            eventList = eventDetector.getNewEvent()
            if eventList:
                print "eventList:", eventList

            time.sleep(1)
        except KeyboardInterrupt:
            break

    eventSocket.send(b"CLOSE_FILE")

    eventList = eventDetector.getNewEvent()
    print "eventList:", eventList

    logQueue.put_nowait(None)
    logQueueListener.stop()