Commit 3755856f authored by Manuela Kuhn's avatar Manuela Kuhn

Merge branch 'releases-3.0.0'

parents 2020be52 a4243244
[bumpversion]
current_version = 3.0.0
tag = False
commit = False
files = freeze_setup.py src/APIs/hidra/_version.py src/shared/_version.py
[bumpversion:file:docs/ReleaseNotes.txt]
search = HiDRA develop
replace = HiDRA develop
HiDRA {new_version}
[bumpversion:file:hidra.spec]
search = %changelog
replace = %changelog
{now:%a %b %d %Y} Manuela Kuhn <manuela.kuhn@desy.de> - {new_version}-1
Bump version
[bumpversion]
current_version = 3.0.0
tag = False
commit = False
[bumpversion:file:hidra.spec]
search = Version: {current_version}
replace = Version: {new_version}
......@@ -2,3 +2,4 @@
*.pyc
*.swp
*.o
*.x
dataIngest_h:
gcc -o dataIngestAPI.o dataIngestAPI.h -lzmq -c
dataIngest_c:
gcc -o dataIngestAPI.o dataIngestAPI.c -lzmq
// API to ingest data into a data transfer unit
#include <dataIngestAPI.h>
#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
// helper functions for sending and receiving messages
// source: Pieter Hintjens: ZeroMQ; O'Reilly
static char* s_recv (void *socket)
{
zmq_msg_t message;
zmq_msg_init (&message);
int size = zmq_msg_recv (&message, socket, 0);
if (size == -1)
return NULL;
char *string = malloc (size + 1);
memcpy (string, zmq_msg_data (&message), size);
printf("recieved string: %s\n", string);
zmq_msg_close (&message);
string [size] = 0;
return string;
};
static int s_send (void * socket, char* string)
{
printf("Sending: %s\n", string);
int size = zmq_send (socket, string, strlen (string), 0);
if (size == -1) {
printf("Sending failed\n");
// fprintf (stderr, "ERROR: Sending failed\n");
// perror("");
};
return size;
};
struct dataIngest {
char *signalHost;
char *extHost;
char *signalPort;
// has to be the same port as configured in dataManager.conf as eventPort
char *eventPort;
char *dataPort;
// Prepare our context and socket
void *context;
void *signalSocket;
void *eventSocket;
void *dataSocket;
// Initialize poll set
// zmq::pollitem_t items [];
char *filename;
int openFile;
int filePart;
int responseTimeout;
};
int dataIngest_init (dataIngest **out)
{
dataIngest* dI = malloc(sizeof(dataIngest));
*out = NULL;
dI->signalHost = "zitpcx19282";
dI->extHost = "0.0.0.0";
dI->signalPort = "50050";
dI->eventPort = "50003";
dI->dataPort = "50010";
if ( (dI->context = zmq_ctx_new ()) == NULL )
{
perror("ERROR: Cannot create 0MQ context: ");
exit(9);
}
if ( (dI->signalSocket = zmq_socket (dI->context, ZMQ_REQ)) == NULL )
{
perror("ERROR: Could not create 0MQ signalSocket: ");
exit(9);
}
if ( (dI->eventSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ eventSocket: ");
exit(9);
}
if ( (dI->dataSocket = zmq_socket (dI->context, ZMQ_PUSH)) == NULL )
{
perror("ERROR: Could not create 0MQ dataSocket: ");
exit(9);
}
dI->filePart = 0;
dI->responseTimeout = 1000;
char connectionStr[128];
int rc;
// Create sockets
snprintf(connectionStr, sizeof(connectionStr), "tcp://%s:%s", dI->signalHost, dI->signalPort);
// rc = zmq_connect(dI->signalSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->signalSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start signalSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("signalSocket started (connect) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->eventPort);
// rc = zmq_connect(dI->eventSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->eventSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start eventSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("eventSocket started (connect) for '%s'\n", connectionStr);
}
snprintf(connectionStr, sizeof(connectionStr), "tcp://localhost:%s", dI->dataPort);
// rc = zmq_connect(dI->dataSocket, connectionStr);
// assert (rc == 0);
if ( zmq_connect(dI->dataSocket, connectionStr) )
{
fprintf (stderr, "ERROR: Could not start dataSocket (connect) for '%s'\n", connectionStr);
perror("");
} else {
printf("dataSocket started (connect) for '%s'\n", connectionStr);
}
*out = dI;
return 0;
}
int dataIngest_createFile (dataIngest *dI, char *fileName)
{
// if self.openFile and self.openFile != filename:
// raise Exception("File " + str(filename) + " already opened.")
char *message;
int rc;
// Send notification to receiver
rc = s_send (dI->signalSocket, "OPEN_FILE");
printf ("Sending signal to open a new file.\n");
// s_recv (dI->signalSocket, message);
message = s_recv (dI->signalSocket);
printf ("Received responce: '%s'\n", message);
dI->filename = fileName;
dI->filePart = 0;
free (message);
return 0;
}
int dataIngest_write (dataIngest *dI, char *data, int size)
//int dataIngest_write (dataIngest *dI, void *data, int &size)
{
char message[128];
int rc;
snprintf(message, sizeof(message), "{ \"filePart\": %d, \"filename\": \"%s\" }", dI->filePart, dI->filename);
// Send event to eventDetector
rc = s_send (dI->eventSocket, message);
// Send data to ZMQ-Queue
rc = s_send (dI->dataSocket, data);
dI->filePart += 1;
return 0;
};
int dataIngest_closeFile (dataIngest *dI)
{
char *message = "CLOSE_FILE";
char *answer;
int rc;
// Send close-signal to signal socket
rc = s_send (dI->signalSocket, message);
printf ("Sending signal to close the file to signalSocket.\n");
// self.log.error("Sending signal to close the file to signalSocket...failed.")
// send close-signal to event Detector
rc = s_send (dI->eventSocket, message);
printf ("Sending signal to close the file to eventSocket.(sendMessage=%s)\n", message);
// self.log.error("Sending signal to close the file to eventSocket...failed.)")
// try:
// socks = dict(self.poller.poll(10000)) # in ms
// except:
// socks = None
// self.log.error("Could not poll for signal", exc_info=True)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
// Get the reply.
// s_recv (dI->signalSocket, answer);
answer = s_recv (dI->signalSocket);
printf ("Received answer to signal: %s\n", answer);
// else:
// recvMessage = None
if ( message != answer )
{
printf ("recieved message: %s\n", answer);
printf ("send message: %s\n", message);
// raise Exception("Something went wrong while notifying to close the file")
};
dI->filename = "";
dI->filePart = 0;
free (answer);
return 0;
};
int dataIngest_stop (dataIngest *dI)
{
printf ("closing signalSocket...\n");
zmq_close(dI->signalSocket);
printf ("closing eventSocket...\n");
zmq_close(dI->eventSocket);
printf ("closing dataSocket...\n");
zmq_close(dI->dataSocket);
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
printf ("Closing ZMQ context...\n");
zmq_ctx_destroy(dI->context);
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
// free (dI);
printf ("Cleanup finished.\n");
return 0;
};
# API to ingest data into a data transfer unit
DataIngest::DataIngest ()
{
std::string connectionStr("tcp://" + signalHost + ":" + signalPort);
signalSocket.connect(connectionStr.c_str());
std::cout << "signalSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
// subscriber.setsockopt(ZMQ_SUBSCRIBE, "10001 ", 6);
//# self.signalSocket.RCVTIMEO = self.responseTimeout
// Initialize poll set
zmq::pollitem_t items [] = {
{ signalSocket, 0, ZMQ_POLLIN, 0 },
};
std::string connectionStr("tcp://localhost:" + eventPort);
eventSocket.connect(connectionStr.c_str());
std::cout << "eventSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
std::string connectionStr("tcp://localhost:" + dataPort);
dataSocket.connect(connectionStr.c_str());
std::cout << "dataSocket started (connect) for '" << connectionStr << "'";
// self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
};
int DataIngest::createFile (std::string fileName)
{
// if self.openFile and self.openFile != filename:
// raise Exception("File " + str(filename) + " already opened.")
// Send notification to receiver
signalSocket.send("OPEN_FILE")
std::cout << "Sending signal to open a new file.";
zmq::message_t response;
signalSocket.recv(&response)
std::cout << "Received responce:" << static_cast<char*>(response.data()) << std::endl;
filename = fileName
filePart = 0
};
void my_free (void *data, void *hint)
{
free (data);
};
int DataIngest::write (void *data, int &size)
{
std::string message = "{ \"filePart\": " + std::to_string(filePart) + ", \"filename\": \"" + filename + "\" }";
zmq::message_t eventMessage (message.length());
memcpy ( eventMessage.data (), message.c_str(), message.length());
// Send event to eventDetector
eventSocket.send(eventMessage);
zmq::message_t dataMessage (data, size, my_free);
// zmq::message_t dataMessage (size);
// memcpy ( dataMessage.data (), &data, size);
// Send data to ZMQ-Queue
dataSocket.send(dataMessage);
filePart += 1
};
int DataIngest::closeFile()
{
zmq::message_t resvMessage;
char* sendMessage = "CLOSE_FILE";
// Send close-signal to signal socket
signalSocket.send(sendMessage);
std::cout << "Sending signal to close the file to signalSocket.";
// self.log.error("Sending signal to close the file to signalSocket...failed.")
// send close-signal to event Detector
eventSocket.send(sendMessage)
std::cout << "Sending signal to close the file to eventSocket.(sendMessage=" << sendMessage << ")";
// self.log.error("Sending signal to close the file to eventSocket...failed.)")
// try:
// socks = dict(self.poller.poll(10000)) # in ms
// except:
// socks = None
// self.log.error("Could not poll for signal", exc_info=True)
//
// if socks and self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
std::cout << "Received answer to signal..." << std:endl;
// Get the reply.
signalSocket.recv(&resvMessage)
std::cout << "Received answer to signal: " + static_cast<char*>(recvMessage) << std::endl;
// else:
// recvMessage = None
if ( recvMessage != sendMessage )
{
std::cout << "recieved message: " << recvMessage << ")" << std::endl;
std::cout << "send message: " + sendMessage << std::endl;
// raise Exception("Something went wrong while notifying to close the file")
};
openFile = ""
filePart = 0
};
DataIngest::~DataIngest()
{
// try:
// if self.signalSocket:
// self.log.info("closing eventSocket...")
// self.signalSocket.close(linger=0)
// self.signalSocket = None
// if self.eventSocket:
// self.log.info("closing eventSocket...")
// self.eventSocket.close(linger=0)
// self.eventSocket = None
// if self.dataSocket:
// self.log.info("closing dataSocket...")
// self.dataSocket.close(linger=0)
// self.dataSocket = None
// except:
// self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
//
// # if the context was created inside this class,
// # it has to be destroyed also within the class
// if not self.extContext and self.context:
// try:
// self.log.info("Closing ZMQ context...")
// self.context.destroy()
// self.context = None
// self.log.info("Closing ZMQ context...done.")
// except:
// self.log.error("Closing ZMQ context...failed.", exc_info=True)
};
// API to ingest data into a data transfer unit
#ifndef DATAINGEST_H
#define DATAINGEST_H
#define version "0.0.1"
typedef struct dataIngest dataIngest;
int dataIngest_init (dataIngest **dI);
int dataIngest_createFile (dataIngest *dI, char *fileName);
int dataIngest_write (dataIngest *dI, char *data, int size);
int dataIngest_closeFile (dataIngest *dI);
int dataIngest_stop (dataIngest *dI);
#endif
# API to ingest data into a data transfer unit
#ifndef DATAINGEST_H
#define DATAINGEST_H
#define version "0.0.1"
#include <zmq.hpp>
#include <string>
//import logging
//import cPickle
//import traceback
//typedef std::map<std::string, int> Dict;
class DataIngest
{
std::string signalHost = "zitpcx19282";
std::string signalPort = "50050";
// has to be the same port as configured in dataManager.conf as eventPort
std::string eventPort = "50003";
std::string dataPort = "50010";
zmq::context_t context (1);
zmq::socket_t signalSocket (context, ZMQ_REQ);
zmq::socket_t eventSocket (context, ZMQ_PUSH);
zmq::socket_t dataSocket (context, ZMQ_PUSH);
// Initialize poll set
zmq::pollitem_t items [];
std::string self.filename;
bool openFile;
int filePart;
int responseTimeout = 1000;
DataIngest ();
createFile (std::string filename);
write(data);
closeFile();
stop();
~DataIngest();
};
#endif
# API to ingest data into a data transfer unit
__version__ = '0.0.1'
import zmq
import logging
import cPickle
import traceback
class dataIngest():
# return error code
def __init__ (self, useLog = False, context = None):
if useLog:
self.log = logging.getLogger("dataIngestAPI")
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalHost = "zitpcx19282"
self.signalPort = "50050"
# has to be the same port as configured in dataManager.conf as eventPort
self.eventPort = "50003"
#TODO add port in config
# has to be the same port as configured in dataManager.conf as ...
self.dataPort = "50010"
self.signalSocket = None
self.eventSocket = None
self.dataSocket = None
self.poller = zmq.Poller()
self.filename = False
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.__createSocket()
def __createSocket (self):
# To send file open and file close notification, a communication socket is needed
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
# self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# return error code
def createFile (self, filename):
if self.openFile and self.openFile != filename:
raise Exception("File " + str(filename) + " already opened.")
# send notification to receiver
self.signalSocket.send("OPEN_FILE")
self.log.info("Sending signal to open a new file.")
message = self.signalSocket.recv()
self.log.debug("Received responce: " + str(message))
self.filename = filename
self.filePart = 0
def write (self, data):