Commit c800a44a authored by Manuela Kuhn's avatar Manuela Kuhn

Revert sharing of socket for file operations and status checks in transfer

parent 828d3641
......@@ -156,6 +156,7 @@ class Transfer():
self.signal_host = signal_host
self.signal_port = "50000"
self.request_port = "50001"
self.file_op_port = "50050"
self.status_check_port = "50050"
self.confirmation_port = "50052"
self.ipc_path = os.path.join(tempfile.gettempdir(), "hidra")
......@@ -166,6 +167,7 @@ class Transfer():
self.signal_socket = None
self.request_socket = None
self.file_op_socket = None
self.status_check_socket = None
self.data_socket = None
self.confirmation_socket = None
......@@ -626,9 +628,10 @@ class Transfer():
"""
Args
option (str):
"status_check": enable and configure socket to use for
- receiving file operation commands for NEXUS use case
- status check requests
"file_op": enable and configure socket to use for receiving
file operation commands for NEXUS use case
"status_check": enable and configure socket to use for status
check requests
"confirmation": enable and configure socket to use for
confirmation that individual data packets where
handled without problems
......@@ -640,6 +643,7 @@ class Transfer():
option [<protocol>, <ip>, <port>]
"""
if option == "status_check":
# TODO create Thread shich handles this asynchroniously
if self.status_check_socket is not None:
self.log.error("Status check is already enabled (used port: "
"{0})".format(self.status_check_port))
......@@ -688,6 +692,55 @@ class Transfer():
self.poller.register(self.status_check_socket, zmq.POLLIN)
#####################################
elif option == "file_op":
if self.file_op_socket is not None:
self.log.error("File operation is already enabled (used port: "
"{0})".format(self.file_op_port))
return
self.file_op_protocol = "tcp"
self.file_op_ip = self.ip
if value is not None:
if type(value) == list:
if len(value) == 2:
self.file_op_ip = value[0]
self.file_op_port = value[1]
elif len(value) == 3:
self.file_op_protocol = value[0]
self.file_op_ip = value[1]
self.file_op_port = value[2]
else:
self.log.debug("value={0}".format(value))
raise FormatError("Socket information have to be of the"
"form [<host>, <port>].")
else:
self.file_op_port = value
bind_str = "{0}://{1}".format(
self.file_op_protocol,
self.__get_socket_id(self.file_op_ip,
self.file_op_port))
######## status check socket ########
# socket to get signals to get status check requests. this socket is
# also used to get signals to open and close nexus files
self.file_op_socket = self.context.socket(zmq.REP)
if self.is_ipv6:
self.file_op_socket.ipv6 = true
self.log.debug("enabling ipv6 socket for file_op_socket")
try:
self.file_op_socket.bind(bind_str)
self.log.info("Start status check socket (bind) for '{0}'"
.format(bind_str))
except:
self.log.error("Failed to start status check socket (bind) "
"for '{0}'".format(bind_str), exc_info=true)
self.poller.register(self.file_op_socket, zmq.POLLIN)
#####################################
elif option == "confirmation":
if self.confirmation_socket is not None:
self.log.error("Confirmation is already enabled (used port: "
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment