Newer
Older
# API to ingest data into a data transfer unit
__version__ = '0.0.1'
import zmq
import logging
import cPickle
import traceback
class dataIngest():
# return error code
if useLog:
self.log = logging.getLogger("dataIngestAPI")
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalHost = "zitpcx19282"
self.signalPort = "50050"
# has to be the same port as configured in dataManager.conf as eventPort
#TODO add port in config
# has to be the same port as configured in dataManager.conf as ...
self.signalSocket = None
self.eventSocket = None
self.dataSocket = None
self.poller = zmq.Poller()
self.filename = False
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.__createSocket()
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# To send file open and file close notification, a communication socket is needed
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# return error code
if self.openFile and self.openFile != filename:
raise Exception("File " + str(filename) + " already opened.")
# send notification to receiver
self.log.info("Sending signal to open a new file.")
message = self.signalSocket.recv()
self.log.debug("Received responce: " + str(message))
self.filename = filename
self.filePart = 0
# send event to eventDetector
message = {
"filename" : self.filename,
"filePart" : self.filePart
}
self.eventSocket.send(cPickle.dumps(message))
# send data to ZMQ-Queue
self.dataSocket.send(data)
self.filePart += 1
# return error code
# send close-signal to signal socket
self.signalSocket.send(sendMessage)
self.log.info("Sending signal to close the file to signalSocket.")
# send close-signal to event Detector
self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file to eventSocket.(sendMessage=" + sendMessage + ")")
try:
socks = dict(self.poller.poll(10000)) # in ms
except:
self.log.error("Could not poll for signal", exc_info=True)
# if there was a response
if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
# Get the reply.
recvMessage = self.signalSocket.recv()
self.log.info("Received answer to signal: " + str(recvMessage) )
else:
recvMessage = None
if recvMessage != sendMessage:
self.log.debug("recieved message: " + str(recvMessage))
self.log.debug("send message: " + str(sendMessage))
raise Exception("Something went wrong while notifying to close the file")
self.openFile = None
self.filePart = None
##
#
# Send signal that the displayer is quitting, close ZMQ connections, destoying context
#
##
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
try:
if self.signalSocket:
self.log.info("closing eventSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
self.log.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)