Skip to content
Snippets Groups Projects
Commit a5a36baf authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed dataIngestAPI compiling errors in dataIngestAPI

parent 5c52e4f6
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
typedef int bool;
......@@ -28,14 +29,22 @@ static char* s_recv (void *socket)
return NULL;
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
printf("recieved string: %s\n", string);
zmq_msg_close (&message);
string [size] = 0;
return string;
};
static int s_send (void * socket, char* string)
{
printf("Sending: %s\n", string);
int size = zmq_send (socket, string, strlen (string), 0);
if (size == -1) {
printf("Sending failed\n");
// fprintf (stderr, "ERROR: Sending failed\n");
// perror("");
};
return size;
};
......@@ -53,6 +62,7 @@ typedef struct {
// Prepare our context and socket
void *context;
void *signalSocket;
void *signalSocket_r;
void *eventSocket;
void *dataSocket;
......@@ -68,66 +78,99 @@ typedef struct {
int dataIngest_init (dataIngest *dI)
{
dI = malloc(sizeof(dataIngest));
dI->signalHost = "zitpcx19282";
dI->extHost = "0.0.0.0";
dI->signalPort = "50050";
dI->eventPort = "50003";
dI->dataPort = "50010";
//
// if ( (dI->context = zmq_ctx_new ()) == NULL )
// {
// perror("ERROR: Cannot create 0MQ context: ");
// exit(9);
// }
//
// if ( (dI->signalSocket = zmq_socket (dI->context, ZMQ_REQ)) == NULL )
// {
// perror("ERROR: Could not create 0MQ socket: ");
// exit(9);
// }
//
// if ( (dI->eventSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
// {
// perror("ERROR: Could not create 0MQ socket: ");
// exit(9);
// }
//
// if ( (dI->dataSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
// {
// perror("ERROR: Could not create 0MQ socket: ");
// exit(9);
// }
dI->context = zmq_ctx_new ();
dI->signalSocket = zmq_socket (dI->context, ZMQ_REQ);
dI->eventSocket = zmq_socket (dI->context, ZMQ_PUSH);
dI->dataSocket = zmq_socket (dI->context, ZMQ_PUSH);
if ( (dI->context = zmq_ctx_new ()) == NULL )
{
perror("ERROR: Cannot create 0MQ context: ");
exit(9);
}
if ( (dI->signalSocket = zmq_socket (dI->context, ZMQ_REQ)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket: ");
exit(9);
}
if ( (dI->signalSocket_r = zmq_socket (dI->context, ZMQ_REP)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket_r: ");
exit(9);
}
if ( (dI->eventSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ eventSocket: ");
exit(9);
}
if ( (dI->dataSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ dataSocket: ");
exit(9);
}
dI->filePart = 0;
dI->responseTimeout = 1000;
dI->responseTimeout = 1000;
char connectionStr[128];
int rc;
// Create sockets
sprintf(connectionStr, "tcp://%s:%s", dI->signalHost, dI->signalPort);
rc = zmq_connect(dI->signalSocket, connectionStr);
assert (rc == 0);
// self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
printf("signalSocket started (connect) for '%s'\n", connectionStr);
sprintf(connectionStr, "tcp://localhost:%s", dI->eventPort);
rc = zmq_connect(dI->eventSocket, connectionStr);
assert (rc == 0);
// self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
printf("eventSocket started (connect) for '%s'\n", connectionStr);
sprintf(connectionStr, "tcp://localhost:%s", dI->dataPort);
rc = zmq_connect(dI->dataSocket, connectionStr);
assert (rc == 0);
// self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
printf("dataSocket started (connect) for '%s'\n", connectionStr);
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", dI->signalHost, dI->signalPort);
// rc = zmq_connect(dI->signalSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->signalSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket started (connect) for '%s'\n", connectionStr);
}
/**** for testing ***/
snprintf(connectionStr, sizeof(connectionStr), "tcp://*:%s", dI->signalPort);
if ( zmq_bind(dI->signalSocket_r, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (bind) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket_r started (bind) for '%s'\n", connectionStr);
}
/**** end testing ***/
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->eventPort);
// rc = zmq_connect(dI->eventSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->eventSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start eventSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("eventSocket started (connect) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->dataPort);
// rc = zmq_connect(dI->dataSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->dataSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start dataSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("dataSocket started (connect) for '%s'\n", connectionStr);
}
return 0;
}
......@@ -139,12 +182,22 @@ int dataIngest_createFile (dataIngest *dI, char *fileName)
// raise Exception("File " + str(filename) + " already opened.")
char *message;
char *message2;
int rc;
// Send notification to receiver
rc = s_send (dI->signalSocket, "OPEN_FILE");
printf ("Sending signal to open a new file.\n");
/**** for testing ***/
message2 = s_recv (dI->signalSocket_r);
printf ("Received responce: %s\n", message2);
rc = s_send (dI->signalSocket_r, "OPEN_FILE");
printf ("Sending signal to open a new file.\n");
/**** end testing ***/
// s_recv (dI->signalSocket, message);
message = s_recv (dI->signalSocket);
printf ("Received responce: %s\n", message);
......@@ -152,7 +205,9 @@ int dataIngest_createFile (dataIngest *dI, char *fileName)
dI->filePart = 0;
free (message);
free (message2);
return 0;
}
......@@ -163,7 +218,7 @@ int dataIngest_write (dataIngest *dI, char *data, int size)
char message[128];
int rc;
sprintf(message, "{ \"filePart\": %d, \"filename\": \"%s\" }", dI->filePart, dI->filename);
snprintf(message, sizeof(message), "{ \"filePart\": %d, \"filename\": \"%s\" }", dI->filePart, dI->filename);
// Send event to eventDetector
rc = s_send (dI->eventSocket, message);
......@@ -173,6 +228,7 @@ int dataIngest_write (dataIngest *dI, char *data, int size)
dI->filePart += 1;
return 0;
};
......@@ -204,6 +260,7 @@ int dataIngest_closeFile (dataIngest *dI)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
// Get the reply.
// s_recv (dI->signalSocket, answer);
answer = s_recv (dI->signalSocket);
printf ("Received answer to signal: %s\n", answer);
// else:
......@@ -220,40 +277,29 @@ int dataIngest_closeFile (dataIngest *dI)
dI->filename = "";
dI->filePart = 0;
free (message);
free (answer);
return 0;
};
int dataIngest_stop (dataIngest *dI)
{
free (dI->signalHost);
free (dI->extHost);
free (dI->signalPort);
free (dI->eventPort);
free (dI->dataPort);
free (dI->filename);
printf ("closing signalSocket...");
printf ("closing signalSocket...\n");
zmq_close(dI->signalSocket);
printf ("closing eventSocket...");
printf ("closing eventSocket...\n");
zmq_close(dI->eventSocket);
printf ("closing dataSocket...");
printf ("closing dataSocket...\n");
zmq_close(dI->dataSocket);
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
printf ("Closing ZMQ context...");
printf ("Closing ZMQ context...\n");
zmq_ctx_destroy(dI->context);
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
// Prepare our context and socket
free (dI->context);
free (dI->signalSocket);
free (dI->eventSocket);
free (dI->dataSocket);
// free (dI);
return 0;
};
//#endif
......@@ -265,23 +311,24 @@ int main ()
char *data;
int i;
int size;
int rc;
dataIngest_init (obj);
rc = dataIngest_init (obj);
dataIngest_createFile (obj, "1.h5");
rc = dataIngest_createFile (obj, "1.h5");
for (i=0; i < 5; i++)
{
data = "asdfasdasdfasd";
size = strlen(data);
dataIngest_write (obj, data, size);
printf ("write");
rc = dataIngest_write (obj, data, size);
printf ("write\n");
};
dataIngest_closeFile (obj);
rc = dataIngest_closeFile (obj);
printf ("Stopping");
dataIngest_stop(obj);
printf ("Stopping\n");
rc = dataIngest_stop(obj);
return 0;
};
......
......@@ -4,10 +4,12 @@
#include <string.h>
#include <stdlib.h>
int main() {
dataIngest obj;
int main()
{
dataIngest *obj;
char *data;
int i;
int size;
dataIngest_init (obj);
......@@ -16,15 +18,16 @@ int main() {
for (i=0; i < 5; i++)
{
data = "asdfasdasdfasd";
size = strlen(data);
dataIngest_write (obj, data, size);
printf ("write");
printf ("write\n");
};
dataIngest_closeFile (obj);
printf ("Stopping");
printf ("Stopping\n");
dataIngest_stop(obj);
return 0;
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment