Commit 4e8ddc5d authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Merge branch 'release-2.2.0'

parents 0f9ae2d8 f919be03
# API to communicate with a data transfer unit
__version__ = '2.1.4'
__version__ = '2.2.0'
import zmq
import socket
......@@ -10,6 +10,7 @@ import errno
import os
import cPickle
import traceback
from zmq.auth.thread import ThreadAuthenticator
class loggingFunction:
......@@ -89,8 +90,11 @@ class dataTransfer():
self.signalSocket = None
self.dataSocket = None
self.requestSocket = None
self.poller = zmq.Poller()
self.auth = None
self.targets = None
self.supportedConnections = ["stream", "streamMetadata", "queryNext", "queryMetadata"]
......@@ -256,52 +260,73 @@ class dataTransfer():
return message
def start (self, dataSocket = False):
def start (self, dataSocket = False, whitelist = None):
# Receive data only from whitelisted nodes
if whitelist:
if type(whitelist) == list:
self.auth = ThreadAuthenticator(self.context)
self.auth.start()
for host in whitelist:
try:
if host == "localhost":
ip = [socket.gethostbyname(host)]
else:
hostname, tmp, ip = socket.gethostbyaddr(host)
self.log.debug("Allowing host " + host + " (" + str(ip[0]) + ")")
self.auth.allow(ip[0])
except:
self.log.error("Error was: ", exc_info=True)
raise AuthenticationFailed("Could not get IP of host " + host)
else:
raise FormatError("Whitelist has to be a list of IPs")
alreadyConnected = self.streamStarted or self.queryNextStarted
#TODO Do I need to raise an exception here?
if alreadyConnected:
# raise Exception("Connection already started.")
self.log.info("Connection already started.")
return
socketIdToConnect = self.streamStarted or self.queryNextStarted
ip = "0.0.0.0" #TODO use IP of hostname?
if socketIdToConnect:
self.log.info("Reopening already started connection.")
else:
host = ""
port = ""
ip = "0.0.0.0" #TODO use IP of hostname?
if dataSocket:
if type(dataSocket) == list:
socketIdToConnect = dataSocket[0] + ":" + dataSocket[1]
host = dataSocket[0]
ip = socket.gethostbyaddr(host)[2][0]
port = dataSocket[1]
else:
port = str(dataSocket)
host = ""
port = ""
host = socket.gethostname()
socketId = host + ":" + port
if dataSocket:
if type(dataSocket) == list:
socketIdToConnect = dataSocket[0] + ":" + dataSocket[1]
host = dataSocket[0]
ip = socket.gethostbyaddr(host)[2][0]
port = dataSocket[1]
else:
port = str(dataSocket)
host = socket.gethostname()
socketId = host + ":" + port
ipFromHost = socket.gethostbyaddr(host)[2]
if len(ipFromHost) == 1:
ip = ipFromHost[0]
elif len(self.targets) == 1:
host, port = self.targets[0][0].split(":")
ipFromHost = socket.gethostbyaddr(host)[2]
if len(ipFromHost) == 1:
ip = ipFromHost[0]
elif len(self.targets) == 1:
host, port = self.targets[0][0].split(":")
ipFromHost = socket.gethostbyaddr(host)[2]
if len(ipFromHost) == 1:
ip = ipFromHost[0]
else:
raise FormatError("Multipe possible ports. Please choose which one to use.")
else:
raise FormatError("Multipe possible ports. Please choose which one to use.")
socketId = host + ":" + port
socketIdToConnect = ip + ":" + port
# socketIdToConnect = "[" + ip + "]:" + port
socketId = host + ":" + port
socketIdToConnect = ip + ":" + port
# socketIdToConnect = "[" + ip + "]:" + port
self.dataSocket = self.context.socket(zmq.PULL)
# An additional socket is needed to establish the data retriving mechanism
connectionStr = "tcp://" + socketIdToConnect
self.dataSocket.zap_domain = b'global'
try:
# self.dataSocket.ipv6 = True
......@@ -310,6 +335,7 @@ class dataTransfer():
self.log.info("Data socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
raise
self.poller.register(self.dataSocket, zmq.POLLIN)
......@@ -323,6 +349,7 @@ class dataTransfer():
self.log.info("Request socket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'", exc_info=True)
raise
self.queryNextStarted = socketId
else:
......@@ -355,60 +382,64 @@ class dataTransfer():
self.log.error("Could not send request to requestSocket", exc_info=True)
return None, None
# receive data
if timeout:
try:
socks = dict(self.poller.poll(timeout))
except:
self.log.error("Could not poll for new message")
raise
else:
try:
socks = dict(self.poller.poll())
except:
self.log.error("Could not poll for new message")
raise
# if there was a response
if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
while True:
# receive data
if timeout:
try:
socks = dict(self.poller.poll(timeout))
except:
self.log.error("Could not poll for new message")
raise
else:
try:
socks = dict(self.poller.poll())
except:
self.log.error("Could not poll for new message")
raise
try:
multipartMessage = self.dataSocket.recv_multipart()
except:
self.log.error("Receiving files..failed.")
return [None, None]
# if there was a response
if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
if len(multipartMessage) < 2:
self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
self.log.debug("multipartMessage=" + str(mutipartMessage))
return [None, None]
try:
multipartMessage = self.dataSocket.recv_multipart()
except:
self.log.error("Receiving data..failed.", exc_info=True)
return [None, None]
# extract multipart message
try:
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
metadata = None
#TODO validate multipartMessage (like correct dict-values for metadata)
if multipartMessage[0] == b"ALIVE_TEST":
continue
elif len(multipartMessage) < 2:
self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
self.log.debug("multipartMessage=" + str(mutipartMessage)[:100])
return [None, None]
try:
payload = multipartMessage[1]
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
# extract multipart message
try:
metadata = cPickle.loads(multipartMessage[0])
except:
self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
metadata = None
return [metadata, payload]
else:
self.log.warning("Could not receive data in the given time.")
#TODO validate multipartMessage (like correct dict-values for metadata)
if self.queryNextStarted :
try:
self.requestSocket.send_multipart(["CANCEL", self.queryNextStarted])
except Exception as e:
self.log.error("Could not cancel the next query", exc_info=True)
payload = multipartMessage[1]
except:
self.log.warning("An empty file was received within the multipart-message", exc_info=True)
payload = None
return [None, None]
return [metadata, payload]
else:
self.log.warning("Could not receive data in the given time.")
if self.queryNextStarted :
try:
self.requestSocket.send_multipart(["CANCEL", self.queryNextStarted])
except Exception as e:
self.log.error("Could not cancel the next query", exc_info=True)
return [None, None]
def store (self, targetBasePath, dataObject):
......@@ -420,12 +451,13 @@ class dataTransfer():
payload = dataObject[1]
if type(payloadMetadata) is not dict or type(payload) is not list:
if type(payloadMetadata) is not dict:
raise FormatError("payload: Wrong input format in 'store'")
#save all chunks to file
while True:
#TODO check if payload != cPickle.dumps(None) ?
if payloadMetadata and payload:
#append to file
try:
......@@ -458,8 +490,6 @@ class dataTransfer():
def __appendChunksToFile (self, targetBasePath, configDict, payload):
chunkCount = len(payload)
#generate target filepath
targetFilepath = self.generateTargetFilepath(targetBasePath, configDict)
self.log.debug("new file is going to be created at: " + targetFilepath)
......@@ -492,8 +522,7 @@ class dataTransfer():
#only write data if a payload exist
try:
if payload != None:
for chunk in payload:
newFile.write(chunk)
newFile.write(payload)
newFile.close()
except:
self.log.error("Unable to append data to file.")
......@@ -578,6 +607,14 @@ class dataTransfer():
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
if self.auth:
try:
self.auth.stop()
self.auth = None
self.log.info("Stopping authentication thread...done.")
except:
self.log.error("Stopping authentication thread...done.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.extContext and self.context:
......
......@@ -14,6 +14,8 @@ logfileName = dataManager.log
# File size before rollover in B (linux only)
logfileSize = 10485760 ; #10 MB
# Name with which the service should be running
procname = zeromq-data-transfer
#########################################
#### SignalHandler Configuration ####
......@@ -35,7 +37,8 @@ requestFwPort = 50002
# ZMQ port to distribute control signals
# (needed if running on Windows)
controlPort = 50005
controlPubPort = 50005
controlSubPort = 50006
#########################################
#### EventDetector Configuration ####
......@@ -48,7 +51,7 @@ eventDetectorType = InotifyxDetector
#eventDetectorType = HttpDetector
# Subdirectories to be monitored and to store data to
# (needed if eventDetector is InotifyxDetector or WatchdogDetector or dataFetchter is getFromFile)
# (needed if eventDetector is InotifyxDetector or WatchdogDetector or dataFetcher is getFromFile)
fixSubdirs = ["commissioning", "current", "local"]
# Directory to be monitored for changes
......@@ -70,6 +73,11 @@ monitoredFormats = [".tif", ".cbf"]
# (needed if eventDetector is InotifyxDetector or HttpDetector)
historySize = 0
# Flag describing if a clean up thread which regularly checks
# if some files were missed should be activated
# (needed if eventDetector is InotifyxDetector)
useCleanUp = True
# Time (in seconds) since last modification after which a file will be seen as closed
# (needed if eventDetector is WatchdogDetector)
timeTillClosed = 2
......
#########################################
#### Logging Configuration ####
#########################################
# Path where the logfile will be created
logfilePath = D:\zeromq-data-transfer\logs
# Filename used for logging
logfileName = dataManager.log
# File size before rollover in B (linux only)
logfileSize = 10485760 ; #10 MB
#########################################
#### SignalHandler Configuration ####
#########################################
# Port number to receive signals from
comPort = 50000
# List of hosts allowed to connect
whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "zitpcx17858"]
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
# (needed if running on Windows)
requestFwPort = 50002
# ZMQ port to distribute control signals
# (needed if running on Windows)
controlPubPort = 50005
controlSubPort = 50006
#########################################
#### EventDetector Configuration ####
#########################################
# Type of event detector to use (options are: InotifyxDetector, WatchdogDetector, ZmqDetector, HttpGetDetector)
#eventDetectorType = InotifyxDetector
eventDetectorType = WatchdogDetector
#eventDetectorType = ZmqDetector
#eventDetectorType = HttpDetector
# Subdirectories to be monitored and to store data to
# (needed if eventDetector is InotifyxDetector or WatchdogDetector or dataFetcher is getFromFile)
fixSubdirs = ["commissioning", "current", "local"]
# Directory to be monitored for changes
# Inside this directory only the subdirectories "commissioning", "current" and "local" are monitored
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredDir = D:\zeromq-data-transfer\data\source
# Target to move the files into
localTarget = D:\zeromq-data-transfer\data\target
# Type of event detector to use (options are: inotifyx, watchdog, zmq)
#eventDetectorType = inotifyx
eventDetectorType = watchdog
#eventDetectorType = zmq
# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredEventType = IN_CLOSE_WRITE
# Subdirectories of watchDir to be monitored
monitoredSubdirs = ["commissioning", "current", "local"]
# The formats to be monitored, files in another format will be neglected
# (needed if eventDetector is InotifyxDetector or WatchdogDetector)
monitoredFormats = [".tif", ".cbf"]
# Number of events stored to look for doubles
# (needed if eventDetector is InotifyxDetector or HttpDetector)
historySize = 0
# Time (in seconds) since last modification after which a file will be seen as closed
# (needed if eventDetector is WatchdogDetector)
timeTillClosed = 2
# List of hosts allowed to connect
whitelist = ["localhost", "zitpcx19282", "zitpcx22614", "zitpcx17858"]
# ZMQ port to get events from
# (needed if eventDetectorType is ZmqDetector)
eventPort = 50003
# Tango device proxy for the detector
# (needed if eventDetectorType is HttpDetector)
#detectorDevice = "haspp10lab:10000/p10/eigerdectris/lab.01"
detectorDevice = haspp06:10000/p06/eigerdectris/exp.01
# Tango device proxy for the filewriter
# (needed if eventDetectorType is HttpDetector)
#filewriterDevice = "haspp10lab:10000/p10/eigerfilewriter/lab.01"
filewriterDevice = haspp06:10000/p06/eigerfilewriter/exp.01
#########################################
#### DataFetcher Configuration ####
#########################################
# Module with methods specifying how to get the data (options are "getFromFile", "getFromZmq", "getFromHttp")
dataFetcherType = getFromFile
#dataFetcherType = getFromZmq
#dataFetcherType = getFromHttp
# If "getFromZmq" is specified as dataFetcherType it needs a port to listen to
# (needed if eventDetectorType is ZmqDetector)
dataFetcherPort = 50010
# Number of parallel data streams
# if this number is modified, the port numbers also have to be adjusted
numberOfStreams = 1
# Enable ZMQ pipe into storage system (if set to False: the file is moved into the localTarget)
# Enable ZMQ pipe into storage system (uses the fixedStreamHost and fixedStreamPort)
useDataStream = True
# Fixed host to send the data to with highest priority
fixedStreamHost = zitpcx19282
# Fixed Port to send the data to with highest priority
fixedStreamPort = 50100
# Port number to receive signals from
comPort = 50000
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get events from (only needed if eventDetectorType is zmq)
eventPort = 50003
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50004
# ZMQ-pull-socket port which deletes/moves given files
cleanerPort = 50005
# Chunk size of file-parts getting send via zmq
chunkSize = 10485760 ; # = 1024*1024*10
#chunkSize = 1073741824 ; # = 1024*1024*1024
# Path where the logfile will be created
logfilePath = D:\zeromq-data-transfer\logs
# ZMQ-router port which coordinates the load-balancing to the worker-processes
# (needed if running on Windows)
routerPort = 50004
# Filename used for logging
logfileName = dataManager.log
# Target to move the files into
localTarget = D:\zeromq-data-transfer\data\target
# File size before rollover in B (linux only)
logfileSize = 10485760 ; #10 MB
# Flag describing if the data should be stored in localTarget
# (needed if dataFetcherType is getFromFile or getFromHttp)
storeData = False
# Flag describing if the files should be removed from the source
# (needed if dataFetcherType is getFromHttp)
removeData = True
#########################################
#### Logging Configuration ####
#########################################
# Path where logfile will be created
logfilePath = /space/projects/zeromq-data-transfer/logs
# Filename used for logging
logfileName = dataReceiver.log
# File size before rollover in B (linux only)
logfileSize = 104857600 ; #100 MB
#########################################
#### DataReceiver Configuration ####
#########################################
#
# List of hosts allowed to receive data from
#whitelist = a3p02.1-hosts
whitelist = ["localhost", "zitpcx19282.desy.de", "zitpcx22614", "lsdma-lab04"]
# Where incoming data will be stored to
targetDir = /space/projects/zeromq-data-transfer/data/target
targetDir = /space/projects/zeromq-data-transfer/data/zmq_target
# Local IP to connect dataStream to
dataStreamIp = 131.169.185.121 ;# zitpcx19282.desy.de
# TCP port of data pipe
dataStreamPort = 50100
# Path where logfile will be created
logfilePath = /space/projects/zeromq-data-transfer/logs
# Filename used for logging
logfileName = dataReceiver.log
# File size before rollover in B (linux only)
logfileSize = 104857600 ; #100 MB
# Where incoming data will be stored to
targetDir = /rd_liveviewer
#targetDir = /home/kuhnm/Arbeit/zeromq-data-transfer/data/zmq_target
#targetDir = /space/projects/zeromq-data-transfer/data/zmq_target
# TCP port of data pipe
dataStreamPort = 50100
# IP to communicate with the liveViewer
liveViewerComIp = 0.0.0.0
# TCP port to communicate with the live viewer
liveViewerComPort = 50021
# List of hosts allowed to connect to the receiver
liveViewerWhiteList = ["localhost", "haspp11user02", "haspp11user03", "haspp11user04"]
#liveViewerWhiteList = ["localhost", "zitpcx19282", "zitpcx22614"]
# Port to exchange data and signals between receiver and LiveViewCommunicator
lvCommunicatorPort = 50020
# IP of dataStream-socket to send signals back to the sender
signalIp = haspp11eval01.desy.de
#signalIp = zitpcx19282.desy.de
#signalIp = zitpcx22614.desy.de
# Time to wait for the sender to give a confirmation of the signal
senderResponseTimeout = 1000
# Path where logfile will be created
logfilePath = /home/p11user/live-viewer/logs
#logfilePath = /home/p11user/zeromq-data-transfer/logs
#logfilePath = /home/kuhnm/Arbeit/zeromq-data-transfer/logs
#logfilePath = /space/projects/zeromq-data-transfer/logs
# Filename used for logging
logfileName = zmq_receiver_LiveViewer.log
# Size of the ring buffer for the live viewer
maxRingBufferSize = 20
#maxRingBufferSize = 2
# Size of the queue for the live viewer
maxQueueSize = 1000
#maxQueueSize = 2
Zeromq Data Transfer 2.2.0
- Fixed stopping: The service is shut down if one process dies
- Enabled whitelist for data receiver
- Added tests to check status of fixed data receiver
- Runs now under process name zeromq-data-receiver
- Added init script
- Fixed clean up after shut down
- Enabled combination of data receiver whitelist with ldapsearch
- Added option to enable a clean up thread which checks the directory for missed files
- Version check does not consider bugfixes anymore
Zeromq Data Transfer 2.1.4
- Fixed copied file removal (Part 2)
Zeromq Data Transfer 2.1.3
- Fixed copied file removal (Part 1)
Zeromq Data Transfer 2.1.2
- Fixed too high processor usage
- Fixed suffix check in treewalk after creation of directory
Zeromq Data Transfer 2.1.1
- Fixed error handling with incorrect whitelists