Skip to content
Snippets Groups Projects
Commit 6bfe7c0c authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

First version and test of API for data ingest

parent add472af
No related branches found
No related tags found
No related merge requests found
# API to ingest data into a data transfer unit
__version__ = '0.0.1'
import zmq
import socket
import logging
import json
import errno
import os
import cPickle
import traceback
class dataIngest():
# return error code
def __init__(self, useLog = False, context = None):
if useLog:
self.log = logging.getLogger("dataIngestAPI")
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalHost = "zitpcx19282"
self.signalPort = "6000"
self.eventPort = "6001"
self.dataPort = "6002"
self.eventSocket = None
self.dataSocket = None
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.__createSocket()
def __createSocket(self):
# To send file open and file close notification, a communication socket is needed
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# return error code
def createFile(self, filename):
if self.openFile and self.openFile != filename:
raise Exception("File " + str(filename) + " already opened.")
# send notification to receiver
self.signalSocket.send("openFile")
self.log.info("Sending signal to open a new file.")
message = self.signalSocket.recv()
self.log.debug("Received responce: " + str(message))
self.filename = filename
self.filePart = 0
def write(self, data):
# send event to eventDetector
message = {
"filename" : self.filename,
"filePart" : self.filePart
}
self.eventSocket.send(cPickle.dumps(message))
# send data to ZMQ-Queue
self.dataSocket.send(data)
self.filePart += 1
# return error code
def closeFile(self):
# send close-signal to signal socket
sendMessage = "closeFile"
self.signalSocket.send(sendMessage)
self.log.info("Sending signal to close the file.")
recvMessage = self.signalSocket.recv()
if recvMessage != sendMessage:
self.log.debug("recieved message: " + str(recvMessage))
self.log.debug("send message: " + str(sendMessage))
raise Exception("Something went wrong while notifying to close the file")
# send close-signal to event Detector
self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file.")
self.openFile = None
self.filePart = None
##
#
# Send signal that the displayer is quitting, close ZMQ connections, destoying context
#
##
def stop(self):
try:
if self.signalSocket:
self.log.info("closing eventSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
self.log.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
......@@ -83,7 +83,7 @@ class DataDispatcher():
if dataFetcher in supportedDataFetchers:
self.log.info("Loading data Fetcher: " + dataFetcher)
self.dataFetcher = __import__(dataFetcher)
self.dataFetcher = __import__(dataFetcher)
self.dataFetcher.setup(dataFetcherProp)
else:
raise Exception("DataFetcher type " + dataFetcher + " not supported")
......@@ -136,6 +136,13 @@ class DataDispatcher():
# sort the target list by the priority
targets = sorted(targets, key=lambda target: target[1])
else:
closeFile = message[0] == b"CLOSE_FILE"
if closeFile:
self.log.debug("Router requested to send signal that file was closed.")
time.sleep(2)
self.log.debug("Continue after sleeping.")
continue
finished = message[0] == b"EXIT"
if finished:
self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
......
......@@ -4,6 +4,7 @@ __author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import os
import zmq
import logging
import cPickle
from logutils.queue import QueueHandler
......@@ -16,16 +17,19 @@ class LambdaDetector():
# check format of config
checkPassed = True
if ( not config.has_key("context") or
not config.has_key("eventPort") ):
not config.has_key("eventPort") or
not config.has_key("numberOfStreams") ):
self.log.error ("Configuration of wrong format")
self.log.debug ("config="+ str(config))
checkPassed = False
if checkPassed:
self.eventPort = config["eventPort"]
self.extIp = "0.0.0.0"
self.eventSocket = None
self.eventPort = config["eventPort"]
self.extIp = "0.0.0.0"
self.eventSocket = None
self.numberOfStreams = config["numberOfStreams"]
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
......@@ -67,11 +71,14 @@ class LambdaDetector():
def getNewEvent(self):
eventMessageList = None
eventMessage = {}
eventMessage = self.eventSocket.recv()
self.log.debug("eventMessage: " + str(eventMessage))
if eventMessage == b"CLOSE_FILE":
eventMessageList = [ eventMessage for i in range(self.numberOfStreams) ]
else:
eventMessageList = [ cPickle.loads(eventMessage) ]
self.log.debug("eventMessage: " + str(eventMessageList))
return eventMessageList
......@@ -105,7 +112,7 @@ if __name__ == '__main__':
from subprocess import call
from multiprocessing import Queue
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
print "SHARED", SHARED_PATH
......@@ -133,10 +140,12 @@ if __name__ == '__main__':
root.addHandler(qh)
eventPort = "6001"
eventPort = "6001"
numberOfStreams = 4
config = {
"eventDetectorType" : "lambda",
"eventPort" : eventPort,
"numberOfStreams" : numberOfStreams,
"context" : None,
}
......@@ -157,11 +166,15 @@ if __name__ == '__main__':
i = 100
while i <= 105:
while i <= 101:
try:
logging.debug("generate event")
targetFile = targetFileBase + str(i) + ".cbf"
eventSocket.send(targetFile)
message = {
"filename" : targetFile,
"filePart" : 0
}
eventSocket.send(cPickle.dumps(message))
i += 1
eventList = eventDetector.getNewEvent()
......@@ -172,5 +185,10 @@ if __name__ == '__main__':
except KeyboardInterrupt:
break
eventSocket.send(b"CLOSE_FILE")
eventList = eventDetector.getNewEvent()
print "eventList:", eventList
logQueue.put_nowait(None)
logQueueListener.stop()
import os
import sys
import time
import zmq
import logging
import threading
import cPickle
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
from dataIngestAPI import dataIngest
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helpers
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfile = os.path.join(logfilePath, "testIngestAPI.log")
helpers.initLogging(logfile, True, "DEBUG")
del BASE_PATH
print
print "==== TEST: data ingest ===="
print
class Receiver(threading.Thread):
def __init__(self):
self.extHost = "0.0.0.0"
self.signalPort = "6000"
self.eventPort = "6001"
self.dataPort = "6002"
self.context = zmq.Context()
self.signalSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.signalPort)
self.signalSocket.bind(connectionStr)
logging.info("signalSocket started (bind) for '" + connectionStr + "'")
self.eventSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.eventPort)
self.eventSocket.bind(connectionStr)
logging.info("eventSocket started (bind) for '" + connectionStr + "'")
self.dataSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.dataPort)
self.dataSocket.bind(connectionStr)
logging.info("dataSocket started (bind) for '" + connectionStr + "'")
threading.Thread.__init__(self)
def run(self):
message = self.signalSocket.recv()
logging.debug("signalSocket recv: " + message)
self.signalSocket.send(message)
logging.debug("signalSocket send: " + message)
for i in range(5):
logging.debug("eventSocket recv: " + str(cPickle.loads(self.eventSocket.recv())))
logging.debug("dataSocket recv: " + self.dataSocket.recv())
message = self.signalSocket.recv()
logging.debug("signalSocket recv: " + message)
self.signalSocket.send(message)
logging.debug("signalSocket send: " + message)
logging.debug("eventSocket recv: " + self.eventSocket.recv())
def stop(self):
try:
if self.signalSocket:
logging.info("closing eventSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
logging.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
logging.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
receiverThread = Receiver()
receiverThread.start()
obj = dataIngest(useLog = True)
obj.createFile("1.h5")
for i in range(5):
try:
data = "asdfasdasdfasd"
obj.write(data)
print "write"
except:
logging.error("break", exc_info=True)
break
obj.closeFile()
receiverThread.stop()
obj.stop()
print
print "==== TEST END: data Ingest ===="
print
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment