Commit 90e81ef2 authored by Manuela Kuhn

Removed waiting time for file to get closed

parent 49f94fb3
@@ -35,8 +35,6 @@ class defaultConfigSender():
     chunkSize = 1048576 # = 1024*1024
     #chunkSize = 1073741824 # = 1024*1024*1024
-    fileWaitTimeInMs = 2000
-    fileMaxWaitTimeInMs = 10000

     # # path where logfile will be created
     # if helperScript.isWindows():
@@ -35,8 +35,6 @@ class WorkerProcess():
     dataStreamPort = None
     zmqContextForWorker = None
     zmqMessageChunkSize = None
-    fileWaitTime_inMs = None
-    fileMaxWaitTime_InMs = None
     zmqCleanerIp = None # responsable to delete files
     zmqCleanerPort = None # responsable to delete files
     zmqDataStreamSocket = None
@@ -47,7 +45,6 @@ class WorkerProcess():
     log = None

     def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, zmqCleanerIp, zmqCleanerPort,
-                 fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0,
                  context = None):
         self.id = id
         self.dataStreamIp = dataStreamIp
@@ -55,8 +52,6 @@ class WorkerProcess():
         self.zmqMessageChunkSize = chunkSize
         self.zmqCleanerIp = zmqCleanerIp
         self.zmqCleanerPort = zmqCleanerPort
-        self.fileWaitTime_inMs = fileWaitTimeInMs
-        self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs

         #initialize router
         self.zmqContextForWorker = context or zmq.Context()
@@ -223,34 +218,6 @@ class WorkerProcess():

         #reading source file into memory
         try:
-            #wait x seconds if file was modified within past y seconds
-            fileWaitTimeInMs = self.getFileWaitTimeInMs()
-            fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs()
-            fileIsStillInUse = False #true == still being written to file by a process
-            # fileIsStillInUse = True #true == still being written to file by a process
-
-            timeStartWaiting = time.time()
-            while fileIsStillInUse:
-                #skip waiting periode if waiting to long for file to get closed
-                if time.time() - timeStartWaiting >= (fileMaxWaitTimeInMs / 1000):
-                    self.log.debug("waited to long for file getting closed. aborting")
-                    break
-
-                #wait for other process to finish file access
-                #grabs time when file was modified last
-                statInfo = os.stat(sourceFilePathFull)
-                fileLastModified = statInfo.st_mtime
-                self.log.debug("'" + str(sourceFilePathFull) + "' modified last: " + str(fileLastModified))
-                timeNow = time.time()
-                timeDiff = timeNow - fileLastModified
-                self.log.debug("timeNow=" + str(timeNow) + " timeDiff=" + str(timeDiff))
-                waitTimeInSeconds = fileWaitTimeInMs/1000
-                if timeDiff >= waitTimeInSeconds:
-                    fileIsStillInUse = False
-                    self.log.debug("File was not modified within past " + str(fileWaitTimeInMs) + "ms.")
-                else:
-                    self.log.debug("still waiting for file to get closed...")
-                    time.sleep(fileWaitTimeInMs / 1000 )
-
             #for quick testing set filesize of file as chunksize
             self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
             filesize = os.path.getsize(sourceFilePathFull)
@@ -416,7 +383,6 @@ class FileMover():
     dataStreamPort = "6061" # port number of dataStream-socket to push new files to
     zmqCleanerIp = "127.0.0.1" # zmq pull endpoint, responsable to delete files
     zmqCleanerPort = "6062" # zmq pull endpoint, responsable to delete files
-    fileWaitTimeInMs = None
     fileMaxWaitTimeInMs = None
     parallelDataStreams = None
     chunkSize = None
@@ -431,7 +397,6 @@ class FileMover():

     def __init__(self, fileEventIp, fileEventPort, dataStreamIp, dataStreamPort, parallelDataStreams,
                  chunkSize, zmqCleanerIp, zmqCleanerPort,
-                 fileWaitTimeInMs, fileMaxWaitTimeInMs,
                  context = None):

         assert isinstance(context, zmq.sugar.context.Context)
@@ -445,8 +410,6 @@ class FileMover():
         self.chunkSize = chunkSize
         self.zmqCleanerIp = zmqCleanerIp
         self.zmqCleanerPort = zmqCleanerPort
-        self.fileWaitTimeInMs = fileWaitTimeInMs
-        self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs

         self.log = self.getLogger()
@@ -488,14 +451,6 @@ class FileMover():
         return logger

-    def getFileWaitTimeInMs(self):
-        return self.fileWaitTimeInMs
-
-    def getFileMaxWaitTimeInMs(self):
-        return self.fileMaxWaitTimeInMs
-
     def startReceiving(self):
         self.log.debug("new message-socket crated for: new file events.")

         parallelDataStreams = int(self.parallelDataStreams)
@@ -507,8 +462,6 @@ class FileMover():
         #start worker-threads. each will have its own PushSocket.
         workerThreadList = list()
         numberOfWorkerThreads = parallelDataStreams
-        fileWaitTimeInMs = self.getFileWaitTimeInMs()
-        fileMaxWaitTimeInMs = self.getFileMaxWaitTimeInMs()
         for threadNumber in range(numberOfWorkerThreads):
             self.log.debug("instantiate new workerProcess (nr " + str(threadNumber) + " )")
             newWorkerThread = Process(target=WorkerProcess, args=(threadNumber,
@@ -516,9 +469,7 @@ class FileMover():
                                                                   self.dataStreamPort,
                                                                   self.chunkSize,
                                                                   self.zmqCleanerIp,
-                                                                  self.zmqCleanerPort,
-                                                                  fileWaitTimeInMs,
-                                                                  fileMaxWaitTimeInMs))
+                                                                  self.zmqCleanerPort))
             workerThreadList.append(newWorkerThread)

             self.log.debug("start worker process nr " + str(threadNumber))
@@ -595,9 +546,6 @@ def argumentParsing():
     parser.add_argument("--parallelDataStreams", type=int, default=defConf.parallelDataStreams, help="number of parallel data streams (default=" + str(defConf.parallelDataStreams) + ")")
     parser.add_argument("--chunkSize", type=int, default=defConf.chunkSize, help="chunk size of file-parts getting send via zmq (default=" + str(defConf.chunkSize) + ")")
-    parser.add_argument("--fileWaitTimeInMs", type=int, default=defConf.fileWaitTimeInMs, help=argparse.SUPPRESS)
-    parser.add_argument("--fileMaxWaitTimeInMs", type=int, default=defConf.fileMaxWaitTimeInMs, help=argparse.SUPPRESS)
-
     arguments = parser.parse_args()

     watchFolder = str(arguments.watchFolder)
@@ -638,9 +586,6 @@ if __name__ == '__main__':
     parallelDataStreams = str(arguments.parallelDataStreams)
     chunkSize = int(arguments.chunkSize)
-    fileWaitTimeInMs = float(arguments.fileWaitTimeInMs)
-    fileMaxWaitTimeInMs = float(arguments.fileMaxWaitTimeInMs)
-
     #enable logging
     helperScript.initLogging(logfileFullPath, verbose)
@@ -663,7 +608,6 @@ if __name__ == '__main__':
     fileMover = FileMover(fileEventIp, fileEventPort, dataStreamIp, dataStreamPort,
                           parallelDataStreams, chunkSize,
                           zmqCleanerIp, zmqCleanerPort,
-                          fileWaitTimeInMs, fileMaxWaitTimeInMs,
                           zmqContext)
     try:
         fileMover.process()
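For context, the logic removed by this commit polled a file's modification time and only proceeded once the file had not been touched for a configurable window, or until a maximum wait time had elapsed. A minimal standalone sketch of that pattern, assuming an illustrative helper name wait_for_file_closed and the old default timings of 2000 ms / 10000 ms (neither the helper nor the defaults exist in the repository after this change):

import logging
import os
import time


def wait_for_file_closed(path, wait_time_ms=2000, max_wait_time_ms=10000):
    """Block until `path` has not been modified for `wait_time_ms` milliseconds.

    Returns True once the file looks closed, or False if `max_wait_time_ms`
    elapsed while the file was apparently still being written to.
    """
    log = logging.getLogger(__name__)
    time_start_waiting = time.time()

    while True:
        # Give up if the file stays busy for too long.
        if (time.time() - time_start_waiting) * 1000 >= max_wait_time_ms:
            log.debug("Waited too long for '%s' to get closed, aborting.", path)
            return False

        # Consider the file closed once its mtime is older than the wait window.
        age_in_ms = (time.time() - os.stat(path).st_mtime) * 1000
        if age_in_ms >= wait_time_ms:
            log.debug("'%s' was not modified within the past %s ms.", path, wait_time_ms)
            return True

        log.debug("Still waiting for '%s' to get closed...", path)
        time.sleep(wait_time_ms / 1000)

With this commit the polling is dropped entirely: WorkerProcess reads the file as soon as the file event arrives instead of waiting for its modification time to settle.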