From 8c8c404f0c26a1651bc254be7c33f5343b1f2675 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Wed, 29 Jul 2015 17:25:16 +0200 Subject: [PATCH] finished coordinator class --- ZeroMQTunnel/receiver.py | 138 +++++++++++++++++++-------------------- 1 file changed, 67 insertions(+), 71 deletions(-) diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index e74e9710..04d2cf84 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -23,10 +23,10 @@ class Coordinator: zmqDataStreamPort = None zmqLiveViewerIp = None zmqLiveViewerPort = None - liveViewerExchangeIp = "127.0.0.1" - liveViewerExchangePort = "6072" receiverExchangeIp = "127.0.0.1" - receiverExchangePort = "6073" + receiverExchangePort = "6072" + liveViewerExchangeIp = "127.0.0.1" + liveViewerExchangePort = "6073" ringBuffer = [] maxRingBufferSize = 200 timeToWaitForRingBuffer = 2 @@ -36,6 +36,12 @@ class Coordinator: receiverThread = None liveViewerThread = None + # sockets + receiverExchangeSocket = None + liveViewerExchangeSocket = None + zmqliveViewerSocket = None + + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): self.outputDir = outputDir self.zmqDataStreamIp = zmqDataStreamIp @@ -52,28 +58,32 @@ class Coordinator: self.zmqContext = context or zmq.Context() # create sockets - self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIL) + self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort - self.zmqSocket.bind(connectionStrReceiverExchangeSocket) + self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket) self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") + # TODO use this to communicate with live viewer class self.liveViewerExchangeSocket = self.zmqContext.socket(zmq.PAIR) connectionStrLiveViewerExchangeSocket = "tcp://" + self.liveViewerExchangeIp + ":%s" % self.liveViewerExchangePort self.liveViewerExchangeSocket.bind(connectionStrLiveViewerExchangeSocket) self.log.debug("liveViewerExchangeSocket started (bind) for '" + connectionStrLiveViewerExchangeSocket + "'") + # create socket for live viewer + self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP) + connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort + self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket) + self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") + self.poller = zmq.Poller() self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) - self.poller.register(self.liveViewerExchangeSocket, zmq.POLLIN) - +# self.poller.register(self.liveViewerExchangeSocket, zmq.POLLIN) + self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN) - # start file receiver - self.receiverThread = threading.Thread(target=FileReceiver, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp)) - self.receiverThread.start() # thread to communicate with live viewer - self.liveViewerThread = threading.Thread(target=LiveViewer) - self.liveViewerThread.start() +# self.liveViewerThread = threading.Thread(target=LiveViewer) +# self.liveViewerThread.start() # initialize ring buffer @@ -93,16 +103,13 @@ class Coordinator: try: self.log.info("Start communication") - self.communicateWithThreads() + self.communicate() self.log.info("Stopped communication.") except Exception, e: trace = traceback.format_exc() self.log.info("Unkown error state. Shutting down...") self.log.debug("Error was: " + str(e)) - self.log.debug("Closing socket") - self.receiverExchangeSocket.close() - self.liveViewerExchangeSocket.close() self.log.info("Quitting.") @@ -112,7 +119,7 @@ class Coordinator: return logger - def communicateWithThreads(self): + def communicate(self): should_continue = True while should_continue: @@ -122,18 +129,24 @@ class Coordinator: message = self.receiverExchangeSocket.recv() self.log.debug("Recieved control command: %s" % message ) if message == "Exit": - self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages") + self.log.debug("Recieved exit command, coordinator thread will stop recieving messages") should_continue = False self.liveViewerSocket.send("Exit") break elif message.startswith("AddFile"): + self.log.debug("Received AddFile command") # add file to ring buffer - filename, fileModTime = message[7:].split(", ") - addFileToRingBuffer(filename, fileModTime) - - if self.liveViewerExchangeSocket in socks and socks[self.liveViewerExchangeSocket] == zmq.POLLIN: - message = self.liveViewerExchangeSocket.recv() - self.log.debug("Call for next file... ") + splittedMessage = message[7:].split(", ") + filename = splittedMessage[0] + fileModTime = splittedMessage[1] + self.log.debug("Send new file to ring buffer: " + str(filename) + ", " + str(fileModTime)) + self.addFileToRingBuffer(filename, fileModTime) + +# if self.liveViewerExchangeSocket in socks and socks[self.liveViewerExchangeSocket] == zmq.POLLIN: +# message = self.liveViewerExchangeSocket.recv() + if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN: + message = self.zmqliveViewer.recv() + self.log.debug("Call for next file... " + message) # send first element in ring buffer to live viewer (the path of this file is the second entry) if self.ringBuffer: answer = self.ringBuffer[0][1] @@ -142,10 +155,22 @@ class Coordinator: print answer try: - self.liveViewerExchangeSocket.send(answer) +# self.liveViewerExchangeSocket.send(answer) + self.zmqliveViewerSocket.send(answer) except zmq.error.ContextTerminated: break + self.log.debug("sending exit signal to thread...") +# self.liveViewerExchangeSocket.send("Exit") + self.zmqliveViewerSocket.send("Exit") + # give the signal time to arrive + time.sleep(0.1) + + self.log.debug("Closing socket") + self.receiverExchangeSocket.close() + self.liveViewerExchangeSocket.close() + self.zmqliveViewerSocket.close() + def addFileToRingBuffer(self, filename, fileModTime): # prepend file to ring buffer and restore order @@ -163,17 +188,13 @@ class Coordinator: class FileReceiver: zmqContext = None - liveViewerZmqContext = None outputDir = None zqmDataStreamIp = None zmqDataStreamPort = None zmqLiveViewerIp = None zmqLiveViewerPort = None - zmqLiveViewerExchangeIp = "127.0.0.1" - zmqLiveViewerExchangePort = "6072" - ringBuffer = [] - maxRingBufferSize = 200 - timeToWaitForRingBuffer = 2 + exchangeIp = "127.0.0.1" + exchangePort = "6072" log = None @@ -197,35 +218,20 @@ class FileReceiver: self.log = self.getLogger() self.log.debug("Init") + # start file receiver + self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp)) + self.receiverThread.start() + # create pull socket self.zmqSocket = self.zmqContext.socket(zmq.PULL) connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort self.zmqSocket.bind(connectionStrZmqSocket) - self.log.debug("zmqSocket started for '" + connectionStrZmqSocket + "'") + self.log.debug("zmqSocket started (bind) for '" + connectionStrZmqSocket + "'") self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) - connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort - self.exchangeSocket.bind(connectionStrExchangeSocket) - self.log.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'") - - - # thread to communicate with live viewer - self.liveViewerThread = threading.Thread(target=LiveViewer) - self.liveViewerThread.start() - - # initialize ring buffer - # get all entries in the directory - # TODO empty target dir -> ringBuffer = [] - ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) - # get the corresponding stats - ringBuffer = ((os.stat(path), path) for path in ringBuffer) - - # leave only regular files, insert modification date - ringBuffer = [[stat[ST_MTIME], path] - for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])] - - # sort the ring buffer in descending order (new to old files) - ringBuffer = sorted(ringBuffer, reverse=True) + connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort + self.exchangeSocket.connect(connectionStrExchangeSocket) + self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") try: self.log.info("Start receiving new files") @@ -248,19 +254,6 @@ class FileReceiver: return logger - def addFileToRingBuffer(self, filename, fileModTime): - # prepend file to ring buffer and restore order - self.ringBuffer[:0] = [[fileModTime, filename]] - self.ringBuffer = sorted(self.ringBuffer, reverse=True) - - # if the maximal size is exceeded: remove the oldest files - if len(self.ringBuffer) > self.maxRingBufferSize: - for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: - if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: - os.remove(path) - self.ringBuffer.remove([mod_time, path]) - - def combineMessage(self, zmqSocket): receivingMessages = True #save all chunks to file @@ -304,9 +297,10 @@ class FileReceiver: fileModTime = payloadMetadataDict["fileModificationTime"] self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) - # add to ring buffer - self.log.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime)) - self.addFileToRingBuffer(str(filename), fileModTime) + # send the file to the coordinator to add it to the ring buffer + message = "AddFile" + str(filename) + ", " + str(fileModTime) + self.log.debug("Send file to coordinator: " + message ) + self.exchangeSocket.send(message) @@ -322,6 +316,7 @@ class FileReceiver: loopCounter+=1 except KeyboardInterrupt: self.log.debug("Keyboard interrupt detected. Stop receiving.") + continueReceiving = False break except: self.log.error("receive message...failed.") @@ -435,6 +430,7 @@ class FileReceiver: self.log.debug("sending exit signal to thread...") self.exchangeSocket.send("Exit") + # give the signal time to arrive time.sleep(0.1) self.exchangeSocket.close() self.log.debug("sending exit signal to thread...done") @@ -479,7 +475,7 @@ class LiveViewer(): self.liveViewerSocket = self.zmqContext.socket(zmq.REP) connectionStrLiveViewerSocket = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort self.liveViewerSocket.bind(connectionStrLiveViewerSocket) - self.log.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'") + self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") # create socket for message exchange self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) -- GitLab