from __builtin__ import open, type __author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>' import time import argparse import zmq import os import logging import sys import json import traceback from multiprocessing import Process, freeze_support import subprocess import json DEFAULT_CHUNK_SIZE = 1048576 # # -------------------------- class: WorkerProcess -------------------------------------- # class WorkerProcess(): id = None dataStreamIp = None dataStreamPort = None logfileFullPath = None zmqContextForWorker = None zmqMessageChunkSize = None fileWaitTime_inMs = None fileMaxWaitTime_InMs= None def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0): self.id = id self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort self.logfileFullPath = logfileFullPath self.zmqMessageChunkSize = chunkSize self.fileWaitTime_inMs = fileWaitTimeInMs self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs self.initLogging(logfileFullPath) try: self.process() except KeyboardInterrupt: # trace = traceback.format_exc() logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.") self.zmqContextForWorker.destroy() else: trace = traceback.format_exc() logging.error("Stopping workerProcess due to unknown error condition.") logging.debug("Error was: " + str(trace)) def process(self): """ sends a 'ready' to a broker and receives a 'job' to process. The 'job' will be to pass the file of an fileEvent to the dataPipe. Why? -> the simulated "onClosed" event waits for a file for being not modified within a certain period of time. Instead of processing file after file the work will be spreaded to many workerThreads. So each thread can wait individual periods of time for a file without blocking new file events - as new file events will be handled by another workerThread. """ """ takes the fileEventMessage, reading and passing the new file to a separate data-messagePipe. Afterwards the original file will be removed. """ id = self.id dataStreamIp = self.dataStreamIp dataStreamPort = self.dataStreamPort logging.debug("new workerThread started. id=" + str(id)) #initialize router zmqContextForWorker = zmq.Context() self.zmqContextForWorker = zmqContextForWorker zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH) connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=dataStreamPort) zmqDataStreamSocket.connect(connectionStrDataStreamSocket) routerSocket = zmqContextForWorker.socket(zmq.REQ) routerSocket.identity = u"worker-{ID}".format(ID=id).encode("ascii") connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000") routerSocket.connect(connectionStrRouterSocket) processingJobs = True jobCount = 0 while processingJobs: #sending a "ready"-signal to the router. #the reply will contain the actual job/task. logging.debug("worker-"+str(id)+": sending ready signal") routerSocket.send(b"READY") # Get workload from router, until finished logging.debug("worker-"+str(id)+": waiting for new job") workload = routerSocket.recv() logging.debug("worker-"+str(id)+": new job received") finished = workload == b"END" if finished: processingJobs = False logging.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount) break jobCount += 1 #convert fileEventMessage back to a dictionary fileEventMessageDict = None try: fileEventMessageDict = json.loads(str(workload)) logging.debug("str(messageDict) = " + str(fileEventMessageDict) + " type(messageDict) = " + str(type(fileEventMessageDict))) except Exception, e: errorMessage = "Unable to convert message into a dictionary." logging.error(errorMessage) logging.debug("Error was: " + str(e)) #extract fileEvent metadata try: #TODO validate fileEventMessageDict dict filename = fileEventMessageDict["filename"] sourcePath = fileEventMessageDict["sourcePath"] relativeParent = fileEventMessageDict["relativeParent"] except Exception, e: errorMessage = "Invalid fileEvent message received." logging.error(errorMessage) logging.debug("Error was: " + str(e)) logging.debug("fileEventMessageDict=" + str(fileEventMessageDict)) #skip all further instructions and continue with next iteration continue #passing file to data-messagPipe try: logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...") self.passFileToDataStream(zmqDataStreamSocket, filename, sourcePath, relativeParent) logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.") except Exception, e: errorMessage = "Unable to pass new file to data-messagePipe." logging.error(errorMessage) logging.error("Error was: " + str(e)) logging.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.") #skip all further instructions and continue with next iteration continue def getFileWaitTimeInMs(self): waitTime = 2000.0 return waitTime def getFileMaxWaitTimeInMs(self): maxWaitTime = 10000.0 return maxWaitTime def passFileToDataStream(self, zmqDataStreamSocket, filename, sourcePath, relativeParent): """filesizeRequested == filesize submitted by file-event. In theory it can differ to real file size""" # filename = "img.tiff" # filepath = "C:\dir" # # --> sourceFilePathFull = 'C:\\dir\img.tiff' sourceFilePathFull = os.path.join(sourcePath, filename) #reading source file into memory try: #wait x seconds if file was modified within past y seconds fileWaitTimeInMs = self.getFileWaitTimeInMs() fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs() fileIsStillInUse = True #true == still being written to file by a process timeStartWaiting = time.time() while fileIsStillInUse: #skip waiting periode if waiting to long for file to get closed if time.time() - timeStartWaiting >= (fileMaxWaitTimeInMs / 1000): logging.debug("waited to long for file getting closed. aborting") break #wait for other process to finish file access #grabs time when file was modified last statInfo = os.stat(sourceFilePathFull) fileLastModified = statInfo.st_mtime logging.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified)) timeNow = time.time() timeDiff = timeNow - fileLastModified logging.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff)) waitTimeInSeconds = fileWaitTimeInMs/1000 if timeDiff >= waitTimeInSeconds: fileIsStillInUse = False logging.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.") else: logging.debug("still waiting for file to get closed...") time.sleep(fileWaitTimeInMs / 1000 ) #for quick testing set filesize of file as chunksize logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...") filesize = os.path.getsize(sourceFilePathFull) fileModificationTime = os.stat(sourceFilePathFull) chunksize = filesize #can be used later on to split multipart message logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize))) logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime))) except Exception, e: errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'." logging.error(errorMessage) logging.debug("Error was: " + str(e)) raise Exception(e) try: logging.debug("opening '" + str(sourceFilePathFull) + "'...") fileDescriptor = open(str(sourceFilePathFull), "rb") except Exception, e: errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'." logging.error(errorMessage) logging.debug("Error was: " + str(e)) raise Exception(e) #build payload for message-pipe by putting source-file into a message try: payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent) except Exception, e: errorMessage = "Unable to assemble multi-part message." logging.error(errorMessage) logging.debug("Error was: " + str(e)) raise Exception(e) #send message try: logging.debug("Passing multipart-message...") chunkNumber = 0 stillChunksToRead = True while stillChunksToRead: chunkNumber += 1 #read next chunk from file fileContentAsByteObject = fileDescriptor.read(self.getChunkSize()) #detect if end of file has been reached if not fileContentAsByteObject: stillChunksToRead = False #as chunk is empty decrease chunck-counter chunkNumber -= 1 #assemble metadata for zmq-message chunkPayloadMetadata = payloadMetadata.copy() chunkPayloadMetadata["chunkNumber"] = chunkNumber chunkPayloadMetadataJson = json.dumps(chunkPayloadMetadata) chunkPayload = [] chunkPayload.append(chunkPayloadMetadataJson) chunkPayload.append(fileContentAsByteObject) #send to zmq pipe zmqDataStreamSocket.send_multipart(chunkPayload) #close file fileDescriptor.close() # zmqDataStreamSocket.send_multipart(multipartMessage) logging.debug("Passing multipart-message...done.") except Exception, e: logging.error("Unable to send multipart-message") logging.debug("Error was: " + str(e)) logging.info("Passing multipart-message...failed.") raise Exception(e) def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize): try: # chunksize = 16777216 #16MB logging.debug("reading file '" + str(sourceFilePathFull)+ "' to memory") # FIXME: chunk is read-out as str. why not as bin? will probably add to much overhead to zmq-message fileContentAsByteObject = fileDescriptor.read(chunkSize) while fileContentAsByteObject != "": payload.append(fileContentAsByteObject) fileContentAsByteObject = fileDescriptor.read(chunkSize) except Exception, e: raise Exception(str(e)) def buildPayloadMetadata(self, filename, filesize, fileModificationTime, sourcePath, relativeParent): """ builds metadata for zmq-multipart-message. should be used as first element for payload. :param filename: :param filesize: :param fileModificationTime: :param sourcePath: :param relativeParent: :return: """ #add metadata to multipart logging.debug("create metadata for source file...") metadataDict = { "filename" : filename, "filesize" : filesize, "fileModificationTime" : fileModificationTime, "sourcePath" : sourcePath, "relativeParent" : relativeParent, "chunkSize" : self.getChunkSize()} logging.debug("metadataDict = " + str(metadataDict)) return metadataDict def getChunkSize(self): return self.zmqMessageChunkSize def showFilesystemStatistics(self, vfsPath): statvfs = os.statvfs(vfsPath) totalSize = statvfs.f_frsize * statvfs.f_blocks freeBytes = statvfs.f_frsize * statvfs.f_bfree freeSpaceAvailableForUser = statvfs.f_frsize * statvfs.f_bavail #in bytes freeSpaceAvailableForUser_gigabytes = freeSpaceAvailableForUser / 1024 / 1024 / 1024 freeUserSpaceLeft_percent = ( float(freeBytes) / float(totalSize) ) * 100 # print "{number:.{digits}f}".format(number=freeUserSpaceLeft_percent, digits=0) # print int(freeUserSpaceLeft_percent) logging.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes " + " (" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left)") #warn if disk space is running low highWaterMark = 85 if int(freeUserSpaceLeft_percent) >= int(highWaterMark): logging.warning("Running low in disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.") def initLogging(self, filenameFullPath): #@see https://docs.python.org/2/howto/logging-cookbook.html #log everything to file logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d_%H:%M', filename=filenameFullPath, filemode="a") #log info to stdout, display messages with different format than the file output console = logging.StreamHandler() console.setLevel(logging.INFO) formatter = logging.Formatter("%(asctime)s > %(message)s") console.setFormatter(formatter) logging.getLogger("").addHandler(console) # # -------------------------- class: FileMover -------------------------------------- # class FileMover(): patterns = ["*"] fileList_newFiles = list() fileCount_newFiles = 0 zmqContext = None messageSocket = None # to receiver fileMove-jobs as json-encoded dictionary dataSocket = None # to send fileObject as multipart message bindingIpForSocket = None zqmFileEventServerIp = "127.0.0.1" # serverIp for incoming messages tcpPort_messageStream = "6060" dataStreamIp = "127.0.0.1" # ip of dataStream-socket to push new files to dataStreamPort = "6061" # port number of dataStream-socket to push new files to fileWaitTimeInMs = None fileMaxWaitTimeInMs = None pipe_name = "/tmp/zeromqllpipe_resp" currentZmqDataStreamSocketListIndex = None # Index-Number of a socket used to send datafiles to logfileFullPath = None chunkSize = None def process(self): try: self.startReceiving() except KeyboardInterrupt: logging.debug("KeyboardInterrupt detected. Shutting down fileMover.") logging.info("Shutting down fileMover as KeyboardInterrupt was detected.") self.zmqContext.destroy() else: logging.error("Unknown Error. Quitting.") logging.info("Stopping fileMover due to unknown error condition.") def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, logfileFullPath, chunkSize, fileWaitTimeInMs, fileMaxWaitTimeInMs): logging.info("registering zmq global context") #create zmq context zmqContext = zmq.Context() self.zmqContext = zmqContext self.bindingIpForSocket = bindingIpForSocket self.tcpPort_messageStream = bindingPortForSocket self.dataStreamIp = dataStreamIp self.dataStreamPort = dataStreamPort self.parallelDataStreams = parallelDataStreams self.logfileFullPath = logfileFullPath self.chunkSize = chunkSize self.fileWaitTimeInMs = fileWaitTimeInMs self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs #create zmq sockets. one for incoming file events, one for passing fileObjects to self.messageSocket = self.getZmqSocket_Pull(self.zmqContext) self.dataSocket = self.getZmqSocket_Push(self.zmqContext) def getFileWaitTimeInMs(self): return self.fileWaitTimeInMs def getFileMaxWaitTimeInMs(self): return self.fileMaxWaitTimeInMs def startReceiving(self): #create socket zmqContext = self.zmqContext zmqSocketForNewFileEvents = self.createPullSocket() logging.debug("new message-socket crated for: new file events.") parallelDataStreams = int(self.parallelDataStreams) logging.debug("new message-socket crated for: passing file objects.") incomingMessageCounter = 0 #setting up router for load-balancing worker-threads. #each worker-thread will handle a file event routerSocket = self.zmqContext.socket(zmq.ROUTER) routerSocket.bind("tcp://127.0.0.1:50000") logging.debug("routerSocket started for 'tcp://127.0.0.1:50000'") #start worker-threads. each will have its own PushSocket. workerThreadList = list() numberOfWorkerThreads = parallelDataStreams fileWaitTimeInMs = self.getFileWaitTimeInMs() fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs() for threadNumber in range(numberOfWorkerThreads): logging.debug("instantiate new workerProcess (nr " + str(threadNumber)) newWorkerThread = Process(target=WorkerProcess, args=(threadNumber, self.dataStreamIp, self.dataStreamPort, logfileFullPath, self.chunkSize, fileWaitTimeInMs, fileMaxWaitTimeInMs)) workerThreadList.append(newWorkerThread) logging.debug("start worker process nr " + str(threadNumber)) newWorkerThread.start() #run loop, and wait for incoming messages continueReceiving = True logging.debug("waiting for new fileEvent-messages") while continueReceiving: try: incomingMessage = zmqSocketForNewFileEvents.recv() logging.debug("new fileEvent-message received.") logging.debug("message content: " + str(incomingMessage)) incomingMessageCounter += 1 logging.debug("processFileEvent...") self.processFileEvent(incomingMessage, routerSocket) #TODO refactor as separate process to emphasize unblocking logging.debug("processFileEvent...done") except Exception, e: print "exception" logging.error("Failed to receive new fileEvent-message.") logging.error(sys.exc_info()) #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses? # continueReceiving = False print "shutting down fileEvent-receiver..." try: logging.debug("shutting down zeromq...") self.stopReceiving(zmqSocketForNewFileEvents, zmqContext) logging.debug("shutting down zeromq...done.") except: logging.error(sys.exc_info()) logging.error("shutting down zeromq...failed.") def routeFileEventToWorkerThread(self, fileEventMessage, routerSocket): # LRU worker is next waiting in the queue logging.debug("waiting for available workerThread.") # address == "worker-0" # empty == "" # ready == "READY" address, empty, ready = routerSocket.recv_multipart() logging.debug("available workerThread detected.") logging.debug("passing job to workerThread...") routerSocket.send_multipart([ address, b'', fileEventMessage, ]) # inform lsyncd wrapper that the file can be moved # if not os.path.exists(self.pipe_name): # os.mkfifo(self.pipe_name) # messageToPipe = json.loads ( fileEventMessage ) # messageToPipe = messageToPipe["sourcePath"] + os.sep + messageToPipe["filename"] # my_cmd = 'echo "' + messageToPipe + '" > ' + self.pipe_name # p = subprocess.Popen ( my_cmd, shell=True ) # p.communicate() logging.debug("passing job to workerThread...done.") def processFileEvent(self, fileEventMessage, routerSocket): self.routeFileEventToWorkerThread(fileEventMessage, routerSocket) def stopReceiving(self, zmqSocket, msgContext): try: logging.debug("closing zmqSocket...") zmqSocket.close() logging.debug("closing zmqSocket...done.") except: logging.debug("closing zmqSocket...failed.") logging.error(sys.exc_info()) try: logging.debug("closing zmqContext...") msgContext.destroy() logging.debug("closing zmqContext...done.") except: logging.debug("closing zmqContext...failed.") logging.error(sys.exc_info()) def getZmqSocket_Pull(self, context): pattern_pull = zmq.PULL assert isinstance(context, zmq.sugar.context.Context) socket = context.socket(pattern_pull) return socket def getZmqSocket_Push(self, context): pattern = zmq.PUSH assert isinstance(context, zmq.sugar.context.Context) socket = context.socket(pattern) return socket def createPullSocket(self): #get default message-socket socket = self.messageSocket logging.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream) socket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.tcpPort_messageStream) return socket def argumentParsing(): parser = argparse.ArgumentParser() parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created", default="/tmp/log/") parser.add_argument("--logfileName" , type=str, help="filename used for logging", default="fileMover.log") parser.add_argument("--bindingIpForSocket" , type=str, help="local ip to bind to", default="127.0.0.1") parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060") parser.add_argument("--dataStreamIp" , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1") parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061") parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1") parser.add_argument("--chunkSize", type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE) parser.add_argument("--verbose" , help="more verbose output", action="store_true") parser.add_argument("--fileWaitTimeInMs", type=int, help=argparse.SUPPRESS, default=2000) parser.add_argument("--fileMaxWaitTimeInMs", type=int, help=argparse.SUPPRESS, default=10000) arguments = parser.parse_args() return arguments def checkFolderForExistance(watchFolderPath): """ abort if watch-folder does not exist :return: """ #check folder path for existance. exits if it does not exist if not os.path.exists(watchFolderPath): logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath)) sys.exit(1) def checkLogfileFolder(logfilePath): """ abort if watch-folder does not exist :return: """ #check folder path for existance. exits if it does not exist if not os.path.exists(logfilePath): logging.error("LogfileFilder '%s' does not exist. Abort." % str(logfilePath)) sys.exit(1) def initLogging(filenameFullPath, verbose): #@see https://docs.python.org/2/howto/logging-cookbook.html #more detailed logging if verbose-option has been set loggingLevel = logging.INFO if verbose: loggingLevel = logging.DEBUG #log everything to file logging.basicConfig(level=loggingLevel, format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s', datefmt='%Y-%m-%d_%H:%M:%S', filename=filenameFullPath, filemode="a") #log info to stdout, display messages with different format than the file output console = logging.StreamHandler() console.setLevel(logging.WARNING) formatter = logging.Formatter("%(asctime)s > %(message)s") console.setFormatter(formatter) logging.getLogger("").addHandler(console) if __name__ == '__main__': freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows arguments = argumentParsing() logFile = str(arguments.logfilePath) + "/" + str(arguments.logfileName) bindingIpForSocket = str(arguments.bindingIpForSocket) bindingPortForSocket = str(arguments.bindingPortForSocket) dataStreamIp = str(arguments.dataStreamIp) dataStreamPort = str(arguments.dataStreamPort) logfilePath = str(arguments.logfilePath) logfileName = str(arguments.logfileName) parallelDataStreams = str(arguments.parallelDataStreams) chunkSize = arguments.chunkSize verbose = arguments.verbose logfileFullPath = os.path.join(logfilePath, logfileName) fileWaitTimeInMs = float(arguments.fileWaitTimeInMs) fileMaxWaitTimeInMs = float(arguments.fileMaxWaitTimeInMs) #enable logging initLogging(logfileFullPath, verbose) #start new fileMover # try: fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, logfileFullPath, chunkSize, fileWaitTimeInMs, fileMaxWaitTimeInMs) fileMover.process() # except KeyboardInterrupt, ke: # print "keyboardInterrupt detected." # except Exception, e: # print "unknown exception detected."