From 683e26ea00b0aea294a52557fe20ea882bec9569 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Thu, 30 Jul 2015 10:45:41 +0200 Subject: [PATCH] Removed unnecessay communication sockets --- ZeroMQTunnel/receiver.py | 308 +++++++++++++-------------------------- 1 file changed, 100 insertions(+), 208 deletions(-) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index 04d2cf84..e111ff5f 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -15,177 +15,6 @@ from stat import S_ISREG, ST_MTIME, ST_MODE import threading -class Coordinator: - zmqContext = None - liveViewerZmqContext = None - outputDir = None - zqmDataStreamIp = None - zmqDataStreamPort = None - zmqLiveViewerIp = None - zmqLiveViewerPort = None - receiverExchangeIp = "127.0.0.1" - receiverExchangePort = "6072" - liveViewerExchangeIp = "127.0.0.1" - liveViewerExchangePort = "6073" - ringBuffer = [] - maxRingBufferSize = 200 - timeToWaitForRingBuffer = 2 - - log = None - - receiverThread = None - liveViewerThread = None - - # sockets - receiverExchangeSocket = None - liveViewerExchangeSocket = None - zmqliveViewerSocket = None - - - def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): - self.outputDir = outputDir - self.zmqDataStreamIp = zmqDataStreamIp - self.zmqDataStreamPort = zmqDataStreamPort - self.zmqLiveViewerIp = zmqLiveViewerIp - self.zmqLiveViewerPort = zmqLiveViewerPort - - self.log = self.getLogger() - self.log.debug("Init") - - if context: - assert isinstance(context, zmq.sugar.context.Context) - - self.zmqContext = context or zmq.Context() - - # create sockets - self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort - self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket) - self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") - - # TODO use this to communicate with live viewer class - self.liveViewerExchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrLiveViewerExchangeSocket = "tcp://" + self.liveViewerExchangeIp + ":%s" % self.liveViewerExchangePort - self.liveViewerExchangeSocket.bind(connectionStrLiveViewerExchangeSocket) - self.log.debug("liveViewerExchangeSocket started (bind) for '" + connectionStrLiveViewerExchangeSocket + "'") - - # create socket for live viewer - self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP) - connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort - self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket) - self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") - - self.poller = zmq.Poller() - self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) -# self.poller.register(self.liveViewerExchangeSocket, zmq.POLLIN) - self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN) - - - # thread to communicate with live viewer -# self.liveViewerThread = threading.Thread(target=LiveViewer) -# self.liveViewerThread.start() - - - # initialize ring buffer - # get all entries in the directory - # TODO empty target dir -> ringBuffer = [] - self.ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) - # get the corresponding stats - self.ringBuffer = ((os.stat(path), path) for path in self.ringBuffer) - # leave only regular files, insert modification date - self.ringBuffer = [[stat[ST_MTIME], path] - for stat, path in self.ringBuffer if S_ISREG(stat[ST_MODE])] - - # sort the ring buffer in descending order (new to old files) - self.ringBuffer = sorted(self.ringBuffer, reverse=True) - self.log.debug("Init ring buffer") - - - try: - self.log.info("Start communication") - self.communicate() - self.log.info("Stopped communication.") - except Exception, e: - trace = traceback.format_exc() - self.log.info("Unkown error state. Shutting down...") - self.log.debug("Error was: " + str(e)) - - - self.log.info("Quitting.") - - - def getLogger(self): - logger = logging.getLogger("coordinator") - return logger - - - def communicate(self): - should_continue = True - - while should_continue: - socks = dict(self.poller.poll()) - - if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN: - message = self.receiverExchangeSocket.recv() - self.log.debug("Recieved control command: %s" % message ) - if message == "Exit": - self.log.debug("Recieved exit command, coordinator thread will stop recieving messages") - should_continue = False - self.liveViewerSocket.send("Exit") - break - elif message.startswith("AddFile"): - self.log.debug("Received AddFile command") - # add file to ring buffer - splittedMessage = message[7:].split(", ") - filename = splittedMessage[0] - fileModTime = splittedMessage[1] - self.log.debug("Send new file to ring buffer: " + str(filename) + ", " + str(fileModTime)) - self.addFileToRingBuffer(filename, fileModTime) - -# if self.liveViewerExchangeSocket in socks and socks[self.liveViewerExchangeSocket] == zmq.POLLIN: -# message = self.liveViewerExchangeSocket.recv() - if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN: - message = self.zmqliveViewer.recv() - self.log.debug("Call for next file... " + message) - # send first element in ring buffer to live viewer (the path of this file is the second entry) - if self.ringBuffer: - answer = self.ringBuffer[0][1] - else: - answer = "None" - - print answer - try: -# self.liveViewerExchangeSocket.send(answer) - self.zmqliveViewerSocket.send(answer) - except zmq.error.ContextTerminated: - break - - self.log.debug("sending exit signal to thread...") -# self.liveViewerExchangeSocket.send("Exit") - self.zmqliveViewerSocket.send("Exit") - # give the signal time to arrive - time.sleep(0.1) - - self.log.debug("Closing socket") - self.receiverExchangeSocket.close() - self.liveViewerExchangeSocket.close() - self.zmqliveViewerSocket.close() - - - def addFileToRingBuffer(self, filename, fileModTime): - # prepend file to ring buffer and restore order - self.ringBuffer[:0] = [[fileModTime, filename]] - self.ringBuffer = sorted(self.ringBuffer, reverse=True) - - # if the maximal size is exceeded: remove the oldest files - if len(self.ringBuffer) > self.maxRingBufferSize: - for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: - if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: - os.remove(path) - self.ringBuffer.remove([mod_time, path]) - - - class FileReceiver: zmqContext = None outputDir = None @@ -443,74 +272,121 @@ class FileReceiver: self.log.error(sys.exc_info()) -class LiveViewer(): +class Coordinator: zmqContext = None - liveViewerIp = None - liveViewerPort = None - exchangeIp = "127.0.0.1" - exchangePort = "6072" + liveViewerZmqContext = None + outputDir = None + zqmDataStreamIp = None + zmqDataStreamPort = None + zmqLiveViewerIp = None + zmqLiveViewerPort = None + receiverExchangeIp = "127.0.0.1" + receiverExchangePort = "6072" + ringBuffer = [] + maxRingBufferSize = 200 + timeToWaitForRingBuffer = 2 log = None + receiverThread = None + liveViewerThread = None + # sockets - liveViewerSocket = None - exchangeSocket = None + receiverExchangeSocket = None + zmqliveViewerSocket = None - poller = None + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): + self.outputDir = outputDir + self.zmqDataStreamIp = zmqDataStreamIp + self.zmqDataStreamPort = zmqDataStreamPort + self.zmqLiveViewerIp = zmqLiveViewerIp + self.zmqLiveViewerPort = zmqLiveViewerPort - def __init__(self, liveViewerIp = "127.0.0.1", liveViewerPort = "6071", context = None): - self.liveViewerIp = liveViewerIp - self.liveViewerPort = liveViewerPort + self.log = self.getLogger() + self.log.debug("Init") if context: assert isinstance(context, zmq.sugar.context.Context) self.zmqContext = context or zmq.Context() - self.log = self.getLogger() - self.log.debug("Init") + # create sockets + self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort + self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket) + self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") # create socket for live viewer - self.liveViewerSocket = self.zmqContext.socket(zmq.REP) - connectionStrLiveViewerSocket = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort - self.liveViewerSocket.bind(connectionStrLiveViewerSocket) + self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP) + connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort + self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket) self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") - # create socket for message exchange - self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort - self.exchangeSocket.connect(connectionStrExchangeSocket) - self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") - self.poller = zmq.Poller() - self.poller.register(self.liveViewerSocket, zmq.POLLIN) - self.poller.register(self.exchangeSocket, zmq.POLLIN) + self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) + self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN) - self.sendFileToLiveViewer() + + # initialize ring buffer + # get all entries in the directory + # TODO empty target dir -> ringBuffer = [] + self.ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) + # get the corresponding stats + self.ringBuffer = ((os.stat(path), path) for path in self.ringBuffer) + # leave only regular files, insert modification date + self.ringBuffer = [[stat[ST_MTIME], path] + for stat, path in self.ringBuffer if S_ISREG(stat[ST_MODE])] + + # sort the ring buffer in descending order (new to old files) + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + self.log.debug("Init ring buffer") + + + try: + self.log.info("Start communication") + self.communicate() + self.log.info("Stopped communication.") + except Exception, e: + trace = traceback.format_exc() + self.log.info("Unkown error state. Shutting down...") + self.log.debug("Error was: " + str(e)) + + + self.log.info("Quitting.") def getLogger(self): - logger = logging.getLogger("liveViewer") + logger = logging.getLogger("coordinator") return logger - def sendFileToLiveViewer(self): + def communicate(self): should_continue = True while should_continue: socks = dict(self.poller.poll()) - if self.exchangeSocket in socks and socks[self.exchangeSocket] == zmq.POLLIN: - message = self.exchangeSocket.recv() + + if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN: + message = self.receiverExchangeSocket.recv() self.log.debug("Recieved control command: %s" % message ) if message == "Exit": - self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages") + self.log.debug("Recieved exit command, coordinator thread will stop recieving messages") should_continue = False + self.liveViewerSocket.send("Exit") break + elif message.startswith("AddFile"): + self.log.debug("Received AddFile command") + # add file to ring buffer + splittedMessage = message[7:].split(", ") + filename = splittedMessage[0] + fileModTime = splittedMessage[1] + self.log.debug("Send new file to ring buffer: " + str(filename) + ", " + str(fileModTime)) + self.addFileToRingBuffer(filename, fileModTime) - if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN: - message = self.liveViewerSocket.recv() - self.log.debug("Call for next file... ") + if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN: + message = self.zmqliveViewerSocket.recv() + self.log.debug("Call for next file... " + message) # send first element in ring buffer to live viewer (the path of this file is the second entry) if self.ringBuffer: answer = self.ringBuffer[0][1] @@ -519,15 +395,31 @@ class LiveViewer(): print answer try: - self.liveViewerSocket.send(answer) + self.zmqliveViewerSocket.send(answer) except zmq.error.ContextTerminated: break - self.log.debug("LiveViewerThread: closing socket") - self.liveViewerSocket.close() - self.exchangeSocket.close() + self.log.debug("sending exit signal to thread...") + self.zmqliveViewerSocket.send("Exit") + # give the signal time to arrive + time.sleep(0.1) + + self.log.debug("Closing socket") + self.receiverExchangeSocket.close() + self.zmqliveViewerSocket.close() + + def addFileToRingBuffer(self, filename, fileModTime): + # prepend file to ring buffer and restore order + self.ringBuffer[:0] = [[fileModTime, filename]] + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + # if the maximal size is exceeded: remove the oldest files + if len(self.ringBuffer) > self.maxRingBufferSize: + for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: + if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: + os.remove(path) + self.ringBuffer.remove([mod_time, path]) -- GitLab