Skip to content
Snippets Groups Projects
sender.py 32.3 KiB
Newer Older
from __builtin__ import open, type

__author__ = 'Manuela Kuhn <marnuel.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'


import time
import argparse
import zmq
import os
import logging
import sys
import traceback
from multiprocessing import Process, freeze_support
import subprocess
import json
import shutil
import helperScript
import socket       # needed to get hostname
from watcher import DirectoryWatcher
from Cleaner import Cleaner

BASE_PATH   = os.path.dirname ( os.path.dirname ( os.path.dirname (  os.path.realpath ( __file__ ) ) ) )
CONFIG_PATH = BASE_PATH + os.sep + "conf"

sys.path.append ( CONFIG_PATH )
from config import defaultConfigSender

#
#  --------------------------  class: WorkerProcess  --------------------------------------
#
class WorkerProcess():
    id                   = None
    dataStreamIp         = None
    dataStreamPort       = None
    zmqContextForWorker  = None
    externalContext      = None    # if the context was created outside this class or not
    zmqMessageChunkSize  = None
    zmqCleanerIp         = None         # responsable to delete/move files
    zmqCleanerPort       = None         # responsable to delete/move files
    zmqDataStreamSocket  = None
    routerSocket         = None
    cleanerSocket        = None

    useLiveViewer        = False        # boolian to inform if the receiver to show the files in the live viewer is running
    # to get the logging only handling this class

    def __init__(self, id, dataStreamIp, dataStreamPort, chunkSize, zmqCleanerIp, zmqCleanerPort,
                 context = None):
        self.id                   = id
        self.dataStreamIp         = dataStreamIp
        self.dataStreamPort       = dataStreamPort
        self.zmqMessageChunkSize  = chunkSize
        self.zmqCleanerIp         = zmqCleanerIp
        self.zmqCleanerPort       = zmqCleanerPort

        #initialize router
        if context:
            self.zmqContextForWorker = context
            self.externalContext     = True
        else:
            self.zmqContextForWorker = zmq.Context()
            self.externalContext     = False
        self.log.debug("new workerProcess started. id=" + str(self.id))

        self.zmqDataStreamSocket      = self.zmqContextForWorker.socket(zmq.PUSH)
        connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
Manuela Kuhn's avatar
Manuela Kuhn committed
        print "connectionStrDataStreamSocket", connectionStrDataStreamSocket
        self.zmqDataStreamSocket.bind(connectionStrDataStreamSocket)
        self.log.debug("zmqDataStreamSocket started (bind) for '" + connectionStrDataStreamSocket + "'")
        routerIp   = "127.0.0.1"
        routerPort = "50000"

        self.routerSocket             = self.zmqContextForWorker.socket(zmq.REQ)
        self.routerSocket.identity    = u"worker-{ID}".format(ID=self.id).encode("ascii")
        connectionStrRouterSocket     = "tcp://{ip}:{port}".format(ip=routerIp, port=routerPort)
        self.routerSocket.connect(connectionStrRouterSocket)
        self.log.debug("routerSocket started (connect) for '" + connectionStrRouterSocket + "'")

        #init Cleaner message-pipe
        self.cleanerSocket            = self.zmqContextForWorker.socket(zmq.PUSH)
        connectionStrCleanerSocket    = "tcp://{ip}:{port}".format(ip=self.zmqCleanerIp, port=self.zmqCleanerPort)
        self.cleanerSocket.connect(connectionStrCleanerSocket)
        self.log.debug("cleanerSocket started (connect) for '" + connectionStrCleanerSocket + "'")

        try:
            self.process()
        except KeyboardInterrupt:
            # trace = traceback.format_exc()
            self.log.debug("KeyboardInterrupt detected. Shutting down workerProcess " + str(self.id) + ".")
            trace = traceback.format_exc()
            self.log.error("Stopping workerProcess due to unknown error condition.")
            self.log.debug("Error was: " + str(trace))
        finally:
            self.stop()


    def process(self):
        """
          sends a 'ready' to a broker and receives a 'job' to process.
          The 'job' will be to pass the file of an fileEvent to the
          dataPipe.

          Why?
          -> the simulated "onClosed" event waits for a file for being
           not modified within a certain period of time.
           Instead of processing file after file the work will be
           spreaded to many workerProcesses. So each process can wait
           individual periods of time for a file without blocking
           new file events - as new file events will be handled by
           another workerProcess.
        """

        """
          takes the fileEventMessage, reading and passing the new file to
          a separate data-messagePipe. Afterwards the original file
          will be removed.
        """

        processingJobs = True
        jobCount = 0

        while processingJobs:
            #sending a "ready"-signal to the router.
            #the reply will contain the actual job/task.
            self.log.debug("worker-"+str(self.id)+": sending ready signal")

            self.routerSocket.send(b"READY")

            # Get workload from router, until finished
            self.log.debug("worker-"+str(self.id)+": waiting for new job")
            workload = self.routerSocket.recv()
            self.log.debug("worker-"+str(self.id)+": new job received")
            finished = workload == b"END"
            if finished:
                processingJobs = False
                self.log.debug("router requested to shutdown worker-process. Worker processed: %d files" % jobCount)
            startLV = workload == b"START_LIVE_VIEWER"
            if startLV:
                self.log.info("worker-"+str(self.id)+": Received live viewer start command...starting live viewer")
                self.useLiveViewer = True
                continue

            # the live viewer is turned of
            stopLV = workload == b"STOP_LIVE_VIEWER"
            if stopLV:
                self.log.info("worker-"+str(self.id)+": Received live viewer stop command...stopping live viewer")
                self.useLiveViewer = False
                continue

            if self.useLiveViewer:
                #convert fileEventMessage back to a dictionary
                fileEventMessageDict = None
                try:
                    fileEventMessageDict = json.loads(str(workload))
                    self.log.debug("str(messageDict) = " + str(fileEventMessageDict) + "  type(messageDict) = " + str(type(fileEventMessageDict)))
                except Exception, e:
                    errorMessage = "Unable to convert message into a dictionary."
                    self.log.error(errorMessage)
                    self.log.debug("Error was: " + str(e))
                #extract fileEvent metadata
                try:
                    #TODO validate fileEventMessageDict dict
                    filename     = fileEventMessageDict["filename"]
                    sourcePath   = fileEventMessageDict["sourcePath"]
                    relativePath = fileEventMessageDict["relativePath"]
                except Exception, e:
                    self.log.error("Invalid fileEvent message received.")
                    self.log.debug("Error was: " + str(e))
                    self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))
                    #skip all further instructions and continue with next iteration
                    continue
                #passing file to data-messagPipe
                try:
                    self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...")
                    self.passFileToDataStream(filename, sourcePath, relativePath)
                    self.log.debug("worker-" + str(self.id) + ": passing new file to data-messagePipe...success.")
                except Exception, e:
                    errorMessage = "Unable to pass new file to data-messagePipe."
                    self.log.error(errorMessage)
                    self.log.error("Error was: " + str(e))
                    self.log.debug("worker-"+str(id) + ": passing new file to data-messagePipe...failed.")
                    #skip all further instructions and continue with next iteration
                    continue
            else:
                print "worker-"+str(self.id)+": no data sent"


            #send remove-request to message pipe
            try:
                #sending to pipe
Manuela Kuhn's avatar
Manuela Kuhn committed
                self.log.debug("send file-event for file to cleaner-pipe...")
                self.log.debug("workload = " + str(workload))
                self.cleanerSocket.send(workload)
Manuela Kuhn's avatar
Manuela Kuhn committed
                self.log.debug("send file-event for file to cleaner-pipe...success.")

                #TODO: remember workload. append to list?
                # can be used to verify files which have been processed twice or more
            except Exception, e:
Manuela Kuhn's avatar
Manuela Kuhn committed
                errorMessage = "Unable to notify Cleaner-pipe to delete file: " + str(workload)
                self.log.error(errorMessage)
                self.log.debug("fileEventMessageDict=" + str(fileEventMessageDict))


    def getLogger(self):
        logger = logging.getLogger("workerProcess")
        return logger


    def passFileToDataStream(self, filename, sourcePath, relativePath):
        """filesizeRequested == filesize submitted by file-event. In theory it can differ to real file size"""

        # filename = "img.tiff"
        # filepath = "C:\dir"
        #
        # -->  sourceFilePathFull = 'C:\\dir\img.tiff'
        sourceFilePath     = os.path.normpath(sourcePath + os.sep + relativePath)
        sourceFilePathFull = os.path.join(sourceFilePath, filename)

        #reading source file into memory
        try:
            #for quick testing set filesize of file as chunksize
            self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
            filesize             = os.path.getsize(sourceFilePathFull)
            fileModificationTime = os.stat(sourceFilePathFull).st_mtime
            chunksize            = filesize    #can be used later on to split multipart message
            self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
            self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))

        except Exception, e:
            errorMessage = "Unable to get file metadata for '" + str(sourceFilePathFull) + "'."
            self.log.error(errorMessage)
            self.log.debug("Error was: " + str(e))
            raise Exception(e)

        try:
            self.log.debug("opening '" + str(sourceFilePathFull) + "'...")
            fileDescriptor = open(str(sourceFilePathFull), "rb")

        except Exception, e:
            errorMessage = "Unable to read source file '" + str(sourceFilePathFull) + "'."
            self.log.error(errorMessage)
            self.log.debug("Error was: " + str(e))
            raise Exception(e)


        #build payload for message-pipe by putting source-file into a message
        try:
            payloadMetadata = self.buildPayloadMetadata(filename, filesize, fileModificationTime, sourcePath, relativePath)
        except Exception, e:
            self.log.error("Unable to assemble multi-part message.")
            self.log.debug("Error was: " + str(e))
            raise Exception(e)


        #send message
        try:
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.log.info("Passing multipart-message for file " + str(sourceFilePathFull) + "...")
            print "sending file: ", sourceFilePathFull
            chunkNumber = 0
            stillChunksToRead = True
            while stillChunksToRead:
                chunkNumber += 1

                #read next chunk from file
                fileContentAsByteObject = fileDescriptor.read(self.getChunkSize())

                #detect if end of file has been reached
                if not fileContentAsByteObject:
                    stillChunksToRead = False

                    #as chunk is empty decrease chunck-counter
                    chunkNumber -= 1
                    break

                #assemble metadata for zmq-message
                chunkPayloadMetadata = payloadMetadata.copy()
                chunkPayloadMetadata["chunkNumber"] = chunkNumber
                chunkPayloadMetadataJson = json.dumps(chunkPayloadMetadata)
                chunkPayload = []
                chunkPayload.append(chunkPayloadMetadataJson)
                chunkPayload.append(fileContentAsByteObject)

                #send to zmq pipe
                self.zmqDataStreamSocket.send_multipart(chunkPayload, zmq.NOBLOCK)

            #close file
            fileDescriptor.close()
            print "sending file: ", sourceFilePathFull, "done"

            # self.zmqDataStreamSocket.send_multipart(multipartMessage)
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.log.info("Passing multipart-message for file " + str(sourceFilePathFull) + "...done.")
        except zmq.error.Again:
            self.log.error("unable to send multiplart-message for file " + str(sourceFilePathFull))
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.log.error("Receiver has disconnected")
        except Exception, e:
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.log.error("Unable to send multipart-message for file " + str(sourceFilePathFull))
            self.log.debug("Error was: " + str(e))
            self.log.info("Passing multipart-message...failed.")
#            raise Exception(e)


    def appendFileChunksToPayload(self, payload, sourceFilePathFull, fileDescriptor, chunkSize):
        try:
            # chunksize = 16777216 #16MB
            self.log.debug("reading file '" + str(sourceFilePathFull)+ "' to memory")

            # FIXME: chunk is read-out as str. why not as bin? will probably add to much overhead to zmq-message
            fileContentAsByteObject = fileDescriptor.read(chunkSize)

            while fileContentAsByteObject != "":
                payload.append(fileContentAsByteObject)
                fileContentAsByteObject = fileDescriptor.read(chunkSize)
        except Exception, e:
            raise Exception(str(e))



    def buildPayloadMetadata(self, filename, filesize, fileModificationTime, sourcePath, relativePath):
        """
        builds metadata for zmq-multipart-message. should be used as first element for payload.
        :param filename:
        :param filesize:
        :param fileModificationTime:
        :param sourcePath:
        :param relativePath:
        :return:
        """

        #add metadata to multipart
        self.log.debug("create metadata for source file...")
        metadataDict = {
                         "filename"             : filename,
                         "filesize"             : filesize,
                         "fileModificationTime" : fileModificationTime,
                         "sourcePath"           : sourcePath,
                         "relativePath"         : relativePath,
                         "chunkSize"            : self.getChunkSize()}

        self.log.debug("metadataDict = " + str(metadataDict))

        return metadataDict

    def getChunkSize(self):
        return self.zmqMessageChunkSize


    def showFilesystemStatistics(self, vfsPath):
        statvfs = os.statvfs(vfsPath)
        totalSize                 = statvfs.f_frsize * statvfs.f_blocks
        freeBytes                 = statvfs.f_frsize * statvfs.f_bfree
        freeSpaceAvailableForUser = statvfs.f_frsize * statvfs.f_bavail  #in bytes
        freeSpaceAvailableForUser_gigabytes = freeSpaceAvailableForUser / 1024 / 1024 / 1024
        freeUserSpaceLeft_percent = ( float(freeBytes) / float(totalSize) ) * 100

        # print "{number:.{digits}f}".format(number=freeUserSpaceLeft_percent, digits=0)
        # print int(freeUserSpaceLeft_percent)

        self.log.debug("vfsstat: freeSpaceAvailableForUser=" + str(freeSpaceAvailableForUser_gigabytes)+ " Gigabytes "
                       + " (" + str(int(freeUserSpaceLeft_percent)) + "% free disk space left)")

        #warn if disk space is running low
        highWaterMark = 85
        if int(freeUserSpaceLeft_percent) >= int(highWaterMark):
            self.log.warning("Running low in disk space! " + str(int(freeUserSpaceLeft_percent)) + "% free disk space left.")


    def stop(self):
        self.log.debug("Sending stop signal to cleaner from worker-" + str(self.id))
        self.cleanerSocket.send("STOP")
        self.log.info("Closing sockets for worker " + str(self.id))
        if self.zmqDataStreamSocket:
            self.zmqDataStreamSocket.close(0)
        self.routerSocket.close(0)
        self.cleanerSocket.close(0)
        if not self.externalContext:
            self.log.debug("Destroying context")
            self.zmqContextForWorker.destroy()

#
#  --------------------------  class: FileMover  --------------------------------------
#
class FileMover():
    zmqContext          = None
    fileEventIp         = None      # serverIp for incoming messages
    fileEventPort       = None
    dataStreamIp        = None      # ip of dataStream-socket to push new files to
    dataStreamPort      = None      # port number of dataStream-socket to push new files to
    zmqCleanerIp        = None      # zmq pull endpoint, responsable to delete/move files
    zmqCleanerPort      = None      # zmq pull endpoint, responsable to delete/move files
    receiverComIp       = None      # ip for socket to communicate with receiver
    receiverComPort     = None      # port for socket to communicate receiver
    receiverWhiteList   = None
    parallelDataStreams = None
    chunkSize           = None
    fileEventSocket     = None      # to receive fileMove-jobs as json-encoded dictionary
    receiverComSocket   = None      # to exchange messages with the receiver
    routerSocket        = None
    useLiveViewer       = False     # boolian to inform if the receiver to show the files in the live viewer is running
    # to get the logging only handling this class
    log                   = None


    def __init__(self, fileEventIp, fileEventPort, dataStreamIp, dataStreamPort,
                 receiverComPort, receiverWhiteList,
                 parallelDataStreams, chunkSize,
                 zmqCleanerIp, zmqCleanerPort,
                 context = None):

        assert isinstance(context, zmq.sugar.context.Context)

        self.zmqContext          = context or zmq.Context()
        self.fileEventIp         = fileEventIp
        self.fileEventPort       = fileEventPort
        self.dataStreamIp        = dataStreamIp
        self.dataStreamPort      = dataStreamPort
        self.zmqCleanerIp        = zmqCleanerIp
        self.zmqCleanerPort      = zmqCleanerPort
        self.receiverComIp       = dataStreamIp         # ip for socket to communicate with receiver; is the same ip as the data stream ip
        self.receiverComPort     = receiverComPort
        self.receiverWhiteList   = receiverWhiteList
        self.parallelDataStreams = parallelDataStreams
        self.chunkSize           = chunkSize


        self.log = self.getLogger()
        self.log.debug("Init")

        # create zmq socket for incoming file events
        self.fileEventSocket         = self.zmqContext.socket(zmq.PULL)
        connectionStrFileEventSocket = "tcp://{ip}:{port}".format(ip=self.fileEventIp, port=self.fileEventPort)
        self.fileEventSocket.bind(connectionStrFileEventSocket)
Manuela Kuhn's avatar
Manuela Kuhn committed
        self.log.debug("fileEventSocket started (bind) for '" + connectionStrFileEventSocket + "'")


        # create zmq socket for communitation with receiver
        self.receiverComSocket         = self.zmqContext.socket(zmq.REP)
        connectionStrReceiverComSocket = "tcp://{ip}:{port}".format(ip=self.receiverComIp, port=self.receiverComPort)
Manuela Kuhn's avatar
Manuela Kuhn committed
        print "connectionStrReceiverComSocket", connectionStrReceiverComSocket
        self.receiverComSocket.bind(connectionStrReceiverComSocket)
Manuela Kuhn's avatar
Manuela Kuhn committed
        self.log.debug("receiverComSocket started (bind) for '" + connectionStrReceiverComSocket + "'")
        # Poller to get either messages from the watcher or communication messages to stop sending data to the live viewer
        self.poller = zmq.Poller()
        self.poller.register(self.fileEventSocket, zmq.POLLIN)
        self.poller.register(self.receiverComSocket, zmq.POLLIN)
        # setting up router for load-balancing worker-processes.
        # each worker-process will handle a file event
        routerIp   = "127.0.0.1"
        routerPort = "50000"

        self.routerSocket         = self.zmqContext.socket(zmq.ROUTER)
        connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip=routerIp, port=routerPort)
        self.routerSocket.bind(connectionStrRouterSocket)
