Skip to content

Commit

Permalink
Merge branch 'feature/add-json-schema-for-copied-boefjes' into featur…
Browse files Browse the repository at this point in the history
…e/improve-settings-env-logic
  • Loading branch information
Donnype committed Aug 20, 2024
2 parents a224b13 + cafdfa4 commit 0ce499a
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 101 deletions.
4 changes: 2 additions & 2 deletions boefjes/boefjes/plugins/kat_ssl_certificates/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from dateutil.parser import parse

from boefjes.job_models import NormalizerOutput
from boefjes.job_models import NormalizerAffirmation, NormalizerOutput
from octopoes.models import Reference
from octopoes.models.ooi.certificate import (
AlgorithmType,
Expand Down Expand Up @@ -73,7 +73,7 @@ def run(input_ooi: dict, raw: bytes) -> Iterable[NormalizerOutput]:
)

# update website
yield website
yield NormalizerAffirmation(ooi=website)

# chain certificates together
last_certificate = None
Expand Down
2 changes: 1 addition & 1 deletion boefjes/tests/test_sslcertificate_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
def test_ssl_certificates_normalizer():
output = list(run(input_ooi, get_dummy_data("ssl-certificates.txt")))

assert len([ooi for ooi in output if ooi.object_type == "X509Certificate"]) == 3
assert len([ooi for ooi in output if hasattr(ooi, "object_type") and ooi.object_type == "X509Certificate"]) == 3
63 changes: 32 additions & 31 deletions mula/scheduler/connectors/services/bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,40 +86,41 @@ def get_token(self) -> str:
@exception_handler
def get_last_run_boefje(self, boefje_id: str, input_ooi: str, organization_id: str) -> BoefjeMeta | None:
url = f"{self.host}/bytes/boefje_meta"
response = self.get(
url=url,
params={
"boefje_id": boefje_id,
"input_ooi": input_ooi,
"organization": organization_id,
"limit": 1,
"descending": "true",
},
)

self._verify_response(response)

if response.status_code == 200 and len(response.json()) > 0:
return BoefjeMeta(**response.json()[0])

return None
try:
response = self.get(
url=url,
params={
"boefje_id": boefje_id,
"input_ooi": input_ooi,
"organization": organization_id,
"limit": 1,
"descending": "true",
},
)
if len(response.json()) > 0:
return BoefjeMeta(**response.json()[0])

return None
except httpx.HTTPStatusError as exc:
if exc.response.status_code == httpx.codes.NOT_FOUND:
return None
raise

@retry_with_login
@exception_handler
def get_last_run_boefje_by_organisation_id(self, organization_id: str) -> BoefjeMeta | None:
url = f"{self.host}/bytes/boefje_meta"
response = self.get(
url=url,
params={
"organization": organization_id,
"limit": 1,
"descending": "true",
},
)

self._verify_response(response)

if response.status_code == 200 and response.content:
try:
response = self.get(
url=url,
params={
"organization": organization_id,
"limit": 1,
"descending": "true",
},
)
return BoefjeMeta(**response.json()[0])

return None
except httpx.HTTPStatusError as exc:
if exc.response.status_code == httpx.codes.NOT_FOUND:
return None
raise
51 changes: 39 additions & 12 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import threading

import httpx

from scheduler.connectors.errors import exception_handler
from scheduler.models import Boefje, Organisation, Plugin
from scheduler.utils import dict_utils
Expand Down Expand Up @@ -140,32 +142,57 @@ def flush_normalizer_cache(self) -> None:
@exception_handler
def get_boefjes(self) -> list[Boefje]:
url = f"{self.host}/boefjes"
response = self.get(url)
return [Boefje(**boefje) for boefje in response.json()]
try:
response = self.get(url)
return [Boefje(**boefje) for boefje in response.json()]
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return []
raise

@exception_handler
def get_boefje(self, boefje_id: str) -> Boefje:
def get_boefje(self, boefje_id: str) -> Boefje | None:
url = f"{self.host}/boefjes/{boefje_id}"
response = self.get(url)
return Boefje(**response.json())
try:
response = self.get(url)
return Boefje(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return None
raise

@exception_handler
def get_organisation(self, organisation_id) -> Organisation:
def get_organisation(self, organisation_id) -> Organisation | None:
url = f"{self.host}/v1/organisations/{organisation_id}"
response = self.get(url)
return Organisation(**response.json())
try:
response = self.get(url)
return Organisation(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return None
raise

@exception_handler
def get_organisations(self) -> list[Organisation]:
url = f"{self.host}/v1/organisations"
response = self.get(url)
return [Organisation(**organisation) for organisation in response.json().values()]
try:
response = self.get(url)
return [Organisation(**organisation) for organisation in response.json().values()]
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return []
raise

@exception_handler
def get_plugins_by_organisation(self, organisation_id: str) -> list[Plugin]:
url = f"{self.host}/v1/organisations/{organisation_id}/plugins"
response = self.get(url)
return [Plugin(**plugin) for plugin in response.json()]
try:
response = self.get(url)
return [Plugin(**plugin) for plugin in response.json()]
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return []
raise

def get_plugins_by_org_id(self, organisation_id: str) -> list[Plugin]:
def _get_from_cache() -> list[Plugin]:
Expand Down
38 changes: 28 additions & 10 deletions mula/scheduler/connectors/services/octopoes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone

import httpx
from pydantic import BaseModel

from scheduler.connectors.errors import exception_handler
Expand Down Expand Up @@ -61,7 +62,14 @@ def get_objects_by_object_types(
oois = []
for offset in range(0, count, limit):
params["offset"] = offset
response = self.get(url, params=params)

try:
response = self.get(url, params=params)
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
break
raise

list_objects = ListObjectsResponse(**response.json())
oois.extend([ooi for ooi in list_objects.items])

Expand All @@ -81,19 +89,29 @@ def get_random_objects(self, organisation_id: str, n: int, scan_level: list[int]
"valid_time": datetime.now(timezone.utc),
}

response = self.get(url, params=params)

return [OOI(**ooi) for ooi in response.json()]
try:
response = self.get(url, params=params)
return [OOI(**ooi) for ooi in response.json()]
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return []
raise

@exception_handler
def get_object(self, organisation_id: str, reference: str) -> OOI:
def get_object(self, organisation_id: str, reference: str) -> OOI | None:
"""Get an ooi from octopoes"""
url = f"{self.host}/{organisation_id}/object"
response = self.get(
url,
params={"reference": reference, "valid_time": datetime.now(timezone.utc)},
)
return OOI(**response.json())

try:
response = self.get(
url,
params={"reference": reference, "valid_time": datetime.now(timezone.utc)},
)
return OOI(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == httpx.codes.NOT_FOUND:
return None
raise

def is_healthy(self) -> bool:
healthy = True
Expand Down
24 changes: 4 additions & 20 deletions mula/scheduler/connectors/services/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import httpx
import structlog
from httpx import HTTPError, HTTPTransport, Limits
from httpx import HTTPTransport, Limits

from ..connector import Connector # noqa: TID252

Expand Down Expand Up @@ -110,6 +110,8 @@ def get(
url=url,
)

response.raise_for_status()

return response

def post(
Expand Down Expand Up @@ -144,7 +146,7 @@ def post(
payload=payload,
)

self._verify_response(response)
response.raise_for_status()

return response

Expand Down Expand Up @@ -196,21 +198,3 @@ def is_healthy(self) -> bool:
return False

return self.is_host_healthy(self.host, self.health_endpoint)

def _verify_response(self, response: httpx.Response) -> None:
"""Verify the received response from a request.
Raises:
Exception
"""
try:
response.raise_for_status()
except HTTPError as e:
self.logger.error(
"Received bad response from %s.",
response.url,
name=self.name,
url=response.url,
response=str(response.content),
)
raise e
63 changes: 46 additions & 17 deletions mula/scheduler/schedulers/boefje.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,26 +371,40 @@ def push_tasks_for_rescheduling(self):
boefje_task = BoefjeTask.parse_obj(schedule.data)

# Plugin still exists?
plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id(
boefje_task.boefje.id,
self.organisation.id,
)
if not plugin:
self.logger.debug(
"Boefje does not exist anymore, skipping",
try:
plugin = self.ctx.services.katalogus.get_plugin_by_id_and_org_id(
boefje_task.boefje.id,
self.organisation.id,
)
if not plugin:
self.logger.info(
"Boefje does not exist anymore, skipping and disabling schedule",
boefje_id=boefje_task.boefje.id,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
schedule.enabled = False
self.ctx.datastores.schedule_store.update_schedule(schedule)
continue
except ExternalServiceError as exc_plugin:
self.logger.error(
"Could not get plugin %s from katalogus",
boefje_task.boefje.id,
boefje_id=boefje_task.boefje.id,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
exc_info=exc_plugin,
)
schedule.enabled = False
self.ctx.datastores.schedule_store.update_schedule(schedule)
continue

# Plugin still enabled?
if not plugin.enabled:
self.logger.debug(
"Boefje is disabled, skipping",
boefje_id=boefje_task.boefje.id,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
Expand All @@ -405,6 +419,7 @@ def push_tasks_for_rescheduling(self):
self.logger.warning(
"Plugin is not a boefje, skipping",
plugin_id=plugin.id,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
Expand All @@ -415,16 +430,29 @@ def push_tasks_for_rescheduling(self):
ooi = None
if boefje_task.input_ooi:
# OOI still exists?
ooi = self.ctx.services.octopoes.get_object(boefje_task.organization, boefje_task.input_ooi)
if not ooi:
self.logger.debug(
"OOI does not exist anymore, skipping",
try:
ooi = self.ctx.services.octopoes.get_object(boefje_task.organization, boefje_task.input_ooi)
if not ooi:
self.logger.info(
"OOI does not exist anymore, skipping and disabling schedule",
ooi_primary_key=boefje_task.input_ooi,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
schedule.enabled = False
self.ctx.datastores.schedule_store.update_schedule(schedule)
continue
except ExternalServiceError as exc_ooi:
self.logger.error(
"Could not get ooi %s from octopoes",
boefje_task.input_ooi,
ooi_primary_key=boefje_task.input_ooi,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
exc_info=exc_ooi,
)
schedule.enabled = False
self.ctx.datastores.schedule_store.update_schedule(schedule)
continue

# Boefje still consuming ooi type?
Expand All @@ -445,10 +473,11 @@ def push_tasks_for_rescheduling(self):

# Boefje allowed to scan ooi?
if not self.has_boefje_permission_to_run(plugin, ooi):
self.logger.debug(
"Boefje not allowed to scan ooi, skipping",
self.logger.info(
"Boefje not allowed to scan ooi, skipping and disabling schedule",
boefje_id=boefje_task.boefje.id,
ooi_primary_key=ooi.primary_key,
schedule_id=schedule.id,
organisation_id=self.organisation.id,
scheduler_id=self.scheduler_id,
)
Expand Down
Loading

0 comments on commit 0ce499a

Please sign in to comment.