From a15f5ed06d4174d0529c4cd3189d055db957a89d Mon Sep 17 00:00:00 2001
From: Manuela Kuhn <manuela.kuhn@desy.de>
Date: Wed, 16 Mar 2016 16:09:24 +0100
Subject: [PATCH] Tested dataIngest and nexusTransfer APIs together

---
 APIs/dataIngestAPI.py                         |  15 +-
 APIs/nexusTransferAPI.py                      | 121 +++++++++----
 src/sender/DataDispatcher.py                  |  61 +++++--
 src/sender/DataManager.py                     |   5 +-
 src/sender/SignalHandler.py                   |   3 +-
 src/sender/TaskProvider.py                    |   2 +
 src/sender/dataFetchers/getFromQueue.py       |  18 +-
 test/API/test_dataIngest.py                   |   2 +-
 test/API/test_dataIngestWithNexusTransfer.py  | 169 ++++++++++++++++++
 ...usTransfer.py => test_nexusTransferAPI.py} |   2 +-
 10 files changed, 325 insertions(+), 73 deletions(-)
 create mode 100644 test/API/test_dataIngestWithNexusTransfer.py
 rename test/API/{test_nexusTransfer.py => test_nexusTransferAPI.py} (97%)

diff --git a/APIs/dataIngestAPI.py b/APIs/dataIngestAPI.py
index 3fc309d5..a2523b35 100644
--- a/APIs/dataIngestAPI.py
+++ b/APIs/dataIngestAPI.py
@@ -50,8 +50,9 @@ class dataIngest():
 
         # has to be the same port as configured in dataManager.conf as eventPort
         self.eventPort   = "50003"
-        # has to be the same port as configured in dataManager.conf as fixedStreamPort #TODO change that to a different port
-        self.dataPort    = "50100"
+        #TODO add port in config
+        # has to be the same port as configured in dataManager.conf as ...
+        self.dataPort    = "50010"
 
         self.eventSocket = None
         self.dataSocket  = None
@@ -137,7 +138,11 @@ class dataIngest():
         # send close-signal to signal socket
         sendMessage = "CLOSE_FILE"
         self.signalSocket.send(sendMessage)
-        self.log.info("Sending signal to close the file.")
+        self.log.info("Sending signal to close the file to signalSocket.")
+
+        # send close-signal to event Detector
+        self.eventSocket.send(sendMessage)
+        self.log.debug("Sending signal to close the file to eventSocket.(sendMessage=" + sendMessage + ")")
 
         recvMessage = self.signalSocket.recv()
 
@@ -146,10 +151,6 @@ class dataIngest():
             self.log.debug("send message: " + str(sendMessage))
             raise Exception("Something went wrong while notifying to close the file")
 
-        # send close-signal to event Detector
-        self.eventSocket.send(sendMessage)
-        self.log.debug("Sending signal to close the file.")
-
         self.openFile = None
         self.filePart = None
 
diff --git a/APIs/nexusTransferAPI.py b/APIs/nexusTransferAPI.py
index fd8edf49..43b3e203 100644
--- a/APIs/nexusTransferAPI.py
+++ b/APIs/nexusTransferAPI.py
@@ -45,14 +45,18 @@ class nexusTransfer():
             self.externalContext = False
 
 
-        self.extHost      = "0.0.0.0"
+        self.extHost         = "0.0.0.0"
 
-        self.signalPort   = "50050"
-        self.dataPort     = "50100"
+        self.signalPort      = "50050"
+        self.dataPort        = "50100"
 
-        self.signalSocket = None
-        self.dataSocket   = None
+        self.signalSocket    = None
+        self.dataSocket      = None
 
+        self.numberOfStreams = None
+        self.recvdCloseFrom  = []
+        self.replyToSignal   = False
+        self.allCloseRecvd   = False
 
         self.__createSockets()
 
@@ -63,7 +67,7 @@ class nexusTransfer():
         connectionStr     = "tcp://" + str(self.extHost) + ":" + str(self.signalPort)
         try:
             self.signalSocket.bind(connectionStr)
-            logging.info("signalSocket started (bind) for '" + connectionStr + "'")
+            self.log.info("signalSocket started (bind) for '" + connectionStr + "'")
         except:
             self.log.error("Failed to start signalSocket (bind): '" + connectionStr + "'", exc_info=True)
 
@@ -82,56 +86,95 @@ class nexusTransfer():
 
     def read(self):
 
-        socks = dict(self.poller.poll())
+        while True:
+            self.log.debug("polling")
+            try:
+                socks = dict(self.poller.poll())
+            except:
+                break
 
-        if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
+            if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
+                self.log.debug("signalSocket is polling")
 
-            message = self.signalSocket.recv()
-            logging.debug("signalSocket recv: " + message)
+                message = self.signalSocket.recv()
+                self.log.debug("signalSocket recv: " + message)
 
-            self.signalSocket.send(message)
-            logging.debug("signalSocket send: " + message)
+                if message == b"CLOSE_FILE" and not self.allCloseRecvd:
+                    self.replyToSignal = message
+                else:
+                    self.signalSocket.send(message)
+                    self.log.debug("signalSocket send: " + message)
 
-            return message
+                    return message
 
-        if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
+            if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
+                self.log.debug("dataSocket is polling")
 
-            try:
-                return self.__getMultipartMessage()
-            except KeyboardInterrupt:
-                self.log.debug("Keyboard interrupt detected. Stopping to receive.")
-                raise
-            except:
-                self.log.error("Unknown error while receiving files. Need to abort.", exc_info=True)
-                return None, None
+                try:
+                    return self.__getMultipartMessage()
+                except KeyboardInterrupt:
+                    self.log.debug("Keyboard interrupt detected. Stopping to receive.")
+                    raise
+                except:
+                    self.log.error("Unknown error while receiving files. Need to abort.", exc_info=True)
+                    return None, None
 
 
 
     def __getMultipartMessage(self):
 
-        #save all chunks to file
-        multipartMessage = self.dataSocket.recv_multipart()
+        try:
+            multipartMessage = self.dataSocket.recv_multipart()
+            self.log.debug("multipartMessage=" + str(multipartMessage))
+        except:
+            self.log.error("Could not receive data due to unknown error.", exc_info=True)
+
 
         if len(multipartMessage) < 2:
             self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
             self.log.debug("multipartMessage=" + str(mutipartMessage))
+            #TODO return errorcode
 
-        #extract multipart message
-        try:
-            metadata = cPickle.loads(multipartMessage[0])
-        except:
-            self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
-            metadata = None
+        if multipartMessage[0] == b"CLOSE_FILE":
+            id = multipartMessage[1]
+            self.recvdCloseFrom.append(id)
+            self.log.debug("Received close-file-signal from DataDispatcher-" + id)
 
-        #TODO validate multipartMessage (like correct dict-values for metadata)
+            # get number of signals to wait for
+            if not self.numberOfStreams:
+                self.numberOfStreams = int(id.split("/")[1])
 
-        try:
-            payload = multipartMessage[1:]
-        except:
-            self.log.warning("An empty file was received within the multipart-message", exc_info=True)
-            payload = None
+            # have all signals arrived?
+            if len(self.recvdCloseFrom) == self.numberOfStreams:
+                self.log.info("All close-file-signals arrived")
+                self.allCloseRecvd = True
+                if self.replyToSignal:
+                    self.signalSocket.send(self.replyToSignal)
+                    self.log.debug("signalSocket send: " + self.replyToSignal)
+                    self.replyToSignal = False
+                else:
+                    pass
+
+                return "CLOSE_FILE"
+
+
+        else:
+            #extract multipart message
+            try:
+                metadata = cPickle.loads(multipartMessage[0])
+            except:
+                self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
+                metadata = None
+
+            #TODO validate multipartMessage (like correct dict-values for metadata)
+
+            try:
+                payload = multipartMessage[1:]
+            except:
+                self.log.warning("An empty file was received within the multipart-message", exc_info=True)
+                payload = None
 
-        return [metadata, payload]
+            return [metadata, payload]
 
 
     ##
@@ -143,11 +186,11 @@ class nexusTransfer():
 
         try:
             if self.signalSocket:
-                logging.info("closing signalSocket...")
+                self.log.info("closing signalSocket...")
                 self.signalSocket.close(linger=0)
                 self.signalSocket = None
             if self.dataSocket:
-                logging.info("closing dataSocket...")
+                self.log.info("closing dataSocket...")
                 self.dataSocket.close(linger=0)
                 self.dataSocket = None
         except:
diff --git a/src/sender/DataDispatcher.py b/src/sender/DataDispatcher.py
index 8aa4f0ef..82817bcf 100644
--- a/src/sender/DataDispatcher.py
+++ b/src/sender/DataDispatcher.py
@@ -47,8 +47,8 @@ class DataDispatcher():
 #                }
 
 
-        self.id            = id
-        self.log           = self.getLogger(logQueue)
+        self.id              = id
+        self.log             = self.getLogger(logQueue)
 
         self.log.debug("DataDispatcher-" + str(self.id) + " started (PID " + str(os.getpid()) + ").")
 
@@ -91,9 +91,11 @@ class DataDispatcher():
         try:
             self.process()
         except KeyboardInterrupt:
-            self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher Nr. " + str(self.id) + ".")
+            self.log.debug("KeyboardInterrupt detected. Shutting down DataDispatcher-" + str(self.id) + ".")
+            self.stop()
         except:
-            self.log.error("Stopping DataDispatcher Nr " + str(self.id) + " due to unknown error condition.", exc_info=True)
+            self.log.error("Stopping DataDispatcher-" + str(self.id) + " due to unknown error condition.", exc_info=True)
+            self.stop()
 
 
     def createSockets(self):
@@ -135,19 +137,45 @@ class DataDispatcher():
                     targets.insert(0,[self.fixedStreamId, 0])
                 # sort the target list by the priority
                 targets = sorted(targets, key=lambda target: target[1])
-            else:
-                closeFile = message[0] == b"CLOSE_FILE"
-                if closeFile:
-                    self.log.debug("Router requested to send signal that file was closed.")
-                    time.sleep(2)
-                    self.log.debug("Continue after sleeping.")
-                    continue
-
-                finished = message[0] == b"EXIT"
-                if finished:
-                    self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
-                    break
 
+            elif message[0] == b"CLOSE_FILE":
+                self.log.debug("Router requested to send signal that file was closed.")
+                payload = [ metadata, self.id ]
+
+                # socket already known
+                if self.fixedStreamId in openConnections:
+                    tracker = openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
+                    log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0")
+                else:
+                    # open socket
+                    socket        = context.socket(zmq.PUSH)
+                    connectionStr = "tcp://" + str(self.fixedStreamId)
+
+                    socket.connect(connectionStr)
+                    log.info("Start socket (connect): '" + str(connectionStr) + "'")
+
+                    # register socket
+                    openConnections[self.fixedStreamId] = socket
+
+                    # send data
+                    tracker = openConnections[self.fixedStreamId].send_multipart(payload, copy=False, track=True)
+                    log.info("Sending close file signal to '" + self.fixedStreamId + "' with priority 0" )
+
+                # socket not known
+                if not tracker.done:
+                    log.info("Close file signal has not been sent yet, waiting...")
+                    tracker.wait()
+                    log.info("Close file signal has not been sent yet, waiting...done")
+
+                time.sleep(2)
+                self.log.debug("Continue after sleeping.")
+                continue
+
+            elif message[0] == b"EXIT":
+                self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
+                break
+
+            else:
                 workload = cPickle.loads(message[0])
                 if self.fixedStreamId:
                     targets = [[self.fixedStreamId, 0]]
@@ -197,6 +225,7 @@ class DataDispatcher():
     def __del__(self):
         self.stop()
 
+
 if __name__ == '__main__':
     from multiprocessing import Process, freeze_support, Queue
     import time
diff --git a/src/sender/DataManager.py b/src/sender/DataManager.py
index e8d5d977..71096b72 100644
--- a/src/sender/DataManager.py
+++ b/src/sender/DataManager.py
@@ -311,7 +311,8 @@ class DataManager():
         self.taskProviderPr = Process ( target = TaskProvider, args = (self.eventDetectorConfig, self.requestFwPort, self.routerPort, self.logQueue) )
         self.taskProviderPr.start()
 
-        for id in range(self.numberOfStreams):
+        for i in range(self.numberOfStreams):
+            id = str(i) + "/" + str(self.numberOfStreams)
             pr = Process ( target = DataDispatcher, args = ( id, self.routerPort, self.chunkSize, self.fixedStreamId, self.logQueue, self.localTarget) )
             pr.start()
             self.dataDispatcherPr.append(pr)
@@ -331,7 +332,7 @@ class DataManager():
             self.log.info("terminate TaskProvider...done")
 
         for pr in self.dataDispatcherPr:
-            id = self.dataDispatcherPr.index(pr)
+            id = str(self.dataDispatcherPr.index(pr)) + "/" + str(self.numberOfStreams)
             self.log.info("terminate DataDispatcher-" + str(id) + "...")
             pr.terminate()
             pr = None
diff --git a/src/sender/SignalHandler.py b/src/sender/SignalHandler.py
index 8d0ba54f..519fb748 100644
--- a/src/sender/SignalHandler.py
+++ b/src/sender/SignalHandler.py
@@ -72,9 +72,10 @@ class SignalHandler():
         try:
             self.run()
         except KeyboardInterrupt:
