Skip to content
Snippets Groups Projects
Commit 0f765851 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fix signal exchange

parent 42468bf3
No related branches found
No related tags found
No related merge requests found
......@@ -6,6 +6,7 @@ import logging
import os
import sys
import traceback
import copy
from multiprocessing import Process
from WorkerProcess import WorkerProcess
......@@ -89,7 +90,7 @@ class FileMover():
else:
self.receiverWhiteList.append(host)
self.parallelDataStreams = parallelDataStreams
self.parallelDataStreams = int(parallelDataStreams)
self.chunkSize = chunkSize
self.freeWorker = [] # list of workers which sent a ready signal (again) during signal distributing process
......@@ -163,7 +164,7 @@ class FileMover():
incomingMessageCounter = 0
#start worker-processes. each will have its own PushSocket.
numberOfWorkerProcesses = int(self.parallelDataStreams)
numberOfWorkerProcesses = self.parallelDataStreams
for processNumber in range(numberOfWorkerProcesses):
self.log.debug("instantiate new workerProcess (nr " + str(processNumber) + " )")
newWorkerProcess = Process(target=WorkerProcess, args=(processNumber,
......@@ -256,6 +257,8 @@ class FileMover():
else:
version, signal, host, port = incomingMessage
host = host.split(',')
port = port.split(',')
if version:
if helperScript.checkVersion(version, self.log):
......@@ -269,13 +272,26 @@ class FileMover():
# Checking signal sending host
self.log.debug("Check if signal sending host is in WhiteList...")
if helperScript.checkHost(host, self.receiverWhiteList):
self.log.debug("Host " + str(host) + " is allowed to connect.")
if helperScript.checkHost(host, self.receiverWhiteList, self.log):
self.log.debug("One of the hosts is allowed to connect.")
self.log.debug("hosts: " + str(host))
else:
self.log.debug("Host " + str(host) + " is not allowed to connect.")
self.log.debug("One of the hosts is not allowed to connect.")
self.log.debug("hosts: " + str(host))
self.sendResponse("NO_VALID_HOST")
return False, None, None, None
if signal in ["START_QUERY_NEXT", "STOP_QUERY_NEXT"]:
if len(host) != self.parallelDataStreams:
self.log.debug("Not enough hosts specified.")
self.sendResponse("INCORRECT_NUMBER_OF_HOSTS")
return False, None, None, None
if len(port) != self.parallelDataStreams:
self.log.debug("Not enough ports specified.")
self.sendResponse("INCORRECT_NUMBER_OF_PORTS")
return False, None, None, None
return True, signal, host, port
......@@ -285,14 +301,17 @@ class FileMover():
def reactToSignal(self, signal, host, port):
message = signal + "," + host + "," + port
# React to signal
if signal == "START_STREAM":
#FIXME
host = host[0]
port = port[0]
self.log.info("Received signal to start stream to host " + str(host) + " on port " + str(port))
if [host, port] not in self.openConnections["streams"]:
self.openConnections["streams"].append([host, port])
# send signal to workerProcesses and back to receiver
message = signal + "," + host + "," + port
self.sendSignalToWorker(message)
self.sendResponse(signal)
else:
......@@ -301,10 +320,13 @@ class FileMover():
return
elif signal == "STOP_STREAM":
host = host[0]
port = port[0]
self.log.info("Received signal to stop stream to host " + str(host) + " on port " + str(port))
if [host, port] in self.openConnections["streams"]:
self.openConnections["streams"].remove([host, port])
# send signal to workerProcesses and back to receiver
message = signal + "," + host + "," + port
self.sendSignalToWorker(message)
self.sendResponse(signal)
else:
......@@ -315,9 +337,9 @@ class FileMover():
elif signal == "START_QUERY_NEXT" or signal == "START_REALTIME_ANALYSIS":
self.log.info("Received signal from host " + str(host) + " to enable querying for data")
if [host, port] not in self.openConnections["queryNext"]:
self.openConnections["queryNext"].append([host, port])
self.openConnections["queryNext"].append([copy.deepcopy(host), copy.deepcopy(port)])
# send signal to workerProcesses and back to receiver
self.sendSignalToWorker(message)
self.sendSignalToWorker(signal, host, port)
self.sendResponse(signal)
else:
self.log.info("Query connection to host " + str(host) + " on port " + str(port) + " is already started")
......@@ -326,10 +348,11 @@ class FileMover():
elif signal == "STOP_QUERY_NEXT" or signal == "STOP_REALTIME_ANALYSIS":
self.log.info("Received signal from host " + str(host) + " to disable querying for data")
print self.openConnections["queryNext"]
if [host, port] in self.openConnections["queryNext"]:
self.openConnections["queryNext"].remove([host, port])
# send signal to workerProcesses and back to receiver
self.sendSignalToWorker(message)
self.sendSignalToWorker(signal, host, port)
self.log.debug("Send signal to worker: " + str(signal))
self.sendResponse(signal)
self.log.debug("Setime.sleep(0.1)nd response back: " + str(signal))
......@@ -343,8 +366,9 @@ class FileMover():
self.sendResponse("NO_VALID_SIGNAL")
def sendSignalToWorker(self, signal, individual = False):
numberOfWorkerProcesses = int(self.parallelDataStreams)
def sendSignalToWorker(self, signal, individualHost = False, individualPort = False):
numberOfWorkerProcesses = self.parallelDataStreams
alreadySignalled = []
# for processNumber in range(numberOfWorkerProcesses):
......@@ -354,19 +378,26 @@ class FileMover():
self.log.debug("Available workerProcess detected.")
if address not in alreadySignalled:
self.log.debug("Send signal " + str(signal) + " to " + str(address))
if individualHost and individualPort:
signalToSend = signal + "," + individualHost.pop() + "," + individualPort.pop()
else:
signalToSend = signal
self.log.debug("Send signal " + str(signalToSend) + " to " + str(address))
# address == "worker-0"
# empty == b'' # as delimiter
# signal == b'START_STREAM'
self.routerSocket.send_multipart([
address,
b'',
signal,
signalToSend,
])
alreadySignalled.append(address)
else:
self.log.debug("Signal " + str(signal) + " already sent to " + str(address) + ". Mark this worker as ready/.")
self.log.debug("Signal " + str(signalToSend) + " already sent to " + str(address) + ". Mark this worker as ready/.")
self.freeWorker.append(address)
......
......@@ -228,6 +228,8 @@ class LiveViewCommunicator:
else:
version, signal, host, port = message
host = host.split(',')
port = port.split(',')
if version:
if helperScript.checkVersion(version, self.log):
......@@ -240,11 +242,13 @@ class LiveViewCommunicator:
if signal and host and port :
# Checking signal sending host
self.log.debug("Check if signal sending host is in WhiteList...")
if helperScript.checkHost(host, self.receiverWhiteList):
self.log.debug("Host " + str(host) + " is allowed to connect.")
self.log.debug("Check if hosts is in WhiteList...")
if helperScript.checkHost(host, self.receiverWhiteList, self.log):
self.log.debug("One of the hosts is allowed to connect.")
self.log.debug("hosts: " + str(host))
else:
self.log.debug("Host " + str(host) + " is not allowed to connect.")
self.log.debug("One of the hosts is not allowed to connect.")
self.log.debug("hosts: " + str(host))
self.sendResponse("NO_VALID_HOST")
return False, None, None, None
......
......@@ -222,17 +222,33 @@ def checkVersion(version, log):
return True
def checkHost(hostname, whiteList ):
def checkHost(hostname, whiteList, log):
if hostname and whiteList:
if hostname.endswith(".desy.de"):
hostnameModified = hostname[:-8]
if type(hostname) == list:
temp = True
for host in hostname:
if host.endswith(".desy.de"):
hostModified = host[:-8]
else:
hostModified = host
if host not in whiteList and hostModified not in whiteList:
log.info("Host " + str(host) + " is not allowed to connect")
temp = False
return temp
else:
hostnameModified = hostname
if hostname.endswith(".desy.de"):
hostnameModified = hostname[:-8]
else:
hostnameModified = hostname
if hostname in whiteList or hostnameModified in whiteList:
return True
if hostname in whiteList or hostnameModified in whiteList:
return True
return False
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment