Commit 456465b6 authored by Manuela Kuhn

Added fixed streaming option

parent b10d29ec
# Directory you want to monitor for changes
# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
monitoredDir = /space/projects/live-viewer/data/source
#monitoredDir = /home/kuhnm/Arbeit/live-viewer/data/source
#monitoredDir = /rd
# Target to move the files into
localTarget = /space/projects/live-viewer/data/target
#localTarget = /home/kuhnm/Arbeit/live-viewer/data/target
#localTarget = /gpfs
# Event type of files to be monitored (options are: "IN_CLOSE_WRITE", "IN_MOVED_TO", ...)
monitoredEventType = IN_CLOSE_WRITE
#monitoredEventType = IN_MOVED_TO
# Subdirectories of monitoredDir to be monitored
monitoredSubdirs = ["commissioning", "current", "local"]
# The formats to be monitored; files in any other format will be neglected
monitoredFormats = [".tif", ".cbf"]
# List of hosts allowed to connect
whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "haspp11eval01", "it-hpc-cxi03"]
#whitelist = ["localhost", "haspp11eval01", "it-hpc-cxi03"]
# Number of parallel data streams
# if this number is modified, the port numbers also have to be adjusted
parallelDataStreams = 1
# Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget)
useDataStream = True
# Fixed host to send the data to with highest priority
fixedStreamHost = zitpcx19282
# Fixed Port to send the data to with highest priority
fixedStreamPort = 6006
# Port number to receive signals from
comPort = 50000
# ZMQ port to forward requests
requestFwPort = 50001
# ZMQ port to get new requests
requestPort = 50100
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50002
# Port of the ZMQ pull socket of the cleaner, which deletes/moves the given files
cleanerPort = 50003
# Chunk size of the file parts sent via ZMQ
chunkSize = 10485760 ; # = 1024*1024*10
#chunkSize = 1073741824 ; # = 1024*1024*1024
# Path where the logfile will be created
logfilePath = /space/projects/live-viewer/logs
#logfilePath = /home/kuhnm/Arbeit/live-viewer/logs
#logfilePath = /home/p11user/live-viewer/logs
# Filename used for logging
logfileName = zmq_sender.log
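For illustration, a minimal sketch of how such a section-less key=value file could be loaded with Python 2's ConfigParser by prepending a dummy section header; the config file name and the section name are assumptions, not part of the repository.

# Sketch only: load the key=value settings above (file name is assumed)
from ConfigParser import RawConfigParser
from StringIO import StringIO

def loadConfig(path):
    with open(path) as f:
        # ConfigParser requires a section header, so prepend a dummy one
        content = "[asection]\n" + f.read()
    parser = RawConfigParser()
    parser.readfp(StringIO(content))
    return dict(parser.items("asection"))

config = loadConfig("zmq_sender.conf")                        # assumed file name
chunkSize     = int(config["chunksize"].split(";")[0])        # strip a possible inline comment
useDataStream = config["usedatastream"].lower() == "true"
fixedStream   = config["fixedstreamhost"] + ":" + config["fixedstreamport"]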
@@ -8,6 +8,7 @@ import sys
import logging
import traceback
import cPickle
import shutil
#path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
SHARED_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) + os.sep + "shared"
@@ -24,7 +25,7 @@ import helperScript
#
class DataDispatcher():
def __init__(self, id, routerPort, chunkSize, useDataStream, context = None):
def __init__(self, id, routerPort, chunkSize, fixedStreamId, localTarget = None, context = None):
self.log = self.getLogger()
self.id = id
@@ -38,6 +39,9 @@ class DataDispatcher():
self.routerSocket = None
self.fixedStreamId = fixedStreamId
self.localTarget = localTarget
# dict with information about all open sockets to which a data stream is opened (host, port, ...)
self.openConnections = dict()
@@ -99,26 +103,32 @@ class DataDispatcher():
self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
message = self.routerSocket.recv_multipart()
self.log.debug("DataDispatcher-" + str(self.id) + ": new job received")
self.log.debug("message = " + str(message))
if len(message) >= 2:
workload = cPickle.loads(message[0])
targets = cPickle.loads(message[1])
# sort the target list by thge priority
if self.fixedStreamId:
targets.insert(0,[self.fixedStreamId, 0])
# sort the target list by the priority
targets = sorted(targets, key=lambda target: target[1])
else:
workload = cPickle.loads(message)
targets = None
finished = workload == b"EXIT"
finished = message[0] == b"EXIT"
if finished:
self.log.debug("Router requested to shutdown DataDispatcher-"+ str(self.id) + ".")
break
workload = cPickle.loads(message[0])
if self.fixedStreamId:
targets = [[self.fixedStreamId, 0]]
else:
targets = None
# get metadata of the file
try:
self.log.debug("Getting file metadata")
sourceFile, metadata = self.getMetadata(workload)
sourceFile, targetFile, metadata = self.getMetadata(workload)
except Exception as e:
self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".")
self.log.debug("Error was: " + str(e))
@@ -193,6 +203,13 @@ class DataDispatcher():
sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath)
sourceFilePathFull = os.path.join(sourceFilePath, filename)
#TODO combine better with sourceFile... (for efficiency)
if self.localTarget:
targetFilePath = os.path.normpath(self.localTarget + os.sep + relativePath)
targetFilePathFull = os.path.join(targetFilePath, filename)
else:
targetFilePathFull = None
try:
#for quick testing set filesize of file as chunksize
self.log.debug("get filesize for '" + str(sourceFilePathFull) + "'...")
@@ -228,7 +245,7 @@ class DataDispatcher():
self.log.debug("Error was: " + str(e))
raise Exception(e)
return sourceFilePathFull, metadata
return sourceFilePathFull, targetFilePathFull, metadata
def sendData(self, targets, sourceFilepath, metadata):
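The target-path handling added to getMetadata() above can be tried in isolation; only the os.path calls mirror the diff, the example values are invented.

# Sketch only: build the local target path the same way as in the diff
import os

localTarget  = "/space/projects/live-viewer/data/target"   # example value
relativePath = "local"                                      # example value
filename     = "image_000001.cbf"                           # example value

if localTarget:
    targetFilePath     = os.path.normpath(localTarget + os.sep + relativePath)
    targetFilePathFull = os.path.join(targetFilePath, filename)
else:
    targetFilePathFull = None

print targetFilePathFull
# -> /space/projects/live-viewer/data/target/local/image_000001.cbf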
@@ -388,9 +405,12 @@ if __name__ == '__main__':
receivingPort = "6005"
receivingPort2 = "6006"
chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB
useDataStream = False
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, useDataStream) )
localTarget = BASE_PATH + "/data/target"
fixedStreamId = False
fixedStreamId = "zitpcx19282:6006"
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, fixedStreamId, localTarget) )
dataDispatcherPr.start()
context = zmq.Context.instance()
@@ -419,6 +439,7 @@ if __name__ == '__main__':
targets = [['zitpcx19282:6005', 1], ['zitpcx19282:6006', 0]]
message = [ cPickle.dumps(metadata), cPickle.dumps(targets) ]
# message = [ cPickle.dumps(metadata)]
time.sleep(1)
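For clarity, a sketch of the two-part job message the test code above builds and the DataDispatcher unpacks after recv_multipart(); the workload keys are assumptions based on the variable names in the diff, not the actual dictionary layout.

# Illustration only: serialize and unpack a job message as in the diff
import cPickle

workload = { "sourcePath":   "/space/projects/live-viewer/data/source",   # assumed keys
             "relativePath": "local",
             "filename":     "image_000001.cbf" }
targets  = [ ["zitpcx19282:6005", 1], ["zitpcx19282:6006", 0] ]

message = [ cPickle.dumps(workload), cPickle.dumps(targets) ]

# receiving side (see the run loop above)
if len(message) >= 2:
    workload = cPickle.loads(message[0])
    targets  = cPickle.loads(message[1])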
@@ -24,8 +24,7 @@ import helperScript
#
class SignalHandler():
def __init__ (self, whiteList,
comPort, signalFwPort, requestPort,
def __init__ (self, whiteList, comPort, signalFwPort, requestPort,
context = None):
# to get the logging only handling this class
@@ -174,20 +173,12 @@ class SignalHandler():
incomingMessage = self.requestSocket.recv_multipart()
self.log.debug("Received request: " + str(incomingMessage) )
# print "allowedQueries", self.allowedQueries
for index in range(len(self.allowedQueries)):
# no double requests from the same socket TODO do we want that?
# print "allowedQueries[", index, "]", self.allowedQueries[index]
# print "openRequVari", self.openRequVari
# print "incomingMessage", incomingMessage[1]
for i in range(len(self.allowedQueries[index])):
if incomingMessage[1] == self.allowedQueries[index][i][0]:
self.openRequVari[index].append(self.allowedQueries[index][i])
self.log.debug("Add to openRequVari: " + str(self.allowedQueries[index][i]) )
# print "openRequVari", self.openRequVari
def checkSignal (self, incomingMessage):
@@ -379,10 +370,6 @@ if __name__ == '__main__':
self.requestFwSocket.close(0)
self.context.destroy()
# def __del__(self):
# self.requestFwSocket.close(0)
# self.context.destroy()
helperScript.initLogging("/space/projects/live-viewer/logs/signalHandler.log", verbose=True, onScreenLogLevel="debug")
@@ -97,9 +97,9 @@ class TaskProvider():
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
try:
self.routerSocket.bind(connectionStr)
self.log.info("Start to routeributing socket (bind): '" + str(connectionStr) + "'")
self.log.info("Start to router socket (bind): '" + str(connectionStr) + "'")
except Exception as e:
self.log.error("Failed to start routeributing Socket (bind): '" + connectionStr + "'")
self.log.error("Failed to start router Socket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))