Commit 81ffb7a6 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added force-stop function to dataTransferAPI

parent fb98dc4c
......@@ -147,28 +147,7 @@ class dataTransfer():
self.stop()
raise ConnectionFailed("No host to send signal to specified." )
self.targets = []
# [host, port, prio]
if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
host, port, prio = targets
self.targets = [[host + ":" + port, prio, [""]]]
# [host, port, prio, suffixes]
elif len(targets) == 4 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list and type(targets[3]) == list:
host, port, prio, suffixes = targets
self.targets = [[host + ":" + port, prio, suffixes]]
# [[host, port, prio], ...] or [[host, port, prio, suffixes], ...]
else:
for t in targets:
if type(t) == list and len(t) == 3:
host, port, prio = t
self.targets.append([host + ":" + port, prio, [""]])
elif type(t) == list and len(t) == 4 and type(t[3]):
host, port, prio, suffixes = t
self.targets.append([host + ":" + port, prio, suffixes])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise FormatError("Argument 'targets' is of wrong format.")
self.__setTargets (targets)
message = self.__sendSignal(signal)
......@@ -217,6 +196,33 @@ class dataTransfer():
self.poller.register(self.signalSocket, zmq.POLLIN)
def __setTargets (self, targets):
self.targets = []
# [host, port, prio]
if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
host, port, prio = targets
self.targets = [[host + ":" + port, prio, [""]]]
# [host, port, prio, suffixes]
elif len(targets) == 4 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list and type(targets[3]) == list:
host, port, prio, suffixes = targets
self.targets = [[host + ":" + port, prio, suffixes]]
# [[host, port, prio], ...] or [[host, port, prio, suffixes], ...]
else:
for t in targets:
if type(t) == list and len(t) == 3:
host, port, prio = t
self.targets.append([host + ":" + port, prio, [""]])
elif type(t) == list and len(t) == 4 and type(t[3]):
host, port, prio, suffixes = t
self.targets.append([host + ":" + port, prio, suffixes])
else:
self.stop()
self.log.debug("targets=" + str(targets))
raise FormatError("Argument 'targets' is of wrong format.")
def __sendSignal (self, signal):
......@@ -630,6 +636,65 @@ class dataTransfer():
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def force_stop (self, targets):
if type(targets) != list:
self.stop()
raise FormatError("Argument 'targets' must be list.")
if not self.context:
self.context = zmq.Context()
self.extContext = False
signal = None
# Signal exchange
if self.connectionType == "stream":
signalPort = self.signalPort
signal = "STOP_STREAM"
elif self.connectionType == "streamMetadata":
signalPort = self.signalPort
signal = "STOP_STREAM_METADATA"
elif self.connectionType == "queryNext":
signalPort = self.signalPort
signal = "STOP_QUERY_NEXT"
elif self.connectionType == "queryMetadata":
signalPort = self.signalPort
signal = "STOP_QUERY_METADATA"
self.log.debug("Create socket for signal exchange...")
if self.signalHost and not self.signalSocket:
self.__createSignalSocket(signalPort)
elif not self.signalHost:
self.stop()
raise ConnectionFailed("No host to send signal to specified." )
self.__setTargets (targets)
message = self.__sendSignal(signal)
if message and message == "VERSION_CONFLICT":
self.stop()
raise VersionError("Versions are conflicting.")
elif message and message == "NO_VALID_HOST":
self.stop()
raise AuthenticationFailed("Host is not allowed to connect.")
elif message and message == "CONNECTION_ALREADY_OPEN":
self.stop()
raise CommunicationFailed("Connection is already open.")
elif message and message == "NO_VALID_SIGNAL":
self.stop()
raise CommunicationFailed("Connection type is not supported for this kind of sender.")
# if there was no response or the response was of the wrong format, the receiver should be shut down
elif message and message.startswith(signal):
self.log.info("Received confirmation ...")
def __exit__ (self):
self.stop()
......
Zeromq Data Transfer develop
- Sender can now be controlled via tango
- Added cleanup arguments into config gile
- Added method to dataTransferAPI to manually stop streams/queries
- Added option to specify which file formats to be send via zeromq
- Added option look for multiple event types in parallel (combined with file suffixes)
- DataManager can now be controlled via tango
- Added systemd service script
- Added cleanup arguments into config file
- Files get accessed only if data or metadata is send via zeromq
- Fixed DataReceiver (no shell)
- Added command line argument error handling
......
......@@ -55,9 +55,7 @@ class worker(multiprocessing.Process):
break
self.log.debug("worker-" + str(self.id) + ": metadata " + str(metadata["filename"]))
# filepath = os.path.join(metadata["relativePath"], metadata["filename"])
# filepath = os.path.join(basePath, filepath)
filepath = self.query.generateTargetFilepath(basePath, metadata)
filepath = self.query.generateTargetFilepath(self.basePath, metadata)
self.log.debug("worker-" + str(self.id) + ": filepath " + filepath)
with open(filepath, "r") as fileDescriptor:
......@@ -88,6 +86,7 @@ if __name__ == "__main__":
# signalHost = "asap3-bl-prx07.desy.de"
# targets = [["asap3-bl-prx07.desy.de", "50101", 1, [".cbf"]], ["asap3-bl-prx07.desy.de", "50102", 1, [".cbf"]], ["asap3-bl-prx07.desy.de", "50103", 1, [".cbf"]]]
# targets = [["zitpcx19282.desy.de", "50101", 1, [".cbf"]]]
targets = [["zitpcx19282.desy.de", "50101", 1, [".cbf"]], ["zitpcx19282.desy.de", "50102", 1, [".cbf"]], ["zitpcx19282.desy.de", "50103", 1, [".cbf"]]]
# targets = [["zitpcx19282.desy.de", "50101", 1], ["zitpcx19282.desy.de", "50102", 1], ["zitpcx19282.desy.de", "50103", 1]]
# targets = [["zitpcx19282.desy.de", "50101", 1, [".cbf"]], ["zitpcx19282.desy.de", "50102", 1, [".cbf"]], ["zitpcx19282.desy.de", "50103", 1, [".cbf"]], ["lsdma-lab04.desy.de", "50104", 1, [".cbf"]]]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment