Skip to content
Snippets Groups Projects
Commit 7470ae72 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Splitted FileReceiver and Coordinater-classes from receiver

parent 83ee6343
No related branches found
No related tags found
No related merge requests found
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import zmq
import logging
import traceback
from RingBuffer import RingBuffer
#
# -------------------------- class: Coordinator --------------------------------------
#
class Coordinator:
zmqContext = None
liveViewerZmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
receiverExchangeIp = "127.0.0.1"
receiverExchangePort = "6072"
ringBuffer = []
maxRingBufferSize = None
log = None
receiverThread = None
liveViewerThread = None
# sockets
receiverExchangeSocket = None # socket to communicate with FileReceiver class
zmqliveViewerSocket = None # socket to communicate with live viewer
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, maxRingBufferSize, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.maxRingBufferSize = maxRingBufferSize
# # TODO remove outputDir from ringBuffer?
# self.ringBuffer = RingBuffer(self.maxRingBufferSize, self.outputDir)
self.ringBuffer = RingBuffer(self.maxRingBufferSize)
self.log = self.getLogger()
self.log.debug("Init")
if context:
assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
# create sockets
self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort
self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket)
self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'")
# create socket for live viewer
self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort
self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket)
self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'")
self.poller = zmq.Poller()
self.poller.register(self.receiverExchangeSocket, zmq.POLLIN)
self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN)
try:
self.log.info("Start communication")
self.communicate()
self.log.info("Stopped communication.")
except Exception, e:
trace = traceback.format_exc()
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(e))
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("coordinator")
return logger
def communicate(self):
should_continue = True
while should_continue:
socks = dict(self.poller.poll())
if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN:
message = self.receiverExchangeSocket.recv()
self.log.debug("Recieved control command: %s" % message )
if message == "Exit":
self.log.debug("Received exit command, coordinator thread will stop recieving messages")
should_continue = False
# TODO why sending signal to live viewer?
# self.zmqliveViewerSocket.send("Exit", zmq.NOBLOCK)
break
elif message.startswith("AddFile"):
self.log.debug("Received AddFile command")
# add file to ring buffer
splittedMessage = message[7:].split(", ")
filename = splittedMessage[0]
fileModTime = splittedMessage[1]
self.log.debug("Add new file to ring buffer: " + str(filename) + ", " + str(fileModTime))
self.ringBuffer.add(filename, fileModTime)
if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN:
message = self.zmqliveViewerSocket.recv()
self.log.debug("Call for next file... " + message)
# send newest element in ring buffer to live viewer
answer = self.ringBuffer.getNewestFile()
print answer
try:
self.zmqliveViewerSocket.send(answer)
except zmq.error.ContextTerminated:
break
self.log.debug("Closing socket")
self.receiverExchangeSocket.close(0)
self.zmqliveViewerSocket.close(0)
__author__ = 'Manuela Kuhn <marnuel.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import time
import zmq
import sys
import random
import json
import argparse
import logging
import errno
import os
import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE
import threading
import socket # needed to get hostname
import helperScript
from RingBuffer import RingBuffer
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
CONFIG_PATH = BASE_PATH + os.sep + "conf"
sys.path.append ( CONFIG_PATH )
from config import defaultConfigReceiver as defaultConfig
from Coordinator import Coordinator
#
......@@ -365,201 +354,3 @@ class FileReceiver:
self.log.error("closing zmqContext...failed.")
self.log.error(sys.exc_info())
#
# -------------------------- class: Coordinator --------------------------------------
#
class Coordinator:
zmqContext = None
liveViewerZmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
receiverExchangeIp = "127.0.0.1"
receiverExchangePort = "6072"
ringBuffer = []
maxRingBufferSize = None
log = None
receiverThread = None
liveViewerThread = None
# sockets
receiverExchangeSocket = None # socket to communicate with FileReceiver class
zmqliveViewerSocket = None # socket to communicate with live viewer
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, maxRingBufferSize, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.maxRingBufferSize = maxRingBufferSize
# # TODO remove outputDir from ringBuffer?
# self.ringBuffer = RingBuffer(self.maxRingBufferSize, self.outputDir)
self.ringBuffer = RingBuffer(self.maxRingBufferSize)
self.log = self.getLogger()
self.log.debug("Init")
if context:
assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
# create sockets
self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort
self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket)
self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'")
# create socket for live viewer
self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort
self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket)
self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'")
self.poller = zmq.Poller()
self.poller.register(self.receiverExchangeSocket, zmq.POLLIN)
self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN)
try:
self.log.info("Start communication")
self.communicate()
self.log.info("Stopped communication.")
except Exception, e:
trace = traceback.format_exc()
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(e))
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("coordinator")
return logger
def communicate(self):
should_continue = True
while should_continue:
socks = dict(self.poller.poll())
if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN:
message = self.receiverExchangeSocket.recv()
self.log.debug("Recieved control command: %s" % message )
if message == "Exit":
self.log.debug("Received exit command, coordinator thread will stop recieving messages")
should_continue = False
# TODO why sending signal to live viewer?
# self.zmqliveViewerSocket.send("Exit", zmq.NOBLOCK)
break
elif message.startswith("AddFile"):
self.log.debug("Received AddFile command")
# add file to ring buffer
splittedMessage = message[7:].split(", ")
filename = splittedMessage[0]
fileModTime = splittedMessage[1]
self.log.debug("Add new file to ring buffer: " + str(filename) + ", " + str(fileModTime))
self.ringBuffer.add(filename, fileModTime)
if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN:
message = self.zmqliveViewerSocket.recv()
self.log.debug("Call for next file... " + message)
# send newest element in ring buffer to live viewer
answer = self.ringBuffer.getNewestFile()
print answer
try:
self.zmqliveViewerSocket.send(answer)
except zmq.error.ContextTerminated:
break
self.log.debug("Closing socket")
self.receiverExchangeSocket.close(0)
self.zmqliveViewerSocket.close(0)
def argumentParsing():
defConf = defaultConfig()
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type=str, default=defConf.logfilePath , help="path where logfile will be created (default=" + str(defConf.logfilePath) + ")")
parser.add_argument("--logfileName" , type=str, default=defConf.logfileName , help="filename used for logging (default=" + str(defConf.logfileName) + ")")
parser.add_argument("--targetDir" , type=str, default=defConf.targetDir , help="where incoming data will be stored to (default=" + str(defConf.targetDir) + ")")
parser.add_argument("--dataStreamIp" , type=str, default=defConf.dataStreamIp , help="ip of dataStream-socket to pull new files from (default=" + str(defConf.dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=defConf.dataStreamPort , help="port number of dataStream-socket to pull new files from (default=" + str(defConf.dataStreamPort) + ")")
parser.add_argument("--liveViewerIp" , type=str, default=defConf.liveViewerIp , help="local ip to bind LiveViewer to (default=" + str(defConf.liveViewerIp) + ")")
parser.add_argument("--liveViewerPort" , type=str, default=defConf.liveViewerPort , help="tcp port of live viewer (default=" + str(defConf.liveViewerPort) + ")")
parser.add_argument("--senderComPort" , type=str, default=defConf.senderComPort , help="port number of dataStream-socket to send signals back to the sender (default=" + str(defConf.senderComPort) + ")")
parser.add_argument("--maxRingBufferSize" , type=int, default=defConf.maxRingBufferSize , help="size of the ring buffer for the live viewer (default=" + str(defConf.maxRingBufferSize) + ")")
parser.add_argument("--senderResponseTimeout", type=int, default=defConf.senderResponseTimeout, help=argparse.SUPPRESS)
parser.add_argument("--verbose" , action="store_true" , help="more verbose output")
arguments = parser.parse_args()
targetDir = str(arguments.targetDir)
# check target directory for existance
helperScript.checkFolderExistance(targetDir)
return arguments
class Receiver():
logfilePath = None
logfileName = None
logfileFullPath = None
verbose = None
targetDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
senderComPort = None
maxRingBufferSize = None
senderResponseTimeout = None
def __init__(self, verbose):
defConf = defaultConfig()
self.logfilePath = defConf.logfilePath
self.logfileName = defConf.logfileName
self.logfileFullPath = os.path.join(self.logfilePath, self.logfileName)
self.verbose = verbose
self.targetDir = defConf.targetDir
self.zmqDataStreamIp = defConf.dataStreamIp
self.zmqDataStreamPort = defConf.dataStreamPort
self.zmqLiveViewerIp = defConf.liveViewerIp
self.zmqLiveViewerPort = defConf.liveViewerPort
self.senderComPort = defConf.senderComPort
self.maxRingBufferSize = defConf.maxRingBufferSize
self.senderResponseTimeout = defConf.senderResponseTimeout
#enable logging
helperScript.initLogging(self.logfileFullPath, self.verbose)
#start file receiver
myWorker = FileReceiver(self.targetDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", action="store_true", help="more verbose output")
arguments = parser.parse_args()
receiver = Receiver(arguments.verbose)
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import sys
import argparse
import logging
import os
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) )
ZEROMQ_PATH = BASE_PATH + os.sep + "src" + os.sep + "ZeroMQTunnel"
CONFIG_PATH = BASE_PATH + os.sep + "conf"
sys.path.append ( ZEROMQ_PATH )
sys.path.append ( CONFIG_PATH )
import helperScript
from FileReceiver import FileReceiver
from config import defaultConfigReceiver as defaultConfig
def argumentParsing():
defConf = defaultConfig()
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type=str, default=defConf.logfilePath , help="path where logfile will be created (default=" + str(defConf.logfilePath) + ")")
parser.add_argument("--logfileName" , type=str, default=defConf.logfileName , help="filename used for logging (default=" + str(defConf.logfileName) + ")")
parser.add_argument("--targetDir" , type=str, default=defConf.targetDir , help="where incoming data will be stored to (default=" + str(defConf.targetDir) + ")")
parser.add_argument("--dataStreamIp" , type=str, default=defConf.dataStreamIp , help="ip of dataStream-socket to pull new files from (default=" + str(defConf.dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=defConf.dataStreamPort , help="port number of dataStream-socket to pull new files from (default=" + str(defConf.dataStreamPort) + ")")
parser.add_argument("--liveViewerIp" , type=str, default=defConf.liveViewerIp , help="local ip to bind LiveViewer to (default=" + str(defConf.liveViewerIp) + ")")
parser.add_argument("--liveViewerPort" , type=str, default=defConf.liveViewerPort , help="tcp port of live viewer (default=" + str(defConf.liveViewerPort) + ")")
parser.add_argument("--senderComPort" , type=str, default=defConf.senderComPort , help="port number of dataStream-socket to send signals back to the sender (default=" + str(defConf.senderComPort) + ")")
parser.add_argument("--maxRingBufferSize" , type=int, default=defConf.maxRingBufferSize , help="size of the ring buffer for the live viewer (default=" + str(defConf.maxRingBufferSize) + ")")
parser.add_argument("--senderResponseTimeout", type=int, default=defConf.senderResponseTimeout, help=argparse.SUPPRESS)
parser.add_argument("--verbose" , action="store_true" , help="more verbose output")
arguments = parser.parse_args()
targetDir = str(arguments.targetDir)
# check target directory for existance
helperScript.checkFolderExistance(targetDir)
return arguments
class ReceiverLiveViewer():
logfilePath = None
logfileName = None
logfileFullPath = None
verbose = None
targetDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
senderComPort = None
maxRingBufferSize = None
senderResponseTimeout = None
def __init__(self, verbose):
defConf = defaultConfig()
self.logfilePath = defConf.logfilePath
self.logfileName = defConf.logfileName
self.logfileFullPath = os.path.join(self.logfilePath, self.logfileName)
self.verbose = verbose
self.targetDir = defConf.targetDir
self.zmqDataStreamIp = defConf.dataStreamIp
self.zmqDataStreamPort = defConf.dataStreamPort
self.zmqLiveViewerIp = defConf.liveViewerIp
self.zmqLiveViewerPort = defConf.liveViewerPort
self.senderComPort = defConf.senderComPort
self.maxRingBufferSize = defConf.maxRingBufferSize
self.senderResponseTimeout = defConf.senderResponseTimeout
#enable logging
helperScript.initLogging(self.logfileFullPath, self.verbose)
#start file receiver
myWorker = FileReceiver(self.targetDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--verbose", action="store_true", help="more verbose output")
arguments = parser.parse_args()
receiver = ReceiverLiveViewer(arguments.verbose)
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import sys
import time
import zmq
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment