Commit 820a34b8 authored by Manuela Kuhn's avatar Manuela Kuhn

Merge branch 'release-3.0.1'

parents 9822e8d0 e39c2ed2
[bumpversion]
current_version = 3.0.0
current_version = 3.0.1
tag = False
commit = False
files = freeze_setup.py src/APIs/hidra/_version.py src/shared/_version.py
......
[bumpversion]
current_version = 3.0.0
current_version = 3.0.1
tag = False
commit = False
......
HiDRA develop
HiDRA 3.0.1
- Added option to get configured settings to hidra control client
- The connection list is now contained in a seperate module
HiDRA 3.0.0
- Automatic bump versioning
......
......@@ -35,7 +35,7 @@ if platform.system() == "Windows":
os.path.join("datafetchers", "zmq_fetcher.py")),
(os.path.join(senderpath, "datafetchers", "send_helpers.py"),
os.path.join("datafetchers", "send_helpers.py"))
]
]
else:
libzmq_path = "/usr/local/lib/python2.7/dist-packages/zmq"
......@@ -60,7 +60,7 @@ else:
"zmq_fetcher.py"),
(os.path.join(senderpath, "datafetchers", "send_helpers.py"),
"send_helpers.py")
]
]
dist = platform.dist()
if dist[0].lower() == "suse" and dist[1].startswith("10"):
......@@ -93,7 +93,7 @@ buildOptions = {
(os.path.join(sharedpath, "helpers.py"), "helpers.py"),
(os.path.join(sharedpath, "_version.py"), "_version.py"),
(confpath, "conf"),
] + platform_specific_files,
] + platform_specific_files,
}
executables = [
......@@ -101,7 +101,7 @@ executables = [
]
setup(name='HiDRA',
version='3.0.0',
version='3.0.1',
description='',
options={"build_exe": buildOptions},
executables=executables
......
Name: hidra
Version: 3.0.0
Version: 3.0.1
Release: 1%{?dist}
Summary: High performance data multiplexing tool
......@@ -79,6 +79,8 @@ mkdir -p %{buildroot}/opt/%{name}/logs
%attr(1777,root,root) /opt/%{name}/logs
%changelog
Fri Dec 16 2016 Manuela Kuhn <manuela.kuhn@desy.de> - 3.0.1-1
Bump version
Wed Dec 14 2016 Manuela Kuhn <manuela.kuhn@desy.de> - 3.0.0-1
Bump version
* Tue Nov 22 2016 Stefan Dietrich <stefan.dietrich@desy.de> - 2.4.2-1
......
[flake8]
exclude = .git,./test/codingTests, ./src/shared/logutils/, ./src/sender/eventdetectors/inotifyx-0.2.2/
ignore = W503
from __future__ import absolute_import
from .transfer import Transfer
from .ingest import Ingest
from .control import Control
from .transfer import Transfer # noqa F401
from .ingest import Ingest # noqa F401
from .control import Control # noqa F401
from .control import check_netgroup
from ._version import __version__
from ._constants import connection_list
__all__ = ['transfer', 'control', "ingest", "check_netgroup", "__version__"]
__all__ = ["Transfer", "Control", "Ingest", "check_netgroup", "__version__",
"connection_list"]
connection_list = {
"p00": {
"host": "zitpcx22614w",
# "host": "asap3-p00",
"port": 51000
},
"p01": {
"host": "asap3-bl-prx07",
"port": 51001
},
"p02.1": {
"host": "asap3-bl-prx07",
"port": 51002
},
"p02.2": {
"host": "asap3-bl-prx07",
"port": 51003
},
"p03": {
"host": "asap3-bl-prx07",
"port": 51004
},
"p04": {
"host": "asap3-bl-prx07",
"port": 51005
},
"p05": {
"host": "asap3-bl-prx07",
"port": 51006
},
"p06": {
"host": "asap3-bl-prx07",
"port": 51007
},
"p07": {
"host": "asap3-bl-prx07",
"port": 51008
},
"p08": {
"host": "asap3-bl-prx07",
"port": 51009
},
"p09": {
"host": "asap3-bl-prx07",
"port": 51010
},
"p10": {
"host": "asap3-bl-prx07",
"port": 51011
},
"p11": {
"host": "asap3-bl-prx07",
"port": 51012
},
}
__version__ = b'3.0.0'
__version__ = b'3.0.1'
# API to communicate with a data transfer unit
from __future__ import print_function
#from __future__ import unicode_literals
# from __future__ import unicode_literals
from __future__ import absolute_import
import socket
......@@ -11,8 +11,11 @@ import sys
import traceback
import subprocess
import re
import json
# from ._version import __version__
from ._constants import connection_list
from ._version import __version__
class LoggingFunction:
def out(self, x, exc_info=None):
......@@ -69,62 +72,6 @@ class CommunicationFailed(Exception):
pass
connection_list = {
"p00": {
"host": "asap3-p00",
"port": 51000
},
"p01": {
"host": "asap3-bl-prx07",
"port": 51001
},
"p02.1": {
"host": "asap3-bl-prx07",
"port": 51002
},
"p02.2": {
"host": "asap3-bl-prx07",
"port": 51003
},
"p03": {
"host": "asap3-bl-prx07",
"port": 51004
},
"p04": {
"host": "asap3-bl-prx07",
"port": 51005
},
"p05": {
"host": "asap3-bl-prx07",
"port": 51006
},
"p06": {
"host": "asap3-bl-prx07",
"port": 51007
},
"p07": {
"host": "asap3-bl-prx07",
"port": 51008
},
"p08": {
"host": "asap3-bl-prx07",
"port": 51009
},
"p09": {
"host": "asap3-bl-prx07",
"port": 51010
},
"p10": {
"host": "asap3-bl-prx07",
"port": 51011
},
"p11": {
"host": "asap3-bl-prx07",
"port": 51012
},
}
def excecute_ldapsearch(ldap_cn):
p = subprocess.Popen(
......@@ -133,19 +80,21 @@ def excecute_ldapsearch(ldap_cn):
"-H ldap://it-ldap-slave.desy.de:1389",
"cn=" + ldap_cn, "-LLL"],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
lines = p.stdout.readlines()
matchHost = re.compile(r'nisNetgroupTriple: [(]([\w|\S|.]+),.*,[)]',
re.M | re.I)
match_host = re.compile(r'nisNetgroupTriple: [(]([\w|\S|.]+),.*,[)]',
re.M | re.I)
netgroup = []
for line in lines:
if matchHost.match(line):
if matchHost.match(line).group(1) not in netgroup:
netgroup.append(matchHost.match(line).group(1))
if match_host.match(line):
if match_host.match(line).group(1) not in netgroup:
netgroup.append(match_host.match(line).group(1))
return netgroup
def check_netgroup(hostname, beamline, log=None):
if log is None:
log = NoLoggingFunction()
......@@ -167,9 +116,9 @@ def check_netgroup(hostname, beamline, log=None):
"beamline {1}".format(hostname, beamline))
sys.exit(1)
class Control():
def __init__(self, beamline, use_log=False):
global connection_list
if use_log:
self.log = logging.getLogger("Control")
......@@ -214,7 +163,7 @@ class Control():
reply = self.signal_socket.recv(1024)
self.log.debug("recv (len %2d): %s " % (len(reply), reply))
return reply
return json.loads(reply)
def set(self, attribute, *value):
value = list(value)
......@@ -223,8 +172,8 @@ class Control():
if type(value[0]) == list:
value = [item for sublist in value for item in sublist]
if attribute == "eigerip":
check_netgroup(value, self.beamline, self.log)
if attribute == "eiger_ip":
check_netgroup(value[0], self.beamline, self.log)
if attribute == "whitelist":
msg = 'set {0} {1}'.format(attribute, value)
......
......@@ -13,7 +13,8 @@ import json
import tempfile
import socket
from ._version import __version__
# from ._version import __version__
class LoggingFunction:
def out(self, x, exc_info=None):
......@@ -104,8 +105,9 @@ class Ingest():
self.log.info("Using tcp for internal communication.")
self.eventdet_con_id = "tcp://{0}:{1}".format(self.localhost,
self.event_det_port)
self.datafetch_con_id = "tcp://{0}:{1}".format(self.localhost,
self.data_fetch_port)
self.datafetch_con_id = ("tcp://{0}:{1}"
.format(self.localhost,
self.data_fetch_port))
else:
self.log.info("Using ipc for internal communication.")
self.eventdet_con_id = "ipc://{0}/{1}".format(self.ipc_path,
......@@ -212,7 +214,7 @@ class Ingest():
"filename": self.filename,
"filepart": self.filepart,
"chunksize": len(data)
}
}
# message = ('{ "filepart": {0}, "filename": "{1}" }'
# .format(self.filepart, self.filename))
message = json.dumps(message).encode("utf-8")
......
This diff is collapsed.
......@@ -41,21 +41,22 @@ def argument_parsing():
parser.add_argument("--eigerapi",
type=str,
default="1.5.0",
help="API version of the Eiger detector (default: 1.5.0)")
help="API version of the Eiger detector "
"(default: 1.5.0)")
parser.add_argument("--start",
help="Starts the HiDRA Server for the Eiger detector",
help="Starts the HiDRA server for the Eiger detector",
action="store_true")
# parser.add_argument("--restart",
# help="Restarts the HiDRA Server for the Eiger "
# "detector",
# action="store_true")
parser.add_argument("--status",
help="Displays the Status of the HiDRA Server for "
help="Displays the status of the HiDRA server for "
"the Eiger detector",
action="store_true")
parser.add_argument("--stop",
help="Stops the HiDRA Server for the Eiger detector",
help="Stops the HiDRA server for the Eiger detector",
action="store_true")
parser.add_argument("--target",
type=str,
......@@ -69,6 +70,10 @@ def argument_parsing():
parser.add_argument("--version",
help="Displays the used hidra_control version",
action="store_true")
parser.add_argument("--getsettings",
help="Displays the settings of the HiDRA Server for "
"the Eiger detector",
action="store_true")
return parser
......@@ -86,7 +91,6 @@ if __name__ == '__main__':
print ("Hidra version: {0}".format(hidra.__version__))
sys.exit(0)
beamline = arguments.beamline
supported_targets = ["current/raw",
"current/scratch_bl",
......@@ -127,5 +131,26 @@ if __name__ == '__main__':
elif arguments.stop:
print ("Stopping HiDRA for Eiger:", obj.do("stop"))
elif arguments.getsettings:
if obj.do("status") == "RUNNING":
print ("Configured settings:")
print ("Data is written to: {0}"
.format(obj.get("local_target")))
print ("Eiger IP: {0}"
.format(obj.get("eiger_ip")))
print ("Eiger API version: {0}"
.format(obj.get("eiger_api_version")))
print ("History size: {0}"
.format(obj.get("history_size")))
print ("Store data: {0}"
.format(obj.get("store_data")))
print ("Remove data from the Eiger: {0}"
.format(obj.get("remove_data")))
print ("Whitelist: {0}"
.format(obj.get("whitelist")))
else:
print ("HiDRA is not running")
finally:
obj.stop()
This diff is collapsed.
......@@ -30,7 +30,7 @@ if SHARED_PATH not in sys.path:
try:
# search in global python modules first
from hidra import Transfer
from hidra import Transfer # noqa F401
except:
# then search in local modules
if API_PATH not in sys.path:
......
......@@ -196,7 +196,7 @@ class DataDispatcher():
.send_multipart(workload,
copy=False,
track=True)
)
)
self.log.info("Sending close file signal to "
"'{0}' with priority 0"
.format(self.fixed_stream_id))
......@@ -213,7 +213,7 @@ class DataDispatcher():
# register socket
self.open_connections[self.fixed_stream_id] = (
socket
)
)
# send data
tracker = (
......@@ -221,7 +221,7 @@ class DataDispatcher():
.send_multipart(workload,
copy=False,
track=True)
)
)
self.log.info("Sending close file signal to "
"'{0}' with priority 0"
.format(fixed_stream_id))
......@@ -258,7 +258,7 @@ class DataDispatcher():
self.datafetcher.get_metadata(
self.log, self.config, targets, workload,
self.chunksize, self.local_target)
)
)
except:
self.log.error("Building of metadata dictionary failed "
......@@ -426,7 +426,7 @@ if __name__ == '__main__':
"fix_subdirs": ["commissioning", "current", "local"],
"store_data": False,
"remove_data": False
}
}
context = zmq.Context.instance()
......@@ -457,7 +457,7 @@ if __name__ == '__main__':
"source_path": os.path.join(BASE_PATH, "data", "source"),
"relative_path": "local",
"filename": "100.cbf"
}
}
targets = [['localhost:6005', 1, [".cbf"], "data"],
['localhost:6006', 0, [".cbf"], "data"]]
......
......@@ -8,7 +8,7 @@ import json
import shutil
import errno
from send_helpers import __send_to_targets, DataHandlingError
from send_helpers import send_to_targets, DataHandlingError
import helpers
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
......@@ -127,7 +127,7 @@ def send_data(log, targets, source_file, target_file, metadata,
chunksize = metadata["chunksize"]
chunk_number = 0
sendError = False
send_error = False
# reading source file into memory
try:
......@@ -163,13 +163,13 @@ def send_data(log, targets, source_file, target_file, metadata,
# send message to data targets
try:
__send_to_targets(log, targets_data, source_file, target_file,
open_connections, None, chunk_payload, context)
send_to_targets(log, targets_data, source_file, target_file,
open_connections, None, chunk_payload, context)
except DataHandlingError:
log.error("Unable to send multipart-message for file '{0}' "
"(chunk {1})".format(source_file, chunk_number),
exc_info=True)
sendError = True
send_error = True
except:
log.error("Unable to send multipart-message for file '{0}' "
"(chunk {1})".format(source_file, chunk_number),
......@@ -186,12 +186,12 @@ def send_data(log, targets, source_file, target_file, metadata,
exc_info=True)
raise
if not sendError:
if not send_error:
config["remove_flag"] = True
def __datahandling(log, source_file, target_file, action_function, metadata,
config):
def _datahandling(log, source_file, target_file, action_function, metadata,
config):
try:
action_function(source_file, target_file)
except IOError as e:
......@@ -251,8 +251,8 @@ def finish_datahandling(log, targets, source_file, target_file, metadata,
# move file
try:
__datahandling(log, source_file, target_file, shutil.move,
metadata, config)
_datahandling(log, source_file, target_file, shutil.move,
metadata, config)
log.info("Moving file '{0}' ...success.".format(source_file))
except:
log.error("Could not move file {0} to {1}"
......@@ -264,8 +264,8 @@ def finish_datahandling(log, targets, source_file, target_file, metadata,
# copy file
# (does not preserve file owner, group or ACLs)
try:
__datahandling(log, source_file, target_file, shutil.copy,
metadata, config)
_datahandling(log, source_file, target_file, shutil.copy,
metadata, config)
log.info("Copying file '{0}' ...success.".format(source_file))
except:
return
......@@ -284,9 +284,9 @@ def finish_datahandling(log, targets, source_file, target_file, metadata,
# send message to metadata targets
if targets_metadata:
try:
__send_to_targets(log, targets_metadata, source_file, target_file,
open_connections, metadata, None, context,
config["send_timeout"])
send_to_targets(log, targets_metadata, source_file, target_file,
open_connections, metadata, None, context,
config["send_timeout"])
log.debug("Passing metadata multipart-message for file {0}...done."
.format(source_file))
......@@ -348,7 +348,7 @@ if __name__ == '__main__':
"source_path": os.path.join(BASE_PATH, "data", "source"),
"relative_path": os.sep + "local",
"filename": "100.cbf"
}
}
targets = [['localhost:{0}'.format(receiving_port), 1, [".cbf"], "data"],
['localhost:{0}'.format(receiving_port2), 0, [".cbf"], "data"]]
......@@ -360,7 +360,7 @@ if __name__ == '__main__':
"fix_subdirs": ["commissioning", "current", "local"],
"store_data": False,
"remove_data": False
}
}
logging.debug("open_connections before function call: {0}"
.format(open_connections))
......
......@@ -9,7 +9,7 @@ import requests
import time
import errno
from send_helpers import __send_to_targets
from send_helpers import send_to_targets
import helpers
__author__ = ('Manuela Kuhn <manuela.kuhn@desy.de>',
......@@ -193,9 +193,9 @@ def send_data(log, targets, source_file, target_file, metadata,
# send message to data targets
try:
__send_to_targets(log, targets_data, source_file, target_file,
open_connections, metadata_extended, payload,
context)
send_to_targets(log, targets_data, source_file, target_file,
open_connections, metadata_extended, payload,
context)
log.debug("Passing multipart-message for file {0}...done."
.format(source_file))
......@@ -222,9 +222,9 @@ def send_data(log, targets, source_file, target_file, metadata,
# send message to metadata targets
try:
__send_to_targets(log, targets_metadata, source_file, target_file,
open_connections, metadata_extended, payload,
context)
send_to_targets(log, targets_metadata, source_file, target_file,
open_connections, metadata_extended, payload,
context)
log.debug("Passing metadata multipart-message for file '{0}'"
"...done.".format(source_file))
......@@ -310,7 +310,7 @@ if __name__ == '__main__':
"source_path": "http://131.169.55.170/test_httpget/data",
"relative_path": "",
"filename": "test_file.cbf"
}
}
targets = [['localhost:{0}'.format(receiving_port), 1, [".cbf", ".tif"],
"data"],
['localhost:{0}'.format(receiving_port2), 1, [".cbf", ".tif"],
......@@ -325,7 +325,7 @@ if __name__ == '__main__':
"fix_subdirs": ["commissioning", "current", "local"],
"store_data": True,
"remove_data": False
}
}
setup(logging, config)
......
......@@ -10,8 +10,8 @@ class DataHandlingError(Exception):
pass
def __send_to_targets(log, targets, source_file, target_file, open_connections,
metadata, payload, context, timeout=-1):
def send_to_targets(log, targets, source_file, target_file, open_connections,
metadata, payload, context, timeout=-1):
for target, prio, suffixes, send_type in targets:
......
......@@ -7,7 +7,7 @@ import logging
import json
import time
from send_helpers import __send_to_targets
from send_helpers import send_to_targets
import helpers
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
......@@ -134,9 +134,9 @@ def send_data(log, targets, source_file, target_file, metadata,
# send message
try:
__send_to_targets(log, targets, source_file, target_file,
open_connections, metadata_extended, payload,
context)
send_to_targets(log, targets, source_file, target_file,
open_connections, metadata_extended, payload,
context)
log.debug("Passing multipart-message for file '{0}'...done."
.format(source_file))
except:
......@@ -218,7 +218,7 @@ if __name__ == '__main__':
"source_path": os.path.join(BASE_PATH, "data", "source"),
"relative_path": os.sep + "local" + os.sep + "raw",
"filename": "100.cbf"
}
}
targets = [['localhost:{0}'.format(receiving_port), 1, [".cbf", ".tif"],
"data"],
['localhost:{0}'.format(receiving_port2), 0, [".cbf", ".tif"],
......@@ -232,7 +232,7 @@ if __name__ == '__main__':
"type": "getFromZmq",
"context": context,
"data_fetch_con_str": data_fetch_con_str
}
}
logging.debug("open_connections before function call: {0}"
.format(open_connections))
......
......@@ -568,8 +568,7 @@ class DataManager():
self.request_fw_con_id,
self.request_con_id,
self.log_queue,
self.context
)
self.context)
)
self.signalhandler_pr.start()
......@@ -586,8 +585,7 @@ class DataManager():
self.control_sub_con_id,
self.request_fw_con_id,
self.router_con_id,
self.log_queue
)
self.log_queue)
)
self.taskprovider_pr.start()
......@@ -605,8 +603,7 @@ class DataManager():
self.fixed_stream_id,
self.params,
self.log_queue,
self.local_target
)
self.local_target)
)
pr.start()
self.datadispatcher_pr.append(pr)
......
......@@ -121,7 +121,7 @@ class EventDetector():
"source_path": "http://{0}/data".format(self.eiger_ip),
"relative_path": relative_path,
"filename": filename
}
}
self.log.debug("event_message {0}".format(event_message))
event_message_list.append(event_message)
self.files_downloaded.append(file)
......@@ -173,7 +173,7 @@ if __name__ == '__main__':
"eiger_ip": eiger_ip,
"eiger_api_version": eiger_api_version,
"history_size": 1000
}
}
eventdetector = EventDetector(config, log_queue)
......
......@@ -118,7 +118,7 @@ def get_event_message(path, filename, paths):
"source_path": parent_dir,
"relative_path": relative_path,
"filename": filename
}
}
return event_message
......@@ -570,7 +570,7 @@ if __name__ == '__main__':
"use_cleanup": False,
"time_till_closed": 5,
"action_time": 120
}
}