From 9c49b522c8b448fabdf51c423a1a4502643d4be8 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Tue, 28 Jul 2015 14:42:09 +0200 Subject: [PATCH] Removed unnessary functions --- ZeroMQTunnel/fileMover.py | 10 +-- ZeroMQTunnel/receiver.py | 155 +++++++++----------------------------- 2 files changed, 42 insertions(+), 123 deletions(-) diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index 04fc44eb..1f08670f 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -582,8 +582,10 @@ class Cleaner(): self.log.debug("Init") #bind to local port - self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL) - self.zmqCleanerSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket) + self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL) + connectionStrCleanerSocket = "tcp://" + self.bindingIpForSocket + ":%s" % self.bindingPortForSocket + self.zmqCleanerSocket.bind(connectionStrCleanerSocket) + self.log.debug("zmqCleanerSocket started for '" + connectionStrCleanerSocket + "'") try: self.process() @@ -607,10 +609,8 @@ class Cleaner(): def process(self): - processingJobs = True - #processing messaging - while processingJobs: + while True: #waiting for new jobs self.log.debug("Waiting for new jobs") try: diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index af664ff4..28ab21d6 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -17,7 +17,7 @@ import threading class FileReceiver: - globalZmqContext = None + zmqContext = None liveViewerZmqContext = None outputDir = None zqmDataStreamIp = None @@ -27,15 +27,33 @@ class FileReceiver: ringBuffer = [] maxRingBufferSize = 200 timeToWaitForRingBuffer = 2 - runThread = True - def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp): + # sockets + zmqSocket = None + + + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): self.outputDir = outputDir self.zmqDataStreamIp = zmqDataStreamIp self.zmqDataStreamPort = zmqDataStreamPort self.zmqLiveViewerIp = zmqLiveViewerIp self.zmqLiveViewerPort = zmqLiveViewerPort + if context: + assert isinstance(context, zmq.sugar.context.Context) + + self.zmqContext = context or zmq.Context() + + # create pull socket + self.zmqSocket = self.zmqContext.socket(zmq.PULL) + connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort + self.zmqSocket.bind(connectionStrZmqSocket) + logging.debug("zmqSocket started for '" + connectionStrZmqSocket + "'") + + # thread to communicate with live viewer + self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer) + self.liveViewerThread.start() + # initialize ring buffer # get all entries in the directory # TODO empty target dir -> ringBuffer = [] @@ -50,16 +68,6 @@ class FileReceiver: # sort the ring buffer in descending order (new to old files) ringBuffer = sorted(ringBuffer, reverse=True) - - # "global" in Python is per-module ! - global globalZmqContext - self.globalZmqContext = zmq.Context() - - # thread to communicate with live viewer - self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer) - self.liveViewerThread.start() - - try: logging.info("Start receiving new files") self.startReceiving() @@ -71,64 +79,11 @@ class FileReceiver: trace = traceback.format_exc() logging.info("Unkown error state. Shutting down...") logging.debug("Error was: " + str(trace)) - self.globalZmqContext.destroy() + self.zmqContext.destroy() logging.info("Quitting.") - def receiveMessage(self, socket): - assert isinstance(socket, zmq.sugar.socket.Socket) - logging.debug("receiving messages...") - # message = socket.recv() - # while True: - message = socket.recv_multipart() - - return message - - - def getZmqContext(self): - #get reference for global context-var - globalZmqContext = self.globalZmqContext - - return globalZmqContext - - - def getZmqSocket_Pull(self, context): - pattern_pull = zmq.PULL - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern_pull) - - return socket - - - def getZmqSocket_Rep(self, context): - pattern_pull = zmq.REP - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern_pull) - - return socket - - - def createPullSocket(self, context): - assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Pull(context) - - logging.info("binding to data socket: tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort) - socket.bind('tcp://' + self.zmqDataStreamIp + ':%s' % self.zmqDataStreamPort) - - return socket - - - def createSocketForLiveViewer(self, context): - assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Rep(context) - - logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort) - socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.zmqLiveViewerPort) - - return socket - - def addFileToRingBuffer(self, filename, fileModTime): # prepend file to ring buffer and restore order self.ringBuffer[:0] = [[fileModTime, filename]] @@ -142,21 +97,14 @@ class FileReceiver: self.ringBuffer.remove([mod_time, path]) - # Albula is the live viewer used at the beamlines def sendFileToLiveViewer(self): - #create socket for live viewer - try: - logging.info("creating socket for communication with live viewer...") - zmqContext = self.getZmqContext() - zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext) - logging.info("creating socket for communication with live viewer...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating socket for communication with live viewer...failed.") - raise Exception(e) + # create socket for live viewer + zmqLiveViewerSocket = self.zmqContext.socket(zmq.REP) + connectionStrZmqLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort + zmqLiveViewerSocket.bind(connectionStrZmqLiveViewerSocket) + logging.debug("zmqLiveViewerSocket started for '" + connectionStrZmqLiveViewerSocket + "'") + # if there is a request of the live viewer: while True: @@ -181,13 +129,10 @@ class FileReceiver: except zmq.error.ContextTerminated: break + zmqLiveViewerSocket.close() + def combineMessage(self, zmqSocket): - # multipartMessage = zmqSocket.recv_multipart() - # logging.info("New message received.") - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) - # loopCounter+=1 #save all chunks to file while True: multipartMessage = zmqSocket.recv_multipart() @@ -226,19 +171,6 @@ class FileReceiver: def startReceiving(self): - #create pull socket - try: - logging.info("creating local pullSocket for incoming files...") - zmqContext = self.getZmqContext() - zmqSocket = self.createPullSocket(zmqContext) - logging.info("creating local pullSocket for incoming files...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating local pullSocket for incoming files...failed.") - raise Exception(e) - #run loop, and wait for incoming messages continueStreaming = True loopCounter = 0 #counter of total received messages @@ -246,7 +178,7 @@ class FileReceiver: logging.debug("Waiting for new messages...") while continueReceiving: try: - self.combineMessage(zmqSocket) + self.combineMessage(self.zmqSocket) loopCounter+=1 except KeyboardInterrupt: logging.debug("Keyboard interrupt detected. Stop receiving.") @@ -258,12 +190,11 @@ class FileReceiver: logging.info("shutting down receiver...") try: - logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocket, zmqContext) - logging.debug("shutting down zeromq...done.") + self.stopReceiving(self.zmqSocket, self.zmqContext) + logging.debug("shutting down receiver...done.") except: logging.error(sys.exc_info()) - logging.error("shutting down zeromq...failed.") + logging.error("shutting down receiver...failed.") def generateTargetFilepath(self,configDict): @@ -364,30 +295,18 @@ class FileReceiver: - def stopReceiving(self, zmqSocket, msgContext): - - self.runThread=False + def stopReceiving(self, zmqSocket, zmqContext): + logging.debug("stopReceiving...") try: - logging.debug("closing zmqSocket...") zmqSocket.close() logging.debug("closing zmqSocket...done.") except: logging.error("closing zmqSocket...failed.") logging.error(sys.exc_info()) -# try: -# logging.debug("closing zmqLiveViwerSocket...") -# zmqLiveViewerSocket.close() -# logging.debug("closing zmqLiveViwerSocket...done.") -# except: -# logging.error("closing zmqLiveViewerSocket...failed.") -# logging.error(sys.exc_info()) - try: - logging.debug("closing zmqContext...") -# msgContext.destroy() - msgContext.term() + zmqContext.destroy() logging.debug("closing zmqContext...done.") except: logging.error("closing zmqContext...failed.") -- GitLab