# NOTE(review): "Newer" / "Older" below are leftover artifacts from a
# diff/merge view, not Python code (they would raise NameError at import
# time); commented out to keep the module importable.
# Newer
# Older
from __builtin__ import open, type
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
# standard library
import cPickle
import logging
import os
import shutil
import sys
import time
import traceback

# third party
import zmq
try:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
#
# -------------------------- class: DataDispatcher --------------------------------------
#
class DataDispatcher():
def __init__(self, id, routerPort, chunkSize, fixedStreamId, logQueue, localTarget = None, context = None):
self.log.debug("DataDispatcher Nr. " + str(self.id) + " started.")
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.routerPort = routerPort
self.chunkSize = chunkSize
self.fixedStreamId = fixedStreamId
self.localTarget = localTarget
# dict with informations of all open sockets to which a data stream is opened (host, port,...)
self.openConnections = dict()
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.createSockets()
try:
self.process()
except KeyboardInterrupt:
self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher Nr. " + str(self.id) + ".")
except:
self.log.error("Stopping DataDispatcher Nr " + str(self.id) + " due to unknown error condition.", exc_info=True)
def createSockets(self):
self.routerSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")
# Send all logs to the main process
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("DataDispatcher-" + str(self.id))
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
return logger
def process(self):
while True:
# Get workload from router, until finished
self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
message = self.routerSocket.recv_multipart()
self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
workload = cPickle.loads(message[0])
targets = cPickle.loads(message[1])
if self.fixedStreamId:
targets.insert(0,[self.fixedStreamId, 0])
# sort the target list by the priority
targets = sorted(targets, key=lambda target: target[1])
if finished:
self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
break
workload = cPickle.loads(message[0])
if self.fixedStreamId:
targets = [[self.fixedStreamId, 0]]
else:
targets = None
# get metadata of the file
try:
self.log.debug("Getting file metadata")
sourceFile, targetFile, metadata = self.getMetadata(workload)
except:
self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
#skip all further instructions and continue with next iteration
continue
# send data
if targets:
try:
self.sendData(targets, sourceFile, metadata)
except:
self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.", exc_info=True)
# remove file
try:
os.remove(sourceFile)
self.log.info("Removing file '" + str(sourceFile) + "' ...success.")
except:
self.log.error("Unable to remove file " + str(sourceFile), exc_info=True)
else:
# move file
try:
shutil.move(sourceFile, targetFile)
self.log.info("Moving file '" + str(sourceFile) + "' ...success.")
except:
self.log.error("Unable to move file " + str(sourceFile), exc_info=True)
# send file to cleaner pipe
# try:
# #sending to pipe
# self.log.debug("send file-event for file to cleaner-pipe...")
# self.log.debug("metadata = " + str(metadata))
# self.cleanerSocket.send(cPickle.dumps(metadata))
# self.log.debug("send file-event for file to cleaner-pipe...success.")
#
# #TODO: remember workload. append to list?
# # can be used to verify files which have been processed twice or more
# except:
# self.log.error("Unable to notify Cleaner-pipe to handle file: " + str(workload), exc_info=True)
def getMetadata(self, metadata):
#extract fileEvent metadata
try:
#TODO validate metadata dict
filename = metadata["filename"]
sourcePath = metadata["sourcePath"]
relativePath = metadata["relativePath"]
except:
self.log.error("Invalid fileEvent message received.", exc_info=True)
self.log.debug("metadata=" + str(metadata))
#skip all further instructions and continue with next iteration
# filename = "img.tiff"
# filepath = "C:\dir"
#
# --> sourceFilePathFull = 'C:\\dir\img.tiff'
sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath)
sourceFilePathFull = os.path.join(sourceFilePath, filename)
#TODO combine better with sourceFile... (for efficiency)
if self.localTarget:
targetFilePath = os.path.normpath(self.localTarget + os.sep + relativePath)
targetFilePathFull = os.path.join(targetFilePath, filename)
else:
targetFilePathFull = None
try:
#for quick testing set filesize of file as chunksize
self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
filesize = os.path.getsize(sourceFilePathFull)
fileModificationTime = os.stat(sourceFilePathFull).st_mtime
chunksize = filesize #can be used later on to split multipart message
self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
except:
self.log.error("Unable to create metadata dictionary.", exc_info=True)
raise
#build payload for message-pipe by putting source-file into a message
try:
self.log.debug("create metadata for source file...")
#metadata = {
# "filename" : filename,
# "sourcePath" : sourcePath,
# "relativePath" : relativePath,
# "filesize" : filesize,
# "fileModificationTime" : fileModificationTime,
# "chunkSize" : self.zmqMessageChunkSize
# }
metadata[ "filesize" ] = filesize
metadata[ "fileModificationTime" ] = fileModificationTime
self.log.debug("metadata = " + str(metadata))
except:
self.log.error("Unable to assemble multi-part message.", exc_info=True)
raise
return sourceFilePathFull, targetFilePathFull, metadata
def sendData(self, targets, sourceFilepath, metadata):
#reading source file into memory
try:
self.log.debug("Opening '" + str(sourceFilepath) + "'...")
fileDescriptor = open(str(sourceFilepath), "rb")
except:
self.log.error("Unable to read source file '" + str(sourceFilepath) + "'.", exc_info=True)
raise
#send message
try:
self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...")
chunkNumber = 0
stillChunksToRead = True
while stillChunksToRead:
chunkNumber += 1
#read next chunk from file
#detect if end of file has been reached
if not fileContent:
stillChunksToRead = False
#as chunk is empty decrease chunck-counter
chunkNumber -= 1
break
#assemble metadata for zmq-message
chunkPayloadMetadata["chunkNumber"] = chunkNumber
chunkPayloadMetadataJson = cPickle.dumps(chunkPayloadMetadata)
chunkPayload = []
chunkPayload.append(chunkPayloadMetadataJson)
chunkPayload.append(fileContent)
# streaming data
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# send data to the data stream to store it in the storage system
if prio == 0:
# socket already known
if target in self.openConnections:
tracker = self.openConnections[target].send_multipart(chunkPayload, copy=False, track=True)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to '" + target + "' with priority " + str(prio) )
else:
# open socket
socket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
self.openConnections[target] = socket
# send data
tracker = self.openConnections[target].send_multipart(chunkPayload, copy=False, track=True)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to '" + target + "' with priority " + str(prio) )
# socket not known
if not tracker.done:
self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...")
tracker.wait()
self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...done")
else:
# socket already known
if target in self.openConnections:
# send data
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
# socket not known
else:
# open socket
socket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
self.openConnections[target] = socket
# send data
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
#close file
fileDescriptor.close()
self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...done.")
except:
self.log.error("Unable to send multipart-message for file " + str(sourceFilepath), exc_info=True)
def stop(self):
self.log.debug("Closing sockets for DataDispatcher-" + str(self.id))
for connection in self.openConnections:
if self.openConnections[connection]:
self.openConnections[connection].close(0)
self.openConnections[connection] = None
self.routerSocket.close(0)
self.routerSocket = None
self.log.debug("Destroying context")
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
from multiprocessing import Process, freeze_support, Queue
freeze_support() #see https://docs.python.org/2/library/multiprocessing.html#windows
logfile = BASE_PATH + os.sep + "logs" + os.sep + "dataDispatcher.log"
logQueue = Queue(-1)
# Get the log Configuration for the lisener
h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")
# Start queue listener using the stream handler above
logQueueListener = helpers.CustomQueueListener(logQueue, h1, h2)
logQueueListener.start()
# Create log and set handler to queue handle
root = logging.getLogger()
root.setLevel(logging.DEBUG) # Log level = DEBUG
qh = QueueHandler(logQueue)
root.addHandler(qh)
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFile = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep + "100.cbf"
time.sleep(0.5)
routerPort = "7000"
receivingPort = "6005"
receivingPort2 = "6006"
chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB
localTarget = BASE_PATH + os.sep + "data" + os.sep + "target"
fixedStreamId = "localhost:6006"
logConfig = "test"
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, fixedStreamId, logQueue, localTarget) )
dataDispatcherPr.start()
context = zmq.Context.instance()
routerSocket = context.socket(zmq.PUSH)
connectionStr = "tcp://127.0.0.1:" + routerPort
routerSocket.bind(connectionStr)
logging.info("=== routerSocket connected to " + connectionStr)
receivingSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort
receivingSocket.bind(connectionStr)
logging.info("=== receivingSocket connected to " + connectionStr)
receivingSocket2 = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort2
receivingSocket2.bind(connectionStr)
logging.info("=== receivingSocket2 connected to " + connectionStr)
metadata = {
"sourcePath" : BASE_PATH + os.sep +"data" + os.sep + "source",
"relativePath": os.sep + "local" + os.sep + "raw",
"filename" : "100.cbf"
}
targets = [['localhost:6005', 1], ['localhost:6006', 0]]
message = [ cPickle.dumps(metadata), cPickle.dumps(targets) ]
time.sleep(1)
workload = routerSocket.send_multipart(message)
logging.info("=== send message")
try:
recv_message = receivingSocket.recv_multipart()
logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
recv_message = receivingSocket2.recv_multipart()
logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
finally:
dataDispatcherPr.terminate()
routerSocket.close(0)
receivingSocket.close(0)
receivingSocket2.close(0)
logQueue.put_nowait(None)
logQueueListener.stop()