From 3d30cb3953d612ff9384bb702976fa07761b92ba Mon Sep 17 00:00:00 2001 From: "Felipe N. Schuch" Date: Fri, 19 Apr 2024 14:15:53 -0300 Subject: [PATCH] [ASP-4600] Add cluster_statuses endpoints to Jobbergate API (#538) * feat(api): add tracking for cluster status * code review --- jobbergate-api/CHANGELOG.md | 2 + jobbergate-api/alembic/env.py | 3 +- ...22877cfe_add_cluster_statuses_endpoints.py | 33 +++++ .../jobbergate_api/apps/clusters/__init__.py | 1 + .../jobbergate_api/apps/clusters/models.py | 24 ++++ .../jobbergate_api/apps/clusters/routers.py | 62 +++++++++ .../jobbergate_api/apps/clusters/schemas.py | 52 +++++++ jobbergate-api/jobbergate_api/main.py | 2 + .../tests/apps/clusters/test_routers.py | 131 ++++++++++++++++++ 9 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 jobbergate-api/alembic/versions/20240417_144852--815022877cfe_add_cluster_statuses_endpoints.py create mode 100644 jobbergate-api/jobbergate_api/apps/clusters/__init__.py create mode 100644 jobbergate-api/jobbergate_api/apps/clusters/models.py create mode 100644 jobbergate-api/jobbergate_api/apps/clusters/routers.py create mode 100644 jobbergate-api/jobbergate_api/apps/clusters/schemas.py create mode 100644 jobbergate-api/tests/apps/clusters/test_routers.py diff --git a/jobbergate-api/CHANGELOG.md b/jobbergate-api/CHANGELOG.md index 206204df..7e3a0bc3 100644 --- a/jobbergate-api/CHANGELOG.md +++ b/jobbergate-api/CHANGELOG.md @@ -4,6 +4,8 @@ This file keeps track of all notable changes to jobbergate-api ## Unreleased +- Added cluster statuses table and endpoints to monitor if the agents are pinging the API in the expected time interval [ASP-4600] + ## 5.0.0 -- 2024-04-18 - Added logic to coerce empty `identifier` on job script templates to a `None` value [PENG-2152] diff --git a/jobbergate-api/alembic/env.py b/jobbergate-api/alembic/env.py index 027749c1..b8955dc2 100644 --- a/jobbergate-api/alembic/env.py +++ b/jobbergate-api/alembic/env.py @@ -3,10 +3,11 @@ from sqlalchemy 
import engine_from_config, pool from alembic import context +from jobbergate_api.apps.clusters import models # noqa # must be imported for metadata to work +from jobbergate_api.apps.job_script_templates import models # noqa # must be imported for metadata to work from jobbergate_api.apps.job_scripts import models # noqa # must be imported for metadata to work from jobbergate_api.apps.job_submissions import models # noqa # must be imported for metadata to work from jobbergate_api.apps.models import Base -from jobbergate_api.apps.job_script_templates import models # noqa # must be imported for metadata to work # from jobbergate_api.metadata import ( # metadata as jobbergate_api_metadata, diff --git a/jobbergate-api/alembic/versions/20240417_144852--815022877cfe_add_cluster_statuses_endpoints.py b/jobbergate-api/alembic/versions/20240417_144852--815022877cfe_add_cluster_statuses_endpoints.py new file mode 100644 index 00000000..eeb15fbd --- /dev/null +++ b/jobbergate-api/alembic/versions/20240417_144852--815022877cfe_add_cluster_statuses_endpoints.py @@ -0,0 +1,33 @@ +"""Add cluster statuses endpoints + +Revision ID: 815022877cfe +Revises: 64edbf695d69 +Create Date: 2024-04-17 14:48:52.956389 + +""" + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "815022877cfe" +down_revision = "64edbf695d69" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "cluster_statuses", + sa.Column("client_id", sa.String(), nullable=False), + sa.Column("interval", sa.Integer(), nullable=False), + sa.Column("last_reported", sa.DateTime(timezone=True), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("client_id"), + ) + + +def downgrade(): + op.drop_table("cluster_statuses") diff --git a/jobbergate-api/jobbergate_api/apps/clusters/__init__.py b/jobbergate-api/jobbergate_api/apps/clusters/__init__.py new file mode 100644 index 00000000..9acf3e67 --- /dev/null +++ b/jobbergate-api/jobbergate_api/apps/clusters/__init__.py @@ -0,0 +1 @@ +"""Module to track agent's health on the clusters.""" diff --git a/jobbergate-api/jobbergate_api/apps/clusters/models.py b/jobbergate-api/jobbergate_api/apps/clusters/models.py new file mode 100644 index 00000000..20dede70 --- /dev/null +++ b/jobbergate-api/jobbergate_api/apps/clusters/models.py @@ -0,0 +1,24 @@ +"""Database models for the cluster health resource.""" + +from pendulum.datetime import DateTime as PendulumDateTime +from sqlalchemy import DateTime, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from jobbergate_api.apps.models import Base, CommonMixin, TimestampMixin + + +class ClusterStatus(CommonMixin, TimestampMixin, Base): + """Cluster status table definition.""" + + client_id: Mapped[str] = mapped_column(String, primary_key=True) + interval: Mapped[int] = mapped_column(Integer, nullable=False) + last_reported: Mapped[PendulumDateTime] = mapped_column( + DateTime(timezone=True), + nullable=False, + default=PendulumDateTime.utcnow, + ) + + @property + def is_healthy(self) -> bool: + """Return True if the last report arrived within the last ``interval`` seconds (now - interval <= last_reported).""" + return
PendulumDateTime.utcnow().subtract(seconds=self.interval) <= self.last_reported diff --git a/jobbergate-api/jobbergate_api/apps/clusters/routers.py b/jobbergate-api/jobbergate_api/apps/clusters/routers.py new file mode 100644 index 00000000..24112cff --- /dev/null +++ b/jobbergate-api/jobbergate_api/apps/clusters/routers.py @@ -0,0 +1,62 @@ +"""Cluster status API endpoints.""" + +from fastapi import APIRouter, Depends, Query +from fastapi import Response as FastAPIResponse +from fastapi import status +from fastapi_pagination import Page +from fastapi_pagination.ext.sqlalchemy import paginate +from loguru import logger +from pendulum.datetime import DateTime as PendulumDateTime +from sqlalchemy import select + +from jobbergate_api.apps.clusters.models import ClusterStatus +from jobbergate_api.apps.clusters.schemas import ClusterStatusView +from jobbergate_api.apps.permissions import Permissions +from jobbergate_api.storage import SecureSession, secure_session + +router = APIRouter(prefix="/clusters", tags=["Cluster Status"]) + + +@router.put( + "/status", + status_code=status.HTTP_202_ACCEPTED, + description="Endpoints to accept a status check from the agent on the clusters.", +) +async def report_cluster_status( + interval: int = Query(description="The interval in seconds between pings.", gt=0), + secure_session: SecureSession = Depends( + secure_session(Permissions.JOB_SUBMISSIONS_EDIT, ensure_client_id=True) + ), +): + """ + Report the status of the cluster. 
+ """ + logger.debug( + "Got status report from client_id={}, another ping is expected in {} seconds", + secure_session.identity_payload.client_id, + interval, + ) + instance = ClusterStatus( + client_id=secure_session.identity_payload.client_id, + interval=interval, + last_reported=PendulumDateTime.utcnow(), + ) + await secure_session.session.merge(instance) + + return FastAPIResponse(status_code=status.HTTP_202_ACCEPTED) + + +@router.get( + "/status", + description="Endpoint to get the status of the cluster.", + response_model=Page[ClusterStatusView], +) +async def get_cluster_status( + secure_session: SecureSession = Depends(secure_session(Permissions.JOB_SUBMISSIONS_VIEW, commit=False)), +): + """ + Get the status of the cluster. + """ + logger.debug("Getting list of cluster statuses") + query = select(ClusterStatus).order_by(ClusterStatus.client_id) + return await paginate(secure_session.session, query) diff --git a/jobbergate-api/jobbergate_api/apps/clusters/schemas.py b/jobbergate-api/jobbergate_api/apps/clusters/schemas.py new file mode 100644 index 00000000..bd3deaca --- /dev/null +++ b/jobbergate-api/jobbergate_api/apps/clusters/schemas.py @@ -0,0 +1,52 @@ +"""Schema definitions for the cluster app.""" + +from pendulum.datetime import DateTime +from pydantic import BaseModel + +from jobbergate_api.apps.schemas import IgnoreLazyGetterDict +from jobbergate_api.meta_mapper import MetaField, MetaMapper + +cluster_status_meta_mapper = MetaMapper( + client_id=MetaField( + description="The client_id of the cluster where this agent is reporting status from", + example="mega-cluster-1", + ), + created_at=MetaField( + description="The timestamp for when this entry was created", + example="2023-08-18T13:55:37.172285", + ), + updated_at=MetaField( + description="The timestamp for when this entry was last updated", + example="2023-08-18T13:55:37.172285", + ), + last_reported=MetaField( + description="The timestamp for when the agent on the cluster last reported its 
status", + example="2023-08-18T13:55:37.172285", + ), + interval=MetaField( + description="The expected interval in seconds between pings from the agent", + example=60, + ), + is_healthy=MetaField( + description="A boolean indicating if the cluster is healthy based on the last_reported time", + example=True, + ), +) + + +class ClusterStatusView(BaseModel): + """ + Describes the status of a cluster. + """ + + client_id: str + created_at: DateTime + updated_at: DateTime + last_reported: DateTime + interval: int + is_healthy: bool + + class Config: + orm_mode = True + getter_dict = IgnoreLazyGetterDict + schema_extra = cluster_status_meta_mapper diff --git a/jobbergate-api/jobbergate_api/main.py b/jobbergate-api/jobbergate_api/main.py index 1c1f9c1f..f71f6c8b 100644 --- a/jobbergate-api/jobbergate_api/main.py +++ b/jobbergate-api/jobbergate_api/main.py @@ -14,6 +14,7 @@ from starlette.middleware.cors import CORSMiddleware from jobbergate_api import __version__ +from jobbergate_api.apps.clusters.routers import router as cluster_status_router from jobbergate_api.apps.job_script_templates.routers import router as job_script_templates_router from jobbergate_api.apps.job_scripts.routers import router as job_scripts_router from jobbergate_api.apps.job_submissions.routers import router as job_submissions_router @@ -59,6 +60,7 @@ subapp.include_router(job_script_templates_router) subapp.include_router(job_scripts_router) subapp.include_router(job_submissions_router) +subapp.include_router(cluster_status_router) subapp.exception_handler(asyncpg.exceptions.ForeignKeyViolationError)(handle_fk_error) add_pagination(subapp) diff --git a/jobbergate-api/tests/apps/clusters/test_routers.py b/jobbergate-api/tests/apps/clusters/test_routers.py new file mode 100644 index 00000000..1e91fcd3 --- /dev/null +++ b/jobbergate-api/tests/apps/clusters/test_routers.py @@ -0,0 +1,131 @@ +from jobbergate_api.apps.permissions import Permissions +from fastapi import status +from 
jobbergate_api.apps.clusters.models import ClusterStatus +from sqlalchemy import select +import pytest +import pendulum + + +class TestPutClusterStatus: + + async def test_report_cluster_status__create(self, client, inject_security_header, synth_session): + client_id = "dummy-client" + inject_security_header( + "who@cares.com", + Permissions.JOB_SUBMISSIONS_EDIT, + client_id=client_id, + ) + + interval = 60 + now = pendulum.datetime(2023, 1, 1) + with pendulum.test(now): + response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) + + assert response.status_code == status.HTTP_202_ACCEPTED + + query = select(ClusterStatus).filter(ClusterStatus.client_id == client_id) + instance = (await synth_session.execute(query)).unique().scalar_one() + + assert instance.client_id == client_id + assert instance.interval == interval + assert instance.last_reported == now + + async def test_report_cluster_status__update(self, client, inject_security_header, synth_session): + client_id = "dummy-client" + inject_security_header( + "who@cares.com", + Permissions.JOB_SUBMISSIONS_EDIT, + client_id=client_id, + ) + + original_time = pendulum.datetime(2023, 1, 1) + original_interval = 60 + with pendulum.test(original_time): + response = await client.put("/jobbergate/clusters/status", params={"interval": original_interval}) + assert response.status_code == status.HTTP_202_ACCEPTED + + now = original_time.add(days=1) + interval = original_interval * 2 + with pendulum.test(now): + response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) + assert response.status_code == status.HTTP_202_ACCEPTED + + query = select(ClusterStatus).filter(ClusterStatus.client_id == client_id) + instance = (await synth_session.execute(query)).unique().scalar_one() + + assert instance.client_id == client_id + assert instance.interval == interval + assert instance.last_reported == now + + @pytest.mark.parametrize("interval", [0, -1, None]) + async def 
test_report__invalid_interval_raises_error( + self, interval, client, inject_security_header, synth_session + ): + client_id = "dummy-client" + inject_security_header( + "who@cares.com", + Permissions.JOB_SUBMISSIONS_EDIT, + client_id=client_id, + ) + + response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) + + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY + + async def test_report_cluster_status__no_client_id(self, client, inject_security_header): + + inject_security_header("who@cares.com", Permissions.JOB_SUBMISSIONS_EDIT) + + response = await client.put("/jobbergate/clusters/status", params={"interval": 60}) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + async def test_report_cluster_status__bad_permission(self, client, inject_security_header): + + inject_security_header("who@cares.com", client_id="dummy-client") + + response = await client.put("/jobbergate/clusters/status", params={"interval": 60}) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + +class TestListClusterStatus: + + async def test_get_cluster_status__empty( + self, client, inject_security_header, unpack_response, synth_session + ): + + inject_security_header("who@cares.com", Permissions.JOB_SUBMISSIONS_VIEW) + + response = await client.get("/jobbergate/clusters/status") + assert unpack_response(response, check_total=0, check_page=1, check_pages=0) == [] + + async def test_get_cluster_status__list( + self, client, inject_security_header, unpack_response, synth_session + ): + + statuses = [ + ClusterStatus(client_id="client-1", interval=10, last_reported=pendulum.datetime(2023, 1, 1)), + ClusterStatus(client_id="client-2", interval=20, last_reported=pendulum.datetime(2023, 1, 1)), + ClusterStatus(client_id="client-3", interval=30, last_reported=pendulum.datetime(2022, 1, 1)), + ] + + inject_security_header("who@cares.com", Permissions.JOB_SUBMISSIONS_VIEW) + + with pendulum.test(pendulum.datetime(2023, 1, 
1)): + synth_session.add_all(statuses) + response = await client.get("/jobbergate/clusters/status") + + assert unpack_response( + response, check_total=3, check_page=1, check_pages=1, key="client_id", sort=True + ) == [s.client_id for s in statuses] + + assert unpack_response(response, key="is_healthy") == [True, True, False] + + async def test_get_cluster_status__bad_permission(self, client, inject_security_header, synth_session): + + inject_security_header("who@cares.com") + + response = await client.get("/jobbergate/clusters/status") + + assert response.status_code == status.HTTP_403_FORBIDDEN