Skip to content
Snippets Groups Projects
Commit 8ef97ecf authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Removed receiver of old architecture

parent d1aea05f
No related branches found
No related tags found
No related merge requests found
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import sys
import argparse
import logging
import os
import json
import ConfigParser
import zmq
import time
from multiprocessing import Process, freeze_support
import shared.helperScript as helperScript
from shared.LiveViewCommunicator import LiveViewCommunicator
from receiverLiveViewer.FileReceiver import FileReceiver
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) )
CONFIG_PATH = BASE_PATH + os.sep + "conf"
def argumentParsing():
configFile = CONFIG_PATH + os.sep + "receiverLiveViewer.conf"
config = ConfigParser.RawConfigParser()
config.readfp(helperScript.FakeSecHead(open(configFile)))
logfilePath = config.get('asection', 'logfilePath')
logfileName = config.get('asection', 'logfileName')
targetDir = config.get('asection', 'targetDir')
dataStreamPort = config.get('asection', 'dataStreamPort')
liveViewerComIp = config.get('asection', 'liveViewerComIp')
liveViewerComPort = config.get('asection', 'liveViewerComPort')
liveViewerWhiteList = json.loads(config.get('asection', 'liveViewerWhiteList'))
lvCommunicatorPort = config.get('asection', 'lvCommunicatorPort')
signalIp = config.get('asection', 'signalIp')
maxRingBufferSize = config.get('asection', 'maxRingBufferSize')
maxQueueSize = config.get('asection', 'maxQueueSize')
senderResponseTimeout = config.get('asection', 'senderResponseTimeout')
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type=str, default=logfilePath,
help="Path where logfile will be created (default=" + str(logfilePath) + ")")
parser.add_argument("--logfileName" , type=str, default=logfileName,
help="Filename used for logging (default=" + str(logfileName) + ")")
parser.add_argument("--targetDir" , type=str, default=targetDir,
help="Where incoming data will be stored to (default=" + str(targetDir) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=dataStreamPort,
help="Port number of dataStream-socket to pull new files from; there needs to be one entry for each streams (default=" + str(dataStreamPort) + ")")
parser.add_argument("--liveViewerComIp" , type=str, default=liveViewerComIp,
help="IP to bind LiveViewer to (default=" + str(liveViewerComIp) + ")")
parser.add_argument("--liveViewerComPort" , type=str, default=liveViewerComPort,
help="TCP port of live viewer (default=" + str(liveViewerComPort) + ")")
parser.add_argument("--liveViewerWhiteList" , type=str, default=liveViewerWhiteList,
help="List of hosts allowed to connect to the receiver (default=" + str(liveViewerWhiteList) + ")")
parser.add_argument("--lvCommunicatorPort" , type=str, default=lvCommunicatorPort,
help="Port to exchange data and signals between receiver and lvcommunicator (default=" + str(lvCommunicatorPort) + ")")
parser.add_argument("--signalIp" , type=str, default=signalIp,
help="Port number of dataStream-socket to send signals back to the sender (default=" + str(signalIp) + ")")
parser.add_argument("--maxRingBufferSize" , type=int, default=maxRingBufferSize,
help="Size of the ring buffer for the live viewer (default=" + str(maxRingBufferSize) + ")")
parser.add_argument("--maxQueueSize" , type=int, default=maxQueueSize,
help="Size of the queue for the live viewer (default=" + str(maxQueueSize) + ")")
parser.add_argument("--senderResponseTimeout" , type=int, default=senderResponseTimeout,
help=argparse.SUPPRESS)
parser.add_argument("--verbose" , action="store_true",
help="More verbose output")
parser.add_argument("--onScreen" , type=str, default=False,
help="Display logging on screen (options are CRITICAL, ERROR, WARNING, INFO, DEBUG)")
arguments = parser.parse_args()
targetDir = str(arguments.targetDir)
logfilePath = str(arguments.logfilePath)
logfileName = str(arguments.logfileName)
logfileFullPath = os.path.join(logfilePath, logfileName)
verbose = arguments.verbose
onScreen = arguments.onScreen
#enable logging
helperScript.initLogging(logfileFullPath, verbose, onScreen)
# check target directory for existance
helperScript.checkDirExistance(targetDir)
helperScript.checkDirEmpty(targetDir)
# check if logfile is writable
helperScript.checkLogFileWritable(logfilePath, logfileName)
return arguments
class ReceiverLiveViewer():
targetDir = None
dataStreamPort = None
liveViewerComIp = None
liveViewerComPort = None
liveViewerWhiteList = None
lvCommunicatorPort = None
signalIp = None
maxRingBufferSize = None
maxQueueSize = None
senderResponseTimeout = None
def __init__(self):
arguments = argumentParsing()
self.targetDir = arguments.targetDir
self.dataStreamPort = arguments.dataStreamPort
self.liveViewerComIp = arguments.liveViewerComIp
self.liveViewerComPort = arguments.liveViewerComPort
self.liveViewerWhiteList = arguments.liveViewerWhiteList
self.lvCommunicatorPort = arguments.lvCommunicatorPort
self.signalIp = arguments.signalIp
self.maxRingBufferSize = arguments.maxRingBufferSize
self.maxQueueSize = arguments.maxQueueSize
self.senderResponseTimeout = arguments.senderResponseTimeout
# self.context = zmq.Context.instance()
# logging.debug("registering zmq global context")
self.run()
def run(self):
# start file receiver
# lvCommunicatorProcess = threading.Thread(target=LiveViewCommunicator, args=(self.lvCommunicatorPort, self.liveViewerComPort, self.liveViewerComIp, self.maxRingBuffersize, self.maxQueueSize))
logging.info("start lvCommunicator process...")
lvCommunicatorProcess = Process(target=LiveViewCommunicator, args=(self.lvCommunicatorPort,
self.liveViewerComPort, self.liveViewerComIp, self.liveViewerWhiteList,
self.maxRingBufferSize, self.maxQueueSize))
lvCommunicatorProcess.start()
#start file receiver
fileReceiver = FileReceiver(self.targetDir,
self.signalIp, self.dataStreamPort,
self.lvCommunicatorPort, self.senderResponseTimeout)
try:
fileReceiver.process()
except KeyboardInterrupt:
logging.debug("Keyboard interruption detected. Shutting down")
# except Exception, e:
# print "unknown exception detected."
# finally:
# try:
# logging.debug("Destroying ZMQ context...")
# self.context.destroy()
# logging.debug("Destroying ZMQ context...done.")
# except:
# logging.debug("Destroying ZMQ context...failed.")
# logging.error(sys.exc_info())
if __name__ == "__main__":
freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows
receiver = ReceiverLiveViewer()
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import zmq
import sys
import logging
import errno
import os
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
del BASE_PATH
from dataTransferAPI import dataTransfer
#
# -------------------------- class: FileReceiver --------------------------------------
#
class FileReceiver:
context = None
externalContext = None # if the context was created outside this class or not
outputDir = None
dataStreamPort = None
lvCommunicatorIp = None
lvCommunicatorPort = None
signalIp = None # ip for socket to communicate with the sender
socketResponseTimeout = None # time in milliseconds to wait for the sender to answer to a signal
log = None
# sockets
dataStreamSocket = None # socket to receive the data from
lvCommunicatorSocket = None # socket to communicate with LiveViewCommunicator class
signalSocket = None # socket to communicate with sender
def __init__(self, outputDir,
signalIp, dataStreamPort,
lvCommunicatorPort, senderResponseTimeout = 1000,
context = None):
self.outputDir = os.path.normpath(outputDir)
self.lvCommunicatorIp = "127.0.0.1"
self.lvCommunicatorPort = lvCommunicatorPort
self.signalIp = signalIp
self.socketResponseTimeout = senderResponseTimeout
if context:
self.context = context
self.externalContext = True
else:
self.context = zmq.Context()
self.externalContext = False
#self.context = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
self.dataTransferObject = dataTransfer(signalIp, dataStreamPort, useLog = True, context = self.context)
# create sockets
self.createSockets()
def getLogger(self):
logger = logging.getLogger("fileReceiver")
return logger
def createSockets(self):
# create socket to communicate with LiveViewCommunicator
self.lvCommunicatorSocket = self.context.socket(zmq.PAIR)
connectionStr = "tcp://{ip}:{port}".format(ip=self.lvCommunicatorIp, port=self.lvCommunicatorPort)
try:
self.lvCommunicatorSocket.connect(connectionStr)
self.log.debug("lvCommunicatorSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start lvCommunicatorSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def process(self):
try:
self.dataTransferObject.start("stream")
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
except Exception as e:
self.log.error("could not initiate stream")
self.log.debug("Error was: " + str(e))
continueReceiving = False
#run loop, and wait for incoming messages
while continueReceiving:
try:
self.combineMessage()
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
break
except Exception as e:
self.log.error("receive message...failed.")
self.log.error("Error was: " + str(e))
break
self.log.info("shutting down receiver...")
self.stop()
def combineMessage(self):
try:
[payloadMetadata, payload] = self.dataTransferObject.get()
except Exception as e:
self.log.error("Getting data failed.")
self.log.debug("Error was: " + str(e))
raise
self.dataTransferObject.store(self.outputDir, [payloadMetadata, payload] )
filename = self.dataTransferObject.generateTargetFilepath(self.outputDir, payloadMetadata)
fileModTime = payloadMetadata["fileModificationTime"]
# send the file to the LiveViewCommunicator to add it to the ring buffer
message = "AddFile" + str(filename) + ", " + str(fileModTime)
self.log.debug("Send file to LiveViewCommunicator: " + message )
self.lvCommunicatorSocket.send(message)
def stop(self):
if self.lvCommunicatorSocket:
try:
self.log.debug("Sending exit signal to LiveViewCommunicator...")
self.lvCommunicatorSocket.send("Exit")
except Exception as e:
self.log.error("Sending exit signal to LiveViewCommunicator...failed")
self.log.debug("Error was: " + str(e))
try:
self.log.debug("Closing communication socket...")
# give signal time to arrive
self.lvCommunicatorSocket.close(0.2)
self.lvCommunicatorSocket = None
self.log.debug("Closing communication socket...done")
except Exception as e:
self.log.error("Closing communication socket...failed")
self.log.debug("Error was: " + str(e))
self.dataTransferObject.stop()
if not self.externalContext:
try:
if self.context:
self.log.info("Destroying ZMQ context...")
self.context.destroy()
self.context = None
self.log.debug("Destroying ZMQ context...done.")
except:
self.log.error("Destroying ZMQ context...failed.")
self.log.debug("Error was: " + str(e))
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import os
import sys
import time
import zmq
import logging
import socket # needed to get hostname
import shared.helperScript as helperScript
class ReceiverRealTimeAnalysis():
senderComIp = "127.0.0.1"
senderComPort = "50000"
senderDataIp = "127.0.0.1"
senderDataPort = "50200"
zmqContext = None
senderComSocket = None
hostname = socket.gethostname()
def __init__(self, senderResponseTimeout = 1000):
self.zmqContext = zmq.Context()
assert isinstance(self.zmqContext, zmq.sugar.context.Context)
self.socketResponseTimeout = senderResponseTimeout
self.log = self.getLogger()
self.log.debug("Init")
self.senderComSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStr = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
self.senderComSocket.connect(connectionStr)
self.log.debug("senderComSocket started (connect) for '" + connectionStr + "'")
print "senderComSocket started (connect) for '" + connectionStr + "'"
self.senderDataSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
connectionStr = "tcp://{ip}:{port}".format(ip=self.senderDataIp, port=self.senderDataPort)
self.senderDataSocket.connect(connectionStr)
self.log.debug("senderDataSocket started (connect) for '" + connectionStr + "'")
print "senderDataSocket started (connect) for '" + connectionStr + "'"
message = "START_REALTIME_ANALYSIS," + str(self.hostname)
self.log.info("Sending start signal to sender...")
self.log.debug("Sending start signal to sender, message: " + message)
print "sending message ", message
self.senderComSocket.send(str(message))
senderMessage = None
try:
senderMessage = self.senderComSocket.recv()
print "answer to start realtime analysis: ", senderMessage
self.log.debug("Received message from sender: " + str(senderMessage) )
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
self.stop( sendToSender = False)
sys.exit(1)
except Exception as e:
self.log.error("No message received from sender")
self.log.debug("Error was: " + str(e))
self.stop( sendToSender = False)
sys.exit(1)
if senderMessage == "START_REALTIME_ANALYSIS":
self.log.info("Received confirmation from sender...start receiving files")
else:
print "Sending start signal to sender...failed."
self.log.info("Sending start signal to sender...failed.")
self.stop(sendToSender = False)
def getLogger(self):
logger = logging.getLogger("Receiver")
return logger
def askForNextFile(self):
# get latest file from reveiver
message = "NEXT_FILE"
while True:
try:
print "Asking for next file"
self.log.debug("Asking for next file")
self.senderDataSocket.send(message)
self.log.debug("Asking for next file...done")
print "Asking for next file...done"
time.sleep(1)
try:
# Get the reply.
received_file = self.senderDataSocket.recv_multipart()
# print "Received_file", "".join(received_file)[:45]
print "Received_file", received_file[0][:45]
self.log.debug("Received_file" + str(received_file))
except zmq.error.ZMQError:
received_file = None
self.log.warning("Unable to reveice reply: sender is currently busy")
except Exception as e:
self.log.error("Unable receive reply")
self.log.debug("Error was: " + str(e))
break
except Exception as e:
self.log.error("Unable to send request")
self.log.debug("Error was: " + str(e))
def stop(self, sendToSender = True):
if sendToSender:
self.log.debug("sending stop signal to sender...")
message = "STOP_REALTIME_ANALYSIS,"+ str(self.hostname)
print "sending message ", message
try:
self.senderComSocket.send(str(message), zmq.NOBLOCK)
except zmq.error.ZMQError:
self.log.debug("Unable to send stop signal to sender")
try:
senderMessage = self.senderComSocket.recv()
print "answer to stop realtime analysis: ", senderMessage
self.log.debug("Received message from sender: " + str(senderMessage) )
if senderMessage == "STOP_REALTIME_ANALYSIS":
self.log.info("Received confirmation from sender...")
else:
self.log.debug("Received unexpected response from sender")
self.log.debug("Try to send stop signal again")
try:
self.senderComSocket.send(str(message), zmq.NOBLOCK)
except zmq.error.ZMQError:
self.log.debug("Unable to send stop signal to sender (second try)")
senderMessage = self.senderComSocket.recv()
print "answer to stop realtime analysis(second try): ", senderMessage
self.log.debug("Received message from sender: " + str(senderMessage) )
if senderMessage == "STOP_REALTIME_ANALYSIS":
self.log.info("Received confirmation from sender...")
else:
self.log.error("Received confirmation from sender...failed")
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
except Exception as e:
self.log.error("sending stop signal to sender...failed.")
self.log.debug("Error was: " + str(e))
# give the signal time to arrive
time.sleep(0.1)
# close ZeroMQ socket and destroy ZeroMQ context
try:
self.log.debug("closing sockets...")
self.senderComSocket.close(linger=0)
self.senderDataSocket.close(linger=0)
self.log.debug("closing sockets...done")
except Exception as e:
self.log.error("closing sockets...failed")
self.log.debug("Error was: " + str(e))
try:
self.zmqContext.destroy()
self.log.debug("closing zmqContext...done.")
except Exception as e:
self.log.debug("closing zmqContext...failed.")
self.log.debug("Error was: " + str(e))
if __name__ == '__main__':
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) )
CONFIG_PATH = BASE_PATH + os.sep + "conf"
sys.path.append ( CONFIG_PATH )
logfilePath = BASE_PATH + os.sep + "logs/receiver_RealTimeAnalysis.log"
verbose = True
#enable logging
helperScript.initLogging(logfilePath, verbose)
receiver = ReceiverRealTimeAnalysis()
time.sleep(0.5)
i = 0
while True:
try:
receiver.askForNextFile()
time.sleep(1)
except KeyboardInterrupt:
break
# if i >= 5:
# break
# else:
# i += 1
receiver.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment