Skip to content
Snippets Groups Projects
dataIngestAPI.cpp 4.21 KiB
Newer Older
# API to ingest data into a data transfer unit

DataIngest::DataIngest ()
{

    std::string connectionStr("tcp://" + signalHost + ":" + signalPort);
    signalSocket.connect(connectionStr);
    std::cout << "signalSocket started (connect) for '" << connectionStr << "'";
//            self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
//            subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);
//#        self.signalSocket.RCVTIMEO = self.responseTimeout

    //  Initialize poll set
    zmq::pollitem_t items [] = {
        { signalSocket, 0, ZMQ_POLLIN, 0 },
    };

    std::string connectionStr("tcp://localhost:" + eventPort);
    eventSocket.connect(connectionStr);
    std::cout << "eventSocket started (connect) for '" << connectionStr << "'";
//            self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)

    std::string connectionStr("tcp://localhost:" + dataPort);
    dataSocket.connect(connectionStr);
    std::cout << "dataSocket started (connect) for '" << connectionStr << "'";
//            self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)


};


int DataIngest::createFile (std::string fileName)
{

//        if self.openFile and self.openFile != filename:
//            raise Exception("File " + str(filename) + " already opened.")

    // Send notification to receiver
    signalSocket.send("OPEN_FILE")
    std::cout << "Sending signal to open a new file.";

    zmq::message_t response;
    signalSocket.recv(&response)
    std::cout << "Received responce:" << response;

    filename = fileName
    filePart = 0

};

int DataIngest::write (std::string *data, int &size)
{

//        message = {
//                "filename" : self.filename,
//                "filePart" : self.filePart
//                }

    std::string message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }';


//        # send event to eventDetector
    eventSocket.send(message)

    // Send data to ZMQ-Queue
    dataSocket.send(data)

    filePart += 1

};

int DataIngest::closeFile()
{

    // Send close-signal to signal socket
    std::string sendMessage = "CLOSE_FILE";
    signalSocket.send(sendMessage);
    std::cout << "Sending signal to close the file to signalSocket.";
    //        self.log.error("Sending signal to close the file to signalSocket...failed.")


    // send close-signal to event Detector
    eventSocket.send(sendMessage)
    std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")";
//        self.log.error("Sending signal to close the file to eventSocket...failed.)")

    zmq::message_t resvMessage;
    self.signalSocket.recv(&resvMessage)

    if ( recvMessage != sendMessage )
    {
//            self.log.debug("recieved message: " + str(recvMessage))
//            self.log.debug("send message: " + str(sendMessage))
//            raise Exception("Something went wrong while notifying to close the file")

    };

    openFile = ""
    filePart = 0

};

DataIngest::~DataIngest()
{

//        try:
//            if self.signalSocket:
//                self.log.info("closing eventSocket...")
//                self.signalSocket.close(linger=0)
//                self.signalSocket = None
//            if self.eventSocket:
//                self.log.info("closing eventSocket...")
//                self.eventSocket.close(linger=0)
//                self.eventSocket = None
//            if self.dataSocket:
//                self.log.info("closing dataSocket...")
//                self.dataSocket.close(linger=0)
//                self.dataSocket = None
//        except:
//            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
//
//        # if the context was created inside this class,
//        # it has to be destroyed also within the class
//        if not self.extContext and self.context:
//            try:
//                self.log.info("Closing ZMQ context...")
//                self.context.destroy()
//                self.context = None
//                self.log.info("Closing ZMQ context...done.")
//            except:
//                self.log.error("Closing ZMQ context...failed.", exc_info=True)

};