Manuela Kuhn's avatar
Manuela Kuhn committed
        self.log.debug("routerSocket started (bind) for '" + connectionStrRouterSocket + "'")


    def process(self):
        try:
            self.startReceiving()
        except zmq.error.ZMQError as e:
            self.log.error("ZMQError: "+ str(e))
            self.log.debug("Shutting down workerProcess.")
        except:
            trace = traceback.format_exc()
            self.log.info("Stopping fileMover due to unknown error condition.")
            self.log.debug("Error was: " + str(trace))



    def getLogger(self):
        logger = logging.getLogger("fileMover")
        return logger


    def startReceiving(self):
        self.log.debug("new message-socket crated for: new file events.")

        incomingMessageCounter = 0

        #start worker-processes. each will have its own PushSocket.
        workerProcessList      = list()
        numberOfWorkerProcesses = int(self.parallelDataStreams)
        for processNumber in range(numberOfWorkerProcesses):
            self.log.debug("instantiate new workerProcess (nr " + str(processNumber) + " )")
            newWorkerProcess = Process(target=WorkerProcess, args=(processNumber,
                                                                  self.dataStreamIp,
                                                                  self.dataStreamPort,
                                                                  self.chunkSize,
                                                                  self.zmqCleanerIp,
                                                                  self.zmqCleanerPort))
            workerProcessList.append(newWorkerProcess)
            self.log.debug("start worker process nr " + str(processNumber))
            newWorkerProcess.start()

        #run loop, and wait for incoming messages
        continueReceiving = True
        self.log.debug("waiting for new fileEvent-messages")
        try:
            while continueReceiving:
                socks = dict(self.poller.poll())

                if self.fileEventSocket in socks and socks[self.fileEventSocket] == zmq.POLLIN:
                    try:
                        incomingMessage = self.fileEventSocket.recv()
                        self.log.debug("new fileEvent-message received.")
                        self.log.debug("message content: " + str(incomingMessage))
                        incomingMessageCounter += 1

                        self.log.debug("processFileEvent..." + str(incomingMessageCounter))
                        self.processFileEvent(incomingMessage)  #TODO refactor as separate process to emphasize unblocking
                        self.log.debug("processFileEvent...done")
                    except Exception, e:
                        self.log.error("Failed to receive new fileEvent-message.")
                        self.log.error(sys.exc_info())

                        #TODO might using a error-count and threshold when to stop receiving, e.g. after 100 misses?
                        # continueReceiving = False

                if self.receiverComSocket in socks and socks[self.receiverComSocket] == zmq.POLLIN:
                    # signals are of the form [signal, hostname]
                    incomingMessage = self.receiverComSocket.recv()
                    self.log.debug("Recieved control command: %s" % incomingMessage )

                    signal         = None
                    signalHostname = None

                    try:
                        incomingMessage = incomingMessage.split(',')
                        signal          = incomingMessage[0]
                        signalHostname  = incomingMessage[1]
                    except Exception as e:
                        self.log.info("Received live viewer signal from host " + str(signalHostname) + " is of the wrong format")
                        self.receiverComSocket.send("NO_VALID_SIGNAL", zmq.NOBLOCK)
                        continue

                    self.log.debug("Check if signal sending host is in WhiteList...")
                    if signalHostname in self.receiverWhiteList:
                        self.log.info("Check if signal sending host is in WhiteList...Host " + str(signalHostname) + " is allowed to connect.")
                    else:
                        self.log.info("Check if signal sending host is in WhiteList...Host " + str(signalHostname) + " is not allowed to connect.")
                        self.log.debug("Signal from host " + str(signalHostname) + " is discarded.")
                        print "Signal from host " + str(signalHostname) + " is discarded."
                        self.receiverComSocket.send("NO_VALID_HOST", zmq.NOBLOCK)
                    if signal == "STOP_LIVE_VIEWER":
                        self.log.info("Received live viewer stop signal from host " + str(signalHostname) + "...stopping live viewer")
                        print "Received live viewer stop signal from host " + signalHostname + "...stopping live viewer"
                        self.useLiveViewer = False
                        self.sendLiveViewerSignal(signal)
                    elif signal == "START_LIVE_VIEWER":
                        self.log.info("Received live viewer start signal from host " + str(signalHostname) + "...starting live viewer")
                        print "Received live viewer start signal from host " + str(signalHostname) + "...starting live viewer"
                        self.useLiveViewer = True
                        self.sendLiveViewerSignal(signal)
                    else:
                        self.log.info("Received live viewer signal from host " + str(signalHostname) + " unkown: " + str(signal))
                        self.receiverComSocket.send("NO_VALID_SIGNAL", zmq.NOBLOCK)

        except KeyboardInterrupt:
            self.log.info("Keyboard interuption detected. Stop receiving")



    def processFileEvent(self, fileEventMessage):
        self.log.debug("waiting for available workerProcess.")

        # address == "worker-0"
        # empty   == b''                   # as delimiter
        # ready   == b'READY'
        address, empty, ready = self.routerSocket.recv_multipart()
        self.log.debug("available workerProcess detected.")
        self.log.debug("passing job to workerProcess...")
        self.routerSocket.send_multipart([
                                     address,
                                     b'',
                                     fileEventMessage,
                                    ])

        self.log.debug("passing job to workerProcess...done.")
    def sendLiveViewerSignal(self, signal):
        numberOfWorkerProcesses = int(self.parallelDataStreams)
        for processNumber in range(numberOfWorkerProcesses):
            self.log.debug("send live viewer signal " + str(signal) + " to workerProcess (nr " + str(processNumber) + " )")

            address, empty, ready = self.routerSocket.recv_multipart()
            self.log.debug("available workerProcess detected.")

            # address == "worker-0"
            # empty   == b''                   # as delimiter
            # signal  == b'START_LIVE_VIEWER'
            self.routerSocket.send_multipart([
                                         address,
                                         b'',
                                         signal,
                                        ])
            self.receiverComSocket.send(signal, zmq.NOBLOCK)
        self.log.debug("Closing sockets")
        self.fileEventSocket.close(0)
        self.receiverComSocket.close(0)
        self.routerSocket.close(0)
