Skip to content
Snippets Groups Projects
send_helpers.py 2.7 KiB
Newer Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import zmq

def __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadata, payload, context):

    for target, prio, sendType in targets:

        # send data to the data stream to store it in the storage system
        if prio == 0:
            # socket not known
            if target not in openConnections:
                # open socket
                socket        = context.socket(zmq.PUSH)
                connectionStr = "tcp://" + str(target)

                socket.connect(connectionStr)
                log.info("Start socket (connect): '" + str(connectionStr) + "'")

                # register socket
                openConnections[target] = socket

            # send data
            if sendType == "data":
                tracker = openConnections[target].send_multipart(payload, copy=False, track=True)
                log.info("Sending message part from file " + str(sourceFile) +
                         " to '" + target + "' with priority " + str(prio) )

            elif sendType == "metadata":
                tracker = openConnections[target].send_multipart(metadata, copy=False, track=True)
                log.info("Sending metadata of message part from file " + str(sourceFile) +
                         " to '" + target + "' with priority " + str(prio) )
                log.debug("metadata=" + str(metadata))


            if not tracker.done:
                log.info("Message part from file " + str(sourceFile) +
                         " has not been sent yet, waiting...")
                tracker.wait()
                log.info("Message part from file " + str(sourceFile) +
                         " has not been sent yet, waiting...done")

        else:
            # socket not known
            if target not in openConnections:
                # open socket
                socket        = context.socket(zmq.PUSH)
                connectionStr = "tcp://" + str(target)

                socket.connect(connectionStr)
                log.info("Start socket (connect): '" + str(connectionStr) + "'")

                # register socket
                openConnections[target] = socket

            # send data
            if sendType == "data":
                openConnections[target].send_multipart(payload, zmq.NOBLOCK)
                log.info("Sending message part from file " + str(sourceFile) +
                         " to " + target)

            elif sendType == "metadata":
                openConnections[target].send_multipart(metadata, zmq.NOBLOCK)
                log.info("Sending metadata of message part from file " + str(sourceFile) +
                         " to " + target)
                log.debug("metadata=" + str(metadata))