Newer
Older
# API to ingest data into a data transfer unit
__version__ = '0.0.1'
import zmq
import logging
import cPickle
import traceback
class dataIngest():
# return error code
if useLog:
self.log = logging.getLogger("dataIngestAPI")
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalHost = "zitpcx19282"
self.signalPort = "50050"
# has to be the same port as configured in dataManager.conf as eventPort
#TODO add port in config
# has to be the same port as configured in dataManager.conf as ...
self.signalSocket = None
self.eventSocket = None
self.dataSocket = None
self.poller = zmq.Poller()
self.filename = False
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.__createSocket()
# To send file open and file close notification, a communication socket is needed
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# return error code
if self.openFile and self.openFile != filename:
raise Exception("File " + str(filename) + " already opened.")
# send notification to receiver
self.log.info("Sending signal to open a new file.")
message = self.signalSocket.recv()
self.log.debug("Received responce: " + str(message))
self.filename = filename
self.filePart = 0
# send event to eventDetector
# message = {
# "filename" : self.filename,
# "filePart" : self.filePart
# }
# message = "{ 'filename': " + self.filename + ", 'filePart': " + self.filePart + "}"
message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'
self.eventSocket.send(message)
# send data to ZMQ-Queue
self.dataSocket.send(data)
self.filePart += 1
# return error code
# send close-signal to signal socket
try:
self.signalSocket.send(sendMessage)
self.log.info("Sending signal to close the file to signalSocket.")
except:
raise Exception("Sending signal to close the file to signalSocket...failed.")
# send close-signal to event Detector
try:
self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file to eventSocket. (sendMessage=" + sendMessage + ")")
except:
raise Exception("Sending signal to close the file to eventSocket...failed.")
try:
socks = dict(self.poller.poll(10000)) # in ms
except:
self.log.error("Could not poll for signal", exc_info=True)
# if there was a response
if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
self.log.info("Received answer to signal...")
# Get the reply.
recvMessage = self.signalSocket.recv()
self.log.info("Received answer to signal: " + str(recvMessage) )
else:
recvMessage = None
if recvMessage != sendMessage:
self.log.debug("recieved message: " + str(recvMessage))
self.log.debug("send message: " + str(sendMessage))
raise Exception("Something went wrong while notifying to close the file")
self.openFile = None
self.filePart = None
##
#
# Send signal that the displayer is quitting, close ZMQ connections, destoying context
#
##
try:
if self.signalSocket:
self.log.info("closing signalSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
self.log.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)