-
Manuela Kuhn authoredManuela Kuhn authored
dataIngestAPI.c 6.71 KiB
// API to ingest data into a data transfer unit
#include <dataIngestAPI.h>
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
// helper functions for sending and receiving messages
// source: Pieter Hintjens: ZeroMQ; O'Reilly
static char* s_recv (void *socket)
{
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv (&message, socket, 0);
if (size == -1)
return NULL;
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
printf("recieved string: %s\n", string);
zmq_msg_close (&message);
string [size] = 0;
return string;
};
static int s_send (void * socket, char* string)
{
printf("Sending: %s\n", string);
int size = zmq_send (socket, string, strlen (string), 0);
if (size == -1) {
printf("Sending failed\n");
// fprintf (stderr, "ERROR: Sending failed\n");
// perror("");
};
return size;
};
struct dataIngest {
char *signalHost;
char *extHost;
char *signalPort;
// has to be the same port as configured in dataManager.conf as eventPort
char *eventPort;
char *dataPort;
// Prepare our context and socket
void *context;
void *signalSocket;
void *eventSocket;
void *dataSocket;
// Initialize poll set
// zmq::pollitem_t items [];
char *filename;
int openFile;
int filePart;
int responseTimeout;
};
int dataIngest_init (dataIngest **out)
{
dataIngest* dI = malloc(sizeof(dataIngest));
*out = NULL;
dI->signalHost = "zitpcx19282";
dI->extHost = "0.0.0.0";
dI->signalPort = "50050";
dI->eventPort = "50003";
dI->dataPort = "50010";
if ( (dI->context = zmq_ctx_new ()) == NULL )
{
perror("ERROR: Cannot create 0MQ context: ");
exit(9);
}
if ( (dI->signalSocket = zmq_socket (dI->context, ZMQ_REQ)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket: ");
exit(9);
}
if ( (dI->eventSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ eventSocket: ");
exit(9);
}
if ( (dI->dataSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ dataSocket: ");
exit(9);
}
dI->filePart = 0;
dI->responseTimeout = 1000;
char connectionStr[128];
int rc;
// Create sockets
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", dI->signalHost, dI->signalPort);
// rc = zmq_connect(dI->signalSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->signalSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket started (connect) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->eventPort);
// rc = zmq_connect(dI->eventSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->eventSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start eventSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("eventSocket started (connect) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->dataPort);
// rc = zmq_connect(dI->dataSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->dataSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start dataSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("dataSocket started (connect) for '%s'\n", connectionStr);
}
*out = dI;
return 0;
}
int dataIngest_createFile (dataIngest *dI, char *fileName)
{
// if self.openFile and self.openFile != filename:
// raise Exception("File " + str(filename) + " already opened.")
char *message;
int rc;
// Send notification to receiver
rc = s_send (dI->signalSocket, "OPEN_FILE");
printf ("Sending signal to open a new file.\n");
// s_recv (dI->signalSocket, message);
message = s_recv (dI->signalSocket);
printf ("Received responce: '%s'\n", message);
dI->filename = fileName;
dI->filePart = 0;
free (message);
return 0;
}
int dataIngest_write (dataIngest *dI, char *data, int size)
//int dataIngest_write (dataIngest *dI, void *data, int &size)
{
char message[128];
int rc;
snprintf(message, sizeof(message), "{ \"filePart\": %d, \"filename\": \"%s\" }", dI->filePart, dI->filename);
// Send event to eventDetector
rc = s_send (dI->eventSocket, message);
// Send data to ZMQ-Queue
rc = s_send (dI->dataSocket, data);
dI->filePart += 1;
return 0;
};
int dataIngest_closeFile (dataIngest *dI)
{
char *message = "CLOSE_FILE";
char *answer;
int rc;
// Send close-signal to signal socket
rc = s_send (dI->signalSocket, message);
printf ("Sending signal to close the file to signalSocket.\n");
// self.log.error("Sending signal to close the file to signalSocket...failed.")
// send close-signal to event Detector
rc = s_send (dI->eventSocket, message);
printf ("Sending signal to close the file to eventSocket.(sendMessage=%s)\n", message);
// self.log.error("Sending signal to close the file to eventSocket...failed.)")
// try:
// socks = dict(self.poller.poll(10000)) # in ms
// except:
// socks = None
// self.log.error("Could not poll for signal", exc_info=True)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
// Get the reply.
// s_recv (dI->signalSocket, answer);
answer = s_recv (dI->signalSocket);
printf ("Received answer to signal: %s\n", answer);
// else:
// recvMessage = None
if ( message != answer )
{
printf ("recieved message: %s\n", answer);
printf ("send message: %s\n", message);
// raise Exception("Something went wrong while notifying to close the file")
};
dI->filename = "";
dI->filePart = 0;
free (answer);
return 0;
};
int dataIngest_stop (dataIngest *dI)
{
printf ("closing signalSocket...\n");
zmq_close(dI->signalSocket);
printf ("closing eventSocket...\n");
zmq_close(dI->eventSocket);
printf ("closing dataSocket...\n");
zmq_close(dI->dataSocket);
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
printf ("Closing ZMQ context...\n");
zmq_ctx_destroy(dI->context);
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
// free (dI);
printf ("Cleanup finished.\n");
return 0;
};