-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[ASP-4600] Add cluster_statuses endpoints to Jobbergate API (#538)
* feat(api): add tracking for cluster status * code review
- Loading branch information
Showing
9 changed files
with
309 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
33 changes: 33 additions & 0 deletions
33
...gate-api/alembic/versions/20240417_144852--815022877cfe_add_cluster_statuses_endpoints.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
"""Add cluster statuses endpoints | ||
Revision ID: 815022877cfe | ||
Revises: 64edbf695d69 | ||
Create Date: 2024-04-17 14:48:52.956389 | ||
""" | ||
|
||
import sqlalchemy as sa | ||
|
||
from alembic import op | ||
|
||
# revision identifiers, used by Alembic. | ||
revision = "815022877cfe" | ||
down_revision = "64edbf695d69" | ||
branch_labels = None | ||
depends_on = None | ||
|
||
|
||
def upgrade(): | ||
op.create_table( | ||
"cluster_statuses", | ||
sa.Column("client_id", sa.String(), nullable=False), | ||
sa.Column("interval", sa.Integer(), nullable=False), | ||
sa.Column("last_reported", sa.DateTime(timezone=True), nullable=False), | ||
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False), | ||
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), | ||
sa.PrimaryKeyConstraint("client_id"), | ||
) | ||
|
||
|
||
def downgrade(): | ||
op.drop_table("cluster_statuses") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Module to track agent's health on the clusters.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
"""Database models for the cluster health resource.""" | ||
|
||
from pendulum.datetime import DateTime as PendulumDateTime | ||
from sqlalchemy import DateTime, Integer, String | ||
from sqlalchemy.orm import Mapped, mapped_column | ||
|
||
from jobbergate_api.apps.models import Base, CommonMixin, TimestampMixin | ||
|
||
|
||
class ClusterStatus(CommonMixin, TimestampMixin, Base): | ||
"""Cluster status table definition.""" | ||
|
||
client_id: Mapped[str] = mapped_column(String, primary_key=True) | ||
interval: Mapped[int] = mapped_column(Integer, nullable=False) | ||
last_reported: Mapped[PendulumDateTime] = mapped_column( | ||
DateTime(timezone=True), | ||
nullable=False, | ||
default=PendulumDateTime.utcnow, | ||
) | ||
|
||
@property | ||
def is_healthy(self) -> bool: | ||
"""Return True if the last_reported time is before now plus the interval in seconds between pings.""" | ||
return PendulumDateTime.utcnow().subtract(seconds=self.interval) <= self.last_reported |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
"""Cluster status API endpoints.""" | ||
|
||
from fastapi import APIRouter, Depends, Query | ||
from fastapi import Response as FastAPIResponse | ||
from fastapi import status | ||
from fastapi_pagination import Page | ||
from fastapi_pagination.ext.sqlalchemy import paginate | ||
from loguru import logger | ||
from pendulum.datetime import DateTime as PendulumDateTime | ||
from sqlalchemy import select | ||
|
||
from jobbergate_api.apps.clusters.models import ClusterStatus | ||
from jobbergate_api.apps.clusters.schemas import ClusterStatusView | ||
from jobbergate_api.apps.permissions import Permissions | ||
from jobbergate_api.storage import SecureSession, secure_session | ||
|
||
router = APIRouter(prefix="/clusters", tags=["Cluster Status"]) | ||
|
||
|
||
@router.put( | ||
"/status", | ||
status_code=status.HTTP_202_ACCEPTED, | ||
description="Endpoints to accept a status check from the agent on the clusters.", | ||
) | ||
async def report_cluster_status( | ||
interval: int = Query(description="The interval in seconds between pings.", gt=0), | ||
secure_session: SecureSession = Depends( | ||
secure_session(Permissions.JOB_SUBMISSIONS_EDIT, ensure_client_id=True) | ||
), | ||
): | ||
""" | ||
Report the status of the cluster. | ||
""" | ||
logger.debug( | ||
"Got status report from client_id={}, another ping is expected in {} seconds", | ||
secure_session.identity_payload.client_id, | ||
interval, | ||
) | ||
instance = ClusterStatus( | ||
client_id=secure_session.identity_payload.client_id, | ||
interval=interval, | ||
last_reported=PendulumDateTime.utcnow(), | ||
) | ||
await secure_session.session.merge(instance) | ||
|
||
return FastAPIResponse(status_code=status.HTTP_202_ACCEPTED) | ||
|
||
|
||
@router.get( | ||
"/status", | ||
description="Endpoint to get the status of the cluster.", | ||
response_model=Page[ClusterStatusView], | ||
) | ||
async def get_cluster_status( | ||
secure_session: SecureSession = Depends(secure_session(Permissions.JOB_SUBMISSIONS_VIEW, commit=False)), | ||
): | ||
""" | ||
Get the status of the cluster. | ||
""" | ||
logger.debug("Getting list of cluster statuses") | ||
query = select(ClusterStatus).order_by(ClusterStatus.client_id) | ||
return await paginate(secure_session.session, query) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
"""Schema definitions for the cluster app.""" | ||
|
||
from pendulum.datetime import DateTime | ||
from pydantic import BaseModel | ||
|
||
from jobbergate_api.apps.schemas import IgnoreLazyGetterDict | ||
from jobbergate_api.meta_mapper import MetaField, MetaMapper | ||
|
||
cluster_status_meta_mapper = MetaMapper( | ||
client_id=MetaField( | ||
description="The client_id of the cluster where this agent is reporting status from", | ||
example="mega-cluster-1", | ||
), | ||
created_at=MetaField( | ||
description="The timestamp for when this entry was created", | ||
example="2023-08-18T13:55:37.172285", | ||
), | ||
updated_at=MetaField( | ||
description="The timestamp for when this entry was last updated", | ||
example="2023-08-18T13:55:37.172285", | ||
), | ||
last_reported=MetaField( | ||
description="The timestamp for when the agent on the cluster last reported its status", | ||
example="2023-08-18T13:55:37.172285", | ||
), | ||
interval=MetaField( | ||
description="The expected interval in seconds between pings from the agent", | ||
example=60, | ||
), | ||
is_healthy=MetaField( | ||
description="A boolean indicating if the cluster is healthy based on the last_reported time", | ||
example=True, | ||
), | ||
) | ||
|
||
|
||
class ClusterStatusView(BaseModel): | ||
""" | ||
Describes the status of a cluster. | ||
""" | ||
|
||
client_id: str | ||
created_at: DateTime | ||
updated_at: DateTime | ||
last_reported: DateTime | ||
interval: int | ||
is_healthy: bool | ||
|
||
class Config: | ||
orm_mode = True | ||
getter_dict = IgnoreLazyGetterDict | ||
schema_extra = cluster_status_meta_mapper |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
from jobbergate_api.apps.permissions import Permissions | ||
from fastapi import status | ||
from jobbergate_api.apps.clusters.models import ClusterStatus | ||
from sqlalchemy import select | ||
import pytest | ||
import pendulum | ||
|
||
|
||
class TestPutClusterStatus: | ||
|
||
async def test_report_cluster_status__create(self, client, inject_security_header, synth_session): | ||
client_id = "dummy-client" | ||
inject_security_header( | ||
"[email protected]", | ||
Permissions.JOB_SUBMISSIONS_EDIT, | ||
client_id=client_id, | ||
) | ||
|
||
interval = 60 | ||
now = pendulum.datetime(2023, 1, 1) | ||
with pendulum.test(now): | ||
response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) | ||
|
||
assert response.status_code == status.HTTP_202_ACCEPTED | ||
|
||
query = select(ClusterStatus).filter(ClusterStatus.client_id == client_id) | ||
instance = (await synth_session.execute(query)).unique().scalar_one() | ||
|
||
assert instance.client_id == client_id | ||
assert instance.interval == interval | ||
assert instance.last_reported == now | ||
|
||
async def test_report_cluster_status__update(self, client, inject_security_header, synth_session): | ||
client_id = "dummy-client" | ||
inject_security_header( | ||
"[email protected]", | ||
Permissions.JOB_SUBMISSIONS_EDIT, | ||
client_id=client_id, | ||
) | ||
|
||
original_time = pendulum.datetime(2023, 1, 1) | ||
original_interval = 60 | ||
with pendulum.test(original_time): | ||
response = await client.put("/jobbergate/clusters/status", params={"interval": original_interval}) | ||
assert response.status_code == status.HTTP_202_ACCEPTED | ||
|
||
now = original_time.add(days=1) | ||
interval = original_interval * 2 | ||
with pendulum.test(now): | ||
response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) | ||
assert response.status_code == status.HTTP_202_ACCEPTED | ||
|
||
query = select(ClusterStatus).filter(ClusterStatus.client_id == client_id) | ||
instance = (await synth_session.execute(query)).unique().scalar_one() | ||
|
||
assert instance.client_id == client_id | ||
assert instance.interval == interval | ||
assert instance.last_reported == now | ||
|
||
@pytest.mark.parametrize("interval", [0, -1, None]) | ||
async def test_report__invalid_interval_raises_error( | ||
self, interval, client, inject_security_header, synth_session | ||
): | ||
client_id = "dummy-client" | ||
inject_security_header( | ||
"[email protected]", | ||
Permissions.JOB_SUBMISSIONS_EDIT, | ||
client_id=client_id, | ||
) | ||
|
||
response = await client.put("/jobbergate/clusters/status", params={"interval": interval}) | ||
|
||
assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY | ||
|
||
async def test_report_cluster_status__no_client_id(self, client, inject_security_header): | ||
|
||
inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_EDIT) | ||
|
||
response = await client.put("/jobbergate/clusters/status", params={"interval": 60}) | ||
|
||
assert response.status_code == status.HTTP_400_BAD_REQUEST | ||
|
||
async def test_report_cluster_status__bad_permission(self, client, inject_security_header): | ||
|
||
inject_security_header("[email protected]", client_id="dummy-client") | ||
|
||
response = await client.put("/jobbergate/clusters/status", params={"interval": 60}) | ||
|
||
assert response.status_code == status.HTTP_403_FORBIDDEN | ||
|
||
|
||
class TestListClusterStatus: | ||
|
||
async def test_get_cluster_status__empty( | ||
self, client, inject_security_header, unpack_response, synth_session | ||
): | ||
|
||
inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_VIEW) | ||
|
||
response = await client.get("/jobbergate/clusters/status") | ||
assert unpack_response(response, check_total=0, check_page=1, check_pages=0) == [] | ||
|
||
async def test_get_cluster_status__list( | ||
self, client, inject_security_header, unpack_response, synth_session | ||
): | ||
|
||
statuses = [ | ||
ClusterStatus(client_id="client-1", interval=10, last_reported=pendulum.datetime(2023, 1, 1)), | ||
ClusterStatus(client_id="client-2", interval=20, last_reported=pendulum.datetime(2023, 1, 1)), | ||
ClusterStatus(client_id="client-3", interval=30, last_reported=pendulum.datetime(2022, 1, 1)), | ||
] | ||
|
||
inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_VIEW) | ||
|
||
with pendulum.test(pendulum.datetime(2023, 1, 1)): | ||
synth_session.add_all(statuses) | ||
response = await client.get("/jobbergate/clusters/status") | ||
|
||
assert unpack_response( | ||
response, check_total=3, check_page=1, check_pages=1, key="client_id", sort=True | ||
) == [s.client_id for s in statuses] | ||
|
||
assert unpack_response(response, key="is_healthy") == [True, True, False] | ||
|
||
async def test_get_cluster_status__bad_permission(self, client, inject_security_header, synth_session): | ||
|
||
inject_security_header("[email protected]") | ||
|
||
response = await client.get("/jobbergate/clusters/status") | ||
|
||
assert response.status_code == status.HTTP_403_FORBIDDEN |