diff --git a/boefjes/boefjes/katalogus/plugins.py b/boefjes/boefjes/katalogus/plugins.py index 6dc26614c50..d4f11793eb4 100644 --- a/boefjes/boefjes/katalogus/plugins.py +++ b/boefjes/boefjes/katalogus/plugins.py @@ -70,6 +70,12 @@ def list_plugins( # filter plugins by scan level for boefje plugins plugins = [plugin for plugin in plugins if plugin.type != "boefje" or plugin.scan_level >= filter_params.scan_level] + if filter_params.consumes is not None: + plugins = [plugin for plugin in plugins if filter_params.consumes.issubset(set(plugin.consumes))] + + if filter_params.produces is not None: + plugins = [plugin for plugin in plugins if filter_params.produces.issubset(set(plugin.produces))] + if pagination_params.limit is None: return plugins[pagination_params.offset :] diff --git a/boefjes/boefjes/models.py b/boefjes/boefjes/models.py index 409661f20c0..55430f22a79 100644 --- a/boefjes/boefjes/models.py +++ b/boefjes/boefjes/models.py @@ -107,6 +107,8 @@ class FilterParameters(BaseModel): q: str | None = None type: Literal["boefje", "normalizer", "bit"] | None = None ids: list[str] | None = None + consumes: set[str] | None = None + produces: set[str] | None = None state: bool | None = None scan_level: int = 0 oci_image: str | None = None diff --git a/mula/scheduler/app.py b/mula/scheduler/app.py index d8770730762..da4550412f4 100644 --- a/mula/scheduler/app.py +++ b/mula/scheduler/app.py @@ -141,12 +141,6 @@ def monitor_organisations(self) -> None: scheduler_boefje.run() scheduler_report.run() - if additions: - # Flush katalogus caches when new organisations are added - self.ctx.services.katalogus.flush_caches() - - self.logger.debug("Added %s organisations to scheduler", len(additions), additions=sorted(additions)) - @tracer.start_as_current_span("collect_metrics") def collect_metrics(self) -> None: """Collect application metrics diff --git a/mula/scheduler/clients/http/external/katalogus.py b/mula/scheduler/clients/http/external/katalogus.py index 
ba174259de5..1ad57c2e160 100644 --- a/mula/scheduler/clients/http/external/katalogus.py +++ b/mula/scheduler/clients/http/external/katalogus.py @@ -1,11 +1,11 @@ import threading import httpx +from pydantic import TypeAdapter from scheduler.clients.errors import exception_handler from scheduler.clients.http import HTTPService from scheduler.models import Boefje, Organisation, Plugin -from scheduler.utils import dict_utils class Katalogus(HTTPService): @@ -13,124 +13,14 @@ class Katalogus(HTTPService): name = "katalogus" - def __init__(self, host: str, source: str, timeout: int, pool_connections: int, cache_ttl: int = 30): + def __init__(self, host: str, source: str, timeout: int, pool_connections: int): super().__init__(host, source, timeout, pool_connections) - # For every organisation we cache its plugins, it references the - # plugin-id as key and the plugin as value. - self.plugin_cache_lock = threading.Lock() - self.plugin_cache = dict_utils.ExpiringDict(lifetime=cache_ttl) - - # For every organisation we cache on which type of object (consumes) - # the boefjes consume, it references the object type (consumes) - # as the key and a dict of boefjes as value. - self.boefje_cache_lock = threading.Lock() - self.boefje_cache = dict_utils.ExpiringDict(lifetime=cache_ttl) - - # For every organisation we cache on which type of object (consumes) - # the normalizers consume, it references the object type (consumes) - # as the key and a dict of normalizers as value. - self.normalizer_cache_lock = threading.Lock() - self.normalizer_cache = dict_utils.ExpiringDict(lifetime=cache_ttl) - # For every organisation we cache which new boefjes for an organisation # have been enabled. self.new_boefjes_cache_lock = threading.Lock() self.new_boefjes_cache: dict = {} - # Initialise the cache. 
- self.flush_caches() - - def flush_caches(self) -> None: - self.flush_plugin_cache() - self.flush_normalizer_cache() - self.flush_boefje_cache() - - def flush_plugin_cache(self) -> None: - self.logger.debug("Flushing the katalogus plugin cache for organisations") - - with self.plugin_cache_lock: - # First, we reset the cache, to make sure we won't get any ExpiredError - self.plugin_cache.expiration_enabled = False - self.plugin_cache.reset() - - orgs = self.get_organisations() - for org in orgs: - self.plugin_cache.setdefault(org.id, {}) - - plugins = self.get_plugins_by_organisation(org.id) - self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled} - - self.plugin_cache.expiration_enabled = True - - self.logger.debug("Flushed the katalogus plugin cache for organisations") - - def flush_boefje_cache(self) -> None: - """boefje.consumes -> plugin type boefje""" - self.logger.debug("Flushing the katalogus boefje type cache for organisations") - - with self.boefje_cache_lock: - # First, we reset the cache, to make sure we won't get any ExpiredError - self.boefje_cache.expiration_enabled = False - self.boefje_cache.reset() - - orgs = self.get_organisations() - for org in orgs: - self.boefje_cache[org.id] = {} - - for plugin in self.get_plugins_by_organisation(org.id): - if plugin.type != "boefje": - continue - - if plugin.enabled is False: - continue - - if not plugin.consumes: - continue - - # NOTE: backwards compatibility, when it is a boefje the - # consumes field is a string field. 
- if isinstance(plugin.consumes, str): - self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin) - continue - - for type_ in plugin.consumes: - self.boefje_cache[org.id].setdefault(type_, []).append(plugin) - - self.boefje_cache.expiration_enabled = True - - self.logger.debug("Flushed the katalogus boefje type cache for organisations") - - def flush_normalizer_cache(self) -> None: - """normalizer.consumes -> plugin type normalizer""" - self.logger.debug("Flushing the katalogus normalizer type cache for organisations") - - with self.normalizer_cache_lock: - # First, we reset the cache, to make sure we won't get any ExpiredError - self.normalizer_cache.expiration_enabled = False - self.normalizer_cache.reset() - - orgs = self.get_organisations() - for org in orgs: - self.normalizer_cache[org.id] = {} - - for plugin in self.get_plugins_by_organisation(org.id): - if plugin.type != "normalizer": - continue - - if plugin.enabled is False: - continue - - if not plugin.consumes: - continue - - for type_ in plugin.consumes: - self.normalizer_cache[org.id].setdefault(type_, []).append(plugin) - - self.normalizer_cache.expiration_enabled = True - - self.logger.debug("Flushed the katalogus normalizer type cache for organisations") - @exception_handler def get_boefjes(self) -> list[Boefje]: url = f"{self.host}/boefjes" @@ -186,49 +76,31 @@ def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]: return [] raise - def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]: - def _get_from_cache() -> list[Plugin]: - with self.plugin_cache_lock: - return dict_utils.deep_get(self.plugin_cache, [organisation_id]) + @exception_handler + def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin | None: + url = f"{self.host}/v1/organisations/{organisation_id}/plugins/{plugin_id}" try: - return _get_from_cache() - except dict_utils.ExpiredError: - self.flush_plugin_cache() - return _get_from_cache() - - def 
get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin: - def _get_from_cache() -> Plugin: - with self.plugin_cache_lock: - return dict_utils.deep_get(self.plugin_cache, [organisation_id, plugin_id]) + response = self.get(url) + return Plugin(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return None - try: - return _get_from_cache() - except dict_utils.ExpiredError: - self.flush_plugin_cache() - return _get_from_cache() + raise - def get_boefjes_by_type_and_org_id(self, boefje_type: str, organisation_id: str) -> list[Plugin]: - def _get_from_cache() -> list[Plugin]: - with self.boefje_cache_lock: - return dict_utils.deep_get(self.boefje_cache, [organisation_id, boefje_type]) + @exception_handler + def get_boefjes_by_type_and_org_id(self, ooi_type: str, organisation_id: str) -> list[Plugin]: + url = f"{self.host}/v1/organisations/{organisation_id}/plugins/" + response = self.get(url, params={"type": "boefje", "consumes": [ooi_type]}) - try: - return _get_from_cache() - except dict_utils.ExpiredError: - self.flush_boefje_cache() - return _get_from_cache() + return TypeAdapter(list[Plugin]).validate_python(response.json()) - def get_normalizers_by_org_id_and_type(self, organisation_id: str, normalizer_type: str) -> list[Plugin]: - def _get_from_cache() -> list[Plugin]: - with self.normalizer_cache_lock: - return dict_utils.deep_get(self.normalizer_cache, [organisation_id, normalizer_type]) + def get_normalizers_by_org_id_and_type(self, organisation_id: str, ooi_type: str) -> list[Plugin]: + url = f"{self.host}/v1/organisations/{organisation_id}/plugins/" + response = self.get(url, params={"type": "normalizer", "produces": [ooi_type]}) - try: - return _get_from_cache() - except dict_utils.ExpiredError: - self.flush_normalizer_cache() - return _get_from_cache() + return TypeAdapter(list[Plugin]).validate_python(response.json()) def get_new_boefjes_by_org_id(self,
organisation_id: str) -> list[Plugin]: with self.new_boefjes_cache_lock: diff --git a/mula/scheduler/config/settings.py b/mula/scheduler/config/settings.py index f095350b8bb..8ea3a33869f 100644 --- a/mula/scheduler/config/settings.py +++ b/mula/scheduler/config/settings.py @@ -77,8 +77,6 @@ class Settings(BaseSettings): 10, description="The maximum number of connections to save in the pool for the octopoes api" ) - katalogus_cache_ttl: int = Field(30, description="The lifetime of the katalogus cache in seconds") - katalogus_request_timeout: int = Field( 10, description="The timeout in seconds for the requests to the katalogus api" ) diff --git a/mula/scheduler/context/context.py b/mula/scheduler/context/context.py index 540a86ba545..e30164215b2 100644 --- a/mula/scheduler/context/context.py +++ b/mula/scheduler/context/context.py @@ -124,7 +124,6 @@ def __init__(self) -> None: source=f"scheduler/{scheduler.__version__}", timeout=self.config.katalogus_request_timeout, pool_connections=self.config.katalogus_pool_connections, - cache_ttl=self.config.katalogus_cache_ttl, ) bytes_service = clients.Bytes( @@ -184,7 +183,6 @@ def __init__(self) -> None: "pq_maxsize": str(self.config.pq_maxsize), "pq_grace_period": str(self.config.pq_grace_period), "pq_max_random_objects": str(self.config.pq_max_random_objects), - "katalogus_cache_ttl": str(self.config.katalogus_cache_ttl), "monitor_organisations_interval": str(self.config.monitor_organisations_interval), } )