Skip to content
Snippets Groups Projects
Commit d6e5f2a2 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Merge branch 'multiple_receiver'

parents 04fd1569 164a90c4
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,11 @@ class defaultConfigSender():
# number of parallel data streams
parallelDataStreams = "1"
# list of hosts allowed to connect to the sender
# receiverWhiteList = ["lsdma-lab04"]
# receiverWhiteList = ["zitpcx19282"]
receiverWhiteList = ["zitpcx19282", "zitpcx22614", "lsdma-lab04"]
# zmq endpoint (IP-address) to send file events to
fileEventIp = LOCAL_IP
# zmq endpoint (port) to send file events to
......@@ -37,6 +42,7 @@ class defaultConfigSender():
zmqCleanerPort = "6062"
# port number of dataStream-socket to receive signals from the receiver
receiverComPort = "6080"
# chunk size of file-parts getting send via zmq
chunkSize = 1048576 # = 1024*1024
#chunkSize = 1073741824 # = 1024*1024*1024
......
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
......@@ -110,10 +110,12 @@ class FileReceiver:
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
sys.exit(1)
except Exception as e:
self.log.error("No message received from sender")
self.log.debug("Error was: " + str(e))
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
sys.exit(1)
if senderMessage == "START_LIVE_VIEWER":
self.log.info("Received confirmation from sender...start receiving files")
......
......@@ -15,6 +15,7 @@ import subprocess
import json
import shutil
import helperScript
import socket # needed to get hostname
from watcher import DirectoryWatcher
from Cleaner import Cleaner
......@@ -397,6 +398,7 @@ class FileMover():
zmqCleanerPort = None # zmq pull endpoint, responsable to delete/move files
receiverComIp = None # ip for socket to communicate with receiver
receiverComPort = None # port for socket to communicate receiver
receiverWhiteList = None
parallelDataStreams = None
chunkSize = None
......@@ -411,8 +413,10 @@ class FileMover():
log = None
def __init__(self, fileEventIp, fileEventPort, dataStreamIp, dataStreamPort, receiverComPort, parallelDataStreams,
chunkSize, zmqCleanerIp, zmqCleanerPort,
def __init__(self, fileEventIp, fileEventPort, dataStreamIp, dataStreamPort,
receiverComPort, receiverWhiteList,
parallelDataStreams, chunkSize,
zmqCleanerIp, zmqCleanerPort,
context = None):
assert isinstance(context, zmq.sugar.context.Context)
......@@ -426,6 +430,7 @@ class FileMover():
self.zmqCleanerPort = zmqCleanerPort
self.receiverComIp = dataStreamIp # ip for socket to communicate with receiver; is the same ip as the data stream ip
self.receiverComPort = receiverComPort
self.receiverWhiteList = receiverWhiteList
self.parallelDataStreams = parallelDataStreams
self.chunkSize = chunkSize
......@@ -545,6 +550,17 @@ class FileMover():
self.receiverComSocket.send("NO_VALID_SIGNAL", zmq.NOBLOCK)
continue
self.log.debug("Check if signal sending host is in WhiteList...")
if signalHostname in self.receiverWhiteList:
self.log.info("Check if signal sending host is in WhiteList...Host " + str(signalHostname) + " is allowed to connect.")
else:
self.log.info("Check if signal sending host is in WhiteList...Host " + str(signalHostname) + " is not allowed to connect.")
self.log.debug("Signal from host " + str(signalHostname) + " is discarded.")
print "Signal from host " + str(signalHostname) + " is discarded."
self.receiverComSocket.send("NO_VALID_HOST", zmq.NOBLOCK)
continue
if signal == "STOP_LIVE_VIEWER":
self.log.info("Received live viewer stop signal from host " + str(signalHostname) + "...stopping live viewer")
print "Received live viewer stop signal from host " + signalHostname + "...stopping live viewer"
......@@ -615,22 +631,24 @@ def argumentParsing():
defConf = defaultConfigSender()
parser = argparse.ArgumentParser()
parser.add_argument("--logfilePath" , type=str, default=defConf.logfilePath , help="path where logfile will be created (default=" + str(defConf.logfilePath) + ")")
parser.add_argument("--logfileName" , type=str, default=defConf.logfileName , help="filename used for logging (default=" + str(defConf.logfileName) + ")")
parser.add_argument("--logfilePath" , type=str , default=defConf.logfilePath , help="path where logfile will be created (default=" + str(defConf.logfilePath) + ")")
parser.add_argument("--logfileName" , type=str , default=defConf.logfileName , help="filename used for logging (default=" + str(defConf.logfileName) + ")")
parser.add_argument("--verbose" , action="store_true" , help="more verbose output")
parser.add_argument("--watchFolder" , type=str, default=defConf.watchFolder , help="folder you want to monitor for changes")
parser.add_argument("--fileEventIp" , type=str, default=defConf.fileEventIp , help="zmq endpoint (IP-address) to send file events to (default=" + str(defConf.fileEventIp) + ")")
parser.add_argument("--fileEventPort" , type=str, default=defConf.fileEventPort , help="zmq endpoint (port) to send file events to (default=" + str(defConf.fileEventPort) + ")")
parser.add_argument("--watchFolder" , type=str , default=defConf.watchFolder , help="folder you want to monitor for changes")
parser.add_argument("--fileEventIp" , type=str , default=defConf.fileEventIp , help="zmq endpoint (IP-address) to send file events to (default=" + str(defConf.fileEventIp) + ")")
parser.add_argument("--fileEventPort" , type=str , default=defConf.fileEventPort , help="zmq endpoint (port) to send file events to (default=" + str(defConf.fileEventPort) + ")")
parser.add_argument("--dataStreamIp" , type=str , default=defConf.dataStreamIp , help="ip of dataStream-socket to push new files to (default=" + str(defConf.dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str , default=defConf.dataStreamPort , help="port number of dataStream-socket to push new files to (default=" + str(defConf.dataStreamPort) + ")")
parser.add_argument("--cleanerTargetPath" , type=str , default=defConf.cleanerTargetPath , help="Target to move the files into (default=" + str(defConf.cleanerTargetPath) + ")")
parser.add_argument("--zmqCleanerIp" , type=str , default=defConf.zmqCleanerIp , help="zmq-pull-socket ip which deletes/moves given files (default=" + str(defConf.zmqCleanerIp) + ")")
parser.add_argument("--zmqCleanerPort" , type=str , default=defConf.zmqCleanerPort , help="zmq-pull-socket port which deletes/moves given files (default=" + str(defConf.zmqCleanerPort) + ")")
parser.add_argument("--receiverComPort" , type=str , default=defConf.receiverComPort , help="port number of dataStream-socket to receive signals from the receiver (default=" + str(defConf.receiverComPort) + ")")
parser.add_argument("--receiverWhiteList" , nargs='+', default=defConf.receiverWhiteList , help="names of the hosts allowed to receive data (default=" + str(defConf.receiverWhiteList) + ")")
parser.add_argument("--dataStreamIp" , type=str, default=defConf.dataStreamIp , help="ip of dataStream-socket to push new files to (default=" + str(defConf.dataStreamIp) + ")")
parser.add_argument("--dataStreamPort" , type=str, default=defConf.dataStreamPort , help="port number of dataStream-socket to push new files to (default=" + str(defConf.dataStreamPort) + ")")
parser.add_argument("--cleanerTargetPath" , type=str, default=defConf.cleanerTargetPath , help="Target to move the files into (default=" + str(defConf.cleanerTargetPath) + ")")
parser.add_argument("--zmqCleanerIp" , type=str, default=defConf.zmqCleanerIp , help="zmq-pull-socket ip which deletes/moves given files (default=" + str(defConf.zmqCleanerIp) + ")")
parser.add_argument("--zmqCleanerPort" , type=str, default=defConf.zmqCleanerPort , help="zmq-pull-socket port which deletes/moves given files (default=" + str(defConf.zmqCleanerPort) + ")")
parser.add_argument("--receiverComPort" , type=str, default=defConf.receiverComPort , help="port number of dataStream-socket to receive signals from the receiver (default=" + str(defConf.receiverComPort) + ")")
parser.add_argument("--parallelDataStreams", type=int, default=defConf.parallelDataStreams, help="number of parallel data streams (default=" + str(defConf.parallelDataStreams) + ")")
parser.add_argument("--chunkSize" , type=int, default=defConf.chunkSize , help="chunk size of file-parts getting send via zmq (default=" + str(defConf.chunkSize) + ")")
parser.add_argument("--parallelDataStreams", type=int , default=defConf.parallelDataStreams, help="number of parallel data streams (default=" + str(defConf.parallelDataStreams) + ")")
parser.add_argument("--chunkSize" , type=int , default=defConf.chunkSize , help="chunk size of file-parts getting send via zmq (default=" + str(defConf.chunkSize) + ")")
arguments = parser.parse_args()
......@@ -670,6 +688,8 @@ if __name__ == '__main__':
zmqCleanerIp = str(arguments.zmqCleanerIp)
zmqCleanerPort = str(arguments.zmqCleanerPort)
receiverComPort = str(arguments.receiverComPort)
receiverWhiteList = arguments.receiverWhiteList
parallelDataStreams = str(arguments.parallelDataStreams)
chunkSize = int(arguments.chunkSize)
......@@ -696,7 +716,8 @@ if __name__ == '__main__':
logging.debug("start cleaner process...done")
#start new fileMover
fileMover = FileMover(fileEventIp, fileEventPort, dataStreamIp, dataStreamPort, receiverComPort,
fileMover = FileMover(fileEventIp, fileEventPort, dataStreamIp, dataStreamPort,
receiverComPort, receiverWhiteList,
parallelDataStreams, chunkSize,
zmqCleanerIp, zmqCleanerPort,
zmqContext)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment