From 3c4c4d5c1aea7e4383e4bf0069a19d91b9d51d4d Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Fri, 24 Jul 2015 09:35:59 +0200 Subject: [PATCH] Added communication with live viewer --- LiveViewer.py | 8 ++- ZeroMQTunnel/receiver.py | 96 ++++++++++++++++++-------------- start_scripts/startliveviewer.py | 2 +- wrapper_script.py | 2 +- 4 files changed, 63 insertions(+), 45 deletions(-) diff --git a/LiveViewer.py b/LiveViewer.py index 59e5f6ab..72876b68 100644 --- a/LiveViewer.py +++ b/LiveViewer.py @@ -5,6 +5,7 @@ import time from PyQt4 import QtCore from PyQt4.QtCore import SIGNAL, QThread, QMutex import zmq +import cv2 class LiveView(QThread): FILETYPE_CBF = 0 @@ -55,10 +56,11 @@ class LiveView(QThread): print "Live view thread: Stopping thread" self.alive = False + self.wait() # waits until run stops on his own + # close ZeroMQ socket and destroy ZeroMQ context stopZmq(self.zmqSocket, self.zmqContext) - self.wait() # waits until run stops on his own def run(self): self.alive = True @@ -68,14 +70,15 @@ class LiveView(QThread): if self.filetype in [LiveView.FILETYPE_CBF, LiveView.FILETYPE_TIF]: # open viewer while self.alive: - print "self.alive", self.alive # find latest image self.mutex.lock() # get latest file from reveiver try: received_file = communicateWithReceiver(self.zmqSocket) + print "===received_file", received_file except zmq.error.ZMQError: + received_file = None print "ZMQError" break @@ -146,6 +149,7 @@ def communicateWithReceiver(socket): # Get the reply. message = socket.recv() print "Next file: ", message + return message def stopZmq(zmqSocket, zmqContext): try: diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index 93b7caed..40f6c314 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -18,13 +18,16 @@ import threading class FileReceiver: globalZmqContext = None + liveViewerZmqContext = None outputDir = None - zmqDataStreamPort = None zqmDataStreamIp = None - maxRingBufferSize = 100 - timeToWaitForRingBuffer = 2 + zmqDataStreamPort = None + zmqLiveViewerIp = None + zmqLiveViewerPort = None ringBuffer = [] - liveViewerZmqContext = None + maxRingBufferSize = 200 + timeToWaitForRingBuffer = 2 + runThread = True def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp): self.outputDir = outputDir @@ -53,27 +56,13 @@ class FileReceiver: self.globalZmqContext = zmq.Context() # thread to communicate with live viewer - - #create socket for live viewer - try: - logging.info("creating socket for communication with live viewer...") - zmqContext = self.getZmqContext() - zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext) - logging.info("creating socket for communication with live viewer...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating socket for communication with live viewer...failed.") - raise Exception(e) - - t1 = threading.Thread(target=self.sendFileToLiveViewer, args=(zmqLiveViewerSocket,)) - t1.start() + self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer) + self.liveViewerThread.start() try: logging.info("Start receiving new files") - self.startReceiving(zmqLiveViewerSocket) + self.startReceiving() logging.info("Stopped receiving.") except Exception, e: logging.error("Unknown error while receiving files. Need to abort.") @@ -154,16 +143,37 @@ class FileReceiver: # Albula is the live viewer used at the beamlines - def sendFileToLiveViewer(self, zmqLiveViewerSocket): + def sendFileToLiveViewer(self): + + #create socket for live viewer + try: + logging.info("creating socket for communication with live viewer...") + zmqContext = self.getZmqContext() + zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext) + logging.info("creating socket for communication with live viewer...done.") + except Exception, e: + errorMessage = "Unable to create zeromq context." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.info("creating socket for communication with live viewer...failed.") + raise Exception(e) + # if there is a request of the live viewer: - # send first element in ring buffer to live viewer - pass -# while True: -# # Wait for next request from client -# message = zmqLiveViewerSocket.recv() -# print "Received request: ", message -# time.sleep (1) -# socket.send("World from %s" % port) + while True: + # Wait for next request from client + try: + message = zmqLiveViewerSocket.recv() + except zmq.error.ContextTerminated: + break + print "Received request: ", message + time.sleep (1) + # send first element in ring buffer to live viewer (the path of this file is the second entry) + if self.ringBuffer: + zmqLiveViewerSocket.send(self.ringBuffer[0][1]) + print self.ringBuffer[0][1] + else: + zmqLiveViewerSocket.send("None") + print self.ringBuffer def combineMessage(self, zmqSocket): @@ -209,7 +219,7 @@ class FileReceiver: - def startReceiving(self, zmqLiveViewerSocket): + def startReceiving(self): #create pull socket try: logging.info("creating local pullSocket for incoming files...") @@ -243,7 +253,7 @@ class FileReceiver: logging.info("shutting down receiver...") try: logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocket, zmqLiveViewerSocket, zmqContext) + self.stopReceiving(zmqSocket, zmqContext) logging.debug("shutting down zeromq...done.") except: logging.error(sys.exc_info()) @@ -348,7 +358,10 @@ class FileReceiver: - def stopReceiving(self, zmqSocket, zmqLiveViewerSocket, msgContext): + def stopReceiving(self, zmqSocket, msgContext): + + self.runThread=False + try: logging.debug("closing zmqSocket...") zmqSocket.close() @@ -357,17 +370,18 @@ class FileReceiver: logging.error("closing zmqSocket...failed.") logging.error(sys.exc_info()) - try: - logging.debug("closing zmqLiveViwerSocket...") - zmqLiveViewerSocket.close() - logging.debug("closing zmqLiveViwerSocket...done.") - except: - logging.error("closing zmqLiveViewerSocket...failed.") - logging.error(sys.exc_info()) +# try: +# logging.debug("closing zmqLiveViwerSocket...") +# zmqLiveViewerSocket.close() +# logging.debug("closing zmqLiveViwerSocket...done.") +# except: +# logging.error("closing zmqLiveViewerSocket...failed.") +# logging.error(sys.exc_info()) try: logging.debug("closing zmqContext...") - msgContext.destroy() +# msgContext.destroy() + msgContext.term() logging.debug("closing zmqContext...done.") except: logging.error("closing zmqContext...failed.") diff --git a/start_scripts/startliveviewer.py b/start_scripts/startliveviewer.py index 2de0e0f3..0cd6dce5 100644 --- a/start_scripts/startliveviewer.py +++ b/start_scripts/startliveviewer.py @@ -14,6 +14,6 @@ lv = LiveView() lv.start() -time.sleep(5) +time.sleep(100) lv.stop() diff --git a/wrapper_script.py b/wrapper_script.py index 587beb33..5a97429d 100644 --- a/wrapper_script.py +++ b/wrapper_script.py @@ -31,7 +31,7 @@ if supported_file: p.communicate() # wait to ZeroMQ to finish - time.sleep(5) + time.sleep(10) # get responce from zeromq #pipe_path = "/tmp/zeromqllpipe_resp" -- GitLab