Skip to content
Snippets Groups Projects
Commit dfb97de9 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Adapted receiver to changed API

parent 73fb279f
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,9 @@ targetDir = /space/projects/live-viewer/data/target
# Local IP to connect dataStream to
dataStreamIp = 131.169.185.121 ;# zitpcx19282.desy.de
# TCP port of data pipe
dataStreamPort = 50010
dataStreamPort = 50100
# Path where logfile will be created
logfilePath = /space/projects/live-viewer/logs
# Filename used for logging
logfileName = zmq_receiver.log
logfileName = receiver.log
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import sys
......@@ -7,84 +7,158 @@ import logging
import os
import ConfigParser
import shared.helperScript as helperScript
from receiver.FileReceiver import FileReceiver
import shared.helpers as helpers
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) )
CONFIG_PATH = BASE_PATH + os.sep + "conf"
API_PATH = BASE_PATH + os.sep + "APIs"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
del BASE_PATH
from dataTransferAPI import dataTransfer
def argumentParsing():
configFile = CONFIG_PATH + os.sep + "receiver.conf"
config = ConfigParser.RawConfigParser()
config.readfp(helperScript.FakeSecHead(open(configFile)))
config.readfp(helpers.FakeSecHead(open(configFile)))
logfilePath = config.get('asection', 'logfilePath')
logfileName = config.get('asection', 'logfileName')
targetDir = config.get('asection', 'targetDir')
dataStreamIp = config.get('asection', 'dataStreamIp')
dataStreamPort = config.get('asection', 'dataStreamPort')
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type=str, default=logfilePath,
help="path where logfile will be created (default=" + str(logfilePath) + ")")
parser.add_argument("--logfileName" , type=str, default=logfileName,
help="filename used for logging (default=" + str(logfileName) + ")")
parser.add_argument("--targetDir" , type=str, default=targetDir,
help="where incoming data will be stored to (default=" + str(targetDir) + ")")
parser.add_argument("--dataStreamIp" , type=str, default=dataStreamIp,
help="ip of dataStream-socket to pull new files from (default=" + str(dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=dataStreamPort,
help="port number of dataStream-socket to pull new files from (default=" + str(dataStreamPort) + ")")
parser.add_argument("--verbose" , action="store_true",
help="more verbose output")
parser.add_argument("--onScreen" , type=str, default=False,
help="Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)")
arguments = parser.parse_args()
targetDir = str(arguments.targetDir)
logfilePath = str(arguments.logfilePath)
logfileName = str(arguments.logfileName)
logfileFullPath = os.path.join(logfilePath, logfileName)
verbose = arguments.verbose
onScreen = arguments.onScreen
parser.add_argument("--logfilePath" , type = str,
help = "Path where logfile will be created (default=" + str(logfilePath) + ")",
default = logfilePath )
parser.add_argument("--logfileName" , type = str,
help = "Filename used for logging (default=" + str(logfileName) + ")",
default = logfileName )
parser.add_argument("--verbose" , help = "More verbose output",
action = "store_true" )
parser.add_argument("--onScreen" , type = str,
help = "Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)",
default = False )
parser.add_argument("--targetDir" , type = str,
help = "Where incoming data will be stored to (default=" + str(targetDir) + ")",
default = targetDir )
parser.add_argument("--dataStreamIp" , type = str,
help = "Ip of dataStream-socket to pull new files from (default=" + str(dataStreamIp) + ")",
default = dataStreamIp )
parser.add_argument("--dataStreamPort" , type = str,
help = "Port number of dataStream-socket to pull new files from (default=" + str(dataStreamPort) + ")",
default = dataStreamPort )
arguments = parser.parse_args()
logfilePath = arguments.logfilePath
logfileName = arguments.logfileName
logfile = os.path.join(logfilePath, logfileName)
verbose = arguments.verbose
onScreen = arguments.onScreen
targetDir = arguments.targetDir
#enable logging
helperScript.initLogging(logfileFullPath, verbose, onScreen)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
handlers = helpers.getLogHandlers(logfile, verbose, onScreen)
if type(handlers) == tuple:
for h in handlers:
root.addHandler(h)
else:
root.addHandler(handlers)
# check target directory for existance
helperScript.checkDirExistance(targetDir)
helpers.checkDirExistance(targetDir)
# check if logfile is writable
helperScript.checkLogFileWritable(logfilePath, logfileName)
helpers.checkLogFileWritable(logfilePath, logfileName)
return arguments
class ReceiverLiveViewer():
targetDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
class Receiver:
def __init__(self, outputDir, dataIp, dataPort):
self.outputDir = os.path.normpath(outputDir)
self.dataIp = dataIp
self.dataPort = dataPort
self.log = self.getLogger()
self.log.debug("Init")
self.dataTransfer = dataTransfer("stream", useLog = True)
self.run()
zmqLiveViewerIp = None
zmqLiveViewerPort = None
senderComPort = None
maxRingBufferSize = None
senderResponseTimeout = None
def __init__(self):
arguments = argumentParsing()
def getLogger(self):
logger = logging.getLogger("Receiver")
return logger
self.targetDir = arguments.targetDir
self.zmqDataStreamIp = arguments.dataStreamIp
self.zmqDataStreamPort = arguments.dataStreamPort
#start file receiver
myWorker = FileReceiver(self.targetDir, self.zmqDataStreamIp, self.zmqDataStreamPort)
def run(self):
try:
self.dataTransfer.start(self.dataPort)
except:
self.log.error("Could not initiate stream", exc_info=True)
raise
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
#run loop, and wait for incoming messages
while continueReceiving:
try:
[payloadMetadata, payload] = self.dataTransfer.get()
except KeyboardInterrupt:
return
except:
self.log.error("Getting data failed.", exc_info=True)
raise
try:
self.dataTransfer.store(self.outputDir, [payloadMetadata, payload] )
except KeyboardInterrupt:
return
except:
self.log.error("Storing data...failed.", exc_info=True)
raise
def stop(self):
if self.dataTransfer:
self.log.info("Shutting down receiver...")
self.dataTransfer.stop()
self.dataTransfer = None
def __exit__(self):
self.stop()
if __name__ == "__main__":
receiver = ReceiverLiveViewer()
arguments = argumentParsing()
targetDir = arguments.targetDir
dataStreamIp = arguments.dataStreamIp
dataStreamPort = arguments.dataStreamPort
#start file receiver
receiver = Receiver(targetDir, dataStreamIp, dataStreamPort)
......@@ -304,7 +304,7 @@ def getLogHandlers(logfile, verbose, onScreenLogLevel = False):
# Setup file handler to output to file
# argument for RotatingFileHandler: filename, mode, maxBytes, backupCount)
# 1048576 = 1MB
h1 = logging.handlers.RotatingFileHandler(logfile, 'a', 3*1048576 , 5)
h1 = logging.handlers.RotatingFileHandler(logfile, 'a', 3*1048576, 5)
f1 = logging.Formatter(datefmt=datef,fmt=f)
h1.setFormatter(f1)
h1.setLevel(logLevel)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment