Newer
Older
import os
import sys
import time
import zmq
import logging
import threading
import cPickle
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
API_PATH = BASE_PATH + os.sep + "APIs"
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not API_PATH in sys.path:
sys.path.append ( API_PATH )
del API_PATH
from dataIngestAPI import dataIngest
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helpers
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfile = os.path.join(logfilePath, "testDataIngestAPI.log")
helpers.initLogging(logfile, True, "DEBUG")
del BASE_PATH
print
print "==== TEST: data ingest ===="
print
class Receiver(threading.Thread):
def __init__(self, context = None):
self.signalPort = "50050"
self.eventPort = "50003"
self.dataPort = "50100"
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.signalPort)
self.signalSocket.bind(connectionStr)
logging.info("signalSocket started (bind) for '" + connectionStr + "'")
self.eventSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.eventPort)
self.eventSocket.bind(connectionStr)
logging.info("eventSocket started (bind) for '" + connectionStr + "'")
self.dataSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://" + str(self.extHost) + ":" + str(self.dataPort)
self.dataSocket.bind(connectionStr)
logging.info("dataSocket started (bind) for '" + connectionStr + "'")
threading.Thread.__init__(self)
def run(self):
message = self.signalSocket.recv()
logging.debug("signalSocket recv: " + message)
self.signalSocket.send(message)
logging.debug("signalSocket send: " + message)
for i in range(5):
# logging.debug("eventSocket recv: " + str(cPickle.loads(self.eventSocket.recv())))
logging.debug("eventSocket recv: " + self.eventSocket.recv())
logging.debug("dataSocket recv: " + self.dataSocket.recv())
message = self.signalSocket.recv()
logging.debug("signalSocket recv: " + message)
self.signalSocket.send(message)
logging.debug("signalSocket send: " + message)
logging.debug("eventSocket recv: " + self.eventSocket.recv())
def stop(self):
try:
if self.signalSocket:
logging.info("closing signalSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
logging.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
logging.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
if self.context:
logging.info("destroying context...")
self.context.destroy()
self.context = None
logging.error("closing ZMQ Sockets...failed.", exc_info=True)
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy(0)
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
receiverThread = Receiver(context)
receiverThread.start()
obj = dataIngest(useLog = True, context = context)
obj.createFile("1.h5")
for i in range(5):
try:
data = "asdfasdasdfasd"
obj.write(data)
print "write"
except:
logging.error("break", exc_info=True)
break
try:
obj.closeFile()
except:
logging.error("Failed to close file", exc_info=True)