Skip to content
Snippets Groups Projects
Commit 3c4c4d5c authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added communication with live viewer

parent 72386d2b
No related branches found
No related tags found
No related merge requests found
...@@ -5,6 +5,7 @@ import time ...@@ -5,6 +5,7 @@ import time
from PyQt4 import QtCore from PyQt4 import QtCore
from PyQt4.QtCore import SIGNAL, QThread, QMutex from PyQt4.QtCore import SIGNAL, QThread, QMutex
import zmq import zmq
import cv2
class LiveView(QThread): class LiveView(QThread):
FILETYPE_CBF = 0 FILETYPE_CBF = 0
...@@ -55,10 +56,11 @@ class LiveView(QThread): ...@@ -55,10 +56,11 @@ class LiveView(QThread):
print "Live view thread: Stopping thread" print "Live view thread: Stopping thread"
self.alive = False self.alive = False
self.wait() # waits until run stops on his own
# close ZeroMQ socket and destroy ZeroMQ context # close ZeroMQ socket and destroy ZeroMQ context
stopZmq(self.zmqSocket, self.zmqContext) stopZmq(self.zmqSocket, self.zmqContext)
self.wait() # waits until run stops on his own
def run(self): def run(self):
self.alive = True self.alive = True
...@@ -68,14 +70,15 @@ class LiveView(QThread): ...@@ -68,14 +70,15 @@ class LiveView(QThread):
if self.filetype in [LiveView.FILETYPE_CBF, LiveView.FILETYPE_TIF]: if self.filetype in [LiveView.FILETYPE_CBF, LiveView.FILETYPE_TIF]:
# open viewer # open viewer
while self.alive: while self.alive:
print "self.alive", self.alive
# find latest image # find latest image
self.mutex.lock() self.mutex.lock()
# get latest file from reveiver # get latest file from reveiver
try: try:
received_file = communicateWithReceiver(self.zmqSocket) received_file = communicateWithReceiver(self.zmqSocket)
print "===received_file", received_file
except zmq.error.ZMQError: except zmq.error.ZMQError:
received_file = None
print "ZMQError" print "ZMQError"
break break
...@@ -146,6 +149,7 @@ def communicateWithReceiver(socket): ...@@ -146,6 +149,7 @@ def communicateWithReceiver(socket):
# Get the reply. # Get the reply.
message = socket.recv() message = socket.recv()
print "Next file: ", message print "Next file: ", message
return message
def stopZmq(zmqSocket, zmqContext): def stopZmq(zmqSocket, zmqContext):
try: try:
......
...@@ -18,13 +18,16 @@ import threading ...@@ -18,13 +18,16 @@ import threading
class FileReceiver: class FileReceiver:
globalZmqContext = None globalZmqContext = None
liveViewerZmqContext = None
outputDir = None outputDir = None
zmqDataStreamPort = None
zqmDataStreamIp = None zqmDataStreamIp = None
maxRingBufferSize = 100 zmqDataStreamPort = None
timeToWaitForRingBuffer = 2 zmqLiveViewerIp = None
zmqLiveViewerPort = None
ringBuffer = [] ringBuffer = []
liveViewerZmqContext = None maxRingBufferSize = 200
timeToWaitForRingBuffer = 2
runThread = True
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp): def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp):
self.outputDir = outputDir self.outputDir = outputDir
...@@ -53,27 +56,13 @@ class FileReceiver: ...@@ -53,27 +56,13 @@ class FileReceiver:
self.globalZmqContext = zmq.Context() self.globalZmqContext = zmq.Context()
# thread to communicate with live viewer # thread to communicate with live viewer
self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer)
#create socket for live viewer self.liveViewerThread.start()
try:
logging.info("creating socket for communication with live viewer...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext)
logging.info("creating socket for communication with live viewer...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating socket for communication with live viewer...failed.")
raise Exception(e)
t1 = threading.Thread(target=self.sendFileToLiveViewer, args=(zmqLiveViewerSocket,))
t1.start()
try: try:
logging.info("Start receiving new files") logging.info("Start receiving new files")
self.startReceiving(zmqLiveViewerSocket) self.startReceiving()
logging.info("Stopped receiving.") logging.info("Stopped receiving.")
except Exception, e: except Exception, e:
logging.error("Unknown error while receiving files. Need to abort.") logging.error("Unknown error while receiving files. Need to abort.")
...@@ -154,16 +143,37 @@ class FileReceiver: ...@@ -154,16 +143,37 @@ class FileReceiver:
# Albula is the live viewer used at the beamlines # Albula is the live viewer used at the beamlines
def sendFileToLiveViewer(self, zmqLiveViewerSocket): def sendFileToLiveViewer(self):
#create socket for live viewer
try:
logging.info("creating socket for communication with live viewer...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext)
logging.info("creating socket for communication with live viewer...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating socket for communication with live viewer...failed.")
raise Exception(e)
# if there is a request of the live viewer: # if there is a request of the live viewer:
# send first element in ring buffer to live viewer while True:
pass # Wait for next request from client
# while True: try:
# # Wait for next request from client message = zmqLiveViewerSocket.recv()
# message = zmqLiveViewerSocket.recv() except zmq.error.ContextTerminated:
# print "Received request: ", message break
# time.sleep (1) print "Received request: ", message
# socket.send("World from %s" % port) time.sleep (1)
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
zmqLiveViewerSocket.send(self.ringBuffer[0][1])
print self.ringBuffer[0][1]
else:
zmqLiveViewerSocket.send("None")
print self.ringBuffer
def combineMessage(self, zmqSocket): def combineMessage(self, zmqSocket):
...@@ -209,7 +219,7 @@ class FileReceiver: ...@@ -209,7 +219,7 @@ class FileReceiver:
def startReceiving(self, zmqLiveViewerSocket): def startReceiving(self):
#create pull socket #create pull socket
try: try:
logging.info("creating local pullSocket for incoming files...") logging.info("creating local pullSocket for incoming files...")
...@@ -243,7 +253,7 @@ class FileReceiver: ...@@ -243,7 +253,7 @@ class FileReceiver:
logging.info("shutting down receiver...") logging.info("shutting down receiver...")
try: try:
logging.debug("shutting down zeromq...") logging.debug("shutting down zeromq...")
self.stopReceiving(zmqSocket, zmqLiveViewerSocket, zmqContext) self.stopReceiving(zmqSocket, zmqContext)
logging.debug("shutting down zeromq...done.") logging.debug("shutting down zeromq...done.")
except: except:
logging.error(sys.exc_info()) logging.error(sys.exc_info())
...@@ -348,7 +358,10 @@ class FileReceiver: ...@@ -348,7 +358,10 @@ class FileReceiver:
def stopReceiving(self, zmqSocket, zmqLiveViewerSocket, msgContext): def stopReceiving(self, zmqSocket, msgContext):
self.runThread=False
try: try:
logging.debug("closing zmqSocket...") logging.debug("closing zmqSocket...")
zmqSocket.close() zmqSocket.close()
...@@ -357,17 +370,18 @@ class FileReceiver: ...@@ -357,17 +370,18 @@ class FileReceiver:
logging.error("closing zmqSocket...failed.") logging.error("closing zmqSocket...failed.")
logging.error(sys.exc_info()) logging.error(sys.exc_info())
try: # try:
logging.debug("closing zmqLiveViwerSocket...") # logging.debug("closing zmqLiveViwerSocket...")
zmqLiveViewerSocket.close() # zmqLiveViewerSocket.close()
logging.debug("closing zmqLiveViwerSocket...done.") # logging.debug("closing zmqLiveViwerSocket...done.")
except: # except:
logging.error("closing zmqLiveViewerSocket...failed.") # logging.error("closing zmqLiveViewerSocket...failed.")
logging.error(sys.exc_info()) # logging.error(sys.exc_info())
try: try:
logging.debug("closing zmqContext...") logging.debug("closing zmqContext...")
msgContext.destroy() # msgContext.destroy()
msgContext.term()
logging.debug("closing zmqContext...done.") logging.debug("closing zmqContext...done.")
except: except:
logging.error("closing zmqContext...failed.") logging.error("closing zmqContext...failed.")
......
...@@ -14,6 +14,6 @@ lv = LiveView() ...@@ -14,6 +14,6 @@ lv = LiveView()
lv.start() lv.start()
time.sleep(5) time.sleep(100)
lv.stop() lv.stop()
...@@ -31,7 +31,7 @@ if supported_file: ...@@ -31,7 +31,7 @@ if supported_file:
p.communicate() p.communicate()
# wait to ZeroMQ to finish # wait to ZeroMQ to finish
time.sleep(5) time.sleep(10)
# get responce from zeromq # get responce from zeromq
#pipe_path = "/tmp/zeromqllpipe_resp" #pipe_path = "/tmp/zeromqllpipe_resp"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment