Skip to content
Snippets Groups Projects
Commit e4706795 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Changed format of events in dataIngestAPI

parent 8b593fd6
No related branches found
No related tags found
No related merge requests found
...@@ -81,7 +81,7 @@ class dataIngest(): ...@@ -81,7 +81,7 @@ class dataIngest():
raise raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO) # using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller() # self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN) self.poller.register(self.signalSocket, zmq.POLLIN)
...@@ -122,11 +122,13 @@ class dataIngest(): ...@@ -122,11 +122,13 @@ class dataIngest():
def write (self, data): def write (self, data):
# send event to eventDetector # send event to eventDetector
message = { # message = {
"filename" : self.filename, # "filename" : self.filename,
"filePart" : self.filePart # "filePart" : self.filePart
} # }
self.eventSocket.send(cPickle.dumps(message)) # message = "{ 'filename': " + self.filename + ", 'filePart': " + self.filePart + "}"
message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'
self.eventSocket.send(message)
# send data to ZMQ-Queue # send data to ZMQ-Queue
self.dataSocket.send(data) self.dataSocket.send(data)
...@@ -137,20 +139,28 @@ class dataIngest(): ...@@ -137,20 +139,28 @@ class dataIngest():
def closeFile (self): def closeFile (self):
# send close-signal to signal socket # send close-signal to signal socket
sendMessage = "CLOSE_FILE" sendMessage = "CLOSE_FILE"
self.signalSocket.send(sendMessage) try:
self.log.info("Sending signal to close the file to signalSocket.") self.signalSocket.send(sendMessage)
self.log.info("Sending signal to close the file to signalSocket.")
except:
raise Exception("Sending signal to close the file to signalSocket...failed.")
# send close-signal to event Detector # send close-signal to event Detector
self.eventSocket.send(sendMessage) try:
self.log.debug("Sending signal to close the file to eventSocket.(sendMessage=" + sendMessage + ")") self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file to eventSocket. (sendMessage=" + sendMessage + ")")
except:
raise Exception("Sending signal to close the file to eventSocket...failed.")
try: try:
socks = dict(self.poller.poll(10000)) # in ms socks = dict(self.poller.poll(10000)) # in ms
except: except:
socks = None
self.log.error("Could not poll for signal", exc_info=True) self.log.error("Could not poll for signal", exc_info=True)
# if there was a response # if there was a response
if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN: if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
self.log.info("Received answer to signal...")
# Get the reply. # Get the reply.
recvMessage = self.signalSocket.recv() recvMessage = self.signalSocket.recv()
self.log.info("Received answer to signal: " + str(recvMessage) ) self.log.info("Received answer to signal: " + str(recvMessage) )
...@@ -174,7 +184,7 @@ class dataIngest(): ...@@ -174,7 +184,7 @@ class dataIngest():
def stop (self): def stop (self):
try: try:
if self.signalSocket: if self.signalSocket:
self.log.info("closing eventSocket...") self.log.info("closing signalSocket...")
self.signalSocket.close(linger=0) self.signalSocket.close(linger=0)
self.signalSocket = None self.signalSocket = None
if self.eventSocket: if self.eventSocket:
...@@ -193,7 +203,7 @@ class dataIngest(): ...@@ -193,7 +203,7 @@ class dataIngest():
if not self.extContext and self.context: if not self.extContext and self.context:
try: try:
self.log.info("Closing ZMQ context...") self.log.info("Closing ZMQ context...")
self.context.destroy() self.context.destroy(0)
self.context = None self.context = None
self.log.info("Closing ZMQ context...done.") self.log.info("Closing ZMQ context...done.")
except: except:
......
...@@ -5,6 +5,7 @@ import os ...@@ -5,6 +5,7 @@ import os
import zmq import zmq
import logging import logging
import cPickle import cPickle
import json
from logutils.queue import QueueHandler from logutils.queue import QueueHandler
...@@ -77,7 +78,8 @@ class EventDetector(): ...@@ -77,7 +78,8 @@ class EventDetector():
if eventMessage == b"CLOSE_FILE": if eventMessage == b"CLOSE_FILE":
eventMessageList = [ eventMessage for i in range(self.numberOfStreams) ] eventMessageList = [ eventMessage for i in range(self.numberOfStreams) ]
else: else:
eventMessageList = [ cPickle.loads(eventMessage) ] eventMessageList = [ json.loads(eventMessage) ]
# eventMessageList = [ cPickle.loads(eventMessage) ]
self.log.debug("eventMessage: " + str(eventMessageList)) self.log.debug("eventMessage: " + str(eventMessageList))
...@@ -93,7 +95,7 @@ class EventDetector(): ...@@ -93,7 +95,7 @@ class EventDetector():
# if the context was created inside this class, # if the context was created inside this class,
# it has to be destroyed also within the class # it has to be destroyed also within the class
if not self.externalContext and self.context: if not self.extContext and self.context:
try: try:
self.log.info("Closing ZMQ context...") self.log.info("Closing ZMQ context...")
self.context.destroy(0) self.context.destroy(0)
...@@ -177,11 +179,13 @@ if __name__ == '__main__': ...@@ -177,11 +179,13 @@ if __name__ == '__main__':
try: try:
logging.debug("generate event") logging.debug("generate event")
targetFile = targetFileBase + str(i) + ".cbf" targetFile = targetFileBase + str(i) + ".cbf"
message = { # message = {
"filename" : targetFile, # "filename" : targetFile,
"filePart" : 0 # "filepart" : 0
} # }
eventSocket.send(cPickle.dumps(message)) message = '{ "filePart": 0, "filename": "' + targetFile + '" }'
# eventSocket.send(cPickle.dumps(message))
eventSocket.send(message)
i += 1 i += 1
eventList = eventDetector.getNewEvent() eventList = eventDetector.getNewEvent()
......
...@@ -35,13 +35,18 @@ print "==== TEST: data ingest ====" ...@@ -35,13 +35,18 @@ print "==== TEST: data ingest ===="
print print
class Receiver(threading.Thread): class Receiver(threading.Thread):
def __init__(self): def __init__(self, context = None):
self.extHost = "0.0.0.0" self.extHost = "0.0.0.0"
self.signalPort = "50050" self.signalPort = "50050"
self.eventPort = "50003" self.eventPort = "50003"
self.dataPort = "50100" self.dataPort = "50100"
self.context = zmq.Context() if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalSocket = self.context.socket(zmq.REP) self.signalSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.signalPort) connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.signalPort)
...@@ -70,7 +75,8 @@ class Receiver(threading.Thread): ...@@ -70,7 +75,8 @@ class Receiver(threading.Thread):
for i in range(5): for i in range(5):
logging.debug("eventSocket recv: " + str(cPickle.loads(self.eventSocket.recv()))) # logging.debug("eventSocket recv: " + str(cPickle.loads(self.eventSocket.recv())))
logging.debug("eventSocket recv: " + self.eventSocket.recv())
logging.debug("dataSocket recv: " + self.dataSocket.recv()) logging.debug("dataSocket recv: " + self.dataSocket.recv())
...@@ -85,7 +91,7 @@ class Receiver(threading.Thread): ...@@ -85,7 +91,7 @@ class Receiver(threading.Thread):
def stop(self): def stop(self):
try: try:
if self.signalSocket: if self.signalSocket:
logging.info("closing eventSocket...") logging.info("closing signalSocket...")
self.signalSocket.close(linger=0) self.signalSocket.close(linger=0)
self.signalSocket = None self.signalSocket = None
if self.eventSocket: if self.eventSocket:
...@@ -103,14 +109,24 @@ class Receiver(threading.Thread): ...@@ -103,14 +109,24 @@ class Receiver(threading.Thread):
except: except:
logging.error("closing ZMQ Sockets...failed.", exc_info=True) logging.error("closing ZMQ Sockets...failed.", exc_info=True)
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy(0)
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
context = zmq.Context()
receiverThread = Receiver() receiverThread = Receiver(context)
receiverThread.start() receiverThread.start()
obj = dataIngest(useLog = True) obj = dataIngest(useLog = True, context = context)
obj.createFile("1.h5") obj.createFile("1.h5")
...@@ -124,8 +140,12 @@ for i in range(5): ...@@ -124,8 +140,12 @@ for i in range(5):
logging.error("break", exc_info=True) logging.error("break", exc_info=True)
break break
obj.closeFile() try:
obj.closeFile()
except:
logging.error("Failed to close file", exc_info=True)
logging.info("Stopping")
receiverThread.stop() receiverThread.stop()
obj.stop() obj.stop()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment