Skip to content

Commit

Permalink
Merge branch 'main' into feature/codeql_scanning
Browse files Browse the repository at this point in the history
  • Loading branch information
BramVWS authored Feb 19, 2025
2 parents c82e783 + bda8ad8 commit 68783fc
Show file tree
Hide file tree
Showing 102 changed files with 3,063 additions and 3,176 deletions.
3 changes: 1 addition & 2 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
# These owners will be the default owners for everything in
# the repo. Unless a later match takes precedence,
* @minvws/kat-managers
.github/CODEOWNERS @minvws/irealisatie-operations
2 changes: 1 addition & 1 deletion .github/workflows/build-rdo-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ jobs:
- name: Octopoes Upload whl package
uses: actions/upload-artifact@v4
with:
name: "octopoes-${{env.RELEASE_VERSION}}"
name: "octopoes-${{env.RELEASE_VERSION}}_python${{ matrix.python_version }}"
path: "${{ github.workspace }}/octopoes/dist/octopoes*.whl"

- name: Octopoes Upload venv tar
Expand Down
74 changes: 21 additions & 53 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,60 +85,33 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> Non
time.sleep(self.settings.worker_heartbeat)
return

logger.debug("Popping from queue %s", queue_type.value)

try:
queues = self.scheduler_client.get_queues()
except HTTPError:
# Scheduler is having issues, so make note of it and try again
logger.exception("Getting the queues from the scheduler failed")
time.sleep(self.settings.poll_interval) # But not immediately
p_item = self.scheduler_client.pop_item(queue_type.value)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(self.settings.worker_heartbeat)
return

# We do not target a specific queue since we start one runtime for all organisations
# and queue ids contain the organisation_id
queues = [q for q in queues if q.id.startswith(queue_type.value) and q.size > 0]

logger.debug("Found queues: %s", [queue.id for queue in queues])

all_queues_empty = True

for queue in queues:
logger.debug("Popping from queue %s", queue.id)

try:
p_item = self.scheduler_client.pop_item(queue.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
continue

if not p_item:
logger.debug("Queue %s empty", queue.id)
continue
if p_item is None:
time.sleep(self.settings.worker_heartbeat)
return

all_queues_empty = False
logger.info("Handling task[%s]", p_item.data.id)

logger.info("Handling task[%s]", p_item.data.id)
try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
task_queue.put(p_item)
logger.info("Dispatched task[%s]", p_item.data.id)
except: # noqa
logger.exception("Exiting worker...")
logger.info("Patching scheduler task[id=%s] to %s", p_item.data.id, TaskStatus.FAILED.value)

try:
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info(
"Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id
)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

raise

if all_queues_empty:
logger.debug("All queues empty, sleeping %f seconds", self.settings.poll_interval)
time.sleep(self.settings.poll_interval)
self.scheduler_client.patch_task(p_item.id, TaskStatus.FAILED)
logger.info("Set task status to %s in the scheduler for task[id=%s]", TaskStatus.FAILED, p_item.data.id)
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

def _check_workers(self) -> None:
new_workers = []
Expand Down Expand Up @@ -279,9 +252,4 @@ def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_leve
LocalNormalizerJobRunner(local_repository), bytes_api_client, settings.scan_profile_whitelist
)

return SchedulerWorkerManager(
item_handler,
SchedulerAPIClient(str(settings.scheduler_api)), # Do not share a session between workers
settings,
log_level,
)
return SchedulerWorkerManager(item_handler, SchedulerAPIClient(str(settings.scheduler_api)), settings, log_level)
34 changes: 25 additions & 9 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import uuid
from enum import Enum
from typing import Any

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -29,7 +30,8 @@ class TaskStatus(Enum):
class Task(BaseModel):
id: uuid.UUID
scheduler_id: str
schedule_id: str | None
schedule_id: uuid.UUID | None = None
organisation: str
priority: int
status: TaskStatus
type: str
Expand All @@ -39,11 +41,21 @@ class Task(BaseModel):
modified_at: datetime.datetime


class PaginatedTasksResponse(BaseModel):
count: int
next: str | None = None
previous: str | None = None
results: list[Task]


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()

def pop_item(self, queue_id: str) -> Task | None:
def pop_item(self, scheduler_id: str) -> Task | None:
raise NotImplementedError()

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
raise NotImplementedError()

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand All @@ -66,20 +78,24 @@ def __init__(self, base_url: str):
def _verify_response(response: Response) -> None:
response.raise_for_status()

def get_queues(self) -> list[Queue]:
response = self._session.get("/queues")
def pop_item(self, scheduler_id: str) -> Task | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(list[Queue]).validate_json(response.content)
page = TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)
if page.count == 0:
return None

return page.results[0]

def pop_item(self, queue_id: str) -> Task | None:
response = self._session.post(f"/queues/{queue_id}/pop")
def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop", json=filters)
self._verify_response(response)

return TypeAdapter(Task | None).validate_json(response.content)
return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