-            pass
+            self.stop()
         except:
             self.log.error("Stopping signalHandler due to unknown error condition.", exc_info=True)
+            self.stop()
 
 
     # Send all logs to the main process
diff --git a/src/sender/TaskProvider.py b/src/sender/TaskProvider.py
index e3e3fb80..318fdba9 100644
--- a/src/sender/TaskProvider.py
+++ b/src/sender/TaskProvider.py
@@ -98,8 +98,10 @@ class TaskProvider():
             self.run()
         except KeyboardInterrupt:
             self.log.debug("Keyboard interruption detected. Shuting down")
+            self.stop()
         except:
             self.log.info("Stopping TaskProvider due to unknown error condition.", exc_info=True)
+            self.stop()
 
 
     # Send all logs to the main process
diff --git a/src/sender/dataFetchers/getFromQueue.py b/src/sender/dataFetchers/getFromQueue.py
index 7b7722af..0a811d78 100644
--- a/src/sender/dataFetchers/getFromQueue.py
+++ b/src/sender/dataFetchers/getFromQueue.py
@@ -73,19 +73,22 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
 
 
 def sendData (log, targets, sourceFile, metadata, openConnections, context, prop):
+
     #reading source file into memory
     try:
-        log.debug("Getting '" + str(sourceFile) + "'...")
+        log.debug("Getting data out of queue for file '" + str(sourceFile) + "'...")
         data = prop["socket"].recv()
     except:
-        log.error("Unable to get source file '" + str(sourceFile) + "'.", exc_info=True)
+        log.error("Unable to get data out of queue for file '" + str(sourceFile) + "'", exc_info=True)
         raise
 
-    chunkSize = metadata[ "chunkSize" ]
+    try:
+        chunkSize = metadata[ "chunkSize" ]
+    except:
+        log.error("Unable to get chunkSize", exc_info=True)
 
-    #send message
     try:
-        log.debug("Passing multipart-message for file " + str(sourceFile) + "...")
+        log.debug("Packing multipart-message for file " + str(sourceFile) + "...")
         chunkNumber = 0
 
         #assemble metadata for zmq-message
