Skip to content
Snippets Groups Projects
Commit 282bbae4 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Cleaner via tcp socket

parent 3694a3d9
No related branches found
No related tags found
No related merge requests found
...@@ -29,16 +29,19 @@ class WorkerProcess(): ...@@ -29,16 +29,19 @@ class WorkerProcess():
zmqMessageChunkSize = None zmqMessageChunkSize = None
fileWaitTime_inMs = None fileWaitTime_inMs = None
fileMaxWaitTime_InMs = None fileMaxWaitTime_InMs = None
cleaner_url = "inproc://workers" zmqCleanerIp = None # responsable to delete files
zmqCleanerPort = None # responsable to delete files
zmqDataStreamSocket = None zmqDataStreamSocket = None
def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, def __init__(self, id, dataStreamIp, dataStreamPort, logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort,
fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0): fileWaitTimeInMs=2000.0, fileMaxWaitTimeInMs=10000.0):
self.id = id self.id = id
self.dataStreamIp = dataStreamIp self.dataStreamIp = dataStreamIp
self.dataStreamPort = dataStreamPort self.dataStreamPort = dataStreamPort
self.logfileFullPath = logfileFullPath self.logfileFullPath = logfileFullPath
self.zmqMessageChunkSize = chunkSize self.zmqMessageChunkSize = chunkSize
self.zmqCleanerIp = zmqCleanerIp
self.zmqCleanerPort = zmqCleanerPort
self.fileWaitTime_inMs = fileWaitTimeInMs self.fileWaitTime_inMs = fileWaitTimeInMs
self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs self.fileMaxWaitTime_InMs = fileMaxWaitTimeInMs
...@@ -49,8 +52,8 @@ class WorkerProcess(): ...@@ -49,8 +52,8 @@ class WorkerProcess():
except KeyboardInterrupt: except KeyboardInterrupt:
# trace = traceback.format_exc() # trace = traceback.format_exc()
logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.") logging.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
self.zmqDataStreamSocket.close() # self.zmqDataStreamSocket.close()
self.zmqContextForWorker.destroy() # self.zmqContextForWorker.destroy()
else: else:
trace = traceback.format_exc() trace = traceback.format_exc()
logging.error("Stopping workerProcess due to unknown error condition.") logging.error("Stopping workerProcess due to unknown error condition.")
...@@ -101,7 +104,7 @@ class WorkerProcess(): ...@@ -101,7 +104,7 @@ class WorkerProcess():
#init Cleaner message-pipe #init Cleaner message-pipe
cleanerSocket = zmqContextForWorker.socket(zmq.PUSH) cleanerSocket = zmqContextForWorker.socket(zmq.PUSH)
connectionStringCleanerSocket = self.cleaner_url connectionStringCleanerSocket = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
cleanerSocket.connect(connectionStringCleanerSocket) cleanerSocket.connect(connectionStringCleanerSocket)
...@@ -404,6 +407,8 @@ class FileMover(): ...@@ -404,6 +407,8 @@ class FileMover():
tcpPort_messageStream = "6060" tcpPort_messageStream = "6060"
dataStreamIp = "127.0.0.1" # ip of dataStream-socket to push new files to dataStreamIp = "127.0.0.1" # ip of dataStream-socket to push new files to
dataStreamPort = "6061" # port number of dataStream-socket to push new files to dataStreamPort = "6061" # port number of dataStream-socket to push new files to
zmqCleanerIp = "127.0.0.1" # zmq pull endpoint, responsable to delete files
zmqCleanerPort = "6062" # zmq pull endpoint, responsable to delete files
fileWaitTimeInMs = None fileWaitTimeInMs = None
fileMaxWaitTimeInMs = None fileMaxWaitTimeInMs = None
pipe_name = "/tmp/zeromqllpipe_resp" pipe_name = "/tmp/zeromqllpipe_resp"
...@@ -418,6 +423,7 @@ class FileMover(): ...@@ -418,6 +423,7 @@ class FileMover():
self.startReceiving() self.startReceiving()
except zmq.error.ZMQError as e: except zmq.error.ZMQError as e:
logging.error("ZMQError: "+ str(e)) logging.error("ZMQError: "+ str(e))
log.debug("Shutting down workerProcess.")
except KeyboardInterrupt: except KeyboardInterrupt:
logging.debug("KeyboardInterrupt detected. Shutting down fileMover.") logging.debug("KeyboardInterrupt detected. Shutting down fileMover.")
logging.info("Shutting down fileMover as KeyboardInterrupt was detected.") logging.info("Shutting down fileMover as KeyboardInterrupt was detected.")
...@@ -430,7 +436,7 @@ class FileMover(): ...@@ -430,7 +436,7 @@ class FileMover():
def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams, def __init__(self, bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, parallelDataStreams,
logfileFullPath, chunkSize, logfileFullPath, chunkSize, zmqCleanerIp, zmqCleanerPort,
fileWaitTimeInMs, fileMaxWaitTimeInMs): fileWaitTimeInMs, fileMaxWaitTimeInMs):
logging.info("registering zmq global context") logging.info("registering zmq global context")
...@@ -445,6 +451,8 @@ class FileMover(): ...@@ -445,6 +451,8 @@ class FileMover():
self.parallelDataStreams = parallelDataStreams self.parallelDataStreams = parallelDataStreams
self.logfileFullPath = logfileFullPath self.logfileFullPath = logfileFullPath
self.chunkSize = chunkSize self.chunkSize = chunkSize
self.zmqCleanerIp = zmqCleanerIp
self.zmqCleanerPort = zmqCleanerPort
self.fileWaitTimeInMs = fileWaitTimeInMs self.fileWaitTimeInMs = fileWaitTimeInMs
self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs self.fileMaxWaitTimeInMs = fileMaxWaitTimeInMs
...@@ -493,6 +501,8 @@ class FileMover(): ...@@ -493,6 +501,8 @@ class FileMover():
self.dataStreamPort, self.dataStreamPort,
logfileFullPath, logfileFullPath,
self.chunkSize, self.chunkSize,
zmqCleanerIp,
zmqCleanerPort,
fileWaitTimeInMs, fileWaitTimeInMs,
fileMaxWaitTimeInMs)) fileMaxWaitTimeInMs))
workerThreadList.append(newWorkerThread) workerThreadList.append(newWorkerThread)
...@@ -638,7 +648,8 @@ class Cleaner(): ...@@ -638,7 +648,8 @@ class Cleaner():
try: try:
self.process() self.process()
except zmq.error.ZMQError: except zmq.error.ZMQError:
log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") logging.error("ZMQError: "+ str(e))
log.debug("Shutting down workerProcess.")
self.zmqContextForCleaner.destroy() self.zmqContextForCleaner.destroy()
except KeyboardInterrupt: except KeyboardInterrupt:
log.debug("KeyboardInterrupt detected. Shutting down workerProcess.") log.debug("KeyboardInterrupt detected. Shutting down workerProcess.")
...@@ -828,6 +839,8 @@ def argumentParsing(): ...@@ -828,6 +839,8 @@ def argumentParsing():
parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060") parser.add_argument("--bindingPortForSocket", type=str, help="local port to bind to", default="6060")
parser.add_argument("--dataStreamIp" , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1") parser.add_argument("--dataStreamIp" , type=str, help="ip of dataStream-socket to push new files to", default="127.0.0.1")
parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061") parser.add_argument("--dataStreamPort", type=str, help="port number of dataStream-socket to push new files to", default="6061")
parser.add_argument("--zmqCleanerIp" , type=str, help="zmq-pull-socket which deletes given files", default="127.0.0.1")
parser.add_argument("--zmqCleanerPort", type=str, help="zmq-pull-socket which deletes given files", default="6063")
parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1") parser.add_argument("--parallelDataStreams", type=int, help="number of parallel data streams. default is 1", default="1")
parser.add_argument("--chunkSize", type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE) parser.add_argument("--chunkSize", type=int, help="chunk size of file-parts getting send via zmq", default=DEFAULT_CHUNK_SIZE)
parser.add_argument("--verbose" , help="more verbose output", action="store_true") parser.add_argument("--verbose" , help="more verbose output", action="store_true")
...@@ -909,6 +922,8 @@ if __name__ == '__main__': ...@@ -909,6 +922,8 @@ if __name__ == '__main__':
logfilePath = str(arguments.logfilePath) logfilePath = str(arguments.logfilePath)
logfileName = str(arguments.logfileName) logfileName = str(arguments.logfileName)
parallelDataStreams = str(arguments.parallelDataStreams) parallelDataStreams = str(arguments.parallelDataStreams)
zmqCleanerIp = str(arguments.zmqCleanerIp)
zmqCleanerPort = str(arguments.zmqCleanerPort)
chunkSize = arguments.chunkSize chunkSize = arguments.chunkSize
verbose = arguments.verbose verbose = arguments.verbose
logfileFullPath = os.path.join(logfilePath, logfileName) logfileFullPath = os.path.join(logfilePath, logfileName)
...@@ -916,6 +931,10 @@ if __name__ == '__main__': ...@@ -916,6 +931,10 @@ if __name__ == '__main__':
fileMaxWaitTimeInMs = float(arguments.fileMaxWaitTimeInMs) fileMaxWaitTimeInMs = float(arguments.fileMaxWaitTimeInMs)
cleanerThread = Process(target=Cleaner, args=(logfilePath, zmqCleanerIp, zmqCleanerPort))
logging.debug("cleaner thread started")
cleanerThread.start()
#enable logging #enable logging
initLogging(logfileFullPath, verbose) initLogging(logfileFullPath, verbose)
...@@ -924,6 +943,7 @@ if __name__ == '__main__': ...@@ -924,6 +943,7 @@ if __name__ == '__main__':
# try: # try:
fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort, fileMover = FileMover(bindingIpForSocket, bindingPortForSocket, dataStreamIp, dataStreamPort,
parallelDataStreams, logfileFullPath, chunkSize, parallelDataStreams, logfileFullPath, chunkSize,
zmqCleanerIp, zmqCleanerPort,
fileWaitTimeInMs, fileMaxWaitTimeInMs) fileWaitTimeInMs, fileMaxWaitTimeInMs)
fileMover.process() fileMover.process()
# except KeyboardInterrupt, ke: # except KeyboardInterrupt, ke:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment