diff --git a/boefjes/boefjes/plugins/kat_ssl_certificates/normalize.py b/boefjes/boefjes/plugins/kat_ssl_certificates/normalize.py index 8d3daabb0a7..6ab49289ec7 100644 --- a/boefjes/boefjes/plugins/kat_ssl_certificates/normalize.py +++ b/boefjes/boefjes/plugins/kat_ssl_certificates/normalize.py @@ -9,7 +9,7 @@ from cryptography.hazmat.primitives.asymmetric import ec, rsa from dateutil.parser import parse -from boefjes.job_models import NormalizerOutput +from boefjes.job_models import NormalizerAffirmation, NormalizerOutput from octopoes.models import Reference from octopoes.models.ooi.certificate import ( AlgorithmType, @@ -73,7 +73,7 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]: ) # update website - yield website + yield NormalizerAffirmation(ooi=website) # chain certificates together last_certificate = None diff --git a/boefjes/tests/test_sslcertificate_normalizer.py b/boefjes/tests/test_sslcertificate_normalizer.py index 1687ff3f6cd..429329b3819 100644 --- a/boefjes/tests/test_sslcertificate_normalizer.py +++ b/boefjes/tests/test_sslcertificate_normalizer.py @@ -22,4 +22,4 @@ def test_ssl_certificates_normalizer(): output = list(run(input_ooi, get_dummy_data("ssl-certificates.txt"))) - assert len([ooi for ooi in output if ooi.object_type == "X509Certificate"]) == 3 + assert len([ooi for ooi in output if hasattr(ooi, "object_type") and ooi.object_type == "X509Certificate"]) == 3 diff --git a/mula/scheduler/connectors/services/bytes.py b/mula/scheduler/connectors/services/bytes.py index ec92dcec2df..67a8ef93a37 100644 --- a/mula/scheduler/connectors/services/bytes.py +++ b/mula/scheduler/connectors/services/bytes.py @@ -86,40 +86,41 @@ def get_token(self) -> str: @exception_handler def get_last_run_boefje(self, boefje_id: str, input_ooi: str, organization_id: str) -> BoefjeMeta | None: url = f"{self.host}/bytes/boefje_meta" - response = self.get( - url=url, - params={ - "boefje_id": boefje_id, - "input_ooi": input_ooi, - "organization": organization_id, - "limit": 1, - "descending": "true", - }, - ) - - self._verify_response(response) - - if response.status_code == 200 and len(response.json()) > 0: - return BoefjeMeta(**response.json()[0]) - - return None + try: + response = self.get( + url=url, + params={ + "boefje_id": boefje_id, + "input_ooi": input_ooi, + "organization": organization_id, + "limit": 1, + "descending": "true", + }, + ) + if len(response.json()) > 0: + return BoefjeMeta(**response.json()[0]) + + return None + except httpx.HTTPStatusError as exc: + if exc.response.status_code == httpx.codes.NOT_FOUND: + return None + raise @retry_with_login @exception_handler def get_last_run_boefje_by_organisation_id(self, organization_id: str) -> BoefjeMeta | None: url = f"{self.host}/bytes/boefje_meta" - response = self.get( - url=url, - params={ - "organization": organization_id, - "limit": 1, - "descending": "true", - }, - ) - - self._verify_response(response) - - if response.status_code == 200 and response.content: + try: + response = self.get( + url=url, + params={ + "organization": organization_id, + "limit": 1, + "descending": "true", + }, + ) return BoefjeMeta(**response.json()[0]) - - return None + except httpx.HTTPStatusError as exc: + if exc.response.status_code == httpx.codes.NOT_FOUND: + return None + raise diff --git a/mula/scheduler/connectors/services/katalogus.py b/mula/scheduler/connectors/services/katalogus.py index 3ce82f098ce..8ea1028d526 100644 --- a/mula/scheduler/connectors/services/katalogus.py +++ b/mula/scheduler/connectors/services/katalogus.py @@ -1,5 +1,7 @@ import threading +import httpx + from scheduler.connectors.errors import exception_handler from scheduler.models import Boefje, Organisation, Plugin from scheduler.utils import dict_utils @@ -140,32 +142,57 @@ def flush_normalizer_cache(self) -> None: @exception_handler def get_boefjes(self) -> list[Boefje]: url = f"{self.host}/boefjes" - response = self.get(url) - return [Boefje(**boefje) for boefje in response.json()] + try: + response = self.get(url) + return [Boefje(**boefje) for boefje in response.json()] + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return [] + raise @exception_handler - def get_boefje(self, boefje_id: str) -> Boefje: + def get_boefje(self, boefje_id: str) -> Boefje | None: url = f"{self.host}/boefjes/{boefje_id}" - response = self.get(url) - return Boefje(**response.json()) + try: + response = self.get(url) + return Boefje(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return None + raise @exception_handler - def get_organisation(self, organisation_id) -> Organisation: + def get_organisation(self, organisation_id) -> Organisation | None: url = f"{self.host}/v1/organisations/{organisation_id}" - response = self.get(url) - return Organisation(**response.json()) + try: + response = self.get(url) + return Organisation(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return None + raise @exception_handler def get_organisations(self) -> list[Organisation]: url = f"{self.host}/v1/organisations" - response = self.get(url) - return [Organisation(**organisation) for organisation in response.json().values()] + try: + response = self.get(url) + return [Organisation(**organisation) for organisation in response.json().values()] + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return [] + raise @exception_handler def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]: url = f"{self.host}/v1/organisations/{organisation_id}/plugins" - response = self.get(url) - return [Plugin(**plugin) for plugin in response.json()] + try: + response = self.get(url) + return [Plugin(**plugin) for plugin in response.json()] + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return [] + raise def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]: def _get_from_cache() -> list[Plugin]: diff --git a/mula/scheduler/connectors/services/octopoes.py b/mula/scheduler/connectors/services/octopoes.py index 04d7b9f33f0..1e076da97c4 100644 --- a/mula/scheduler/connectors/services/octopoes.py +++ b/mula/scheduler/connectors/services/octopoes.py @@ -1,5 +1,6 @@ from datetime import datetime, timezone +import httpx from pydantic import BaseModel from scheduler.connectors.errors import exception_handler @@ -61,7 +62,14 @@ def get_objects_by_object_types( oois = [] for offset in range(0, count, limit): params["offset"] = offset - response = self.get(url, params=params) + + try: + response = self.get(url, params=params) + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + break + raise + list_objects = ListObjectsResponse(**response.json()) oois.extend([ooi for ooi in list_objects.items]) @@ -81,19 +89,29 @@ def get_random_objects(self, organisation_id: str, n: int, scan_level: list[int] "valid_time": datetime.now(timezone.utc), } - response = self.get(url, params=params) - - return [OOI(**ooi) for ooi in response.json()] + try: + response = self.get(url, params=params) + return [OOI(**ooi) for ooi in response.json()] + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return [] + raise @exception_handler - def get_object(self, organisation_id: str, reference: str) -> OOI: + def get_object(self, organisation_id: str, reference: str) -> OOI | None: """Get an ooi from octopoes""" url = f"{self.host}/{organisation_id}/object" - response = self.get( - url, - params={"reference": reference, "valid_time": datetime.now(timezone.utc)}, - ) - return OOI(**response.json()) + + try: + response = self.get( + url, + params={"reference": reference, "valid_time": datetime.now(timezone.utc)}, + ) + return OOI(**response.json()) + except httpx.HTTPStatusError as e: + if e.response.status_code == httpx.codes.NOT_FOUND: + return None + raise def is_healthy(self) -> bool: healthy = True diff --git a/mula/scheduler/connectors/services/services.py b/mula/scheduler/connectors/services/services.py index b539b454836..ceb59630b9b 100644 --- a/mula/scheduler/connectors/services/services.py +++ b/mula/scheduler/connectors/services/services.py @@ -4,7 +4,7 @@ import httpx import structlog -from httpx import HTTPError, HTTPTransport, Limits +from httpx import HTTPTransport, Limits from ..connector import Connector # noqa: TID252 @@ -110,6 +110,8 @@ def get( url=url, ) + response.raise_for_status() + return response def post( @@ -144,7 +146,7 @@ def post( payload=payload, ) - self._verify_response(response) + response.raise_for_status() return response @@ -196,21 +198,3 @@ def is_healthy(self) -> bool: return False return self.is_host_healthy(self.host, self.health_endpoint) - - def _verify_response(self, response: httpx.Response) -> None: - """Verify the received response from a request. - - Raises: - Exception - """ - try: - response.raise_for_status() - except HTTPError as e: - self.logger.error( - "Received bad response from %s.", - response.url, - name=self.name, - url=response.url, - response=str(response.content), - ) - raise e diff --git a/mula/scheduler/schedulers/boefje.py b/mula/scheduler/schedulers/boefje.py index fcf618aa6dd..16aa5022832 100644 --- a/mula/scheduler/schedulers/boefje.py +++ b/mula/scheduler/schedulers/boefje.py @@ -371,19 +371,32 @@ def push_tasks_for_rescheduling(self): boefje_task = BoefjeTask.parse_obj(schedule.data) # Plugin still exists? - plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( - boefje_task.boefje.id, - self.organisation.id, - ) - if not plugin: - self.logger.debug( - "Boefje does not exist anymore, skipping", + try: + plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( + boefje_task.boefje.id, + self.organisation.id, + ) + if not plugin: + self.logger.info( + "Boefje does not exist anymore, skipping and disabling schedule", + boefje_id=boefje_task.boefje.id, + schedule_id=schedule.id, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + ) + schedule.enabled = False + self.ctx.datastores.schedule_store.update_schedule(schedule) + continue + except ExternalServiceError as exc_plugin: + self.logger.error( + "Could not get plugin %s from katalogus", + boefje_task.boefje.id, boefje_id=boefje_task.boefje.id, + schedule_id=schedule.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id, + exc_info=exc_plugin, ) - schedule.enabled = False - self.ctx.datastores.schedule_store.update_schedule(schedule) continue # Plugin still enabled? @@ -391,6 +404,7 @@ def push_tasks_for_rescheduling(self): self.logger.debug( "Boefje is disabled, skipping", boefje_id=boefje_task.boefje.id, + schedule_id=schedule.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id, ) @@ -405,6 +419,7 @@ def push_tasks_for_rescheduling(self): self.logger.warning( "Plugin is not a boefje, skipping", plugin_id=plugin.id, + schedule_id=schedule.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id, ) @@ -415,16 +430,29 @@ def push_tasks_for_rescheduling(self): ooi = None if boefje_task.input_ooi: # OOI still exists? - ooi = self.ctx.services.octopoes.get_object(boefje_task.organization, boefje_task.input_ooi) - if not ooi: - self.logger.debug( - "OOI does not exist anymore, skipping", + try: + ooi = self.ctx.services.octopoes.get_object(boefje_task.organization, boefje_task.input_ooi) + if not ooi: + self.logger.info( + "OOI does not exist anymore, skipping and disabling schedule", + ooi_primary_key=boefje_task.input_ooi, + schedule_id=schedule.id, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + ) + schedule.enabled = False + self.ctx.datastores.schedule_store.update_schedule(schedule) + continue + except ExternalServiceError as exc_ooi: + self.logger.error( + "Could not get ooi %s from octopoes", + boefje_task.input_ooi, ooi_primary_key=boefje_task.input_ooi, + schedule_id=schedule.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id, + exc_info=exc_ooi, ) - schedule.enabled = False - self.ctx.datastores.schedule_store.update_schedule(schedule) continue # Boefje still consuming ooi type? @@ -445,10 +473,11 @@ def push_tasks_for_rescheduling(self): # Boefje allowed to scan ooi? if not self.has_boefje_permission_to_run(plugin, ooi): - self.logger.debug( - "Boefje not allowed to scan ooi, skipping", + self.logger.info( + "Boefje not allowed to scan ooi, skipping and disabling schedule", boefje_id=boefje_task.boefje.id, ooi_primary_key=ooi.primary_key, + schedule_id=schedule.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id, ) diff --git a/mula/scheduler/schedulers/normalizer.py b/mula/scheduler/schedulers/normalizer.py index 43ec0432f20..9c9465c08c3 100644 --- a/mula/scheduler/schedulers/normalizer.py +++ b/mula/scheduler/schedulers/normalizer.py @@ -187,14 +187,25 @@ def push_normalizer_task(self, normalizer_task: models.NormalizerTask, caller: s caller=caller, ) - plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( - normalizer_task.normalizer.id, - self.organisation.id, - ) - if not self.has_normalizer_permission_to_run(plugin): - self.logger.debug( - "Task is not allowed to run: %s", - normalizer_task.id, + try: + plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id( + normalizer_task.normalizer.id, + self.organisation.id, + ) + if not self.has_normalizer_permission_to_run(plugin): + self.logger.debug( + "Task is not allowed to run: %s", + normalizer_task.id, + task_id=normalizer_task.id, + organisation_id=self.organisation.id, + scheduler_id=self.scheduler_id, + caller=caller, + ) + return + except ExternalServiceError: + self.logger.warning( + "Could not get plugin by id: %s", + normalizer_task.normalizer.id, task_id=normalizer_task.id, organisation_id=self.organisation.id, scheduler_id=self.scheduler_id,