Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import argparse
import zmq
import os
import logging
import sys
import json
import trace
#
# -------------------------- class: TaskHandler --------------------------------------
#
class TaskHandler():
def __init__ (self, eventDetectorConfig, requestFwPort, distrPort, context = None):
#eventDetectorConfig = {
# configType: ... ,
# watchDir : ... ,
# monEventType : ... ,
# monDefaultSubdirs : ... ,
# monSuffixes : ... ,
#}
self.log = self.getLogger()
self.log.debug("TaskHandler: __init__()")
self.eventDetector = None
self.watchDir = None
self.monEventType = "IN_CLOSE_WRITE"
self.monDefaultSubdirs = ["commissioning", "current", "local"]
self.monSuffixes = [".tif", ".cbf"]
self.config = eventDetectorConfig
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.requestFwPort = requestFwPort
self.distrPort = distrPort
self.requestFwSocket = None
self.distrSocket = None
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
if self.config["configType"] == "inotify":
from InotifyDetector import InotifyDetector as EventDetector
self.watchDir = os.path.normpath(watchDir)
self.monEventType = self.config["monEventType"] or None
self.log.info ("Monitored event type is: " + str(self.monEventType))
self.monDefaultSubdirs = self.config["monDefaultSubdirs"] or None
self.monSuffixes = self.config["monSuffixes"] or None
self.log.info ("Monitored suffixes are: " + str(self.monSuffixes))
monDirs = [self.config["self.watchDir"]]
#TODO forward self.config instead of seperate variables
self.eventDetector = EventDetector(monDirs, self.monEventType, self.monDefaultSubdirs, self.monSuffixes)
else:
self.log.error("Type of event detector is not supported: " + str( self.config["configType"] ))
return -1
self.createSockets()
try:
self.run()
except KeyboardInterrupt:
self.log.debug("Keyboard interruption detected. Shuting down")
except:
trace = traceback.format_exc()
self.log.info("Stopping TaskHandler due to unknown error condition.")
self.log.debug("Error was: " + str(trace))
def getLogger (self):
logger = logging.getLogger("TaskHandler")
return logger
def createSockets (self):
# socket to get requests
self.requestFwSocket = self.context.socket(zmq.REQ)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
try:
self.requestFwSocket.connect(connectionStr)
self.log.debug("Connecting to requestFwSocket (connect): " + str(connectionStr))
except Exception as e:
self.log.error("Failed to start requestFwSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# socket to disribute the events to the worker
self.distrSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
try:
self.distrSocket.bind(connectionStr)
self.log.debug("Connecting to distributing socket (bind): " + str(connectionStr))
except Exception as e:
self.log.error("Failed to start distributing Socket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def run (self):
while True:
try:
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
# "sourcePath" : "/tmp/test/source/"
# "relativePath": "local"
# "filename" : "file1.tif"
# }
workloadList = self.eventDetector.getNewEvent()
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
#TODO validate workload dict
for workload in workloadList:
# get requests for this event
try:
self.log.debug("Get requests...")
self.requestFwSocket.send("")
requests = self.requestFwSocket.recv_multipart()
self.log.debug("Get requests... done.")
self.log.debug("Requests: " + str(requests))
except:
self.log.error("Get Requests... failed.")
trace = traceback.format_exc()
self.log.debug("Error was: " + str(trace))
# build message dict
try:
self.log.debug("Building message dict...")
messageDict = json.dumps(workload) #sets correct escape characters
self.log.debug("Building message dict...done.")
except Exception, e:
self.log.error("Unable to assemble message dict.")
self.log.debug("Error was: " + stri(e))
continue
# send the file to the fileMover
try:
self.log.debug("Sending message...")
self.log.debug(str(messageDict))
self.distrSocket.send_multipart([messageDict, requests])
self.log.debug("Sending message...done.")
except Exception, e:
self.log.error("Sending message...failed.")
self.log.debug("Error was: " + str(e))
def stop(self):
if self.distrSocket:
self.distrSocket.close(0)
self.distrSocket = None
if self.requestFwSocket:
self.requestFwSocket.close(0)
self.requestFwSocket = None
if not self.extContext and if self.context:
self.context.destroy()
self.context = None
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
if __name__ == '__main__':
def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--watchDir" , type=str, help="directory you want to monitor for changes")
parser.add_argument("--staticNotification",
help="disables new file-events. just sends a list of currently available files within the defined 'watchDir'.",
action="store_true")
parser.add_argument("--logfilePath" , type=str, help="path where logfile will be created" , default="/tmp/log/")
parser.add_argument("--logfileName" , type=str, help="filename used for logging" , default="watchDir.log")
parser.add_argument("--fileEventIp" , type=str, help="zqm endpoint (IP-address) to send file events to", default="127.0.0.1")
parser.add_argument("--fileEventPort", type=str, help="zqm endpoint (port) to send file events to" , default="6060")
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
# TODO: check watchDir-directory for existance
watchDir = str(arguments.watchDir)
assert isinstance(type(watchDir), type(str))
#exit with error if no watchDir path was provided
if watchDir in [ None, "", "None" ]:
print """You need to set the following option:
--watchDir {DIRECTORY}
"""
sys.exit(1)
#abort if watchDir does not exist
helperScript.checkDirExistance(watchDir)
#error if logfile cannot be written
try:
fullPath = os.path.join(arguments.logfilePath, arguments.logfileName)
logFile = open(fullPath, "a")
except:
print "Unable to create the logfile """ + str(fullPath)
print """Please specify a new target by setting the following arguments:
--logfileName
--logfilePath
"""
sys.exit(1)
#check logfile-path for existance
helperScript.checkDirExistance(arguments.logfilePath)
return arguments
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SRC_PATH = BASE_PATH + os.sep + "src"
sys.path.append ( SRC_PATH )
import shared.helperScript as helperScript
arguments = argumentParsing()
watchDir = arguments.watchDir
verbose = arguments.verbose
logfileFilePath = os.path.join(arguments.logfilePath, arguments.logfileName)
fileEventIp = str(arguments.fileEventIp)
fileEventPort = str(arguments.fileEventPort)
#enable logging
helperScript.initLogging(logfileFilePath, verbose)
#run only once, skipping file events
#just get a list of all files in watchDir and pass to zeromq
directoryWatcher = DirectoryWatcher(fileEventIp, watchDir, fileEventPort)