Commit 828d3641 authored by Manuela Kuhn's avatar Manuela Kuhn

hidra fetcher uses transfer API

parent 2042de54
......@@ -132,9 +132,18 @@ data_fetcher_type = file_fetcher
#data_fetcher_type = zmq_fetcher
# If "zmq_fetcher" is specified as data_fetcher_type it needs a port to listen to
# (needed if event_detector_type is zmq_events)
# (needed if event_detector_type is zmq_events or if on windows and
# event_detector_type is http_events)
data_fetcher_port = 50010
# Test signal port to notify the data sender about problems
# (needed if event_detector_type is hidra_events)
status_check_resp_port = 50011
# Confirmation socket to send a confirmation for each data message sent
# (needed if event_detector_type is hidra_events)
confirmation_resp_port = 50012
# Number of parallel data streams
number_of_streams = 8
......@@ -182,11 +191,11 @@ remove_data = False
#remove_data = with_confirmation
# Test signal port to check if the data receiver is having problems
test_port = 50050
status_check_port = 50050
# ZMQ port to communicate with cleaner process
# (needed if running on Windows)
cleaner_port = 50051
# Confirmation socket to get a confirmation for each data message sent)
# Confirmation socket to get a confirmation for each data message sent
confirmation_port = 50052
......@@ -94,13 +94,17 @@ action_time = 10
# watchdog_events)
time_till_closed = 2
# ZMQ port to get incoming data from
# (needed if event_detector_type is hidra_events)
ext_data_port = 50101
# ZMQ port to get events from
# (needed if event_detector_type is zmq_events)
event_det_port = 50003
# IP/DNS name of the detector
# (needed if event_detector_type is http_events)
det_ip = lsdma-lab04
det_ip = asap3-mon
# API version of the detector
# (needed if event_detector_type is http_events)
det_api_version = 1.5.0
......@@ -119,6 +123,15 @@ data_fetcher_type = file_fetcher
# (needed if event_detector_type is zmq_events)
data_fetcher_port = 50010
# Test signal port to notify the data sender about problems
# (needed if event_detector_type is hidra_events)
status_check_resp_port = 50011
# Confirmation socket to send a confirmation for each data
# message sent
# needed if event_detector_type is hidra_events)
confirmation_resp_port = 50012
# Number of parallel data streams
number_of_streams = 8
......@@ -126,17 +139,18 @@ number_of_streams = 8
use_data_stream = False
# Fixed host and port to send the data to with highest priority
# (only needed if use_data_stream is enabled)
data_stream_targets = [["zitpcx19282", 50100]]
data_stream_targets = [["asap3-p00", 50100]]
# Chunk size of file-parts getting send via zmq
#1024*1024*10
chunksize = 10485760
# ZMQ-router port which coordinates the load-balancing to the
# worker-processes
# ZMQ-router port which coordinates the load-balancing to the worker-processes
# (needed if running on Windows)
router_port = 50004
# Target to move the files into (needed if store_data is enabled)
# Target directory to move the files into
# (needed if store_data is enabled)
local_target = D:\opt\hidra\data\target
# Flag describing if the data should be stored in local_target
......@@ -147,8 +161,24 @@ store_data = False
# options are:
# True - data stays on the source
# False - data is removed from the source after processing it
# deferred_error_handling - only supported if use_data_stream is
# enabled;
# datamanager gets a feedback if the
# data writing was successful
# with_confirmation - only supported if use_data_stream is
# enabled; data is removed from the source
# only after the target sent a verification
# enabled and data_fetcher_type is
# file_fetcher or http_fetcher;
# data is removed from the source only after
# the target sent a verification
# (needed if data_fetcher_type is file_fetcher or http_fetcher)
remove_data = False
# Test signal port to check if the data receiver is having problems
status_check_port = 50050
# ZMQ port to communicate with cleaner process
# (needed if running on Windows)
cleaner_port = 50051
# Confirmation socket to get a confirmation for each data message sent
confirmation_port = 50052
......@@ -6,9 +6,10 @@ import os
import logging
import json
import time
from zmq.devices.basedevice import ProcessDevice
from datafetcherbase import DataFetcherBase, DataHandlingError
from hidra import generate_filepath, store_data_chunk
from hidra import generate_filepath, Transfer
import helpers
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
......@@ -28,15 +29,16 @@ class DataFetcher(DataFetcherBase):
self.f_descriptors = dict()
required_params = ["context",
"store_data",
"status_check_resp_port",
"confirmation_resp_port"]
if helpers.is_windows():
required_params = ["context",
"data_fetch_port",
"store_data"]
required_params += ["data_fetch_port"]
else:
required_params = ["context",
"ipc_path",
"main_pid",
"store_data"]
required_params += ["ipc_path",
"main_pid"]
# Check format of config
check_passed, config_reduced = helpers.check_config(required_params,
......@@ -52,23 +54,23 @@ class DataFetcher(DataFetcherBase):
self.metadata_r = None
self.data_r = None
con_str = "ipc://{0}/{1}_{2}".format(self.config["ipc_path"],
self.config["main_pid"],
"out")
self.transfer = Transfer("STREAM", use_log=log_queue)
# Create zmq socket to get events
try:
self.config["data_fetch_socket"] = context.socket(zmq.PULL)
# self.config["data_fetch_socket"] = (
# self.config["context"].socket(zmq.PULL))
self.config["data_fetch_socket"].connect(con_str)
self.transfer.start([self.config["ipc_path"],
"{0}_{1}".format(self.config["main_pid"], "out")],
protocol="ipc", data_con_style="connect")
# enable status check requests from any sender
self.transfer.setopt("status_check",
[self.config["ext_ip"],
self.config["status_check_resp_port"]])
# enable confirmation reply if this is requested in a received data
# packet
self.transfer.setopt("confirmation",
[self.config["ext_ip"],
self.config["confirmation_resp_port"]])
self.log.info("Start data fetcher socket (connect): '{0}'"
.format(con_str))
except:
self.log.error("Failed to start data fetcher socket (connect):"
" '{0}'".format(con_str), exc_info=True)
raise
else:
self.log.debug("config={0}".format(self.config))
raise Exception("Wrong configuration")
......@@ -76,12 +78,7 @@ class DataFetcher(DataFetcherBase):
def get_metadata(self, targets, metadata):
# Get new data
self.metadata_r, self.data_r = (
self.config["data_fetch_socket"].recv_multipart())
# the metadata were received as string and have to be converted into
# a dictionary
self.metadata_r = json.loads(self.metadata_r.decode("utf-8"))
self.metadata_r, self.data_r = self.transfer.get()
if (metadata["relative_path"] != self.metadata_r["relative_path"]
or metadata["source_path"] != self.metadata_r["source_path"]
......@@ -179,12 +176,19 @@ class DataFetcher(DataFetcherBase):
.format(self.source_file, targets_metadata),
exc_info=True)
# store data
if self.config["store_data"]:
# TODO: save message to file using a thread (avoids blocking)
store_data_chunk(self.f_descriptors, self.target_file,
self.data_r, self.config["local_target"],
self.metadata_r, self.log)
# store data
try:
# TODO: save message to file using a thread (avoids blocking)
self.transfer.store_data_chunk(self.f_descriptors,
self.target_file,
self.data_r,
self.config["local_target"],
self.metadata_r)
except:
self.log.error("Storing multipart message for file '{0}' "
"failed".format(self.source_file),
exc_info=True)
def stop(self):
......@@ -193,10 +197,8 @@ class DataFetcher(DataFetcherBase):
self.f_descriptors[target_file].close()
del self.f_descriptors[target_file]
# Close zmq socket
if self.config["data_fetch_socket"]:
self.config["data_fetch_socket"].close(0)
self.config["data_fetch_socket"] = None
# close zmq sockets
self.transfer.stop()
if __name__ == '__main__':
......
......@@ -474,9 +474,9 @@ class DataManager():
self.fixed_stream_id = ("{0}:{1}"
.format(self.params["data_stream_targets"][0][0],
self.params["data_stream_targets"][0][1]))
self.test_signal_id = ("{0}:{1}"
self.status_check_id = ("{0}:{1}"
.format(self.params["data_stream_targets"][0][0],
self.params["test_port"]))
self.params["status_check_port"]))
if self.params["remove_data"] == "deferred_error_handling":
self.log.info("Enabled receiver checking")
......@@ -488,7 +488,7 @@ class DataManager():
else:
self.check_target_host = lambda enable_logging=False: True
self.fixed_stream_id = None
self.test_signal_id = None
self.status_check_id = None
self.number_of_streams = self.params["number_of_streams"]
self.chunksize = self.params["chunksize"]
......@@ -577,7 +577,7 @@ class DataManager():
# socket
try:
self.test_socket = self.context.socket(zmq.REQ)
con_str = "tcp://{0}".format(self.test_signal_id)
con_str = "tcp://{0}".format(self.status_check_id)
self.test_socket.connect(con_str)
self.log.info("Start test_socket (connect): '{0}'"
......@@ -602,13 +602,13 @@ class DataManager():
if enable_logging:
self.log.info("Sending status check to fixed streaming"
" host {0} ... success"
.format(self.fixed_stream_id))
.format(self.status_check_id))
status = self.test_socket.recv_multipart()
if enable_logging:
self.log.info("Received responce for status check of "
"fixed streaming host {0}"
.format(self.fixed_stream_id))
.format(self.status_check_id))
else:
self.socket_reconnected = False
......@@ -622,7 +622,7 @@ class DataManager():
# reopen it
try:
self.test_socket = self.context.socket(zmq.REQ)
con_str = "tcp://{0}".format(self.test_signal_id)
con_str = "tcp://{0}".format(self.status_check_id)
self.test_socket.connect(con_str)
self.log.info("Restart test_socket (connect): "
......@@ -644,7 +644,7 @@ class DataManager():
if enable_logging:
self.log.info("Sent status check to fixed "
"streaming host {0}"
.format(self.fixed_stream_id))
.format(self.status_check_id))
# The receiver may have dropped authentication or
# previous status check was not answered
......@@ -655,7 +655,7 @@ class DataManager():
if self.zmq_again_occured == 0:
self.log.error("Failed to send test message to "
"fixed streaming host {0}"
.format(self.fixed_stream_id))
.format(self.status_check_id))
self.log.debug("Error was: {0}: {0}"
.format(exc_type, exc_value))
self.zmq_again_occured += 1
......@@ -672,19 +672,25 @@ class DataManager():
if not tracker.done:
self.log.error("Failed check status of fixed"
"streaming host {0}"
.format(self.fixed_stream_id),
.format(self.status_check_id),
exc_info=True)
return False
# test message was successfully sent
elif enable_logging:
if enable_logging:
self.log.info("Sending status test to fixed "
"streaming host {0} ... success"
.format(self.fixed_stream_id))
.format(self.status_check_id))
self.zmq_again_occured = 0
self.log.debug("Receiving responce...")
status = self.test_socket.recv_multipart()
if enable_logging:
self.log.debug("Received responce: {0}".format(status))
# responce to test message was successfully received
# TODO check status + react
if status[0] == b"ERROR":
......@@ -695,14 +701,14 @@ class DataManager():
elif enable_logging:
self.log.info("Responce for status check of fixed "
"streaming host {0}: {1}"
.format(self.fixed_stream_id, status))
.format(self.status_check_id, status))
except KeyboardInterrupt:
raise
except:
self.log.error("Failed to check status of fixed "
"streaming host {0}"
.format(self.fixed_stream_id), exc_info=True)
.format(self.status_check_id), exc_info=True)
return False
return True
......
......@@ -39,23 +39,26 @@ class MonitorDevice():
try:
msg = self.in_socket.recv_multipart()
# print ("[MonitoringDevice] In: Received message {0}"
# .format(msg[:20]))
# .format(msg[0][:20]))
except KeyboardInterrupt:
break
if msg != [b'ALIVE_TEST']:
mon_msg = [self.in_prefix] + msg
self.mon_socket.send_multipart(mon_msg)
# print ("[MonitoringDevice] Mon: Sent message")
try:
mon_msg = [self.in_prefix] + msg
self.mon_socket.send_multipart(mon_msg)
# print ("[MonitoringDevice] Mon: Sent message")
self.out_socket.send_multipart(msg)
# print ("[MonitoringDevice] Out: Sent message {0}"
# .format(msg[:20]))
self.out_socket.send_multipart(msg)
# print ("[MonitoringDevice] Out: Sent message {0}"
# .format([msg[0], msg[1][:20]]))
mon_msg = [self.out_prefix] + msg
self.mon_socket.send_multipart(mon_msg)
# print ("[MonitoringDevice] Mon: Sent message")
mon_msg = [self.out_prefix] + msg
self.mon_socket.send_multipart(mon_msg)
# print ("[MonitoringDevice] Mon: Sent message")
except KeyboardInterrupt:
break
class EventDetector(EventDetectorBase):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment