from __builtin__ import open, type

__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>'

import time
import argparse
import zmq
import os
import logging
import sys
import json
import traceback
import subprocess
from multiprocessing import Process, freeze_support
DEFAULT_CHUNK_SIZE = 1048576
#
# -------------------------- class: WorkerProcess --------------------------------------
#
class WorkerProcess():
    id                   = None
    dataStreamIp         = None
    dataStreamPort       = None
    logfileFullPath      = None
    zmqContextForWorker  = None
    zmqMessageChunkSize  = None
    fileWaitTime_inMs    = None
    fileMaxWaitTime_InMs = None

    def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize,
                 fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0):
        self.id                   = id
        self.dataStreamIp         = dataStreamIp
        self.dataStreamPort       = dataStreamPort
        self.logfileFullPath      = logfileFullPath
        self.zmqMessageChunkSize  = chunkSize
        self.fileWaitTime_inMs    = fileWaitTimeInMs
        self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs

        self.initLogging(logfileFullPath)

        try:
            self.process()
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
            self.zmqContextForWorker.destroy()
        except Exception:
            trace = traceback.format_exc()
            logging.error("Stopping workerProcess due to unknown error condition.")
            logging.debug("Error was: " + str(trace))

    def process(self):
        """
        Sends a 'ready' to a broker and receives a 'job' to process.
        The 'job' will be to pass the file of a fileEvent to the
        dataPipe.

        Why?
        -> The simulated "onClosed" event waits for a file not being
           modified within a certain period of time.
           Instead of processing file after file, the work is
           spread across many workerThreads. Each thread can wait
           an individual period of time for a file without blocking
           new file events - as new file events will be handled by
           another workerThread.

        Takes the fileEventMessage, reads the new file and passes it to
        a separate data-messagePipe. Afterwards the original file
        will be removed.
        """
        id             = self.id
        dataStreamIp   = self.dataStreamIp
        dataStreamPort = self.dataStreamPort

        logging.debug("new workerThread started. id=" + str(id))

        #initialize router
        zmqContextForWorker = zmq.Context()
        self.zmqContextForWorker = zmqContextForWorker

        zmqDataStreamSocket = zmqContextForWorker.socket(zmq.PUSH)
        connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=dataStreamIp, port=dataStreamPort)
        zmqDataStreamSocket.connect(connectionStrDataStreamSocket)

        routerSocket = zmqContextForWorker.socket(zmq.REQ)
        routerSocket.identity = u"worker-{ID}".format(ID=id).encode("ascii")
        connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip="127.0.0.1", port="50000")
        routerSocket.connect(connectionStrRouterSocket)

        processingJobs = True
        jobCount = 0

        while processingJobs:
            #sending a "ready"-signal to the router.
            #the reply will contain the actual job/task.
            logging.debug("worker-" + str(id) + ": sending ready signal")
            routerSocket.send(b"READY")

            # Get workload from router, until finished
            logging.debug("worker-" + str(id) + ": waiting for new job")
            workload = routerSocket.recv()
            logging.debug("worker-" + str(id) + ": new job received")

            finished = workload == b"END"
            if finished:
                processingJobs = False
                logging.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount)
                break
            jobCount += 1

            #convert fileEventMessage back to a dictionary
            fileEventMessageDict = None
            try:
                fileEventMessageDict = json.loads(str(workload))
                logging.debug("str(messageDict) = " + str(fileEventMessageDict) + "  type(messageDict) = " + str(type(fileEventMessageDict)))
            except Exception, e:
                errorMessage = "Unable to convert message into a dictionary."
                logging.error(errorMessage)
                logging.debug("Error was: " + str(e))
                #skip all further instructions and continue with next iteration
                continue

            #extract fileEvent metadata
            try:
                #TODO validate fileEventMessageDict dict
                filename       = fileEventMessageDict["filename"]
                sourcePath     = fileEventMessageDict["sourcePath"]
                relativeParent = fileEventMessageDict["relativeParent"]
            except Exception, e:
                errorMessage = "Invalid fileEvent message received."
                logging.error(errorMessage)
                logging.debug("Error was: " + str(e))
                logging.debug("fileEventMessageDict=" + str(fileEventMessageDict))
                #skip all further instructions and continue with next iteration
                continue

            #passing file to data-messagePipe
            try:
                logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...")
                self.passFileToDataStream(zmqDataStreamSocket, filename, sourcePath, relativeParent)
                logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...success.")
            except Exception, e:
                errorMessage = "Unable to pass new file to data-messagePipe."
                logging.error(errorMessage)
                logging.error("Error was: " + str(e))
                logging.debug("worker-" + str(id) + ": passing new file to data-messagePipe...failed.")
                #skip all further instructions and continue with next iteration
                continue
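
    # A sketch of the job message a worker expects from the router (all
    # values are hypothetical; only the three keys read above are required):
    #
    #   workload = '{"filename": "img.tiff", "sourcePath": "/ramdisk", "relativeParent": "raw"}'
    #   json.loads(str(workload))
    #   # -> {u'filename': u'img.tiff', u'sourcePath': u'/ramdisk', u'relativeParent': u'raw'}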

    def getFileWaitTimeInMs(self):
        #use the wait time handed over in __init__ instead of a hard-coded value
        return self.fileWaitTime_inMs

    def getFileMaxWaitTimeInMs(self):
        return self.fileMaxWaitTime_InMs

    def passFileToDataStream(self, zmqDataStreamSocket, filename, sourcePath, relativeParent):
        """filesizeRequested == filesize submitted by file-event. In theory it can differ from the real file size."""

        # filename = "img.tiff"
        # filepath = "C:\dir"
        #
        #  --> sourceFilePathFull = 'C:\\dir\img.tiff'
        sourceFilePathFull = os.path.join(sourcePath, filename)

        #reading source file into memory
        try:
            #wait x seconds if file was modified within past y seconds
            fileWaitTimeInMs    = self.getFileWaitTimeInMs()
            fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs()
            fileIsStillInUse = True    #true == file is still being written to by another process
            timeStartWaiting = time.time()
            while fileIsStillInUse:
                #skip waiting period if we waited too long for the file to get closed
                if time.time() - timeStartWaiting >= (fileMaxWaitTimeInMs / 1000):
                    logging.debug("waited too long for file getting closed. aborting")
                    break

                #wait for other process to finish file access
                #grab the time when the file was modified last
                statInfo = os.stat(sourceFilePathFull)
                fileLastModified = statInfo.st_mtime
                logging.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified))
                timeNow = time.time()
                timeDiff = timeNow - fileLastModified
                logging.debug("timeNow=" + str(timeNow) + "  timeDiff=" + str(timeDiff))
                waitTimeInSeconds = fileWaitTimeInMs / 1000
                if timeDiff >= waitTimeInSeconds:
                    fileIsStillInUse = False
                    logging.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.")
                else:
                    logging.debug("still waiting for file to get closed...")
                    time.sleep(fileWaitTimeInMs / 1000)

            #for quick testing set filesize of file as chunksize
            logging.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
            filesize = os.path.getsize(sourceFilePathFull)
            #os.stat() returns a stat-result object which is not json-serializable,
            #so only the modification timestamp is kept
            fileModificationTime = os.stat(sourceFilePathFull).st_mtime
            chunksize = filesize    #can be used later on to split multipart message
            logging.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
            logging.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
        except Exception, e:
            errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'."
            logging.error(errorMessage)
            logging.debug("Error was: " + str(e))
            raise Exception(e)

        try:
            logging.debug("opening '" + str(sourceFilePathFull) + "'...")
            fileDescriptor = open(str(sourceFilePathFull), "rb")
        except Exception, e:
            errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'."
            logging.error(errorMessage)
            logging.debug("Error was: " + str(e))
            raise Exception(e)

        #build payload for message-pipe by putting source-file into a message
        try:
            payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativeParent)
        except Exception, e:
            errorMessage = "Unable to assemble multi-part message."
            logging.error(errorMessage)
            logging.debug("Error was: " + str(e))
            raise Exception(e)

        #send message
        try:
            logging.debug("Passing multipart-message...")
            chunkNumber = 0
            stillChunksToRead = True
            while stillChunksToRead:
                chunkNumber += 1

                #read next chunk from file
                fileContentAsByteObject = fileDescriptor.read(self.getChunkSize())

                #detect if end of file has been reached
                if not fileContentAsByteObject:
                    stillChunksToRead = False
                    #as chunk is empty, decrease chunk-counter
                    chunkNumber -= 1

                #assemble metadata for zmq-message
                chunkPayloadMetadata = payloadMetadata.copy()
                chunkPayloadMetadata["chunkNumber"] = chunkNumber
                chunkPayloadMetadataJson = json.dumps(chunkPayloadMetadata)
                chunkPayload = []
                chunkPayload.append(chunkPayloadMetadataJson)
                chunkPayload.append(fileContentAsByteObject)

                #send to zmq pipe
                zmqDataStreamSocket.send_multipart(chunkPayload)

            #close file
            fileDescriptor.close()

            logging.debug("Passing multipart-message...done.")
        except Exception, e:
            logging.error("Unable to send multipart-message")
            logging.debug("Error was: " + str(e))
            logging.info("Passing multipart-message...failed.")
            raise Exception(e)
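
    # A minimal sketch of a matching receiver for the stream produced above
    # (assumption: a PULL socket bound to the dataStream port; all names are
    # illustrative, this is not part of the fileMover itself):
    #
    #   context  = zmq.Context()
    #   receiver = context.socket(zmq.PULL)
    #   receiver.bind("tcp://127.0.0.1:6061")
    #   while True:
    #       [metadataJson, chunkContent] = receiver.recv_multipart()
    #       metadata = json.loads(metadataJson)
    #       if not chunkContent:    # the final chunk is empty
    #           break
    #       # append chunkContent to the target file named in metadata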

    def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize):
        try:
            # chunksize = 16777216 #16MB
            logging.debug("reading file '" + str(sourceFilePathFull) + "' to memory")

            # FIXME: chunk is read out as str. why not as bin? would probably add too much overhead to the zmq-message
            fileContentAsByteObject = fileDescriptor.read(chunkSize)

            while fileContentAsByteObject != "":
                payload.append(fileContentAsByteObject)
                fileContentAsByteObject = fileDescriptor.read(chunkSize)
        except Exception, e:
            raise Exception(str(e))

    def buildPayloadMetadata(self, filename, filesize, fileModificationTime, sourcePath, relativeParent):
        """
        Builds the metadata for the zmq-multipart-message. It should be used
        as the first element of the payload.
        :param filename:             name of the file to send
        :param filesize:             size of the file in bytes
        :param fileModificationTime: mtime of the file (seconds since epoch)
        :param sourcePath:           absolute path of the source directory
        :param relativeParent:       parent directory relative to the watch-folder
        :return: dictionary holding the metadata
        """
        #add metadata to multipart
        logging.debug("create metadata for source file...")
        metadataDict = {
            "filename"             : filename,
            "filesize"             : filesize,
            "fileModificationTime" : fileModificationTime,
            "sourcePath"           : sourcePath,
            "relativeParent"       : relativeParent,
            "chunkSize"            : self.getChunkSize()}
        logging.debug("metadataDict = " + str(metadataDict))
        return metadataDict
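
    # Example of the resulting metadata frame after json.dumps(), once
    # passFileToDataStream() has added the chunkNumber (values hypothetical):
    #
    #   {"filename": "img.tiff", "filesize": 4194304,
    #    "fileModificationTime": 1431350400.0, "sourcePath": "/ramdisk",
    #    "relativeParent": "raw", "chunkSize": 1048576, "chunkNumber": 1}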

    def getChunkSize(self):
        return self.zmqMessageChunkSize

    def showFilesystemStatistics(self, vfsPath):
        statvfs = os.statvfs(vfsPath)
        totalSize                 = statvfs.f_frsize * statvfs.f_blocks
        freeBytes                 = statvfs.f_frsize * statvfs.f_bfree
        freeSpaceAvailableForUser = statvfs.f_frsize * statvfs.f_bavail    #in bytes
        freeSpaceAvailableForUser_gigabytes = freeSpaceAvailableForUser / 1024 / 1024 / 1024
        freeUserSpaceLeft_percent = (float(freeBytes) / float(totalSize)) * 100

        logging.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes) + " Gigabytes "
                      + " (" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left)")

        #warn if disk space is running low, i.e. more than highWaterMark percent is used
        highWaterMark = 85
        usedSpace_percent = 100 - freeUserSpaceLeft_percent
        if int(usedSpace_percent) >= int(highWaterMark):
            logging.warning("Running low on disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.")
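
    # Worked example with hypothetical numbers: f_frsize = 4096,
    # f_blocks = 26214400 and f_bfree = 2621440 give totalSize = 100 GiB
    # and freeBytes = 10 GiB, i.e. 10% free disk space - low enough to
    # trigger the warning above.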

    def initLogging(self, filenameFullPath):
        #@see https://docs.python.org/2/howto/logging-cookbook.html

        #log everything to file
        logging.basicConfig(level=logging.DEBUG,
                            format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s',
                            datefmt='%Y-%m-%d_%H:%M',
                            filename=filenameFullPath,
                            filemode="a")

        #log info to stdout, display messages with different format than the file output
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        formatter = logging.Formatter("%(asctime)s > %(message)s")
        console.setFormatter(formatter)
        logging.getLogger("").addHandler(console)
#
# -------------------------- class: FileMover --------------------------------------
#
class FileMover():
    patterns            = ["*"]
    fileList_newFiles   = list()
    fileCount_newFiles  = 0
    zmqContext          = None
    messageSocket       = None    # to receive fileMove-jobs as json-encoded dictionary
    dataSocket          = None    # to send fileObject as multipart message
    bindingIpForSocket  = None
    zqmFileEventServerIp  = "127.0.0.1"    # serverIp for incoming messages
    tcpPort_messageStream = "6060"
    dataStreamIp        = "127.0.0.1"    # ip of dataStream-socket to push new files to
    dataStreamPort      = "6061"         # port number of dataStream-socket to push new files to
    fileWaitTimeInMs    = None
    fileMaxWaitTimeInMs = None
    pipe_name           = "/tmp/zeromqllpipe_resp"
    currentZmqDataStreamSocketListIndex = None    # index number of the socket used to send datafiles to
    logfileFullPath     = None
    chunkSize           = None

    def process(self):
        try:
            self.startReceiving()
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt detected. Shutting down fileMover.")
            logging.info("Shutting down fileMover as KeyboardInterrupt was detected.")
            self.zmqContext.destroy()
        except Exception:
            logging.error("Unknown Error. Quitting.")
            logging.info("Stopping fileMover due to unknown error condition.")

    def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
                 logfileFullPath, chunkSize,
                 fileWaitTimeInMs, fileMaxWaitTimeInMs):
        logging.info("registering zmq global context")

        #create zmq context
        zmqContext = zmq.Context()
        self.zmqContext = zmqContext

        self.bindingIpForSocket    = bindingIpForSocket
        self.tcpPort_messageStream = bindingPortForSocket
        self.dataStreamIp          = dataStreamIp
        self.dataStreamPort        = dataStreamPort
        self.parallelDataStreams   = parallelDataStreams
        self.logfileFullPath       = logfileFullPath
        self.chunkSize             = chunkSize
        self.fileWaitTimeInMs      = fileWaitTimeInMs
        self.fileMaxWaitTimeInMs   = fileMaxWaitTimeInMs

        #create zmq sockets. one for incoming file events, one for passing fileObjects to
        self.messageSocket = self.getZmqSocket_Pull(self.zmqContext)
        self.dataSocket    = self.getZmqSocket_Push(self.zmqContext)

    def getFileWaitTimeInMs(self):
        return self.fileWaitTimeInMs

    def getFileMaxWaitTimeInMs(self):
        return self.fileMaxWaitTimeInMs

    def startReceiving(self):
        #create socket
        zmqContext = self.zmqContext
        zmqSocketForNewFileEvents = self.createPullSocket()
        logging.debug("new message-socket created for: new file events.")
        parallelDataStreams = int(self.parallelDataStreams)
        logging.debug("new message-socket created for: passing file objects.")

        incomingMessageCounter = 0

        #setting up router for load-balancing worker-threads.
        #each worker-thread will handle a file event
        routerSocket = self.zmqContext.socket(zmq.ROUTER)
        routerSocket.bind("tcp://127.0.0.1:50000")
        logging.debug("routerSocket started for 'tcp://127.0.0.1:50000'")

        #start worker-processes. each will have its own PushSocket.
        workerThreadList      = list()
        numberOfWorkerThreads = parallelDataStreams
        fileWaitTimeInMs      = self.getFileWaitTimeInMs()
        fileMaxWaitTimeInMs   = self.getFileMaxWaitTimeInMs()
        for threadNumber in range(numberOfWorkerThreads):
            logging.debug("instantiate new workerProcess (nr " + str(threadNumber) + ")")
            newWorkerThread = Process(target=WorkerProcess, args=(threadNumber,
                                                                  self.dataStreamIp,
                                                                  self.dataStreamPort,
                                                                  self.logfileFullPath,
                                                                  self.chunkSize,
                                                                  fileWaitTimeInMs,
                                                                  fileMaxWaitTimeInMs))
            workerThreadList.append(newWorkerThread)

            logging.debug("start worker process nr " + str(threadNumber))
            newWorkerThread.start()

        #run loop, and wait for incoming messages
        continueReceiving = True
        logging.debug("waiting for new fileEvent-messages")
        while continueReceiving:
            try:
                incomingMessage = zmqSocketForNewFileEvents.recv()
                logging.debug("new fileEvent-message received.")
                logging.debug("message content: " + str(incomingMessage))
                incomingMessageCounter += 1

                logging.debug("processFileEvent...")
                self.processFileEvent(incomingMessage, routerSocket)    #TODO refactor as separate process to emphasize unblocking
                logging.debug("processFileEvent...done")
            except Exception, e:
                print "exception"
                logging.error("Failed to receive new fileEvent-message.")
                logging.error(sys.exc_info())
                #TODO maybe use an error-count and a threshold of when to stop receiving, e.g. after 100 misses?
                # continueReceiving = False

        print "shutting down fileEvent-receiver..."
        try:
            logging.debug("shutting down zeromq...")
            self.stopReceiving(zmqSocketForNewFileEvents, zmqContext)
            logging.debug("shutting down zeromq...done.")
        except:
            logging.error(sys.exc_info())
            logging.error("shutting down zeromq...failed.")

    def routeFileEventToWorkerThread(self, fileEventMessage, routerSocket):
        # LRU worker is next waiting in the queue
        logging.debug("waiting for available workerThread.")

        # address == "worker-0"
        # empty   == ""
        # ready   == "READY"
        address, empty, ready = routerSocket.recv_multipart()
        logging.debug("available workerThread detected.")

        logging.debug("passing job to workerThread...")
        routerSocket.send_multipart([
            address,
            b'',
            fileEventMessage,
        ])

        # inform lsyncd wrapper that the file can be moved
        # if not os.path.exists(self.pipe_name):
        #     os.mkfifo(self.pipe_name)
        # messageToPipe = json.loads(fileEventMessage)
        # messageToPipe = messageToPipe["sourcePath"] + os.sep + messageToPipe["filename"]
        # my_cmd = 'echo "' + messageToPipe + '" > ' + self.pipe_name
        # p = subprocess.Popen(my_cmd, shell=True)
        # p.communicate()

        logging.debug("passing job to workerThread...done.")

    def processFileEvent(self, fileEventMessage, routerSocket):
        self.routeFileEventToWorkerThread(fileEventMessage, routerSocket)

    def stopReceiving(self, zmqSocket, msgContext):
        try:
            logging.debug("closing zmqSocket...")
            zmqSocket.close()
            logging.debug("closing zmqSocket...done.")
        except:
            logging.debug("closing zmqSocket...failed.")
            logging.error(sys.exc_info())

        try:
            logging.debug("closing zmqContext...")
            msgContext.destroy()
            logging.debug("closing zmqContext...done.")
        except:
            logging.debug("closing zmqContext...failed.")
            logging.error(sys.exc_info())

    def getZmqSocket_Pull(self, context):
        pattern_pull = zmq.PULL
        assert isinstance(context, zmq.sugar.context.Context)
        socket = context.socket(pattern_pull)
        return socket

    def getZmqSocket_Push(self, context):
        pattern = zmq.PUSH
        assert isinstance(context, zmq.sugar.context.Context)
        socket = context.socket(pattern)
        return socket

    def createPullSocket(self):
        #get default message-socket
        socket = self.messageSocket
        logging.info("binding to message socket: tcp://" + self.bindingIpForSocket + ":%s" % self.tcpPort_messageStream)
        socket.bind('tcp://' + self.bindingIpForSocket + ':%s' % self.tcpPort_messageStream)
        return socket
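
# A minimal sketch of an event producer feeding the message socket above
# (illustrative only; any watcher emitting this json layout would do):
#
#   context = zmq.Context()
#   sender  = context.socket(zmq.PUSH)
#   sender.connect("tcp://127.0.0.1:6060")
#   fileEvent = {"filename"       : "img.tiff",
#                "sourcePath"     : "/ramdisk",
#                "relativeParent" : "raw"}
#   sender.send(json.dumps(fileEvent))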

def argumentParsing():
    parser = argparse.ArgumentParser()
    parser.add_argument("--logfilePath"         , type=str, help="path where logfile will be created", default="/tmp/log/")
    parser.add_argument("--logfileName"         , type=str, help="filename used for logging", default="fileMover.log")
    parser.add_argument("--bindingIpForSocket"  , type=str, help="local ip to bind to", default="127.0.0.1")
    parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060")
    parser.add_argument("--dataStreamIp"        , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1")
    parser.add_argument("--dataStreamPort"      , type=str, help="port number of dataStream-socket to push new files to", default="6061")
    parser.add_argument("--parallelDataStreams" , type=int, help="number of parallel data streams. default is 1", default=1)
    parser.add_argument("--chunkSize"           , type=int, help="chunk size of file-parts getting sent via zmq", default=DEFAULT_CHUNK_SIZE)
    parser.add_argument("--verbose"             , help="more verbose output", action="store_true")
    parser.add_argument("--fileWaitTimeInMs"    , type=int, help=argparse.SUPPRESS, default=2000)
    parser.add_argument("--fileMaxWaitTimeInMs" , type=int, help=argparse.SUPPRESS, default=10000)

    arguments = parser.parse_args()
    return arguments
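
# Example invocation (all values illustrative):
#
#   python fileMover.py --bindingIpForSocket 127.0.0.1 --bindingPortForSocket 6060 \
#                       --dataStreamPort 6061 --parallelDataStreams 4 --verbose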

def checkFolderForExistance(watchFolderPath):
    """
    abort if watch-folder does not exist
    :return:
    """
    #check folder path for existence. exits if it does not exist
    if not os.path.exists(watchFolderPath):
        logging.error("WatchFolder '%s' does not exist. Abort." % str(watchFolderPath))
        sys.exit(1)

def checkLogfileFolder(logfilePath):
    """
    abort if logfile-folder does not exist
    :return:
    """
    #check folder path for existence. exits if it does not exist
    if not os.path.exists(logfilePath):
        logging.error("LogfileFolder '%s' does not exist. Abort." % str(logfilePath))
        sys.exit(1)

def initLogging(filenameFullPath, verbose):
    #@see https://docs.python.org/2/howto/logging-cookbook.html

    #more detailed logging if verbose-option has been set
    loggingLevel = logging.INFO
    if verbose:
        loggingLevel = logging.DEBUG

    #log everything to file
    logging.basicConfig(level=loggingLevel,
                        format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s] [%(name)s] [%(levelname)s] %(message)s',
                        datefmt='%Y-%m-%d_%H:%M:%S',
                        filename=filenameFullPath,
                        filemode="a")

    #log info to stdout, display messages with different format than the file output
    console = logging.StreamHandler()
    console.setLevel(logging.WARNING)
    formatter = logging.Formatter("%(asctime)s > %(message)s")
    console.setFormatter(formatter)
    logging.getLogger("").addHandler(console)

if __name__ == '__main__':
    freeze_support()    #see https://docs.python.org/2/library/multiprocessing.html#windows
    arguments = argumentParsing()

    bindingIpForSocket   = str(arguments.bindingIpForSocket)
    bindingPortForSocket = str(arguments.bindingPortForSocket)
    dataStreamIp         = str(arguments.dataStreamIp)
    dataStreamPort       = str(arguments.dataStreamPort)
    logfilePath          = str(arguments.logfilePath)
    logfileName          = str(arguments.logfileName)
    parallelDataStreams  = arguments.parallelDataStreams
    chunkSize            = arguments.chunkSize
    verbose              = arguments.verbose
    logfileFullPath      = os.path.join(logfilePath, logfileName)
    fileWaitTimeInMs     = float(arguments.fileWaitTimeInMs)
    fileMaxWaitTimeInMs  = float(arguments.fileMaxWaitTimeInMs)

    #enable logging
    initLogging(logfileFullPath, verbose)

    #start new fileMover
    fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
                          parallelDataStreams, logfileFullPath, chunkSize,
                          fileWaitTimeInMs, fileMaxWaitTimeInMs)
    fileMover.process()