# getFromQueue.py
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import zmq
import os
import sys
import logging
import traceback
import cPickle
import shutil


def setup(log, prop):
    """Bind a zmq PULL socket and register it in prop.

    The socket is created from prop["context"], bound to the tcp endpoint
    built from prop["extIp"] and prop["port"], and stored as prop["socket"].
    """
    pullSocket = prop["context"].socket(zmq.PULL)

    # endpoint the socket listens on
    ip       = prop["extIp"]
    port     = prop["port"]
    endpoint = "tcp://{ip}:{port}".format(ip=ip, port=port)

    pullSocket.bind(endpoint)
    log.info("Start socket (bind): '" + str(endpoint) + "'")

    # make the socket available to the other functions of this module
    prop["socket"] = pullSocket


def getMetadata (log, metadata, chunkSize, localTarget = None):
    """Extract the source file of a file event and extend its metadata.

    Args:
        log: Logger-like object providing debug() and error().
        metadata: File event dictionary; must contain the key "filename".
            It is modified in place: the key "chunkSize" is added.
        chunkSize: Value stored as metadata["chunkSize"].
        localTarget: Optional directory. If set, the target file is the
            file name joined onto this directory.

    Returns:
        Tuple (sourceFile, targetFile, metadata). targetFile is None if no
        localTarget was given.

    Raises:
        Exception: Errors while reading or extending metadata (e.g. a
            KeyError for a missing "filename") are logged and re-raised.
    """
    #extract fileEvent metadata
    try:
        #TODO validate metadata dict
        sourceFile = metadata["filename"]
    except Exception:
        log.error("Invalid fileEvent message received.", exc_info=True)
        log.debug("metadata=" + str(metadata))
        #skip all further instructions and continue with next iteration
        raise

    # The file name is the only path information read from the event, so
    # it is what gets joined onto the local target directory.
    if localTarget:
        targetFile = os.path.join(localTarget, sourceFile)
    else:
        targetFile = None

    try:
        log.debug("create metadata for source file...")
        metadata[ "chunkSize" ] = chunkSize

        log.debug("metadata = " + str(metadata))
    except Exception:
        log.error("Unable to assemble multi-part message.", exc_info=True)
        raise

    return sourceFile, targetFile, metadata


def _getConnection (log, target, openConnections, context):
    """Return the PUSH socket for target, opening and caching it if needed.

    A target missing from openConnections gets a new socket created from
    context, connected to "tcp://<target>" and stored in openConnections.
    """
    if target not in openConnections:
        # socket not known: open it
        socket        = context.socket(zmq.PUSH)
        connectionStr = "tcp://" + str(target)

        socket.connect(connectionStr)
        log.info("Start socket (connect): '" + str(connectionStr) + "'")

        # register socket
        openConnections[target] = socket

    return openConnections[target]


def sendData (log, targets, sourceFile, metadata, openConnections, context, prop):
    """Receive one message from prop["socket"] and forward it to all targets.

    The forwarded multipart message consists of the pickled metadata
    (extended by "chunkNumber" = 0) followed by the received data.

    Args:
        log: Logger-like object providing debug(), info() and error().
        targets: Iterable of (target, prio) pairs; target is "host:port".
        sourceFile: Name of the file, only used in log messages.
        metadata: Dictionary sent along with the data; it is not modified.
        openConnections: Dictionary target -> socket; sockets opened here
            are added to it.
        context: zmq context used to create missing sockets.
        prop: Dictionary holding the receiving socket under "socket".

    Raises:
        Exception: Errors while receiving the data are logged and re-raised.
            Errors while sending are logged only.
    """
    # the data is taken from the queue socket
    try:
        log.debug("Getting '" + str(sourceFile) + "'...")
        data = prop["socket"].recv()
    except Exception:
        log.error("Unable to get source file '" + str(sourceFile) + "'.", exc_info=True)
        raise

    #send message
    try:
        log.debug("Passing multipart-message for file " + str(sourceFile) + "...")

        #assemble metadata for zmq-message
        metadataExtended = metadata.copy()
        metadataExtended["chunkNumber"] = 0

        # pickle the extended copy so that "chunkNumber" is part of the message
        payload = [cPickle.dumps(metadataExtended), data]

        # streaming data
        for target, prio in targets:
            connection = _getConnection(log, target, openConnections, context)

            if prio == 0:
                # tracked send: wait until zmq is done with the message
                tracker = connection.send_multipart(payload, copy=False, track=True)
                log.info("Sending message part from file " + str(sourceFile) + " to '" + target + "' with priority " + str(prio) )

                if not tracker.done:
                    log.info("Message part from file " + str(sourceFile) + " has not been sent yet, waiting...")
                    tracker.wait()
                    log.info("Message part from file " + str(sourceFile) + " has not been sent yet, waiting...done")

            else:
                # all other priorities: non-blocking send
                connection.send_multipart(payload, zmq.NOBLOCK)
                log.info("Sending message part from file " + str(sourceFile) + " to " + target)

        log.debug("Passing multipart-message for file " + str(sourceFile) + "...done.")

    # Exception instead of a bare except: KeyboardInterrupt and SystemExit
    # must not be swallowed here
    except Exception:
        log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)


