Commit 9a538890 authored by Manuela Kuhn

Cleanup and pep8 compliance

parent 24dbd27a
......@@ -100,8 +100,10 @@ use_cleanup = False
#action_time = 150
action_time = 10
# Time (in seconds) since last modification after which a file will be seen as closed
# (needed if event_detector is inotifyx_events (for clean up) or watchdog_events)
# Time (in seconds) since last modification after which a file
# will be seen as closed
# (needed if event_detector is inotifyx_events (for clean up) or
# watchdog_events)
time_till_closed = 2
# ZMQ port to get incoming data from
......
import traceback
class LoggingFunction:
def out(self, x, exc_info=None):
if exc_info:
......@@ -48,12 +51,10 @@ class LoggingFunction:
self.error = lambda x, exc_info=None: self.no_out(x, exc_info)
# using output
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
elif level == None:
elif level is None:
# using no output
self.debug = lambda x, exc_info=None: self.no_out(x, exc_info)
self.info = lambda x, exc_info=None: self.no_out(x, exc_info)
self.warning = lambda x, exc_info=None: self.no_out(x, exc_info)
self.error = lambda x, exc_info=None: self.no_out(x, exc_info)
self.critical = lambda x, exc_info=None: self.no_out(x, exc_info)
......@@ -8,7 +8,6 @@ import socket
import logging
import os
import sys
import traceback
import subprocess
import re
import json
......@@ -184,7 +183,6 @@ class Control():
self.stop(unregister=False)
sys.exit(1)
def get(self, attribute, timeout=None):
msg = [b"get", self.host, self.detector, attribute]
......@@ -257,6 +255,7 @@ class Control():
def __del__(self):
self.stop()
def reset_receiver_status(host, port):
context = zmq.Context()
......
......@@ -8,7 +8,6 @@ import os
import platform
import zmq
import logging
import traceback
import json
import tempfile
import socket
......@@ -213,8 +212,8 @@ class Ingest():
self.file_op_socket.send_multipart(send_message)
self.log.info("Sending signal to close the file to file_op_socket")
except:
raise Exception("Sending signal to close the file to file_op_socket"
"...failed")
raise Exception("Sending signal to close the file to "
"file_op_socket...failed")
# send close-signal to event Detector
try:
......
This diff is collapsed.
......@@ -527,11 +527,11 @@ def call_hidra_service(cmd, beamline, det_id, log):
SERVICE_NAME = "hidra"
# systems using systemd
if os.path.exists("/usr/lib/systemd") \
if (os.path.exists("/usr/lib/systemd")
and (os.path.exists("/usr/lib/systemd/{0}.service"
.format(SYSTEMD_PREFIX))
or os.path.exists("/etc/systemd/system/{0}.service"
.format(SYSTEMD_PREFIX))):
.format(SYSTEMD_PREFIX))
or os.path.exists("/etc/systemd/system/{0}.service"
.format(SYSTEMD_PREFIX)))):
svc = "{0}{1}_{2}.service".format(SYSTEMD_PREFIX, beamline, det_id)
log.debug("Call: systemctl {0} {1}".format(cmd, svc))
......
......@@ -288,11 +288,11 @@ class CleanerBase(ABC):
self.job_checking_thread.start()
# self.conf_checking_thread.start()
while True:
# # intersect
# removable_elements = new_jobs & new_confirmations
# self.log.debug("removable_elements={0}".format(removable_elements))
# self.log.debug("removable_elements={0}"
# .format(removable_elements))
#
# for element in removable_elements:
# self.remove_element(element)
......@@ -314,9 +314,11 @@ class CleanerBase(ABC):
self.log.debug("Waiting for confirmation")
element = self.confirmation_socket.recv().decode("utf-8")
self.log.debug("New confirmation received: {0}".format(element))
self.log.debug("New confirmation received: {0}"
.format(element))
self.log.debug("new_jobs={0}".format(new_jobs))
self.log.debug("old_confirmations={0}".format(old_confirmations))
self.log.debug("old_confirmations={0}"
.format(old_confirmations))
for base_path, file_id in new_jobs:
if element == file_id:
......@@ -335,8 +337,9 @@ class CleanerBase(ABC):
.format(old_confirmations))
else:
old_confirmations.append(element)
self.log.error("confirmations without job notification "
"received: {0}".format(element))
self.log.error("confirmations without job "
"notification received: {0}"
.format(element))
######################################
# control commands #
......@@ -476,7 +479,8 @@ if __name__ == '__main__':
job_con_str = "tcp://{0}:{1}".format(con_ip, config["cleaner_port"])
job_bind_str = "tcp://{0}:{1}".format(ext_ip, config["cleaner_port"])
control_con_str = "tcp://{0}:{1}".format(ext_ip, config["control_port"])
control_con_str = "tcp://{0}:{1}".format(ext_ip,
config["control_port"])
else:
job_con_str = ("ipc://{0}/{1}_{2}".format(config["ipc_path"],
config["current_pid"],
......@@ -508,7 +512,8 @@ if __name__ == '__main__':
### Set up receiver simulator ###
confirmation_socket = context.socket(zmq.PUSH)
confirmation_socket.connect(conf_con_str)
logging.info("=== Start confirmation_socket (connect): {0}".format(conf_con_str))
logging.info("=== Start confirmation_socket (connect): {0}"
.format(conf_con_str))
# to give init time to finish
time.sleep(0.5)
......
......@@ -91,8 +91,10 @@ class DataFetcher(DataFetcherBase):
if self.is_windows:
# path conversion is safe, see:
# http://softwareengineering.stackexchange.com/questions/245156/is-it-safe-to-convert-windows-file-paths-to-unix-file-paths-with-a-simple-replac # noqa E501
metadata["source_path"] = metadata["source_path"].replace("\\", "/")
metadata["relative_path"] = metadata["relative_path"].replace("\\", "/")
metadata["source_path"] = (
metadata["source_path"].replace("\\", "/"))
metadata["relative_path"] = (
metadata["relative_path"].replace("\\", "/"))
metadata["filesize"] = filesize
metadata["file_mod_time"] = file_mod_time
......@@ -277,8 +279,8 @@ class DataFetcher(DataFetcherBase):
file_id = self.generate_file_id(metadata)
self.cleaner_job_socket.send_multipart(
[metadata["source_path"].encode("utf-8"),
file_id.encode("utf-8")])
[metadata["source_path"].encode("utf-8"),
file_id.encode("utf-8")])
self.log.debug("Forwarded to cleaner {0}".format(file_id))
# send message to metadata targets
......@@ -307,8 +309,8 @@ class DataFetcher(DataFetcherBase):
# move file
try:
self._datahandling(shutil.move, metadata)
self.log.info("Moving file '{0}' ...success."
.format(self.source_file))
self.log.info("Moving file '{0}' to '{1}'...success."
.format(self.source_file, self.target_file))
except:
self.log.error("Could not move file {0} to {1}"
.format(self.source_file, self.target_file),
......@@ -439,14 +441,15 @@ if __name__ == '__main__':
config = {
"fix_subdirs": ["commissioning", "current", "local"],
"store_data": False,
"remove_data": "with_confirmation",
# "remove_data": False,
"remove_data": False,
"cleaner_job_con_str": job_bind_str,
"cleaner_conf_con_str": conf_bind_str,
"chunksize": 10485760, # = 1024*1024*10 = 10 MiB
"local_target": None
}
config["remove_data"] = "with_confirmation"
context = zmq.Context.instance()
### Set up cleaner ###
......@@ -482,7 +485,6 @@ if __name__ == '__main__':
logging.info("=== Start confirmation_socket (connect): {0}"
.format(config["cleaner_conf_con_str"]))
### Test file fetcher ###
prework_source_file = os.path.join(BASE_PATH, "test_file.cbf")
prework_target_file = os.path.join(
......
......@@ -6,7 +6,6 @@ import os
import logging
import json
import time
from zmq.devices.basedevice import ProcessDevice
from datafetcherbase import DataFetcherBase, DataHandlingError
from hidra import generate_filepath, Transfer
......@@ -38,7 +37,7 @@ class DataFetcher(DataFetcherBase):
required_params += ["data_fetch_port"]
else:
required_params += ["ipc_path",
"main_pid"]
"main_pid"]
# Check format of config
check_passed, config_reduced = helpers.check_config(required_params,
......@@ -57,8 +56,9 @@ class DataFetcher(DataFetcherBase):
self.transfer = Transfer("STREAM", use_log=log_queue)
self.transfer.start([self.config["ipc_path"],
"{0}_{1}".format(self.config["main_pid"], "out")],
protocol="ipc", data_con_style="connect")
"{0}_{1}".format(self.config["main_pid"],
"out")],
protocol="ipc", data_con_style="connect")
# enable status check requests from any sender
self.transfer.setopt("status_check",
......@@ -320,9 +320,10 @@ if __name__ == '__main__':
"chunk_number": 0,
}
# targets = [['{0}:{1}'.format(ext_ip, receiving_port), 1, [".cbf"], "data"]]
targets = [['{0}:{1}'.format(ext_ip, receiving_port), 1, [".cbf"], "data"],
['{0}:{1}'.format(ext_ip, receiving_port2), 1, [".cbf"], "data"]]
targets = [
['{0}:{1}'.format(ext_ip, receiving_port), 1, [".cbf"], "data"],
['{0}:{1}'.format(ext_ip, receiving_port2), 1, [".cbf"], "data"]
]
open_connections = dict()
......
......@@ -259,8 +259,8 @@ class DataFetcher(DataFetcherBase):
file_id = self.generate_file_id(metadata)
self.cleaner_job_socket.send_multipart(
[metadata["source_path"].encode("utf-8"),
file_id.encode("utf-8")])
[metadata["source_path"].encode("utf-8"),
file_id.encode("utf-8")])
self.log.debug("Forwarded to cleaner {0}".format(file_id))
def finish_without_cleaner(self, targets, metadata, open_connections):
......@@ -382,7 +382,6 @@ if __name__ == '__main__':
receiving_port2 = "6006"
dataFwPort = "50010"
receiving_socket = context.socket(zmq.PULL)
connection_str = "tcp://{0}:{1}".format(ext_ip, receiving_port)
receiving_socket.bind(connection_str)
......
......@@ -662,7 +662,6 @@ class DataManager():
self.socket_reconnected = False
return False
# test if someone picks up the test message in the next
# 2 sec
if not tracker.done:
......@@ -690,7 +689,6 @@ class DataManager():
if enable_logging:
self.log.debug("Received responce: {0}".format(status))
# responce to test message was successfully received
# TODO check status + react
if status[0] == b"ERROR":
......@@ -859,15 +857,16 @@ class DataManager():
### Cleaner ###
if self.use_cleaner:
self.log.info("Loading cleaner from data fetcher module: {0}"
.format(self.params["data_fetcher_type"]))
.format(self.params["data_fetcher_type"]))
self.cleaner_m = __import__(self.params["data_fetcher_type"])
self.cleaner_pr = Process(target=self.cleaner_m.Cleaner,
args=(self.params,
self.log_queue,
self.params["cleaner_job_con_str"],
self.params["cleaner_conf_con_str"],
self.control_sub_con_str))
self.cleaner_pr = Process(
target=self.cleaner_m.Cleaner,
args=(self.params,
self.log_queue,
self.params["cleaner_job_con_str"],
self.params["cleaner_conf_con_str"],
self.control_sub_con_str))
self.cleaner_pr.start()
self.log.info("Configured Type of data fetcher: {0}"
......@@ -894,16 +893,16 @@ class DataManager():
sleep_was_sent = False
if self.use_cleaner:
run_loop = (self.signalhandler_pr.is_alive() and \
self.taskprovider_pr.is_alive() and \
self.cleaner_pr.is_alive() and \
all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
run_loop = (self.signalhandler_pr.is_alive()
and self.taskprovider_pr.is_alive()
and self.cleaner_pr.is_alive()
and all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
else:
run_loop = (self.signalhandler_pr.is_alive() and \
self.taskprovider_pr.is_alive() and \
all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
run_loop = (self.signalhandler_pr.is_alive()
and self.taskprovider_pr.is_alive()
and all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
while run_loop:
......@@ -934,16 +933,18 @@ class DataManager():
time.sleep(1)
if self.use_cleaner:
run_loop = (self.signalhandler_pr.is_alive() and \
self.taskprovider_pr.is_alive() and \
self.cleaner_pr.is_alive() and \
all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
run_loop = (self.signalhandler_pr.is_alive()
and self.taskprovider_pr.is_alive()
and self.cleaner_pr.is_alive()
and all(datadispatcher.is_alive()
for datadispatcher
in self.datadispatcher_pr))
else:
run_loop = (self.signalhandler_pr.is_alive() and \
self.taskprovider_pr.is_alive() and \
all(datadispatcher.is_alive()
for datadispatcher in self.datadispatcher_pr))
run_loop = (self.signalhandler_pr.is_alive()
and self.taskprovider_pr.is_alive()
and all(datadispatcher.is_alive()
for datadispatcher
in self.datadispatcher_pr))
# notify which subprocess terminated
if not self.signalhandler_pr.is_alive():
......
......@@ -2,7 +2,6 @@ from __future__ import print_function
from __future__ import unicode_literals
import os
import sys
import time
import requests
import collections
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment