diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py index a9eb644f229ea46635bc267664f71629d508fbc6..de7efeb0625c193875583a9991a148e6dffe0e29 100644 --- a/ZeroMQTunnel/watcher_lsyncd.py +++ b/ZeroMQTunnel/watcher_lsyncd.py @@ -35,19 +35,11 @@ class DirectoryWatcherHandler(): self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort) - def getZmqSocket_Push(self, context): - pattern = zmq.PUSH - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern) - - return socket - - def createPushSocket(self, context, fileEventServerPort): assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Push(context) + socket = context.socket(zmq.PUSH) zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort) logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr)) @@ -64,6 +56,8 @@ class DirectoryWatcherHandler(): try: self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket, targetPath) + except KeyboardInterrupt: + logging.info("Keyboard interruption detected. Stop passing file to zmq.") except Exception, e: logging.error("Unable to process file '" + str(filepath) + "'") logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e)) @@ -137,11 +131,15 @@ class DirectoryWatcherHandler(): logging.debug(str(messageDictJson)) targetSocket.send(messageDictJson) logging.info("Sending message...done.") + except KeyboardInterrupt: + logging.error("Sending message...failed because of KeyboardInterrupt.") except Exception, e: logging.error("Sending message...failed.") logging.debug("Error was: " + str(e)) raise Exception(e) + def shuttingDown(self): + self.messageSocket.close() def getDefaultConfig_logfilePath(): @@ -341,7 +339,6 @@ if __name__ == '__main__': #create zmq context - global zmqContext zmqContext = zmq.Context() @@ -376,43 +373,40 @@ if __name__ == '__main__': logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr)) workers.bind(zmqSocketStr) - while True: - #waiting for new jobs - try: + try: + while True: + #waiting for new jobs workload = workers.recv() - except KeyboardInterrupt: - break + #transform to dictionary + try: + workloadDict = json.loads(str(workload)) + except: + errorMessage = "invalid job received. skipping job" + logging.error(errorMessage) + logging.debug("workload=" + str(workload)) + continue + + #extract fileEvent metadata + try: + #TODO validate fileEventMessageDict dict + filepath = workloadDict["filepath"] + targetPath = workloadDict["targetPath"] + logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath)) + except Exception, e: + errorMessage = "Invalid fileEvent message received." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.debug("workloadDict=" + str(workloadDict)) + #skip all further instructions and continue with next iteration + continue + + # send the file to the fileMover + directoryWatcher.passFileToZeromq(filepath, targetPath) + except KeyboardInterrupt: + logging.info("Keyboard interruption detected. Shuting down") - #transform to dictionary - try: - workloadDict = json.loads(str(workload)) - except: - errorMessage = "invalid job received. skipping job" - logging.error(errorMessage) - logging.debug("workload=" + str(workload)) - continue - - #extract fileEvent metadata - try: - #TODO validate fileEventMessageDict dict - filepath = workloadDict["filepath"] - targetPath = workloadDict["targetPath"] - logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath)) - except Exception, e: - errorMessage = "Invalid fileEvent message received." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("workloadDict=" + str(workloadDict)) - #skip all further instructions and continue with next iteration - continue - - # send the file to the fileMover - directoryWatcher.passFileToZeromq(filepath, targetPath) - - - # We never get here but clean up anyhow workers.close() - + directoryWatcher.shuttingDown() zmqContext.destroy() diff --git a/wrapper_script.py b/wrapper_script.py index 3bcf363cd0e89b112da29b920cd7c39da6de6ee2..86d546f1a7cf24f66a60968acf62d2ecd81fc677 100644 --- a/wrapper_script.py +++ b/wrapper_script.py @@ -75,7 +75,11 @@ if supported_file: #send reply back to server workload = { "filepath": source, "targetPath": target } workload_json = json.dumps(workload) - socket.send(workload_json) + try: + socket.send(workload_json) + except: + logging.debug( "Could not send message to ZMQ: " + str(workload)) + logging.debug( "Send message to ZMQ: " + str(workload)) # my_cmd = 'echo "' + source + '" > /tmp/zeromqllpipe' @@ -89,8 +93,7 @@ if supported_file: # stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, # universal_newlines = False ) #out, err = p.communicate() - # We never get here but clean up anyhow + # We never get here but clean up anyhow socket.close() zmqContext.destroy() -