Skip to content
Snippets Groups Projects
Commit 5f5f58d1 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Moved/Removed file after sending

parent fad7de0f
No related branches found
No related tags found
No related merge requests found
......@@ -96,49 +96,76 @@ class DataDispatcher():
while True:
# Get workload from router, until finished
self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
message = self.routerSocket.recv()
message = self.routerSocket.recv_multipart()
self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
finished = workload == b"EXIT"
if finished:
self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
break
workload = message[0]
targets = message[1:]
if len(message) >= 2:
workload = message[0]
targets = message[1:]
else:
workload = message
targets = None
finished = workload == b"EXIT"
if finished:
self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
break
# get metadata of the file
try:
self.log.debug("Getting file metadata")
sourcePathFull, metadata = self.getMetadata(workload)
sourceFile, metadata = self.getMetadata(workload)
except Exception as e:
self.log.error("Building of metadata dictionary failed for file: " + str(sourcePathFull) + ".")
self.log.error("Building of metadata dictionary failed for file: " + str(sourceFile) + ".")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
# send data
try:
self.sendData(targets, sourcePathFull, metadataDict)
except Exception as e:
self.log.debug("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.")
self.log.debug("Error was: " + str(e))
if targets:
try:
self.sendData(targets, sourceFile, metadata)
except Exception as e:
self.log.debug("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.")
self.log.debug("Error was: " + str(e))
# remove file
try:
os.remove(sourceFile)
self.log.info("Removing file '" + str(sourceFile) + "' ...success.")
except IOError:
self.log.debug ("IOError: " + str(sourceFile))
except Exception, e:
trace = traceback.format_exc()
self.log.debug("Unable to remove file {FILE}.".format(FILE=str(sourceFile))
self.log.debug("trace=" + str(trace))
else:
# move file
try:
shutil.move(sourceFile, targetFile)
self.log.info("Moving file '" + str(sourceFile) + "' ...success.")
except IOError:
self.log.debug ("IOError: " + str(sourceFile))
except Exception, e:
trace = traceback.format_exc()
self.log.debug("Unable to move file {FILE}.".format(FILE=str(sourceFile))
self.log.debug("trace=" + str(trace))
# send file to cleaner pipe
try:
#sending to pipe
self.log.debug("send file-event for file to cleaner-pipe...")
self.log.debug("metadataDict = " + str(metadataDict))
self.cleanerSocket.send(json.dumps(metadataDict))
self.log.debug("send file-event for file to cleaner-pipe...success.")
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
except Exception, e:
self.log.error("Unable to notify Cleaner-pipe to handle file: " + str(workload))
# try:
# #sending to pipe
# self.log.debug("send file-event for file to cleaner-pipe...")
# self.log.debug("metadata = " + str(metadata))
# self.cleanerSocket.send(json.dumps(metadata))
# self.log.debug("send file-event for file to cleaner-pipe...success.")
#
# #TODO: remember workload. append to list?
# # can be used to verify files which have been processed twice or more
# except Exception, e:
# self.log.error("Unable to notify Cleaner-pipe to handle file: " + str(workload))
#
def getMetadata(self, workload):
......@@ -326,3 +353,6 @@ class DataDispatcher():
def __del__(self):
self.stop()
if __name__ == '__main__':
pass
......@@ -152,7 +152,9 @@ class TaskProvider():
# send the file to the fileMover
try:
self.log.debug("Sending message...")
message = [messageDict] + requests
message = [messageDict]
if requests != ["None"]:
message += requests
self.log.debug(str(message))
self.routerSocket.send_multipart(message)
self.log.debug("Sending message...done.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment