Commit 13752ad8 authored by Manuela Kuhn

Enabled modularity for Data Dispatcher

parent e452a384
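This commit replaces the DataDispatcher's built-in getMetadata/sendData methods with a data-fetcher module that is selected by name and loaded at runtime; the hunks below touch the DataDispatcher, the TaskProvider and the getFromFile-style fetcher module. As a rough sketch, the module interface the dispatcher now calls looks like this (names taken from the call sites in the diff, bodies are placeholders and not part of the commit):

# Hypothetical skeleton of a pluggable data fetcher such as getFromFile,
# inferred from the calls self.dataFetcher.getMetadata(...) and
# self.dataFetcher.sendData(...) further down; not part of this commit.

def getMetadata(log, workload, chunkSize, localTarget=None):
    # build the source path, an optional target path and an enriched metadata
    # dictionary (filesize, fileModificationTime, chunkSize) from the workload
    raise NotImplementedError

def sendData(log, targets, sourceFile, metadata, openConnections, context):
    # read sourceFile in chunks of metadata["chunkSize"] and push each chunk as
    # a ZeroMQ multipart message [pickled metadata, chunk content] to targets
    raise NotImplementedError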
from __builtin__ import open, type
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import zmq
@@ -15,12 +13,17 @@ try:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
del SHARED_PATH
DATAFETCHER_PATH = BASE_PATH + os.sep + "src" + os.sep + "sender" + os.sep + "dataFetchers"
if not DATAFETCHER_PATH in sys.path:
sys.path.append ( DATAFETCHER_PATH )
del DATAFETCHER_PATH
import helpers
#
@@ -33,6 +36,8 @@ class DataDispatcher():
self.id = id
self.log = self.getLogger(logQueue)
supportedDataFetchers = ["getFromFile"]
self.log.debug("DataDispatcher Nr. " + str(self.id) + " started.")
self.localhost = "127.0.0.1"
@@ -57,6 +62,13 @@ class DataDispatcher():
self.createSockets()
if dataFetcher in supportedDataFetchers:
self.log.info("Loading data Fetcher: " + getFromFile)
self.dataFetcher = __import__(dataFetcher)
else:
raise Exception("DataFetcher type " + dataFetcher + " not supported")
try:
self.process()
except KeyboardInterrupt:
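Module loading itself is plain __import__ against the dataFetchers directory that was appended to sys.path in the first hunk. A standalone sketch of the same pattern (paths and the configured name are assumptions):

import os
import sys

# Assumed location of the fetcher modules; mirrors the DATAFETCHER_PATH set up above.
DATAFETCHER_PATH = os.path.join("src", "sender", "dataFetchers")
if DATAFETCHER_PATH not in sys.path:
    sys.path.append(DATAFETCHER_PATH)

dataFetcher = "getFromFile"                 # would come from the sender configuration
supportedDataFetchers = ["getFromFile"]

if dataFetcher in supportedDataFetchers:
    # __import__ with a plain module name returns the imported module object
    dataFetcherModule = __import__(dataFetcher)
else:
    raise Exception("DataFetcher type " + dataFetcher + " not supported")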
@@ -119,7 +131,8 @@ class DataDispatcher():
# get metadata of the file
try:
self.log.debug("Getting file metadata")
sourceFile, targetFile, metadata = self.getMetadata(workload)
sourceFile, targetFile, metadata = self.dataFetcher.getMetadata(self.log, workload, self.chunkSize, self.localTarget)
except:
self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
#skip all further instructions and continue with next iteration
@@ -128,9 +141,9 @@ class DataDispatcher():
# send data
if targets:
try:
self.sendData(targets, sourceFile, metadata)
self.dataFetcher.sendData(self.log, targets, sourceFile, metadata, self.openConnections, self.context)
except:
self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data Stream...failed.", exc_info=True)
self.log.error("DataDispatcher-"+str(self.id) + ": Passing new file to data stream...failed.", exc_info=True)
# remove file
try:
@@ -161,178 +174,19 @@ class DataDispatcher():
# self.log.error("Unable to notify Cleaner-pipe to handle file: " + str(workload), exc_info=True)
#
def getMetadata(self, metadata):
#extract fileEvent metadata
try:
#TODO validate metadata dict
filename = metadata["filename"]
sourcePath = metadata["sourcePath"]
relativePath = metadata["relativePath"]
except:
self.log.error("Invalid fileEvent message received.", exc_info=True)
self.log.debug("metadata=" + str(metadata))
#skip all further instructions and continue with next iteration
raise
# filename = "img.tiff"
# filepath = "C:\dir"
#
# --> sourceFilePathFull = 'C:\\dir\img.tiff'
sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath)
sourceFilePathFull = os.path.join(sourceFilePath, filename)
#TODO combine better with sourceFile... (for efficiency)
if self.localTarget:
targetFilePath = os.path.normpath(self.localTarget + os.sep + relativePath)
targetFilePathFull = os.path.join(targetFilePath, filename)
else:
targetFilePathFull = None
try:
#for quick testing set filesize of file as chunksize
self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
filesize = os.path.getsize(sourceFilePathFull)
fileModificationTime = os.stat(sourceFilePathFull).st_mtime
chunksize = filesize #can be used later on to split multipart message
self.log.debug("filesize(%s) = %s" % (sourceFilePathFull, str(filesize)))
self.log.debug("fileModificationTime(%s) = %s" % (sourceFilePathFull, str(fileModificationTime)))
except:
self.log.error("Unable to create metadata dictionary.", exc_info=True)
raise
#build payload for message-pipe by putting source-file into a message
try:
self.log.debug("create metadata for source file...")
#metadata = {
# "filename" : filename,
# "sourcePath" : sourcePath,
# "relativePath" : relativePath,
# "filesize" : filesize,
# "fileModificationTime" : fileModificationTime,
# "chunkSize" : self.zmqMessageChunkSize
# }
metadata[ "filesize" ] = filesize
metadata[ "fileModificationTime" ] = fileModificationTime
metadata[ "chunkSize" ] = self.chunkSize
self.log.debug("metadata = " + str(metadata))
except:
self.log.error("Unable to assemble multi-part message.", exc_info=True)
raise
return sourceFilePathFull, targetFilePathFull, metadata
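The returned metadata dictionary is simply the incoming workload enriched with filesize, fileModificationTime and chunkSize. A hypothetical example (values are illustrative; filename and relativePath are taken from the test at the bottom of this diff, the source directory is assumed):

# Illustrative only -- actual values depend on the file on disk and the configuration
metadata = {
    "filename"            : "100.cbf",
    "sourcePath"          : "/data/source",       # assumed source directory
    "relativePath"        : "/local/raw",
    "filesize"            : 2205349,              # os.path.getsize(sourceFilePathFull)
    "fileModificationTime": 1453820000.0,         # os.stat(sourceFilePathFull).st_mtime
    "chunkSize"           : 10485760              # self.chunkSize
}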
def sendData(self, targets, sourceFilepath, metadata):
#reading source file into memory
try:
self.log.debug("Opening '" + str(sourceFilepath) + "'...")
fileDescriptor = open(str(sourceFilepath), "rb")
except:
self.log.error("Unable to read source file '" + str(sourceFilepath) + "'.", exc_info=True)
raise
#send message
try:
self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...")
chunkNumber = 0
stillChunksToRead = True
while stillChunksToRead:
chunkNumber += 1
#read next chunk from file
fileContent = fileDescriptor.read(self.chunkSize)
#detect if end of file has been reached
if not fileContent:
stillChunksToRead = False
#as chunk is empty decrease chunk counter
chunkNumber -= 1
break
#assemble metadata for zmq-message
chunkPayloadMetadata = metadata.copy()
chunkPayloadMetadata["chunkNumber"] = chunkNumber
chunkPayloadMetadataJson = cPickle.dumps(chunkPayloadMetadata)
chunkPayload = []
chunkPayload.append(chunkPayloadMetadataJson)
chunkPayload.append(fileContent)
# streaming data
for target, prio in targets:
# send data to the data stream to store it in the storage system
if prio == 0:
# socket already known
if target in self.openConnections:
tracker = self.openConnections[target].send_multipart(chunkPayload, copy=False, track=True)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to '" + target + "' with priority " + str(prio) )
else:
# open socket
socket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
self.openConnections[target] = socket
# send data
tracker = self.openConnections[target].send_multipart(chunkPayload, copy=False, track=True)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to '" + target + "' with priority " + str(prio) )
# make sure the chunk has actually been sent before continuing
if not tracker.done:
self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...")
tracker.wait()
self.log.info("Message part from file " + str(sourceFilepath) + " has not been sent yet, waiting...done")
else:
# socket already known
if target in self.openConnections:
# send data
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
# socket not known
else:
# open socket
socket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
self.openConnections[target] = socket
# send data
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
#close file
fileDescriptor.close()
self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...done.")
except:
self.log.error("Unable to send multipart-message for file " + str(sourceFilepath), exc_info=True)
def stop(self):
self.log.debug("Closing sockets for DataDispatcher-" + str(self.id))
for connection in self.openConnections:
if self.openConnections[connection]:
self.openConnections[connection].close(0)
self.openConnections[connection] = None
self.routerSocket.close(0)
self.routerSocket = None
if not self.extContext:
if self.routerSocket:
self.routerSocket.close(0)
self.routerSocket = None
if not self.extContext and self.context:
self.log.debug("Destroying context")
self.context.destroy()
self.context = None
def __exit__(self):
......
@@ -90,7 +90,7 @@ class TaskProvider():
self.log.error("Type of event detector is not supported: " + str( self.config["eventDetectorType"] ))
return -1
self.eventDetector = EventDetector(self.config, logQueue)
self.createSockets()
......
@@ -34,7 +34,7 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
#TODO combine better with sourceFile... (for efficiency)
if localTarget:
targetFilePath = os.path.normpath(self.localTarget + os.sep + relativePath)
targetFilePath = os.path.normpath(localTarget + os.sep + relativePath)
targetFile = os.path.join(targetFilePath, filename)
else:
targetFile = None
@@ -223,7 +223,7 @@ if __name__ == '__main__':
copyfile(prework_sourceFile, prework_targetFile)
time.sleep(0.5)
metadata = {
workload = {
"sourcePath" : BASE_PATH + os.sep +"data" + os.sep + "source",
"relativePath": os.sep + "local" + os.sep + "raw",
"filename" : "100.cbf"
@@ -237,7 +237,7 @@ if __name__ == '__main__':
logging.debug("openConnections before function call: " + str(openConnections))
sourceFile, targetFile, metadata = getMetadata (logging, metadata, chunkSize, localTarget = None)
sourceFile, targetFile, metadata = getMetadata (logging, workload, chunkSize, localTarget = None)
sendData(logging, targets, sourceFile, metadata, openConnections, context)
logging.debug("openConnections after function call: " + str(openConnections))
......