Skip to content
Snippets Groups Projects
DataDispatcher.py 14.1 KiB
Newer Older
from __builtin__ import open, type

__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import json
import logging
import os
import shutil
import sys
import traceback

import zmq

# Put the sibling "shared" directory (one level above the directory holding
# this file's real location) on the import path, so that helperScript can be
# imported below. The helper name is deleted afterwards to keep the module
# namespace clean.
SHARED_PATH = os.sep.join(
    [os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "shared"]
)

if SHARED_PATH not in sys.path:
    sys.path.append(SHARED_PATH)
del SHARED_PATH

import helperScript

#
#  --------------------------  class: DataDispatcher  --------------------------------------
#
class DataDispatcher(object):
    """Worker that pulls file jobs from a router and dispatches the files.

    Constructing an instance does the following:

    1. Connects a PULL socket to the router on 127.0.0.1.
    2. Blocks in process() until the router sends b"EXIT", a
       KeyboardInterrupt arrives, or an unexpected error occurs.
    3. Calls stop() to release its sockets.
    """

    def __init__(self, id, dataStreamPort, chunkSize, cleanerPort,
                 useDataStream, context = None, localTarget = None):
        """Set up the dispatcher and run its job loop until it ends.

        Args:
            id: identifier of this dispatcher, used in log messages.
            dataStreamPort: port on 127.0.0.1 that the job (router) socket
                connects to.
            chunkSize: maximum number of bytes read from a file per message
                part; also reported to receivers as metadata["chunkSize"].
            cleanerPort: stored but currently unused.
            useDataStream: stored but currently unused.
            context: optional external zmq context. If omitted, a new one is
                created and destroyed again in stop().
            localTarget: optional directory. Files of jobs without targets
                are moved to localTarget/relativePath/filename. When None,
                such files are left in place.
        """
        # id and logger must exist before the first log line uses them.
        self.id          = id
        self.log         = self.getLogger()
        self.log.debug("DataDispatcher Nr. " + str(self.id) + " started.")

        self.localhost   = "127.0.0.1"
        self.extIp       = "0.0.0.0"
        # NOTE(review): the original read an undefined name `routerPort`; the
        # second positional argument is assumed to be that port - confirm
        # against the caller.
        self.routerPort  = dataStreamPort
        self.chunkSize   = chunkSize
        # Not used anywhere in this class yet; kept so callers' values are not lost.
        self.cleanerPort   = cleanerPort
        self.useDataStream = useDataStream
        self.localTarget   = localTarget

        self.routerSocket = None

        # target address (connected as "tcp://<target>") -> PUSH socket
        self.openConnections = dict()

        if context is not None:
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False

        self.createSockets()

        try:
            self.process()
        except KeyboardInterrupt:
            self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher Nr. " + str(self.id) + ".")
        except Exception:
            trace = traceback.format_exc()
            self.log.error("Stopping DataDispatcher Nr " + str(self.id) + " due to unknown error condition.")
            self.log.debug("Error was: " + str(trace))
        finally:
            # The job loop is over either way: release sockets (and the context if owned).
            self.stop()


    def createSockets(self):
        """Create the PULL socket and connect it to the router on localhost."""
        self.routerSocket = self.context.socket(zmq.PULL)
        connectionStr  = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
        self.routerSocket.connect(connectionStr)
        self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")


    def getLogger(self):
        """Return the shared "DataDispatcher" logger."""
        logger = logging.getLogger("DataDispatcher")
        return logger


    def process(self):
        """Receive jobs from the router and dispatch the referenced files.

        Each job is a multipart message:

        - Frame 0 is a JSON file event containing at least "filename",
          "sourcePath" and "relativePath".
        - Any further frames are target addresses to stream the file to.
        - A single-frame message equal to b"EXIT" ends the loop.

        With targets, the file is streamed to every target and then removed
        from its source location. If streaming fails, the file is kept.
        Without targets, the file is moved below self.localTarget, if one
        is configured.

        Distributing jobs over several worker processes lets each one wait
        on its own file without blocking new file events.
        """
        while True:
            # Get workload from router, until finished
            self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
            message = self.routerSocket.recv_multipart()
            self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")

            # Compare the first frame, not the whole frame list.
            workload = message[0]
            targets  = message[1:]

            if not targets and workload == b"EXIT":
                self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
                break

            # get metadata of the file
            try:
                self.log.debug("Getting file metadata")
                sourceFile, metadata = self.getMetadata(workload)
            except Exception as e:
                # sourceFile is not known yet here, so report the raw workload.
                self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".")
                self.log.debug("Error was: " + str(e))
                # skip all further instructions and continue with next iteration
                continue

            if targets:
                try:
                    self.sendData(targets, sourceFile, metadata)
                except Exception as e:
                    self.log.debug("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.")
                    self.log.debug("Error was: " + str(e))
                    # Do not delete a file whose data may not have reached its targets.
                    self.log.info("Keeping file '" + str(sourceFile) + "' because sending failed.")
                    continue
                self._removeFile(sourceFile)
            else:
                self._moveToLocalTarget(sourceFile, metadata)


    def _removeFile(self, sourceFile):
        """Delete sourceFile; failures are logged, not raised (best effort)."""
        try:
            os.remove(sourceFile)
            self.log.info("Removing file '" + str(sourceFile) + "' ...success.")
        except Exception:
            self.log.debug("Unable to remove file {FILE}.".format(FILE=str(sourceFile)))
            self.log.debug("trace=" + str(traceback.format_exc()))


    def _moveToLocalTarget(self, sourceFile, metadata):
        """Move sourceFile to localTarget/relativePath/filename if configured.

        Failures are logged, not raised (best effort).
        """
        if not self.localTarget:
            self.log.debug("No target given for file '" + str(sourceFile) + "'; leaving it in place.")
            return

        # Same path-building scheme as getMetadata uses for the source side.
        targetPath = os.path.normpath(self.localTarget + os.sep + metadata["relativePath"])
        targetFile = os.path.join(targetPath, metadata["filename"])
        try:
            if not os.path.isdir(targetPath):
                os.makedirs(targetPath)
            shutil.move(sourceFile, targetFile)
            self.log.info("Moving file '" + str(sourceFile) + "' ...success.")
        except Exception:
            self.log.debug("Unable to move file {FILE}.".format(FILE=str(sourceFile)))
            self.log.debug("trace=" + str(traceback.format_exc()))


    def getMetadata(self, workload):
        """Decode a file-event workload and add file statistics to it.

        Args:
            workload: JSON text (str or bytes) describing the file. It must
                contain "filename", "sourcePath" and "relativePath".

        Returns:
            A tuple (sourceFilePathFull, metadata):

            - sourceFilePathFull is sourcePath/relativePath/filename.
            - metadata is the decoded object, with "filesize",
              "fileModificationTime" and "chunkSize" added.

        Raises:
            The original exception (after logging) when decoding fails,
            a required key is missing, or the file cannot be stat'ed.
        """
        # convert fileEventMessage back to a dictionary
        try:
            if isinstance(workload, bytes):
                workload = workload.decode("utf-8")
            metadata = json.loads(workload)
            self.log.debug("str(metadata) = " + str(metadata) + "  type(metadata) = " + str(type(metadata)))
        except Exception as e:
            self.log.info("Unable to convert message into a dictionary.")
            self.log.debug("Error was: " + str(e))
            raise

        # extract fileEvent metadata
        try:
            # TODO validate metadata dict
            filename     = metadata["filename"]
            sourcePath   = metadata["sourcePath"]
            relativePath = metadata["relativePath"]
        except Exception as e:
            self.log.info("Invalid fileEvent message received.")
            self.log.debug("Error was: " + str(e))
            self.log.debug("metadata=" + str(metadata))
            raise

        # filename = "img.tiff", sourcePath = "C:\dir", relativePath = ""
        #   -->  sourceFilePathFull = 'C:\\dir\img.tiff'
        # NOTE(review): relativePath/filename come from the message unsanitised;
        # a ".." component would point outside sourcePath, and the resulting
        # file is later removed or moved.
        sourceFilePath     = os.path.normpath(sourcePath + os.sep + relativePath)
        sourceFilePathFull = os.path.join(sourceFilePath, filename)

        try:
            self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
            # One stat call provides both the size and the modification time.
            fileStat             = os.stat(sourceFilePathFull)
            filesize             = fileStat.st_size
            fileModificationTime = fileStat.st_mtime
            self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
            self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
        except Exception as e:
            self.log.info("Unable to create metadata dictionary.")
            self.log.debug("Error was: " + str(e))
            raise

        self.log.debug("create metadata for source file...")
        metadata[ "filesize"             ] = filesize
        metadata[ "fileModificationTime" ] = fileModificationTime
        metadata[ "chunkSize"            ] = self.chunkSize
        self.log.debug("metadata = " + str(metadata))

        return sourceFilePathFull, metadata


    def sendData(self, targets, sourceFilepath, metadata):
        """Stream a file to every target as a sequence of multipart messages.

        Each message is [json(metadata + {"chunkNumber": n}), chunk]:

        - n counts from 1.
        - chunk holds at most self.chunkSize bytes.

        One PUSH socket per target is connected on first use and cached in
        self.openConnections.

        Raises:
            The original exception (after logging) when the file cannot be
            opened or a send fails, so the caller can keep the file.
        """
        # reading source file into memory
        try:
            self.log.debug("Opening '" + str(sourceFilepath) + "'...")
            fileDescriptor = open(str(sourceFilepath), "rb")
        except Exception as e:
            self.log.error("Unable to read source file '" + str(sourceFilepath) + "'.")
            self.log.debug("Error was: " + str(e))
            raise

        # send message
        try:
            self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...")
            chunkNumber = 0
            while True:
                fileContent = fileDescriptor.read(self.chunkSize)
                # detect if end of file has been reached
                if not fileContent:
                    break
                chunkNumber += 1

                # assemble metadata for zmq-message
                chunkPayloadMetadata = metadata.copy()
                chunkPayloadMetadata["chunkNumber"] = chunkNumber
                chunkPayload = [json.dumps(chunkPayloadMetadata), fileContent]

                # TODO priority
                for target in targets:
                    self._getConnection(target).send_multipart(chunkPayload, zmq.NOBLOCK)
                    self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + str(target))

            self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...done.")
        except Exception:
            self.log.error("Unable to send multipart-message for file " + str(sourceFilepath))
            self.log.debug("Error was: " + str(traceback.format_exc()))
            self.log.debug("Passing multipart-message...failed.")
            raise
        finally:
            fileDescriptor.close()


    def _getConnection(self, target):
        """Return the cached PUSH socket for target, connecting a new one if needed."""
        connection = self.openConnections.get(target)
        if connection is None:
            connection    = self.context.socket(zmq.PUSH)
            connectionStr = "tcp://" + str(target)
            connection.connect(connectionStr)
            # Register only after a successful connect.
            self.openConnections[target] = connection
            self.log.info("Start connection socket (connect): '" + connectionStr + "'")
        return connection


    def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize):
        """Append the rest of fileDescriptor to payload in chunks of chunkSize.

        Read errors are logged. Chunks appended before the error stay in
        payload (best effort).
        """
        try:
            self.log.debug("reading file '" + str(sourceFilePathFull)+ "' to memory")
            while True:
                fileContent = fileDescriptor.read(chunkSize)
                # An empty read (str or bytes) means end of file.
                if not fileContent:
                    break
                payload.append(fileContent)
        except Exception as e:
            self.log.error("Unable to read file '" + str(sourceFilePathFull) + "' into payload.")
            self.log.debug("Error was: " + str(e))


    def stop(self):
        """Close all sockets and, if this object created it, the zmq context.

        Safe to call repeatedly and on partially initialised instances; it
        runs from __init__, __exit__ and __del__.
        """
        log = self.getLogger()
        log.debug("Closing sockets for DataDispatcher-" + str(getattr(self, "id", None)))

        connections = getattr(self, "openConnections", None) or {}
        for target in connections:
            if connections[target] is not None:
                connections[target].close(0)
                connections[target] = None

        routerSocket = getattr(self, "routerSocket", None)
        if routerSocket is not None:
            routerSocket.close(0)
            self.routerSocket = None

        # Only destroy a context this instance created itself.
        context = getattr(self, "context", None)
        if context is not None and not getattr(self, "extContext", True):
            log.debug("Destroying context")
            context.destroy()
            self.context = None


    def __enter__(self):
        return self


    def __exit__(self, exc_type=None, exc_value=None, exc_tb=None):
        # Returning None lets any exception from the with-body propagate.
        self.stop()


    def __del__(self):
        # Exceptions cannot propagate out of __del__ (e.g. at interpreter
        # shutdown), so cleanup here is strictly best effort.
        try:
            self.stop()
        except Exception:
            pass

if __name__ == "__main__":
    # This module defines no standalone entry point; running it directly does nothing.
    pass