Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import os
import zmq
import logging
from logutils.queue import QueueHandler
class LambdaDetector():
def __init__(self, config, logQueue):
self.log = self.getLogger(logQueue)
# check format of config
checkPassed = True
if ( not config.has_key("context") or
not config.has_key("eventPort") ):
self.log.error ("Configuration of wrong format")
self.log.debug ("config="+ str(config))
checkPassed = False
if checkPassed:
self.eventPort = config["eventPort"]
self.extIp = "0.0.0.0"
self.eventSocket = None
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if config["context"]:
self.context = config["context"]
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.createSockets()
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("lambdaDetector")
logger.propagate = False
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
return logger
def createSockets(self):
# create zmq socket to get events
self.eventSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.extIp, port=self.eventPort)
try:
self.eventSocket.bind(connectionStr)
self.log.info("Start eventSocket (bind): '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (bind): '" + connectionStr + "'", exc_info=True)
def getNewEvent(self):
eventMessageList = None
eventMessage = {}
eventMessage = self.eventSocket.recv()
self.log.debug("eventMessage: " + str(eventMessage))
return eventMessageList
def stop(self):
#close ZMQ
if self.eventSocket:
self.eventSocket.close(0)
self.eventSocket = None
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.externalContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
self.stop()
if __name__ == '__main__':
import sys
import time
from subprocess import call
from multiprocessing import Queue
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
print "SHARED", SHARED_PATH
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helpers
logfile = BASE_PATH + os.sep + "logs" + os.sep + "lambdaDetector.log"
logQueue = Queue(-1)
# Get the log Configuration for the lisener
h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")
# Start queue listener using the stream handler above
logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
logQueueListener.start()
# Create log and set handler to queue handle
root = logging.getLogger()
root.setLevel(logging.DEBUG) # Log level = DEBUG
qh = QueueHandler(logQueue)
root.addHandler(qh)
eventPort = "6001"
config = {
"eventDetectorType" : "lambda",
"eventPort" : eventPort,
"context" : None,
}
eventDetector = LambdaDetector(config, logQueue)
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
context = zmq.Context.instance()
# create zmq socket to send events
eventSocket = context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:{port}".format(port=eventPort)
eventSocket.connect(connectionStr)
logging.info("Start eventSocket (connect): '" + connectionStr + "'")
i = 100
while i <= 105:
try:
logging.debug("generate event")
targetFile = targetFileBase + str(i) + ".cbf"
eventSocket.send(targetFile)
i += 1
eventList = eventDetector.getNewEvent()
if eventList:
print "eventList:", eventList
time.sleep(1)
except KeyboardInterrupt:
break
logQueue.put_nowait(None)
logQueueListener.stop()