class Sender():
    logfilePath         = None
    logfileName         = None
    logfileFullPath     = None
    verbose             = None
    watchFolder         = None
    monitoredSubfolders = None
    monitoredSuffixes   = None
    fileEventIp         = None
    fileEventPort       = None

    dataStreamIp        = None
    dataStreamPort      = None
    cleanerTargetPath   = None
    zmqCleanerIp        = None
    zmqCleanerPort      = None
    receiverComPort     = None
    receiverWhiteList   = None

    parallelDataStreams = None
    chunkSize           = None
    def __init__(self, verbose = True):
        defConf                  = defaultConfigSender()
        self.logfilePath         = defConf.logfilePath
        self.logfileName         = defConf.logfileName
        self.logfileFullPath     = os.path.join(self.logfilePath, self.logfileName)
        self.verbose             = verbose

        self.watchFolder         = defConf.watchFolder
        self.monitoredSubfolders = defConf.monitoredSubfolders
        self.monitoredFormats    = defConf.monitoredFormats
        self.fileEventIp         = defConf.fileEventIp
        self.fileEventPort       = defConf.fileEventPort

        self.dataStreamIp        = defConf.dataStreamIp
        self.dataStreamPort      = defConf.dataStreamPort
        self.cleanerTargetPath   = defConf.cleanerTargetPath
        self.zmqCleanerIp        = defConf.zmqCleanerIp
        self.zmqCleanerPort      = defConf.zmqCleanerPort
        self.receiverComPort     = defConf.receiverComPort
        self.receiverWhiteList   = defConf.receiverWhiteList

        self.parallelDataStreams = defConf.parallelDataStreams
        self.chunkSize           = defConf.chunkSize

        #enable logging
        helperScript.initLogging(self.logfileFullPath, self.verbose)


        #create zmq context
        # there should be only one context in one process
        self.zmqContext = zmq.Context.instance()
        logging.info("registering zmq global context")

        self.run()


    def run(self):
        logging.debug("start watcher process...")
        watcherProcess = Process(target=DirectoryWatcher, args=(self.fileEventIp, self.watchFolder, self.fileEventPort, self.monitoredSubfolders, self.monitoredFormats, self.zmqContext))
        logging.debug("watcher process registered")
        watcherProcess.start()
        logging.debug("start watcher process...done")

        logging.debug("start cleaner process...")
        cleanerProcess = Process(target=Cleaner, args=(self.cleanerTargetPath, self.zmqCleanerIp, self.zmqCleanerPort, self.zmqContext))
        logging.debug("cleaner process registered")
        cleanerProcess.start()
        logging.debug("start cleaner process...done")

        #start new fileMover
        fileMover = FileMover(self.fileEventIp, self.fileEventPort, self.dataStreamIp, self.dataStreamPort,
                              self.receiverComPort, self.receiverWhiteList,
                              self.parallelDataStreams, self.chunkSize,
                              self.zmqCleanerIp, self.zmqCleanerPort,
                              self.zmqContext)
        try:
            fileMover.process()
        except KeyboardInterrupt:
            logging.info("Keyboard interruption detected. Shutting down")
        # except Exception, e:
        #     print "unknown exception detected."


        logging.debug("shutting down zeromq...")
        try:
            fileMover.stop()
            logging.debug("shutting down zeromq...done.")
        except:
            logging.error(sys.exc_info())
            logging.error("shutting down zeromq...failed.")

        # give the other processes time to close the sockets
        time.sleep(0.1)
        try:
            logging.debug("closing zmqContext...")
Manuela Kuhn's avatar
Manuela Kuhn committed
            self.zmqContext.destroy()
            logging.debug("closing zmqContext...done.")
        except:
            logging.debug("closing zmqContext...failed.")
            logging.error(sys.exc_info())



if __name__ == '__main__':
    freeze_support()    #see https://docs.python.org/2/library/multiprocessing.html#windows

    parser = argparse.ArgumentParser()
    parser.add_argument("--verbose", action="store_true", help="more verbose output")
    arguments = parser.parse_args()

    sender = Sender(arguments.verbose)