@@ -96,8 +99,11 @@ def sendData (log, targets, sourceFile, metadata, openConnections, context, prop
         payload = []
         payload.append(metadataExtended)
         payload.append(data)
+    except:
+        log.error("Unable to pack multipart-message for file " + str(sourceFile), exc_info=True)
 
-        # streaming data
+    #send message
+    try:
         for target, prio in targets:
 
             # send data to the data stream to store it in the storage system
diff --git a/test/API/test_dataIngest.py b/test/API/test_dataIngest.py
index af8e8422..7916914d 100644
--- a/test/API/test_dataIngest.py
+++ b/test/API/test_dataIngest.py
@@ -101,7 +101,7 @@ class Receiver(threading.Thread):
                 self.context.destroy()
                 self.context = None
         except:
-            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
+            logging.error("closing ZMQ Sockets...failed.", exc_info=True)
 
 
 
diff --git a/test/API/test_dataIngestWithNexusTransfer.py b/test/API/test_dataIngestWithNexusTransfer.py
new file mode 100644
index 00000000..1458d9c3
--- /dev/null
+++ b/test/API/test_dataIngestWithNexusTransfer.py
@@ -0,0 +1,169 @@
+import os
+import sys
+import time
+import zmq
+import logging
+import threading
+import cPickle
+
+BASE_PATH   = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
+API_PATH    = BASE_PATH + os.sep + "APIs"
+SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
+
+if not API_PATH in sys.path:
+    sys.path.append ( API_PATH )
+del API_PATH
+
+from dataIngestAPI import dataIngest
+from nexusTransferAPI import nexusTransfer
+
+if not SHARED_PATH in sys.path:
+    sys.path.append ( SHARED_PATH )
+del SHARED_PATH
+
+import helpers
+
+#enable logging
+logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
+logfile     = os.path.join(logfilePath, "testDataIngestWithNexusTransfer.log")
+helpers.initLogging(logfile, True, "DEBUG")
+
+
+print
+print "==== TEST: data ingest together with nexus transfer ===="
+print
+
+
+class ZmqDataManager(threading.Thread):
+    def __init__(self):
+        self.extHost      = "0.0.0.0"
+        self.localhost    = "localhost"
+        self.eventPort    = "50003"
+        self.dataInPort   = "50010"
+        self.dataOutPort  = "50100"
+
+        self.log          = logging.getLogger("ZmqDataManager")
+
+        self.context      = zmq.Context()
+
+        self.eventSocket  = self.context.socket(zmq.PULL)
+        connectionStr     = "tcp://" + str(self.extHost) + ":" + str(self.eventPort)
+        self.eventSocket.bind(connectionStr)
+        self.log.info("eventSocket started (bind) for '" + connectionStr + "'")
+
+        self.dataInSocket  = self.context.socket(zmq.PULL)
+        connectionStr      = "tcp://" + str(self.extHost) + ":" + str(self.dataInPort)
+        self.dataInSocket.bind(connectionStr)
+        self.log.info("dataInSocket started (bind) for '" + connectionStr + "'")
+
+        self.dataOutSocket = self.context.socket(zmq.PUSH)
+        connectionStr      = "tcp://" + str(self.localhost) + ":" + str(self.dataOutPort)
+        self.dataOutSocket.connect(connectionStr)
+        self.log.info("dataOutSocket started (connect) for '" + connectionStr + "'")
+
+        threading.Thread.__init__(self)
+
+
+    def run(self):
+        for i in range(6):
+            try:
+                metadata = self.eventSocket.recv()
+
+                if metadata == b"CLOSE_FILE":
+                    self.log.debug("eventSocket recv: " + metadata)
+                    dataMessage = [metadata, "0/1"]
+                else:
+                    self.log.debug("eventSocket recv: " + str(cPickle.loads(metadata)))
+
+                    data = self.dataInSocket.recv()
+                    self.log.debug("dataSocket recv: " + data)
+
+                    dataMessage = [metadata, data]
+
+                self.dataOutSocket.send_multipart(dataMessage)
+                self.log.debug("Send")
+            except:
+                self.log.error("Error in run", exc_info=True)
+
+
+    def stop(self):
+        try:
+            if self.eventSocket:
+                self.log.info("closing eventSocket...")
+                self.eventSocket.close(linger=0)
+                self.eventSocket = None
+            if self.dataInSocket:
+                self.log.info("closing dataInSocket...")
+                self.dataInSocket.close(linger=0)
+                self.dataInSocket = None
+            if self.dataOutSocket:
+                self.log.info("closing dataOutSocket...")
+                self.dataOutSocket.close(linger=0)
+                self.dataOutSocket = None
+            if self.context:
+                self.log.info("destroying context...")
+                self.context.destroy()
+                self.context = None
+        except:
+            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
+
+
+def runDataIngest(numbToSend):
+    dI = dataIngest(useLog = True)
+
+    dI.createFile("1.h5")
+
+    for i in range(numbToSend):
+        try:
+            data = "THISISTESTDATA-" + str(i)
+            dI.write(data)
+            logging.info("write")
+
+        except:
+            logging.error("break", exc_info=True)
+            break
+
+    dI.closeFile()
+
+    dI.stop()
+
+
+
+
+def runNexusTransfer(numbToRecv):
+    nT = nexusTransfer(useLog = True)
+
+    # number to receive + open signal + close signal
+    for i in range(numbToRecv + 2):
+        try:
+            data = nT.read()
+            logging.info("Retrieved: " + str(data))
+        except:
+            logging.error("break", exc_info=True)
+            break
+
+    nT.stop()
+
+
+zmqDataManagerThread = ZmqDataManager()
+zmqDataManagerThread.start()
+
+number = 5
+
+runDataIngestThread = threading.Thread(target=runDataIngest, args = (number, ))
+runNexusTransferThread = threading.Thread(target=runNexusTransfer, args = (number, ))
+
+runDataIngestThread.start()
+runNexusTransferThread.start()
+
+
+runDataIngestThread.join()
+runNexusTransferThread.join()
+zmqDataManagerThread.stop()
+
+print
+print "==== TEST END: data ingest together with nexus transfer ===="
+print
+
+
+
diff --git a/test/API/test_nexusTransfer.py b/test/API/test_nexusTransferAPI.py
similarity index 97%
rename from test/API/test_nexusTransfer.py
rename to test/API/test_nexusTransferAPI.py
index 4ba83d4f..2fb4a4cf 100644
--- a/test/API/test_nexusTransfer.py
+++ b/test/API/test_nexusTransferAPI.py
@@ -90,7 +90,7 @@ class Sender(threading.Thread):
                 self.context = None
                 logging.info("Destroying context...done")
         except:
-            self.log.error("Closing ZMQ Sockets...failed.", exc_info=True)
+            logging.error("Closing ZMQ Sockets...failed.", exc_info=True)
 
 
 
-- 
GitLab