Newer
Older
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <marnuel.kuhn@desy.de>'
import time
import zmq
import sys
import random
import json
import argparse
import logging
import errno
import os
import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE
import threading
class FileReceiver:
globalZmqContext = None
outputDir = None
maxRingBufferSize = 100
timeToWaitForRingBuffer = 2
ringBuffer = []
liveViewerZmqContext = None
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
# initialize ring buffer
# get all entries in the directory
# TODO empty target dir -> ringBuffer = []
ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir))
# get the corresponding stats
ringBuffer = ((os.stat(path), path) for path in ringBuffer)
# leave only regular files, insert modification date
ringBuffer = [[stat[ST_MTIME], path]
for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])]
# sort the ring buffer in descending order (new to old files)
ringBuffer = sorted(ringBuffer, reverse=True)
# "global" in Python is per-module !
global globalZmqContext
self.globalZmqContext = zmq.Context()
# thread to communicate with live viewer
#create socket for live viewer
try:
logging.info("creating socket for communication with live viewer...")
zmqContext = self.getZmqContext()
zmqLiveViewerSocket = self.createSocketForLiveViewer(zmqContext)
logging.info("creating socket for communication with live viewer...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating socket for communication with live viewer...failed.")
raise Exception(e)
t1 = threading.Thread(target=self.sendFileToLiveViewer, args=(zmqLiveViewerSocket,))
t1.start()
try:
logging.info("Start receiving new files")
self.startReceiving(zmqLiveViewerSocket)
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
logging.info("Stopped receiving.")
except Exception, e:
logging.error("Unknown error while receiving files. Need to abort.")
logging.debug("Error was: " + str(e))
except:
trace = traceback.format_exc()
logging.info("Unkown error state. Shutting down...")
logging.debug("Error was: " + str(trace))
self.globalZmqContext.destroy()
logging.info("Quitting.")
def receiveMessage(self, socket):
assert isinstance(socket, zmq.sugar.socket.Socket)
logging.debug("receiving messages...")
# message = socket.recv()
# while True:
message = socket.recv_multipart()
return message
def getZmqContext(self):
#get reference for global context-var
globalZmqContext = self.globalZmqContext
return globalZmqContext
def getZmqSocket_Pull(self, context):
pattern_pull = zmq.PULL
assert isinstance(context, zmq.sugar.context.Context)
socket = context.socket(pattern_pull)
return socket
def getZmqSocket_Rep(self, context):
pattern_pull = zmq.REP
assert isinstance(context, zmq.sugar.context.Context)
socket = context.socket(pattern_pull)
return socket
def createPullSocket(self, context):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Pull(context)
logging.info("binding to data socket: tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort)
socket.bind('tcp://' + self.zmqDataStreamIp + ':%s' % self.zmqDataStreamPort)
def createSocketForLiveViewer(self, context):
assert isinstance(context, zmq.sugar.context.Context)
socket = self.getZmqSocket_Rep(context)
logging.info("binding to data socket: tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort)
socket.bind('tcp://' + self.zmqLiveViewerIp + ':%s' % self.zmqLiveViewerPort)
return socket
def addFileToRingBuffer(self, filename, fileModTime):
# prepend file to ring buffer and restore order
self.ringBuffer[:0] = [[fileModTime, filename]]
self.ringBuffer = sorted(self.ringBuffer, reverse=True)
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if float(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
# Albula is the live viewer used at the beamlines
def sendFileToLiveViewer(self, zmqLiveViewerSocket):
# if there is a request of the live viewer:
# send first element in ring buffer to live viewer
# while True:
# # Wait for next request from client
# message = zmqLiveViewerSocket.recv()
# print "Received request: ", message
# time.sleep (1)
# socket.send("World from %s" % port)
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def combineMessage(self, zmqSocket):
# multipartMessage = zmqSocket.recv_multipart()
# logging.info("New message received.")
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
# loopCounter+=1
#save all chunks to file
while True:
multipartMessage = zmqSocket.recv_multipart()
#append to file
try:
logging.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(multipartMessage)
logging.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.debug("append to file based on multipart-message...failed.")
except:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
logging.error(errorMessage)
logging.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) == 0:
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
break
payloadMetadata = str(multipartMessage[0])
payloadMetadataDict = json.loads(payloadMetadata)
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# logging.debug("message-type : " + str(type(multipartMessage)))
# logging.debug("message-length: " + str(len(multipartMessage)))
# add to ring buffer
self.addFileToRingBuffer(str(filename), fileModTime)
def startReceiving(self, zmqLiveViewerSocket):
#create pull socket
try:
logging.info("creating local pullSocket for incoming files...")
zmqContext = self.getZmqContext()
zmqSocket = self.createPullSocket(zmqContext)
logging.info("creating local pullSocket for incoming files...done.")
except Exception, e:
errorMessage = "Unable to create zeromq context."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
logging.info("creating local pullSocket for incoming files...failed.")
raise Exception(e)
#run loop, and wait for incoming messages
continueStreaming = True
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
logging.debug("Waiting for new messages...")
while continueReceiving:
try:
self.combineMessage(zmqSocket)
loopCounter+=1
except KeyboardInterrupt:
logging.debug("Keyboard interrupt detected. Stop receiving.")
break
except:
logging.error("receive message...failed.")
logging.error(sys.exc_info())
continueReceiving = False
logging.info("shutting down receiver...")
try:
logging.debug("shutting down zeromq...")
self.stopReceiving(zmqSocket, zmqLiveViewerSocket, zmqContext)
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
logging.debug("shutting down zeromq...done.")
except:
logging.error(sys.exc_info())
logging.error("shutting down zeromq...failed.")
def generateTargetFilepath(self,configDict):
"""
generates full path where target file will saved to.
"""
targetFilename = configDict["filename"]
targetRelativePath = configDict["relativeParent"]
if targetRelativePath is '' or targetRelativePath is None:
targetPath = self.getOutputDir()
else:
targetPath = os.path.join(self.getOutputDir(), targetRelativePath)
targetFilepath = os.path.join(targetPath, targetFilename)
return targetFilepath
def getOutputDir(self):
return self.outputDir
def generateTargetPath(self,configDict):
"""
generates path where target file will saved to.
"""
targetRelativePath = configDict["relativeParent"]
outputDir = self.getOutputDir()
targetPath = os.path.join(outputDir, targetRelativePath)
return targetPath
def appendChunksToFileFromMultipartMessage(self, multipartMessage):
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
#extract multipart message
try:
configDictJson = multipartMessage[0]
except:
logging.error("an empty config was transferred for multipartMessage")
try:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
except:
logging.warning("an empty file was received within the multipart-message")
payload = None
#TODO validate multipartMessage (like correct dict-values for metadata)
logging.debug("multipartMessage.metadata = " + str(configDictJson))
#extraction metadata from multipart-message
configDict = json.loads(configDictJson)
#generate target filepath
targetFilepath = self.generateTargetFilepath(configDict)
logging.debug("new file is going to be created at: " + targetFilepath)
#append payload to file
try:
newFile = open(targetFilepath, "a")
except IOError, e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
#TODO create subdirectory first, then try to open the file again
try:
targetPath = self.generateTargetPath(configDict)
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
logging.info("New target directory created: " + str(targetPath))
except Exception, f:
errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
logging.error(errorMessage)
logging.debug("Error was: " + str(f))
logging.debug("targetPath="+str(targetPath))
raise Exception(errorMessage)
except Exception, e:
logging.error("failed to append payload to file: '" + targetFilepath + "'")
logging.debug("Error was: " + str(e))
logging.debug("ErrorTyp: " + str(type(e)))
logging.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
try:
if payload != None:
for chunk in payload:
newFile.write(chunk)
newFile.close()
except Exception, e:
errorMessage = "unable to append data to file."
logging.error(errorMessage)
logging.debug("Error was: " + str(e))
raise Exception(errorMessage)
def stopReceiving(self, zmqSocket, zmqLiveViewerSocket, msgContext):
try:
logging.debug("closing zmqSocket...")
zmqSocket.close()
logging.debug("closing zmqSocket...done.")
except:
logging.error("closing zmqSocket...failed.")
logging.error(sys.exc_info())
try:
logging.debug("closing zmqLiveViwerSocket...")
zmqLiveViewerSocket.close()
logging.debug("closing zmqLiveViwerSocket...done.")
except:
logging.error("closing zmqLiveViewerSocket...failed.")
logging.error(sys.exc_info())
try:
logging.debug("closing zmqContext...")
msgContext.destroy()
logging.debug("closing zmqContext...done.")
except:
logging.error("closing zmqContext...failed.")
logging.error(sys.exc_info())
def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--outputDir" , type=str, help="where incoming data will be stored to", default="/tmp/watchdog/data_mirror/")
parser.add_argument("--tcpPortDataStream" , type=int, help="tcp port of data pipe", default=6061)
parser.add_argument("--tcpPortLiveViewer" , type=int, help="tcp port of live viewer", default=6071)
parser.add_argument("--logfile" , type=str, help="file used for logging", default="/tmp/watchdog/fileReceiver.log")
parser.add_argument("--bindingIpForDataStream", type=str, help="local ip to bind dataStream to", default="127.0.0.1")
parser.add_argument("--bindingIpForLiveViewer", type=str, help="local ip to bind LiveViewer to", default="127.0.0.1")
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
# TODO: check folder-directory for existance
return arguments
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
if __name__ == "__main__":
#argument parsing
arguments = argumentParsing()
outputDir = arguments.outputDir
verbose = arguments.verbose
zmqDataStreamIp = str(arguments.bindingIpForDataStream)
zmqDataStreamPort = str(arguments.tcpPortDataStream)
zmqLiveViewerIp = str(arguments.bindingIpForLiveViewer)
zmqLiveViewerPort = str(arguments.tcpPortLiveViewer)
logFile = arguments.logfile
logfileFilePath = arguments.logfile
#enable logging
initLogging(logfileFilePath, verbose)
#start file receiver
myWorker = FileReceiver(outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp)