Commit d78bd5f1 authored by Manuela Kuhn's avatar Manuela Kuhn

Merge branch 'release-2.3'

parents 28e31449 71d3c0b9
# API to communicate with a data transfer unit
__version__ = '2.2.0'
__version__ = '2.3.0'
import zmq
import socket
......@@ -147,26 +147,7 @@ class dataTransfer():
self.stop()
raise ConnectionFailed("No host to send signal to specified." )
self.targets = []
# [host, port, prio]
if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
host, port, prio = targets
self.targets = [[host + ":" + port, prio]]
# [[host, port, prio], ...]
else:
for t in targets:
if type(t) == list and len(t) == 3:
host, port, prio = t
self.targets.append([host + ":" + port, prio])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise FormatError("Argument 'targets' is of wrong format.")
# if type(dataPort) == list:
# self.dataHost = str([socket.gethostname() for i in dataPort])
# else:
# self.dataHost = socket.gethostname()
self.__setTargets (targets)
message = self.__sendSignal(signal)
......@@ -215,6 +196,33 @@ class dataTransfer():
self.poller.register(self.signalSocket, zmq.POLLIN)
def __setTargets (self, targets):
self.targets = []
# [host, port, prio]
if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
host, port, prio = targets
self.targets = [[host + ":" + port, prio, [""]]]
# [host, port, prio, suffixes]
elif len(targets) == 4 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list and type(targets[3]) == list:
host, port, prio, suffixes = targets
self.targets = [[host + ":" + port, prio, suffixes]]
# [[host, port, prio], ...] or [[host, port, prio, suffixes], ...]
else:
for t in targets:
if type(t) == list and len(t) == 3:
host, port, prio = t
self.targets.append([host + ":" + port, prio, [""]])
elif type(t) == list and len(t) == 4 and type(t[3]):
host, port, prio, suffixes = t
self.targets.append([host + ":" + port, prio, suffixes])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise FormatError("Argument 'targets' is of wrong format.")
def __sendSignal (self, signal):
......@@ -628,6 +636,65 @@ class dataTransfer():
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def forceStop (self, targets):
if type(targets) != list:
self.stop()
raise FormatError("Argument 'targets' must be list.")
if not self.context:
self.context = zmq.Context()
self.extContext = False
signal = None
# Signal exchange
if self.connectionType == "stream":
signalPort = self.signalPort
signal = "STOP_STREAM"
elif self.connectionType == "streamMetadata":
signalPort = self.signalPort
signal = "STOP_STREAM_METADATA"
elif self.connectionType == "queryNext":
signalPort = self.signalPort
signal = "STOP_QUERY_NEXT"
elif self.connectionType == "queryMetadata":
signalPort = self.signalPort
signal = "STOP_QUERY_METADATA"
self.log.debug("Create socket for signal exchange...")
if self.signalHost and not self.signalSocket:
self.__createSignalSocket(signalPort)
elif not self.signalHost:
self.stop()
raise ConnectionFailed("No host to send signal to specified." )
self.__setTargets (targets)
message = self.__sendSignal(signal)
if message and message == "VERSION_CONFLICT":
self.stop()
raise VersionError("Versions are conflicting.")
elif message and message == "NO_VALID_HOST":
self.stop()
raise AuthenticationFailed("Host is not allowed to connect.")
elif message and message == "CONNECTION_ALREADY_OPEN":
self.stop()
raise CommunicationFailed("Connection is already open.")
elif message and message == "NO_VALID_SIGNAL":
self.stop()
raise CommunicationFailed("Connection type is not supported for this kind of sender.")
# if there was no response or the response was of the wrong format, the receiver should be shut down
elif message and message.startswith(signal):
self.log.info("Received confirmation ...")
def __exit__ (self):
self.stop()
......
......@@ -3,10 +3,9 @@
#########################################
# Path where the logfile will be created
#logfilePath = /space/projects/zeromq-data-transfer/logs
logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
logfilePath = /space/projects/zeromq-data-transfer/logs
#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
#logfilePath = /home/p11user/zeromq-data-transfer/logs
#logfilePath = /home/p11user/live-viewer/logs
# Filename used for logging
logfileName = dataManager.log
......@@ -57,17 +56,14 @@ fixSubdirs = ["commissioning", "current", "local"]
# Directory to be monitor for changes
# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
#monitoredDir = /space/projects/zeromq-data-transfer/data/source
monitoredDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source
monitoredDir = /space/projects/zeromq-data-transfer/data/source
#monitoredDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/source
#monitoredDir = /rd
# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
# Event type of files (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...) and
# the formats to be monitored, files in an other format will be be neglected
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredEventType = IN_CLOSE_WRITE
#monitoredEventType = IN_MOVED_TO
# The formats to be monitored, files in an other format will be be neglected
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredFormats = [".tif", ".cbf"]
monitoredEvents = {"IN_CLOSE_WRITE" : [".tif", ".cbf", ".nxs"], "IN_MOVED_TO" : [".log"]}
# Number of events stored to look for doubles
# (needed if eventDetector is InotifyxDetector or HttpDetector)
......@@ -78,8 +74,12 @@ historySize = 0
# (needed if eventDetector is InotifyxDetector)
useCleanUp = False
# Intervall time (in seconds) used for clean up resp. checking of events
# (only needed if eventDetectorType is InotifyxDetector or WatchdogDetector)
actionTime = 150
# Time (in seconds) since last modification after which a file will be seen as closed
# (needed if eventDetector is WatchdogDetector)
# (needed if eventDetector is InotifyxDetector (for clean up) or WatchdogDetector)
timeTillClosed = 2
# ZMQ port to get events from
......@@ -110,7 +110,6 @@ dataFetcherType = getFromFile
dataFetcherPort = 50010
# Number of parallel data streams
# if this number is modifified, the port numbers also have to be adjusted
numberOfStreams = 1
# Enable ZMQ pipe into storage system (uses the fixedStreamHost and fixedStreamPort)
......@@ -129,8 +128,8 @@ chunkSize = 10485760 ; # = 1024*1024*10
routerPort = 50004
# Target to move the files into
#localTarget = /space/projects/zeromq-data-transfer/data/target
localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
localTarget = /space/projects/zeromq-data-transfer/data/target
#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
#localTarget = /gpfs
# Flag describing if the data should be stored in localTarget
......@@ -138,5 +137,5 @@ localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
storeData = True
# Flag describing if the files should be removed from the source
# (needed if dataFetcherType is getFromHttp)
# (needed if dataFetcherType is getFromFile or getFromHttp)
removeData = True
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
#Ignore everything in this directory
*
# Except this file
!.gitignore
Zeromq Data Transfer 2.2.0
- Fixed problems that the dataTransferAPI could not received data
Zeromq Data Transfer 2.3.0
- Added method to dataTransferAPI to manually stop streams/queries
- Added option to specify which file formats to be send via zeromq
- Added option look for multiple event types in parallel (combined with file suffixes)
- DataManager can now be controlled via tango
- Added systemd service script
- Added cleanup arguments into config file
- Files get accessed only if data or metadata is send via zeromq
- Fixed DataReceiver (no shell)
- Added command line argument error handling
- Removed ringbuffer remains of old architecture
Zeromq Data Transfer 2.2.1
- Fixed data receiving problems with dataTransferAPI due to ZAP
Zeromq Data Transfer 2.2.0
......
#!/usr/bin/env python
import socket
import sys
port = 51000
msgs = [
'set localTarget /root/zeromq-data-transfer/data/target',
# 'set localTarget /space/projects/zeromq-data-transfer/data/target',
'get localTarget',
'set detectorDevice haspp06:10000/p06/eigerdectris/exp.01',
'set filewriterDevice haspp06:10000/p06/eigerfilewriter/exp.01',
'set historySize 0',
'set storeData True',
'set removeData True',
'set whitelist ["localhost","zitpcx19282"]',
'do start',
'do status',
'do stop',
# 'exit'
'bye'
]
#host = socket.gethostname()
host = "asap3-bl-prx07"
sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sckt.connect((host, port))
except Exception, e:
print "connect() failed", e
sckt.close()
sys.exit()
try:
for msg in msgs:
sckt.send(msg)
print "sent (len %2d): %s" % (len(msg), msg)
reply = sckt.recv(1024)
print "recv (len %2d): %s " % (len( reply), reply)
finally:
sckt.close()
This diff is collapsed.
......@@ -5,8 +5,9 @@ After=network-online.target
[Service]
TimeoutStartSec=0
WorkingDirectory=/space/projects/zeromq-data-transfer
ExecStart=/space/projects/zeromq-data-transfer/src/sender/DataManager.py --verbose
WorkingDirectory=/root/zeromq-data-transfer
User=root
ExecStart=/root/zeromq-data-transfer/src/sender/DataManager.py --verbose --configFile /root/zeromq-data-transfer/conf/%i.conf
[Install]
WantedBy=multi-user.target
......@@ -44,7 +44,7 @@ def argumentParsing():
whitelist = json.loads(config.get('asection', 'whitelist'))
except ValueError:
ldap_cn = config.get('asection', 'whitelist')
p = subprocess.Popen("ldapsearch -x -H ldap://it-ldap-slave.desy.de:1389 cn=" + ldap_cn + " -LLL", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
p = subprocess.Popen(["ldapsearch", "-x", "-H ldap://it-ldap-slave.desy.de:1389", "cn=" + ldap_cn , "-LLL"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
lines = p.stdout.readlines()
matchHost = re.compile(r'nisNetgroupTriple: [(]([\w|\S|.]+),.*,[)]', re.M|re.I)
......
......@@ -156,6 +156,9 @@ class DataDispatcher():
self.log.debug("DataDispatcher-" + str(self.id) + ": waiting for new job")
socks = dict(self.poller.poll())
######################################
# messages from TaskProvider #
######################################
if self.routerSocket in socks and socks[self.routerSocket] == zmq.POLLIN:
try:
......@@ -173,7 +176,7 @@ class DataDispatcher():
targets = cPickle.loads(message[1])
if self.fixedStreamId:
targets.insert(0,[self.fixedStreamId, 0, "data"])
targets.insert(0,[self.fixedStreamId, 0, [""], "data"])
# sort the target list by the priority
targets = sorted(targets, key=lambda target: target[1])
......@@ -216,16 +219,16 @@ class DataDispatcher():
continue
elif self.fixedStreamId:
targets = [[self.fixedStreamId, 0, "data"]]
targets = [[self.fixedStreamId, 0, [""], "data"]]
else:
targets = []
# get metadata of the file
# get metadata and paths of the file
try:
self.log.debug("Getting file metadata")
sourceFile, targetFile, metadata = self.dataFetcher.getMetadata(self.log, workload, self.chunkSize, self.localTarget)
self.log.debug("Getting file paths and metadata")
sourceFile, targetFile, metadata = self.dataFetcher.getMetadata(self.log, self.dataFetcherProp, targets, workload, self.chunkSize, self.localTarget)
except:
self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".", exc_info=True)
......@@ -242,6 +245,9 @@ class DataDispatcher():
self.dataFetcher.finishDataHandling(self.log, targets, sourceFile, targetFile, metadata, self.openConnections, self.context, self.dataFetcherProp)
######################################
# control commands #
######################################
if self.controlSocket in socks and socks[self.controlSocket] == zmq.POLLIN:
try:
......@@ -263,7 +269,7 @@ class DataDispatcher():
targets = cPickle.loads(message[1])
for socketId, prio in targets:
for socketId, prio, suffix in targets:
if self.openConnections.has_key(socketId):
self.log.info("Closing socket " + str(socketId))
if self.openConnections[socketId]:
......
This diff is collapsed.
......@@ -177,38 +177,48 @@ class SignalHandler():
while True:
socks = dict(self.poller.poll())
######################################
# incoming request from TaskProvider #
######################################
if self.requestFwSocket in socks and socks[self.requestFwSocket] == zmq.POLLIN:
try:
incomingMessage = self.requestFwSocket.recv()
self.log.debug("New request for signals received.")
openRequests = []
for requestSet in self.openRequPerm:
if requestSet:
index = self.openRequPerm.index(requestSet)
tmp = requestSet[self.nextRequNode[index]]
openRequests.append(copy.deepcopy(tmp))
# distribute in round-robin order
self.nextRequNode[index] = (self.nextRequNode[index] + 1) % len(requestSet)
for requestSet in self.openRequVari:
if requestSet:
tmp = requestSet.pop(0)
openRequests.append(tmp)
if openRequests:
self.requestFwSocket.send(cPickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
else:
openRequests = ["None"]
self.requestFwSocket.send(cPickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
incomingMessage = self.requestFwSocket.recv_multipart()
if incomingMessage[0] == "GET_REQUESTS":
self.log.debug("New request for signals received.")
filename = incomingMessage[1]
openRequests = []
for requestSet in self.openRequPerm:
if requestSet:
index = self.openRequPerm.index(requestSet)
tmp = requestSet[self.nextRequNode[index]]
# Check if filename suffix matches requested suffix
if filename.endswith(tuple(tmp[2])):
openRequests.append(copy.deepcopy(tmp))
# distribute in round-robin order
self.nextRequNode[index] = (self.nextRequNode[index] + 1) % len(requestSet)
for requestSet in self.openRequVari:
# Check if filename suffix matches requested suffix
if requestSet and filename.endswith(tuple(requestSet[0][2])):
tmp = requestSet.pop(0)
openRequests.append(tmp)
if openRequests:
self.requestFwSocket.send(cPickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
else:
openRequests = ["None"]
self.requestFwSocket.send(cPickle.dumps(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
except:
self.log.error("Failed to receive/answer new signal requests.", exc_info=True)
######################################
# start/stop command from external #
######################################
if self.comSocket in socks and socks[self.comSocket] == zmq.POLLIN:
incomingMessage = self.comSocket.recv_multipart()
......@@ -220,6 +230,9 @@ class SignalHandler():
else:
self.sendResponse(checkFailed)
######################################
# request from external #
######################################
if self.requestSocket in socks and socks[self.requestSocket] == zmq.POLLIN:
incomingMessage = self.requestSocket.recv_multipart()
......@@ -252,6 +265,9 @@ class SignalHandler():
self.log.info("Request not supported.")
######################################
# control commands from internal #
######################################
if self.controlSubSocket in socks and socks[self.controlSubSocket] == zmq.POLLIN:
try:
......@@ -564,7 +580,7 @@ class requestPuller():
self.log.info("[getRequests] Start run")
while True:
try:
self.requestFwSocket.send("")
self.requestFwSocket.send_multipart(["GET_REQUESTS"])
self.log.info("[getRequests] send")
requests = cPickle.loads(self.requestFwSocket.recv())
self.log.info("[getRequests] Requests: " + str(requests))
......
......@@ -178,12 +178,14 @@ class TaskProvider():
# get requests for this event
try:
self.log.debug("Get requests...")
self.requestFwSocket.send("")
self.requestFwSocket.send_multipart(["GET_REQUESTS", workload["filename"]])
requests = cPickle.loads(self.requestFwSocket.recv())
self.log.debug("Requests: " + str(requests))
except:
self.log.error("Get Requests... failed.", exc_info=True)
requests = ["None"]
# build message dict
try:
......@@ -295,9 +297,9 @@ class requestResponder():
def run (self):
hostname = socket.gethostname()
self.log.info("[requestResponder] Start run")
openRequests = [[hostname + ':6003', 1], [hostname + ':6004', 0]]
openRequests = [[hostname + ':6003', 1, [".cbf"]], [hostname + ':6004', 0, [".cbf"]]]
while True:
request = self.requestFwSocket.recv()
request = self.requestFwSocket.recv_multipart()
self.log.debug("[requestResponder] Received request: " + str(request) )
self.requestFwSocket.send(cPickle.dumps(openRequests))
......
......@@ -15,7 +15,8 @@ from send_helpers import __sendToTargets, DataHandlingError
def setup (log, prop):
if ( not prop.has_key("fixSubdirs") or
not prop.has_key("storeData") ):
not prop.has_key("storeData") or
not prop.has_key("removeData") ):
log.error ("Configuration of wrong format")
log.debug ("dataFetcherProp="+ str(prop))
......@@ -27,7 +28,7 @@ def setup (log, prop):
return True
def getMetadata (log, metadata, chunkSize, localTarget = None):
def getMetadata (log, prop, targets, metadata, chunkSize, localTarget = None):
#extract fileEvent metadata
try:
......@@ -55,51 +56,54 @@ def getMetadata (log, metadata, chunkSize, localTarget = None):
else:
targetFile = None
try:
# For quick testing set filesize of file as chunksize
log.debug("get filesize for '" + str(sourceFile) + "'...")
filesize = os.path.getsize(sourceFile)
fileModTime = os.stat(sourceFile).st_mtime
fileCreateTime = os.stat(sourceFile).st_ctime
chunksize = filesize #can be used later on to split multipart message
log.debug("filesize(%s) = %s" % (sourceFile, str(filesize)))
log.debug("fileModTime(%s) = %s" % (sourceFile, str(fileModTime)))
if targets:
try:
# For quick testing set filesize of file as chunksize
log.debug("get filesize for '" + str(sourceFile) + "'...")
filesize = os.path.getsize(sourceFile)
fileModTime = os.stat(sourceFile).st_mtime
fileCreateTime = os.stat(sourceFile).st_ctime
chunksize = filesize #can be used later on to split multipart message
log.debug("filesize(%s) = %s" % (sourceFile, str(filesize)))
log.debug("fileModTime(%s) = %s" % (sourceFile, str(fileModTime)))
except:
log.error("Unable to create metadata dictionary.")
raise
except:
log.error("Unable to create metadata dictionary.")
raise
try:
log.debug("create metadata for source file...")
#metadata = {
# "filename" : filename,
# "sourcePath" : sourcePath,
# "relativePath" : relativePath,
# "filesize" : filesize,
# "fileModTime" : fileModTime,
# "chunkSize" : self.zmqMessageChunkSize
# }
metadata[ "filesize" ] = filesize
metadata[ "fileModTime" ] = fileModTime
metadata[ "fileCreateTime"] = fileCreateTime
metadata[ "chunkSize" ] = chunkSize
log.debug("metadata = " + str(metadata))
except:
log.error("Unable to assemble multi-part message.")
raise
try:
log.debug("create metadata for source file...")
#metadata = {
# "filename" : filename,
# "sourcePath" : sourcePath,
# "relativePath" : relativePath,
# "filesize" : filesize,
# "fileModTime" : fileModTime,
# "chunkSize" : self.zmqMessageChunkSize
# }
metadata[ "filesize" ] = filesize
metadata[ "fileModTime" ] = fileModTime
metadata[ "fileCreateTime"] = fileCreateTime
metadata[ "chunkSize" ] = chunkSize
log.debug("metadata = " + str(metadata))
except:
log.error("Unable to assemble multi-part message.")
raise
return sourceFile, targetFile, metadata
return sourceFile, targetFile, metadata
else:
return sourceFile, targetFile, dict()
def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
targets_data = [i for i in targets if i[2] == "data"]
if not targets_data:
if not targets:
prop["removeFlag"] = True
return
targets_data = [i for i in targets if i[3] == "data"]
prop["removeFlag"] = False
chunkSize = metadata[ "chunkSize" ]
......@@ -196,9 +200,9 @@ def __dataHandling(log, sourceFile, targetFile, actionFunction, metadata, prop):
def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
targets_metadata = [i for i in targets if i[2] == "metadata"]
targets_metadata = [i for i in targets if i[3] == "metadata"]
if prop["storeData"] and prop["removeFlag"]:
if prop["storeData"] and prop["removeData"] and prop["removeFlag"]:
# move file
try:
......@@ -207,15 +211,6 @@ def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConn
except:
return
#send message to metadata targets
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context, prop["timeout"])
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
elif prop["storeData"]:
# copy file
......@@ -226,16 +221,7 @@ def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConn
except:
return
#send message to metadata targets
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context, prop["timeout"])
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
elif prop["removeFlag"]:
elif prop["removeData"] and prop["removeFlag"]:
# remove file
try:
os.remove(sourceFile)
......@@ -245,14 +231,14 @@ def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConn
prop["removeFlag"] = False
#send message to metadata targets
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context, prop["timeout"] )
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
#send message to metadata targets
if targets_metadata:
try:
__sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context, prop["timeout"] )
log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)
except:
log.error("Unable to send metadata multipart-message for file " + str(sourceFile) + " to " + str(targets_metadata), exc_info=True)
def clean (prop):
......
......@@ -34,8 +34,7 @@ def setup (log, prop):
return True
def getMetadata (log, metadata, chunkSize, localTarget = None):
def getMetadata (log, prop, targets, metadata, chunkSize, localTarget = None):
#extract fileEvent metadata
try:
......@@ -139,8 +138,8 @@ def sendData (log, targets, sourceFile, targetFile, metadata, openConnections,
except:
log.error("Unable to open target file '" + targetFile + "'.", exc_info=True)
targets_data = [i for i in targets if i[2] == "data"]
targets_metadata = [i for i in targets if i[2] == "metadata"]
targets_data = [i for i in targets if i[3] == "data"]
targets_metadata = [i for i in targets if i[3] == "metadata"]
chunkNumber = 0
log.debug(