Newer
Older
__author__ = 'Marco Strutz <marco.strutz@desy.de>', 'Manuela Kuhn <marnuel.kuhn@desy.de>'
import time
import zmq
import sys
import random
import json
import argparse
import logging
import errno
import os
import traceback
from stat import S_ISREG, ST_MTIME, ST_MODE
import threading
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class Coordinator:
zmqContext = None
liveViewerZmqContext = None
outputDir = None
zqmDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
liveViewerExchangeIp = "127.0.0.1"
liveViewerExchangePort = "6072"
receiverExchangeIp = "127.0.0.1"
receiverExchangePort = "6073"
ringBuffer = []
maxRingBufferSize = 200
timeToWaitForRingBuffer = 2
log = None
receiverThread = None
liveViewerThread = None
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.log = self.getLogger()
self.log.debug("Init")
if context:
assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
# create sockets
self.receiverExchangeSocket = self.zmqContext.socket(zmq.PAIL)
connectionStrReceiverExchangeSocket = "tcp://" + self.receiverExchangeIp + ":%s" % self.receiverExchangePort
self.zmqSocket.bind(connectionStrReceiverExchangeSocket)
self.log.debug("receiverExchangeSocket started (bind) for '" + connectionStrReceiverExchangeSocket + "'")
self.liveViewerExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrLiveViewerExchangeSocket = "tcp://" + self.liveViewerExchangeIp + ":%s" % self.liveViewerExchangePort
self.liveViewerExchangeSocket.bind(connectionStrLiveViewerExchangeSocket)
self.log.debug("liveViewerExchangeSocket started (bind) for '" + connectionStrLiveViewerExchangeSocket + "'")
self.poller = zmq.Poller()
self.poller.register(self.receiverExchangeSocket, zmq.POLLIN)
self.poller.register(self.liveViewerExchangeSocket, zmq.POLLIN)
# start file receiver
self.receiverThread = threading.Thread(target=FileReceiver, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp))
self.receiverThread.start()
# thread to communicate with live viewer
self.liveViewerThread = threading.Thread(target=LiveViewer)
self.liveViewerThread.start()
# initialize ring buffer
# get all entries in the directory
# TODO empty target dir -> ringBuffer = []
self.ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir))
# get the corresponding stats
self.ringBuffer = ((os.stat(path), path) for path in self.ringBuffer)
# leave only regular files, insert modification date
self.ringBuffer = [[stat[ST_MTIME], path]
for stat, path in self.ringBuffer if S_ISREG(stat[ST_MODE])]
# sort the ring buffer in descending order (new to old files)
self.ringBuffer = sorted(self.ringBuffer, reverse=True)
self.log.debug("Init ring buffer")
try:
self.log.info("Start communication")
self.communicateWithThreads()
self.log.info("Stopped communication.")
except Exception, e:
trace = traceback.format_exc()
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(e))
self.log.debug("Closing socket")
self.receiverExchangeSocket.close()
self.liveViewerExchangeSocket.close()
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("coordinator")
return logger
def communicateWithThreads(self):
should_continue = True
while should_continue:
socks = dict(self.poller.poll())
if self.receiverExchangeSocket in socks and socks[self.receiverExchangeSocket] == zmq.POLLIN:
message = self.receiverExchangeSocket.recv()
self.log.debug("Recieved control command: %s" % message )
if message == "Exit":
self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages")
should_continue = False
self.liveViewerSocket.send("Exit")
break
elif message.startswith("AddFile"):
# add file to ring buffer
filename, fileModTime = message[7:].split(", ")
addFileToRingBuffer(filename, fileModTime)
if self.liveViewerExchangeSocket in socks and socks[self.liveViewerExchangeSocket] == zmq.POLLIN:
message = self.liveViewerExchangeSocket.recv()
self.log.debug("Call for next file... ")
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
answer = self.ringBuffer[0][1]
else:
answer = "None"
print answer
try:
self.liveViewerExchangeSocket.send(answer)
except zmq.error.ContextTerminated:
break
def addFileToRingBuffer(self, filename, fileModTime):
# prepend file to ring buffer and restore order
self.ringBuffer[:0] = [[fileModTime, filename]]
self.ringBuffer = sorted(self.ringBuffer, reverse=True)
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if float(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
outputDir = None
zqmDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
zmqLiveViewerExchangeIp = "127.0.0.1"
zmqLiveViewerExchangePort = "6072"
ringBuffer = []
maxRingBufferSize = 200
timeToWaitForRingBuffer = 2
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
if context:
assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
# create pull socket
self.zmqSocket = self.zmqContext.socket(zmq.PULL)
connectionStrZmqSocket = "tcp://" + self.zmqDataStreamIp + ":%s" % self.zmqDataStreamPort
self.zmqSocket.bind(connectionStrZmqSocket)
self.log.debug("zmqSocket started for '" + connectionStrZmqSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
self.exchangeSocket.bind(connectionStrExchangeSocket)
self.log.debug("exchangeSocket started (bind) for '" + connectionStrExchangeSocket + "'")
self.liveViewerThread = threading.Thread(target=LiveViewer)
# initialize ring buffer
# get all entries in the directory
# TODO empty target dir -> ringBuffer = []
ringBuffer = (os.path.join(self.outputDir, fn) for fn in os.listdir(self.outputDir))
# get the corresponding stats
ringBuffer = ((os.stat(path), path) for path in ringBuffer)
# leave only regular files, insert modification date
ringBuffer = [[stat[ST_MTIME], path]
for stat, path in ringBuffer if S_ISREG(stat[ST_MODE])]
# sort the ring buffer in descending order (new to old files)
ringBuffer = sorted(ringBuffer, reverse=True)
self.log.info("Start receiving new files")
self.log.info("Stopped receiving.")
self.log.error("Unknown error while receiving files. Need to abort.")
self.log.debug("Error was: " + str(e))
except:
trace = traceback.format_exc()
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(trace))
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("fileReceiver")
return logger
def addFileToRingBuffer(self, filename, fileModTime):
# prepend file to ring buffer and restore order
self.ringBuffer[:0] = [[fileModTime, filename]]
self.ringBuffer = sorted(self.ringBuffer, reverse=True)
# if the maximal size is exceeded: remove the oldest files
if len(self.ringBuffer) > self.maxRingBufferSize:
for mod_time, path in self.ringBuffer[self.maxRingBufferSize:]:
if float(time.time()) - mod_time > self.timeToWaitForRingBuffer:
os.remove(path)
self.ringBuffer.remove([mod_time, path])
def combineMessage(self, zmqSocket):
#save all chunks to file
multipartMessage = zmqSocket.recv_multipart()
#extract multipart message
try:
#TODO is string conversion needed here?
payloadMetadata = str(multipartMessage[0])
except:
self.log.error("an empty config was transferred for multipartMessage")
#TODO validate multipartMessage (like correct dict-values for metadata)
self.log.debug("multipartMessage.metadata = " + str(payloadMetadata))
#extraction metadata from multipart-message
payloadMetadataDict = json.loads(payloadMetadata)
#append to file
try:
self.log.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
self.log.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
self.log.debug("append to file based on multipart-message...failed.")
except:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
self.log.error(errorMessage)
self.log.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] :
#indicated end of file. closing file and leave loop
self.log.debug("last file-chunk received. stop appending.")
break
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# add to ring buffer
self.log.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime))
self.addFileToRingBuffer(str(filename), fileModTime)
#run loop, and wait for incoming messages
continueStreaming = True
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
while continueReceiving:
try:
loopCounter+=1
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
self.log.error("receive message...failed.")
self.log.error(sys.exc_info())
self.log.info("shutting down receiver...")
self.stopReceiving(self.zmqSocket, self.zmqContext)
self.log.debug("shutting down receiver...done.")
self.log.error(sys.exc_info())
self.log.error("shutting down receiver...failed.")
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def generateTargetFilepath(self,configDict):
"""
generates full path where target file will saved to.
"""
targetFilename = configDict["filename"]
targetRelativePath = configDict["relativeParent"]
if targetRelativePath is '' or targetRelativePath is None:
targetPath = self.getOutputDir()
else:
targetPath = os.path.join(self.getOutputDir(), targetRelativePath)
targetFilepath = os.path.join(targetPath, targetFilename)
return targetFilepath
def getOutputDir(self):
return self.outputDir
def generateTargetPath(self,configDict):
"""
generates path where target file will saved to.
"""
targetRelativePath = configDict["relativeParent"]
outputDir = self.getOutputDir()
targetPath = os.path.join(outputDir, targetRelativePath)
return targetPath
def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage):
try:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
except:
self.log.warning("an empty file was received within the multipart-message")
payload = None
#generate target filepath
targetFilepath = self.generateTargetFilepath(configDict)
self.log.debug("new file is going to be created at: " + targetFilepath)
#append payload to file
try:
newFile = open(targetFilepath, "a")
except IOError, e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
#TODO create subdirectory first, then try to open the file again
try:
targetPath = self.generateTargetPath(configDict)
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
self.log.info("New target directory created: " + str(targetPath))
except Exception, f:
errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
self.log.error(errorMessage)
self.log.debug("Error was: " + str(f))
self.log.debug("targetPath="+str(targetPath))
raise Exception(errorMessage)
except Exception, e:
self.log.error("failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
self.log.debug("ErrorTyp: " + str(type(e)))
self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
try:
if payload != None:
for chunk in payload:
newFile.write(chunk)
newFile.close()
except Exception, e:
errorMessage = "unable to append data to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(errorMessage)
def stopReceiving(self, zmqSocket, zmqContext):
self.log.debug("stopReceiving...")
self.log.debug("closing zmqSocket...done.")
self.log.error("closing zmqSocket...failed.")
self.log.error(sys.exc_info())
self.log.debug("sending exit signal to thread...")
self.log.debug("sending exit signal to thread...done")
self.log.debug("closing zmqContext...done.")
self.log.error("closing zmqContext...failed.")
self.log.error(sys.exc_info())
class LiveViewer():
zmqContext = None
liveViewerIp = None
liveViewerPort = None
exchangeIp = "127.0.0.1"
exchangePort = "6072"
log = None
liveViewerSocket = None
exchangeSocket = None
def __init__(self, liveViewerIp = "127.0.0.1", liveViewerPort = "6071", context = None):
self.liveViewerIp = liveViewerIp
self.liveViewerPort = liveViewerPort
if context:
assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
# create socket for live viewer
self.liveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrLiveViewerSocket = "tcp://" + self.liveViewerIp + ":%s" % self.liveViewerPort
self.liveViewerSocket.bind(connectionStrLiveViewerSocket)
self.log.debug("zmqLiveViewerSocket started for '" + connectionStrLiveViewerSocket + "'")
# create socket for message exchange
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.exchangeIp + ":%s" % self.exchangePort
self.exchangeSocket.connect(connectionStrExchangeSocket)
self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
self.poller = zmq.Poller()
self.poller.register(self.liveViewerSocket, zmq.POLLIN)
self.poller.register(self.exchangeSocket, zmq.POLLIN)
self.sendFileToLiveViewer()
def getLogger(self):
logger = logging.getLogger("liveViewer")
return logger
def sendFileToLiveViewer(self):
should_continue = True
while should_continue:
socks = dict(self.poller.poll())
if self.exchangeSocket in socks and socks[self.exchangeSocket] == zmq.POLLIN:
message = self.exchangeSocket.recv()
self.log.debug("Recieved control command: %s" % message )
self.log.debug("Recieved exit command, liveViewer thread will stop recieving messages")
should_continue = False
break
if self.liveViewerSocket in socks and socks[self.liveViewerSocket] == zmq.POLLIN:
message = self.liveViewerSocket.recv()
self.log.debug("Call for next file... ")
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
answer = self.ringBuffer[0][1]
else:
answer = "None"
print answer
try:
self.liveViewerSocket.send(answer)
except zmq.error.ContextTerminated:
break
self.log.debug("LiveViewerThread: closing socket")
self.liveViewerSocket.close()
self.exchangeSocket.close()
def argumentParsing():
parser = argparse.ArgumentParser()
parser.add_argument("--outputDir" , type=str, help="where incoming data will be stored to", default="/tmp/watchdog/data_mirror/")
parser.add_argument("--tcpPortDataStream" , type=int, help="tcp port of data pipe", default=6061)
parser.add_argument("--tcpPortLiveViewer" , type=int, help="tcp port of live viewer", default=6071)
parser.add_argument("--logfile" , type=str, help="file used for logging", default="/tmp/watchdog/fileReceiver.log")
parser.add_argument("--bindingIpForDataStream", type=str, help="local ip to bind dataStream to", default="127.0.0.1")
parser.add_argument("--bindingIpForLiveViewer", type=str, help="local ip to bind LiveViewer to", default="127.0.0.1")
parser.add_argument("--verbose" , help="more verbose output", action="store_true")
arguments = parser.parse_args()
# TODO: check folder-directory for existance
return arguments
def initLogging(filenameFullPath, verbose):
#@see https://docs.python.org/2/howto/logging-cookbook.html
#more detailed logging if verbose-option has been set
loggingLevel = logging.INFO
if verbose:
loggingLevel = logging.DEBUG
#log everything to file
logging.basicConfig(level=loggingLevel,
format='[%(asctime)s] [PID %(process)d] [%(filename)s] [%(module)s:%(funcName)s:%(lineno)d] [%(name)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d_%H:%M:%S',
filename=filenameFullPath,
filemode="a")
#log info to stdout, display messages with different format than the file output
console = logging.StreamHandler()
console.setLevel(logging.WARNING)
formatter = logging.Formatter("%(asctime)s > %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
if __name__ == "__main__":
#argument parsing
arguments = argumentParsing()
outputDir = arguments.outputDir
verbose = arguments.verbose
zmqDataStreamIp = str(arguments.bindingIpForDataStream)
zmqDataStreamPort = str(arguments.tcpPortDataStream)
zmqLiveViewerIp = str(arguments.bindingIpForLiveViewer)
zmqLiveViewerPort = str(arguments.tcpPortLiveViewer)
logFile = arguments.logfile
logfileFilePath = arguments.logfile
#enable logging
initLogging(logfileFilePath, verbose)
#start file receiver
myWorker = FileReceiver(outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp)