Skip to content
Snippets Groups Projects
Commit 434d2713 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added test for dataIngestAPI in c

parent 8b94e3d9
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ DataIngest::DataIngest () ...@@ -4,7 +4,7 @@ DataIngest::DataIngest ()
{ {
std::string connectionStr("tcp://" + signalHost + ":" + signalPort); std::string connectionStr("tcp://" + signalHost + ":" + signalPort);
signalSocket.connect(connectionStr); signalSocket.connect(connectionStr.c_str());
std::cout << "signalSocket started (connect) for '" << connectionStr << "'"; std::cout << "signalSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True) // self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
// subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6); // subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);
...@@ -16,12 +16,12 @@ DataIngest::DataIngest () ...@@ -16,12 +16,12 @@ DataIngest::DataIngest ()
}; };
std::string connectionStr("tcp://localhost:" + eventPort); std::string connectionStr("tcp://localhost:" + eventPort);
eventSocket.connect(connectionStr); eventSocket.connect(connectionStr.c_str());
std::cout << "eventSocket started (connect) for '" << connectionStr << "'"; std::cout << "eventSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True) // self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
std::string connectionStr("tcp://localhost:" + dataPort); std::string connectionStr("tcp://localhost:" + dataPort);
dataSocket.connect(connectionStr); dataSocket.connect(connectionStr.c_str());
std::cout << "dataSocket started (connect) for '" << connectionStr << "'"; std::cout << "dataSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True) // self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
...@@ -41,29 +41,38 @@ int DataIngest::createFile (std::string fileName) ...@@ -41,29 +41,38 @@ int DataIngest::createFile (std::string fileName)
zmq::message_t response; zmq::message_t response;
signalSocket.recv(&response) signalSocket.recv(&response)
std::cout << "Received responce:" << response; std::cout << "Received responce:" << static_cast<char*>(response.data()) << std::endl;
filename = fileName filename = fileName
filePart = 0 filePart = 0
}; };
int DataIngest::write (std::string *data, int &size) void my_free (void *data, void *hint)
{ {
free (data);
};
int DataIngest::write (void *data, int &size)
{
std::string message = "{ \"filePart\": " + std::to_string(filePart) + ", \"filename\": \"" + filename + "\" }";
// message = { zmq::message_t eventMessage (message.length());
// "filename" : self.filename, memcpy ( eventMessage.data (), message.c_str(), message.length());
// "filePart" : self.filePart
// }
std::string message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'; // Send event to eventDetector
eventSocket.send(eventMessage);
zmq::message_t dataMessage (data, size, my_free);
// # send event to eventDetector // zmq::message_t dataMessage (size);
eventSocket.send(message) // memcpy ( dataMessage.data (), &data, size);
// Send data to ZMQ-Queue // Send data to ZMQ-Queue
dataSocket.send(data) dataSocket.send(dataMessage);
filePart += 1 filePart += 1
...@@ -72,8 +81,10 @@ int DataIngest::write (std::string *data, int &size) ...@@ -72,8 +81,10 @@ int DataIngest::write (std::string *data, int &size)
int DataIngest::closeFile() int DataIngest::closeFile()
{ {
zmq::message_t resvMessage;
char* sendMessage = "CLOSE_FILE";
// Send close-signal to signal socket // Send close-signal to signal socket
std::string sendMessage = "CLOSE_FILE";
signalSocket.send(sendMessage); signalSocket.send(sendMessage);
std::cout << "Sending signal to close the file to signalSocket."; std::cout << "Sending signal to close the file to signalSocket.";
// self.log.error("Sending signal to close the file to signalSocket...failed.") // self.log.error("Sending signal to close the file to signalSocket...failed.")
...@@ -84,13 +95,26 @@ int DataIngest::closeFile() ...@@ -84,13 +95,26 @@ int DataIngest::closeFile()
std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")"; std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")";
// self.log.error("Sending signal to close the file to eventSocket...failed.)") // self.log.error("Sending signal to close the file to eventSocket...failed.)")
zmq::message_t resvMessage;
self.signalSocket.recv(&resvMessage) // try:
// socks = dict(self.poller.poll(10000)) # in ms
// except:
// socks = None
// self.log.error("Could not poll for signal", exc_info=True)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
std::cout << "Received answer to signal..." << std:endl;
// Get the reply.
signalSocket.recv(&resvMessage)
std::cout << "Received answer to signal: " + static_cast<char*>(recvMessage) << std::endl;
// else:
// recvMessage = None
if ( recvMessage != sendMessage ) if ( recvMessage != sendMessage )
{ {
// self.log.debug("recieved message: " + str(recvMessage)) std::cout << "recieved message: " << recvMessage << ")" << std::endl;
// self.log.debug("send message: " + str(sendMessage)) std::cout << "send message: " + sendMessage << std::endl;
// raise Exception("Something went wrong while notifying to close the file") // raise Exception("Something went wrong while notifying to close the file")
}; };
......
test: test_dataIngestAPI.cpp
g++ -o test_dataIngestAPI test_dataIngestAPI.cpp -lzmq -std=c++0x
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#ifndef _WIN32
#include <unistd.h>
#else
#include <windows.h>
#endif
//class Receiver
int main() {
std::string signalHost_ = "*";
std::string signalPort_ = "6000";
std::string extHost = "0.0.0.0";
std::string signalPort = "50050";
std::string eventPort = "50003";
std::string dataPort = "50100";
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
std::string connectionStr("tcp://" + signalHost_ + ":" + signalPort_);
socket.bind (connectionStr.c_str());
// if context:
// self.context = context
// self.extContext = True
// else:
// self.context = zmq.Context()
// self.extContext = False
zmq::socket_t signalSocket (context, ZMQ_REP);
zmq::socket_t eventSocket (context, ZMQ_PULL);
zmq::socket_t dataSocket (context, ZMQ_PULL);
// std::string connectionStr;
std::string connectionStr1("tcp://" + extHost + ":" + signalPort);
signalSocket.bind(connectionStr1.c_str());
std::cout << "signalSocket started (bind) for '" << connectionStr1 << "'";
std::string connectionStr2("tcp://" + extHost + ":" + eventPort);
eventSocket.bind(connectionStr2.c_str());
std::cout << "eventSocket started (bind) for '" << connectionStr2 << "'";
std::string connectionStr3("tcp://" + extHost + ":" + dataPort);
dataSocket.bind(connectionStr3.c_str());
std::cout << "dataSocket started (bind) for '" << connectionStr3 << "'";
zmq::message_t message;
signalSocket.recv (&message);
std::cout << "signalSocket recv: " << static_cast<char*>(message.data()) << std::endl;
signalSocket.send (message);
std::cout << "signalSocket send: " << static_cast<char*>(message.data()) << std::endl;
zmq::message_t data;
for (int i = 0; i < 5; i++)
{
eventSocket.recv (&data);
std::cout << "eventSocket recv: " << static_cast<char*>(data.data()) << std::endl;
dataSocket.recv (&data);
std::cout << "dataSocket recv: " << static_cast<char*>(data.data()) << std::endl;
}
signalSocket.recv (&message);
std::cout << "signalSocket recv: " << static_cast<char*>(message.data()) << std::endl;
signalSocket.send (message);
std::cout << "signalSocket send: " << static_cast<char*>(message.data()) << std::endl;
eventSocket.recv (&data);
std::cout << "eventSocket recv: " << static_cast<char*>(data.data()) << std::endl;
return 0;
};
// Receiver(zmq::context_t* m_context)
// {
// context = m_context;
//
// createSockets();
//
// };
//
// Receiver()
// {
// createSockets();
//
// };
//
// void createSockets()
// {
// std::string connectionStr;
//
// connectionStr("tcp://" + extHost + ":" + signalPort);
// signalSocket.bind(connectionStr.c_str());
// std::cout << "signalSocket started (bind) for '" << connectionStr << "'";
//
// connectionStr("tcp://" + extHost + ":" + eventPort);
// eventSocket.bind(connectionStr.c_str());
// std::cout << "eventSocket started (bind) for '" << connectionStr << "'";
//
// connectionStr("tcp://" + extHost + ":" + dataPort);
// dataSocket.bind(connectionStr.c_str());
// std::cout << "dataSocket started (bind) for '" << connectionStr << "'";
// };
//
// void run()
// {
// zmq::message_t message;
//
// signalSocket.recv (&message);
// std::cout << "signalSocket recv: " << static_cast<char*>(message.data()) << std::endl;
//
// signalSocket.send (message);
// std::cout << "signalSocket send: " << static_cast<char*>(message.data()) << std::endl;
//
// zmq::message_t data;
// for (int i = 0; i < 5; i++)
// {
// eventSocket.recv (&data);
// std::cout << "eventSocket recv: " << static_cast<char*>(data.data()) << std::endl;
// dataSocket.recv (&data);
// std::cout << "dataSocket recv: " << static_cast<char*>(data.data()) << std::endl;
// }
//
// signalSocket.recv (&message);
// std::cout << "signalSocket recv: " << static_cast<char*>(message.data()) << std::endl;
//
// signalSocket.send (message);
// std::cout << "signalSocket send: " << static_cast<char*>(message.data()) << std::endl;
//
// eventSocket.recv (&data);
// std::cout << "eventSocket recv: " << static_cast<char*>(data.data()) << std::endl;
//
// }
//
// ~Receiver()
// {};
//
//};
//int main()
//{
// Receiver rcv;
//};
//context = zmq.Context()
//
//receiverThread = Receiver(context)
//receiverThread.start()
//
//
//
//dataIngest obj;
//
//obj.createFile("1.h5")
//
//for (i=0; i < 5; i++):
// std::string data = "asdfasdasdfasd"
// obj.write(data)
// std::cout << "write" << std::endl;
//
//obj.closeFile()
//
//logging.info("Stopping")
//
//receiverThread.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment