# getFromHttp.py
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Jan Garrevoet <jan,garrevoet@desy.de>'

import zmq
import os
import sys
import logging
import cPickle
import urllib
import requests
import re
import urllib2
import time
import errno
from send_helpers import __sendToTargets


def setup (log, prop):
    """Attach a new requests session to the data fetcher properties.

    Args:
        log: Logger object; not used by this function.
        prop: Dict of data fetcher properties. It is modified in place:
            the key "session" is set to a newly created requests session.
    """
    #TODO check if prop has correct format

    httpSession = requests.session()
    prop["session"] = httpSession


def getMetadata (log, metadata, chunkSize, localTarget = None):
    """Build the source URL and the local target path for one file event.

    Args:
        log: Logger-like object; its error() and debug() methods are called.
        metadata: Dict describing the file event. The keys "filename",
            "sourcePath" and "relativePath" are read. The dict is modified
            in place: the key "chunkSize" is added.
        chunkSize: Value stored under metadata["chunkSize"].
        localTarget: Optional base directory for the local copy. If it is
            falsy no target file path is computed.

    Returns:
        A tuple (sourceFile, targetFile, metadata) where sourceFile is
        sourcePath, relativePath and filename joined with "/", targetFile is
        the normalised local path (or None if localTarget is falsy) and
        metadata is the same dict object that was passed in.

    Raises:
        Any exception raised while reading the required keys (e.g. KeyError
        for a missing key) or while updating the metadata; it is logged and
        re-raised.
    """
    # The source is a URL, so it always has to be joined with "/".
    # os.sep / os.path.join would insert backslashes on Windows.
    import posixpath

    #extract fileEvent metadata
    try:
        #TODO validate metadata dict
        filename     = metadata["filename"]
        sourcePath   = metadata["sourcePath"]
        relativePath = metadata["relativePath"]
    except Exception:
        log.error("Invalid fileEvent message received.", exc_info=True)
        log.debug("metadata=" + str(metadata))
        #skip all further instructions and continue with next iteration
        raise

    # no normpath used because that would transform http://... into http:/...
    sourceFilePath = sourcePath + "/" + relativePath
    sourceFile     = posixpath.join(sourceFilePath, filename)

    #TODO combine better with sourceFile... (for efficiency)
    if localTarget:
        # the target is a path on the local file system: use the os rules
        targetFilePath = os.path.normpath(localTarget + os.sep + relativePath)
        targetFile     = os.path.join(targetFilePath, filename)
    else:
        targetFile = None

    try:
        log.debug("create metadata for source file...")
        metadata[ "chunkSize" ] = chunkSize

        log.debug("metadata = " + str(metadata))
    except Exception:
        log.error("Unable to assemble multi-part message.", exc_info=True)
        raise

    return sourceFile, targetFile, metadata


def _openTargetFile (log, targetFile):
    """Open targetFile for binary writing, creating its directory if needed.

    Args:
        log: Logger-like object used for debug/info/error messages.
        targetFile: Path of the file to open.

    Returns:
        The opened file object (mode "wb").

    Raises:
        IOError/OSError: if the file cannot be opened or the missing
            directory cannot be created. The error is logged first.
    """
    targetFile = str(targetFile)

    log.debug("Opening '" + targetFile + "'...")
    try:
        return open(targetFile, "wb")
    except IOError as e:
        # errno.ENOENT == "No such file or directory"
        if e.errno != errno.ENOENT:
            log.error("Unable to open target file '" + targetFile + "'.", exc_info=True)
            raise

    # Only reached for ENOENT: create the subdirectory first, then try to
    # open the file again. The directory is derived from targetFile itself.
    targetPath = os.path.dirname(targetFile)
    try:
        os.makedirs(targetPath)
        fileDescriptor = open(targetFile, "wb")
    except Exception:
        log.error("Unable to open target file '" + targetFile + "'.", exc_info=True)
        log.debug("targetPath:" + str(targetPath))
        raise

    log.info("New target directory created: " + str(targetPath))
    return fileDescriptor


