From 9a8d8d68d92a150e13f3757b3382a0f42a83f935 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 29 Jul 2015 16:15:30 +0200 Subject: [PATCH] Added protoype of coordinator class --- ZeroMQTunnel/receiver.py | 146 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index 54154f7e..e74e9710 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -15,6 +15,152 @@ from stat import S_ISREG, ST_MTIME, ST_MODE import threading +class Coordinator: + zmqContext = None + liveViewerZmqContext = None + outputDir = None + zqmDataStreamIp = None + zmqDataStreamPort = None + zmqLiveViewerIp = None + zmqLiveViewerPort = None + liveViewerExchangeIp = "127.0.0.1" + liveViewerExchangePort = "6072" + receiverExchangeIp = "127.0.0.1" + receiverExchangePort = "6073" + ringBuffer = [] + maxRingBufferSize = 200 + timeToWaitForRingBuffer = 2 + + log = None + + receiverThread = None + liveViewerThread = None + + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): + self.outputDir = outputDir + self.zmqDataStreamIp = zmqDataStreamIp + self.zmqDataStreamPort = zmqDataStreamPort + self.zmqLiveViewerIp = zmqLiveViewerIp + self.zmqLiveViewerPort = zmqLiveViewerPort + + self.log = self.getLogger() + self.log.debug("Init") + + if context: + assert isinstance(context, zmq.sugar.context.Context) + + self.zmqContext = context or zmq.Context() + + # create sockets + self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIL) + connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort + self.zmqSocket.bind(connectionStrReceiverExchangeSocket) + self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") + + self.liveViewerExchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStrLiveViewerExchangeSocket = "tcp://" + self.liveViewerExchangeIp + ":%s" % self.liveViewerExchangePort + self.liveViewerExchangeSocket.bind(connectionStrLiveViewerExchangeSocket) + self.log.debug("liveViewerExchangeSocket started (bind) for '" + connectionStrLiveViewerExchangeSocket + "'") + + self.poller = zmq.Poller() + self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) + self.poller.register(self.liveViewerExchangeSocket, zmq.POLLIN) + + + # start file receiver + self.receiverThread = threading.Thread(target=FileReceiver, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp)) + self.receiverThread.start() + + # thread to communicate with live viewer + self.liveViewerThread = threading.Thread(target=LiveViewer) + self.liveViewerThread.start() + + + # initialize ring buffer + # get all entries in the directory + # TODO empty target dir -> ringBuffer = [] + self.ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) + # get the corresponding stats + self.ringBuffer = ((os.stat(path), path) for path in self.ringBuffer) + # leave only regular files, insert modification date + self.ringBuffer = [[stat[ST_MTIME], path] + for stat, path in self.ringBuffer if S_ISREG(stat[ST_MODE])] + + # sort the ring buffer in descending order (new to old files) + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + self.log.debug("Init ring buffer") + + + try: + self.log.info("Start communication") + self.communicateWithThreads() + self.log.info("Stopped communication.") + except Exception, e: + trace = traceback.format_exc() + self.log.info("Unkown error state. Shutting down...") + self.log.debug("Error was: " + str(e)) + + self.log.debug("Closing socket") + self.receiverExchangeSocket.close() + self.liveViewerExchangeSocket.close() + + self.log.info("Quitting.") + + + def getLogger(self): + logger = logging.getLogger("coordinator") + return logger + + + def communicateWithThreads(self): + should_continue = True + + while should_continue: + socks = dict(self.poller.poll()) + + if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN: + message = self.receiverExchangeSocket.recv() + self.log.debug("Recieved control command: %s" % message ) + if message == "Exit": + self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages") + should_continue = False + self.liveViewerSocket.send("Exit") + break + elif message.startswith("AddFile"): + # add file to ring buffer + filename, fileModTime = message[7:].split(", ") + addFileToRingBuffer(filename, fileModTime) + + if self.liveViewerExchangeSocket in socks and socks[self.liveViewerExchangeSocket] == zmq.POLLIN: + message = self.liveViewerExchangeSocket.recv() + self.log.debug("Call for next file... ") + # send first element in ring buffer to live viewer (the path of this file is the second entry) + if self.ringBuffer: + answer = self.ringBuffer[0][1] + else: + answer = "None" + + print answer + try: + self.liveViewerExchangeSocket.send(answer) + except zmq.error.ContextTerminated: + break + + + def addFileToRingBuffer(self, filename, fileModTime): + # prepend file to ring buffer and restore order + self.ringBuffer[:0] = [[fileModTime, filename]] + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + + # if the maximal size is exceeded: remove the oldest files + if len(self.ringBuffer) > self.maxRingBufferSize: + for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: + if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: + os.remove(path) + self.ringBuffer.remove([mod_time, path]) + + + class FileReceiver: zmqContext = None liveViewerZmqContext = None -- GitLab