Skip to content

Commit

Permalink
Fetch logs for 21 day from job start
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Dec 11, 2024
1 parent f238e39 commit 7c0041b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.6.3
* Fetch logs for 21 day from job start

## 1.6.2
* Handle errors from job subscriptions more gracefully

Expand Down
5 changes: 5 additions & 0 deletions pygeoapi_kubernetes_papermill/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,13 +401,18 @@ def now_str() -> str:
return datetime.now(timezone.utc).strftime(DATETIME_FORMAT)


def parse_pygeoapi_datetime(date_string: str) -> datetime:
return datetime.strptime(date_string, DATETIME_FORMAT)


JobDict = TypedDict(
"JobDict",
{
"identifier": str,
"status": str,
"result-notebook": str,
"message": str,
"job_start_datetime": Optional[str],
"job_end_datetime": Optional[str],
},
total=False,
Expand Down
28 changes: 25 additions & 3 deletions pygeoapi_kubernetes_papermill/log_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@
from http import HTTPStatus
import logging
import itertools
import time

import kubernetes.client.rest
from kubernetes import client as k8s_client
from flask import Response
import requests

# NOTE: this assumes flask_app, which is the default.
from pygeoapi.flask_app import APP
from pygeoapi_kubernetes_papermill.argo import (
K8S_CUSTOM_OBJECT_WORKFLOWS,
job_from_k8s_wf,
)
from pygeoapi_kubernetes_papermill.common import parse_pygeoapi_datetime


LOGGER = logging.getLogger(__name__)
Expand All @@ -62,15 +68,31 @@ def get_job_logs(job_id):
else:
namespace = api_.manager.namespace

try:
k8s_wf: dict = k8s_client.CustomObjectsApi().get_namespaced_custom_object(
**K8S_CUSTOM_OBJECT_WORKFLOWS,
name=k8s_job_name(job_id=job_id),
namespace=namespace,
)
except kubernetes.client.rest.ApiException as e:
if e.status == HTTPStatus.NOT_FOUND:
return f"Job {job_id} not found"
else:
raise

job_dict = job_from_k8s_wf(k8s_wf)
job_start = parse_pygeoapi_datetime(job_dict["job_start_datetime"])
job_start_ns_unix_time = int(job_start.timestamp() * 1_000_000_000)

# 21 days in ns (default limit in loki is 30 days)
query_time_range = 21 * 24 * 60 * 60 * 1_000_000_000

job_name = k8s_job_name(job_id)

request_params = {
"query": f'{{job="{namespace}/{job_name}"}}',
"start": time.time_ns() - query_time_range,
"end": time.time_ns(),
"start": job_start_ns_unix_time,
"end": job_start_ns_unix_time + query_time_range,
}
response = requests.get(
log_query_endpoint,
Expand Down
36 changes: 36 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def k8s_job() -> k8s_client.V1Job:
name=k8s_job_name("test"),
annotations={
"pygeoapi.io/result-notebook": "/a/b/a.ipynb",
"pygeoapi.io/job_start_datetime": "2024-12-11T13:22:24.093812Z",
},
),
status=k8s_client.V1JobStatus(
Expand All @@ -81,6 +82,41 @@ def k8s_job_failed(k8s_job: k8s_client.V1Job) -> k8s_client.V1Job:
return failed_job


@pytest.fixture()
def workflow() -> dict:
return {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": {
"name": "workflow-test-instance-4",
"namespace": "test",
"annotations": {
"pygeoapi.io/identifier": "annotations-identifier",
"pygeoapi.io/job_start_datetime": "2024-12-11T13:22:24.093812Z",
},
},
"spec": {
"arguments": {"parameters": [{"name": "inpfile", "value": "test2.txt"}]},
"entrypoint": "test",
"workflowTemplateRef": {"name": "workflow-template-test"},
},
"status": {
"artifactGCStatus": {"notSpecified": True},
"artifactRepositoryRef": {"artifactRepository": {}, "default": True},
"conditions": [
{"status": "False", "type": "PodRunning"},
{"status": "True", "type": "Completed"},
],
"finishedAt": "2024-09-18T12:01:12Z",
"phase": "Succeeded",
"progress": "1/1",
"resourcesDuration": {"cpu": 0, "memory": 3},
"startedAt": "2024-09-18T12:01:02Z",
"taskResultsCompletionStatus": {"workflow-test-instance-4": True},
},
}


@pytest.fixture()
def many_k8s_jobs(k8s_job: k8s_client.V1Job) -> list[k8s_client.V1Job]:
def create_job(i: int) -> k8s_client.V1Job:
Expand Down
40 changes: 4 additions & 36 deletions tests/test_argo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,55 +172,23 @@ def mock_create_workflow():
yield mocker


MOCK_WORKFLOW = {
"apiVersion": "argoproj.io/v1alpha1",
"kind": "Workflow",
"metadata": {
"name": "workflow-test-instance-4",
"namespace": "test",
"annotations": {
"pygeoapi.io/identifier": "annotations-identifier",
},
},
"spec": {
"arguments": {"parameters": [{"name": "inpfile", "value": "test2.txt"}]},
"entrypoint": "test",
"workflowTemplateRef": {"name": "workflow-template-test"},
},
"status": {
"artifactGCStatus": {"notSpecified": True},
"artifactRepositoryRef": {"artifactRepository": {}, "default": True},
"conditions": [
{"status": "False", "type": "PodRunning"},
{"status": "True", "type": "Completed"},
],
"finishedAt": "2024-09-18T12:01:12Z",
"phase": "Succeeded",
"progress": "1/1",
"resourcesDuration": {"cpu": 0, "memory": 3},
"startedAt": "2024-09-18T12:01:02Z",
"taskResultsCompletionStatus": {"workflow-test-instance-4": True},
},
}


@pytest.fixture()
def mock_get_workflow():
def mock_get_workflow(workflow):
# NOTE: mocks same function as get_workflow_template
with mock.patch(
"pygeoapi_kubernetes_papermill."
"kubernetes.k8s_client.CustomObjectsApi.get_namespaced_custom_object",
return_value=MOCK_WORKFLOW,
return_value=workflow,
) as mocker:
yield mocker


@pytest.fixture()
def mock_list_workflows():
def mock_list_workflows(workflow):
with mock.patch(
"pygeoapi_kubernetes_papermill."
"kubernetes.k8s_client.CustomObjectsApi.list_namespaced_custom_object",
return_value={"items": [MOCK_WORKFLOW]},
return_value={"items": [workflow]},
) as mocker:
yield mocker

Expand Down
12 changes: 11 additions & 1 deletion tests/test_log_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ def mock_loki_request():
yield patcher


@pytest.fixture()
def mock_get_workflow(workflow):
with mock.patch(
"pygeoapi_kubernetes_papermill."
"log_view.k8s_client.CustomObjectsApi.get_namespaced_custom_object",
return_value=workflow,
) as mocker:
yield mocker


@pytest.fixture
def client():
from pygeoapi.flask_app import APP
Expand All @@ -69,7 +79,7 @@ def client():
yield client


def test_log_view_returns_log_lines(client, mock_loki_request):
def test_log_view_returns_log_lines(client, mock_loki_request, mock_get_workflow):
job_id = "abc-123"

response = client.get(f"/jobs/{job_id}/logs")
Expand Down

0 comments on commit 7c0041b

Please sign in to comment.