Commit 99f214a2 authored by Manuela Kuhn's avatar Manuela Kuhn

Merged master fixed confict

parents e06b4b53 3772c988
# API to communicate with a data transfer unit
__version__ = '2.0.0'
__version__ = '2.1.0'
import zmq
import socket
......@@ -37,6 +37,28 @@ class noLoggingFunction:
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
class NotSupported(Exception):
pass
class FormatError(Exception):
pass
class ConnectionFailed(Exception):
pass
class VersionError(Exception):
pass
class AuthenticationFailed(Exception):
pass
class CommunicationFailed(Exception):
pass
class DataSavingError(Exception):
pass
class dataTransfer():
def __init__ (self, connectionType, signalHost = None, useLog = False, context = None):
......@@ -83,7 +105,7 @@ class dataTransfer():
if connectionType in self.supportedConnections:
self.connectionType = connectionType
else:
raise Exception("Chosen type of connection is not supported.")
raise NotSupported("Chosen type of connection is not supported.")
# targets: [host, port, prio] or [[host, port, prio], ...]
......@@ -91,7 +113,7 @@ class dataTransfer():
if type(targets) != list:
self.stop()
raise Excepition("Argument 'targets' must be list.")
raise FormatError("Argument 'targets' must be list.")
if not self.context:
self.context = zmq.Context()
......@@ -119,7 +141,7 @@ class dataTransfer():
self.__createSignalSocket(signalPort)
else:
self.stop()
raise Exception("No host to send signal to specified." )
raise ConnectionFailed("No host to send signal to specified." )
self.targets = []
# [host, port, prio]
......@@ -129,13 +151,13 @@ class dataTransfer():
# [[host, port, prio], ...]
else:
for t in targets:
if type(t) == list:
if type(t) == list and len(t) == 3:
host, port, prio = t
self.targets.append([host + ":" + port, prio])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise Exception("Argument 'targets' is of wrong format.")
raise FormatError("Argument 'targets' is of wrong format.")
# if type(dataPort) == list:
# self.dataHost = str([socket.gethostname() for i in dataPort])
......@@ -146,19 +168,19 @@ class dataTransfer():
if message and message == "VERSION_CONFLICT":
self.stop()
raise Exception("Versions are conflicting.")
raise VersionError("Versions are conflicting.")
elif message and message == "NO_VALID_HOST":
self.stop()
raise Exception("Host is not allowed to connect.")
raise AuthenticationFailed("Host is not allowed to connect.")
elif message and message == "CONNECTION_ALREADY_OPEN":
self.stop()
raise Exception("Connection is already open.")
raise CommunicationFailed("Connection is already open.")
elif message and message == "NO_VALID_SIGNAL":
self.stop()
raise Exception("Connection type is not supported for this kind of sender.")
raise CommunicationFailed("Connection type is not supported for this kind of sender.")
# if there was no response or the response was of the wrong format, the receiver should be shut down
elif message and message.startswith(signal):
......@@ -166,7 +188,7 @@ class dataTransfer():
self.signalExchanged = signal
else:
raise Exception("Sending start signal ...failed.")
raise CommunicationFailed("Sending start signal ...failed.")
def __createSignalSocket (self, signalPort):
......@@ -181,8 +203,8 @@ class dataTransfer():
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
except:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
......@@ -209,14 +231,14 @@ class dataTransfer():
try:
self.signalSocket.send_multipart(sendMessage)
except:
self.log.error("Could not send signal", exc_info=True)
self.log.error("Could not send signal")
raise
message = None
try:
socks = dict(self.poller.poll(self.socketResponseTimeout))
except:
self.log.error("Could not poll for new message", exc_info=True)
self.log.error("Could not poll for new message")
raise
......@@ -227,13 +249,8 @@ class dataTransfer():
message = self.signalSocket.recv()
self.log.info("Received answer to signal: " + str(message) )
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received")
self.stop()
raise
except:
self.log.error("Could not receive answer to signal", exc_info=True)
self.stop()
self.log.error("Could not receive answer to signal")
raise
return message
......@@ -275,7 +292,7 @@ class dataTransfer():
ip = ipFromHost[0]
else:
raise Exception("Multipe possible ports. Please choose which one to use.")
raise FormatError("Multipe possible ports. Please choose which one to use.")
socketId = host + ":" + port
socketIdToConnect = ip + ":" + port
......@@ -338,13 +355,13 @@ class dataTransfer():
try:
socks = dict(self.poller.poll(timeout))
except:
self.log.error("Could not poll for new message", exc_info=True)
self.log.error("Could not poll for new message")
raise
else:
try:
socks = dict(self.poller.poll())
except:
self.log.error("Could not poll for new message", exc_info=True)
self.log.error("Could not poll for new message")
raise
# if there was a response
......@@ -371,7 +388,7 @@ class dataTransfer():
#TODO validate multipartMessage (like correct dict-values for metadata)
try:
payload = multipartMessage[1:]
payload = multipartMessage[1]
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
......@@ -392,14 +409,14 @@ class dataTransfer():
def store (self, targetBasePath, dataObject):
if type(dataObject) is not list and len(dataObject) != 2:
raise Exception("Wrong input type for 'store'")
raise FormatError("Wrong input type for 'store'")
payloadMetadata = dataObject[0]
payload = dataObject[1]
if type(payloadMetadata) is not dict or type(payload) is not list:
raise Exception("payload: Wrong input format in 'store'")
raise FormatError("payload: Wrong input format in 'store'")
#save all chunks to file
while True:
......@@ -456,14 +473,16 @@ class dataTransfer():
newFile = open(targetFilepath, "w")
self.log.info("New target directory created: " + str(targetPath))
except:
self.log.error("Unable to save payload to file: '" + targetFilepath + "'", exc_info=True)
self.log.error("Unable to save payload to file: '" + targetFilepath + "'")
self.log.debug("targetPath:" + str(targetPath))
raise
else:
self.log.error("Failed to append payload to file: '" + targetFilepath + "'", exc_info=True)
self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
raise
except:
self.log.error("Failed to append payload to file: '" + targetFilepath + "'", exc_info=True)
self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
# self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
raise
#only write data if a payload exist
try:
......@@ -472,7 +491,7 @@ class dataTransfer():
newFile.write(chunk)
newFile.close()
except:
self.log.error("Unable to append data to file.", exc_info=True)
self.log.error("Unable to append data to file.")
raise
......
......@@ -28,10 +28,13 @@ whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "lsdma-lab04",
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
# (needed if running on Windows)
requestFwPort = 50002
# ZMQ port to disribute control signals
# (needed if running on Windows)
controlPort = 50005
#########################################
......@@ -71,12 +74,10 @@ historySize = 0
# (needed if eventDetector is WatchdogDetector)
timeTillClosed = 2
# ZMQ port to get events from (only needed if eventDetectorType is ZmqDetector)
# ZMQ port to get events from
# (needed if eventDetectorType is ZmqDetector)
eventPort = 50003
# Supply a scan prefix. Otherwise the prefix is read from the tango server.
# (needed if eventDetectorType is HttpDetector)
prefix = ""
# Tango device proxy for the detector
# (needed if eventDetectorType is HttpDetector)
#detectorDevice = "haspp10lab:10000/p10/eigerdectris/lab.01"
......@@ -97,9 +98,9 @@ dataFetcherType = getFromFile
#dataFetcherType = getFromHttp
# If "getFromZmq" is specified as dataFetcherType it needs a port to listen to
# (needed if eventDetectorType is ZmqDetector)
dataFetcherPort = 50010
# Number of parallel data streams
# if this number is modifified, the port numbers also have to be adjusted
numberOfStreams = 1
......@@ -116,6 +117,7 @@ chunkSize = 10485760 ; # = 1024*1024*10
#chunkSize = 1073741824 ; # = 1024*1024*1024
# ZMQ-router port which coordinates the load-balancing to the worker-processes
# (needed if running on Windows)
routerPort = 50004
# Target to move the files into
......
#!/bin/bash
INSTANZ="DataManager"
Pidfile=/root/zeromq-data-transfer/DataManager.pid
Pidfile=/space/projects/zeromq-data-transfer/DataManager.pid
#Pidfile=/root/zeromq-data-transfer/DataManager.pid
DATAMANAGER=/root/zeromq-data-transfer/src/sender/DataManager.py
DATAMANAGER=/space/projects/zeromq-data-transfer/src/sender/DataManager.py
#DATAMANAGER=/root/zeromq-data-transfer/src/sender/DataManager.py
if [ -f $Pidfile ]
then
......@@ -10,53 +12,46 @@ then
fi
start() {
if [ -f $Pidfile ] ; then
if test `ps -e | grep -c $Pid` = 1; then
echo "Not starting $INSTANZ - instance already running"
# echo "Not starting $INSTANZ - instance already running with PID: $Pid"
else
echo "Starting $INSTANZ"
nohup /usr/bin/python ${DATAMANAGER} &> /dev/null &
$(ps -ef | grep '${DATAMANAGER}' | awk '{ print $2 }') > $PIDfile
fi
else
echo "Starting $INSTANZ"
nohup /usr/bin/python ${DATAMANAGER} &> /dev/null &
echo $! > $Pidfile
fi
start()
{
if [ -f $Pidfile ] && test `ps -e | grep -c $Pid` = 1; then
echo "Not starting $INSTANZ - instance already running"
# echo "Not starting $INSTANZ - instance already running with PID: $Pid"
else
echo "Starting $INSTANZ"
nohup /usr/bin/python ${DATAMANAGER} &
echo $! > $Pidfile
fi
}
stop()
{
if [ -f $Pidfile ] ; then
echo "Stopping $INSTANZ"
pkill python
# echo ${pid}
# kill -15 $Pid
rm $Pidfile
# echo $!
else
echo "Cannot stop $INSTANZ - no Pidfile found!"
fi
if [ -f $Pidfile ] ; then
echo "Stopping $INSTANZ"
# pkill python
kill -15 $Pid
rm $Pidfile
else
echo "Cannot stop $INSTANZ - no Pidfile found!"
fi
}
status()
{
if [ -f $Pidfile ] ; then
if test `ps -e | grep -c $Pid` = 0; then
echo "$INSTANZ not running"
else
# echo "$INSTANZ running with PID: [$Pid]"
echo "$INSTANZ running"
fi
else
echo "$Pidfile does not exist! Cannot process $INSTANZ status!"
exit 1
fi
}
if [ -f $Pidfile ] ; then
if test `ps -e | grep -c $Pid` = 0; then
echo "$INSTANZ not running"
else
# echo "$INSTANZ running with PID: [$Pid]"
echo "$INSTANZ running"
fi
else
echo "$INSTANZ not running"
# echo "$Pidfile does not exist! Cannot process $INSTANZ status!"
# exit 1
fi
}
case "$1" in
......
This diff is collapsed.
......@@ -34,7 +34,7 @@ import helpers
class DataDispatcher():
#class DataDispatcher(Process):
def __init__ (self, id, controlPort, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
def __init__ (self, id, controlConId, routerConId, chunkSize, fixedStreamId, dataFetcherProp,
logQueue, localTarget = None, context = None):
# dataFetcherProp = {
......@@ -53,24 +53,24 @@ class DataDispatcher():
self.id = id
self.log = self.__getLogger(logQueue)
self.log.debug("DataDispatcher-" + str(self.id) + " started (PID " + str(os.getpid()) + ").")
self.currentPID = os.getpid()
self.log.debug("DataDispatcher-" + str(self.id) + " started (PID " + str(self.currentPID) + ").")
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.controlPort = controlPort
self.routerPort = routerPort
self.chunkSize = chunkSize
self.controlConId = controlConId
self.routerConId = routerConId
self.controlSocket = None
self.routerSocket = None
self.poller = None
self.chunkSize = chunkSize
self.fixedStreamId = fixedStreamId
self.localTarget = localTarget
self.dataFetcherProp = dataFetcherProp
self.log.debug("Configuration for dataFetcher: " + str(self.dataFetcherProp))
self.log.info("Configuration for dataFetcher: " + str(self.dataFetcherProp))
dataFetcher = self.dataFetcherProp["type"]
......@@ -106,26 +106,24 @@ class DataDispatcher():
def __createSockets (self):
# socket for control signals
self.controlSocket = self.context.socket(zmq.SUB)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.controlPort )
try:
self.controlSocket.connect(connectionStr)
self.log.info("Start controlSocket (connect): '" + str(connectionStr) + "'")
self.controlSocket = self.context.socket(zmq.SUB)
self.controlSocket.connect(self.controlConId)
self.log.info("Start controlSocket (connect): '" + self.controlConId + "'")
except:
self.log.error("Failed to start controlSocket (connect): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start controlSocket (connect): '" + self.controlConId + "'", exc_info=True)
raise
self.controlSocket.setsockopt(zmq.SUBSCRIBE, "control")
self.controlSocket.setsockopt(zmq.SUBSCRIBE, "signal")
# socket to get new workloads from
self.routerSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
try:
self.routerSocket.connect(connectionStr)
self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")
self.routerSocket = self.context.socket(zmq.PULL)
self.routerSocket.connect(self.routerConId)
self.log.info("Start routerSocket (connect): '" + str(self.routerConId) + "'")
except:
self.log.error("Failed to start routerSocket (connect): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start routerSocket (connect): '" + self.routerConId + "'", exc_info=True)
raise
self.poller = zmq.Poller()
......@@ -294,7 +292,7 @@ class DataDispatcher():
self.dataFetcher.clean(self.dataFetcherProp)
if not self.extContext and self.context:
self.log.debug("Destroying context")
self.log.info("Destroying context")
self.context.destroy(0)
self.context = None
......@@ -339,20 +337,29 @@ if __name__ == '__main__':
copyfile(sourceFile, targetFile)
time.sleep(0.5)
controlPort = "50005"
routerPort = "7000"
receivingPort = "6005"
localhost = "127.0.0.1"
extIp = "0.0.0.0"
controlPort = "50005"
routerPort = "7000"
controlConId = "tcp://{ip}:{port}".format(ip=localhost, port=controlPort)
routerConId = "tcp://{ip}:{port}".format(ip=localhost, port=routerPort )
receivingPort = "6005"
receivingPort2 = "6006"
chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB
localTarget = BASE_PATH + os.sep + "data" + os.sep + "target"
fixedStreamId = False
fixedStreamId = "localhost:6006"
chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB
localTarget = BASE_PATH + os.sep + "data" + os.sep + "target"
fixedStreamId = False
fixedStreamId = "localhost:6006"
logConfig = "test"
dataFetcherProp = {
"type" : "getFromFile",
"fixSubdirs" : ["commissioning", "current", "local"],
"storeData" : False,
"removeFlag" : False
}
......@@ -365,8 +372,7 @@ if __name__ == '__main__':
context = zmq.Context.instance()
# dataDispatcherPr = DataDispatcher( "0/1", routerPort, chunkSize, fixedStreamId, logQueue, localTarget, context)
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, controlPort, routerPort, chunkSize, fixedStreamId, dataFetcherProp,
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, controlConId, routerConId, chunkSize, fixedStreamId, dataFetcherProp,
logQueue, localTarget, context) )
dataDispatcherPr.start()
......
......@@ -104,7 +104,6 @@ def argumentParsing():
# for ZmqDetector:
eventPort = config.get('asection', 'eventPort')
# for HttpGetDetector:
prefix = config.get('asection', 'prefix')
detectorDevice = config.get('asection', 'detectorDevice')
filewriterDevice = config.get('asection', 'filewriterDevice')
......@@ -147,10 +146,6 @@ def argumentParsing():
(only needed if eventDetectorType is ZmqDetector; default=" + str(eventPort) + ")",
default = eventPort )
parser.add_argument("--prefix" , type = str,
help = "Supply a scan prefix. Otherwise the prefix is read from the tango server \
(only needed if eventDetectorType is HttpDetector; default=" + str(prefix) + ")",
default = prefix )
parser.add_argument("--detectorDevice" , type = str,
help = "Tango device proxy for the detector \
(only needed if eventDetectorType is HttpDetector; default=" + str(detectorDevice) + ")",
......@@ -281,6 +276,8 @@ class DataManager():
verbose = arguments.verbose
onScreen = arguments.onScreen
self.currentPID = os.getpid()
self.extLogQueue = False
if logQueue:
......@@ -308,21 +305,35 @@ class DataManager():
# Create log and set handler to queue handle
self.log = self.getLogger(self.logQueue)
self.log.info("DataManager started (PID " + str(os.getpid()) + ").")
self.log.info("DataManager started (PID " + str(self.currentPID) + ").")
signal.signal(signal.SIGTERM, self.signal_term_handler)
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.controlPort = arguments.controlPort
self.comPort = arguments.comPort
self.whitelist = arguments.whitelist
self.requestPort = arguments.requestPort
self.requestFwPort = arguments.requestFwPort
self.routerPort = arguments.routerPort
self.comConId = "tcp://{ip}:{port}".format(ip=self.extIp, port=arguments.comPort)
self.requestConId = "tcp://{ip}:{port}".format(ip=self.extIp, port=arguments.requestPort)
self.localhost = "127.0.0.1"
self.extHost = "0.0.0.0"
if helpers.isWindows():
self.log.info("Using tcp for internal communication.")
self.controlConId = "tcp://{ip}:{port}".format(ip=self.localhost, port=arguments.controlPort)
self.requestFwConId = "tcp://{ip}:{port}".format(ip=self.localhost, port=arguments.requestFwPort)
self.routerConId = "tcp://{ip}:{port}".format(ip=self.localhost, port=arguments.routerPort)
else:
self.log.info("Using ipc for internal communication.")
self.controlConId = "ipc://{pid}_{id}".format(pid=self.currentPID, id="control")
self.requestFwConId = "ipc://{pid}_{id}".format(pid=self.currentPID, id="requestFw")
self.routerConId = "ipc://{pid}_{id}".format(pid=self.currentPID, id="router")
self.whitelist = arguments.whitelist
if arguments.useDataStream:
self.fixedStreamId = "{host}:{port}".format( host=arguments.fixedStreamHost, port=arguments.fixedStreamPort )
......@@ -332,12 +343,10 @@ class DataManager():
self.numberOfStreams = arguments.numberOfStreams
self.chunkSize = arguments.chunkSize
self.routerPort = arguments.routerPort
self.localTarget = arguments.localTarget
# Assemble configuration for eventDetectorhelper.globalObject.
self.log.debug("Configured type of eventDetector: " + arguments.eventDetectorType)
self.log.info("Configured type of eventDetector: " + arguments.eventDetectorType)
if arguments.eventDetectorType == "InotifyxDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
......@@ -367,7 +376,6 @@ class DataManager():
elif arguments.eventDetectorType == "HttpDetector":
self.eventDetectorConfig = {
"eventDetectorType" : arguments.eventDetectorType,
"prefix" : arguments.prefix,
"detectorDevice" : arguments.detectorDevice,
"filewriterDevice" : arguments.filewriterDevice,
"historySize" : arguments.historySize
......@@ -375,7 +383,7 @@ class DataManager():
# Assemble configuration for dataFetcher
self.log.debug("Configured Type of dataFetcher: " + arguments.dataFetcherType)
self.log.info("Configured Type of dataFetcher: " + arguments.dataFetcherType)
if arguments.dataFetcherType == "getFromFile":
self.dataFetcherProp = {
"type" : arguments.dataFetcherType,
......@@ -422,6 +430,7 @@ class DataManager():
except:
self.stop()
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
......@@ -440,19 +449,18 @@ class DataManager():
def createSockets(self):
# socket for control signals
helpers.globalObjects.controlSocket = self.context.socket(zmq.PUB)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.controlPort )
try:
helpers.globalObjects.controlSocket.bind(connectionStr)
self.log.info("Start controlSocket (bind): '" + str(connectionStr) + "'")
helpers.globalObjects.controlSocket = self.context.socket(zmq.PUB)
helpers.globalObjects.controlSocket.bind(self.controlConId)
self.log.info("Start controlSocket (bind): '" + str(self.controlConId) + "'")
except:
self.log.error("Failed to start controlSocket (bind): '" + connectionStr + "'", exc_info=True)
self.log.error("Failed to start controlSocket (bind): '" + self.controlConId + "'", exc_info=True)
helpers.globalObjects.controlSocket = None
raise
def run (self):
self.signalHandlerPr = threading.Thread ( target = SignalHandler, args = (self.controlPort, self.whitelist, self.comPort, self.requestFwPort, self.requestPort, self.logQueue, self.context) )
self.signalHandlerPr = threading.Thread ( target = SignalHandler, args = (self.controlConId, self.whitelist, self.comConId, self.requestFwConId, self.requestConId, self.logQueue, self.context) )
self.signalHandlerPr.start()
# needed, because otherwise the requests for the first files are not forwarded properly
......@@ -461,12 +469,12 @@ class DataManager():
if not self.signalHandlerPr.is_alive():
return
self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.controlPort, self.requestFwPort, self.routerPort, self.logQueue) )
self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.controlConId, self.requestFwConId, self.routerConId, self.logQueue) )
self.taskProviderPr.start()
for i in range(self.numberOfStreams):
id = str(i) + "/" + str(self.numberOfStreams)
pr = Process ( target = DataDispatcher, args = (id, self.controlPort, self.routerPort, self.chunkSize, self.fixedStreamId, self.dataFetcherProp,
pr = Process ( target = DataDispatcher, args = (id, self.controlConId, self.routerConId, self.chunkSize, self.fixedStreamId, self.dataFetcherProp,
self.logQueue, self.localTarget) )
pr.start()
self.dataDispatcherPr.append(pr)
......@@ -501,12 +509,12 @@ class DataManager():
helpers.globalObjects.controlSocket = None
if self.context:
self.log.debug("Destroying context")
self.log.info("Destroying context")
self.context.destroy(0)
self.context = None
if not self.extLogQueue and self.logQueueListener:
self.log.debug("Stopping logQueue")
self.log.info("Stopping logQueue")
self.logQueue.put_nowait(None)
self.logQueueListener.stop()
self.logQueueListener = None
......
This diff is collapsed.
......@@ -32,7 +32,7 @@ import helpers
#
class TaskProvider():
def __init__ (self, eventDetectorConfig, controlPort, requestFwPort, routerPort, logQueue, context = None):
def __init__ (self, eventDetectorConfig, controlConId, requestFwConId, routerConId, logQueue, context = None):
global BASE_PATH
#eventDetectorConfig = {
......@@ -44,21 +44,21 @@ class TaskProvider():
#}
self.log = self.getLogger(logQueue)
self.log.debug("TaskProvider started (PID " + str(os.getpid()) + ").")
self.currentPID = os.getpid()
self.log.debug("TaskProvider started (PID " + str(self.currentPID) + ").")
self.dataDetectorModule = None
self.eventDetector = None
self.config = eventDetectorConfig
self.log.debug("Configuration for event detector: " + str(self.config))
self.log.info("Configuration for event detector: " + str(self.config))
eventDetectorModule = self.config["eventDetectorType"]
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.controlPort = controlPort
self.requestFwPort = requestFwPort
self.routerPort = routerPort
self.controlConId = controlConId
self.requestFwConId = requestFwConId
self.routerConId = routerConId
self.controlSocket = None
self.requestFwSocket = None
......@@ -66,12 +66,12 @@ class TaskProvider():
self.poller = None
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if context:
self.context = context
self.extContext = True
else:
self.log.info("Registering ZMQ context")
self.context = zmq.Context()
self.extContext = False
......@@ -90,7 +90,7 @@ class TaskProvider():
except KeyboardInterrupt:
pass
except:
self.log.info("Stopping TaskProvider due to unknown error condition.", exc_info=True)
self.log.error("Stopping TaskProvider due to unknown error condition.", exc_info=True)