Skip to content
Snippets Groups Projects
dataIngestAPI.py 7.62 KiB
Newer Older
# API to ingest data into a data transfer unit

__version__ = '0.0.1'

import zmq
import logging
import cPickle
import traceback


class dataIngest():
    # return error code
Manuela Kuhn's avatar
Manuela Kuhn committed
    def __init__ (self, useLog = False, context = None):

        if useLog:
            self.log = logging.getLogger("dataIngestAPI")
        else:
            class loggingFunction:
                def out(self, x, exc_info = None):
                    if exc_info:
                        print x, traceback.format_exc()
                    else:
                        print x
                def __init__(self):
                    self.debug    = lambda x, exc_info=None: self.out(x, exc_info)
                    self.info     = lambda x, exc_info=None: self.out(x, exc_info)
                    self.warning  = lambda x, exc_info=None: self.out(x, exc_info)
                    self.error    = lambda x, exc_info=None: self.out(x, exc_info)
                    self.critical = lambda x, exc_info=None: self.out(x, exc_info)

            self.log = loggingFunction()

        # ZMQ applications always start by creating a context,
        # and then using that for creating sockets
        # (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
        if context:
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False


        self.signalHost   = "zitpcx19282"
        self.signalPort   = "50050"
Manuela Kuhn's avatar
Manuela Kuhn committed
        # has to be the same port as configured in dataManager.conf as eventPort
        self.eventPort    = "50003"
        #TODO add port in config
        # has to be the same port as configured in dataManager.conf as ...
        self.dataPort     = "50010"
        self.signalSocket = None
        self.eventSocket  = None
        self.dataSocket   = None
        self.poller       = zmq.Poller()

        self.filename     = False
        self.openFile     = False
        self.filePart     = None

        self.responseTimeout = 1000

        self.__createSocket()


Manuela Kuhn's avatar
Manuela Kuhn committed
    def __createSocket (self):

        # To send file open and file close notification, a communication socket is needed
        self.signalSocket = self.context.socket(zmq.REQ)

        # time to wait for the sender to give a confirmation of the signal
#        self.signalSocket.RCVTIMEO = self.responseTimeout
        connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
        try:
            self.signalSocket.connect(connectionStr)
            self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
        except Exception as e:
            self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
            raise

        # using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
#        self.poller = zmq.Poller()
        self.poller.register(self.signalSocket, zmq.POLLIN)


        self.eventSocket = self.context.socket(zmq.PUSH)
        connectionStr = "tcp://localhost:" + str(self.eventPort)
        try:
            self.eventSocket.connect(connectionStr)
            self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
        except:
            self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
            raise

        self.dataSocket  = self.context.socket(zmq.PUSH)
        connectionStr = "tcp://localhost:" + str(self.dataPort)
        try:
            self.dataSocket.connect(connectionStr)
            self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
        except:
            self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
            raise


    # return error code
Manuela Kuhn's avatar
Manuela Kuhn committed
    def createFile (self, filename):
        if self.openFile and self.openFile != filename:
            raise Exception("File " + str(filename) + " already opened.")

        # send notification to receiver
Manuela Kuhn's avatar
Manuela Kuhn committed
        self.signalSocket.send("OPEN_FILE")
        self.log.info("Sending signal to open a new file.")

        message = self.signalSocket.recv()
        self.log.debug("Received responce: " + str(message))

        self.filename = filename
        self.filePart = 0


Manuela Kuhn's avatar
Manuela Kuhn committed
    def write (self, data):
        # send event to eventDetector
#        message = {
#                "filename" : self.filename,
#                "filePart" : self.filePart
#                }
#        message = "{ 'filename': " + self.filename + ", 'filePart': " + self.filePart + "}"
        message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'
        self.eventSocket.send(message)

        # send data to ZMQ-Queue
        self.dataSocket.send(data)
        self.filePart += 1


    # return error code
Manuela Kuhn's avatar
Manuela Kuhn committed
    def closeFile (self):
        # send close-signal to signal socket
Manuela Kuhn's avatar
Manuela Kuhn committed
        sendMessage = "CLOSE_FILE"
        try:
            self.signalSocket.send(sendMessage)
            self.log.info("Sending signal to close the file to signalSocket.")
        except:
            raise Exception("Sending signal to close the file to signalSocket...failed.")

        # send close-signal to event Detector
        try:
            self.eventSocket.send(sendMessage)
            self.log.debug("Sending signal to close the file to eventSocket. (sendMessage=" + sendMessage + ")")
        except:
            raise Exception("Sending signal to close the file to eventSocket...failed.")
        try:
            socks = dict(self.poller.poll(10000)) # in ms
        except:
            self.log.error("Could not poll for signal", exc_info=True)

        # if there was a response
        if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
            self.log.info("Received answer to signal...")
            #  Get the reply.
            recvMessage = self.signalSocket.recv()
            self.log.info("Received answer to signal: " + str(recvMessage) )
        else:
            recvMessage = None

        if recvMessage != sendMessage:
            self.log.debug("recieved message: " + str(recvMessage))
            self.log.debug("send message: " + str(sendMessage))
            raise Exception("Something went wrong while notifying to close the file")

        self.openFile = None
        self.filePart = None


    ##
    #
    # Send signal that the displayer is quitting, close ZMQ connections, destoying context
    #
    ##
Manuela Kuhn's avatar
Manuela Kuhn committed
    def stop (self):
        try:
            if self.signalSocket:
                self.log.info("closing signalSocket...")
                self.signalSocket.close(linger=0)
                self.signalSocket = None
            if self.eventSocket:
                self.log.info("closing eventSocket...")
                self.eventSocket.close(linger=0)
                self.eventSocket = None
            if self.dataSocket:
                self.log.info("closing dataSocket...")
                self.dataSocket.close(linger=0)
                self.dataSocket = None
        except:
            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)

        # if the context was created inside this class,
        # it has to be destroyed also within the class
        if not self.extContext and self.context:
            try:
                self.log.info("Closing ZMQ context...")
                self.context.destroy(0)
                self.context = None
                self.log.info("Closing ZMQ context...done.")
            except:
                self.log.error("Closing ZMQ context...failed.", exc_info=True)


Manuela Kuhn's avatar
Manuela Kuhn committed
    def __exit__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
    def __del__ (self):