Skip to content
Snippets Groups Projects
Commit 3cf779fc authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed API for streams

parent e81de41a
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,7 @@ import json
import errno
import os
import cPickle
import traceback
class dataTransfer():
......@@ -17,7 +18,8 @@ class dataTransfer():
externalContext = True
signalHost = None
signalPort = None
signalPort = "50000"
requestPort = "50001"
dataIp = None
dataHost = None
dataPort = None
......@@ -33,10 +35,11 @@ class dataTransfer():
connectionType = None
supportedConnections = ["stream", "queryNext"]
signalPort_data = "50000"
streamStarted = None
queryStarted = None
signalExchanged = False
streamStarted = None
queryNextStarted = None
socketResponseTimeout = None
......@@ -47,14 +50,17 @@ class dataTransfer():
self.log = logging.getLogger("dataTransferAPI")
else:
class loggingFunction:
def out(self, x):
print x
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x: self.out(x)
self.info = lambda x: self.out(x)
self.warning = lambda x: self.out(x)
self.error = lambda x: self.out(x)
self.critical = lambda x: self.out(x)
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
......@@ -89,10 +95,10 @@ class dataTransfer():
signal = None
# Signal exchange
if self.connectionType == "stream":
signalPort = self.signalPort_data
signalPort = self.signalPort
signal = "START_STREAM"
elif self.connectionType == "queryNext":
signalPort = self.signalPort_data
signalPort = self.signalPort
signal = "START_QUERY_NEXT"
self.log.debug("Create socket for signal exchange...")
......@@ -104,18 +110,20 @@ class dataTransfer():
self.stop()
raise Exception("No host to send signal to specified." )
trg = []
self.targets = []
# [host, port, prio]
if len(targets) == 3 and type(targets[0]) != str and type(targets[1]) != str and type(targets[2]) != str:
trg = targets
if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
host, port, prio = targets
self.targets = [[host + ":" + port, prio]]
# [[host, port, prio], ...]
else:
for socket in sockets:
for t in targets:
if type(socket) == list:
host, port, prio = socket
trg.append([host + ":" + port, prio])
host, port, prio = t
self.targets.append([host + ":" + port, prio])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise Exception("Argument 'targets' is of wrong format.")
# if type(dataPort) == list:
......@@ -163,8 +171,7 @@ class dataTransfer():
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
......@@ -187,17 +194,15 @@ class dataTransfer():
self.log.debug("Signal: " + str(sendMessage))
try:
self.signalSocket.send_multipart(sendMessage)
except Exception as e:
self.log.error("Could not send signal")
self.log.info("Error was: " + str(e))
except:
self.log.error("Could not send signal", exc_info=True)
raise
message = None
try:
socks = dict(self.poller.poll(self.socketResponseTimeout))
except Exception as e:
self.log.error("Could not poll for new message")
self.log.info("Error was: " + str(e))
except:
self.log.error("Could not poll for new message", exc_info=True)
raise
......@@ -212,16 +217,15 @@ class dataTransfer():
self.log.error("KeyboardInterrupt: No message received")
self.stop()
raise
except Exception as e:
self.log.error("Could not receive answer to signal")
self.log.debug("Error was: " + str(e))
except:
self.log.error("Could not receive answer to signal", exc_info=True)
self.stop()
raise
return message
def start(self, dataPort = False):
def start(self, dataPort = False, requestHost = None):
# if not self.connectionType:
# raise Exception("No connection specified. Please initiate a connection first.")
......@@ -241,7 +245,12 @@ class dataTransfer():
else:
socketId = ip + ":" + str(dataPort)
elif len(self.targets) == 1:
socketId = self.targets[0] + ":" + self.targets[1]
host, port = self.targets[0][0].split(":")
# ipFromHost = socket.gethostbyaddr(host)[2]
# if len(ipFromHost) == 1:
# ip = ipFromHost[0]
socketId = ip + ":" + port
else:
raise Exception("Multipe possible ports. Please choose which one to use.")
......@@ -251,9 +260,8 @@ class dataTransfer():
try:
self.dataSocket.bind(connectionStr)
self.log.info("Socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
if self.connectionType == "queryNext":
......@@ -264,9 +272,8 @@ class dataTransfer():
try:
self.requestSocket.connect(connectionStr)
self.log.info("Socket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'", exc_info=True)
self.queryNextStarted = socketId
else:
......@@ -292,7 +299,7 @@ class dataTransfer():
if not self.streamStarted and not self.queryNextStarted:
self.log.info("Could not communicate, no connection was initialized.")
return None
return None, None
if self.queryNextStarted :
......@@ -302,19 +309,17 @@ class dataTransfer():
try:
self.requestSocket.send_multipart(sendMessage)
except Exception as e:
self.log.info("Could not send request to requestSocket")
self.log.info("Error was: " + str(e))
return None
self.log.error("Could not send request to requestSocket", exc_info=True)
return None, None
try:
return self.__getMultipartMessage()
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stopping to receive.")
raise
except Exception as e:
self.log.error("Unknown error while receiving files. Need to abort.")
self.log.debug("Error was: " + str(e))
return None
except:
self.log.error("Unknown error while receiving files. Need to abort.", exc_info=True)
return None, None
def __getMultipartMessage(self):
......@@ -330,19 +335,17 @@ class dataTransfer():
try:
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.")
self.log.debug("Error was:" + str(e))
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
#TODO validate multipartMessage (like correct dict-values for metadata)
try:
payload = multipartMessage[1:]
except Exception as e:
self.log.warning("An empty file was received within the multipart-message")
self.log.debug("Error was:" + str(e))
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
return [metadataDict, payload]
return [metadata, payload]
def store(self, targetBasePath, dataObject):
......@@ -369,14 +372,11 @@ class dataTransfer():
self.__appendChunksToFile(targetBasePath, payloadMetadata, payload)
self.log.debug("append to file based on multipart-message...success.")
except KeyboardInterrupt:
errorMessage = "KeyboardInterrupt detected. Unable to append multipart-content to file."
self.log.info(errorMessage)
self.log.info("KeyboardInterrupt detected. Unable to append multipart-content to file.")
break
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
self.log.debug("append to file based on multipart-message...failed.")
self.log.error("Unable to append multipart-content to file.", exc_info=True)
self.log.debug("Append to file based on multipart-message...failed.")
if len(payload) < payloadMetadata["chunkSize"] :
#indicated end of file. Leave loop
......@@ -388,9 +388,8 @@ class dataTransfer():
try:
[payloadMetadata, payload] = self.get()
except Exception as e:
self.log.error("Getting data failed.")
self.log.debug("Error was: " + str(e))
except:
self.log.error("Getting data failed.", exc_info=True)
break
......@@ -415,19 +414,14 @@ class dataTransfer():
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
self.log.info("New target directory created: " + str(targetPath))
except Exception, f:
errorMessage = "Unable to save payload to file: '" + targetFilepath + "'"
self.log.error(errorMessage)
self.log.debug("Error was: " + str(f))
except:
self.log.error("Unable to save payload to file: '" + targetFilepath + "'", exc_info=True)
self.log.debug("targetPath:" + str(targetPath))
raise Exception(errorMessage)
raise
else:
self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
self.log.error("Failed to append payload to file: '" + targetFilepath + "'", exc_info=True)
except Exception, e:
self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
self.log.debug("ErrorTyp: " + str(type(e)))
self.log.error("Failed to append payload to file: '" + targetFilepath + "'", exc_info=True)
self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
......@@ -436,11 +430,9 @@ class dataTransfer():
for chunk in payload:
newFile.write(chunk)
newFile.close()
except Exception, e:
errorMessage = "unable to append data to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(errorMessage)
except:
self.log.error("Unable to append data to file.", exc_info=True)
raise
def generateTargetFilepath(self, basePath, configDict):
......@@ -501,9 +493,8 @@ class dataTransfer():
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except Exception as e:
self.log.error("closing ZMQ Sockets...failed.")
self.log.info("Error was: " + str(e))
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
......@@ -514,10 +505,8 @@ class dataTransfer():
self.context.destroy()
self.context = None
self.log.info("closing ZMQ context...done.")
except Exception as e:
self.log.error("closing ZMQ context...failed.")
self.log.debug("Error was: " + str(e))
self.log.debug(sys.exc_info())
except:
self.log.error("closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
......
......@@ -263,16 +263,21 @@ class SignalHandler():
elif signal == "STOP_STREAM":
#FIXME
socketId = socketIds[0]
socketId = socketIds[0][0]
self.log.info("Received signal: " + signal + " to host " + str(socketId[0]))
if socketId in [i[0] for i in self.openRequPerm]:
# send signal back to receiver
self.sendResponse(signal)
self.log.debug("Send response back: " + str(signal))
self.openRequPerm.remove(socketId)
for element in self.openRequPerm:
if element[0] == socketId:
self.openRequPerm.remove(element)
else:
self.log.info("No connection to close was found for " + str(socketId))
self.log.debug("self.openReqPerm=" + str(self.openRequPerm))
self.sendResponse("NO_OPEN_CONNECTION_FOUND")
return
......
......@@ -214,10 +214,10 @@ class checkModTime(threading.Thread):
while True:
try:
# Open the urls in their own threads
self.log.debug("List to observe: " + str(eventListToObserve))
self.log.debug("eventMessageList: " + str(eventMessageList))
# self.log.debug("List to observe: " + str(eventListToObserve))
# self.log.debug("eventMessageList: " + str(eventMessageList))
self.pool.map(self.checkLastModified, eventListToObserve)
self.log.debug("eventMessageList: " + str(eventMessageList))
# self.log.debug("eventMessageList: " + str(eventMessageList))
time.sleep(2)
except:
break
......
import os
import sys
import time
import traceback
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) )
......@@ -14,28 +15,28 @@ del BASE_PATH
from dataTransferAPI import dataTransfer
signalHost = "zitpcx19282.desy.de"
dataPort = "50010"
dataPort = "50100"
print
print "==== TEST: Query for the newest filename ===="
print
query = dataTransfer("priorityStream", signalHost)
query = dataTransfer("stream")
query.initiate(dataPort)
query.start()
query.start(dataPort)
while True:
try:
[metadata, data] = query.get()
except:
except KeyboardInterrupt:
break
except Exception as e:
print "Getting data failed."
print "Error was: " + str(e)
break
print
print "metadata"
print metadata
print "metadata of file", metadata["filename"]
print "data", str(data)[:10]
print
......
......@@ -20,19 +20,19 @@ if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
import helperScript
import helpers
#enable logging
logfilePath = os.path.join(BASE_PATH + os.sep + "logs")
logfileFullPath = os.path.join(logfilePath, "testAPI.log")
helperScript.initLogging(logfileFullPath, True, "DEBUG")
logfile = os.path.join(logfilePath, "testAPI.log")
helpers.initLogging(logfile, True, "DEBUG")
del BASE_PATH
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50100"
targets = ["zitpcx19282.desy.de", "50100", 0]
print
print "==== TEST: Stream all files and store them ===="
......@@ -41,7 +41,7 @@ print
query = dataTransfer("stream", signalHost, useLog = True)
query.initiate(dataPort)
query.initiate(targets)
query.start()
......@@ -59,14 +59,15 @@ while True:
try:
query.store("/space/projects/live-viewer/data/target/testStore", result)
except Exception as e:
print e
print "Storing data failed."
print "Error was:", e
break
query.stop()
print
print "==== TEST END: Stream for all files ===="
print "==== TEST END: Stream all files and store them ===="
print
......
......@@ -16,7 +16,7 @@ from dataTransferAPI import dataTransfer
signalHost = "zitpcx19282.desy.de"
#signalHost = "zitpcx22614.desy.de"
dataPort = "50100"
targets = ["zitpcx19282.desy.de", "50101", 0]
print
print "==== TEST: Stream all files ===="
......@@ -25,7 +25,7 @@ print
query = dataTransfer("stream", signalHost)
query.initiate(dataPort)
query.initiate(targets)
query.start()
......@@ -37,8 +37,7 @@ while True:
break
print
print "metadata"
print metadata
print "metadata", metadata["filename"]
# print "data", str(data)[:10]
print
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment