Skip to content
Snippets Groups Projects
Commit 2dd52ea2 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added Exceptions if bind/connect failed

parent 016ab612
No related branches found
No related tags found
No related merge requests found
......@@ -16,20 +16,20 @@ import threading
# -------------------------- class: FileReceiver --------------------------------------
#
class FileReceiver:
zmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
log = None
zmqContext = None
outputDir = None
dataStreamIp = None
dataStreamPort = None
log = None
# sockets
zmqDataStreamSocket = None # socket to receive the data from
dataStreamSocket = None # socket to receive the data from
def __init__(self, outputDir, zmqDataStreamIp, zmqDataStreamPort, context = None):
def __init__(self, outputDir, dataStreamIp, dataStreamPort, context = None):
self.outputDir = os.path.normpath(outputDir)
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.outputDir = os.path.normpath(outputDir)
self.dataStreamIp = dataStreamIp
self.dataStreamPort = dataStreamPort
# if context:
# assert isinstance(context, zmq.sugar.context.Context)
......@@ -39,11 +39,7 @@ class FileReceiver:
self.log = self.getLogger()
self.log.debug("Init")
# create pull socket
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
self.zmqDataStreamSocket.bind(connectionStr)
self.log.info("zmqDataStreamSocket started (bind) for '" + connectionStr + "'")
self.createSockets()
try:
self.log.info("Start receiving new files")
......@@ -66,11 +62,23 @@ class FileReceiver:
return logger
def combineMessage(self, zmqDataStreamSocket):
def createSockets(self):
# create pull socket
self.dataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
try:
self.dataStreamSocket.bind(connectionStr)
self.log.info("dataStreamSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def combineMessage(self, dataStreamSocket):
receivingMessages = True
#save all chunks to file
while receivingMessages:
multipartMessage = zmqDataStreamSocket.recv_multipart()
multipartMessage = dataStreamSocket.recv_multipart()
#extract multipart message
try:
......@@ -119,7 +127,7 @@ class FileReceiver:
while continueReceiving:
try:
self.combineMessage(self.zmqDataStreamSocket)
self.combineMessage(self.dataStreamSocket)
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
continueReceiving = False
......@@ -131,7 +139,7 @@ class FileReceiver:
self.log.info("shutting down receiver...")
try:
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext)
self.stopReceiving(self.dataStreamSocket, self.zmqContext)
self.log.debug("shutting down receiver...done.")
except:
self.log.error(sys.exc_info())
......@@ -228,14 +236,14 @@ class FileReceiver:
raise Exception(errorMessage)
def stopReceiving(self, zmqDataStreamSocket, zmqContext, sendToSender = True):
def stopReceiving(self, dataStreamSocket, zmqContext, sendToSender = True):
self.log.debug("stopReceiving...")
try:
zmqDataStreamSocket.close(0)
self.log.debug("closing zmqDataStreamSocket...done.")
dataStreamSocket.close(0)
self.log.debug("closing dataStreamSocket...done.")
except:
self.log.error("closing zmqDataStreamSocket...failed.")
self.log.error("closing dataStreamSocket...failed.")
self.log.error(sys.exc_info())
try:
......
......@@ -66,11 +66,7 @@ class DirectoryWatcher():
# assert isinstance(self.zmqContext, zmq.sugar.context.Context)
#create zmq sockets
self.messageSocket = self.zmqContext.socket(zmq.PUSH)
zmqSocketStr = "tcp://" + self.fileEventIp + ":" + str(self.fileEventPort)
self.messageSocket.connect(zmqSocketStr)
self.log.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
self.createSockets()
self.process()
......@@ -80,6 +76,18 @@ class DirectoryWatcher():
return logger
def createSockets(self):
#create zmq socket
self.messageSocket = self.zmqContext.socket(zmq.PUSH)
connectionStr = "tcp://" + self.fileEventIp + ":" + str(self.fileEventPort)
try:
self.messageSocket.connect(connectionStr)
self.log.debug("Connecting to ZMQ socket: " + str(connectionStr))
except Exception as e:
self.log.error("Failed to start ZMQ Socket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def getDirectoryStructure(self):
# Add the default subdirs
dirsToWalk = [self.watchDir + os.sep + directory for directory in self.monitoredDefaultSubdirs]
......
......@@ -85,6 +85,15 @@ class FileMover():
self.log = self.getLogger()
self.log.debug("Init")
self.createSockets()
def getLogger(self):
logger = logging.getLogger("fileMover")
return logger
def createSockets(self):
# create zmq socket for incoming file events
self.fileEventSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.fileEventIp, port=self.fileEventPort)
......@@ -135,11 +144,6 @@ class FileMover():
self.log.debug("Error was: " + str(trace))
def getLogger(self):
logger = logging.getLogger("fileMover")
return logger
def startReceiving(self):
self.log.debug("new message-socket crated for: new file events.")
......
......@@ -63,9 +63,24 @@ class WorkerProcess():
self.log = self.getLogger()
self.log.debug("new workerProcess started. id=" + str(self.id))
self.createSockets()
try:
self.process()
except KeyboardInterrupt:
# trace = traceback.format_exc()
self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess " + str(self.id) + ".")
except:
trace = traceback.format_exc()
self.log.error("Stopping workerProcess due to unknown error condition.")
self.log.debug("Error was: " + str(trace))
finally:
self.stop()
def createSockets(self):
if self.useDataStream:
self.dataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
......@@ -78,15 +93,23 @@ class WorkerProcess():
self.routerSocket = self.zmqContextForWorker.socket(zmq.REQ)
self.routerSocket.identity = u"worker-{ID}".format(ID=self.id).encode("ascii")
connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip=routerIp, port=routerPort)
self.routerSocket.connect(connectionStrRouterSocket)
self.log.debug("routerSocket started (connect) for '" + connectionStrRouterSocket + "'")
connectionStr = "tcp://{ip}:{port}".format(ip=routerIp, port=routerPort)
try:
self.routerSocket.connect(connectionStr)
self.log.debug("routerSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start routerSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
#init Cleaner message-pipe
self.cleanerSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStrCleanerSocket = "tcp://{ip}:{port}".format(ip=self.cleanerIp, port=self.cleanerPort)
self.cleanerSocket.connect(connectionStrCleanerSocket)
self.log.debug("cleanerSocket started (connect) for '" + connectionStrCleanerSocket + "'")
try:
self.cleanerSocket.connect(connectionStrCleanerSocket)
self.log.debug("cleanerSocket started (connect) for '" + connectionStrCleanerSocket + "'")
except Exception as e:
self.log.error("Failed to start cleanerSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# Poller to get either messages from the watcher or communication messages to stop sending data to the live viewer
self.poller = zmq.Poller()
......@@ -94,18 +117,6 @@ class WorkerProcess():
self.poller.register(self.routerSocket, zmq.POLLIN)
try:
self.process()
except KeyboardInterrupt:
# trace = traceback.format_exc()
self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess " + str(self.id) + ".")
except:
trace = traceback.format_exc()
self.log.error("Stopping workerProcess due to unknown error condition.")
self.log.debug("Error was: " + str(trace))
finally:
self.stop()
def process(self):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment