Commit 7e0f988a authored by Manuela Kuhn's avatar Manuela Kuhn

Fixed removing files with confirmation

parent 582be33e
......@@ -73,6 +73,7 @@ status_check_port = 50050
# ZMQ port to communicate with cleaner process
# (needed if running on Windows)
cleaner_port = 50051
cleaner_trigger_port = 50052
# Confirmation socket to get a confirmation for each data message sent
confirmation_port = 50052
confirmation_port = 50053
......@@ -227,7 +227,7 @@ class Transfer():
self.request_port = "50001"
self.file_op_port = "50050"
self.status_check_port = "50050"
self.confirmation_port = "50052"
self.confirmation_port = "50053"
self.ipc_path = os.path.join(tempfile.gettempdir(), "hidra")
self.is_ipv6 = False
......@@ -646,7 +646,7 @@ class Transfer():
self.log.debug("socket_id_bind_str={}".format(socket_bind_str))
""" ######## data socket ############ """
######## data socket ############
# Socket to retrieve data
self.data_socket = self.context.socket(zmq.PULL)
# remember the bind string for reestablishment of the connection
......@@ -680,11 +680,11 @@ class Transfer():
raise
self.poller.register(self.data_socket, zmq.POLLIN)
""" ##################################### """
#####################################
if self.connection_type in ["QUERY_NEXT", "QUERY_METADATA"]:
""" ########## request socket ########### """
########## request socket ###########
# An additional socket is needed to establish the data retriving
# mechanism
self.request_socket = self.context.socket(zmq.PUSH)
......@@ -698,7 +698,7 @@ class Transfer():
self.log.error("Failed to start request socket (connect):"
" '{}'".format(con_str), exc_info=True)
raise
""" ##################################### """
#####################################
self.started_connections["QUERY_NEXT"] = {
"id": socket_id,
......@@ -707,13 +707,13 @@ class Transfer():
elif self.connection_type in ["NEXUS"]:
""" ####### file operation socket ####### """
####### file operation socket #######
# Reuse status check socket to get signals to open and close
# nexus files
self.setopt("status_check")
""" ##################################### """
#####################################
""" ########## control socket ########### """
######### control socket ###########
# Socket to retrieve control signals from control API
if not os.path.exists(self.ipc_path):
os.makedirs(self.ipc_path)
......@@ -733,7 +733,7 @@ class Transfer():
exc_info=True)
self.poller.register(self.control_socket, zmq.POLLIN)
""" ##################################### """
#####################################
self.started_connections["NEXUS"] = {
"id": socket_id,
......@@ -794,7 +794,7 @@ class Transfer():
self.__get_socket_id(self.status_check_ip,
self.status_check_port))
""" ######## status check socket ######## """
######## status check socket ########
# socket to get signals to get status check requests. this socket
# is also used to get signals to open and close nexus files
self.status_check_socket = self.context.socket(zmq.REP)
......@@ -811,7 +811,7 @@ class Transfer():
"for '{}'".format(bind_str), exc_info=True)
self.poller.register(self.status_check_socket, zmq.POLLIN)
""" ##################################### """
#####################################
elif option == "file_op":
if self.file_op_socket is not None:
......@@ -843,7 +843,7 @@ class Transfer():
self.__get_socket_id(self.file_op_ip,
self.file_op_port))
""" ######## status check socket ######## """
######## status check socket ########
# socket to get signals to get status check requests. this socket
# is also used to get signals to open and close nexus files
self.file_op_socket = self.context.socket(zmq.REP)
......@@ -860,12 +860,12 @@ class Transfer():
"for '{}'".format(bind_str), exc_info=True)
self.poller.register(self.file_op_socket, zmq.POLLIN)
""" ##################################### """
#####################################
elif option == "confirmation":
if self.confirmation_socket is not None:
self.log.error("Confirmation is already enabled (used port: "
"{0})".format(self.confirmation_port))
"{})".format(self.confirmation_port))
return
self.confirmation_protocol = "tcp"
......@@ -881,7 +881,7 @@ class Transfer():
self.confirmation_ip = value[1]
self.confirmation_port = value[2]
else:
self.log.debug("value={0}".format(value))
self.log.debug("value={}".format(value))
raise FormatError("Socket information have to be of "
"the form [<host>, <port>].")
else:
......@@ -892,19 +892,19 @@ class Transfer():
self.__get_socket_id(self.confirmation_ip,
self.confirmation_port))
""" ######## confirmation socket ######## """
######## confirmation socket ########
# to send the a confirmation to the sender that the data packages
# was stored successfully
self.confirmation_socket = self.context.socket(zmq.PUSH)
self.confirmation_socket = self.context.socket(zmq.PUB)
try:
self.confirmation_socket.connect(con_str)
self.log.info("Start confirmation_socket (connect) for '{}'"
self.confirmation_socket.bind(con_str)
self.log.info("Start confirmation_socket (bind) for '{}'"
.format(con_str))
except:
self.log.error("Failed to start confirmation socket (connect) "
self.log.error("Failed to start confirmation socket (bind) "
"for '{}'".format(con_str), exc_info=True)
""" ##################################### """
#####################################
else:
raise NotSupported("Option {} is not supported".format(option))
......@@ -943,7 +943,7 @@ class Transfer():
# Recreate the socket (not with the new whitelist enables)
self.log.debug("Starting down data_socket")
""" ########## data socket ########### """
########## data socket ###########
self.data_socket = self.context.socket(zmq.PULL)
self.data_socket.zap_domain = b'global'
......@@ -964,7 +964,7 @@ class Transfer():
raise
self.poller.register(self.data_socket, zmq.POLLIN)
""" ##################################### """
#####################################
def read(self, callback_params, open_callback, read_callback,
close_callback):
......@@ -1367,10 +1367,16 @@ class Transfer():
file_id = generate_file_identifier(metadata)
# send confirmation
try:
self.confirmation_socket.send(file_id.encode("utf-8"))
topic = metadata["confirmation_required"].encode()
#topic = b"test"
message = [topic, file_id.encode("utf-8")]
self.confirmation_socket.send_multipart(message)
self.log.debug("Sending confirmation for chunk {} of "
"file '{}'"
.format(metadata["chunk_number"], file_id))
"file '{}' to {}"
.format(metadata["chunk_number"],
file_id,
topic))
except:
if self.confirmation_socket is None:
self.log.error("Correct data handling is requested to "
......@@ -1543,19 +1549,19 @@ class Transfer():
self.control_socket.close(linger=0)
self.control_socket = None
control_con_str = ("{0}/{1}_{2}"
control_con_str = ("{}/{}_{}"
.format(self.ipc_path,
self.current_pid,
"control_API"))
try:
os.remove(control_con_str)
self.log.debug("Removed ipc socket: {0}"
self.log.debug("Removed ipc socket: {}"
.format(control_con_str))
except OSError:
self.log.warning("Could not remove ipc socket: {0}"
self.log.warning("Could not remove ipc socket: {}"
.format(control_con_str))
except:
self.log.warning("Could not remove ipc socket: {0}"
self.log.warning("Could not remove ipc socket: {}"
.format(control_con_str), exc_info=True)
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
......@@ -1571,7 +1577,7 @@ class Transfer():
for target in self.file_descriptors:
self.file_descriptors[target].close()
self.log.warning("Not all chunks were received for file {0}"
self.log.warning("Not all chunks were received for file {}"
.format(target))
if self.file_descriptors:
......
This diff is collapsed.
from __future__ import unicode_literals
import zmq
import json
import sys
import abc
import json
import os
import socket
import sys
import zmq
from __init__ import BASE_PATH # noqa F401
import utils
......@@ -47,7 +48,8 @@ class DataFetcherBase(ABC):
False,
"stop_on_error",
"with_confirmation"]],
"cleaner_job_con_str"
"cleaner_job_con_str",
"main_pid"
]
# Check format of config
......@@ -73,6 +75,10 @@ class DataFetcherBase(ABC):
self.config["cleaner_job_con_str"]),
exc_info=True)
raise
self.confirmation_topic = (
utils.generate_sender_id(self.config["main_pid"])
)
else:
self.cleaner_job_socket = None
......
......@@ -100,8 +100,10 @@ class DataFetcher(DataFetcherBase):
metadata["file_mod_time"] = file_mod_time
metadata["file_create_time"] = file_create_time
metadata["chunksize"] = self.config["chunksize"]
metadata["confirmation_required"] = (
self.config["remove_data"] == "with_confirmation")
if self.config["remove_data"] == "with_confirmation":
metadata["confirmation_required"] = self.confirmation_topic
else:
metadata["confirmation_required"] = False
self.log.debug("metadata = {}".format(metadata))
except:
......@@ -370,8 +372,7 @@ class Cleaner(CleanerBase):
# remove file
try:
os.remove(source_file)
self.log.info("Removing file '{}' ...success"
.format(source_file))
self.log.info("Removing file '{}' ...success".format(source_file))
except:
self.log.error("Unable to remove file {}".format(source_file),
exc_info=True)
......@@ -428,12 +429,12 @@ if __name__ == '__main__':
.format(ipc_path))
if utils.is_windows():
job_con_str = "tcp://{0}:{1}".format(con_ip, cleaner_port)
job_bind_str = "tcp://{0}:{1}".format(ext_ip, cleaner_port)
job_con_str = "tcp://{}:{}".format(con_ip, cleaner_port)
job_bind_str = "tcp://{}:{}".format(ext_ip, cleaner_port)
else:
job_con_str = ("ipc://{0}/{1}_{2}".format(ipc_path,
current_pid,
"cleaner"))
job_con_str = ("ipc://{}/{}_{}".format(ipc_path,
current_pid,
"cleaner"))
job_bind_str = job_con_str
conf_con_str = "tcp://{}:{}".format(con_ip, confirmation_port)
......
......@@ -423,8 +423,10 @@ class DataManager():
self.params["session"] = None
if self.use_cleaner:
self.params["cleaner_conf_con_str"] = "tcp://{}:{}".format(
self.ext_ip, self.params["confirmation_port"])
self.params["cleaner_conf_con_str"] = (
"tcp://{}:{}".format(self.params["data_stream_targets"][0][0],
self.params["confirmation_port"])
)
else:
self.params["cleaner_conf_con_str"] = None
......@@ -455,8 +457,12 @@ class DataManager():
self.params["cleaner_job_con_str"] = (
"tcp://{}:{}".format(self.localhost,
self.params["cleaner_port"]))
self.params["cleaner_tigger_con_str"] = (
"tcp://{}:{}".format(self.localhost,
self.params["cleaner_trigger_port"]))
else:
self.params["cleaner_job_con_str"] = None
self.params["cleaner_trigger_con_str"] = None
else:
self.log.info("Using ipc for internal communication.")
......@@ -481,8 +487,13 @@ class DataManager():
"ipc://{}/{}_{}".format(self.ipc_path,
self.current_pid,
"cleaner"))
self.params["cleaner_trigger_con_str"] = (
"ipc://{}/{}_{}".format(self.ipc_path,
self.current_pid,
"cleaner_trigger"))
else:
self.params["cleaner_job_con_str"] = None
self.params["cleaner_trigger_con_str"] = None
self.whitelist = self.params["whitelist"]
self.ldapuri = self.params["ldapuri"]
......@@ -901,6 +912,7 @@ class DataManager():
args=(self.params,
self.log_queue,
self.params["cleaner_job_con_str"],
self.params["cleaner_trigger_con_str"],
self.params["cleaner_conf_con_str"],
self.control_sub_con_str))
self.cleaner_pr.start()
......
......@@ -477,6 +477,18 @@ def excecute_ldapsearch(ldap_cn, ldapuri):
return netgroup
def generate_sender_id(main_pid):
""" Generates an unique id to identify the running datamanager.
Args:
main_pid: The PID of the datamanager
Return:
A byte string containing the identifier
"""
return b"{}_{}".format(socket.getfqdn(), main_pid)
# http://stackoverflow.com/questions/25585518/
# python-logging-logutils-with-queuehandler-and-queuelistener#25594270
class CustomQueueListener (QueueListener):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment