From 0f765851c7702d1b0986b717e131df8585c2e923 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Fri, 18 Dec 2015 12:26:29 +0100 Subject: [PATCH] Fix signal exchange --- src/sender/FileMover.py | 59 +++++++++++++++++++++++------- src/shared/LiveViewCommunicator.py | 12 ++++-- src/shared/helperScript.py | 28 +++++++++++--- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/sender/FileMover.py b/src/sender/FileMover.py index 32f8f55a..c1464772 100644 --- a/src/sender/FileMover.py +++ b/src/sender/FileMover.py @@ -6,6 +6,7 @@ import logging import os import sys import traceback +import copy from multiprocessing import Process from WorkerProcess import WorkerProcess @@ -89,7 +90,7 @@ class FileMover(): else: self.receiverWhiteList.append(host) - self.parallelDataStreams = parallelDataStreams + self.parallelDataStreams = int(parallelDataStreams) self.chunkSize = chunkSize self.freeWorker = [] # list of workers which sent a ready signal (again) during signal distributing process @@ -163,7 +164,7 @@ class FileMover(): incomingMessageCounter = 0 #start worker-processes. each will have its own PushSocket. - numberOfWorkerProcesses = int(self.parallelDataStreams) + numberOfWorkerProcesses = self.parallelDataStreams for processNumber in range(numberOfWorkerProcesses): self.log.debug("instantiate new workerProcess (nr " + str(processNumber) + " )") newWorkerProcess = Process(target=WorkerProcess, args=(processNumber, @@ -256,6 +257,8 @@ class FileMover(): else: version, signal, host, port = incomingMessage + host = host.split(',') + port = port.split(',') if version: if helperScript.checkVersion(version, self.log): @@ -269,13 +272,26 @@ class FileMover(): # Checking signal sending host self.log.debug("Check if signal sending host is in WhiteList...") - if helperScript.checkHost(host, self.receiverWhiteList): - self.log.debug("Host " + str(host) + " is allowed to connect.") + if helperScript.checkHost(host, self.receiverWhiteList, self.log): + self.log.debug("One of the hosts is allowed to connect.") + self.log.debug("hosts: " + str(host)) else: - self.log.debug("Host " + str(host) + " is not allowed to connect.") + self.log.debug("One of the hosts is not allowed to connect.") + self.log.debug("hosts: " + str(host)) self.sendResponse("NO_VALID_HOST") return False, None, None, None + if signal in ["START_QUERY_NEXT", "STOP_QUERY_NEXT"]: + if len(host) != self.parallelDataStreams: + self.log.debug("Not enough hosts specified.") + self.sendResponse("INCORRECT_NUMBER_OF_HOSTS") + return False, None, None, None + + if len(port) != self.parallelDataStreams: + self.log.debug("Not enough ports specified.") + self.sendResponse("INCORRECT_NUMBER_OF_PORTS") + return False, None, None, None + return True, signal, host, port @@ -285,14 +301,17 @@ class FileMover(): def reactToSignal(self, signal, host, port): - message = signal + "," + host + "," + port # React to signal if signal == "START_STREAM": + #FIXME + host = host[0] + port = port[0] self.log.info("Received signal to start stream to host " + str(host) + " on port " + str(port)) if [host, port] not in self.openConnections["streams"]: self.openConnections["streams"].append([host, port]) # send signal to workerProcesses and back to receiver + message = signal + "," + host + "," + port self.sendSignalToWorker(message) self.sendResponse(signal) else: @@ -301,10 +320,13 @@ class FileMover(): return elif signal == "STOP_STREAM": + host = host[0] + port = port[0] self.log.info("Received signal to stop stream to host " + str(host) + " on port " + str(port)) if [host, port] in self.openConnections["streams"]: self.openConnections["streams"].remove([host, port]) # send signal to workerProcesses and back to receiver + message = signal + "," + host + "," + port self.sendSignalToWorker(message) self.sendResponse(signal) else: @@ -315,9 +337,9 @@ class FileMover(): elif signal == "START_QUERY_NEXT" or signal == "START_REALTIME_ANALYSIS": self.log.info("Received signal from host " + str(host) + " to enable querying for data") if [host, port] not in self.openConnections["queryNext"]: - self.openConnections["queryNext"].append([host, port]) + self.openConnections["queryNext"].append([copy.deepcopy(host), copy.deepcopy(port)]) # send signal to workerProcesses and back to receiver - self.sendSignalToWorker(message) + self.sendSignalToWorker(signal, host, port) self.sendResponse(signal) else: self.log.info("Query connection to host " + str(host) + " on port " + str(port) + " is already started") @@ -326,10 +348,11 @@ class FileMover(): elif signal == "STOP_QUERY_NEXT" or signal == "STOP_REALTIME_ANALYSIS": self.log.info("Received signal from host " + str(host) + " to disable querying for data") + print self.openConnections["queryNext"] if [host, port] in self.openConnections["queryNext"]: self.openConnections["queryNext"].remove([host, port]) # send signal to workerProcesses and back to receiver - self.sendSignalToWorker(message) + self.sendSignalToWorker(signal, host, port) self.log.debug("Send signal to worker: " + str(signal)) self.sendResponse(signal) self.log.debug("Setime.sleep(0.1)nd response back: " + str(signal)) @@ -343,8 +366,9 @@ class FileMover(): self.sendResponse("NO_VALID_SIGNAL") - def sendSignalToWorker(self, signal, individual = False): - numberOfWorkerProcesses = int(self.parallelDataStreams) + def sendSignalToWorker(self, signal, individualHost = False, individualPort = False): + + numberOfWorkerProcesses = self.parallelDataStreams alreadySignalled = [] # for processNumber in range(numberOfWorkerProcesses): @@ -354,19 +378,26 @@ class FileMover(): self.log.debug("Available workerProcess detected.") if address not in alreadySignalled: - self.log.debug("Send signal " + str(signal) + " to " + str(address)) + + if individualHost and individualPort: + signalToSend = signal + "," + individualHost.pop() + "," + individualPort.pop() + else: + signalToSend = signal + + self.log.debug("Send signal " + str(signalToSend) + " to " + str(address)) # address == "worker-0" # empty == b'' # as delimiter # signal == b'START_STREAM' self.routerSocket.send_multipart([ address, b'', - signal, + signalToSend, ]) alreadySignalled.append(address) + else: - self.log.debug("Signal " + str(signal) + " already sent to " + str(address) + ". Mark this worker as ready/.") + self.log.debug("Signal " + str(signalToSend) + " already sent to " + str(address) + ". Mark this worker as ready/.") self.freeWorker.append(address) diff --git a/src/shared/LiveViewCommunicator.py b/src/shared/LiveViewCommunicator.py index feea15f2..924e021f 100644 --- a/src/shared/LiveViewCommunicator.py +++ b/src/shared/LiveViewCommunicator.py @@ -228,6 +228,8 @@ class LiveViewCommunicator: else: version, signal, host, port = message + host = host.split(',') + port = port.split(',') if version: if helperScript.checkVersion(version, self.log): @@ -240,11 +242,13 @@ class LiveViewCommunicator: if signal and host and port : # Checking signal sending host - self.log.debug("Check if signal sending host is in WhiteList...") - if helperScript.checkHost(host, self.receiverWhiteList): - self.log.debug("Host " + str(host) + " is allowed to connect.") + self.log.debug("Check if hosts is in WhiteList...") + if helperScript.checkHost(host, self.receiverWhiteList, self.log): + self.log.debug("One of the hosts is allowed to connect.") + self.log.debug("hosts: " + str(host)) else: - self.log.debug("Host " + str(host) + " is not allowed to connect.") + self.log.debug("One of the hosts is not allowed to connect.") + self.log.debug("hosts: " + str(host)) self.sendResponse("NO_VALID_HOST") return False, None, None, None diff --git a/src/shared/helperScript.py b/src/shared/helperScript.py index bda84803..48a645d3 100644 --- a/src/shared/helperScript.py +++ b/src/shared/helperScript.py @@ -222,17 +222,33 @@ def checkVersion(version, log): return True -def checkHost(hostname, whiteList ): +def checkHost(hostname, whiteList, log): if hostname and whiteList: - if hostname.endswith(".desy.de"): - hostnameModified = hostname[:-8] + if type(hostname) == list: + temp = True + for host in hostname: + if host.endswith(".desy.de"): + hostModified = host[:-8] + else: + hostModified = host + + if host not in whiteList and hostModified not in whiteList: + log.info("Host " + str(host) + " is not allowed to connect") + temp = False + + return temp + + else: - hostnameModified = hostname + if hostname.endswith(".desy.de"): + hostnameModified = hostname[:-8] + else: + hostnameModified = hostname - if hostname in whiteList or hostnameModified in whiteList: - return True + if hostname in whiteList or hostnameModified in whiteList: + return True return False -- GitLab