Skip to content
Snippets Groups Projects
Commit 3c0cc20f authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Remove File only if prioSend/move/copy is successful

parent f920a11d
No related branches found
No related tags found
No related merge requests found
......@@ -9,20 +9,20 @@ import cPickle
import shutil
import errno
from send_helpers import __sendToTargets
from send_helpers import __sendToTargets, DataHandlingError
def setup (log, prop):
if ( not prop.has_key("fixSubdirs") or
not prop.has_key("storeData") or
not prop.has_key("removeFlag") ):
not prop.has_key("storeData") ):
log.error ("Configuration of wrong format")
log.debug ("dataFetcherProp="+ str(prop))
return False
else:
prop["removeFlag"] = False
return True
......@@ -97,6 +97,13 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
prop["removeFlag"] = False
return
chunkSize = metadata[ "chunkSize" ]
targets_data = [i for i in targets if i[2] == "data"]
chunkNumber = 0
sendError = False
#reading source file into memory
try:
log.debug("Opening '" + str(sourceFile) + "'...")
......@@ -105,11 +112,6 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
log.error("Unable to read source file '" + str(sourceFile) + "'.", exc_info=True)
raise
chunkSize = metadata[ "chunkSize" ]
targets_data = [i for i in targets if i[2] == "data"]
chunkNumber = 0
log.debug("Passing multipart-message for file " + str(sourceFile) + "...")
while True:
......@@ -137,6 +139,9 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
__sendToTargets(log, targets_data, sourceFile, targetFile, openConnections, None, chunkPayload, context)
log.debug("Passing multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")...done.")
except DataHandlingError:
log.error("Unable to send multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")", exc_info=True)
sendError = True
except:
log.error("Unable to send multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")", exc_info=True)
......@@ -148,48 +153,76 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, c
fileDescriptor.close()
except:
log.error("Unable to close target file '" + str(targetFile) + "'.", exc_info=True)
raise
prop["removeFlag"] = True
if not sendError:
prop["removeFlag"] = True
def __dataHandling(log, sourceFile, targetFile, actionFunction, metadata, prop):
try:
actionFunction(sourceFile, targetFile)
except IOError as e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
subdir, tmp = os.path.split(metadata["relativePath"])
if metadata["relativePath"] in prop["fixSubdirs"]:
log.error("Unable to copy/move file '" + sourceFile + "' to '" + targetFile +
": Directory " + metadata["relativePath"] + " is not available.", exc_info=True)
raise
elif subdir in prop["fixSubdirs"] :
log.error("Unable to copy/move file '" + sourceFile + "' to '" + targetFile +
": Directory " + subdir + " is not available.", exc_info=True)
raise
else:
try:
targetPath, filename = os.path.split(targetFile)
os.makedirs(targetPath)
log.info("New target directory created: " + str(targetPath))
actionFunction(sourceFile, targetFile)
except:
log.error("Unable to copy/move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
log.debug("targetPath:" + str(targetPath))
else:
log.error("Unable to copy/move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
raise
except:
log.error("Unable to copy/move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
raise
def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
targets_metadata = [i for i in targets if i[2] == "metadata"]
if prop["storeData"]:
if prop["storeData"] and prop["removeFlag"]:
# move file
try:
shutil.move(sourceFile, targetFile)
__dataHandling(log, sourceFile, targetFile, shutil.move, metadata, prop)
log.info("Moving file '" + str(sourceFile) + "' ...success.")
except IOError as e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
subdir, tmp = os.path.split(metadata["relativePath"])
if metadata["relativePath"] in prop["fixSubdirs"]:
log.error("Unable to move file '" + sourceFile + "' to '" + targetFile +
": Directory " + metadata["relativePath"] + " is not available", exc_info=True)
return
elif subdir in prop["fixSubdirs"] :
log.error("Unable to move file '" + sourceFile + "' to '" + targetFile +
": Directory " + subdir + " is not available", exc_info=True)
return
else:
try:
targetPath, filename = os.path.split(targetFile)
os.makedirs(targetPath)
shutil.move(sourceFile, targetFile)
log.info("New target directory created: " + str(targetPath))
except:
log.error("Unable to move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
log.debug("targetPath:" + str(targetPath))
else:
log.error("Unable to move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
return
except:
log.error("Unable to move file '" + sourceFile + "' to '" + targetFile, exc_info=True)
return
#send message to metadata targets
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context)
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
elif prop["StoreData"]:
# copy file
# (does not preserve file owner, group or ACLs)
try:
__dataHandling(log, sourceFile, targetFile, shutil.copy, metadata, prop)
log.info("Copying file '" + str(sourceFile) + "' ...success.")
except:
return
#send message to metadata targets
......@@ -281,7 +314,8 @@ if __name__ == '__main__':
dataFetcherProp = {
"type" : "getFromFile",
"removeFlag" : False
"fixSubdirs" : ["commissioning", "current", "local"],
"storeData" : False
}
logging.debug("openConnections before function call: " + str(openConnections))
......
......@@ -3,6 +3,11 @@ __author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import zmq
import cPickle
class DataHandlingError(Exception):
pass
def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadata, payload, context):
for target, prio, sendType in targets:
......@@ -12,35 +17,43 @@ def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metad
# socket not known
if target not in openConnections:
# open socket
socket = context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
try:
socket = context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
log.info("Start socket (connect): '" + str(connectionStr) + "'")
socket.connect(connectionStr)
log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
openConnections[target] = socket
# register socket
openConnections[target] = socket
except:
raise DataHandlingError("Failed to start socket (connect): '" + str(connectionStr) + "'")
# send data
if sendType == "data":
tracker = openConnections[target].send_multipart(payload, copy=False, track=True)
log.info("Sending message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
elif sendType == "metadata":
#cPickle.dumps(None) is 'N.'
tracker = openConnections[target].send_multipart([cPickle.dumps(metadata), cPickle.dumps(None)], copy=False, track=True)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
log.debug("metadata=" + str(metadata))
try:
if sendType == "data":
tracker = openConnections[target].send_multipart(payload, copy=False, track=True)
log.info("Sending message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
elif sendType == "metadata":
#cPickle.dumps(None) is 'N.'
tracker = openConnections[target].send_multipart([cPickle.dumps(metadata), cPickle.dumps(None)], copy=False, track=True)
log.info("Sending metadata of message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) )
log.debug("metadata=" + str(metadata))
if not tracker.done:
log.debug("Message part from file " + str(sourceFile) +
" has not been sent yet, waiting...")
tracker.wait()
log.debug("Message part from file " + str(sourceFile) +
" has not been sent yet, waiting...done")
except:
raise DataHandlingError("Sending (metadata of) message part from file " + str(sourceFile) +
" to '" + target + "' with priority " + str(prio) + " failed.")
if not tracker.done:
log.idebug("Message part from file " + str(sourceFile) +
" has not been sent yet, waiting...")
tracker.wait()
log.debug("Message part from file " + str(sourceFile) +
" has not been sent yet, waiting...done")
else:
# socket not known
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment