Skip to content

Commit

Permalink
Implement fetching results from external resource
Browse files Browse the repository at this point in the history
  • Loading branch information
totycro committed Dec 9, 2024
1 parent 4ad75ee commit d185a61
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.6.0
* Implement fetching results from external resource

## 1.5.9
* Add view to retrieve log output of job from external server

Expand Down
17 changes: 11 additions & 6 deletions pygeoapi_kubernetes_papermill/argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import datetime
import logging
from typing import Optional, Any, cast

from kubernetes import client as k8s_client, config as k8s_config

from http import HTTPStatus
import json

from kubernetes import client as k8s_client, config as k8s_config
import requests

import kubernetes.client.rest


Expand Down Expand Up @@ -103,6 +103,7 @@ def __init__(self, manager_def: dict) -> None:
# self.core_api = k8s_client.CoreV1Api()

self.log_query_endpoint: str = manager_def["log_query_endpoint"]
self.results_link_template: str = manager_def["results_link_template"]

def get_jobs(self, status=None, limit=None, offset=None) -> dict:
"""
Expand Down Expand Up @@ -198,7 +199,7 @@ def update_job(self, processid, job_id, update_dict):
# we could update the metadata by changing the job annotations
raise NotImplementedError("Currently there's no use case for updating k8s jobs")

def get_job_result(self, job_id) -> tuple[Optional[Any], Optional[str]]:
def get_job_result(self, job_id) -> tuple[Optional[str], Optional[Any]]:
"""
Returns the actual output from a completed process
Expand All @@ -207,8 +208,12 @@ def get_job_result(self, job_id) -> tuple[Optional[Any], Optional[str]]:
:returns: `tuple` of mimetype and raw output
"""

# TODO: fetch from argo somehow
raise NotImplementedError
resolved_url = self.results_link_template.format(job_id=job_id)
LOGGER.debug(f"Fetching job result from {resolved_url}")

response = requests.get(resolved_url)
response.raise_for_status()
return response.headers.get("content-type"), response.content

def delete_job(self, job_id) -> bool:
"""
Expand Down
32 changes: 31 additions & 1 deletion tests/test_argo_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
#
# =================================================================

from http import HTTPStatus
from unittest import mock
import json

import requests
import pytest
from kubernetes import client as k8s_client

Expand All @@ -40,7 +42,12 @@
@pytest.fixture()
def manager(mock_k8s_base) -> ArgoManager:
man = ArgoManager(
{"name": "kman", "skip_k8s_setup": True, "log_query_endpoint": ""}
{
"name": "kman",
"skip_k8s_setup": True,
"log_query_endpoint": "",
"results_link_template": "https://example.com/a/{job_id}",
}
)
man.get_processor = lambda *args, **kwargs: ArgoProcessor(
{"name": "", "workflow_template": "mywf"}
Expand Down Expand Up @@ -132,6 +139,29 @@ def test_inputs_retrieved_from_workflow_template(
assert processor.metadata["inputs"]["param-optional"]["minOccurs"] == 0


@pytest.fixture()
def mock_fetch_job_result():
response = requests.Response()
response.status_code = HTTPStatus.OK
response.headers["content-type"] = "application/json"
response._content = b"{'a': 3}"

with mock.patch(
"pygeoapi_kubernetes_papermill.argo.requests.get",
return_value=response,
) as patcher:
yield patcher


def test_result_link_is_resolved(manager: ArgoManager, mock_fetch_job_result):
job_id = "abc-123"
result = manager.get_job_result(job_id)

assert result == ("application/json", b"{'a': 3}")

mock_fetch_job_result.assert_called_with("https://example.com/a/abc-123")


@pytest.fixture()
def mock_create_workflow():
with mock.patch(
Expand Down

0 comments on commit d185a61

Please sign in to comment.