def sendData (log, targets, sourceFile, targetFile,  metadata, openConnections, context, prop):
    """Fetch sourceFile via http and forward it chunk by chunk.

    Each chunk is packed into a two-part message [pickled metadata, data]
    and handed to __sendToTargets. The pickled metadata is a copy of
    metadata extended by "chunkNumber" (0 for the first chunk, increasing
    by one per chunk). If prop["storeFlag"] is set the chunks are also
    written to targetFile.

    Args:
        log: Logger-like object used for debug/info/error messages.
        targets: Passed through unchanged to __sendToTargets.
        sourceFile: URL which is requested with prop["session"].get().
        targetFile: Local file the data is stored in if prop["storeFlag"]
            is set.
        metadata: Dict; "chunkSize" is read and used as chunk size for
            iterating over the response content.
        openConnections: Passed through unchanged to __sendToTargets.
        context: Passed through unchanged to __sendToTargets.
        prop: Dict; the keys "session" and "storeFlag" are read.

    Returns:
        None. If the http status indicates an error, the error is logged
        and the function returns without sending anything.

    Raises:
        Whatever the lookup of metadata["chunkSize"] raises (e.g. KeyError).
        IOError/OSError: if the target file cannot be opened, written or
            closed.
        Errors while packing or sending a single chunk are logged and do
        not abort the transfer.
    """
    # NOTE(review): the request is made without stream=True, so requests
    # downloads the complete body before iter_content is used - confirm
    # this is acceptable for large files.
    response = prop["session"].get(sourceFile)
    try:
        response.raise_for_status()
        log.debug("Initiating http get for file '" + str(sourceFile) + "' succeded.")
    except Exception:
        log.error("Initiating http get for file '" + str(sourceFile) + "' failed.", exc_info=True)
        return

    try:
        chunkSize = metadata[ "chunkSize" ]
    except Exception:
        log.error("Unable to get chunkSize", exc_info=True)
        # without a chunk size the transfer cannot be done
        raise

    fileDescriptor = None
    if prop["storeFlag"]:
        fileDescriptor = _openTargetFile(log, targetFile)

    try:
        log.debug("Getting data for file '" + str(sourceFile) + "'...")
        for chunkNumber, data in enumerate(response.iter_content(chunk_size=chunkSize)):
            log.debug("Packing multipart-message for file " + str(sourceFile) + "...")

            payload = None
            try:
                #assemble metadata for zmq-message
                metadataExtended = metadata.copy()
                metadataExtended["chunkNumber"] = chunkNumber
                metadataExtended = cPickle.dumps(metadataExtended)

                payload = [metadataExtended, data]
            except Exception:
                log.error("Unable to pack multipart-message for file " + str(sourceFile), exc_info=True)

            # the local copy is written even if packing failed
            if fileDescriptor is not None:
                fileDescriptor.write(data)

            if payload is None:
                # nothing valid to send for this chunk
                continue

            #send message
            try:
                __sendToTargets(log, targets, sourceFile, targetFile, openConnections, metadataExtended, payload, context)
                log.debug("Passing multipart-message for file " + str(sourceFile) + "...done.")
            except Exception:
                log.error("Unable to send multipart-message for file " + str(sourceFile), exc_info=True)
    finally:
        # close the target file even if the transfer was aborted
        if fileDescriptor is not None:
            try:
                log.debug("Closing '" + str(targetFile) + "'...")
                fileDescriptor.close()
            except Exception:
                log.error("Unable to close target file '" + str(targetFile) + "'.", exc_info=True)
                raise


def finishDataHandling (log, sourceFile, targetFile, prop):
    """Hook called after a file was handled; currently performs no action.

    Args:
        log: Logger object; not used yet.
        sourceFile: Not used yet.
        targetFile: Not used yet.
        prop: Dict of data fetcher properties; the key "removeFlag" is read.
    """
    removeRequested = prop["removeFlag"]
    if not removeRequested:
        return

    #TODO delete file from detector after sending, e.g.
    #    response = requests.delete(sourceFile)
    #    try:
    #        response.raise_for_status()
    #        log.debug("Deleting file " + str(sourceFile) + " succeeded.")
    #    except:
    #        log.error("Deleting file " + str(sourceFile) + " failed.", exc_info=True)


