Skip to content
Snippets Groups Projects
Commit 16b00e9b authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Close sockets properly in watcher

parent 5f737188
No related branches found
No related tags found
No related merge requests found
......@@ -35,19 +35,11 @@ class DirectoryWatcherHandler():
self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort)
def getZmqSocket_Push(self, context):
pattern = zmq.PUSH
assert isinstance(context, zmq.sugar.context.Context)
socket = context.socket(pattern)
return socket
def createPushSocket(self, context, fileEventServerPort):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Push(context)
socket = context.socket(zmq.PUSH)
zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort)
logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
......@@ -64,6 +56,8 @@ class DirectoryWatcherHandler():
try:
self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket, targetPath)
except KeyboardInterrupt:
logging.info("Keyboard interruption detected. Stop passing file to zmq.")
except Exception, e:
logging.error("Unable to process file '" + str(filepath) + "'")
logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e))
......@@ -137,11 +131,15 @@ class DirectoryWatcherHandler():
logging.debug(str(messageDictJson))
targetSocket.send(messageDictJson)
logging.info("Sending message...done.")
except KeyboardInterrupt:
logging.error("Sending message...failed because of KeyboardInterrupt.")
except Exception, e:
logging.error("Sending message...failed.")
logging.debug("Error was: " + str(e))
raise Exception(e)
def shuttingDown(self):
self.messageSocket.close()
def getDefaultConfig_logfilePath():
......@@ -341,7 +339,6 @@ if __name__ == '__main__':
#create zmq context
global zmqContext
zmqContext = zmq.Context()
......@@ -376,43 +373,40 @@ if __name__ == '__main__':
logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
workers.bind(zmqSocketStr)
while True:
#waiting for new jobs
try:
try:
while True:
#waiting for new jobs
workload = workers.recv()
except KeyboardInterrupt:
break
#transform to dictionary
try:
workloadDict = json.loads(str(workload))
except:
errorMessage = "invalid job received. skipping job"
logging.error(errorMessage)
logging.debug("workload=" + str(workload))
continue
#extract fileEvent metadata
try:
#TODO validate fileEventMessageDict dict
filepath = workloadDict["filepath"]
targetPath = workloadDict["targetPath"]
logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath))
except Exception, e:
errorMessage = "Invalid fileEvent message received."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("workloadDict=" + str(workloadDict))
#skip all further instructions and continue with next iteration
continue
# send the file to the fileMover
directoryWatcher.passFileToZeromq(filepath, targetPath)
except KeyboardInterrupt:
logging.info("Keyboard interruption detected. Shuting down")
#transform to dictionary
try:
workloadDict = json.loads(str(workload))
except:
errorMessage = "invalid job received. skipping job"
logging.error(errorMessage)
logging.debug("workload=" + str(workload))
continue
#extract fileEvent metadata
try:
#TODO validate fileEventMessageDict dict
filepath = workloadDict["filepath"]
targetPath = workloadDict["targetPath"]
logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath))
except Exception, e:
errorMessage = "Invalid fileEvent message received."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("workloadDict=" + str(workloadDict))
#skip all further instructions and continue with next iteration
continue
# send the file to the fileMover
directoryWatcher.passFileToZeromq(filepath, targetPath)
# We never get here but clean up anyhow
workers.close()
directoryWatcher.shuttingDown()
zmqContext.destroy()
......@@ -75,7 +75,11 @@ if supported_file:
#send reply back to server
workload = { "filepath": source, "targetPath": target }
workload_json = json.dumps(workload)
socket.send(workload_json)
try:
socket.send(workload_json)
except:
logging.debug( "Could not send message to ZMQ: " + str(workload))
logging.debug( "Send message to ZMQ: " + str(workload))
# my_cmd = 'echo "' + source + '" > /tmp/zeromqllpipe'
......@@ -89,8 +93,7 @@ if supported_file:
# stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE,
# universal_newlines = False )
#out, err = p.communicate()
# We never get here but clean up anyhow
# We never get here but clean up anyhow
socket.close()
zmqContext.destroy()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment