# DataDispatcher.py
from __builtin__ import open, type

__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import cPickle
import logging
import os
import shutil
import sys
import traceback

import zmq
from logutils.queue import QueueHandler
# Locate the project root (three directory levels above this file) and make
# the shared helper modules in <BASE_PATH>/src/shared importable.
try:
    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except NameError:
    # __file__ is not defined in some environments (e.g. interactive or some
    # frozen setups); fall back to the path of the started script instead.
    BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"

if SHARED_PATH not in sys.path:
    sys.path.append ( SHARED_PATH )
del SHARED_PATH

import helpers

#
#  --------------------------  class: DataDispatcher  --------------------------------------
#
class DataDispatcher():
    """Worker that receives file jobs from a router and streams or moves the files.

    Jobs arrive on a PULL socket connected to tcp://127.0.0.1:<routerPort>.
    A job whose target list is non-empty is streamed chunk by chunk to every
    target over PUSH sockets and then the source file is removed; otherwise
    the file is moved below ``localTarget``.
    """

    def __init__(self, id, routerPort, chunkSize, fixedStreamId, logQueue, localTarget = None, context = None):
        """Set up sockets and run the processing loop (blocks until it ends).

        :param id: worker number, used in log messages and the logger name.
        :param routerPort: port on 127.0.0.1 the PULL job socket connects to.
        :param chunkSize: number of bytes read from the file per message part.
        :param fixedStreamId: "host:port" that always receives the data with
            priority 0, or a falsy value to disable it.
        :param logQueue: queue handed to a QueueHandler for all log records.
        :param localTarget: directory files are moved to when a job has no
            targets (None disables the move target).
        :param context: optional external zmq context; it is not destroyed
            by stop(). If omitted, a private context is created.
        """
        self.id           = id
        self.log          = self.getLogger(logQueue)

        self.log.debug("DataDispatcher Nr. " + str(self.id) + " started.")

        self.localhost    = "127.0.0.1"
        self.extIp        = "0.0.0.0"
        self.routerPort   = routerPort
        self.chunkSize    = chunkSize

        self.routerSocket = None

        self.fixedStreamId = fixedStreamId
        self.localTarget   = localTarget

        # target ("host:port") -> PUSH socket of every data stream opened so far
        self.openConnections = dict()

        if context:
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False

        self.createSockets()

        try:
            self.process()
        except KeyboardInterrupt:
            self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher Nr. " + str(self.id) + ".")
        except Exception:
            self.log.error("Stopping DataDispatcher Nr " + str(self.id) + " due to unknown error condition.", exc_info=True)
        finally:
            # release sockets (and an owned context) as soon as processing ends
            self.stop()


    def createSockets(self):
        """Connect the PULL socket on which jobs are received from the router."""
        self.routerSocket = self.context.socket(zmq.PULL)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
        self.routerSocket.connect(connectionStr)
        self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")


    # Send all logs to the main process
    # The worker configuration is done at the start of the worker process run.
    # Note that on Windows you can't rely on fork semantics, so each process
    # will run the logging configuration code when it starts.
    def getLogger (self, queue):
        """Return a DEBUG-level logger that forwards every record to ``queue``."""
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("DataDispatcher-" + str(self.id))
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


    def _decodeJob(self, message):
        """Unpickle a job message into ``(workload, targets)``.

        ``targets`` is a list of [target, priority] pairs sorted by priority
        (fixedStreamId, if set, is added with priority 0), or None when the
        message has no target list and no fixedStreamId is configured.
        """
        # NOTE(security): cPickle.loads can execute arbitrary code for crafted
        # input; this is only acceptable while the router endpoint is trusted.
        workload = cPickle.loads(message[0])
        if len(message) >= 2:
            targets = cPickle.loads(message[1])
            if self.fixedStreamId:
                targets.insert(0, [self.fixedStreamId, 0])
            # sort the target list by the priority
            targets = sorted(targets, key=lambda target: target[1])
        elif self.fixedStreamId:
            targets = [[self.fixedStreamId, 0]]
        else:
            targets = None
        return workload, targets


    def process(self):
        """Receive jobs from the router and handle them until b"EXIT" arrives.

        Accepted multipart messages:
          * [pickled workload, pickled target list]: stream the file to the
            targets (plus fixedStreamId, if set), then remove it;
          * [b"EXIT"]: leave the loop;
          * [pickled workload]: stream to fixedStreamId if set, otherwise move
            the file below localTarget.
        Failures of a single job are logged and that job is skipped.
        """
        while True:
            # Get workload from router, until finished
            self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
            message = self.routerSocket.recv_multipart()
            self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
            self.log.debug("message = " + str(message))

            # the shutdown request must be checked before trying to unpickle
            if len(message) < 2 and message[0] == b"EXIT":
                self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
                break

            try:
                workload, targets = self._decodeJob(message)
            except Exception:
                self.log.error("DataDispatcher-" + str(self.id) + ": Unable to decode job message: " + str(message) + ".", exc_info=True)
                continue

            # get metadata of the file
            try:
                self.log.debug("Getting file metadata")
                sourceFile, targetFile, metadata = self.getMetadata(workload)
            except Exception:
                self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
                #skip all further instructions and continue with next iteration
                continue

            if targets:
                # send data
                try:
                    self.sendData(targets, sourceFile, metadata)
                except Exception:
                    self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.", exc_info=True)

                # remove file
                # NOTE(review): the file is removed even if sending failed;
                # confirm that this is intended.
                try:
                    os.remove(sourceFile)
                    self.log.info("Removing file '" + str(sourceFile) + "' ...success.")
                except Exception:
                    self.log.error("Unable to remove file " + str(sourceFile), exc_info=True)
            else:
                # move file
                try:
                    shutil.move(sourceFile, targetFile)
                    self.log.info("Moving file '" + str(sourceFile) + "' ...success.")
                except Exception:
                    self.log.error("Unable to move file " + str(sourceFile), exc_info=True)


    def getMetadata(self, metadata):
        """Resolve the file paths of a file event and enrich its metadata.

        :param metadata: file-event dict containing at least "filename",
            "sourcePath" and "relativePath". It is updated IN PLACE with
            "filesize", "fileModificationTime" and "chunkSize".
        :returns: ``(sourceFilePathFull, targetFilePathFull, metadata)``;
            targetFilePathFull is None when no localTarget is configured.
        :raises: the lookup error for an invalid event (e.g. KeyError), or
            the OSError from stat'ing a missing/unreadable source file.
            Both are logged before being re-raised.
        """
        #extract fileEvent metadata
        try:
            #TODO validate metadata dict
            filename     = metadata["filename"]
            sourcePath   = metadata["sourcePath"]
            relativePath = metadata["relativePath"]
        except Exception:
            self.log.error("Invalid fileEvent message received.", exc_info=True)
            self.log.debug("metadata=" + str(metadata))
            # the caller skips this job; continuing would hit undefined names
            raise

        # filename = "img.tiff"
        # filepath = "C:\dir"
        #
        # -->  sourceFile = 'C:\\dir\img.tiff'
        sourceFilePathFull = os.path.join(os.path.normpath(sourcePath + os.sep + relativePath), filename)

        if self.localTarget:
            targetFilePathFull = os.path.join(os.path.normpath(self.localTarget + os.sep + relativePath), filename)
        else:
            targetFilePathFull = None

        try:
            self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
            # a single stat call provides both size and modification time
            fileStat             = os.stat(sourceFilePathFull)
            filesize             = fileStat.st_size
            fileModificationTime = fileStat.st_mtime
            self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
            self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
        except Exception:
            self.log.error("Unable to create metadata dictionary.", exc_info=True)
            raise

        try:
            self.log.debug("create metadata for source file...")
            metadata[ "filesize"             ] = filesize
            metadata[ "fileModificationTime" ] = fileModificationTime
            metadata[ "chunkSize"            ] = self.chunkSize

            self.log.debug("metadata = " + str(metadata))
        except Exception:
            self.log.error("Unable to assemble multi-part message.", exc_info=True)
            raise
        return sourceFilePathFull, targetFilePathFull, metadata


    def _getSocket(self, target):
        """Return the PUSH socket connected to ``target``, creating it on first use."""
        sock = self.openConnections.get(target)
        if sock is None:
            sock          = self.context.socket(zmq.PUSH)
            connectionStr = "tcp://" + str(target)

            sock.connect(connectionStr)
            self.log.info("Start socket (connect): '" + str(connectionStr) + "'")

            # register socket
            self.openConnections[target] = sock
        return sock


    def _sendChunk(self, target, prio, chunkPayload, sourceFilepath):
        """Send one message part to one target according to its priority.

        Priority 0: tracked zero-copy send; block until zmq reports it sent.
        Other priorities: non-blocking best-effort send. A failure is logged
        and does not prevent delivery to the remaining targets.
        """
        sock = self._getSocket(target)
        if prio == 0:
            tracker = sock.send_multipart(chunkPayload, copy=False, track=True)
            self.log.info("Sending message part from file " + str(sourceFilepath) + " to '" + str(target) + "' with priority " + str(prio) )

            if not tracker.done:
                self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...")
                tracker.wait()
                self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...done")
        else:
            try:
                sock.send_multipart(chunkPayload, zmq.NOBLOCK)
                self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + str(target))
            except zmq.ZMQError:
                self.log.error("Unable to send message part from file " + str(sourceFilepath) + " to " + str(target) + " with priority " + str(prio) + "; skipping this target.", exc_info=True)


    def sendData(self, targets, sourceFilepath, metadata):
        """Stream a file to every target, one chunk of ``chunkSize`` bytes per message.

        Each message is [pickled copy of metadata with "chunkNumber" (1-based),
        raw chunk bytes].

        :raises: the error from opening the file (logged first). Errors while
            sending are logged, not raised. The file is always closed.
        """
        #reading source file into memory
        try:
            self.log.debug("Opening '" + str(sourceFilepath) + "'...")
            fileDescriptor = open(str(sourceFilepath), "rb")
        except Exception:
            self.log.error("Unable to read source file '" + str(sourceFilepath) + "'.", exc_info=True)
            raise

        #send message
        try:
            self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...")
            chunkNumber = 0
            while True:
                #read next chunk from file
                fileContent = fileDescriptor.read(self.chunkSize)

                #detect if end of file has been reached
                if not fileContent:
                    break
                chunkNumber += 1

                #assemble metadata for zmq-message
                chunkPayloadMetadata = metadata.copy()
                chunkPayloadMetadata["chunkNumber"] = chunkNumber
                chunkPayload = [cPickle.dumps(chunkPayloadMetadata), fileContent]

                # streaming data (targets are sorted, priority 0 first)
                for target, prio in targets:
                    self._sendChunk(target, prio, chunkPayload, sourceFilepath)

            self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...done.")
        except Exception:
            self.log.error("Unable to send multipart-message for file " + str(sourceFilepath), exc_info=True)
        finally:
            fileDescriptor.close()


    def stop(self):
        """Close all sockets and destroy the zmq context if this object owns it.

        Safe to call repeatedly: it runs when processing ends and again from
        __exit__/__del__. The getattr() defaults keep it working on a partially
        constructed instance, e.g. when __init__ raised early.
        """
        log = getattr(self, "log", None)
        if log is not None:
            log.debug("Closing sockets for DataDispatcher-" + str(self.id))

        openConnections = getattr(self, "openConnections", {})
        for connection in list(openConnections):
            if openConnections[connection] is not None:
                openConnections[connection].close(0)
                openConnections[connection] = None

        routerSocket = getattr(self, "routerSocket", None)
        if routerSocket is not None:
            routerSocket.close(0)
            self.routerSocket = None

        context = getattr(self, "context", None)
        if context is not None and not getattr(self, "extContext", True):
            if log is not None:
                log.debug("Destroying context")
            context.destroy()
            # forget it so a second stop() does not destroy it again
            self.context = None


    def __enter__(self):
        return self


    def __exit__(self, exc_type=None, exc_value=None, exc_tb=None):
        self.stop()


    def __del__(self):
        self.stop()

if __name__ == '__main__':
    # Manual smoke test: run one DataDispatcher in a subprocess, hand it one
    # job with two targets and log what both receiving sockets get.
    from multiprocessing import Process, freeze_support, Queue
    import time
    from shutil import copyfile

    freeze_support()    #see https://docs.python.org/2/library/multiprocessing.html#windows

    logfile = BASE_PATH + os.sep + "logs" + os.sep + "dataDispatcher.log"

    logQueue = Queue(-1)

    # Get the log configuration for the listener
    h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")

    # Start queue listener using the stream handler above
    logQueueListener    = helpers.CustomQueueListener(logQueue, h1, h2)
    logQueueListener.start()

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    qh = QueueHandler(logQueue)
    root.addHandler(qh)

    # Put a copy of the test file where the job below points to
    sourceFile = BASE_PATH + os.sep + "test_file.cbf"
    targetFile = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep + "100.cbf"
    copyfile(sourceFile, targetFile)
    time.sleep(0.5)

    routerPort     = "7000"
    receivingPort  = "6005"
    receivingPort2 = "6006"
    chunkSize      = 10485760 # = 1024*1024*10 = 10 MiB

    localTarget   = BASE_PATH + os.sep + "data" + os.sep + "target"
    fixedStreamId = "localhost:" + receivingPort2

    dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, fixedStreamId, logQueue, localTarget) )
    dataDispatcherPr.start()

    context       = zmq.Context.instance()

    routerSocket  = context.socket(zmq.PUSH)
    connectionStr = "tcp://127.0.0.1:" + routerPort
    routerSocket.bind(connectionStr)
    logging.info("=== routerSocket connected to " + connectionStr)

    receivingSocket = context.socket(zmq.PULL)
    connectionStr   = "tcp://0.0.0.0:" + receivingPort
    receivingSocket.bind(connectionStr)
    logging.info("=== receivingSocket connected to " + connectionStr)

    receivingSocket2 = context.socket(zmq.PULL)
    connectionStr   = "tcp://0.0.0.0:" + receivingPort2
    receivingSocket2.bind(connectionStr)
    logging.info("=== receivingSocket2 connected to " + connectionStr)

    metadata = {
            "sourcePath"  : BASE_PATH + os.sep + "data" + os.sep + "source",
            "relativePath": os.sep + "local" + os.sep + "raw",
            "filename"    : "100.cbf"
            }
    targets = [['localhost:' + receivingPort, 1], ['localhost:' + receivingPort2, 0]]

    message = [ cPickle.dumps(metadata), cPickle.dumps(targets) ]

    time.sleep(1)

    routerSocket.send_multipart(message)
    logging.info("=== send message")

    try:
        recv_message = receivingSocket.recv_multipart()
        logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
        recv_message = receivingSocket2.recv_multipart()
        logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
    except KeyboardInterrupt:
        pass
    finally:
        # Ask the worker to shut down cleanly; NOBLOCK so a dead worker
        # cannot make this send hang. Force-terminate only as a fallback.
        try:
            routerSocket.send_multipart([b"EXIT"], zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        dataDispatcherPr.join(1)
        if dataDispatcherPr.is_alive():
            dataDispatcherPr.terminate()

        routerSocket.close(0)
        receivingSocket.close(0)
        receivingSocket2.close(0)
        context.destroy()

        logQueue.put_nowait(None)
        logQueueListener.stop()