Commit faa32595 authored by Manuela Kuhn's avatar Manuela Kuhn

Merge branch 'release-2.4'

parents 2020be52 df98f809
......@@ -2,3 +2,4 @@
*.pyc
*.swp
*.o
*.x
......@@ -3,3 +3,7 @@ dataIngest_h:
dataIngest_c:
gcc -o dataIngestAPI.o dataIngestAPI.c -lzmq
dataTransfer_h:
gcc -o dataTransferAPI.o dataTransferAPI.h -lzmq -c
This diff is collapsed.
# API to ingest data into a data transfer unit
DataIngest::DataIngest ()
{
std::string connectionStr("tcp://" + signalHost + ":" + signalPort);
signalSocket.connect(connectionStr.c_str());
std::cout << "signalSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
// subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);
//# self.signalSocket.RCVTIMEO = self.responseTimeout
// Initialize poll set
zmq::pollitem_t items [] = {
{ signalSocket, 0, ZMQ_POLLIN, 0 },
};
std::string connectionStr("tcp://localhost:" + eventPort);
eventSocket.connect(connectionStr.c_str());
std::cout << "eventSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
std::string connectionStr("tcp://localhost:" + dataPort);
dataSocket.connect(connectionStr.c_str());
std::cout << "dataSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
};
int DataIngest::createFile (std::string fileName)
{
// if self.openFile and self.openFile != filename:
// raise Exception("File " + str(filename) + " already opened.")
// Send notification to receiver
signalSocket.send("OPEN_FILE")
std::cout << "Sending signal to open a new file.";
zmq::message_t response;
signalSocket.recv(&response)
std::cout << "Received responce:" << static_cast<char*>(response.data()) << std::endl;
filename = fileName
filePart = 0
};
void my_free (void *data, void *hint)
{
free (data);
};
int DataIngest::write (void *data, int &size)
{
std::string message = "{ \"filePart\": " + std::to_string(filePart) + ", \"filename\": \"" + filename + "\" }";
zmq::message_t eventMessage (message.length());
memcpy ( eventMessage.data (), message.c_str(), message.length());
// Send event to eventDetector
eventSocket.send(eventMessage);
zmq::message_t dataMessage (data, size, my_free);
// zmq::message_t dataMessage (size);
// memcpy ( dataMessage.data (), &data, size);
// Send data to ZMQ-Queue
dataSocket.send(dataMessage);
filePart += 1
};
int DataIngest::closeFile()
{
zmq::message_t resvMessage;
char* sendMessage = "CLOSE_FILE";
// Send close-signal to signal socket
signalSocket.send(sendMessage);
std::cout << "Sending signal to close the file to signalSocket.";
// self.log.error("Sending signal to close the file to signalSocket...failed.")
// send close-signal to event Detector
eventSocket.send(sendMessage)
std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")";
// self.log.error("Sending signal to close the file to eventSocket...failed.)")
// try:
// socks = dict(self.poller.poll(10000)) # in ms
// except:
// socks = None
// self.log.error("Could not poll for signal", exc_info=True)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
std::cout << "Received answer to signal..." << std:endl;
// Get the reply.
signalSocket.recv(&resvMessage)
std::cout << "Received answer to signal: " + static_cast<char*>(recvMessage) << std::endl;
// else:
// recvMessage = None
if ( recvMessage != sendMessage )
{
std::cout << "recieved message: " << recvMessage << ")" << std::endl;
std::cout << "send message: " + sendMessage << std::endl;
// raise Exception("Something went wrong while notifying to close the file")
};
openFile = ""
filePart = 0
};
DataIngest::~DataIngest()
{
// try:
// if self.signalSocket:
// self.log.info("closing eventSocket...")
// self.signalSocket.close(linger=0)
// self.signalSocket = None
// if self.eventSocket:
// self.log.info("closing eventSocket...")
// self.eventSocket.close(linger=0)
// self.eventSocket = None
// if self.dataSocket:
// self.log.info("closing dataSocket...")
// self.dataSocket.close(linger=0)
// self.dataSocket = None
// except:
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
//
// # if the context was created inside this class,
// # it has to be destroyed also within the class
// if not self.extContext and self.context:
// try:
// self.log.info("Closing ZMQ context...")
// self.context.destroy()
// self.context = None
// self.log.info("Closing ZMQ context...done.")
// except:
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
};
......@@ -7,14 +7,16 @@
typedef struct dataIngest dataIngest;
int dataIngest_init (dataIngest **dI);
typedef enum { SUCCESS, NOTSUPPORTED, USAGEERROR, FORMATERROR, ZMQERROR, CONNECTIONFAILED, VERSIONERROR, AUTHENTICATIONFAILED, COMMUNICATIONFAILED, DATASAVINGERROR } HIDRA_ERROR;
int dataIngest_createFile (dataIngest *dI, char *fileName);
HIDRA_ERROR dataIngest_init (dataIngest **dI);
int dataIngest_write (dataIngest *dI, char *data, int size);
HIDRA_ERROR dataIngest_createFile (dataIngest *dI, char *fileName);
int dataIngest_closeFile (dataIngest *dI);
HIDRA_ERROR dataIngest_write (dataIngest *dI, char *data, int size);
int dataIngest_stop (dataIngest *dI);
HIDRA_ERROR dataIngest_closeFile (dataIngest *dI);
HIDRA_ERROR dataIngest_stop (dataIngest *dI);
#endif
# API to ingest data into a data transfer unit
#ifndef DATAINGEST_H
#define DATAINGEST_H
#define version "0.0.1"
#include <zmq.hpp>
#include <string>
//import logging
//import cPickle
//import traceback
//typedef std::map<std::string, int> Dict;
class DataIngest
{
std::string signalHost = "zitpcx19282";
std::string signalPort = "50050";
// has to be the same port as configured in dataManager.conf as eventPort
std::string eventPort = "50003";
std::string dataPort = "50010";
zmq::context_t context (1);
zmq::socket_t signalSocket (context, ZMQ_REQ);
zmq::socket_t eventSocket (context, ZMQ_PUSH);
zmq::socket_t dataSocket (context, ZMQ_PUSH);
// Initialize poll set
zmq::pollitem_t items [];
std::string self.filename;
bool openFile;
int filePart;
int responseTimeout = 1000;
DataIngest ();
createFile (std::string filename);
write(data);
closeFile();
stop();
~DataIngest();
};
#endif
......@@ -2,10 +2,47 @@
__version__ = '0.0.1'
import os
import platform
import zmq
import logging
import cPickle
import traceback
import json
class loggingFunction:
def out (self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__ (self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
class noLoggingFunction:
def out (self, x, exc_info = None):
pass
def __init__ (self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
def isWindows():
returnValue = False
windowsName = "Windows"
platformName = platform.system()
if platformName == windowsName:
returnValue = True
return returnValue
class dataIngest():
......@@ -14,20 +51,9 @@ class dataIngest():
if useLog:
self.log = logging.getLogger("dataIngestAPI")
elif useLog == None:
self.log = noLoggingFunction()
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
......@@ -40,27 +66,44 @@ class dataIngest():
self.context = zmq.Context()
self.extContext = False
self.currentPID = os.getpid()
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.ipcPath = "/tmp/HiDRA"
self.signalHost = "zitpcx19282"
self.signalPort = "50050"
self.signalHost = "zitpcx19282"
self.signalPort = "50050"
# has to be the same port as configured in dataManager.conf as eventPort
self.eventPort = "50003"
#TODO add port in config
# has to be the same port as configured in dataManager.conf as eventDetPort
self.eventDetPort = "50003"
# has to be the same port as configured in dataManager.conf as ...
self.dataPort = "50010"
self.dataFetchPort = "50010"
self.signalConId = "tcp://{ip}:{port}".format(ip=self.signalHost, port=self.signalPort)
if isWindows():
self.log.info("Using tcp for internal communication.")
self.eventDetConId = "tcp://{ip}:{port}".format(ip=self.localhost, port=self.eventDetPort)
self.dataFetchConId = "tcp://{ip}:{port}".format(ip=self.localhost, port=self.dataFetchPort)
else:
self.log.info("Using ipc for internal communication.")
self.eventDetConId = "ipc://{path}/{id}".format(path=self.ipcPath, id="eventDet")
self.dataFetchConId = "ipc://{path}/{id}".format(path=self.ipcPath, id="dataFetch")
# self.eventDetConId = "ipc://{path}/{pid}_{id}".format(path=self.ipcPath, pid=self.currentPID, id="eventDet")
# self.dataFetchConId = "ipc://{path}/{pid}_{id}".format(path=self.ipcPath, pid=self.currentPID, id="dataFetch")
self.signalSocket = None
self.eventSocket = None
self.dataSocket = None
self.signalSocket = None
self.eventDetSocket = None
self.dataFetchSocket = None
self.poller = zmq.Poller()
self.poller = zmq.Poller()
self.filename = False
self.openFile = False
self.filePart = None
self.filename = False
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.responseTimeout = 1000
self.__createSocket()
......@@ -72,12 +115,11 @@ class dataIngest():
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
self.signalSocket.connect(self.signalConId)
self.log.info("signalSocket started (connect) for '" + self.signalConId + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start signalSocket (connect): '" + self.signalConId + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
......@@ -85,22 +127,20 @@ class dataIngest():
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
self.eventDetSocket = self.context.socket(zmq.PUSH)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
self.eventDetSocket.connect(self.eventDetConId)
self.log.info("eventDetSocket started (connect) for '" + self.eventDetConId + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start eventDetSocket (connect): '" + self.eventDetConId + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
self.dataFetchSocket = self.context.socket(zmq.PUSH)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
self.dataFetchSocket.connect(self.dataFetchConId)
self.log.info("dataFetchSocket started (connect) for '" + self.dataFetchConId + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start dataFetchSocket (connect): '" + self.dataFetchConId + "'", exc_info=True)
raise
......@@ -121,17 +161,17 @@ class dataIngest():
def write (self, data):
# send event to eventDetector
# message = {
# "filename" : self.filename,
# "filePart" : self.filePart
# }
# message = "{ 'filename': " + self.filename + ", 'filePart': " + self.filePart + "}"
message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'
self.eventSocket.send(message)
# send event to eventDet
message = {
"filename" : self.filename,
"filePart" : self.filePart
}
# message = '{ "filePart": ' + str(self.filePart) + ', "filename": "' + self.filename + '" }'
message = json.dumps(message)
self.eventDetSocket.send(message)
# send data to ZMQ-Queue
self.dataSocket.send(data)
self.dataFetchSocket.send(data)
self.filePart += 1
......@@ -147,10 +187,10 @@ class dataIngest():
# send close-signal to event Detector
try:
self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file to eventSocket. (sendMessage=" + sendMessage + ")")
self.eventDetSocket.send(sendMessage)
self.log.debug("Sending signal to close the file to eventDetSocket. (sendMessage=" + sendMessage + ")")
except:
raise Exception("Sending signal to close the file to eventSocket...failed.")
raise Exception("Sending signal to close the file to eventDetSocket...failed.")
try:
socks = dict(self.poller.poll(10000)) # in ms
......@@ -187,14 +227,14 @@ class dataIngest():
self.log.info("closing signalSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
self.log.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
if self.eventDetSocket:
self.log.info("closing eventDetSocket...")
self.eventDetSocket.close(linger=0)
self.eventDetSocket = None
if self.dataFetchSocket:
self.log.info("closing dataFetchSocket...")
self.dataFetchSocket.close(linger=0)
self.dataFetchSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
......
This diff is collapsed.
// API to ingest data into a data transfer unit
#ifndef DATATRANSFER_H
#define DATATRANSFER_H
#define version "0.0.1"
#include <json.h>
typedef struct dataTransfer dataTransfer_t;
typedef struct params_cb params_cb_t;
typedef struct metadata
{
const char *filename;
int filePart;
int fileCreateTime;
int fileModTime;
int filesize;
int chunkSize;
int chunkNumber;
} metadata_t;
typedef int (*open_cb_t)(params_cb_t *cbp, char *filename);
typedef int (*read_cb_t)(params_cb_t *cbp, metadata_t *metadata, char *payload, int payloadSize);
typedef int (*close_cb_t)(params_cb_t *cbp);
typedef enum { SUCCESS, NOTSUPPORTED, USAGEERROR, FORMATERROR, ZMQERROR, CONNECTIONFAILED, VERSIONERROR, AUTHENTICATIONFAILED, COMMUNICATIONFAILED, DATASAVINGERROR } HIDRA_ERROR;
HIDRA_ERROR dataTransfer_init (dataTransfer_t **dT, char *connectionType);
HIDRA_ERROR dataTransfer_initiate (dataTransfer_t *dT, char **targets);
HIDRA_ERROR dataTransfer_read (dataTransfer_t *dT, params_cb_t *cbp, open_cb_t openFunc, read_cb_t readFunc, close_cb_t closeFunc);
HIDRA_ERROR dataTransfer_stop (dataTransfer_t *dT);
#endif
This diff is collapsed.
// API to ingest data into a data transfer unit
//#ifndef DATAINGEST_H
#define DATAINGEST_H
#define version "0.0.1"
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
typedef int bool;
#define true 1
#define false 0
// helper functions for sending and receiving messages
// source: Pieter Hintjens: ZeroMQ; O'Reilly
static char* s_recv (void *socket)
{
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv (&message, socket, 0);
if (size == -1)
return NULL;
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
printf("recieved string: %s\n", string);
zmq_msg_close (&message);
string [size] = 0;
return string;
};
static int s_send (void * socket, char* string)
{
printf("Sending: %s\n", string);
int size = zmq_send (socket, string, strlen (string), 0);
if (size == -1) {
printf("Sending failed\n");
// fprintf (stderr, "ERROR: Sending failed\n");
// perror("");
};
return size;
};
typedef struct {
char *extHost;
char *localhost;
char *signalPort;
char *dataPort;
// Prepare our context and socket
void *context;
void *signalSocket;
void *dataSocket;
// Initialize poll set
// zmq::pollitem_t items [];
int numberOfStreams;
// self.recvdCloseFrom = []
char *replyToSignal;
bool allCloseRecvd;
} nexusTransfer;
int nexusTransfer_init (nexusTransfer *nT)
{
nT->extHost = "0.0.0.0";
nT->localhost = "localhost";
nT->signalPort = "50050";
nT->dataPort = "50100";
if ( (nT->context = zmq_ctx_new ()) == NULL )
{
perror("ERROR: Cannot create 0MQ context: ");
exit(9);
}
if ( (nT->signalSocket = zmq_socket (nT->context, ZMQ_REP)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket: ");
exit(9);
}
if ( (nT->dataSocket = zmq_socket (nT->context, ZMQ_PULL)) == NULL )
{
perror("ERROR: Could not create 0MQ dataSocket: ");
exit(9);
}
nT->replyToSignal = ""
nT->allCloseRecvd = false
char connectionStr[128];
int rc;
// Create sockets
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", nT->extlHost, nT->signalPort);
// rc = zmq_connect(nT->signalSocket, connectionStr);
// assert (rc == 0);
if ( zmq_bind(nT->signalSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (bind) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket started (bind) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", nT->localhost, nT->dataPort);
// rc = zmq_connect(nT->dataSocket, connectionStr);
// assert (rc == 0);
if ( zmq_bind(nT->dataSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start dataSocket (bind) for '%s'\n", connectionStr);
perror("");
} else {
printf("dataSocket started (bind) for '%s'\n", connectionStr);
}
return 0;
}
int nexusTransfer_read (nexusTransfer *nT, char *data, int size)
{
zmq_pollitem_t items [] = {
{ nT->signalSocket, 0, ZMQ_POLLIN, 0 },
{ nT->dataSocket, 0, ZMQ_POLLIN, 0 }
};
while (1)
{
printf ("polling");
zmq_poll (items, 2, -1);