def push_item(self, p_item: Task) -> None:
response = self._session.post(f"/queues/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
response = self._session.post(f"/schedulers/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
self._verify_response(response)

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand Down
5 changes: 1 addition & 4 deletions boefjes/boefjes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Literal

from pydantic import AmqpDsn, AnyHttpUrl, Field, FilePath, IPvAnyAddress, PostgresDsn, conint
from pydantic import AnyHttpUrl, Field, FilePath, IPvAnyAddress, PostgresDsn, conint
from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict
from pydantic_settings.sources import EnvSettingsSource

Expand Down Expand Up @@ -63,9 +63,6 @@ class Settings(BaseSettings):
examples=['{"kat_external_db_normalize": 3, "kat_dns_normalize": 1}'],
)

# Queue configuration
queue_uri: AmqpDsn = Field(..., description="KAT queue URI", examples=["amqp://"], validation_alias="QUEUE_URI")

katalogus_db_uri: PostgresDsn = Field(
...,
examples=["postgresql://xx:xx@host:5432/katalogus"],
Expand Down
2 changes: 0 additions & 2 deletions boefjes/packaging/deb/data/etc/kat/boefjes.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
QUEUE_URI=

# OCTOPOES_API=http://localhost:8001
# BYTES_API=http://localhost:8002
BYTES_USERNAME=bytes
Expand Down
1 change: 0 additions & 1 deletion boefjes/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ markers = ["slow: marks tests as slow"]
addopts = "-m 'not slow'"
env = [
"D:KATALOGUS_DB_URI=postgresql://postgres:postgres@ci_katalogus-db:5432/ci_katalogus",
"D:QUEUE_URI=amqp://placeholder",
"D:BOEFJES_API=http://placeholder:8006",
"D:KATALOGUS_API=http://placeholder:8000",
"D:OCTOPOES_API=http://placeholder:8001",
Expand Down
19 changes: 8 additions & 11 deletions boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from boefjes.app import SchedulerWorkerManager
from boefjes.clients.bytes_client import BytesAPIClient
from boefjes.clients.scheduler_client import Queue, SchedulerClientInterface, Task, TaskStatus
from boefjes.clients.scheduler_client import PaginatedTasksResponse, SchedulerClientInterface, Task, TaskStatus
from boefjes.config import Settings, settings
from boefjes.dependencies.plugins import PluginService, get_plugin_service
from boefjes.job_handler import bytes_api_client
Expand Down Expand Up @@ -50,15 +50,13 @@
class MockSchedulerClient(SchedulerClientInterface):
def __init__(
self,
queue_response: bytes,
boefje_responses: list[bytes],
normalizer_responses: list[bytes],
log_path: Path,
raise_on_empty_queue: Exception = KeyboardInterrupt,
iterations_to_wait_for_exception: int = 0,
sleep_time: float = 0.1,
):
self.queue_response = queue_response
self.boefje_responses = boefje_responses
self.normalizer_responses = normalizer_responses

Expand All @@ -73,26 +71,25 @@ def __init__(
self._popped_items: dict[str, Task] = multiprocessing.Manager().dict()
self._pushed_items: dict[str, Task] = multiprocessing.Manager().dict()

def get_queues(self) -> list[Queue]:
time.sleep(self.sleep_time)
return TypeAdapter(list[Queue]).validate_json(self.queue_response)

def pop_item(self, queue: str) -> Task | None:
time.sleep(self.sleep_time)

try:
if WorkerManager.Queue.BOEFJES.value in queue:
p_item = TypeAdapter(Task).validate_json(self.boefje_responses.pop(0))
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.boefje_responses.pop(0))
p_item = response.results[0]
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
return p_item

if WorkerManager.Queue.NORMALIZERS.value in queue:
p_item = TypeAdapter(Task).validate_json(self.normalizer_responses.pop(0))
response = TypeAdapter(PaginatedTasksResponse).validate_json(self.normalizer_responses.pop(0))
p_item = response.results[0]
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
return p_item
except IndexError:
time.sleep(3 * self.sleep_time)
raise self.raise_on_empty_queue

def patch_task(self, task_id: UUID, status: TaskStatus) -> None:
Expand Down Expand Up @@ -126,7 +123,8 @@ def __init__(self, exception=Exception):
def handle(self, item: BoefjeMeta | NormalizerMeta):
time.sleep(self.sleep_time)

if str(item.id) == "9071c9fd-2b9f-440f-a524-ef1ca4824fd4":
if str(item.id) in ["9071c9fd-2b9f-440f-a524-ef1ca4824fd4", "2071c9fd-2b9f-440f-a524-ef1ca4824fd4"]:
time.sleep(self.sleep_time)
raise self.exception()

self.queue.put(item)
Expand All @@ -151,7 +149,6 @@ def item_handler(tmp_path: Path):
@pytest.fixture
def manager(item_handler: MockHandler, tmp_path: Path) -> SchedulerWorkerManager:
scheduler_client = MockSchedulerClient(
queue_response=get_dummy_data("scheduler/queues_response.json"),
boefje_responses=[
get_dummy_data("scheduler/pop_response_boefje.json"),
get_dummy_data("scheduler/pop_response_boefje_2.json"),
Expand Down
54 changes: 31 additions & 23 deletions boefjes/tests/examples/scheduler/pop_response_boefje.json
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje-_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
"count": 1,
"next": null,
"previous": null,
"results": [
{
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"priority": 1,
"scheduler_id": "boefje",
"organisation": "_dev",
"schedule_id": null,
"status": "dispatched",
"type": "boefje",
"hash": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"data": {
"id": "70da7d4f-f41f-4940-901b-d98a92e9014b",
"boefje": {
"id": "dns-records",
"version": null
},
"input_ooi": "Hostname|internet|test.test",
"organization": "_dev",
"arguments": {},
"started_at": null,
"runnable_hash": null,
"environment": null,
"ended_at": null
},
"created_at": "2021-06-29T14:00:00",
"modified_at": "2021-06-29T14:00:00"
}
]
}
Loading

0 comments on commit 68783fc

Please sign in to comment.