Skip to content
Snippets Groups Projects
dataIngestAPI.cpp 4.95 KiB
Newer Older
// API to ingest data into a data transfer unit

/**
 * Connects the signal, event and data sockets to their endpoints.
 *
 * - signalSocket connects to tcp://<signalHost>:<signalPort>
 * - eventSocket  connects to tcp://localhost:<eventPort>
 * - dataSocket   connects to tcp://localhost:<dataPort>
 *
 * No error handling is performed here; a failing connect() is not caught.
 *
 * NOTE(review): assumes the socket members and the host/port members are
 * already initialised when this body runs (the class declaration is not
 * visible from this file) -- confirm.
 */
DataIngest::DataIngest ()
{
    // Each endpoint gets its own variable: re-declaring a single
    // `connectionStr` three times in one scope does not compile.
    const std::string signalEndpoint("tcp://" + signalHost + ":" + signalPort);
    signalSocket.connect(signalEndpoint.c_str());
    std::cout << "signalSocket started (connect) for '" << signalEndpoint << "'" << std::endl;
    // TODO: log an error when the connect fails (as the Python reference does).
    // TODO: no receive timeout (RCVTIMEO / responseTimeout in the Python
    //       reference) is configured on signalSocket yet, and no poll set
    //       is kept for it.

    const std::string eventEndpoint("tcp://localhost:" + eventPort);
    eventSocket.connect(eventEndpoint.c_str());
    std::cout << "eventSocket started (connect) for '" << eventEndpoint << "'" << std::endl;
    // TODO: log an error when the connect fails (as the Python reference does).

    const std::string dataEndpoint("tcp://localhost:" + dataPort);
    dataSocket.connect(dataEndpoint.c_str());
    std::cout << "dataSocket started (connect) for '" << dataEndpoint << "'" << std::endl;
    // TODO: log an error when the connect fails (as the Python reference does).
}


int DataIngest::createFile (std::string fileName)
{

//        if self.openFile and self.openFile != filename:
//            raise Exception("File " + str(filename) + " already opened.")

    // Send notification to receiver
    signalSocket.send("OPEN_FILE")
    std::cout << "Sending signal to open a new file.";

    zmq::message_t response;
    signalSocket.recv(&response)
    std::cout << "Received responce:" << static_cast<char*>(response.data()) << std::endl;

    filename = fileName
    filePart = 0

};

/**
 * Deallocation callback handed to zmq::message_t for zero-copy messages.
 *
 * Releases the buffer with free(), so the buffer must have been obtained
 * from malloc/calloc/realloc. A null pointer is accepted (free(NULL) is a
 * no-op).
 *
 * @param data  buffer to release
 * @param hint  unused; present only to match the callback signature
 */
void my_free (void *data, void *hint)
{
    (void) hint;
    free (data);
}


/**
 * Sends one chunk of the current file.
 *
 * First a JSON event of the form
 *   { "filePart": <filePart>, "filename": "<filename>" }
 * is sent on eventSocket, then the chunk itself is sent on dataSocket, and
 * finally the part counter is incremented.
 *
 * Ownership: the data message is constructed around `data` with my_free as
 * its deallocation callback, i.e. the buffer is handed over to the message
 * and released with free(). The caller must therefore pass a
 * malloc-allocated buffer and must not free or reuse it afterwards.
 *
 * @param data  malloc-allocated buffer holding the chunk
 * @param size  number of bytes in data (not modified by this function)
 * @return 0 on success, -1 if size is negative (in that case nothing is
 *         sent and the caller keeps ownership of data)
 */
int DataIngest::write (void *data, int &size)
{
    // A negative size would silently turn into a huge size_t below.
    if (size < 0)
    {
        return -1;
    }

    const std::string message = "{ \"filePart\": " + std::to_string(filePart) + ", \"filename\": \"" + filename + "\" }";

    // Send event to eventDetector
    zmq::message_t eventMessage (message.length());
    memcpy (eventMessage.data (), message.c_str(), message.length());
    eventSocket.send(eventMessage);

    // Send data to ZMQ-Queue (zero-copy: the message adopts the buffer)
    zmq::message_t dataMessage (data, static_cast<size_t>(size), my_free);
    dataSocket.send(dataMessage);

    filePart += 1;

    return 0;
}

int DataIngest::closeFile()
{

    zmq::message_t resvMessage;
    char* sendMessage = "CLOSE_FILE";

    // Send close-signal to signal socket
    signalSocket.send(sendMessage);
    std::cout << "Sending signal to close the file to signalSocket.";
    //        self.log.error("Sending signal to close the file to signalSocket...failed.")


    // send close-signal to event Detector
    eventSocket.send(sendMessage)
    std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")";
//        self.log.error("Sending signal to close the file to eventSocket...failed.)")


//    try:
//        socks = dict(self.poller.poll(10000)) # in ms
//    except:
//        socks = None
//        self.log.error("Could not poll for signal", exc_info=True)
//
//    if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
    std::cout << "Received answer to signal..." << std:endl;
    //  Get the reply.
    signalSocket.recv(&resvMessage)
    std::cout << "Received answer to signal: " + static_cast<char*>(recvMessage) << std::endl;
//    else:
//        recvMessage = None


    if ( recvMessage != sendMessage )
    {
        std::cout << "recieved message: " << recvMessage << ")" << std::endl;
        std::cout << "send message: " + sendMessage << std::endl;
//            raise Exception("Something went wrong while notifying to close the file")

    };

    openFile = ""
    filePart = 0

};

/**
 * Destructor.
 *
 * Performs no explicit cleanup at the moment; the shutdown sequence of the
 * Python reference implementation has not been ported yet (see TODO).
 */
DataIngest::~DataIngest()
{
    // TODO: port the shutdown sequence of the Python reference:
    //   1. For each of signalSocket, eventSocket and dataSocket that is
    //      set: log that it is being closed, close it with linger=0 and
    //      clear the handle. Log "closing ZMQ Sockets...failed." if any
    //      of this raises.
    //   2. If the ZMQ context was created inside this class (i.e. it was
    //      not supplied externally), destroy it here as well, logging
    //      "Closing ZMQ context..." / "...done." / "...failed.".
}