# API to communicate with a data transfer unit

from __future__ import print_function

import json
import logging
import socket
import sys

import zmq


def _toBytes(text):
    """Return *text* as a byte string suitable for ``zmq.Socket.send``.

    Byte strings (which includes every ``str`` on Python 2) are passed
    through unchanged; text strings are encoded as UTF-8.
    """
    if isinstance(text, bytes):
        return text
    return text.encode("utf-8")


def _toStr(data):
    """Return *data* as a native ``str``.

    Native strings are passed through unchanged; byte strings (as received
    from a socket on Python 3) are decoded as UTF-8.
    """
    if isinstance(data, str):
        return data
    return data.decode("utf-8")


class _PrintLogger(object):
    """Minimal stand-in for a ``logging.Logger`` that prints every message."""

    def debug(self, msg):
        print(msg)

    def info(self, msg):
        print(msg)

    def warning(self, msg):
        print(msg)

    def error(self, msg):
        print(msg)

    def critical(self, msg):
        print(msg)


class dataTransferQuery(object):
    """Client for a data transfer unit ("sender") reachable over ZeroMQ.

    A REQ socket (the *signal socket*) is used to tell the sender which kind
    of connection to set up; a second socket (the *data socket*) is then used
    to receive the data itself.  Exactly one connection type can be active
    at a time: "stream", "queryNewest" or "queryMetadata".

    The object can be used as a context manager; leaving the ``with`` block
    calls :meth:`stop`.
    """

    # Start / stop signal sent to the sender for each connection type.
    _startSignals = {
        "stream": "START_LIVE_VIEWER",
        "queryNewest": "START_REALTIME_ANALYSIS",
        "queryMetadata": "START_DISPLAYER",
    }
    _stopSignals = {
        "stream": "STOP_LIVE_VIEWER",
        "queryNewest": "STOP_REALTIME_ANALYSIS",
        "queryMetadata": "STOP_DISPLAYER",
    }

    context = None
    externalContext = True   # True: context is owned by the caller, never destroyed here

    signalIp = None
    signalPort = None
    dataIp = None
    hostname = None
    dataPort = None

    signalSocket = None
    dataSocket = None
    poller = None

    log = None

    streamStarted = False
    queryNewestStarted = False
    queryMetadataStarted = False

    socketResponseTimeout = None   # passed to zmq.Poller.poll (milliseconds)

    def __init__(self, signalPort, signalIp, dataPort, dataIp="0.0.0.0", useLog=False, context=None):
        """Create the signal socket and connect it to the sender.

        signalPort, signalIp -- address of the sender's signal socket
        dataPort, dataIp     -- address used later for the data socket
        useLog               -- True: log via logging.getLogger("dataTransferAPI");
                                False: print every message
        context              -- existing zmq.Context to reuse; if omitted a
                                private one is created and destroyed in stop()

        A failing connect is logged, not raised.
        """
        if useLog:
            self.log = logging.getLogger("dataTransferAPI")
        else:
            self.log = _PrintLogger()

        # ZMQ applications always start by creating a context,
        # and then using that for creating sockets
        # (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
        if context:
            self.context = context
            self.externalContext = True
        else:
            self.context = zmq.Context()
            self.externalContext = False

        self.signalIp = signalIp
        self.signalPort = signalPort
        self.dataIp = dataIp
        self.hostname = socket.gethostname()
        self.dataPort = dataPort
        self.socketResponseTimeout = 1000

        # To send a notification that a Displayer is up and running, a
        # communication socket is needed:
        # create socket to exchange signals with Sender
        self.signalSocket = self.context.socket(zmq.REQ)

        connectionStr = "tcp://" + str(self.signalIp) + ":" + str(self.signalPort)
        try:
            self.signalSocket.connect(connectionStr)
            self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
        except Exception as e:
            self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
            self.log.debug("Error was:" + str(e))

        # using a Poller to implement the signalSocket timeout
        # (in older ZMQ version there is no option RCVTIMEO)
        self.poller = zmq.Poller()
        self.poller.register(self.signalSocket, zmq.POLLIN)

    ##
    #
    # Initializes the data transfer socket and
    # signals the sender which kind of connection should be used
    #
    # Returns 0 if the connection could be initialized without errors
    # Error Codes are:
    # 10    if the connection type is not supported
    # 11    if there is already one connection running
    # 12    Could not send signal
    # 13    Could not poll for new message
    # 14    Could not receive answer to signal
    # 15    if there was no response or the response was not correct
    # 16    if the data socket could not be started
    #
    ##
    def initConnection(self, connectionType):
        if connectionType not in self._startSignals:
            self.log.info("Chosen type of connection is not supported.")
            return 10

        if self.__activeConnectionType() is not None:
            self.log.info("Other connection type already runnging.")
            self.log.info("More than one connection type is currently not supported.")
            return 11

        signal = self._startSignals[connectionType]

        # Send the signal that the communication infrastructure should be established
        self.log.info("Sending Start Signal")
        try:
            self.signalSocket.send(_toBytes(self.__buildSignal(signal)))
        except Exception as e:
            self.log.info("Could not send signal")
            self.log.info("Error was: " + str(e))
            return 12

        try:
            socks = dict(self.poller.poll(self.socketResponseTimeout))
        except Exception as e:
            self.log.info("Could not poll for new message")
            self.log.info("Error was: " + str(e))
            return 13

        message = None
        # if there was a response
        if socks.get(self.signalSocket) == zmq.POLLIN:
            try:
                # Get the reply.
                message = self.signalSocket.recv()
                self.log.info("Received answer to signal: " + str(message))
            except KeyboardInterrupt:
                self.log.error("KeyboardInterrupt: No message received")
                self.stop()
                return 14
            except Exception as e:
                self.log.info("Could not receive answer to signal")
                self.log.debug("Error was: " + str(e))
                self.stop()
                return 14

        # No response, or a response of the wrong format: report it to the
        # caller instead of terminating the whole process.
        # The reply is compared as bytes so the check works on Python 2 and 3.
        if not message or not message.startswith(_toBytes(signal)):
            self.log.error("Sending start signal ...failed.")
            return 15

        self.log.info("Received confirmation ...start receiving files")

        if not self.__startDataSocket(connectionType):
            # The sender was already told to start; tell it to stand down again.
            self.__sendStopSignal(connectionType)
            return 16

        if connectionType == "stream":
            self.streamStarted = True
        elif connectionType == "queryNewest":
            self.queryNewestStarted = True
        else:
            self.queryMetadataStarted = True

        return 0

    # Builds the "<signal>,<hostname>,<dataPort>" message sent on the signal socket.
    def __buildSignal(self, signal):
        return ",".join([str(signal), str(self.hostname), str(self.dataPort)])

    # Returns the name of the connection type currently started, or None.
    def __activeConnectionType(self):
        if self.streamStarted:
            return "stream"
        if self.queryNewestStarted:
            return "queryNewest"
        if self.queryMetadataStarted:
            return "queryMetadata"
        return None

    # Creates the data socket for the given connection type (PULL + bind for
    # "stream", REQ + connect otherwise). Returns True on success; on failure
    # the socket is closed again and self.dataSocket is left untouched.
    def __startDataSocket(self, connectionType):
        connectionStr = "tcp://" + str(self.dataIp) + ":" + str(self.dataPort)
        if connectionType == "stream":
            socketType, action = zmq.PULL, "bind"
        else:
            socketType, action = zmq.REQ, "connect"

        # An additional socket is needed to establish the data retrieving mechanism
        dataSocket = self.context.socket(socketType)
        try:
            if action == "bind":
                dataSocket.bind(connectionStr)
            else:
                dataSocket.connect(connectionStr)
        except Exception as e:
            self.log.error("Failed to start dataStreamSocket (" + action + "): '" + connectionStr + "'")
            self.log.debug("Error was:" + str(e))
            dataSocket.close(linger=0)
            return False

        self.dataSocket = dataSocket
        self.log.info("dataSocket started (" + action + ") for '" + connectionStr + "'")
        return True

    # Sends the stop signal for the given connection type and waits at most
    # socketResponseTimeout for the reply. Failures are logged, never raised.
    def __sendStopSignal(self, connectionType):
        self.log.info("Sending Stop Signal")
        sendMessage = self.__buildSignal(self._stopSignals[connectionType])
        try:
            self.signalSocket.send(_toBytes(sendMessage))
            # Poll instead of a bare recv() so a dead sender cannot block shutdown.
            socks = dict(self.poller.poll(self.socketResponseTimeout))
            if socks.get(self.signalSocket) == zmq.POLLIN:
                # Get the reply.
                message = self.signalSocket.recv()
                self.log.info("Recieved signal: " + str(message))
            else:
                self.log.info("No answer to stop signal received")
        except Exception as e:
            self.log.info("Could not communicate")
            self.log.info("Error was: " + str(e))

    ##
    #
    # Receives or queries for new files depending on the connection initialized
    #
    # returns either
    #   the next file
    #       (if connection type "stream" was chosen)
    #   the newest file
    #       (if connection type "queryNewest" was chosen)
    #   the path of the newest file
    #       (if connection type "queryMetadata" was chosen)
    #   None
    #       (if no connection was initialized or an error occurred)
    #
    ##
    def getData(self):
        if self.streamStarted:
            # wait for the next incoming message
            self.log.debug("Waiting for new messages...")
            try:
                return self.__getMultipartMessage()
            except KeyboardInterrupt:
                self.log.debug("Keyboard interrupt detected. Stop receiving.")
                return None
            except Exception as e:
                self.log.error("Unknown error while receiving files. Need to abort.")
                self.log.debug("Error was: " + str(e))
                return None

        if self.queryNewestStarted or self.queryMetadataStarted:
            sendMessage = "NEXT_FILE"
            self.log.info("Asking for next file with message " + str(sendMessage))
            try:
                self.dataSocket.send(_toBytes(sendMessage))
            except Exception as e:
                self.log.info("Could not send request to dataSocket")
                self.log.info("Error was: " + str(e))
                return None

            try:
                # Get the reply.
                if self.queryNewestStarted:
                    return self.dataSocket.recv_multipart()
                return self.dataSocket.recv()
            except Exception as e:
                self.log.info("Could not receive answer to request")
                self.log.info("Error was: " + str(e))
                return None

        self.log.info("Could not communicate, no connection was initialized.")
        return None

    # Receives one multipart message from the data socket and returns
    # [metadata, payload]: metadata is the first frame as a string (None if it
    # is missing or undecodable), payload is the list of remaining frames.
    def __getMultipartMessage(self):
        multipartMessage = self.dataSocket.recv_multipart()

        # extract the metadata frame from the multipart-message
        metadata = None
        try:
            metadata = _toStr(multipartMessage[0])
        except (IndexError, UnicodeDecodeError) as e:
            self.log.error("An empty config was transferred for the multipart-message.")
            self.log.debug("Error was:" + str(e))

        # TODO validate multipartMessage (like correct dict-values for metadata)
        # The metadata is only checked for being valid JSON here; the raw
        # string is what gets returned to the caller.
        if metadata is not None:
            try:
                json.loads(metadata)
            except ValueError as e:
                self.log.error("Could not extract metadata from the multipart-message.")
                self.log.debug("Error was:" + str(e))

        payload = multipartMessage[1:]
        if not payload:
            self.log.warning("An empty file was received within the multipart-message")

        return [metadata, payload]

    ##
    #
    # Send signal that the displayer is quitting, close ZMQ connections,
    # destroy the context (only if it was created by this object).
    # Safe to call more than once.
    #
    ##
    def stop(self):
        connectionType = self.__activeConnectionType()
        if self.dataSocket and self.signalSocket and connectionType:
            self.__sendStopSignal(connectionType)

        try:
            if self.signalSocket:
                self.log.info("closing signalSocket...")
                self.signalSocket.close(linger=0)
                self.signalSocket = None
            if self.dataSocket:
                self.log.info("closing dataSocket...")
                self.dataSocket.close(linger=0)
                self.dataSocket = None
        except Exception as e:
            self.log.info("closing ZMQ Sockets...failed.")
            self.log.info("Error was: " + str(e))

        # The poller only ever watched the signal socket, which is gone now.
        self.poller = None

        # No connection is active any more; getData() must not touch the
        # closed data socket.
        self.streamStarted = False
        self.queryNewestStarted = False
        self.queryMetadataStarted = False

        if not self.externalContext:
            try:
                if self.context:
                    self.log.info("closing zmqContext...")
                    self.context.destroy()
                    self.context = None
                    self.log.info("closing zmqContext...done.")
            except Exception as e:
                self.log.info("closing zmqContext...failed.")
                self.log.info("Error was: " + str(e))

    def __enter__(self):
        return self

    # The exception arguments are defaulted so existing direct calls of
    # __exit__() without arguments keep working.
    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        self.stop()

    def __del__(self):
        self.stop()