Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from __builtin__ import open, type
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <manuela.kuhn@desy.de>'
import time
import argparse
import zmq
import os
import logging
import sys
import json
import traceback
from multiprocessing import Process, freeze_support
import subprocess
import json
import shutil
import helperScript
DEFAULT_CHUNK_SIZE = 1048576
#
# -------------------------- class: Cleaner --------------------------------------
#
class Cleaner():
"""
* receives cleaning jobs via zeromq,
such as removing a file
* does regular checks on the watched directory,
such as
- deleting files which have been successfully sent
to target but still remain in the watched directory
- polling the watched directory and reissuing new files
to fileMover which have not been detected yet
"""
bindingPortForSocket = None
bindingIpForSocket = None
zmqContextForCleaner = None
externalContext = None # if the context was created outside this class or not
zmqCleanerSocket = None
# to get the logging only handling this class
log = None
def __init__(self, targetPath, bindingIp="127.0.0.1", bindingPort="6062", context = None, verbose=False):
    """
    Set up the cleaner and immediately start processing jobs.

    A zmq PULL socket is bound to tcp://<bindingIp>:<bindingPort> and
    process() is called right away, so creating a Cleaner blocks until
    process() returns or raises.

    targetPath  -- base directory the handled files are moved to
    bindingIp   -- ip the job socket is bound to
    bindingPort -- port the job socket is bound to
    context     -- zmq context to use; if none is given a new one is
                   created and externalContext is set to False
    verbose     -- accepted for compatibility, not used here
    """
    self.bindingPortForSocket = bindingPort
    self.bindingIpForSocket = bindingIp
    self.targetPath = targetPath

    # remember whether the context was handed in from outside or created here
    if context:
        self.zmqContextForCleaner = context
        self.externalContext = True
    else:
        self.zmqContextForCleaner = zmq.Context()
        self.externalContext = False

    self.log = self.getLogger()
    self.log.debug("Init")

    # bind to local port
    self.zmqCleanerSocket = self.zmqContextForCleaner.socket(zmq.PULL)
    connectionStrCleanerSocket = "tcp://{IP}:{PORT}".format(IP=self.bindingIpForSocket,
                                                            PORT=self.bindingPortForSocket)
    self.zmqCleanerSocket.bind(connectionStrCleanerSocket)
    self.log.debug("zmqCleanerSocket started for '" + connectionStrCleanerSocket + "'")

    try:
        self.process()
    except zmq.error.ZMQError as e:
        # the exception has to be bound to a name, otherwise logging it
        # raises a NameError
        self.log.error("ZMQError: " + str(e))
        self.log.debug("Shutting down cleaner.")
    except KeyboardInterrupt:
        self.log.info("KeyboardInterrupt detected. Shutting down cleaner.")
    except Exception:
        trace = traceback.format_exc()
        self.log.error("Stopping cleanerProcess due to unknown error condition.")
        self.log.debug("Error was: " + str(trace))
def getLogger(self):
    """Return the logger registered under the name "cleaner"."""
    return logging.getLogger("cleaner")
def process(self):
    """
    Receive jobs from zmqCleanerSocket and handle them until "STOP" arrives.

    Every job is expected to be a JSON object with the keys "filename",
    "sourcePath" and "relativePath". The file
    <sourcePath>/<relativePath>/<filename> is moved to
    <self.targetPath>/<relativePath> via moveFile().

    Jobs which cannot be received, decoded or executed are logged and
    skipped. The message "STOP" calls stop() and ends the loop.
    """
    while True:
        # waiting for new jobs
        self.log.debug("Waiting for new jobs")
        try:
            workload = self.zmqCleanerSocket.recv()
        except Exception as e:
            self.log.error("Error in receiving job: " + str(e))
            # nothing was received, so there is no job to handle; going on
            # would use an unbound or outdated workload
            continue

        if workload == "STOP":
            self.log.info("Stopping cleaner")
            self.stop()
            # leave the loop, otherwise "STOP" would be treated as a job
            # and the next recv() would be tried after stop()
            break

        # transform to dictionary
        try:
            workloadDict = json.loads(str(workload))
        except Exception:
            errorMessage = "invalid job received. skipping job"
            self.log.error(errorMessage)
            self.log.debug("workload=" + str(workload))
            continue

        # extract fileEvent metadata
        try:
            # TODO validate fileEventMessageDict dict
            filename = workloadDict["filename"]
            sourcePath = workloadDict["sourcePath"]
            relativePath = workloadDict["relativePath"]
        except Exception as e:
            errorMessage = "Invalid fileEvent message received."
            self.log.error(errorMessage)
            self.log.debug("Error was: " + str(e))
            self.log.debug("workloadDict=" + str(workloadDict))
            # skip all further instructions and continue with next iteration
            continue

        # moving source file
        # initialised up front so the error handler below can always log it
        sourceFullPath = None
        try:
            self.log.debug("removing source file...")

            # generate source and target paths; the separator is added
            # explicitly for both, normpath removes duplicated separators
            sourcePath = os.path.normpath(sourcePath + os.sep + relativePath)
            sourceFullPath = os.path.join(sourcePath, filename)
            targetFullPath = os.path.normpath(self.targetPath + os.sep + relativePath)

            self.log.debug("sourcePath: " + str(sourcePath))
            self.log.debug("filename: " + str(filename))
            self.log.debug("targetPath: " + str(targetFullPath))
            self.moveFile(sourcePath, filename, targetFullPath)

            self.log.debug("file removed: " + str(sourceFullPath))
            self.log.debug("removing source file...success.")
        except Exception:
            errorMessage = "Unable to remove source file: " + str(sourceFullPath)
            self.log.error(errorMessage)
            trace = traceback.format_exc()
            self.log.error("Error was: " + str(trace))
            self.log.debug("sourceFilepath=" + str(sourceFullPath))
            self.log.debug("removing source file...failed.")
            # skip all further instructions and continue with next iteration
            continue
