Skip to content
Snippets Groups Projects
Commit 8480e2cd authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Event detector for Lambda use case

parent c0c8ddec
No related branches found
No related tags found
No related merge requests found
......@@ -503,15 +503,14 @@ class dataTransfer():
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.externalContext:
if not self.externalContext and self.context:
try:
if self.context:
self.log.info("closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("closing ZMQ context...done.")
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("closing ZMQ context...failed.", exc_info=True)
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
......
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import os
import zmq
import logging
from logutils.queue import QueueHandler
class LambdaDetector():
def __init__(self, config, logQueue):
self.log = self.getLogger(logQueue)
# check format of config
checkPassed = True
if ( not config.has_key("context") or
not config.has_key("eventPort") ):
self.log.error ("Configuration of wrong format")
self.log.debug ("config="+ str(config))
checkPassed = False
if checkPassed:
self.eventPort = config["eventPort"]
self.extIp = "0.0.0.0"
self.eventSocket = None
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if config["context"]:
self.context = config["context"]
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.createSockets()
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("lambdaDetector")
logger.propagate = False
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
return logger
def createSockets(self):
# create zmq socket to get events
self.eventSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.extIp, port=self.eventPort)
try:
self.eventSocket.bind(connectionStr)
self.log.info("Start eventSocket (bind): '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (bind): '" + connectionStr + "'", exc_info=True)
def getNewEvent(self):
eventMessageList = None
eventMessage = {}
eventMessage = self.eventSocket.recv()
self.log.debug("eventMessage: " + str(eventMessage))
return eventMessageList
def stop(self):
#close ZMQ
if self.eventSocket:
self.eventSocket.close(0)
self.eventSocket = None
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.externalContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
self.stop()
if __name__ == '__main__':
import sys
import time
from subprocess import call
from multiprocessing import Queue
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
print "SHARED", SHARED_PATH
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helpers
logfile = BASE_PATH + os.sep + "logs" + os.sep + "lambdaDetector.log"
logQueue = Queue(-1)
# Get the log Configuration for the lisener
h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")
# Start queue listener using the stream handler above
logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
logQueueListener.start()
# Create log and set handler to queue handle
root = logging.getLogger()
root.setLevel(logging.DEBUG) # Log level = DEBUG
qh = QueueHandler(logQueue)
root.addHandler(qh)
eventPort = "6001"
config = {
"eventDetectorType" : "lambda",
"eventPort" : eventPort,
"context" : None,
}
eventDetector = LambdaDetector(config, logQueue)
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
context = zmq.Context.instance()
# create zmq socket to send events
eventSocket = context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:{port}".format(port=eventPort)
eventSocket.connect(connectionStr)
logging.info("Start eventSocket (connect): '" + connectionStr + "'")
i = 100
while i <= 105:
try:
logging.debug("generate event")
targetFile = targetFileBase + str(i) + ".cbf"
eventSocket.send(targetFile)
i += 1
eventList = eventDetector.getNewEvent()
if eventList:
print "eventList:", eventList
time.sleep(1)
except KeyboardInterrupt:
break
logQueue.put_nowait(None)
logQueueListener.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment