Skip to content

Commit

Permalink
Remove caches for the KATalogus in the scheduler
Browse files Browse the repository at this point in the history
Add `consumes` and `produces` filter parameters to the KATalogus Plugins API so callers can filter plugins by what they consume and produce, allowing the scheduler to query directly instead of keeping local caches
  • Loading branch information
Donnype committed Feb 12, 2025
1 parent 74efaff commit 0c832ec
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 158 deletions.
6 changes: 6 additions & 0 deletions boefjes/boefjes/katalogus/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def list_plugins(
# filter plugins by scan level for boefje plugins
plugins = [plugin for plugin in plugins if plugin.type != "boefje" or plugin.scan_level >= filter_params.scan_level]

if filter_params.consumes is not None:
plugins = [plugin for plugin in plugins if filter_params.consumes.issubset(set(plugin.consumes))]

if filter_params.produces is not None:
plugins = [plugin for plugin in plugins if filter_params.produces.issubset(set(plugin.produces))]

if pagination_params.limit is None:
return plugins[pagination_params.offset :]

Expand Down
2 changes: 2 additions & 0 deletions boefjes/boefjes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class FilterParameters(BaseModel):
q: str | None = None
type: Literal["boefje", "normalizer", "bit"] | None = None
ids: list[str] | None = None
consumes: set[str] | None = None
produces: set[str] | None = None
state: bool | None = None
scan_level: int = 0
oci_image: str | None = None
6 changes: 0 additions & 6 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,6 @@ def monitor_organisations(self) -> None:
scheduler_boefje.run()
scheduler_report.run()

if additions:
# Flush katalogus caches when new organisations are added
self.ctx.services.katalogus.flush_caches()

self.logger.debug("Added %s organisations to scheduler", len(additions), additions=sorted(additions))

@tracer.start_as_current_span("collect_metrics")
def collect_metrics(self) -> None:
"""Collect application metrics
Expand Down
168 changes: 20 additions & 148 deletions mula/scheduler/clients/http/external/katalogus.py
Original file line number Diff line number Diff line change
@@ -1,136 +1,26 @@
import threading

import httpx
from pydantic import TypeAdapter

from scheduler.clients.errors import exception_handler
from scheduler.clients.http import HTTPService
from scheduler.models import Boefje, Organisation, Plugin
from scheduler.utils import dict_utils


class Katalogus(HTTPService):
"""A class that provides methods to interact with the Katalogus API."""

name = "katalogus"

def __init__(self, host: str, source: str, timeout: int, pool_connections: int, cache_ttl: int = 30):
def __init__(self, host: str, source: str, timeout: int, pool_connections: int):
super().__init__(host, source, timeout, pool_connections)

# For every organisation we cache its plugins, it references the
# plugin-id as key and the plugin as value.
self.plugin_cache_lock = threading.Lock()
self.plugin_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache on which type of object (consumes)
# the boefjes consume, it references the object type (consumes)
# as the key and a dict of boefjes as value.
self.boefje_cache_lock = threading.Lock()
self.boefje_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache on which type of object (consumes)
# the normalizers consume, it references the object type (consumes)
# as the key and a dict of normalizers as value.
self.normalizer_cache_lock = threading.Lock()
self.normalizer_cache = dict_utils.ExpiringDict(lifetime=cache_ttl)

# For every organisation we cache which new boefjes for an organisation
# have been enabled.
self.new_boefjes_cache_lock = threading.Lock()
self.new_boefjes_cache: dict = {}

# Initialise the cache.
self.flush_caches()

def flush_caches(self) -> None:
self.flush_plugin_cache()
self.flush_normalizer_cache()
self.flush_boefje_cache()

def flush_plugin_cache(self) -> None:
self.logger.debug("Flushing the katalogus plugin cache for organisations")

with self.plugin_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.plugin_cache.expiration_enabled = False
self.plugin_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

self.plugin_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus plugin cache for organisations")

def flush_boefje_cache(self) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.boefje_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue

if plugin.enabled is False:
continue

if not plugin.consumes:
continue

# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)

self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_normalizer_cache(self) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.normalizer_cache[org.id] = {}

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue

if plugin.enabled is False:
continue

if not plugin.consumes:
continue

for type_ in plugin.consumes:
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)

self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")

@exception_handler
def get_boefjes(self) -> list[Boefje]:
url = f"{self.host}/boefjes"
Expand Down Expand Up @@ -186,49 +76,31 @@ def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]:
return []
raise

def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]:
def _get_from_cache() -> list[Plugin]:
with self.plugin_cache_lock:
return dict_utils.deep_get(self.plugin_cache, [organisation_id])
@exception_handler
def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin | None:
url = f"{self.host}/v1/organisations/{organisation_id}/plugins/{plugin_id}"

try:
return _get_from_cache()
except dict_utils.ExpiredError:
self.flush_plugin_cache()
return _get_from_cache()

def get_plugin_by_id_and_org_id(self, plugin_id: str, organisation_id: str) -> Plugin:
def _get_from_cache() -> Plugin:
with self.plugin_cache_lock:
return dict_utils.deep_get(self.plugin_cache, [organisation_id, plugin_id])
response = self.get(url)
return Plugin(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return None

try:
return _get_from_cache()
except dict_utils.ExpiredError:
self.flush_plugin_cache()
return _get_from_cache()
raise

@exception_handler
def get_boefjes_by_type_and_org_id(self, ooi_type: str, organisation_id: str) -> list[Plugin]:
    """Return the boefje plugins for an organisation that consume the given OOI type.

    Queries the KATalogus plugins endpoint with a `consumes` filter instead
    of consulting a local cache.

    Args:
        ooi_type: The object type the boefjes must consume.
        organisation_id: Identifier of the organisation to query for.

    Returns:
        The matching boefje plugins.
    """
    url = f"{self.host}/v1/organisations/{organisation_id}/plugins/"
    response = self.get(url, params={"type": "boefje", "consumes": [ooi_type]})

    # validate_python takes the parsed JSON as a single positional argument;
    # unpacking with ** would try to spread the response list as keyword
    # arguments and raise a TypeError.
    return TypeAdapter(list[Plugin]).validate_python(response.json())

@exception_handler
def get_normalizers_by_org_id_and_type(self, organisation_id: str, ooi_type: str) -> list[Plugin]:
    """Return the normalizer plugins for an organisation that produce the given OOI type.

    Queries the KATalogus plugins endpoint with a `produces` filter instead
    of consulting a local cache. Wrapped in @exception_handler for
    consistency with the sibling get_boefjes_by_type_and_org_id.

    Args:
        organisation_id: Identifier of the organisation to query for.
        ooi_type: The object type the normalizers must produce.

    Returns:
        The matching normalizer plugins.
    """
    url = f"{self.host}/v1/organisations/{organisation_id}/plugins/"
    response = self.get(url, params={"type": "normalizer", "produces": [ooi_type]})

    # validate_python takes the parsed JSON as a single positional argument;
    # unpacking with ** would try to spread the response list as keyword
    # arguments and raise a TypeError.
    return TypeAdapter(list[Plugin]).validate_python(response.json())

def get_new_boefjes_by_org_id(self, organisation_id: str) -> list[Plugin]:
with self.new_boefjes_cache_lock:
Expand Down
2 changes: 0 additions & 2 deletions mula/scheduler/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ class Settings(BaseSettings):
10, description="The maximum number of connections to save in the pool for the octopoes api"
)

katalogus_cache_ttl: int = Field(30, description="The lifetime of the katalogus cache in seconds")

katalogus_request_timeout: int = Field(
10, description="The timeout in seconds for the requests to the katalogus api"
)
Expand Down
2 changes: 0 additions & 2 deletions mula/scheduler/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def __init__(self) -> None:
source=f"scheduler/{scheduler.__version__}",
timeout=self.config.katalogus_request_timeout,
pool_connections=self.config.katalogus_pool_connections,
cache_ttl=self.config.katalogus_cache_ttl,
)

bytes_service = clients.Bytes(
Expand Down Expand Up @@ -184,7 +183,6 @@ def __init__(self) -> None:
"pq_maxsize": str(self.config.pq_maxsize),
"pq_grace_period": str(self.config.pq_grace_period),
"pq_max_random_objects": str(self.config.pq_max_random_objects),
"katalogus_cache_ttl": str(self.config.katalogus_cache_ttl),
"monitor_organisations_interval": str(self.config.monitor_organisations_interval),
}
)
Expand Down

0 comments on commit 0c832ec

Please sign in to comment.