diff --git a/ZeroMQTunnel/fileMover.py b/ZeroMQTunnel/fileMover.py index ce1cd744c715f44c8da0e02c0d63833d675382ed..17dd869a506e3bb0e0d909049e8338831bb246a2 100644 --- a/ZeroMQTunnel/fileMover.py +++ b/ZeroMQTunnel/fileMover.py @@ -15,8 +15,12 @@ from multiprocessing import Process, freeze_support import subprocess import json import shutil +import helperScript + DEFAULT_CHUNK_SIZE = 1048576 + + # # -------------------------- class: WorkerProcess -------------------------------------- # @@ -24,37 +28,74 @@ class WorkerProcess(): id = None dataStreamIp = None dataStreamPort = None - logfileFullPath = None zmqContextForWorker = None zmqMessageChunkSize = None fileWaitTime_inMs = None fileMaxWaitTime_InMs = None - cleaner_url = "inproc://workers" + zmqCleanerIp = None # responsable to delete files + zmqCleanerPort = None # responsable to delete files zmqDataStreamSocket = None + routerSocket = None + cleanerSocket = None - def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, - fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0): + # to get the logging only handling this class + log = None + + def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, zmqCleanerIp, zmqCleanerPort, + fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0, + context = None): self.id = id self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort - self.logfileFullPath = logfileFullPath self.zmqMessageChunkSize = chunkSize + self.zmqCleanerIp = zmqCleanerIp + self.zmqCleanerPort = zmqCleanerPort self.fileWaitTime_inMs = fileWaitTimeInMs self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs - self.initLogging(logfileFullPath) + #initialize router + self.zmqContextForWorker = context or zmq.Context() + + self.log = self.getLogger() + + + dataStreamIp = self.dataStreamIp + dataStreamPort = self.dataStreamPort + + self.log.debug("new workerThread started. id=" + str(self.id)) + + # initialize sockets + self.zmqDataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH) + connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=self.dataStreamPort) + self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket) + self.log.debug("zmqDataStreamSocket started for '" + connectionStrDataStreamSocket + "'") + + self.routerSocket = self.zmqContextForWorker.socket(zmq.REQ) + self.routerSocket.identity = u"worker-{ID}".format(ID=self.id).encode("ascii") + connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000") + self.routerSocket.connect(connectionStrRouterSocket) + self.log.debug("routerSocket started for '" + connectionStrRouterSocket + "'") + + #init Cleaner message-pipe + self.cleanerSocket = self.zmqContextForWorker.socket(zmq.PUSH) + connectionStrCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort) + self.cleanerSocket.connect(connectionStrCleanerSocket) try: self.process() except KeyboardInterrupt: # trace = traceback.format_exc() - logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.") - self.zmqDataStreamSocket.close() - self.zmqContextForWorker.destroy() + self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") else: trace = traceback.format_exc() - logging.error("Stopping workerProcess due to unknown error condition.") - logging.debug("Error was: " + str(trace)) + self.log.error("Stopping workerProcess due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) + + self.log.info("Closing sockets") + self.zmqDataStreamSocket.close(0) + self.routerSocket.close(0) + self.cleanerSocket.close(0) + self.zmqContextForWorker.destroy() def process(self): @@ -78,48 +119,25 @@ class WorkerProcess(): a separate data-messagePipe. Afterwards the original file will be removed. """ - id = self.id - dataStreamIp = self.dataStreamIp - dataStreamPort = self.dataStreamPort - logging.debug("new workerThread started. id=" + str(id)) - - #initialize router - zmqContextForWorker = zmq.Context() - self.zmqContextForWorker = zmqContextForWorker - - self.zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH) - connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=dataStreamPort) - self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket) - - routerSocket = zmqContextForWorker.socket(zmq.REQ) - routerSocket.identity = u"worker-{ID}".format(ID=id).encode("ascii") - connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000") - routerSocket.connect(connectionStrRouterSocket) processingJobs = True jobCount = 0 - #init Cleaner message-pipe - cleanerSocket = zmqContextForWorker.socket(zmq.PUSH) - connectionStringCleanerSocket = self.cleaner_url - cleanerSocket.connect(connectionStringCleanerSocket) - - while processingJobs: #sending a "ready"-signal to the router. #the reply will contain the actual job/task. - logging.debug("worker-"+str(id)+": sending ready signal") + self.log.debug("worker-"+str(self.id)+": sending ready signal") - routerSocket.send(b"READY") + self.routerSocket.send(b"READY") # Get workload from router, until finished - logging.debug("worker-"+str(id)+": waiting for new job") - workload = routerSocket.recv() - logging.debug("worker-"+str(id)+": new job received") + self.log.debug("worker-"+str(self.id)+": waiting for new job") + workload = self.routerSocket.recv() + self.log.debug("worker-"+str(self.id)+": new job received") finished = workload == b"END" if finished: processingJobs = False - logging.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount) + self.log.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount) break jobCount += 1 @@ -127,11 +145,11 @@ class WorkerProcess(): fileEventMessageDict = None try: fileEventMessageDict = json.loads(str(workload)) - logging.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict))) + self.log.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict))) except Exception, e: errorMessage = "Unable to convert message into a dictionary." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) #extract fileEvent metadata @@ -142,22 +160,22 @@ class WorkerProcess(): relativeParent = fileEventMessageDict["relativeParent"] except Exception, e: errorMessage = "Invalid fileEvent message received." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict)) #skip all further instructions and continue with next iteration continue #passing file to data-messagPipe try: - logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...") + self.log.debug("worker-" + str(id) + ": passing new file to data-messagePipe...") self.passFileToDataStream(filename, sourcePath, relativeParent) - logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.") + self.log.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.") except Exception, e: errorMessage = "Unable to pass new file to data-messagePipe." - logging.error(errorMessage) - logging.error("Error was: " + str(e)) - logging.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.") + self.log.error(errorMessage) + self.log.error("Error was: " + str(e)) + self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.") #skip all further instructions and continue with next iteration continue @@ -165,18 +183,21 @@ class WorkerProcess(): #send remove-request to message pipe try: #sending to pipe - logging.debug("send file-event to cleaner-pipe...") - cleanerSocket.send(workload) - logging.debug("send file-event to cleaner-pipe...success.") + self.log.debug("send file-event to cleaner-pipe...") + self.cleanerSocket.send(workload) + self.log.debug("send file-event to cleaner-pipe...success.") #TODO: remember workload. append to list? # can be used to verify files which have been processed twice or more except Exception, e: errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(filename) - logging.error(errorMessage) - logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + self.log.error(errorMessage) + self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict)) + def getLogger(self): + logger = logging.getLogger("workerProcess") + return logger def getFileWaitTimeInMs(self): @@ -208,47 +229,47 @@ class WorkerProcess(): while fileIsStillInUse: #skip waiting periode if waiting to long for file to get closed if time.time() - timeStartWaiting >= (fileMaxWaitTimeInMs / 1000): - logging.debug("waited to long for file getting closed. aborting") + self.log.debug("waited to long for file getting closed. aborting") break #wait for other process to finish file access #grabs time when file was modified last statInfo = os.stat(sourceFilePathFull) fileLastModified = statInfo.st_mtime - logging.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified)) + self.log.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified)) timeNow = time.time() timeDiff = timeNow - fileLastModified - logging.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff)) + self.log.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff)) waitTimeInSeconds = fileWaitTimeInMs/1000 if timeDiff >= waitTimeInSeconds: fileIsStillInUse = False - logging.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.") + self.log.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.") else: - logging.debug("still waiting for file to get closed...") + self.log.debug("still waiting for file to get closed...") time.sleep(fileWaitTimeInMs / 1000 ) #for quick testing set filesize of file as chunksize - logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...") + self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...") filesize = os.path.getsize(sourceFilePathFull) fileModificationTime = os.stat(sourceFilePathFull).st_mtime chunksize = filesize #can be used later on to split multipart message - logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) - logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) + self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) + self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) except Exception, e: errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) try: - logging.debug("opening '" + str(sourceFilePathFull) + "'...") + self.log.debug("opening '" + str(sourceFilePathFull) + "'...") fileDescriptor = open(str(sourceFilePathFull), "rb") except Exception, e: errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) @@ -257,14 +278,14 @@ class WorkerProcess(): payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent) except Exception, e: errorMessage = "Unable to assemble multi-part message." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(e) #send message try: - logging.debug("Passing multipart-message...") + self.log.debug("Passing multipart-message...") chunkNumber = 0 stillChunksToRead = True while stillChunksToRead: @@ -279,6 +300,7 @@ class WorkerProcess(): #as chunk is empty decrease chunck-counter chunkNumber -= 1 + break #assemble metadata for zmq-message chunkPayloadMetadata = payloadMetadata.copy() @@ -295,19 +317,18 @@ class WorkerProcess(): fileDescriptor.close() # self.zmqDataStreamSocket.send_multipart(multipartMessage) - logging.debug("Passing multipart-message...done.") + self.log.debug("Passing multipart-message...done.") except Exception, e: - logging.error("Unable to send multipart-message") - logging.debug("Error was: " + str(e)) - logging.info("Passing multipart-message...failed.") + self.log.error("Unable to send multipart-message") + self.log.debug("Error was: " + str(e)) + self.log.info("Passing multipart-message...failed.") raise Exception(e) - def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize): try: # chunksize = 16777216 #16MB - logging.debug("reading file '" + str(sourceFilePathFull)+ "' to memory") + self.log.debug("reading file '" + str(sourceFilePathFull)+ "' to memory") # FIXME: chunk is read-out as str. why not as bin? will probably add to much overhead to zmq-message fileContentAsByteObject = fileDescriptor.read(chunkSize) @@ -332,7 +353,7 @@ class WorkerProcess(): """ #add metadata to multipart - logging.debug("create metadata for source file...") + self.log.debug("create metadata for source file...") metadataDict = { "filename" : filename, "filesize" : filesize, @@ -341,8 +362,7 @@ class WorkerProcess(): "relativeParent" : relativeParent, "chunkSize" : self.getChunkSize()} - logging.debug("metadataDict = " + str(metadataDict)) - + self.log.debug("metadataDict = " + str(metadataDict)) return metadataDict @@ -361,31 +381,13 @@ class WorkerProcess(): # print "{number:.{digits}f}".format(number=freeUserSpaceLeft_percent, digits=0) # print int(freeUserSpaceLeft_percent) - logging.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes " + self.log.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes " + " (" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left)") #warn if disk space is running low highWaterMark = 85 if int(freeUserSpaceLeft_percent) >= int(highWaterMark): - logging.warning("Running low in disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.") - - - def initLogging(self, filenameFullPath): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #log everything to file - logging.basicConfig(level=logging.DEBUG, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.INFO) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) + self.log.warning("Running low in disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.") @@ -393,65 +395,80 @@ class WorkerProcess(): # -------------------------- class: FileMover -------------------------------------- # class FileMover(): - patterns = ["*"] - fileList_newFiles = list() - fileCount_newFiles = 0 zmqContext = None - messageSocket = None # to receiver fileMove-jobs as json-encoded dictionary - dataSocket = None # to send fileObject as multipart message bindingIpForSocket = None zqmFileEventServerIp = "127.0.0.1" # serverIp for incoming messages tcpPort_messageStream = "6060" dataStreamIp = "127.0.0.1" # ip of dataStream-socket to push new files to dataStreamPort = "6061" # port number of dataStream-socket to push new files to + zmqCleanerIp = "127.0.0.1" # zmq pull endpoint, responsable to delete files + zmqCleanerPort = "6062" # zmq pull endpoint, responsable to delete files fileWaitTimeInMs = None fileMaxWaitTimeInMs = None - pipe_name = "/tmp/zeromqllpipe_resp" - - currentZmqDataStreamSocketListIndex = None # Index-Number of a socket used to send datafiles to - logfileFullPath = None - chunkSize = None + parallelDataStreams = None + chunkSize = None + # sockets + messageSocket = None # to receiver fileMove-jobs as json-encoded dictionary + routerSocket = None - def process(self): - try: - self.startReceiving() - except zmq.error.ZMQError as e: - logging.error("ZMQError: "+ str(e)) - except KeyboardInterrupt: - logging.debug("KeyboardInterrupt detected. Shutting down fileMover.") - logging.info("Shutting down fileMover as KeyboardInterrupt was detected.") - self.zmqContext.destroy() - except: - trace = traceback.format_exc() - logging.info("Stopping fileMover due to unknown error condition.") - logging.debug("Error was: " + str(trace)) - + # to get the logging only handling this class + log = None def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, - logfileFullPath, chunkSize, - fileWaitTimeInMs, fileMaxWaitTimeInMs): - logging.info("registering zmq global context") + chunkSize, zmqCleanerIp, zmqCleanerPort, + fileWaitTimeInMs, fileMaxWaitTimeInMs, + context = None): - #create zmq context - zmqContext = zmq.Context() + assert isinstance(context, zmq.sugar.context.Context) - self.zmqContext = zmqContext + self.zmqContext = context or zmq.Context() self.bindingIpForSocket = bindingIpForSocket self.tcpPort_messageStream = bindingPortForSocket self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort self.parallelDataStreams = parallelDataStreams - self.logfileFullPath = logfileFullPath self.chunkSize = chunkSize + self.zmqCleanerIp = zmqCleanerIp + self.zmqCleanerPort = zmqCleanerPort self.fileWaitTimeInMs = fileWaitTimeInMs self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs + self.log = self.getLogger() + self.log.debug("Init") + #create zmq sockets. one for incoming file events, one for passing fileObjects to - self.messageSocket = self.getZmqSocket_Pull(self.zmqContext) - self.dataSocket = self.getZmqSocket_Push(self.zmqContext) + self.messageSocket = self.zmqContext.socket(zmq.PULL) + connectionStrMessageSocket = "tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream + self.messageSocket.bind(connectionStrMessageSocket) + self.log.debug("messageSocket started for '" + connectionStrMessageSocket + "'") + + #setting up router for load-balancing worker-threads. + #each worker-thread will handle a file event + self.routerSocket = self.zmqContext.socket(zmq.ROUTER) + connectionStrRouterSocket = "tcp://127.0.0.1:50000" + self.routerSocket.bind(connectionStrRouterSocket) + self.log.debug("routerSocket started for '" + connectionStrRouterSocket + "'") + + + def process(self): + try: + self.startReceiving() + except zmq.error.ZMQError as e: + self.log.error("ZMQError: "+ str(e)) + self.log.debug("Shutting down workerProcess.") + except: + trace = traceback.format_exc() + self.log.info("Stopping fileMover due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) + + + + def getLogger(self): + logger = logging.getLogger("fileMover") + return logger def getFileWaitTimeInMs(self): @@ -463,157 +480,87 @@ class FileMover(): def startReceiving(self): - #create socket - zmqContext = self.zmqContext - zmqSocketForNewFileEvents = self.createPullSocket() - logging.debug("new message-socket crated for: new file events.") + self.log.debug("new message-socket crated for: new file events.") parallelDataStreams = int(self.parallelDataStreams) - logging.debug("new message-socket crated for: passing file objects.") + self.log.debug("new message-socket crated for: passing file objects.") incomingMessageCounter = 0 - - #setting up router for load-balancing worker-threads. - #each worker-thread will handle a file event - routerSocket = self.zmqContext.socket(zmq.ROUTER) - routerSocket.bind("tcp://127.0.0.1:50000") - logging.debug("routerSocket started for 'tcp://127.0.0.1:50000'") - - #start worker-threads. each will have its own PushSocket. workerThreadList = list() numberOfWorkerThreads = parallelDataStreams fileWaitTimeInMs = self.getFileWaitTimeInMs() fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs() for threadNumber in range(numberOfWorkerThreads): - logging.debug("instantiate new workerProcess (nr " + str(threadNumber)) + self.log.debug("instantiate new workerProcess (nr " + str(threadNumber) + " )") newWorkerThread = Process(target=WorkerProcess, args=(threadNumber, self.dataStreamIp, self.dataStreamPort, - logfileFullPath, self.chunkSize, + self.zmqCleanerIp, + self.zmqCleanerPort, fileWaitTimeInMs, fileMaxWaitTimeInMs)) workerThreadList.append(newWorkerThread) - logging.debug("start worker process nr " + str(threadNumber)) + self.log.debug("start worker process nr " + str(threadNumber)) newWorkerThread.start() #run loop, and wait for incoming messages continueReceiving = True - logging.debug("waiting for new fileEvent-messages") - while continueReceiving: - try: - incomingMessage = zmqSocketForNewFileEvents.recv() - logging.debug("new fileEvent-message received.") - logging.debug("message content: " + str(incomingMessage)) - incomingMessageCounter += 1 - - logging.debug("processFileEvent...") - self.processFileEvent(incomingMessage, routerSocket) #TODO refactor as separate process to emphasize unblocking - logging.debug("processFileEvent...done") - except Exception, e: - print "exception" - logging.error("Failed to receive new fileEvent-message.") - logging.error(sys.exc_info()) - - #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses? - # continueReceiving = False - - - print "shutting down fileEvent-receiver..." + self.log.debug("waiting for new fileEvent-messages") try: - logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocketForNewFileEvents, zmqContext) - logging.debug("shutting down zeromq...done.") - except: - logging.error(sys.exc_info()) - logging.error("shutting down zeromq...failed.") + while continueReceiving: + try: + incomingMessage = self.messageSocket.recv() + self.log.debug("new fileEvent-message received.") + self.log.debug("message content: " + str(incomingMessage)) + incomingMessageCounter += 1 + + self.log.debug("processFileEvent..." + str(incomingMessageCounter)) + self.processFileEvent(incomingMessage) #TODO refactor as separate process to emphasize unblocking + self.log.debug("processFileEvent...done") + except Exception, e: + self.log.error("Failed to receive new fileEvent-message.") + self.log.error(sys.exc_info()) + + #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses? + # continueReceiving = False + except KeyboardInterrupt: + self.log.info("Keyboard interuption detected. Stop receiving") - def routeFileEventToWorkerThread(self, fileEventMessage, routerSocket): + def processFileEvent(self, fileEventMessage): # LRU worker is next waiting in the queue - logging.debug("waiting for available workerThread.") + self.log.debug("waiting for available workerThread.") # address == "worker-0" # empty == "" # ready == "READY" - address, empty, ready = routerSocket.recv_multipart() - logging.debug("available workerThread detected.") + address, empty, ready = self.routerSocket.recv_multipart() + self.log.debug("available workerThread detected.") - logging.debug("passing job to workerThread...") - routerSocket.send_multipart([ + self.log.debug("passing job to workerThread...") + self.routerSocket.send_multipart([ address, b'', fileEventMessage, ]) - # inform lsyncd wrapper that the file can be moved - -# if not os.path.exists(self.pipe_name): -# os.mkfifo(self.pipe_name) - - -# messageToPipe = json.loads ( fileEventMessage ) -# messageToPipe = messageToPipe["sourcePath"] + os.sep + messageToPipe["filename"] -# my_cmd = 'echo "' + messageToPipe + '" > ' + self.pipe_name -# p = subprocess.Popen ( my_cmd, shell=True ) -# p.communicate() + self.log.debug("passing job to workerThread...done.") - logging.debug("passing job to workerThread...done.") - - def processFileEvent(self, fileEventMessage, routerSocket): - self.routeFileEventToWorkerThread(fileEventMessage, routerSocket) - - - def stopReceiving(self, zmqSocket, msgContext): - try: - logging.debug("closing zmqSocket...") - zmqSocket.close() - logging.debug("closing zmqSocket...done.") - except: - logging.debug("closing zmqSocket...failed.") - logging.error(sys.exc_info()) - - try: - logging.debug("closing zmqContext...") - msgContext.destroy() - logging.debug("closing zmqContext...done.") - except: - logging.debug("closing zmqContext...failed.") - logging.error(sys.exc_info()) - - - def getZmqSocket_Pull(self, context): - pattern_pull = zmq.PULL - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern_pull) - - return socket - - - def getZmqSocket_Push(self, context): - pattern = zmq.PUSH - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern) - - return socket - - - - def createPullSocket(self): - #get default message-socket - socket = self.messageSocket - - logging.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream) - socket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.tcpPort_messageStream) - return socket + def stop(self): + self.messageSocket.close(0) + self.routerSocket.close(0) +# +# -------------------------- class: Cleaner -------------------------------------- +# class Cleaner(): """ * received cleaning jobs via zeromq, @@ -625,76 +572,58 @@ class Cleaner(): - poll the watched directory and reissue new files to fileMover which have not been detected yet """ - logfileFullPath = None bindingPortForSocket = None bindingIpForSocket = None zmqContextForCleaner = None + zmqCleanerSocket = None + + # to get the logging only handling this class + log = None - def __init__(self, logfilePath, context, bindingIp="127.0.0.1", bindingPort="6062", verbose=False): + def __init__(self, bindingIp="127.0.0.1", bindingPort="6062", context = None, verbose=False): self.bindingPortForSocket = bindingPort self.bindingIpForSocket = bindingIp - self.initLogging(logfilePath, verbose) - log = self.getLogger() + self.zmqContextForCleaner = context or zmq.Context() + + self.log = self.getLogger() + self.log.debug("Init") + + #bind to local port + self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL) + connectionStrCleanerSocket = "tcp://" + self.bindingIpForSocket + ":%s" % self.bindingPortForSocket + self.zmqCleanerSocket.bind(connectionStrCleanerSocket) + self.log.debug("zmqCleanerSocket started for '" + connectionStrCleanerSocket + "'") + try: self.process() except zmq.error.ZMQError: - log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") - self.zmqContextForCleaner.destroy() + self.log.error("ZMQError: "+ str(e)) + self.log.debug("Shutting down cleaner.") except KeyboardInterrupt: - log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") - self.zmqContextForCleaner.destroy() + self.log.info("KeyboardInterrupt detected. Shutting down cleaner.") except: trace = traceback.format_exc() - log.error("Stopping cleanerProcess due to unknown error condition.") - log.debug("Error was: " + str(trace)) + self.log.error("Stopping cleanerProcess due to unknown error condition.") + self.log.debug("Error was: " + str(trace)) + + self.zmqCleanerSocket.close(0) + self.zmqContextForCleaner.destroy() def getLogger(self): logger = logging.getLogger("cleaner") return logger - def initLogging(self, logfilePath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - logfilePathFull = os.path.join(logfilePath, "cleaner.log") - logger = logging.getLogger("cleaner") - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - - #log everything to file - fileHandler = logging.FileHandler(filename=logfilePathFull, - mode="a") - fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', - fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') - fileHandler.setFormatter(fileHandlerFormat) - fileHandler.setLevel(loggingLevel) - logger.addHandler(fileHandler) - - def process(self): - processingJobs = True - log = self.getLogger() - - #create zmq context - zmqContextForCleaner = zmq.Context() - self.zmqContextForCleaner = zmqContextForCleaner - - #bind to local port - zmqJobSocket = zmqContextForCleaner.socket(zmq.PULL) - zmqJobSocket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.bindingPortForSocket) - #processing messaging - while processingJobs: + while True: #waiting for new jobs + self.log.debug("Waiting for new jobs") try: - workload = zmqJobSocket.recv() + workload = self.zmqCleanerSocket.recv() except Exception as e: - logging.error("Error in receiving job: " + str(e)) + self.log.error("Error in receiving job: " + str(e)) #transform to dictionary @@ -702,8 +631,8 @@ class Cleaner(): workloadDict = json.loads(str(workload)) except: errorMessage = "invalid job received. skipping job" - log.error(errorMessage) - log.debug("workload=" + str(workload)) + self.log.error(errorMessage) + self.log.debug("workload=" + str(workload)) continue #extract fileEvent metadata @@ -716,20 +645,23 @@ class Cleaner(): # filesize = workloadDict["filesize"] except Exception, e: errorMessage = "Invalid fileEvent message received." - log.error(errorMessage) - log.debug("Error was: " + str(e)) - log.debug("workloadDict=" + str(workloadDict)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("workloadDict=" + str(workloadDict)) #skip all further instructions and continue with next iteration continue #moving source file sourceFilepath = None try: - logging.debug("removing source file...") + self.log.debug("removing source file...") #generate target filepath -# sourceFilepath = os.path.join(sourcePath,filename) - self.moveFile(sourceFilepath) - self.moveFile(sourcePath, filename, target) + sourceFilepath = os.path.join(sourcePath,filename) +# self.removeFile(sourceFilepath) + self.log.debug ("sourcePath: " + str (sourcePath)) + self.log.debug ("filename: " + str (filename)) + self.log.debug ("targetPath: " + str (targetPath)) + self.moveFile(sourcePath, filename, targetPath) # #show filesystem statistics # try: @@ -737,29 +669,28 @@ class Cleaner(): # except Exception, f: # logging.warning("Unable to get filesystem statistics") # logging.debug("Error was: " + str(f)) - log.debug("file removed: " + str(sourceFilepath)) - log.debug("removing source file...success.") + self.log.debug("file removed: " + str(sourcePath)) + self.log.debug("removing source file...success.") except Exception, e: - errorMessage = "Unable to remove source file." - log.error(errorMessage) + errorMessage = "Unable to remove source file: " + str (sourcePath) + self.log.error(errorMessage) trace = traceback.format_exc() - log.error("Error was: " + str(trace)) - log.debug("sourceFilepath="+str(sourceFilepath)) - log.debug("removing source file...failed.") + self.log.error("Error was: " + str(trace)) + self.log.debug("sourceFilepath="+str(sourceFilepath)) + self.log.debug("removing source file...failed.") #skip all further instructions and continue with next iteration continue def moveFile(self, source, filename, target): - log = self.getLogger() maxAttemptsToRemoveFile = 2 waitTimeBetweenAttemptsInMs = 500 iterationCount = 0 - log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") fileWasMoved = False while iterationCount <= maxAttemptsToRemoveFile and not fileWasMoved: @@ -772,32 +703,36 @@ class Cleaner(): except OSError: pass # moving the file - os.remove(filepath) - shutil.move(source + os.sep + filename, target + os.sep + filename) + sourceFile = source + os.sep + filename + targetFile = target + os.sep + filename + self.log.debug("sourceFile: " + str(sourceFile)) + self.log.debug("targetFile: " + str(targetFile)) + shutil.move(sourceFile, targetFile) fileWasMoved = True - log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.") + except IOError: + self.log.debug ("IOError: " + str(filename)) except Exception, e: trace = traceback.format_exc() warningMessage = "Unable to move file {FILE}.".format(FILE=str(source) + str(filename)) - log.warning(warningMessage) - log.debug("trace=" + str(trace)) - log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) + self.log.warning(warningMessage) + self.log.debug("trace=" + str(trace)) + self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) if not fileWasMoved: - log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.") + self.log.error("Moving file '" + str(filename) + " from " + str(source) + " to " + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.") raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount), - FILE=filepath)) + FILE=filename)) def removeFile(self, filepath): - log = self.getLogger() maxAttemptsToRemoveFile = 2 waitTimeBetweenAttemptsInMs = 500 iterationCount = 0 - log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...") + self.log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...") fileWasRemoved = False while iterationCount <= maxAttemptsToRemoveFile and not fileWasRemoved: @@ -805,17 +740,17 @@ class Cleaner(): try: os.remove(filepath) fileWasRemoved = True - log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.") + self.log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.") except Exception, e: trace = traceback.format_exc() warningMessage = "Unable to remove file {FILE}.".format(FILE=str(filepath)) - log.warning(warningMessage) - log.debug("trace=" + str(trace)) - log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) + self.log.warning(warningMessage) + self.log.debug("trace=" + str(trace)) + self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs))) if not fileWasRemoved: - log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.") + self.log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.") raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount), FILE=filepath)) @@ -828,6 +763,8 @@ def argumentParsing(): parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060") parser.add_argument("--dataStreamIp" , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1") parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061") + parser.add_argument("--zmqCleanerIp" , type=str, help="zmq-pull-socket which deletes given files", default="127.0.0.1") + parser.add_argument("--zmqCleanerPort", type=str, help="zmq-pull-socket which deletes given files", default="6063") parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1") parser.add_argument("--chunkSize", type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE) parser.add_argument("--verbose" , help="more verbose output", action="store_true") @@ -842,61 +779,6 @@ def argumentParsing(): - -def checkFolderForExistance(watchFolderPath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(watchFolderPath): - logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) - sys.exit(1) - - - -def checkLogfileFolder(logfilePath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(logfilePath): - logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) - sys.exit(1) - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - - - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - if __name__ == '__main__': freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows arguments = argumentParsing() @@ -909,6 +791,8 @@ if __name__ == '__main__': logfilePath = str(arguments.logfilePath) logfileName = str(arguments.logfileName) parallelDataStreams = str(arguments.parallelDataStreams) + zmqCleanerIp = str(arguments.zmqCleanerIp) + zmqCleanerPort = str(arguments.zmqCleanerPort) chunkSize = arguments.chunkSize verbose = arguments.verbose logfileFullPath = os.path.join(logfilePath, logfileName) @@ -917,19 +801,46 @@ if __name__ == '__main__': #enable logging - initLogging(logfileFullPath, verbose) + helperScript.initLogging(logfileFullPath, verbose) + + + #create zmq context + # there should be only one context in one process + zmqContext = zmq.Context.instance() + logging.info("registering zmq global context") + + cleanerThread = Process(target=Cleaner, args=(zmqCleanerIp, zmqCleanerPort, zmqContext)) + cleanerThread.start() + logging.debug("cleaner thread started") #start new fileMover - # try: fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, - parallelDataStreams, logfileFullPath, chunkSize, - fileWaitTimeInMs, fileMaxWaitTimeInMs) - fileMover.process() - # except KeyboardInterrupt, ke: - # print "keyboardInterrupt detected." + parallelDataStreams, chunkSize, + zmqCleanerIp, zmqCleanerPort, + fileWaitTimeInMs, fileMaxWaitTimeInMs, + zmqContext) + try: + fileMover.process() + except KeyboardInterrupt: + logging.info("Keyboard interruption detected. Shutting down") # except Exception, e: # print "unknown exception detected." + logging.debug("shutting down zeromq...") + try: + fileMover.stop() + logging.debug("shutting down zeromq...done.") + except: + logging.error(sys.exc_info()) + logging.error("shutting down zeromq...failed.") + + try: + logging.debug("closing zmqContext...") + zmqContext.destroy() + logging.debug("closing zmqContext...done.") + except: + logging.debug("closing zmqContext...failed.") + logging.error(sys.exc_info()) diff --git a/ZeroMQTunnel/helperScript.py b/ZeroMQTunnel/helperScript.py index 631981f53999996f66b4ffc6fbf5a51f1394b0d8..02f4c8121e795e232172d55511c564d7b9e3dd97 100644 --- a/ZeroMQTunnel/helperScript.py +++ b/ZeroMQTunnel/helperScript.py @@ -1,5 +1,6 @@ import os import platform +import logging @@ -58,3 +59,55 @@ def isSupported(): supportValue = True return supportValue + + + +def checkFolderExistance(folderPath): + """ + abort if folder does not exist + + :return: + """ + + #check folder path for existance. exits if it does not exist + if not os.path.exists(folderPath): + logging.error("Folder '%s' does not exist. Abort." % str(watchFolderPath)) + sys.exit(1) + + +def initLogging(filenameFullPath, verbose): + #@see https://docs.python.org/2/howto/logging-cookbook.html + +# def initLogging(self, logfilePath, verbose): +# +# logfilePathFull = os.path.join(logfilePath, "cleaner.log") + + #more detailed logging if verbose-option has been set + loggingLevel = logging.INFO + if verbose: + loggingLevel = logging.DEBUG + + #log everything to file + logging.basicConfig(level=loggingLevel, + format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d_%H:%M:%S', + filename=filenameFullPath, + filemode="a") + +# fileHandler = logging.FileHandler(filename=logfilePathFull, +# mode="a") +# fileHandlerFormat = logging.Formatter(datefmt='%Y-%m-%d_%H:%M:%S', +# fmt='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s') +# fileHandler.setFormatter(fileHandlerFormat) +# fileHandler.setLevel(loggingLevel) +# logger.addHandler(fileHandler) + + #log info to stdout, display messages with different format than the file output + console = logging.StreamHandler() + console.setLevel(logging.WARNING) + formatter = logging.Formatter("%(asctime)s > %(message)s") + console.setFormatter(formatter) + + logging.getLogger("").addHandler(console) + + diff --git a/ZeroMQTunnel/receiver.py b/ZeroMQTunnel/receiver.py index af664ff4b089649392f1dbd05e8250cbbf32b527..48e369f233f6631c57d614c514a005b3c769c925 100644 --- a/ZeroMQTunnel/receiver.py +++ b/ZeroMQTunnel/receiver.py @@ -13,257 +13,156 @@ import os import traceback from stat import S_ISREG, ST_MTIME, ST_MODE import threading +import helperScript - +# +# -------------------------- class: FileReceiver -------------------------------------- +# class FileReceiver: - globalZmqContext = None - liveViewerZmqContext = None + zmqContext = None outputDir = None zqmDataStreamIp = None zmqDataStreamPort = None zmqLiveViewerIp = None zmqLiveViewerPort = None - ringBuffer = [] - maxRingBufferSize = 200 - timeToWaitForRingBuffer = 2 - runThread = True + exchangeIp = "127.0.0.1" + exchangePort = "6072" - def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp): + log = None + + # sockets + zmqSocket = None + exchangeSocket = None + + + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): self.outputDir = outputDir self.zmqDataStreamIp = zmqDataStreamIp self.zmqDataStreamPort = zmqDataStreamPort self.zmqLiveViewerIp = zmqLiveViewerIp self.zmqLiveViewerPort = zmqLiveViewerPort - # initialize ring buffer - # get all entries in the directory - # TODO empty target dir -> ringBuffer = [] - ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) - # get the corresponding stats - ringBuffer = ((os.stat(path), path) for path in ringBuffer) - - # leave only regular files, insert modification date - ringBuffer = [[stat[ST_MTIME], path] - for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])] + if context: + assert isinstance(context, zmq.sugar.context.Context) - # sort the ring buffer in descending order (new to old files) - ringBuffer = sorted(ringBuffer, reverse=True) + self.zmqContext = context or zmq.Context() + self.log = self.getLogger() + self.log.debug("Init") - # "global" in Python is per-module ! - global globalZmqContext - self.globalZmqContext = zmq.Context() + # start file receiver + self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp)) + self.receiverThread.start() - # thread to communicate with live viewer - self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer) - self.liveViewerThread.start() + # create pull socket + self.zmqSocket = self.zmqContext.socket(zmq.PULL) + connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort + self.zmqSocket.bind(connectionStrZmqSocket) + self.log.debug("zmqSocket started (bind) for '" + connectionStrZmqSocket + "'") + self.exchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort + self.exchangeSocket.connect(connectionStrExchangeSocket) + self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'") try: - logging.info("Start receiving new files") + self.log.info("Start receiving new files") self.startReceiving() - logging.info("Stopped receiving.") + self.log.info("Stopped receiving.") except Exception, e: - logging.error("Unknown error while receiving files. Need to abort.") - logging.debug("Error was: " + str(e)) + self.log.error("Unknown error while receiving files. Need to abort.") + self.log.debug("Error was: " + str(e)) except: trace = traceback.format_exc() - logging.info("Unkown error state. Shutting down...") - logging.debug("Error was: " + str(trace)) - self.globalZmqContext.destroy() - - logging.info("Quitting.") - - - def receiveMessage(self, socket): - assert isinstance(socket, zmq.sugar.socket.Socket) - logging.debug("receiving messages...") - # message = socket.recv() - # while True: - message = socket.recv_multipart() - - return message - - - def getZmqContext(self): - #get reference for global context-var - globalZmqContext = self.globalZmqContext - - return globalZmqContext - - - def getZmqSocket_Pull(self, context): - pattern_pull = zmq.PULL - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern_pull) - - return socket - - - def getZmqSocket_Rep(self, context): - pattern_pull = zmq.REP - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern_pull) - - return socket - - - def createPullSocket(self, context): - assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Pull(context) - - logging.info("binding to data socket: tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort) - socket.bind('tcp://' + self.zmqDataStreamIp + ':%s' % self.zmqDataStreamPort) - - return socket - - - def createSocketForLiveViewer(self, context): - assert isinstance(context, zmq.sugar.context.Context) - socket = self.getZmqSocket_Rep(context) - - logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort) - socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.zmqLiveViewerPort) - - return socket + self.log.info("Unkown error state. Shutting down...") + self.log.debug("Error was: " + str(trace)) + self.zmqContext.destroy() + self.log.info("Quitting.") - def addFileToRingBuffer(self, filename, fileModTime): - # prepend file to ring buffer and restore order - self.ringBuffer[:0] = [[fileModTime, filename]] - self.ringBuffer = sorted(self.ringBuffer, reverse=True) - # if the maximal size is exceeded: remove the oldest files - if len(self.ringBuffer) > self.maxRingBufferSize: - for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: - if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: - os.remove(path) - self.ringBuffer.remove([mod_time, path]) + def getLogger(self): + logger = logging.getLogger("fileReceiver") + return logger - # Albula is the live viewer used at the beamlines - def sendFileToLiveViewer(self): + def combineMessage(self, zmqSocket): + receivingMessages = True + #save all chunks to file + while receivingMessages: + multipartMessage = zmqSocket.recv_multipart() - #create socket for live viewer - try: - logging.info("creating socket for communication with live viewer...") - zmqContext = self.getZmqContext() - zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext) - logging.info("creating socket for communication with live viewer...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating socket for communication with live viewer...failed.") - raise Exception(e) - - # if there is a request of the live viewer: - while True: - # Wait for next request from client + #extract multipart message try: - message = zmqLiveViewerSocket.recv() - except zmq.error.ContextTerminated: - break - print "Received request: ", message - time.sleep (1) - # send first element in ring buffer to live viewer (the path of this file is the second entry) - if self.ringBuffer: - try: - zmqLiveViewerSocket.send(self.ringBuffer[0][1]) - print self.ringBuffer[0][1] - except zmq.error.ContextTerminated: - break - else: - try: - zmqLiveViewerSocket.send("None") - print self.ringBuffer - except zmq.error.ContextTerminated: - break + #TODO is string conversion needed here? + payloadMetadata = str(multipartMessage[0]) + except: + self.log.error("an empty config was transferred for multipartMessage") + #TODO validate multipartMessage (like correct dict-values for metadata) + self.log.debug("multipartMessage.metadata = " + str(payloadMetadata)) + + #extraction metadata from multipart-message + payloadMetadataDict = json.loads(payloadMetadata) - def combineMessage(self, zmqSocket): - # multipartMessage = zmqSocket.recv_multipart() - # logging.info("New message received.") - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) - # loopCounter+=1 - #save all chunks to file - while True: - multipartMessage = zmqSocket.recv_multipart() #append to file try: - logging.debug("append to file based on multipart-message...") + self.log.debug("append to file based on multipart-message...") #TODO: save message to file using a thread (avoids blocking) #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened - self.appendChunksToFileFromMultipartMessage(multipartMessage) - logging.debug("append to file based on multipart-message...success.") + self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage) + self.log.debug("append to file based on multipart-message...success.") except Exception, e: errorMessage = "Unable to append multipart-content to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("append to file based on multipart-message...failed.") + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) + self.log.debug("append to file based on multipart-message...failed.") except: errorMessage = "Unable to append multipart-content to file. Unknown Error." - logging.error(errorMessage) - logging.debug("append to file based on multipart-message...failed.") - if len(multipartMessage[1]) == 0: + self.log.error(errorMessage) + self.log.debug("append to file based on multipart-message...failed.") + if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] : #indicated end of file. closing file and leave loop - logging.debug("last file-chunk received. stop appending.") + self.log.debug("last file-chunk received. stop appending.") break - payloadMetadata = str(multipartMessage[0]) - payloadMetadataDict = json.loads(payloadMetadata) filename = self.generateTargetFilepath(payloadMetadataDict) fileModTime = payloadMetadataDict["fileModificationTime"] - logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) + self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename)) - # logging.debug("message-type : " + str(type(multipartMessage))) - # logging.debug("message-length: " + str(len(multipartMessage))) - - # add to ring buffer - self.addFileToRingBuffer(str(filename), fileModTime) + # send the file to the coordinator to add it to the ring buffer + message = "AddFile" + str(filename) + ", " + str(fileModTime) + self.log.debug("Send file to coordinator: " + message ) + self.exchangeSocket.send(message) def startReceiving(self): - #create pull socket - try: - logging.info("creating local pullSocket for incoming files...") - zmqContext = self.getZmqContext() - zmqSocket = self.createPullSocket(zmqContext) - logging.info("creating local pullSocket for incoming files...done.") - except Exception, e: - errorMessage = "Unable to create zeromq context." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.info("creating local pullSocket for incoming files...failed.") - raise Exception(e) - #run loop, and wait for incoming messages continueStreaming = True loopCounter = 0 #counter of total received messages continueReceiving = True #receiving will stop if value gets False - logging.debug("Waiting for new messages...") + self.log.debug("Waiting for new messages...") while continueReceiving: try: - self.combineMessage(zmqSocket) + self.combineMessage(self.zmqSocket) loopCounter+=1 except KeyboardInterrupt: - logging.debug("Keyboard interrupt detected. Stop receiving.") + self.log.debug("Keyboard interrupt detected. Stop receiving.") + continueReceiving = False break except: - logging.error("receive message...failed.") - logging.error(sys.exc_info()) + self.log.error("receive message...failed.") + self.log.error(sys.exc_info()) continueReceiving = False - logging.info("shutting down receiver...") + self.log.info("shutting down receiver...") try: - logging.debug("shutting down zeromq...") - self.stopReceiving(zmqSocket, zmqContext) - logging.debug("shutting down zeromq...done.") + self.stopReceiving(self.zmqSocket, self.zmqContext) + self.log.debug("shutting down receiver...done.") except: - logging.error(sys.exc_info()) - logging.error("shutting down zeromq...failed.") + self.log.error(sys.exc_info()) + self.log.error("shutting down receiver...failed.") def generateTargetFilepath(self,configDict): @@ -301,30 +200,19 @@ class FileReceiver: return targetPath - def appendChunksToFileFromMultipartMessage(self, multipartMessage): + def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage): - #extract multipart message - try: - configDictJson = multipartMessage[0] - except: - logging.error("an empty config was transferred for multipartMessage") try: chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata payload = multipartMessage[1:] except: - logging.warning("an empty file was received within the multipart-message") + self.log.warning("an empty file was received within the multipart-message") payload = None - #TODO validate multipartMessage (like correct dict-values for metadata) - logging.debug("multipartMessage.metadata = " + str(configDictJson)) - - #extraction metadata from multipart-message - configDict = json.loads(configDictJson) - #generate target filepath targetFilepath = self.generateTargetFilepath(configDict) - logging.debug("new file is going to be created at: " + targetFilepath) + self.log.debug("new file is going to be created at: " + targetFilepath) #append payload to file @@ -338,18 +226,18 @@ class FileReceiver: targetPath = self.generateTargetPath(configDict) os.makedirs(targetPath) newFile = open(targetFilepath, "w") - logging.info("New target directory created: " + str(targetPath)) + self.log.info("New target directory created: " + str(targetPath)) except Exception, f: errorMessage = "unable to save payload to file: '" + targetFilepath + "'" - logging.error(errorMessage) - logging.debug("Error was: " + str(f)) - logging.debug("targetPath="+str(targetPath)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(f)) + self.log.debug("targetPath="+str(targetPath)) raise Exception(errorMessage) except Exception, e: - logging.error("failed to append payload to file: '" + targetFilepath + "'") - logging.debug("Error was: " + str(e)) - logging.debug("ErrorTyp: " + str(type(e))) - logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST)) + self.log.error("failed to append payload to file: '" + targetFilepath + "'") + self.log.debug("Error was: " + str(e)) + self.log.debug("ErrorTyp: " + str(type(e))) + self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST)) #only write data if a payload exist try: if payload != None: @@ -358,43 +246,183 @@ class FileReceiver: newFile.close() except Exception, e: errorMessage = "unable to append data to file." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) + self.log.error(errorMessage) + self.log.debug("Error was: " + str(e)) raise Exception(errorMessage) + def stopReceiving(self, zmqSocket, zmqContext): - def stopReceiving(self, zmqSocket, msgContext): + self.log.debug("stopReceiving...") + try: + zmqSocket.close(0) + self.log.debug("closing zmqSocket...done.") + except: + self.log.error("closing zmqSocket...failed.") + self.log.error(sys.exc_info()) - self.runThread=False + self.log.debug("sending exit signal to thread...") + self.exchangeSocket.send("Exit") + # give the signal time to arrive + time.sleep(0.1) + self.exchangeSocket.close(0) + self.log.debug("sending exit signal to thread...done") try: - logging.debug("closing zmqSocket...") - zmqSocket.close() - logging.debug("closing zmqSocket...done.") + zmqContext.destroy() + self.log.debug("closing zmqContext...done.") except: - logging.error("closing zmqSocket...failed.") - logging.error(sys.exc_info()) + self.log.error("closing zmqContext...failed.") + self.log.error(sys.exc_info()) + + +# +# -------------------------- class: Coordinator -------------------------------------- +# +class Coordinator: + zmqContext = None + liveViewerZmqContext = None + outputDir = None + zqmDataStreamIp = None + zmqDataStreamPort = None + zmqLiveViewerIp = None + zmqLiveViewerPort = None + receiverExchangeIp = "127.0.0.1" + receiverExchangePort = "6072" + ringBuffer = [] + maxRingBufferSize = 200 + timeToWaitForRingBuffer = 2 + + log = None + + receiverThread = None + liveViewerThread = None + + # sockets + receiverExchangeSocket = None + zmqliveViewerSocket = None + + + def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None): + self.outputDir = outputDir + self.zmqDataStreamIp = zmqDataStreamIp + self.zmqDataStreamPort = zmqDataStreamPort + self.zmqLiveViewerIp = zmqLiveViewerIp + self.zmqLiveViewerPort = zmqLiveViewerPort + + self.log = self.getLogger() + self.log.debug("Init") + + if context: + assert isinstance(context, zmq.sugar.context.Context) + + self.zmqContext = context or zmq.Context() + + # create sockets + self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIR) + connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort + self.receiverExchangeSocket.bind(connectionStrReceiverExchangeSocket) + self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'") + + # create socket for live viewer + self.zmqliveViewerSocket = self.zmqContext.socket(zmq.REP) + connectionStrLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort + self.zmqliveViewerSocket.bind(connectionStrLiveViewerSocket) + self.log.debug("zmqLiveViewerSocket started (bind) for '" + connectionStrLiveViewerSocket + "'") + + self.poller = zmq.Poller() + self.poller.register(self.receiverExchangeSocket, zmq.POLLIN) + self.poller.register(self.zmqliveViewerSocket, zmq.POLLIN) + + + # initialize ring buffer + # get all entries in the directory + # TODO empty target dir -> ringBuffer = [] + self.ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir)) + # get the corresponding stats + self.ringBuffer = ((os.stat(path), path) for path in self.ringBuffer) + # leave only regular files, insert modification date + self.ringBuffer = [[stat[ST_MTIME], path] + for stat, path in self.ringBuffer if S_ISREG(stat[ST_MODE])] + + # sort the ring buffer in descending order (new to old files) + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + self.log.debug("Init ring buffer") -# try: -# logging.debug("closing zmqLiveViwerSocket...") -# zmqLiveViewerSocket.close() -# logging.debug("closing zmqLiveViwerSocket...done.") -# except: -# logging.error("closing zmqLiveViewerSocket...failed.") -# logging.error(sys.exc_info()) try: - logging.debug("closing zmqContext...") -# msgContext.destroy() - msgContext.term() - logging.debug("closing zmqContext...done.") - except: - logging.error("closing zmqContext...failed.") - logging.error(sys.exc_info()) + self.log.info("Start communication") + self.communicate() + self.log.info("Stopped communication.") + except Exception, e: + trace = traceback.format_exc() + self.log.info("Unkown error state. Shutting down...") + self.log.debug("Error was: " + str(e)) + + + self.log.info("Quitting.") + + + def getLogger(self): + logger = logging.getLogger("coordinator") + return logger + + + def communicate(self): + should_continue = True + + while should_continue: + socks = dict(self.poller.poll()) + + if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN: + message = self.receiverExchangeSocket.recv() + self.log.debug("Recieved control command: %s" % message ) + if message == "Exit": + self.log.debug("Recieved exit command, coordinator thread will stop recieving messages") + should_continue = False + self.liveViewerSocket.send("Exit") + break + elif message.startswith("AddFile"): + self.log.debug("Received AddFile command") + # add file to ring buffer + splittedMessage = message[7:].split(", ") + filename = splittedMessage[0] + fileModTime = splittedMessage[1] + self.log.debug("Send new file to ring buffer: " + str(filename) + ", " + str(fileModTime)) + self.addFileToRingBuffer(filename, fileModTime) + + if self.zmqliveViewerSocket in socks and socks[self.zmqliveViewerSocket] == zmq.POLLIN: + message = self.zmqliveViewerSocket.recv() + self.log.debug("Call for next file... " + message) + # send first element in ring buffer to live viewer (the path of this file is the second entry) + if self.ringBuffer: + answer = self.ringBuffer[0][1] + else: + answer = "None" + + print answer + try: + self.zmqliveViewerSocket.send(answer) + except zmq.error.ContextTerminated: + break + self.log.debug("Closing socket") + self.receiverExchangeSocket.close(0) + self.zmqliveViewerSocket.close(0) + def addFileToRingBuffer(self, filename, fileModTime): + # prepend file to ring buffer and restore order + self.ringBuffer[:0] = [[fileModTime, filename]] + self.ringBuffer = sorted(self.ringBuffer, reverse=True) + + # if the maximal size is exceeded: remove the oldest files + if len(self.ringBuffer) > self.maxRingBufferSize: + for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]: + if float(time.time()) - mod_time > self.timeToWaitForRingBuffer: + os.remove(path) + self.ringBuffer.remove([mod_time, path]) + def argumentParsing(): @@ -415,30 +443,6 @@ def argumentParsing(): return arguments -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - if __name__ == "__main__": @@ -455,7 +459,7 @@ if __name__ == "__main__": #enable logging - initLogging(logfileFilePath, verbose) + helperScript.initLogging(logfileFilePath, verbose) #start file receiver diff --git a/ZeroMQTunnel/watcher_lsyncd.py b/ZeroMQTunnel/watcher_lsyncd.py index a9eb644f229ea46635bc267664f71629d508fbc6..29700a839d057cf6c4cd9cd9a2d0ff2b0caa1fcd 100644 --- a/ZeroMQTunnel/watcher_lsyncd.py +++ b/ZeroMQTunnel/watcher_lsyncd.py @@ -12,7 +12,6 @@ import helperScript -# class MyHandler(PatternMatchingEventHandler): class DirectoryWatcherHandler(): patterns = ["*"] zmqContext = None @@ -24,36 +23,19 @@ class DirectoryWatcherHandler(): def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort): logging.debug("DirectoryWatcherHandler: __init__()") - # logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext))) - logging.info("registering zmq global context") - self.globalZmqContext = zmqContext + logging.info("registering zmq context") + self.zmqContext = zmqContext self.watchFolder = os.path.normpath(watchFolder) self.fileEventServerIp = fileEventServerIp self.fileEventServerPort = fileEventServerPort - #create zmq sockets - self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort) - - - def getZmqSocket_Push(self, context): - pattern = zmq.PUSH - assert isinstance(context, zmq.sugar.context.Context) - socket = context.socket(pattern) - - return socket - - - def createPushSocket(self, context, fileEventServerPort): + assert isinstance(self.zmqContext, zmq.sugar.context.Context) - assert isinstance(context, zmq.sugar.context.Context) - - socket = self.getZmqSocket_Push(context) - - zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort) + #create zmq sockets + self.messageSocket = zmqContext.socket(zmq.PUSH) + zmqSocketStr = "tcp://" + self.fileEventServerIp + ":" + str(self.fileEventServerPort) + self.messageSocket.connect(zmqSocketStr) logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr)) - socket.connect(zmqSocketStr) - - return socket def passFileToZeromq(self, filepath, targetPath): @@ -64,6 +46,8 @@ class DirectoryWatcherHandler(): try: self.sendFilesystemEventToMessagePipe(filepath, self.messageSocket, targetPath) + except KeyboardInterrupt: + logging.info("Keyboard interruption detected. Stop passing file to zmq.") except Exception, e: logging.error("Unable to process file '" + str(filepath) + "'") logging.warning("Skip file '" + str(filepath) + "'. Reason was: " + str(e)) @@ -137,11 +121,15 @@ class DirectoryWatcherHandler(): logging.debug(str(messageDictJson)) targetSocket.send(messageDictJson) logging.info("Sending message...done.") + except KeyboardInterrupt: + logging.error("Sending message...failed because of KeyboardInterrupt.") except Exception, e: logging.error("Sending message...failed.") logging.debug("Error was: " + str(e)) raise Exception(e) + def shuttingDown(self): + self.messageSocket.close(0) def getDefaultConfig_logfilePath(): @@ -261,7 +249,7 @@ def argumentParsing(): sys.exit(1) #check logfile-path for existance - checkLogfileFolder(arguments.logfilePath) + helperScript.checkFolderExistance(arguments.logfilePath) return arguments @@ -269,58 +257,6 @@ def argumentParsing(): -def checkWatchFolder(watchFolderPath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(watchFolderPath): - logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) - sys.exit(1) - - - -def checkLogfileFolder(logfilePath): - """ - abort if watch-folder does not exist - - :return: - """ - - #check folder path for existance. exits if it does not exist - if not os.path.exists(logfilePath): - logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) - sys.exit(1) - - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - - if __name__ == '__main__': arguments = argumentParsing() watchFolder = arguments.watchFolder @@ -333,15 +269,14 @@ if __name__ == '__main__': communicationWithLcyncdPort = "6080" #abort if watch-folder does not exist - checkWatchFolder(watchFolder) + helperScript.checkFolderExistance(watchFolder) #enable logging - initLogging(logfileFilePath, verbose) + helperScript.initLogging(logfileFilePath, verbose) #create zmq context - global zmqContext zmqContext = zmq.Context() @@ -350,69 +285,45 @@ if __name__ == '__main__': directoryWatcher = DirectoryWatcherHandler(zmqContext, fileEventServerIp, watchFolder, fileEventServerPort) -# pipe_path = "/tmp/zeromqllpipe" -# if not os.path.exists(pipe_path): -# os.mkfifo(pipe_path) -# -# # Open the fifo. We need to open in non-blocking mode or it will stalls until -# # someone opens it for writting -# pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) -# -# -# #wait for new files -# with os.fdopen(pipe_fd) as pipe: -# while True: -# message = pipe.read() -# if message: -## print("Received: '%s'" % message) -# pathnames = message.splitlines() -# for filepath in pathnames: -# directoryWatcher.passFileToZeromq(filepath) -# time.sleep(0.1) - - workers = zmqContext.socket(zmq.PULL) zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort - logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr)) workers.bind(zmqSocketStr) + logging.debug("Bind to lcyncd ZMQ socket: " + str(zmqSocketStr)) - while True: - #waiting for new jobs - try: + try: + while True: + #waiting for new jobs workload = workers.recv() - except KeyboardInterrupt: - break - - - #transform to dictionary - try: - workloadDict = json.loads(str(workload)) - except: - errorMessage = "invalid job received. skipping job" - logging.error(errorMessage) - logging.debug("workload=" + str(workload)) - continue - - #extract fileEvent metadata - try: - #TODO validate fileEventMessageDict dict - filepath = workloadDict["filepath"] - targetPath = workloadDict["targetPath"] - logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath)) - except Exception, e: - errorMessage = "Invalid fileEvent message received." - logging.error(errorMessage) - logging.debug("Error was: " + str(e)) - logging.debug("workloadDict=" + str(workloadDict)) - #skip all further instructions and continue with next iteration - continue - - # send the file to the fileMover - directoryWatcher.passFileToZeromq(filepath, targetPath) - - - # We never get here but clean up anyhow - workers.close() + #transform to dictionary + try: + workloadDict = json.loads(str(workload)) + except: + errorMessage = "invalid job received. skipping job" + logging.error(errorMessage) + logging.debug("workload=" + str(workload)) + continue + + #extract fileEvent metadata + try: + #TODO validate fileEventMessageDict dict + filepath = workloadDict["filepath"] + targetPath = workloadDict["targetPath"] + logging.info("Received message: filepath: " + str(filepath) + ", targetPath: " + str(targetPath)) + except Exception, e: + errorMessage = "Invalid fileEvent message received." + logging.error(errorMessage) + logging.debug("Error was: " + str(e)) + logging.debug("workloadDict=" + str(workloadDict)) + #skip all further instructions and continue with next iteration + continue + + # send the file to the fileMover + directoryWatcher.passFileToZeromq(filepath, targetPath) + except KeyboardInterrupt: + logging.info("Keyboard interruption detected. Shuting down") + + workers.close(0) + directoryWatcher.shuttingDown() zmqContext.destroy() diff --git a/ZeroMQTunnel/wrapper_script.py b/ZeroMQTunnel/wrapper_script.py new file mode 100644 index 0000000000000000000000000000000000000000..2f7ac8db7d9e817f932cebe1a9a4c70e94ce5cc8 --- /dev/null +++ b/ZeroMQTunnel/wrapper_script.py @@ -0,0 +1,69 @@ +__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>' + +import argparse +import subprocess +import os +import time +import zmq +import json +import logging +import helperScript + + +supportedFormats = [ "tif", "cbf", "hdf5"] +watchFolder = "/space/projects/Live_Viewer/source/" +logfile = "/space/projects/Live_Viewer/logs/wrapper_script.log" +verbose = True + +#enable logging +helperScript.initLogging(logfile, verbose) + + +parser = argparse.ArgumentParser() +parser.add_argument("--mv_source", help = "Move source") +parser.add_argument("--mv_target", help = "Move target") + +arguments = parser.parse_args() + + +source = os.path.normpath ( arguments.mv_source ) +target = os.path.normpath ( arguments.mv_target ) + +( parentDir, filename ) = os.path.split ( source ) +commonPrefix = os.path.commonprefix ( [ watchFolder, source ] ) +relativebasepath = os.path.relpath ( source, commonPrefix ) +( relativeParent, blub ) = os.path.split ( relativebasepath ) + +( name, postfix ) = filename.split( "." ) +supported_file = postfix in supportedFormats + +zmqIp = "127.0.0.1" +zmqPort = "6080" + +if supported_file: + + # set up ZeroMQ + zmqContext = zmq.Context() + + socket = zmqContext.socket(zmq.PUSH) + zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort + socket.connect(zmqSocketStr) + logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr)) + + #send reply back to server + workload = { "filepath": source, "targetPath": target } + workload_json = json.dumps(workload) + try: + socket.send(workload_json) + except: + logging.debug( "Could not send message to ZMQ: " + str(workload)) + + logging.debug( "Send message to ZMQ: " + str(workload)) + + # We never get here but clean up anyhow + try: + socket.close() + zmqContext.destroy() + except KeyboardInterrupt: + socket.close(0) + zmqContext.destroy() diff --git a/copy_test_data.sh b/copy_test_data.sh new file mode 100644 index 0000000000000000000000000000000000000000..de5990a747c59d9a61abd75d8635b20b7592ccd5 --- /dev/null +++ b/copy_test_data.sh @@ -0,0 +1,9 @@ +#/bin/sh + +FILES=/space/test_data/flat/* +TARGET=/space/projects/Live_Viewer/source/local +for f in $FILES +do + echo $f + cp $f $TARGET +done diff --git a/lsyncd.conf b/lsyncd.conf index 514892df9a9f214e81ca97e0475c833ffe177d0a..41defeb23627128db54cc69679e21cb48bc41616 100644 --- a/lsyncd.conf +++ b/lsyncd.conf @@ -20,42 +20,42 @@ local formats = { jpg = true, tif = true, cbf = true, log = true } local folders = { "/local", "/current/raw", "/commissioning/raw" } gpfs = { - maxProcesses = 50, - - onCreate = function(event) - -- check if in relevant subfolder - local location = false - for i, path in ipairs(folders) do - if string.sub(event.path, 1, string.len(path)) == path then - location = true - break - end - end - if location == false then return end - - if event.isdir then - for i, path in ipairs(folders) do - if path == event.pathname then return end - end - spawn( - event, - '/bin/mkdir', - event.targetPath - ) - else - -- check filetype - local extension = string.match(event.name, ".*%.([^.]+)$") - if formats[extension] ~= true then return end - - spawn ( - event, - '/usr/bin/python', - '/space/projects/Live_Viewer/wrapper_script.py', - '--mv_source', - event.sourcePath, - '--mv_target', - event.targetPathdir - ) +-- maxProcesses = 50, + +-- onCreate = function(event) +-- -- check if in relevant subfolder +-- local location = false +-- for i, path in ipairs(folders) do +-- if string.sub(event.path, 1, string.len(path)) == path then +-- location = true +-- break +-- end +-- end +-- if location == false then return end +-- +-- if event.isdir then +-- for i, path in ipairs(folders) do +-- if path == event.pathname then return end +-- end +-- spawn( +-- event, +-- '/bin/mkdir', +-- event.targetPath +-- ) +-- else +-- -- check filetype +-- local extension = string.match(event.name, ".*%.([^.]+)$") +-- if formats[extension] ~= true then return end +-- +-- spawn ( +-- event, +-- '/usr/bin/python', +-- '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py', +-- '--mv_source', +-- event.sourcePath, +-- '--mv_target', +-- event.targetPathdir +-- ) -- spawn( -- event, @@ -63,8 +63,8 @@ gpfs = { -- event.sourcePath, -- event.targetPathdir -- ) - end - end, +-- end +-- end, onModify = function(event) if event.isdir then @@ -87,7 +87,7 @@ gpfs = { spawn ( event, '/usr/bin/python', - '/space/projects/Live_Viewer/wrapper_script.py', + '/space/projects/Live_Viewer/ZeroMQTunnel/wrapper_script.py', '--mv_source', event.sourcePath, '--mv_target', diff --git a/wrapper_script.py b/wrapper_script.py deleted file mode 100644 index 3bcf363cd0e89b112da29b920cd7c39da6de6ee2..0000000000000000000000000000000000000000 --- a/wrapper_script.py +++ /dev/null @@ -1,96 +0,0 @@ -import argparse -import subprocess -import os -import time -import zmq -import json -import logging - - -def initLogging(filenameFullPath, verbose): - #@see https://docs.python.org/2/howto/logging-cookbook.html - - #more detailed logging if verbose-option has been set - loggingLevel = logging.INFO - if verbose: - loggingLevel = logging.DEBUG - - #log everything to file - logging.basicConfig(level=loggingLevel, - format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d_%H:%M:%S', - filename=filenameFullPath, - filemode="a") - - #log info to stdout, display messages with different format than the file output - console = logging.StreamHandler() - console.setLevel(logging.WARNING) - formatter = logging.Formatter("%(asctime)s > %(message)s") - console.setFormatter(formatter) - logging.getLogger("").addHandler(console) - - - - -supportedFormats = [ "tif", "cbf", "hdf5"] -watchFolder = "/space/projects/Live_Viewer/source/" -logfile = "/space/projects/Live_Viewer/logs/wrapper_script.log" -verbose = True - -#enable logging -initLogging(logfile, verbose) - - -parser = argparse.ArgumentParser() -parser.add_argument("--mv_source", help = "Move source") -parser.add_argument("--mv_target", help = "Move target") - -arguments = parser.parse_args() - - -source = os.path.normpath ( arguments.mv_source ) -target = os.path.normpath ( arguments.mv_target ) - -( parentDir, filename ) = os.path.split ( source ) -commonPrefix = os.path.commonprefix ( [ watchFolder, source ] ) -relativebasepath = os.path.relpath ( source, commonPrefix ) -( relativeParent, blub ) = os.path.split ( relativebasepath ) - -( name, postfix ) = filename.split( "." ) -supported_file = postfix in supportedFormats - -zmqIp = "127.0.0.1" -zmqPort = "6080" - -if supported_file: - - # set up ZeroMQ - zmqContext = zmq.Context() - - socket = zmqContext.socket(zmq.PUSH) - zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort - logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr)) - socket.connect(zmqSocketStr) - - #send reply back to server - workload = { "filepath": source, "targetPath": target } - workload_json = json.dumps(workload) - socket.send(workload_json) - logging.debug( "Send message to ZMQ: " + str(workload)) - -# my_cmd = 'echo "' + source + '" > /tmp/zeromqllpipe' -# p = subprocess.Popen ( my_cmd, shell=True ) -# p.communicate() - - # wait to ZeroMQ to finish -# time.sleep(10) - -#p = subprocess.Popen ( [ 'mv', source, target ], -# stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, -# universal_newlines = False ) -#out, err = p.communicate() - # We never get here but clean up anyhow - - socket.close() - zmqContext.destroy() -