From 927d6a6de3c3de46687a8b203babfa46f99d1c35 Mon Sep 17 00:00:00 2001 From: Matteo Piano Date: Tue, 25 Jan 2022 03:51:41 -0800 Subject: [PATCH 1/2] restart if probe added/removed --- pidtree_bcc/config.py | 50 ++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/pidtree_bcc/config.py b/pidtree_bcc/config.py index 26ad188..01ee7b0 100644 --- a/pidtree_bcc/config.py +++ b/pidtree_bcc/config.py @@ -2,10 +2,13 @@ from functools import partial from multiprocessing import SimpleQueue from typing import Generator +from typing import Iterable from typing import Optional from typing import Tuple +import staticconf.config import yaml +from staticconf.config import ConfigNamespace from staticconf.config import ConfigurationWatcher from staticconf.config import DEFAULT as DEFAULT_NAMESPACE from staticconf.config import get_namespace @@ -32,6 +35,28 @@ def _non_hotswap_settings(config_data: dict) -> dict: } +def _get_probe_namespaces() -> Generator[ConfigNamespace, None, None]: + """ Enumerate probe configuration namespaces """ + for namespace in get_namespaces_from_names(None, all_names=True): + if namespace.name not in (DEFAULT_NAMESPACE, HOTSWAP_CALLBACK_NAMESPACE.name): + yield namespace + + +def _clear_and_restart(): + """ Clear staticconf namespaces and restart """ + reset_config_state() + self_restart() + + +def _drop_namespaces(names: Iterable[str]): + """ Deletes configuration namespaces from staticconf + + :param Iterable[str] names: namespaces to drop + """ + for name in names: + staticconf.config.configuration_namespaces.pop(name, None) + + def parse_config(config_file: str, watch_config: bool = False): """ Parses yaml config file (if indicated) @@ -40,9 +65,13 @@ def parse_config(config_file: str, watch_config: bool = False): """ with open(config_file) as f: config_data = yaml.safe_load(f) - for key in config_data: - if key.startswith('_'): - continue + config_probe_names = {key for key in config_data if not key.startswith('_')} + current_probe_names = {ns.name for ns in _get_probe_namespaces()} + if watch_config and current_probe_names and config_probe_names != current_probe_names: + # probes added or removed, triggering restart + _drop_namespaces(current_probe_names - config_probe_names) + return _clear_and_restart() + for key in config_probe_names: probe_config = config_data[key] config_namespace = get_namespace(key) current_values = config_namespace.get_config_values().copy() @@ -54,9 +83,7 @@ def parse_config(config_file: str, watch_config: bool = False): is_different = probe_config != current_values if is_different and _non_hotswap_settings(probe_config) != _non_hotswap_settings(current_values): # Non hot-swappable setting changed -> restart - reset_config_state() - self_restart() - return + return _clear_and_restart() elif is_different: # Only hot-swappable settings changed, trigger proble filters reload HOTSWAP_CALLBACK_NAMESPACE[key](probe_config) @@ -93,12 +120,11 @@ def enumerate_probe_configs() -> Generator[Tuple[str, dict, Optional[SimpleQueue :return: tuple of probe name, configuration data, and optionally the queue for change notifications """ - for namespace in get_namespaces_from_names(None, all_names=True): - if namespace.name not in (DEFAULT_NAMESPACE, HOTSWAP_CALLBACK_NAMESPACE.name): - curr_values = namespace.get_config_values().copy() - change_callback = HOTSWAP_CALLBACK_NAMESPACE.get(namespace.name, default=None) - change_queue = change_callback.args[0] if change_callback else None - yield namespace.name, curr_values, change_queue + for namespace in _get_probe_namespaces(): + curr_values = namespace.get_config_values().copy() + change_callback = HOTSWAP_CALLBACK_NAMESPACE.get(namespace.name, default=None) + change_queue = change_callback.args[0] if change_callback else None + yield namespace.name, curr_values, change_queue def reset_config_state(): From 89f9ac0826e77ab6d26456e96874b0339380bf93 Mon Sep 17 00:00:00 2001 From: Matteo Piano Date: Tue, 25 Jan 2022 04:19:58 -0800 Subject: [PATCH 2/2] add mutex to filter reloads and fix horrible classvar mistake --- pidtree_bcc/probes/__init__.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pidtree_bcc/probes/__init__.py b/pidtree_bcc/probes/__init__.py index f82b0ec..0d12b7f 100644 --- a/pidtree_bcc/probes/__init__.py +++ b/pidtree_bcc/probes/__init__.py @@ -5,6 +5,7 @@ import re from datetime import datetime from multiprocessing import SimpleQueue +from threading import Lock from threading import Thread from typing import Any from typing import Mapping @@ -32,10 +33,10 @@ class BPFProbe: In either case the program text will be processed in Jinja templating. """ + # SIDECARS # List of (function, args) tuples to run in parallel with the probes as "sidecars" # No health monitoring is performed on these after launch so they are expect to be # stable or self-healing. - SIDECARS = [] # To be populated by `load_probes` EXTRA_PLUGIN_PATH = None @@ -66,6 +67,7 @@ class variable defining a list of config fields. Set to <= 0 to disable. :param SimpleQueue config_change_queue: queue for passing configuration changes """ + self.SIDECARS = [] probe_config = probe_config if probe_config else {} self.output_queue = output_queue self.validate_config(probe_config) @@ -85,6 +87,7 @@ class variable defining a list of config fields. self.lost_event_telemetry = lost_event_telemetry self.lost_event_timer = lost_event_telemetry self.lost_event_count = 0 + self.net_filter_mutex = Lock() if self.USES_DYNAMIC_FILTERS and config_change_queue: self.SIDECARS.append((self._poll_config_changes, (config_change_queue,))) @@ -176,9 +179,10 @@ def reload_filters(self, is_init: bool = False): :param bool is_init: Indicate this is the first time loading """ - logging.info('[{}] {}oading filters into BPF maps'.format(self.probe_name, 'L' if is_init else 'Rel')) - load_filters_into_map(self.net_filters, self.bpf[self.NET_FILTER_MAP_NAME], not is_init) - load_port_filters_into_map(*self.global_filters, self.bpf[self.PORT_FILTER_MAP_NAME], not is_init) + with self.net_filter_mutex: + logging.info('[{}] {}oading filters into BPF maps'.format(self.probe_name, 'L' if is_init else 'Rel')) + load_filters_into_map(self.net_filters, self.bpf[self.NET_FILTER_MAP_NAME], not is_init) + load_port_filters_into_map(*self.global_filters, self.bpf[self.PORT_FILTER_MAP_NAME], not is_init) def start_polling(self): """ Start infinite loop polling BPF events """