def finishDataHandling (log, sourceFile, targetFile, removeFlag = False):
    """Do nothing; all arguments are ignored and None is returned."""
    return None


def clean(prop):
    """Close the socket registered under prop["socket"], if there is one.

    The socket is closed with linger 0 and prop["socket"] is reset to None.
    Nothing happens if the key is missing or holds no socket, so the
    function can be called even when no socket was ever registered.
    """
    sock = prop.get("socket")

    # Close zmq socket
    if sock:
        sock.close(0)
        prop["socket"] = None


if __name__ == '__main__':
    # Manual smoke test of this module: the content of a test file is pushed
    # to the port setup() binds to, fetched by sendData() and forwarded to two
    # local PULL sockets, one for each priority branch of sendData().

    try:
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))))
    except NameError:
        # __file__ is not defined: fall back to the path of the script
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) ))))
    # prints the same text under Python 2 and Python 3
    print("BASE_PATH " + str(BASE_PATH))
    SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"

    if SHARED_PATH not in sys.path:
        sys.path.append ( SHARED_PATH )
    del SHARED_PATH

    import helpers

    # NOTE(review): the log file is named after getFromFile although the
    # fetcher type below is "getFromQueue" - confirm whether this is intended
    logfile = BASE_PATH + os.sep + "logs" + os.sep + "getFromFile.log"

    # Get the log configuration for the listener
    h1, h2 = helpers.getLogHandlers(logfile, verbose=True, onScreenLogLevel="debug")

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    root.addHandler(h1)
    root.addHandler(h2)

    receivingPort    = "6005"
    receivingPort2   = "6006"
    extIp            = "0.0.0.0"
    dataFwPort       = "6050"

    context          = zmq.Context.instance()

    openConnections  = dict()
    dataFetcherProp  = {
            "type"       : "getFromQueue",
            "context"    : context,
            "extIp"      : extIp,
            "port"       : dataFwPort,
            # pre-set so that clean() also works if setup() was never reached
            "socket"     : None
            }

    # sockets opened by this test itself; closed in the finally clause
    testSockets      = []

    try:
        dataFwSocket     = context.socket(zmq.PUSH)
        testSockets.append(dataFwSocket)
        connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=dataFwPort )
        dataFwSocket.connect(connectionStr)
        logging.info("=== Start dataFwsocket (connect): '" + str(connectionStr) + "'")

        receivingSocket  = context.socket(zmq.PULL)
        testSockets.append(receivingSocket)
        connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort )
        receivingSocket.bind(connectionStr)
        logging.info("=== receivingSocket connected to " + connectionStr)

        receivingSocket2 = context.socket(zmq.PULL)
        testSockets.append(receivingSocket2)
        connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort2 )
        receivingSocket2.bind(connectionStr)
        logging.info("=== receivingSocket2 connected to " + connectionStr)

        prework_sourceFile = BASE_PATH + os.sep + "test_file.cbf"

        #read file to send it in data pipe
        with open(prework_sourceFile, "rb") as fileDescriptor:
            fileContent = fileDescriptor.read()
        logging.debug("=== File read")

        dataFwSocket.send(fileContent)
        logging.debug("=== File send")

        workload = {
                "sourcePath"  : BASE_PATH + os.sep +"data" + os.sep + "source",
                "relativePath": os.sep + "local" + os.sep + "raw",
                "filename"    : "100.cbf"
                }
        # one target per priority branch of sendData
        targets = [['localhost:' + receivingPort, 1], ['localhost:' + receivingPort2, 0]]

        chunkSize       = 10485760 # = 1024*1024*10 = 10 MiB
        localTarget     = BASE_PATH + os.sep + "data" + os.sep + "target"

        logging.debug("openConnections before function call: " + str(openConnections))

        setup(logging, dataFetcherProp)

        sourceFile, targetFile, metadata = getMetadata (logging, workload, chunkSize, localTarget = None)
        sendData(logging, targets, sourceFile, metadata, openConnections, context, dataFetcherProp)

        # NOTE(review): the fourth parameter of finishDataHandling is named
        # removeFlag, but the property dictionary is passed - confirm
        finishDataHandling(logging, sourceFile, targetFile, dataFetcherProp)

        logging.debug("openConnections after function call: " + str(openConnections))

        # NOTE(review): cPickle.loads must only be used on trusted data; the
        # receiving sockets are bound to extIp and accept any sender
        recv_message = receivingSocket.recv_multipart()
        logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
        recv_message = receivingSocket2.recv_multipart()
        logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
    except KeyboardInterrupt:
        pass
    finally:
        # close everything that was opened, whatever step failed
        for openSocket in testSockets + list(openConnections.values()):
            openSocket.close(0)
        clean(dataFetcherProp)
        context.destroy()