def clean (prop):
    """Cleanup hook; currently a no-op.

    Args:
        prop: Accepted but not used.
    """
    return None


# Manual test: fetches one file via http, forwards it to two local zmq PULL
# sockets and logs what arrives there.
if __name__ == '__main__':
    from shutil import copyfile
    import subprocess

    try:
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))))
    except Exception:
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) ))))
    # a single parenthesised string prints the same text with the print
    # statement and with the print function
    print("BASE_PATH " + str(BASE_PATH))
    SHARED_PATH  = BASE_PATH + os.sep + "src" + os.sep + "shared"

    if not SHARED_PATH in sys.path:
        sys.path.append ( SHARED_PATH )
    del SHARED_PATH

    import helpers

    logfile = BASE_PATH + os.sep + "logs" + os.sep + "getFromHttp.log"
    logsize = 10485760

    # Get the log configuration for the listener
    h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose=True, onScreenLogLevel="debug")

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    root.addHandler(h1)
    root.addHandler(h2)

    receivingPort    = "6005"
    receivingPort2   = "6006"
    extIp            = "0.0.0.0"
    dataFwPort       = "50010"

    context          = zmq.Context.instance()

    receivingSocket  = context.socket(zmq.PULL)
    connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort )
    receivingSocket.bind(connectionStr)
    logging.info("=== receivingSocket connected to " + connectionStr)

    receivingSocket2 = context.socket(zmq.PULL)
    connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort2 )
    receivingSocket2.bind(connectionStr)
    logging.info("=== receivingSocket2 connected to " + connectionStr)


    prework_sourceFile = BASE_PATH + os.sep + "test_file.cbf"
    localTarget        = BASE_PATH + os.sep + "data" + os.sep + "target"

    #copy the test file to the web server it is fetched from
    logging.debug("=== copy file to lsdma-lab04")
    # argument list instead of a shell string: paths containing spaces or
    # shell meta characters are passed to scp unchanged
    subprocess.call(["scp", prework_sourceFile, "root@lsdma-lab04:/var/www/html/test_httpget/data"])

    workload = {
            "sourcePath"  : "http://192.168.138.37/data",
            "relativePath": "testp06",
            "filename"    : "35_data_000170.h5"
            }
    # alternative workload:
#    workload = {
#            "sourcePath"  : "http://131.169.55.170/test_httpget/data",
#            "relativePath": "",
#            "filename"    : "test_file.cbf"
#            }
    targets = [['localhost:' + receivingPort, 1, "data"], ['localhost:' + receivingPort2, 1, "data"]]

    chunkSize       = 10485760 # = 1024*1024*10 = 10 MiB
    openConnections = dict()

    dataFetcherProp = {
            "type"       : "getFromHttp",
            "session"    : None,
            "storeFlag"  : True,
            "removeFlag" : False
            }

    setup(logging, dataFetcherProp)

    sourceFile, targetFile, metadata = getMetadata (logging, workload, chunkSize, localTarget)
#    sourceFile = "http://131.169.55.170/test_httpget/data/test_file.cbf"

    sendData(logging, targets, sourceFile, targetFile, metadata, openConnections, context, dataFetcherProp)

    finishDataHandling(logging, sourceFile, targetFile, dataFetcherProp)

    logging.debug("openConnections after function call: " + str(openConnections))


    try:
        recv_message = receivingSocket.recv_multipart()
        logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
        recv_message = receivingSocket2.recv_multipart()
        logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])) + "\ndata-part: " + str(len(recv_message[1])))
    except KeyboardInterrupt:
        pass
    finally:
        receivingSocket.close(0)
        receivingSocket2.close(0)
        clean(dataFetcherProp)
        context.destroy()