def moveFile(self, source, filename, target):
    """
    Move the file <filename> from the directory <source> to the directory
    <target>. <target> is created first if it does not exist.

    At most maxAttemptsToRemoveFile + 1 attempts are made, with a pause of
    waitTimeBetweenAttemptsInMs between two attempts.

    Raises Exception if the file could not be moved in any of the attempts.
    """
    maxAttemptsToRemoveFile = 2
    waitTimeBetweenAttemptsInMs = 500

    # the paths are the same for every attempt
    sourceFile = source + os.sep + filename
    targetFile = target + os.sep + filename

    iterationCount = 0
    self.log.info("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...")
    fileWasMoved = False

    while iterationCount <= maxAttemptsToRemoveFile and not fileWasMoved:
        iterationCount += 1
        try:
            # check if the directory exists before moving the file
            if not os.path.exists(target):
                try:
                    os.makedirs(target)
                except OSError as e:
                    # not fatal at this point: if the directory is really
                    # missing, the move below fails and is reported
                    self.log.debug("Unable to create directory '" + str(target) + "': " + str(e))

            # moving the file
            self.log.debug("sourceFile: " + str(sourceFile))
            self.log.debug("targetFile: " + str(targetFile))
            shutil.move(sourceFile, targetFile)
            fileWasMoved = True
            self.log.debug("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...success.")
        except IOError:
            self.log.debug("IOError: " + str(filename))
        except Exception:
            trace = traceback.format_exc()
            warningMessage = "Unable to move file {FILE}.".format(FILE=str(sourceFile))
            self.log.warning(warningMessage)
            self.log.debug("trace=" + str(trace))

        # only announce and wait for a retry if there is one left
        if not fileWasMoved and iterationCount <= maxAttemptsToRemoveFile:
            self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
            time.sleep(waitTimeBetweenAttemptsInMs / 1000.0)

    if not fileWasMoved:
        self.log.error("Moving file '" + str(filename) + "' from '" + str(source) + "' to '" + str(target) + "' (attempt " + str(iterationCount) + ")...FAILED.")
        raise Exception("maxAttemptsToMoveFile reached (value={ATTEMPT}). Unable to move file '{FILE}'.".format(ATTEMPT=str(iterationCount),
                                                                                                                FILE=filename))
def removeFile(self, filepath):
    """
    Remove the file <filepath>.

    At most maxAttemptsToRemoveFile + 1 attempts are made, with a pause of
    waitTimeBetweenAttemptsInMs between two attempts.

    Raises Exception if the file could not be removed in any of the attempts.
    """
    maxAttemptsToRemoveFile = 2
    waitTimeBetweenAttemptsInMs = 500

    iterationCount = 0
    self.log.info("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...")
    fileWasRemoved = False

    while iterationCount <= maxAttemptsToRemoveFile and not fileWasRemoved:
        iterationCount += 1
        try:
            os.remove(filepath)
            fileWasRemoved = True
            self.log.debug("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...success.")
        except Exception:
            trace = traceback.format_exc()
            warningMessage = "Unable to remove file {FILE}.".format(FILE=str(filepath))
            self.log.warning(warningMessage)
            self.log.debug("trace=" + str(trace))

            # only announce and wait for a retry if there is one left
            if iterationCount <= maxAttemptsToRemoveFile:
                self.log.warning("will try again in {MS}ms.".format(MS=str(waitTimeBetweenAttemptsInMs)))
                time.sleep(waitTimeBetweenAttemptsInMs / 1000.0)

    if not fileWasRemoved:
        self.log.error("Removing file '" + str(filepath) + "' (attempt " + str(iterationCount) + ")...FAILED.")
        raise Exception("maxAttemptsToRemoveFile reached (value={ATTEMPT}). Unable to remove file '{FILE}'.".format(ATTEMPT=str(iterationCount),
                                                                                                                    FILE=filepath))
if not self.externalContext:
self.log.debug("Destroying context")
self.zmqContextForCleaner.destroy()