Commit 32bb4c91 authored by Manuela Kuhn

Changed comments and names from thread to process

parent d4015ebf
@@ -36,8 +36,8 @@ class WorkerProcess():
     dataStreamPort = None
     zmqContextForWorker = None
     zmqMessageChunkSize = None
-    zmqCleanerIp = None       # responsable to delete files
-    zmqCleanerPort = None     # responsable to delete files
+    zmqCleanerIp = None       # responsable to delete/move files
+    zmqCleanerPort = None     # responsable to delete/move files
     zmqDataStreamSocket = None
     routerSocket = None
     cleanerSocket = None
@@ -60,7 +60,7 @@ class WorkerProcess():
         self.log = self.getLogger()
-        self.log.debug("new workerThread started. id=" + str(self.id))
+        self.log.debug("new workerProcess started. id=" + str(self.id))

         # initialize sockets
         self.zmqDataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH)
@@ -107,10 +107,10 @@ class WorkerProcess():
            -> the simulated "onClosed" event waits for a file for being
               not modified within a certain period of time.
               Instead of processing file after file the work will be
-              spreaded to many workerThreads. So each thread can wait
+              spreaded to many workerProcesses. So each process can wait
               individual periods of time for a file without blocking
               new file events - as new file events will be handled by
-              another workerThread.
+              another workerProcess.
        """
        """
@@ -136,7 +136,7 @@ class WorkerProcess():
                finished = workload == b"END"
                if finished:
                    processingJobs = False
-                    self.log.debug("router requested to shutdown worker-thread. Worker processed: %d files" % jobCount)
+                    self.log.debug("router requested to shutdown worker-process. Worker processed: %d files" % jobCount)
                    break
                jobCount += 1
@@ -429,8 +429,8 @@ class FileMover():
        self.poller.register(self.fileEventSocket, zmq.POLLIN)
        self.poller.register(self.receiverComSocket, zmq.POLLIN)

-        # setting up router for load-balancing worker-threads.
-        # each worker-thread will handle a file event
+        # setting up router for load-balancing worker-processes.
+        # each worker-process will handle a file event
        routerIp   = "127.0.0.1"
        routerPort = "50000"
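
The hunk above only shows the comment and the router address; the socket creation itself lies outside the diff context. As a rough sketch (assuming pyzmq and the 127.0.0.1:50000 endpoint configured above), a load-balancing ROUTER socket is typically set up and registered like this:

```python
import zmq

# illustrative sketch, not taken from the commit
zmqContext = zmq.Context.instance()
routerSocket = zmqContext.socket(zmq.ROUTER)   # one ROUTER, many worker processes
routerSocket.bind("tcp://127.0.0.1:50000")     # endpoint built from routerIp/routerPort

poller = zmq.Poller()
poller.register(routerSocket, zmq.POLLIN)      # polled alongside the other sockets
```
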
@@ -468,21 +468,21 @@ class FileMover():
        incomingMessageCounter = 0

-        #start worker-threads. each will have its own PushSocket.
-        workerThreadList = list()
-        numberOfWorkerThreads = parallelDataStreams
-        for threadNumber in range(numberOfWorkerThreads):
-            self.log.debug("instantiate new workerProcess (nr " + str(threadNumber) + " )")
-            newWorkerThread = Process(target=WorkerProcess, args=(threadNumber,
+        #start worker-processes. each will have its own PushSocket.
+        workerProcessList = list()
+        numberOfWorkerProcesses = parallelDataStreams
+        for processNumber in range(numberOfWorkerProcesses):
+            self.log.debug("instantiate new workerProcess (nr " + str(processNumber) + " )")
+            newWorkerProcess = Process(target=WorkerProcess, args=(processNumber,
                                        self.dataStreamIp,
                                        self.dataStreamPort,
                                        self.chunkSize,
                                        self.zmqCleanerIp,
                                        self.zmqCleanerPort))
-            workerThreadList.append(newWorkerThread)
+            workerProcessList.append(newWorkerProcess)

-            self.log.debug("start worker process nr " + str(threadNumber))
-            newWorkerThread.start()
+            self.log.debug("start worker process nr " + str(processNumber))
+            newWorkerProcess.start()

        #run loop, and wait for incoming messages
        continueReceiving = True
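
The worker side of this load-balancing pattern is not part of the diff. In the classic ZeroMQ LRU setup that processFileEvent (in the next hunk) relies on, each WorkerProcess connects a REQ socket to the router, announces itself with b"READY", and then blocks until the router replies with a file event. A hedged sketch of that loop, with assumed names and endpoint:

```python
import zmq

def worker_loop(worker_id, router_endpoint="tcp://127.0.0.1:50000"):
    # illustrative sketch of the worker side of the ROUTER/REQ load-balancing pattern
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.identity = ("worker-%d" % worker_id).encode()  # matches the "worker-0" comment
    socket.connect(router_endpoint)

    socket.send(b"READY")             # register as an idle worker
    while True:
        workload = socket.recv()      # router sends the next file event
        if workload == b"END":
            break                     # router asked the worker to shut down
        # ... handle the file event here ...
        socket.send(b"READY")         # ask for the next job
```
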
@@ -529,22 +529,22 @@ class FileMover():
    def processFileEvent(self, fileEventMessage):
        # LRU worker is next waiting in the queue
-        self.log.debug("waiting for available workerThread.")
+        self.log.debug("waiting for available workerProcess.")

        # address == "worker-0"
        # empty   == ""
        # ready   == "READY"
        address, empty, ready = self.routerSocket.recv_multipart()
-        self.log.debug("available workerThread detected.")
+        self.log.debug("available workerProcess detected.")

-        self.log.debug("passing job to workerThread...")
+        self.log.debug("passing job to workerProcess...")
        self.routerSocket.send_multipart([
                                          address,
                                          b'',
                                          fileEventMessage,
                                         ])
-        self.log.debug("passing job to workerThread...done.")
+        self.log.debug("passing job to workerProcess...done.")

    def stop(self):
@@ -624,17 +624,17 @@ if __name__ == '__main__':
    zmqContext = zmq.Context.instance()
    logging.info("registering zmq global context")

-    logging.debug("start watcher thread...")
-    watcherThread = Process(target=DirectoryWatcher, args=(fileEventIp, watchFolder, fileEventPort, zmqContext))
-    logging.debug("watcher thread registered")
-    watcherThread.start()
-    logging.debug("start watcher thread...done")
+    logging.debug("start watcher process...")
+    watcherProcess = Process(target=DirectoryWatcher, args=(fileEventIp, watchFolder, fileEventPort, zmqContext))
+    logging.debug("watcher process registered")
+    watcherProcess.start()
+    logging.debug("start watcher process...done")

-    logging.debug("start cleaner thread...")
-    cleanerThread = Process(target=Cleaner, args=(cleanerTargetPath, zmqCleanerIp, zmqCleanerPort, zmqContext))
-    logging.debug("cleaner thread registered")
-    cleanerThread.start()
-    logging.debug("start cleaner thread...done")
+    logging.debug("start cleaner process...")
+    cleanerProcess = Process(target=Cleaner, args=(cleanerTargetPath, zmqCleanerIp, zmqCleanerPort, zmqContext))
+    logging.debug("cleaner process registered")
+    cleanerProcess.start()
+    logging.debug("start cleaner process...done")

    #start new fileMover
    fileMover = FileMover(fileEventIp, fileEventPort, dataStreamIp, dataStreamPort,
@@ -657,7 +657,7 @@ if __name__ == '__main__':
            logging.error(sys.exc_info())
            logging.error("shutting down zeromq...failed.")

-    # give the other threads time to close the sockets
+    # give the other processes time to close the sockets
    time.sleep(0.1)

    try:
        logging.debug("closing zmqContext...")