From 7c0041bc98e2efadaecc59f8877e3b8ea7a2a0e2 Mon Sep 17 00:00:00 2001 From: Bernhard Mallinger Date: Wed, 11 Dec 2024 14:41:52 +0100 Subject: [PATCH] Fetch logs for 21 day from job start --- CHANGELOG.md | 3 ++ pygeoapi_kubernetes_papermill/common.py | 5 +++ pygeoapi_kubernetes_papermill/log_view.py | 28 ++++++++++++++-- tests/conftest.py | 36 ++++++++++++++++++++ tests/test_argo_manager.py | 40 +++-------------------- tests/test_log_view.py | 12 ++++++- 6 files changed, 84 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d745f6..225af21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.6.3 +* Fetch logs for 21 day from job start + ## 1.6.2 * Handle errors from job subscriptions more gracefully diff --git a/pygeoapi_kubernetes_papermill/common.py b/pygeoapi_kubernetes_papermill/common.py index 4fc1d00..828238a 100644 --- a/pygeoapi_kubernetes_papermill/common.py +++ b/pygeoapi_kubernetes_papermill/common.py @@ -401,6 +401,10 @@ def now_str() -> str: return datetime.now(timezone.utc).strftime(DATETIME_FORMAT) +def parse_pygeoapi_datetime(date_string: str) -> datetime: + return datetime.strptime(date_string, DATETIME_FORMAT) + + JobDict = TypedDict( "JobDict", { @@ -408,6 +412,7 @@ def now_str() -> str: "status": str, "result-notebook": str, "message": str, + "job_start_datetime": Optional[str], "job_end_datetime": Optional[str], }, total=False, diff --git a/pygeoapi_kubernetes_papermill/log_view.py b/pygeoapi_kubernetes_papermill/log_view.py index b9a1811..9dd2207 100644 --- a/pygeoapi_kubernetes_papermill/log_view.py +++ b/pygeoapi_kubernetes_papermill/log_view.py @@ -30,13 +30,19 @@ from http import HTTPStatus import logging import itertools -import time +import kubernetes.client.rest +from kubernetes import client as k8s_client from flask import Response import requests # NOTE: this assumes flask_app, which is the default. from pygeoapi.flask_app import APP +from pygeoapi_kubernetes_papermill.argo import ( + K8S_CUSTOM_OBJECT_WORKFLOWS, + job_from_k8s_wf, +) +from pygeoapi_kubernetes_papermill.common import parse_pygeoapi_datetime LOGGER = logging.getLogger(__name__) @@ -62,6 +68,22 @@ def get_job_logs(job_id): else: namespace = api_.manager.namespace + try: + k8s_wf: dict = k8s_client.CustomObjectsApi().get_namespaced_custom_object( + **K8S_CUSTOM_OBJECT_WORKFLOWS, + name=k8s_job_name(job_id=job_id), + namespace=namespace, + ) + except kubernetes.client.rest.ApiException as e: + if e.status == HTTPStatus.NOT_FOUND: + return f"Job {job_id} not found" + else: + raise + + job_dict = job_from_k8s_wf(k8s_wf) + job_start = parse_pygeoapi_datetime(job_dict["job_start_datetime"]) + job_start_ns_unix_time = int(job_start.timestamp() * 1_000_000_000) + # 21 days in ns (default limit in loki is 30 days) query_time_range = 21 * 24 * 60 * 60 * 1_000_000_000 @@ -69,8 +91,8 @@ def get_job_logs(job_id): request_params = { "query": f'{{job="{namespace}/{job_name}"}}', - "start": time.time_ns() - query_time_range, - "end": time.time_ns(), + "start": job_start_ns_unix_time, + "end": job_start_ns_unix_time + query_time_range, } response = requests.get( log_query_endpoint, diff --git a/tests/conftest.py b/tests/conftest.py index 5a8d884..11e8bf0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,6 +64,7 @@ def k8s_job() -> k8s_client.V1Job: name=k8s_job_name("test"), annotations={ "pygeoapi.io/result-notebook": "/a/b/a.ipynb", + "pygeoapi.io/job_start_datetime": "2024-12-11T13:22:24.093812Z", }, ), status=k8s_client.V1JobStatus( @@ -81,6 +82,41 @@ def k8s_job_failed(k8s_job: k8s_client.V1Job) -> k8s_client.V1Job: return failed_job +@pytest.fixture() +def workflow() -> dict: + return { + "apiVersion": "argoproj.io/v1alpha1", + "kind": "Workflow", + "metadata": { + "name": "workflow-test-instance-4", + "namespace": "test", + "annotations": { + "pygeoapi.io/identifier": "annotations-identifier", + "pygeoapi.io/job_start_datetime": "2024-12-11T13:22:24.093812Z", + }, + }, + "spec": { + "arguments": {"parameters": [{"name": "inpfile", "value": "test2.txt"}]}, + "entrypoint": "test", + "workflowTemplateRef": {"name": "workflow-template-test"}, + }, + "status": { + "artifactGCStatus": {"notSpecified": True}, + "artifactRepositoryRef": {"artifactRepository": {}, "default": True}, + "conditions": [ + {"status": "False", "type": "PodRunning"}, + {"status": "True", "type": "Completed"}, + ], + "finishedAt": "2024-09-18T12:01:12Z", + "phase": "Succeeded", + "progress": "1/1", + "resourcesDuration": {"cpu": 0, "memory": 3}, + "startedAt": "2024-09-18T12:01:02Z", + "taskResultsCompletionStatus": {"workflow-test-instance-4": True}, + }, + } + + @pytest.fixture() def many_k8s_jobs(k8s_job: k8s_client.V1Job) -> list[k8s_client.V1Job]: def create_job(i: int) -> k8s_client.V1Job: diff --git a/tests/test_argo_manager.py b/tests/test_argo_manager.py index 5fd2bc3..943af14 100644 --- a/tests/test_argo_manager.py +++ b/tests/test_argo_manager.py @@ -172,55 +172,23 @@ def mock_create_workflow(): yield mocker -MOCK_WORKFLOW = { - "apiVersion": "argoproj.io/v1alpha1", - "kind": "Workflow", - "metadata": { - "name": "workflow-test-instance-4", - "namespace": "test", - "annotations": { - "pygeoapi.io/identifier": "annotations-identifier", - }, - }, - "spec": { - "arguments": {"parameters": [{"name": "inpfile", "value": "test2.txt"}]}, - "entrypoint": "test", - "workflowTemplateRef": {"name": "workflow-template-test"}, - }, - "status": { - "artifactGCStatus": {"notSpecified": True}, - "artifactRepositoryRef": {"artifactRepository": {}, "default": True}, - "conditions": [ - {"status": "False", "type": "PodRunning"}, - {"status": "True", "type": "Completed"}, - ], - "finishedAt": "2024-09-18T12:01:12Z", - "phase": "Succeeded", - "progress": "1/1", - "resourcesDuration": {"cpu": 0, "memory": 3}, - "startedAt": "2024-09-18T12:01:02Z", - "taskResultsCompletionStatus": {"workflow-test-instance-4": True}, - }, -} - - @pytest.fixture() -def mock_get_workflow(): +def mock_get_workflow(workflow): # NOTE: mocks same function as get_workflow_template with mock.patch( "pygeoapi_kubernetes_papermill." "kubernetes.k8s_client.CustomObjectsApi.get_namespaced_custom_object", - return_value=MOCK_WORKFLOW, + return_value=workflow, ) as mocker: yield mocker @pytest.fixture() -def mock_list_workflows(): +def mock_list_workflows(workflow): with mock.patch( "pygeoapi_kubernetes_papermill." "kubernetes.k8s_client.CustomObjectsApi.list_namespaced_custom_object", - return_value={"items": [MOCK_WORKFLOW]}, + return_value={"items": [workflow]}, ) as mocker: yield mocker diff --git a/tests/test_log_view.py b/tests/test_log_view.py index 85d81be..e502bec 100644 --- a/tests/test_log_view.py +++ b/tests/test_log_view.py @@ -56,6 +56,16 @@ def mock_loki_request(): yield patcher +@pytest.fixture() +def mock_get_workflow(workflow): + with mock.patch( + "pygeoapi_kubernetes_papermill." + "log_view.k8s_client.CustomObjectsApi.get_namespaced_custom_object", + return_value=workflow, + ) as mocker: + yield mocker + + @pytest.fixture def client(): from pygeoapi.flask_app import APP @@ -69,7 +79,7 @@ def client(): yield client -def test_log_view_returns_log_lines(client, mock_loki_request): +def test_log_view_returns_log_lines(client, mock_loki_request, mock_get_workflow): job_id = "abc-123" response = client.get(f"/jobs/{job_id}/logs")