Skip to content
Snippets Groups Projects
Commit d1fa11d1 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

First approach of nexusTransferAPI in C

parent 27edea36
No related branches found
No related tags found
No related merge requests found
// API to ingest data into a data transfer unit
//#ifndef DATAINGEST_H
#define DATAINGEST_H
#define version "0.0.1"
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
typedef int bool;
#define true 1
#define false 0
// helper functions for sending and receiving messages
// source: Pieter Hintjens: ZeroMQ; O'Reilly
static char* s_recv (void *socket)
{
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv (&message, socket, 0);
if (size == -1)
return NULL;
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
printf("recieved string: %s\n", string);
zmq_msg_close (&message);
string [size] = 0;
return string;
};
static int s_send (void * socket, char* string)
{
printf("Sending: %s\n", string);
int size = zmq_send (socket, string, strlen (string), 0);
if (size == -1) {
printf("Sending failed\n");
// fprintf (stderr, "ERROR: Sending failed\n");
// perror("");
};
return size;
};
typedef struct {
char *extHost;
char *localhost;
char *signalPort;
char *dataPort;
// Prepare our context and socket
void *context;
void *signalSocket;
void *dataSocket;
// Initialize poll set
// zmq::pollitem_t items [];
int numberOfStreams;
// self.recvdCloseFrom = []
char *replyToSignal;
bool allCloseRecvd;
} nexusTransfer;
int nexusTransfer_init (nexusTransfer *nT)
{
nT->extHost = "0.0.0.0";
nT->localhost = "localhost";
nT->signalPort = "50050";
nT->dataPort = "50100";
if ( (nT->context = zmq_ctx_new ()) == NULL )
{
perror("ERROR: Cannot create 0MQ context: ");
exit(9);
}
if ( (nT->signalSocket = zmq_socket (nT->context, ZMQ_REP)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket: ");
exit(9);
}
if ( (nT->dataSocket = zmq_socket (nT->context, ZMQ_PULL)) == NULL )
{
perror("ERROR: Could not create 0MQ dataSocket: ");
exit(9);
}
nT->replyToSignal = ""
nT->allCloseRecvd = false
char connectionStr[128];
int rc;
// Create sockets
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", nT->extlHost, nT->signalPort);
// rc = zmq_connect(nT->signalSocket, connectionStr);
// assert (rc == 0);
if ( zmq_bind(nT->signalSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (bind) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket started (bind) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", nT->localhost, nT->dataPort);
// rc = zmq_connect(nT->dataSocket, connectionStr);
// assert (rc == 0);
if ( zmq_bind(nT->dataSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start dataSocket (bind) for '%s'\n", connectionStr);
perror("");
} else {
printf("dataSocket started (bind) for '%s'\n", connectionStr);
}
return 0;
}
int nexusTransfer_read (nexusTransfer *nT, char *data, int size)
{
zmq_pollitem_t items [] = {
{ nT->signalSocket, 0, ZMQ_POLLIN, 0 },
{ nT->dataSocket, 0, ZMQ_POLLIN, 0 }
};
while (1)
{
printf ("polling");
zmq_poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN)
{
printf ("signalSocket is polling\n");
data = s_recv (dI->signalSocket);
printf ("signalSocket recv: '%s'\n", message);
if (data == b"CLOSE_FILE" && nT->allCloseRecvd == false):
nT->replyToSignal = data;
else:
rc = zmq_send (nT-signalSocket, data, strlen(data), 0);
printf ("signalSocket send: '%s'\n", data);
return 0;
}
if (items [1].revents & ZMQ_POLLIN) {
printf ("dataSocket is polling\n");
char *multipartMessage[2];
int j;
rc = getMultipartMessage(dI->dataSocket, multipartMessage)
for (j = 0; j < 2; j++)
{
free(multipartMessage[j]);
};
}
};
int recv_multipartMessage (void *socket, char **multipartMessage, int *len)
{
int i = 0;
int size;
int more;
size_t more_size = sizeof(more);
while (1)
{
// Wait for next request from client
zmq_msg_t message;
zmq_msg_init (&message);
m_size = zmq_msg_recv (&message, socket, 0);
//Process message
multipartMessage[i] = malloc (m_size + 1);
memcpy (multipartMessage[i], zmq_msg_data (&message), m_size);
printf("recieved string: %s\n", multipartMessage[i]);
multipartMessage[i] [m_size] = 0;
i++;
zmq_msg_close(&message);
zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
if (more == 0)
{
printf("last received\n");
*len = i;
break;
};
}
return 0;
};
int getMultipartMessage (void *socket, char *data)
{
char* mulitpartMessage[2];
int len;
int rc;
//TODO multipart
rc = recv_multipartMessage (dI-dataSocket, multipartMessage, len);
printf ("multipartMessage[0]=%s\nmultipartMessage[1]=%s\n", multipartMessage[0], multipartMessage[1]);
//self.log.error("Could not receive data due to unknown error.", exc_info=True)
if (len < 2)
{
printf ("Received mutipart-message is too short. Either config or file content is missing.");
// self.log.debug("multipartMessage=" + str(mutipartMessage))
//TODO coorect errorcode
return -1;
}
if (multipartMessage[0] == "CLOSE_FILE")
{
/*
id = multipartMessage[1]
self.recvdCloseFrom.append(id)
self.log.debug("Received close-file-signal from DataDispatcher-" + id)
# get number of signals to wait for
if not self.numberOfStreams:
self.numberOfStreams = int(id.split("/")[1])
# have all signals arrived?
if len(self.recvdCloseFrom) == self.numberOfStreams:
self.log.info("All close-file-signals arrived")
self.allCloseRecvd = True
if self.replyToSignal:
self.signalSocket.send(self.replyToSignal)
self.log.debug("signalSocket send: " + self.replyToSignal)
self.replyToSignal = False
else:
pass
*/
data = "CLOSE_FILE"
return 0;
} else {
data = multipartMessage[1];
return 0;
}
};
int nexusTransfer_stop (dataIngest *nT)
{
printf ("closing signalSocket...\n");
zmq_close(nT->signalSocket);
printf ("closing dataSocket...\n");
zmq_close(nT->dataSocket);
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
printf ("Closing ZMQ context...\n");
zmq_ctx_destroy(nT->context);
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
// free (nT);
printf ("Cleanup finished.\n");
return 0;
};
//#endif
int main ()
{
nexusTransfer *obj;
obj = malloc(sizeof(nexusTransfer));
char *data;
int i;
int size;
int rc;
rc = nexusTransfer_init (obj);
rc = nexusTransfer_read (obj, data, size);
printf("Read data: %s, size: %d\n", data, size);
for (i=0; i < 5; i++)
{
rc = nexusTransfer_read (obj, data, size);
printf("Read data: %s, size: %d\n", data, size);
};
rc = nexusTransfer_read (obj, data, size);
printf("Read data: %s, size: %d\n", data, size);
printf ("Stopping\n");
rc = nexusTransfer_stop(obj);
free(obj);
return 0;
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment