From dfdc091bef072efab8f449a9cc92946dc99981c9 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 29 Nov 2024 16:21:16 +0100 Subject: [PATCH 01/18] Issue #936 initial helper to build calrissian k8s job body based on eu-cdse/openeo-cdse-infra#267 --- openeogeotrellis/config/config.py | 3 + openeogeotrellis/integrations/calrissian.py | 115 ++++++++++++++++++++ tests/integrations/test_calrissian.py | 104 ++++++++++++++++++ 3 files changed, 222 insertions(+) create mode 100644 openeogeotrellis/integrations/calrissian.py create mode 100644 tests/integrations/test_calrissian.py diff --git a/openeogeotrellis/config/config.py b/openeogeotrellis/config/config.py index 333718c5e..17f3f4d34 100644 --- a/openeogeotrellis/config/config.py +++ b/openeogeotrellis/config/config.py @@ -246,3 +246,6 @@ class GpsBackendConfig(OpenEoBackendConfig): gdalinfo_python_call: bool = False gdalinfo_use_subprocess: bool = True # TODO: Only keep one gdalinfo on true gdalinfo_use_python_subprocess: bool = False + + calrissian_namespace: Optional[str] = None + calrissian_image: Optional[str] = "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1" diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py new file mode 100644 index 000000000..f2db9729c --- /dev/null +++ b/openeogeotrellis/integrations/calrissian.py @@ -0,0 +1,115 @@ +from typing import Optional + +from openeo_driver.utils import generate_unique_id +import dataclasses + +from openeogeotrellis.config import get_backend_config + + +@dataclasses.dataclass(frozen=True) +class VolumeInfo: + name: str + claim_name: str + mount_path: str + read_only: Optional[bool] = None + + +def create_cwl_job_body( + *, + # TODO: don't define a default namespace here + namespace: Optional[str] = None, + name: Optional[str] = None, +) -> "kubernetes.client.V1Job": + import kubernetes.client + + name = name or generate_unique_id(prefix="cj") + namespace = namespace or get_backend_config().calrissian_namespace + container_image = get_backend_config().calrissian_image + if not namespace or not container_image: + raise ValueError(f"Must be set: {namespace=}, {container_image=}") + + # TODO: config for this + security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) + + volumes = [ + VolumeInfo( + name="calrissian-input-data", + claim_name="calrissian-input-data", + mount_path="/calrissian/input-data", + read_only=True, + ), + VolumeInfo( + name="calrissian-tmpout", + claim_name="calrissian-tmpout", + mount_path="/calrissian/tmpout", + ), + VolumeInfo( + name="calrissian-output-data", + claim_name="calrissian-output-data", + mount_path="/calrissian/output-data", + ), + ] + + calrissian_arguments = [ + "--max-ram", + "2G", + "--max-cores", + "1", + "--debug", + "--tmp-outdir-prefix", + "/calrissian/tmpout/", + "--outdir", + "/calrissian/output-data/", + "/calrissian/input-data/hello-workflow.cwl", + "/calrissian/input-data/hello-input.yaml", + ] + + body = kubernetes.client.V1Job( + metadata=kubernetes.client.V1ObjectMeta( + name=name, + namespace=namespace, + ), + spec=kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + spec=kubernetes.client.V1PodSpec( + containers=[ + kubernetes.client.V1Container( + name="calrissian", + image=container_image, + security_context=security_context, + command=["calrissian"], + args=calrissian_arguments, + volume_mounts=[ + kubernetes.client.V1VolumeMount( + name=v.name, + mount_path=v.mount_path, + read_only=v.read_only, + ) + for v in volumes + ], + env=[ + kubernetes.client.V1EnvVar( + name="CALRISSIAN_POD_NAME", + value_from=kubernetes.client.V1EnvVarSource( + field_ref=kubernetes.client.V1ObjectFieldSelector(field_path="metadata.name") + ), + ) + ], + ) + ], + restart_policy="Never", + volumes=[ + kubernetes.client.V1Volume( + name=v.name, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=v.claim_name, + read_only=v.read_only, + ), + ) + for v in volumes + ], + ) + ) + ), + ) + return body diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py new file mode 100644 index 000000000..f5a230bd3 --- /dev/null +++ b/tests/integrations/test_calrissian.py @@ -0,0 +1,104 @@ +import dirty_equals +from openeogeotrellis.integrations.calrissian import create_cwl_job_body + + +def test_create_cwl_job_body(): + body = create_cwl_job_body(namespace="calrissian-test") + + assert body.to_dict() == { + "api_version": None, + "kind": None, + "metadata": dirty_equals.IsPartialDict( + { + "name": dirty_equals.IsStr(regex="cj-2.*"), + "namespace": "calrissian-test", + } + ), + "spec": dirty_equals.IsPartialDict( + { + "template": dirty_equals.IsPartialDict( + { + "spec": dirty_equals.IsPartialDict( + { + "containers": [ + dirty_equals.IsPartialDict( + { + "args": [ + "--max-ram", + "2G", + "--max-cores", + "1", + "--debug", + "--tmp-outdir-prefix", + "/calrissian/tmpout/", + "--outdir", + "/calrissian/output-data/", + "/calrissian/input-data/hello-workflow.cwl", + "/calrissian/input-data/hello-input.yaml", + ], + "command": ["calrissian"], + "image": "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1", + "volume_mounts": [ + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/input-data", + "name": "calrissian-input-data", + "read_only": True, + } + ), + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/tmpout", + "name": "calrissian-tmpout", + "read_only": None, + } + ), + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/output-data", + "name": "calrissian-output-data", + "read_only": None, + } + ), + ], + "working_dir": None, + } + ) + ], + "volumes": [ + dirty_equals.IsPartialDict( + { + "name": "calrissian-input-data", + "persistent_volume_claim": { + "claim_name": "calrissian-input-data", + "read_only": True, + }, + } + ), + dirty_equals.IsPartialDict( + { + "name": "calrissian-tmpout", + "persistent_volume_claim": { + "claim_name": "calrissian-tmpout", + "read_only": None, + }, + } + ), + dirty_equals.IsPartialDict( + { + "name": "calrissian-output-data", + "persistent_volume_claim": { + "claim_name": "calrissian-output-data", + "read_only": None, + }, + } + ), + ], + } + ), + } + ), + } + ), + "status": None, + } From 1091dd9274d787c15a34caddfaa5d6a742196dbf Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 29 Nov 2024 22:31:39 +0100 Subject: [PATCH 02/18] Issue #936 add launch_cwl_job_and_wait --- openeogeotrellis/integrations/calrissian.py | 106 ++++++++++++++------ openeogeotrellis/job_tracker_v2.py | 1 + 2 files changed, 78 insertions(+), 29 deletions(-) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index f2db9729c..c62d41d4c 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -1,11 +1,15 @@ +import logging +import dataclasses +import time from typing import Optional +from openeo.util import ContextTimer from openeo_driver.utils import generate_unique_id -import dataclasses - from openeogeotrellis.config import get_backend_config +_log = logging.getLogger(__name__) + @dataclasses.dataclass(frozen=True) class VolumeInfo: name: str @@ -16,9 +20,9 @@ class VolumeInfo: def create_cwl_job_body( *, - # TODO: don't define a default namespace here namespace: Optional[str] = None, name: Optional[str] = None, + # TODO: arguments to set an actual CWL workflow and inputs ) -> "kubernetes.client.V1Job": import kubernetes.client @@ -28,7 +32,7 @@ def create_cwl_job_body( if not namespace or not container_image: raise ValueError(f"Must be set: {namespace=}, {container_image=}") - # TODO: config for this + # TODO: config for this? security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) volumes = [ @@ -64,6 +68,29 @@ def create_cwl_job_body( "/calrissian/input-data/hello-input.yaml", ] + container = kubernetes.client.V1Container( + name="calrissian", + image=container_image, + security_context=security_context, + command=["calrissian"], + args=calrissian_arguments, + volume_mounts=[ + kubernetes.client.V1VolumeMount( + name=v.name, + mount_path=v.mount_path, + read_only=v.read_only, + ) + for v in volumes + ], + env=[ + kubernetes.client.V1EnvVar( + name="CALRISSIAN_POD_NAME", + value_from=kubernetes.client.V1EnvVarSource( + field_ref=kubernetes.client.V1ObjectFieldSelector(field_path="metadata.name") + ), + ) + ], + ) body = kubernetes.client.V1Job( metadata=kubernetes.client.V1ObjectMeta( name=name, @@ -72,31 +99,7 @@ def create_cwl_job_body( spec=kubernetes.client.V1JobSpec( template=kubernetes.client.V1PodTemplateSpec( spec=kubernetes.client.V1PodSpec( - containers=[ - kubernetes.client.V1Container( - name="calrissian", - image=container_image, - security_context=security_context, - command=["calrissian"], - args=calrissian_arguments, - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=v.name, - mount_path=v.mount_path, - read_only=v.read_only, - ) - for v in volumes - ], - env=[ - kubernetes.client.V1EnvVar( - name="CALRISSIAN_POD_NAME", - value_from=kubernetes.client.V1EnvVarSource( - field_ref=kubernetes.client.V1ObjectFieldSelector(field_path="metadata.name") - ), - ) - ], - ) - ], + containers=[container], restart_policy="Never", volumes=[ kubernetes.client.V1Volume( @@ -113,3 +116,48 @@ def create_cwl_job_body( ), ) return body + + +def launch_cwl_job_and_wait( + body: "kubernetes.client.V1Job", + *, + namespace: str, + sleep: float = 10, + timeout: float = 300, +) -> "kubernetes.client.V1Job": + import kubernetes.client + + k8s_batch = kubernetes.client.BatchV1Api() + + # Launch job. + job: kubernetes.client.V1Job = k8s_batch.create_namespaced_job( + namespace=namespace, + body=body, + ) + job_name = job.metadata.name + _log.info( + f"Created CWL job {job.metadata.name=} {job.metadata.namespace=} {job.metadata.creation_timestamp=} {job.metadata.uid=}" + ) + + # Track job status. + final_status = None + with ContextTimer() as timer: + while timer.elapsed() < timeout: + job: kubernetes.client.V1Job = k8s_batch.read_namespaced_job(name=job_name, namespace=namespace) + _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {job.status=}") + if job.status.conditions: + if any(c.type == "Failed" and c.status == "True" for c in job.status.conditions): + final_status = "failed" + break + elif any(c.type == "Complete" and c.status == "True" for c in job.status.conditions): + final_status = "complete" + break + time.sleep(sleep) + + _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {final_status=}") + if not final_status: + raise TimeoutError(f"CWL Job {job_name} did not finish within {timeout}s") + + # TODO: raise Exception if final status is "failed" too? + + return job diff --git a/openeogeotrellis/job_tracker_v2.py b/openeogeotrellis/job_tracker_v2.py index 556854b0e..dae784a47 100644 --- a/openeogeotrellis/job_tracker_v2.py +++ b/openeogeotrellis/job_tracker_v2.py @@ -285,6 +285,7 @@ def get_job_metadata(self, job_id: str, user_id: str, app_id: str) -> _JobMetada metadata = self._kubernetes_api.get_namespaced_custom_object( group="sparkoperator.k8s.io", version="v1beta2", + # TODO: this namespace should come from job metadata, not config namespace=ConfigParams().pod_namespace, plural="sparkapplications", name=app_id, From 31b5be3b588371ea888055fd71424de377b5a3f0 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 2 Dec 2024 16:22:16 +0100 Subject: [PATCH 03/18] Issue #936 ad-hoc attempt to first run input staging job --- openeogeotrellis/deploy/kube.py | 23 +++++ openeogeotrellis/integrations/calrissian.py | 91 +++++++++++++++++-- .../resources/calrissian/hello-world.cwl | 15 +++ 3 files changed, 122 insertions(+), 7 deletions(-) create mode 100644 openeogeotrellis/integrations/resources/calrissian/hello-world.cwl diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index 7876719f5..b4bfc7321 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -84,6 +84,29 @@ def on_started(): } ) + @app.route("/tmp/ogd936", methods=["GET"]) + def tmp_ogd936(): + """Temporary endpoint to play with Calrissian based CWL job management""" + import kubernetes + from openeogeotrellis.integrations.calrissian import ( + create_cwl_job_body, + launch_cwl_job_and_wait, + create_input_staging_job_body, + ) + + namespace = "calrissian-demo-project" + kubernetes.config.load_incluster_config() + + # Input staging + body = create_input_staging_job_body(namespace=namespace) + res = launch_cwl_job_and_wait(body=body, namespace=namespace) + + # CWL job + body = create_cwl_job_body(namespace=namespace) + res = launch_cwl_job_and_wait(body=body, namespace=namespace) + + return f"Hello from the backend: {res!r}" + host = os.environ.get('SPARK_LOCAL_IP', None) if host is None: host, _ = get_socket() diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index c62d41d4c..0bbbaf718 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -18,15 +18,90 @@ class VolumeInfo: read_only: Optional[bool] = None +def create_input_staging_job_body( + *, + namespace: Optional[str] = None, +) -> "kubernetes.client.V1Job": + """ + Input staging job to put CWL resources on the input data volume. + """ + import kubernetes.client + + name = generate_unique_id(prefix="cjs") + namespace = namespace or get_backend_config().calrissian_namespace + assert namespace + + # TODO: config for this? + security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) + + volumes = [ + VolumeInfo( + name="calrissian-input-data", + claim_name="calrissian-input-data", + mount_path="/calrissian/input-data", + read_only=True, + ), + ] + + container = kubernetes.client.V1Container( + name="calrissian-input-staging", + image="alpine:3", + security_context=security_context, + command=["/bin/sh"], + args=[ + "-c", + "; ".join( + [ + "set -euxo pipefail", + "wget -O /tmp/calrissian-resources.tar.gz https://artifactory.vgt.vito.be/artifactory/auxdata-public/openeo/calrissian-resources/calrissian-resources.tar.gz", + "tar -xzvf /tmp/calrissian-resources.tar.gz -C /calrissian/input-data", + "ls -al /calrissian/input-data", + ] + ), + ], + volume_mounts=[ + kubernetes.client.V1VolumeMount( + name=v.name, + mount_path=v.mount_path, + read_only=v.read_only, + ) + for v in volumes + ], + ) + body = kubernetes.client.V1Job( + metadata=kubernetes.client.V1ObjectMeta( + name=name, + namespace=namespace, + ), + spec=kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + spec=kubernetes.client.V1PodSpec( + containers=[container], + restart_policy="Never", + volumes=[ + kubernetes.client.V1Volume( + name=v.name, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=v.claim_name, + read_only=v.read_only, + ), + ) + for v in volumes + ], + ) + ) + ), + ) + return body + def create_cwl_job_body( *, namespace: Optional[str] = None, - name: Optional[str] = None, # TODO: arguments to set an actual CWL workflow and inputs ) -> "kubernetes.client.V1Job": import kubernetes.client - name = name or generate_unique_id(prefix="cj") + name = generate_unique_id(prefix="cj") namespace = namespace or get_backend_config().calrissian_namespace container_image = get_backend_config().calrissian_image if not namespace or not container_image: @@ -54,6 +129,7 @@ def create_cwl_job_body( ), ] + calrissian_arguments = [ "--max-ram", "2G", @@ -64,12 +140,13 @@ def create_cwl_job_body( "/calrissian/tmpout/", "--outdir", "/calrissian/output-data/", - "/calrissian/input-data/hello-workflow.cwl", - "/calrissian/input-data/hello-input.yaml", + "/calrissian/input-data/hello-world.cwl", + "--message", + "Hello EO world!", ] container = kubernetes.client.V1Container( - name="calrissian", + name="calrissian-job", image=container_image, security_context=security_context, command=["calrissian"], @@ -122,8 +199,8 @@ def launch_cwl_job_and_wait( body: "kubernetes.client.V1Job", *, namespace: str, - sleep: float = 10, - timeout: float = 300, + sleep: float = 5, + timeout: float = 60, ) -> "kubernetes.client.V1Job": import kubernetes.client diff --git a/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl b/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl new file mode 100644 index 000000000..24bd9adad --- /dev/null +++ b/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl @@ -0,0 +1,15 @@ +cwlVersion: v1.2 +class: CommandLineTool +baseCommand: echo +stdout: hello-stdout.txt + +inputs: + message: + type: string + default: "Hello World" + inputBinding: + position: 1 + +outputs: + stdout: + type: stdout From fb23a49fc525412623467a6bb23ba6301ba1c8da Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 4 Dec 2024 11:59:23 +0100 Subject: [PATCH 04/18] Issue #936 encapsulate logic in CalrissianJobLauncher class for better code/value reuse --- openeogeotrellis/deploy/__init__.py | 1 + openeogeotrellis/deploy/kube.py | 28 +- openeogeotrellis/integrations/calrissian.py | 440 ++++++++++---------- 3 files changed, 243 insertions(+), 226 deletions(-) diff --git a/openeogeotrellis/deploy/__init__.py b/openeogeotrellis/deploy/__init__.py index 7cc4c331d..b6448f52d 100644 --- a/openeogeotrellis/deploy/__init__.py +++ b/openeogeotrellis/deploy/__init__.py @@ -20,6 +20,7 @@ def load_custom_processes(logger=_log, _name="custom_processes"): """Try loading optional `custom_processes` module""" + # TODO: use backend_config instead of env var if path := os.environ.get("OPENEO_CUSTOM_PROCESSES"): # Directly load custom processes from OPENEO_CUSTOM_PROCESSES logger.debug(f"load_custom_processes: trying exec loading {path!r}") diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index b4bfc7321..7004a7aa2 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -6,7 +6,12 @@ import os from openeo_driver.server import run_gunicorn -from openeo_driver.util.logging import get_logging_config, setup_logging, LOG_HANDLER_STDERR_JSON +from openeo_driver.util.logging import ( + get_logging_config, + setup_logging, + LOG_HANDLER_STDERR_JSON, + FlaskRequestCorrelationIdLogging, +) from openeo_driver.views import build_app from openeogeotrellis import deploy from openeogeotrellis.config import get_backend_config @@ -87,23 +92,24 @@ def on_started(): @app.route("/tmp/ogd936", methods=["GET"]) def tmp_ogd936(): """Temporary endpoint to play with Calrissian based CWL job management""" - import kubernetes - from openeogeotrellis.integrations.calrissian import ( - create_cwl_job_body, - launch_cwl_job_and_wait, - create_input_staging_job_body, - ) + import kubernetes.config + from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher + + request_id = FlaskRequestCorrelationIdLogging.get_request_id() + name_base = request_id[:20] namespace = "calrissian-demo-project" kubernetes.config.load_incluster_config() + launcher = CalrissianJobLauncher(namespace=namespace, name_base=name_base) + # Input staging - body = create_input_staging_job_body(namespace=namespace) - res = launch_cwl_job_and_wait(body=body, namespace=namespace) + input_staging_manifest = launcher.create_input_staging_job_manifest() + res = launcher.launch_job_and_wait(manifest=input_staging_manifest) # CWL job - body = create_cwl_job_body(namespace=namespace) - res = launch_cwl_job_and_wait(body=body, namespace=namespace) + cwl_manifest = launcher.create_cwl_job_manifest() + res = launcher.launch_job_and_wait(manifest=cwl_manifest) return f"Hello from the backend: {res!r}" diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 0bbbaf718..0760d624c 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -7,6 +7,7 @@ from openeo_driver.utils import generate_unique_id from openeogeotrellis.config import get_backend_config +import kubernetes.client _log = logging.getLogger(__name__) @@ -18,223 +19,232 @@ class VolumeInfo: read_only: Optional[bool] = None -def create_input_staging_job_body( - *, - namespace: Optional[str] = None, -) -> "kubernetes.client.V1Job": +class CalrissianJobLauncher: """ - Input staging job to put CWL resources on the input data volume. + Helper class to launch a Calrissian job on Kubernetes. """ - import kubernetes.client - - name = generate_unique_id(prefix="cjs") - namespace = namespace or get_backend_config().calrissian_namespace - assert namespace - - # TODO: config for this? - security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) - - volumes = [ - VolumeInfo( - name="calrissian-input-data", - claim_name="calrissian-input-data", - mount_path="/calrissian/input-data", - read_only=True, - ), - ] - - container = kubernetes.client.V1Container( - name="calrissian-input-staging", - image="alpine:3", - security_context=security_context, - command=["/bin/sh"], - args=[ - "-c", - "; ".join( - [ - "set -euxo pipefail", - "wget -O /tmp/calrissian-resources.tar.gz https://artifactory.vgt.vito.be/artifactory/auxdata-public/openeo/calrissian-resources/calrissian-resources.tar.gz", - "tar -xzvf /tmp/calrissian-resources.tar.gz -C /calrissian/input-data", - "ls -al /calrissian/input-data", - ] + + def __init__( + self, + *, + namespace: Optional[str] = None, + name_base: Optional[str] = None, + backoff_limit: int = 1, + ): + self._namespace = namespace or get_backend_config().calrissian_namespace + assert self._namespace + self._name_base = name_base + self._backoff_limit = backoff_limit + + # TODO: config for this? + self._security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) + + def _get_name(self, affix: str) -> str: + return f"{self._name_base}-{affix}" if self._name_base else generate_unique_id(prefix=affix) + + def create_input_staging_job_manifest(self) -> kubernetes.client.V1Job: + """ + Create a k8s manifest for a Calrissian input staging job. + """ + name = self._get_name("cal-input") + _log.info("Creating input staging job manifest: {name=}") + + volumes = [ + # TODO: build these volume infos in init? + VolumeInfo( + name="calrissian-input-data", + claim_name="calrissian-input-data", + mount_path="/calrissian/input-data", + # TODO: note: no read_only here. Instead do input staging as part of deployment, instead of on the fly? ), - ], - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=v.name, - mount_path=v.mount_path, - read_only=v.read_only, - ) - for v in volumes - ], - ) - body = kubernetes.client.V1Job( - metadata=kubernetes.client.V1ObjectMeta( - name=name, - namespace=namespace, - ), - spec=kubernetes.client.V1JobSpec( - template=kubernetes.client.V1PodTemplateSpec( - spec=kubernetes.client.V1PodSpec( - containers=[container], - restart_policy="Never", - volumes=[ - kubernetes.client.V1Volume( - name=v.name, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=v.claim_name, - read_only=v.read_only, - ), - ) - for v in volumes - ], - ) - ) - ), - ) - return body - -def create_cwl_job_body( - *, - namespace: Optional[str] = None, - # TODO: arguments to set an actual CWL workflow and inputs -) -> "kubernetes.client.V1Job": - import kubernetes.client - - name = generate_unique_id(prefix="cj") - namespace = namespace or get_backend_config().calrissian_namespace - container_image = get_backend_config().calrissian_image - if not namespace or not container_image: - raise ValueError(f"Must be set: {namespace=}, {container_image=}") - - # TODO: config for this? - security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) - - volumes = [ - VolumeInfo( - name="calrissian-input-data", - claim_name="calrissian-input-data", - mount_path="/calrissian/input-data", - read_only=True, - ), - VolumeInfo( - name="calrissian-tmpout", - claim_name="calrissian-tmpout", - mount_path="/calrissian/tmpout", - ), - VolumeInfo( - name="calrissian-output-data", - claim_name="calrissian-output-data", - mount_path="/calrissian/output-data", - ), - ] - - - calrissian_arguments = [ - "--max-ram", - "2G", - "--max-cores", - "1", - "--debug", - "--tmp-outdir-prefix", - "/calrissian/tmpout/", - "--outdir", - "/calrissian/output-data/", - "/calrissian/input-data/hello-world.cwl", - "--message", - "Hello EO world!", - ] - - container = kubernetes.client.V1Container( - name="calrissian-job", - image=container_image, - security_context=security_context, - command=["calrissian"], - args=calrissian_arguments, - volume_mounts=[ - kubernetes.client.V1VolumeMount( - name=v.name, - mount_path=v.mount_path, - read_only=v.read_only, - ) - for v in volumes - ], - env=[ - kubernetes.client.V1EnvVar( - name="CALRISSIAN_POD_NAME", - value_from=kubernetes.client.V1EnvVarSource( - field_ref=kubernetes.client.V1ObjectFieldSelector(field_path="metadata.name") + ] + + container = kubernetes.client.V1Container( + name="calrissian-input-staging", + image="alpine:3", + security_context=self._security_context, + command=["/bin/sh"], + args=[ + "-c", + "; ".join( + [ + "set -euxo pipefail", + # TODO: better way to deploy and fetch these resources? + "wget -O /tmp/calrissian-resources.tar.gz https://artifactory.vgt.vito.be/artifactory/auxdata-public/openeo/calrissian-resources/calrissian-resources.tar.gz", + "tar -xzvf /tmp/calrissian-resources.tar.gz -C /calrissian/input-data", + "ls -al /calrissian/input-data", + ] + ), + ], + volume_mounts=[ + kubernetes.client.V1VolumeMount(name=v.name, mount_path=v.mount_path, read_only=v.read_only) + for v in volumes + ], + ) + manifest = kubernetes.client.V1Job( + metadata=kubernetes.client.V1ObjectMeta( + name=name, + namespace=self._namespace, + ), + spec=kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + spec=kubernetes.client.V1PodSpec( + containers=[container], + restart_policy="Never", + volumes=[ + kubernetes.client.V1Volume( + name=v.name, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=v.claim_name, + read_only=v.read_only, + ), + ) + for v in volumes + ], + ) ), - ) - ], - ) - body = kubernetes.client.V1Job( - metadata=kubernetes.client.V1ObjectMeta( - name=name, - namespace=namespace, - ), - spec=kubernetes.client.V1JobSpec( - template=kubernetes.client.V1PodTemplateSpec( - spec=kubernetes.client.V1PodSpec( - containers=[container], - restart_policy="Never", - volumes=[ - kubernetes.client.V1Volume( - name=v.name, - persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=v.claim_name, - read_only=v.read_only, - ), - ) - for v in volumes - ], + backoff_limit=self._backoff_limit, + ), + ) + return manifest + + def create_cwl_job_manifest( + self, + # TODO: arguments to set an actual CWL workflow and inputs + ) -> kubernetes.client.V1Job: + name = self._get_name("cal-cwl") + _log.info(f"Creating CWL job manifest: {name=}") + + container_image = get_backend_config().calrissian_image + assert container_image + + volumes = [ + # TODO: build these volume infos in init? + VolumeInfo( + name="calrissian-input-data", + claim_name="calrissian-input-data", + mount_path="/calrissian/input-data", + read_only=True, + ), + VolumeInfo( + name="calrissian-tmpout", + claim_name="calrissian-tmpout", + mount_path="/calrissian/tmpout", + ), + VolumeInfo( + name="calrissian-output-data", + claim_name="calrissian-output-data", + mount_path="/calrissian/output-data", + ), + ] + + calrissian_arguments = [ + "--max-ram", + "2G", + "--max-cores", + "1", + "--debug", + "--tmp-outdir-prefix", + "/calrissian/tmpout/", + "--outdir", + "/calrissian/output-data/", + "/calrissian/input-data/hello-world.cwl", + "--message", + "Hello EO world!", + ] + + container = kubernetes.client.V1Container( + name="calrissian-job", + image=container_image, + security_context=self._security_context, + command=["calrissian"], + args=calrissian_arguments, + volume_mounts=[ + kubernetes.client.V1VolumeMount(name=v.name, mount_path=v.mount_path, read_only=v.read_only) + for v in volumes + ], + env=[ + kubernetes.client.V1EnvVar( + name="CALRISSIAN_POD_NAME", + value_from=kubernetes.client.V1EnvVarSource( + field_ref=kubernetes.client.V1ObjectFieldSelector(field_path="metadata.name") + ), ) - ) - ), - ) - return body - - -def launch_cwl_job_and_wait( - body: "kubernetes.client.V1Job", - *, - namespace: str, - sleep: float = 5, - timeout: float = 60, -) -> "kubernetes.client.V1Job": - import kubernetes.client - - k8s_batch = kubernetes.client.BatchV1Api() - - # Launch job. - job: kubernetes.client.V1Job = k8s_batch.create_namespaced_job( - namespace=namespace, - body=body, - ) - job_name = job.metadata.name - _log.info( - f"Created CWL job {job.metadata.name=} {job.metadata.namespace=} {job.metadata.creation_timestamp=} {job.metadata.uid=}" - ) - - # Track job status. - final_status = None - with ContextTimer() as timer: - while timer.elapsed() < timeout: - job: kubernetes.client.V1Job = k8s_batch.read_namespaced_job(name=job_name, namespace=namespace) - _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {job.status=}") - if job.status.conditions: - if any(c.type == "Failed" and c.status == "True" for c in job.status.conditions): - final_status = "failed" - break - elif any(c.type == "Complete" and c.status == "True" for c in job.status.conditions): - final_status = "complete" - break - time.sleep(sleep) - - _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {final_status=}") - if not final_status: - raise TimeoutError(f"CWL Job {job_name} did not finish within {timeout}s") - - # TODO: raise Exception if final status is "failed" too? - - return job + ], + ) + manifest = kubernetes.client.V1Job( + metadata=kubernetes.client.V1ObjectMeta( + name=name, + namespace=self._namespace, + ), + spec=kubernetes.client.V1JobSpec( + template=kubernetes.client.V1PodTemplateSpec( + spec=kubernetes.client.V1PodSpec( + containers=[container], + restart_policy="Never", + volumes=[ + kubernetes.client.V1Volume( + name=v.name, + persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( + claim_name=v.claim_name, + read_only=v.read_only, + ), + ) + for v in volumes + ], + ) + ), + backoff_limit=self._backoff_limit, + ), + ) + return manifest + + def launch_job_and_wait( + self, + manifest: kubernetes.client.V1Job, + *, + sleep: float = 5, + timeout: float = 60, + ) -> kubernetes.client.V1Job: + """Launch a k8s job and wait (with active polling) for it to finish.""" + + k8s_batch = kubernetes.client.BatchV1Api() + + # Launch job. + job: kubernetes.client.V1Job = k8s_batch.create_namespaced_job( + namespace=self._namespace, + body=manifest, + ) + job_name = job.metadata.name + _log.info( + f"Created CWL job {job.metadata.name=} {job.metadata.namespace=} {job.metadata.creation_timestamp=} {job.metadata.uid=}" + ) + + # Track job status (active polling). + final_status = None + with ContextTimer() as timer: + while timer.elapsed() < timeout: + job: kubernetes.client.V1Job = k8s_batch.read_namespaced_job(name=job_name, namespace=self._namespace) + _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {job.status=}") + if job.status.conditions: + if any(c.type == "Failed" and c.status == "True" for c in job.status.conditions): + final_status = "failed" + break + elif any(c.type == "Complete" and c.status == "True" for c in job.status.conditions): + final_status = "complete" + break + time.sleep(sleep) + + _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {final_status=}") + if final_status == "complete": + pass + elif final_status is None: + raise TimeoutError(f"CWL Job {job_name} did not finish within {timeout}s") + elif final_status != "complete": + raise RuntimeError(f"CWL Job {job_name} failed with {final_status=} after {timer.elapsed()=:.2f}s") + else: + raise ValueError("CWL") + + # TODO: how to resolve and extract the results? + + return job From 441a485b209e290a4cf76f7d51a2131079ce016e Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 17 Dec 2024 13:42:51 +0100 Subject: [PATCH 05/18] Issue #936 create input CWL with b64 inline data instead of tedious artifactory deploy --- openeogeotrellis/config/config.py | 4 +- openeogeotrellis/deploy/kube.py | 33 ++++++++++++- openeogeotrellis/integrations/calrissian.py | 48 ++++++++++++------- .../resources/calrissian/hello-world.cwl | 15 ------ 4 files changed, 64 insertions(+), 36 deletions(-) delete mode 100644 openeogeotrellis/integrations/resources/calrissian/hello-world.cwl diff --git a/openeogeotrellis/config/config.py b/openeogeotrellis/config/config.py index 17f3f4d34..70064f8a9 100644 --- a/openeogeotrellis/config/config.py +++ b/openeogeotrellis/config/config.py @@ -248,4 +248,6 @@ class GpsBackendConfig(OpenEoBackendConfig): gdalinfo_use_python_subprocess: bool = False calrissian_namespace: Optional[str] = None - calrissian_image: Optional[str] = "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1" + # TODO: proper calrissian image? Official one doesn't work due to https://github.com/Duke-GCB/calrissian/issues/124#issuecomment-947008286 + # calrissian_image: Optional[str] = "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1" + calrissian_image: Optional[str] = "registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/calrissian:latest" diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index 7004a7aa2..ab0d59b46 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -4,6 +4,7 @@ import logging import os +import textwrap from openeo_driver.server import run_gunicorn from openeo_driver.util.logging import ( @@ -104,11 +105,39 @@ def tmp_ogd936(): launcher = CalrissianJobLauncher(namespace=namespace, name_base=name_base) # Input staging - input_staging_manifest = launcher.create_input_staging_job_manifest() + cwl_content = textwrap.dedent( + """ + cwlVersion: v1.0 + class: CommandLineTool + baseCommand: echo + requirements: + - class: DockerRequirement + dockerPull: debian:stretch-slim + inputs: + message: + type: string + default: "Hello World" + inputBinding: + position: 1 + outputs: + output_file: + type: File + outputBinding: + glob: output.txt + stdout: output.txt + """ + ) + input_staging_manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content=cwl_content) res = launcher.launch_job_and_wait(manifest=input_staging_manifest) # CWL job - cwl_manifest = launcher.create_cwl_job_manifest() + cwl_manifest = launcher.create_cwl_job_manifest( + cwl_path=cwl_path, + cwl_arguments=[ + "--message", + f"Hello Earth, greetings from {request_id}", + ], + ) res = launcher.launch_job_and_wait(manifest=cwl_manifest) return f"Hello from the backend: {res!r}" diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 0760d624c..4a44036c3 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -1,7 +1,12 @@ +import base64 +import gzip import logging import dataclasses +import textwrap +from pathlib import Path + import time -from typing import Optional +from typing import Optional, Union, Tuple, List from openeo.util import ContextTimer from openeo_driver.utils import generate_unique_id @@ -19,6 +24,7 @@ class VolumeInfo: read_only: Optional[bool] = None + class CalrissianJobLauncher: """ Helper class to launch a Calrissian job on Kubernetes. @@ -33,20 +39,18 @@ def __init__( ): self._namespace = namespace or get_backend_config().calrissian_namespace assert self._namespace - self._name_base = name_base + self._name_base = name_base or generate_unique_id(prefix="cal") + _log.info(f"CalrissianJobLauncher {self._namespace=} {self._name_base=}") self._backoff_limit = backoff_limit # TODO: config for this? self._security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) - def _get_name(self, affix: str) -> str: - return f"{self._name_base}-{affix}" if self._name_base else generate_unique_id(prefix=affix) - - def create_input_staging_job_manifest(self) -> kubernetes.client.V1Job: + def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernetes.client.V1Job, str]: """ Create a k8s manifest for a Calrissian input staging job. """ - name = self._get_name("cal-input") + name = f"{self._name_base}-cal-input" _log.info("Creating input staging job manifest: {name=}") volumes = [ @@ -59,6 +63,14 @@ def create_input_staging_job_manifest(self) -> kubernetes.client.V1Job: ), ] + # Serialize CWL content to string that is safe to pass as command line argument + cwl_serialized = base64.b64encode(cwl_content.encode("utf8")).decode("ascii") + cwl_path = f"/calrissian/input-data/{self._name_base}.cwl" + + _log.info( + f"create_input_staging_job_manifest creating {cwl_path=} from {cwl_content[:32]=} through {cwl_serialized[:32]=}" + ) + container = kubernetes.client.V1Container( name="calrissian-input-staging", image="alpine:3", @@ -69,10 +81,7 @@ def create_input_staging_job_manifest(self) -> kubernetes.client.V1Job: "; ".join( [ "set -euxo pipefail", - # TODO: better way to deploy and fetch these resources? - "wget -O /tmp/calrissian-resources.tar.gz https://artifactory.vgt.vito.be/artifactory/auxdata-public/openeo/calrissian-resources/calrissian-resources.tar.gz", - "tar -xzvf /tmp/calrissian-resources.tar.gz -C /calrissian/input-data", - "ls -al /calrissian/input-data", + f"echo '{cwl_serialized}' | base64 -d > {cwl_path}", ] ), ], @@ -106,13 +115,16 @@ def create_input_staging_job_manifest(self) -> kubernetes.client.V1Job: backoff_limit=self._backoff_limit, ), ) - return manifest + return manifest, cwl_path def create_cwl_job_manifest( self, + cwl_path: str, + cwl_arguments: List[str], # TODO: arguments to set an actual CWL workflow and inputs ) -> kubernetes.client.V1Job: - name = self._get_name("cal-cwl") + # TODO: name must be unique per invocation of this method, not just per instance/request/job. + name = f"{self._name_base}-cal-cwl" _log.info(f"Creating CWL job manifest: {name=}") container_image = get_backend_config().calrissian_image @@ -147,11 +159,11 @@ def create_cwl_job_manifest( "--tmp-outdir-prefix", "/calrissian/tmpout/", "--outdir", - "/calrissian/output-data/", - "/calrissian/input-data/hello-world.cwl", - "--message", - "Hello EO world!", - ] + f"/calrissian/output-data/{self._name_base}", + cwl_path, + ] + cwl_arguments + + _log.info(f"create_cwl_job_manifest {calrissian_arguments=}") container = kubernetes.client.V1Container( name="calrissian-job", diff --git a/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl b/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl deleted file mode 100644 index 24bd9adad..000000000 --- a/openeogeotrellis/integrations/resources/calrissian/hello-world.cwl +++ /dev/null @@ -1,15 +0,0 @@ -cwlVersion: v1.2 -class: CommandLineTool -baseCommand: echo -stdout: hello-stdout.txt - -inputs: - message: - type: string - default: "Hello World" - inputBinding: - position: 1 - -outputs: - stdout: - type: stdout From 21f67ba1ee9f22811545d7a5759a37c55481b516 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 17 Dec 2024 15:56:58 +0100 Subject: [PATCH 06/18] Issue #936 initial steps figuring out S3 URL of CWL output --- openeogeotrellis/integrations/calrissian.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 4a44036c3..9d9b0bcda 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -65,6 +65,7 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete # Serialize CWL content to string that is safe to pass as command line argument cwl_serialized = base64.b64encode(cwl_content.encode("utf8")).decode("ascii") + # TODO: ensure isolation of different CWLs (e.g. req/job id is not enough). Include content hash cwl_path = f"/calrissian/input-data/{self._name_base}.cwl" _log.info( @@ -258,5 +259,14 @@ def launch_job_and_wait( raise ValueError("CWL") # TODO: how to resolve and extract the results? + # TODO: automatic cleanup after success? + # TODO: automatic cleanup after fail? return job + + def get_volume_name_from_pvc(self, pvc_name: str) -> str: + """Get the name of the volume that is mounted by a PVC.""" + core_api = kubernetes.client.CoreV1Api() + pvc = core_api.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=self._namespace) + volume_name = pvc.spec.volume_name + return volume_name From 5f224e8e4737da378eaf211a9191390009d86fa5 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 17 Dec 2024 17:17:09 +0100 Subject: [PATCH 07/18] Issue #936 CalrissianJobLauncher move volume info details to `__init__` --- openeogeotrellis/integrations/calrissian.py | 98 +++++++++------------ 1 file changed, 42 insertions(+), 56 deletions(-) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 9d9b0bcda..46296b4be 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -21,7 +21,6 @@ class VolumeInfo: name: str claim_name: str mount_path: str - read_only: Optional[bool] = None @@ -43,6 +42,22 @@ def __init__( _log.info(f"CalrissianJobLauncher {self._namespace=} {self._name_base=}") self._backoff_limit = backoff_limit + self._volume_input = VolumeInfo( + name="calrissian-input-data", + claim_name="calrissian-input-data", + mount_path="/calrissian/input-data", + ) + self._volume_tmp = VolumeInfo( + name="calrissian-tmpout", + claim_name="calrissian-tmpout", + mount_path="/calrissian/tmpout", + ) + self._volume_output = VolumeInfo( + name="calrissian-output-data", + claim_name="calrissian-output-data", + mount_path="/calrissian/output-data", + ) + # TODO: config for this? self._security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) @@ -53,21 +68,11 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete name = f"{self._name_base}-cal-input" _log.info("Creating input staging job manifest: {name=}") - volumes = [ - # TODO: build these volume infos in init? - VolumeInfo( - name="calrissian-input-data", - claim_name="calrissian-input-data", - mount_path="/calrissian/input-data", - # TODO: note: no read_only here. Instead do input staging as part of deployment, instead of on the fly? - ), - ] - # Serialize CWL content to string that is safe to pass as command line argument cwl_serialized = base64.b64encode(cwl_content.encode("utf8")).decode("ascii") # TODO: ensure isolation of different CWLs (e.g. req/job id is not enough). Include content hash - cwl_path = f"/calrissian/input-data/{self._name_base}.cwl" - + # TODO: cleanup procedure of these CWL files? + cwl_path = str(Path(self._volume_input.mount_path) / f"{self._name_base}.cwl") _log.info( f"create_input_staging_job_manifest creating {cwl_path=} from {cwl_content[:32]=} through {cwl_serialized[:32]=}" ) @@ -77,18 +82,11 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete image="alpine:3", security_context=self._security_context, command=["/bin/sh"], - args=[ - "-c", - "; ".join( - [ - "set -euxo pipefail", - f"echo '{cwl_serialized}' | base64 -d > {cwl_path}", - ] - ), - ], + args=["-c", f"set -euxo pipefail; echo '{cwl_serialized}' | base64 -d > {cwl_path}"], volume_mounts=[ - kubernetes.client.V1VolumeMount(name=v.name, mount_path=v.mount_path, read_only=v.read_only) - for v in volumes + kubernetes.client.V1VolumeMount( + name=self._volume_input.name, mount_path=self._volume_input.mount_path, read_only=False + ) ], ) manifest = kubernetes.client.V1Job( @@ -103,19 +101,18 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete restart_policy="Never", volumes=[ kubernetes.client.V1Volume( - name=v.name, + name=self._volume_input.name, persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=v.claim_name, - read_only=v.read_only, + claim_name=self._volume_input.claim_name, read_only=False ), ) - for v in volumes ], ) ), backoff_limit=self._backoff_limit, ), ) + return manifest, cwl_path def create_cwl_job_manifest( @@ -131,36 +128,23 @@ def create_cwl_job_manifest( container_image = get_backend_config().calrissian_image assert container_image - volumes = [ - # TODO: build these volume infos in init? - VolumeInfo( - name="calrissian-input-data", - claim_name="calrissian-input-data", - mount_path="/calrissian/input-data", - read_only=True, - ), - VolumeInfo( - name="calrissian-tmpout", - claim_name="calrissian-tmpout", - mount_path="/calrissian/tmpout", - ), - VolumeInfo( - name="calrissian-output-data", - claim_name="calrissian-output-data", - mount_path="/calrissian/output-data", - ), - ] + # Pairs of (volume_info, read_only) + volumes = [(self._volume_input, True), (self._volume_tmp, False), (self._volume_output, False)] + + # Ensure trailing "/" so that `tmp-outdir-prefix` is handled as a root directory. + tmp_dir = self._volume_tmp.mount_path.rstrip("/") + "/" + output_dir = str(Path(self._volume_output.mount_path) / self._name_base) calrissian_arguments = [ + "--debug", "--max-ram", "2G", "--max-cores", "1", - "--debug", "--tmp-outdir-prefix", - "/calrissian/tmpout/", + tmp_dir, "--outdir", - f"/calrissian/output-data/{self._name_base}", + output_dir, cwl_path, ] + cwl_arguments @@ -173,8 +157,10 @@ def create_cwl_job_manifest( command=["calrissian"], args=calrissian_arguments, volume_mounts=[ - kubernetes.client.V1VolumeMount(name=v.name, mount_path=v.mount_path, read_only=v.read_only) - for v in volumes + kubernetes.client.V1VolumeMount( + name=volume_info.name, mount_path=volume_info.mount_path, read_only=read_only + ) + for volume_info, read_only in volumes ], env=[ kubernetes.client.V1EnvVar( @@ -197,13 +183,13 @@ def create_cwl_job_manifest( restart_policy="Never", volumes=[ kubernetes.client.V1Volume( - name=v.name, + name=volume_info.name, persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=v.claim_name, - read_only=v.read_only, + claim_name=volume_info.claim_name, + read_only=read_only, ), ) - for v in volumes + for volume_info, read_only in volumes ], ) ), From d7f47ec1d36bad5cbc3c64a0ae032d5c68dd3466 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 18 Dec 2024 09:47:17 +0100 Subject: [PATCH 08/18] Issue #936 encapsulate CWL PoC in openEO process function --- openeogeotrellis/deploy/kube.py | 125 +++++++++++++++++++------------- 1 file changed, 73 insertions(+), 52 deletions(-) diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index ab0d59b46..ff0313e21 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -4,8 +4,10 @@ import logging import os +import re import textwrap +from openeo_driver.processes import ProcessArgs from openeo_driver.server import run_gunicorn from openeo_driver.util.logging import ( get_logging_config, @@ -13,6 +15,8 @@ LOG_HANDLER_STDERR_JSON, FlaskRequestCorrelationIdLogging, ) +from openeo_driver.ProcessGraphDeserializer import non_standard_process, ProcessSpec, ENV_DRY_RUN_TRACER +from openeo_driver.utils import EvalEnv from openeo_driver.views import build_app from openeogeotrellis import deploy from openeogeotrellis.config import get_backend_config @@ -90,58 +94,6 @@ def on_started(): } ) - @app.route("/tmp/ogd936", methods=["GET"]) - def tmp_ogd936(): - """Temporary endpoint to play with Calrissian based CWL job management""" - import kubernetes.config - from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher - - request_id = FlaskRequestCorrelationIdLogging.get_request_id() - name_base = request_id[:20] - - namespace = "calrissian-demo-project" - kubernetes.config.load_incluster_config() - - launcher = CalrissianJobLauncher(namespace=namespace, name_base=name_base) - - # Input staging - cwl_content = textwrap.dedent( - """ - cwlVersion: v1.0 - class: CommandLineTool - baseCommand: echo - requirements: - - class: DockerRequirement - dockerPull: debian:stretch-slim - inputs: - message: - type: string - default: "Hello World" - inputBinding: - position: 1 - outputs: - output_file: - type: File - outputBinding: - glob: output.txt - stdout: output.txt - """ - ) - input_staging_manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content=cwl_content) - res = launcher.launch_job_and_wait(manifest=input_staging_manifest) - - # CWL job - cwl_manifest = launcher.create_cwl_job_manifest( - cwl_path=cwl_path, - cwl_arguments=[ - "--message", - f"Hello Earth, greetings from {request_id}", - ], - ) - res = launcher.launch_job_and_wait(manifest=cwl_manifest) - - return f"Hello from the backend: {res!r}" - host = os.environ.get('SPARK_LOCAL_IP', None) if host is None: host, _ = get_socket() @@ -156,5 +108,74 @@ def tmp_ogd936(): ) +@non_standard_process( + ProcessSpec(id="_cwl_demo", description="Proof-of-concept process to run CWL based processing.") + .param(name="name", description="Name to greet", schema={"type": "string"}, required=False) + .returns(description="data", schema={}) +) +def _cwl_demo(args: ProcessArgs, env: EvalEnv): + """Proof of concept openEO process to run CWL based processing""" + name = args.get_optional( + "name", + default="World", + validator=ProcessArgs.validator_generic( + lambda n: bool(re.fullmatch("^[a-zA-Z]+$", n)), error_message="Must be a simple name, but got {actual!r}." + ), + ) + + if env.get(ENV_DRY_RUN_TRACER): + return name + + # TODO: move this imports to top-level? + import kubernetes.config + from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher + + request_id = FlaskRequestCorrelationIdLogging.get_request_id() + name_base = request_id[:20] + namespace = "calrissian-demo-project" + + kubernetes.config.load_incluster_config() + launcher = CalrissianJobLauncher(namespace=namespace, name_base=name_base) + + # Input staging + cwl_content = textwrap.dedent( + """ + cwlVersion: v1.0 + class: CommandLineTool + baseCommand: echo + requirements: + - class: DockerRequirement + dockerPull: debian:stretch-slim + inputs: + message: + type: string + default: "Hello World" + inputBinding: + position: 1 + outputs: + output_file: + type: File + outputBinding: + glob: output.txt + stdout: output.txt + """ + ) + input_staging_manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content=cwl_content) + res = launcher.launch_job_and_wait(manifest=input_staging_manifest) + + # CWL job + cwl_manifest = launcher.create_cwl_job_manifest( + cwl_path=cwl_path, + cwl_arguments=[ + "--message", + f"Hello {name}, greetings from {request_id}.", + ], + ) + res = launcher.launch_job_and_wait(manifest=cwl_manifest) + + # TODO + return "TODO" + + if __name__ == '__main__': main() From 85f41d5a86a413fc3164af329bc95f6fabc54154 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 18 Dec 2024 10:34:40 +0100 Subject: [PATCH 09/18] Issue #936 try to fetch CWL output from S3 --- openeogeotrellis/deploy/kube.py | 23 ++++++++++++++++----- openeogeotrellis/integrations/calrissian.py | 17 ++++++++------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index ff0313e21..a297cbe6c 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -22,6 +22,7 @@ from openeogeotrellis.config import get_backend_config from openeogeotrellis.deploy import get_socket from openeogeotrellis.job_registry import ZkJobRegistry +from openeogeotrellis.utils import get_s3_file_contents, s3_client log = logging.getLogger(__name__) @@ -123,6 +124,8 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): ), ) + log = logging.getLogger("openeogeotrellis.deploy.kube._cwl_demo") + if env.get(ENV_DRY_RUN_TRACER): return name @@ -161,20 +164,30 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): """ ) input_staging_manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content=cwl_content) - res = launcher.launch_job_and_wait(manifest=input_staging_manifest) + input_staging_job = launcher.launch_job_and_wait(manifest=input_staging_manifest) # CWL job - cwl_manifest = launcher.create_cwl_job_manifest( + cwl_manifest, relative_output_dir = launcher.create_cwl_job_manifest( cwl_path=cwl_path, cwl_arguments=[ "--message", f"Hello {name}, greetings from {request_id}.", ], ) - res = launcher.launch_job_and_wait(manifest=cwl_manifest) + cwl_job = launcher.launch_job_and_wait(manifest=cwl_manifest) + + output_volume_name = launcher.get_output_volume_name() + s3_instance = s3_client() + # TODO: get S3 bucket name from config? + s3_bucket = "calrissian" + # TODO: this must correspond with the CWL output definition + s3_key = f"{output_volume_name}/{relative_output_dir.strip('/')}/output.txt" + log.info(f"Getting CWL output from S3: {s3_bucket=}, {s3_key=}") + s3_file_object = s3_instance.get_object(Bucket=s3_bucket, Key=s3_key) + body = s3_file_object["Body"] + content = body.read().decode("utf8") + return content - # TODO - return "TODO" if __name__ == '__main__': diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 46296b4be..e8da6145c 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -120,7 +120,7 @@ def create_cwl_job_manifest( cwl_path: str, cwl_arguments: List[str], # TODO: arguments to set an actual CWL workflow and inputs - ) -> kubernetes.client.V1Job: + ) -> Tuple[kubernetes.client.V1Job, str]: # TODO: name must be unique per invocation of this method, not just per instance/request/job. name = f"{self._name_base}-cal-cwl" _log.info(f"Creating CWL job manifest: {name=}") @@ -133,7 +133,8 @@ def create_cwl_job_manifest( # Ensure trailing "/" so that `tmp-outdir-prefix` is handled as a root directory. tmp_dir = self._volume_tmp.mount_path.rstrip("/") + "/" - output_dir = str(Path(self._volume_output.mount_path) / self._name_base) + relative_output_dir = name + output_dir = str(Path(self._volume_output.mount_path) / relative_output_dir) calrissian_arguments = [ "--debug", @@ -196,7 +197,7 @@ def create_cwl_job_manifest( backoff_limit=self._backoff_limit, ), ) - return manifest + return manifest, relative_output_dir def launch_job_and_wait( self, @@ -244,15 +245,15 @@ def launch_job_and_wait( else: raise ValueError("CWL") - # TODO: how to resolve and extract the results? # TODO: automatic cleanup after success? # TODO: automatic cleanup after fail? - return job - def get_volume_name_from_pvc(self, pvc_name: str) -> str: - """Get the name of the volume that is mounted by a PVC.""" + def get_output_volume_name(self) -> str: + """Get the actual name of the output volume claim.""" core_api = kubernetes.client.CoreV1Api() - pvc = core_api.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=self._namespace) + pvc = core_api.read_namespaced_persistent_volume_claim( + name=self._volume_output.claim_name, namespace=self._namespace + ) volume_name = pvc.spec.volume_name return volume_name From f8dba82280a12c7fc7dc48859fabfd3f7fcc0700 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 18 Dec 2024 18:19:40 +0100 Subject: [PATCH 10/18] Issue #936 encapsulate `run_cwl_workflow` --- openeogeotrellis/config/config.py | 4 +- openeogeotrellis/deploy/kube.py | 49 +++++--------- openeogeotrellis/integrations/calrissian.py | 74 ++++++++++++++++++++- 3 files changed, 93 insertions(+), 34 deletions(-) diff --git a/openeogeotrellis/config/config.py b/openeogeotrellis/config/config.py index 70064f8a9..630df1d16 100644 --- a/openeogeotrellis/config/config.py +++ b/openeogeotrellis/config/config.py @@ -247,7 +247,9 @@ class GpsBackendConfig(OpenEoBackendConfig): gdalinfo_use_subprocess: bool = True # TODO: Only keep one gdalinfo on true gdalinfo_use_python_subprocess: bool = False - calrissian_namespace: Optional[str] = None + # TODO: replace these temp default with None (or better defaults) + calrissian_namespace: Optional[str] = "calrissian-demo-project" # TODO: proper calrissian image? Official one doesn't work due to https://github.com/Duke-GCB/calrissian/issues/124#issuecomment-947008286 # calrissian_image: Optional[str] = "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1" calrissian_image: Optional[str] = "registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/calrissian:latest" + calrissian_bucket: Optional[str] = "calrissian" diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index a297cbe6c..a7b564695 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -22,7 +22,7 @@ from openeogeotrellis.config import get_backend_config from openeogeotrellis.deploy import get_socket from openeogeotrellis.job_registry import ZkJobRegistry -from openeogeotrellis.utils import get_s3_file_contents, s3_client + log = logging.getLogger(__name__) @@ -120,14 +120,14 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): "name", default="World", validator=ProcessArgs.validator_generic( - lambda n: bool(re.fullmatch("^[a-zA-Z]+$", n)), error_message="Must be a simple name, but got {actual!r}." + # TODO: helper to create regex based validator + lambda n: bool(re.fullmatch("^[a-zA-Z]+$", n)), + error_message="Must be a simple name, but got {actual!r}.", ), ) - log = logging.getLogger("openeogeotrellis.deploy.kube._cwl_demo") - if env.get(ENV_DRY_RUN_TRACER): - return name + return "dummy" # TODO: move this imports to top-level? import kubernetes.config @@ -135,12 +135,11 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): request_id = FlaskRequestCorrelationIdLogging.get_request_id() name_base = request_id[:20] - namespace = "calrissian-demo-project" + # TODO: better place to load this config? kubernetes.config.load_incluster_config() - launcher = CalrissianJobLauncher(namespace=namespace, name_base=name_base) + launcher = CalrissianJobLauncher(name_base=name_base) - # Input staging cwl_content = textwrap.dedent( """ cwlVersion: v1.0 @@ -163,30 +162,18 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): stdout: output.txt """ ) - input_staging_manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content=cwl_content) - input_staging_job = launcher.launch_job_and_wait(manifest=input_staging_manifest) - - # CWL job - cwl_manifest, relative_output_dir = launcher.create_cwl_job_manifest( - cwl_path=cwl_path, - cwl_arguments=[ - "--message", - f"Hello {name}, greetings from {request_id}.", - ], + cwl_arguments = [ + "--message", + f"Hello {name}, greetings from {request_id}.", + ] + + results = launcher.run_cwl_workflow( + cwl_content=cwl_content, + cwl_arguments=cwl_arguments, + output_paths=["output.txt"], ) - cwl_job = launcher.launch_job_and_wait(manifest=cwl_manifest) - - output_volume_name = launcher.get_output_volume_name() - s3_instance = s3_client() - # TODO: get S3 bucket name from config? - s3_bucket = "calrissian" - # TODO: this must correspond with the CWL output definition - s3_key = f"{output_volume_name}/{relative_output_dir.strip('/')}/output.txt" - log.info(f"Getting CWL output from S3: {s3_bucket=}, {s3_key=}") - s3_file_object = s3_instance.get_object(Bucket=s3_bucket, Key=s3_key) - body = s3_file_object["Body"] - content = body.read().decode("utf8") - return content + + return results["output.txt"].read(encoding="utf8") diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index e8da6145c..a5f729602 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -6,7 +6,7 @@ from pathlib import Path import time -from typing import Optional, Union, Tuple, List +from typing import Optional, Union, Tuple, List, Dict from openeo.util import ContextTimer from openeo_driver.utils import generate_unique_id @@ -14,6 +14,8 @@ import kubernetes.client +from openeogeotrellis.utils import s3_client + _log = logging.getLogger(__name__) @dataclasses.dataclass(frozen=True) @@ -24,6 +26,21 @@ class VolumeInfo: +@dataclasses.dataclass(frozen=True) +class CalrissianS3Result: + s3_bucket: str + s3_key: str + + def read(self, encoding: Union[None, str] = None) -> Union[bytes, str]: + _log.info(f"Reading from S3: {self.s3_bucket=}, {self.s3_key=}") + s3_file_object = s3_client().get_object(Bucket=self.s3_bucket, Key=self.s3_key) + body = s3_file_object["Body"] + content = body.read() + if encoding: + content = content.decode(encoding) + return content + + class CalrissianJobLauncher: """ Helper class to launch a Calrissian job on Kubernetes. @@ -34,11 +51,14 @@ def __init__( *, namespace: Optional[str] = None, name_base: Optional[str] = None, + s3_bucket: Optional[str] = None, backoff_limit: int = 1, ): self._namespace = namespace or get_backend_config().calrissian_namespace assert self._namespace self._name_base = name_base or generate_unique_id(prefix="cal") + self._s3_bucket = s3_bucket or get_backend_config().calrissian_bucket + _log.info(f"CalrissianJobLauncher {self._namespace=} {self._name_base=}") self._backoff_limit = backoff_limit @@ -64,6 +84,11 @@ def __init__( def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernetes.client.V1Job, str]: """ Create a k8s manifest for a Calrissian input staging job. + + :param cwl_content: CWL content as a string to dump to CWL file in the input volume. + :return: Tuple of + - k8s job manifest + - path to the CWL file in the input volume. """ name = f"{self._name_base}-cal-input" _log.info("Creating input staging job manifest: {name=}") @@ -121,6 +146,16 @@ def create_cwl_job_manifest( cwl_arguments: List[str], # TODO: arguments to set an actual CWL workflow and inputs ) -> Tuple[kubernetes.client.V1Job, str]: + """ + Create a k8s manifest for a Calrissian CWL job. + + :param cwl_path: path to the CWL file to run (inside the input staging volume), + as produced by `create_input_staging_job_manifest` + :param cwl_arguments: + :return: Tuple of + - k8s job manifest + - relative output directory (inside the output volume) + """ # TODO: name must be unique per invocation of this method, not just per instance/request/job. name = f"{self._name_base}-cal-cwl" _log.info(f"Creating CWL job manifest: {name=}") @@ -206,7 +241,9 @@ def launch_job_and_wait( sleep: float = 5, timeout: float = 60, ) -> kubernetes.client.V1Job: - """Launch a k8s job and wait (with active polling) for it to finish.""" + """ + Launch a k8s job and wait (with active polling) for it to finish. + """ k8s_batch = kubernetes.client.BatchV1Api() @@ -257,3 +294,36 @@ def get_output_volume_name(self) -> str: ) volume_name = pvc.spec.volume_name return volume_name + + def run_cwl_workflow( + self, cwl_content: str, cwl_arguments: List[str], output_paths: List[str] + ) -> Dict[str, CalrissianS3Result]: + """ + Run a CWL workflow on Calrissian and return the output as a string. + + :param cwl_content: CWL content as a string. + :param cwl_arguments: arguments to pass to the CWL workflow. + :return: output of the CWL workflow as a string. + """ + # Input staging + input_staging_manifest, cwl_path = self.create_input_staging_job_manifest(cwl_content=cwl_content) + input_staging_job = self.launch_job_and_wait(manifest=input_staging_manifest) + + # CWL job + cwl_manifest, relative_output_dir = self.create_cwl_job_manifest( + cwl_path=cwl_path, + cwl_arguments=cwl_arguments, + ) + cwl_job = self.launch_job_and_wait(manifest=cwl_manifest) + + # Collect results + output_volume_name = self.get_output_volume_name() + s3_bucket = self._s3_bucket + results = { + output_path: CalrissianS3Result( + s3_bucket=s3_bucket, + s3_key=f"{output_volume_name}/{relative_output_dir.strip('/')}/{output_path.strip('/')}", + ) + for output_path in output_paths + } + return results From 52fdc496d07414fc17c37698e9dd9bfe659d5250 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Dec 2024 09:55:02 +0100 Subject: [PATCH 11/18] Issue #936 bit of isort and code style cleanup --- openeogeotrellis/deploy/kube.py | 12 ++++++++---- openeogeotrellis/integrations/calrissian.py | 14 ++++++-------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index a7b564695..1b2e520d4 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -8,22 +8,26 @@ import textwrap from openeo_driver.processes import ProcessArgs +from openeo_driver.ProcessGraphDeserializer import ( + ENV_DRY_RUN_TRACER, + ProcessSpec, + non_standard_process, +) from openeo_driver.server import run_gunicorn from openeo_driver.util.logging import ( - get_logging_config, - setup_logging, LOG_HANDLER_STDERR_JSON, FlaskRequestCorrelationIdLogging, + get_logging_config, + setup_logging, ) -from openeo_driver.ProcessGraphDeserializer import non_standard_process, ProcessSpec, ENV_DRY_RUN_TRACER from openeo_driver.utils import EvalEnv from openeo_driver.views import build_app + from openeogeotrellis import deploy from openeogeotrellis.config import get_backend_config from openeogeotrellis.deploy import get_socket from openeogeotrellis.job_registry import ZkJobRegistry - log = logging.getLogger(__name__) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index a5f729602..9c1647ed4 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -1,23 +1,22 @@ import base64 +import dataclasses import gzip import logging -import dataclasses import textwrap -from pathlib import Path - import time -from typing import Optional, Union, Tuple, List, Dict +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Union +import kubernetes.client from openeo.util import ContextTimer from openeo_driver.utils import generate_unique_id -from openeogeotrellis.config import get_backend_config - -import kubernetes.client +from openeogeotrellis.config import get_backend_config from openeogeotrellis.utils import s3_client _log = logging.getLogger(__name__) + @dataclasses.dataclass(frozen=True) class VolumeInfo: name: str @@ -25,7 +24,6 @@ class VolumeInfo: mount_path: str - @dataclasses.dataclass(frozen=True) class CalrissianS3Result: s3_bucket: str From 8776f02190e474f79b5416897d1bb750410671b4 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Dec 2024 11:09:41 +0100 Subject: [PATCH 12/18] Issue #936 fixup calrissian tests --- openeogeotrellis/integrations/calrissian.py | 4 +- tests/integrations/test_calrissian.py | 237 ++++++++++++-------- 2 files changed, 145 insertions(+), 96 deletions(-) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 9c1647ed4..62f9e8043 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -170,7 +170,9 @@ def create_cwl_job_manifest( output_dir = str(Path(self._volume_output.mount_path) / relative_output_dir) calrissian_arguments = [ + # TODO (still) need for this debug flag? "--debug", + # TODO: better RAM/CPU values than these arbitrary ones? "--max-ram", "2G", "--max-cores", @@ -185,7 +187,7 @@ def create_cwl_job_manifest( _log.info(f"create_cwl_job_manifest {calrissian_arguments=}") container = kubernetes.client.V1Container( - name="calrissian-job", + name=name, image=container_image, security_context=self._security_context, command=["calrissian"], diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py index f5a230bd3..00b0eba8e 100644 --- a/tests/integrations/test_calrissian.py +++ b/tests/integrations/test_calrissian.py @@ -1,104 +1,151 @@ import dirty_equals -from openeogeotrellis.integrations.calrissian import create_cwl_job_body +import kubernetes.client +from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher, CalrissianS3Result -def test_create_cwl_job_body(): - body = create_cwl_job_body(namespace="calrissian-test") - assert body.to_dict() == { - "api_version": None, - "kind": None, - "metadata": dirty_equals.IsPartialDict( +class TestCalrissianJobLauncher: + + def test_create_input_staging_job_manifest(self): + launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-123") + + manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content="class: Dummy") + + assert cwl_path == "/calrissian/input-data/r-123.cwl" + + assert isinstance(manifest, kubernetes.client.V1Job) + manifest_dict = manifest.to_dict() + + assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( { - "name": dirty_equals.IsStr(regex="cj-2.*"), + "name": "r-123-cal-input", "namespace": "calrissian-test", } - ), - "spec": dirty_equals.IsPartialDict( + ) + assert manifest_dict["spec"] == dirty_equals.IsPartialDict( + { + "backoff_limit": 1, + } + ) + assert manifest_dict["spec"]["template"]["spec"] == dirty_equals.IsPartialDict( + { + "containers": [ + dirty_equals.IsPartialDict( + { + "name": "calrissian-input-staging", + "image": "alpine:3", + "command": ["/bin/sh"], + "args": [ + "-c", + "set -euxo pipefail; echo 'Y2xhc3M6IER1bW15' | base64 -d > /calrissian/input-data/r-123.cwl", + ], + "volume_mounts": [ + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/input-data", + "name": "calrissian-input-data", + "read_only": False, + } + ), + ], + } + ) + ], + "volumes": [ + dirty_equals.IsPartialDict( + { + "name": "calrissian-input-data", + "persistent_volume_claim": {"claim_name": "calrissian-input-data", "read_only": False}, + } + ), + ], + } + ) + + def test_create_cwl_job_manifest(self): + launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-123") + + manifest, output_dir = launcher.create_cwl_job_manifest( + cwl_path="/calrissian/input-data/r-123.cwl", cwl_arguments=["--message", "Howdy Earth!"] + ) + + assert output_dir == "r-123-cal-cwl" + + assert isinstance(manifest, kubernetes.client.V1Job) + manifest_dict = manifest.to_dict() + + assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( + { + "name": "r-123-cal-cwl", + "namespace": "calrissian-test", + } + ) + assert manifest_dict["spec"] == dirty_equals.IsPartialDict( + { + "backoff_limit": 1, + } + ) + assert manifest_dict["spec"]["template"]["spec"] == dirty_equals.IsPartialDict( { - "template": dirty_equals.IsPartialDict( - { - "spec": dirty_equals.IsPartialDict( - { - "containers": [ - dirty_equals.IsPartialDict( - { - "args": [ - "--max-ram", - "2G", - "--max-cores", - "1", - "--debug", - "--tmp-outdir-prefix", - "/calrissian/tmpout/", - "--outdir", - "/calrissian/output-data/", - "/calrissian/input-data/hello-workflow.cwl", - "/calrissian/input-data/hello-input.yaml", - ], - "command": ["calrissian"], - "image": "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1", - "volume_mounts": [ - dirty_equals.IsPartialDict( - { - "mount_path": "/calrissian/input-data", - "name": "calrissian-input-data", - "read_only": True, - } - ), - dirty_equals.IsPartialDict( - { - "mount_path": "/calrissian/tmpout", - "name": "calrissian-tmpout", - "read_only": None, - } - ), - dirty_equals.IsPartialDict( - { - "mount_path": "/calrissian/output-data", - "name": "calrissian-output-data", - "read_only": None, - } - ), - ], - "working_dir": None, - } - ) - ], - "volumes": [ - dirty_equals.IsPartialDict( - { - "name": "calrissian-input-data", - "persistent_volume_claim": { - "claim_name": "calrissian-input-data", - "read_only": True, - }, - } - ), - dirty_equals.IsPartialDict( - { - "name": "calrissian-tmpout", - "persistent_volume_claim": { - "claim_name": "calrissian-tmpout", - "read_only": None, - }, - } - ), - dirty_equals.IsPartialDict( - { - "name": "calrissian-output-data", - "persistent_volume_claim": { - "claim_name": "calrissian-output-data", - "read_only": None, - }, - } - ), - ], - } - ), - } - ), + "containers": [ + dirty_equals.IsPartialDict( + { + "name": "r-123-cal-cwl", + "command": ["calrissian"], + "args": dirty_equals.Contains( + "--tmp-outdir-prefix", + "/calrissian/tmpout/", + "--outdir", + "/calrissian/output-data/r-123-cal-cwl", + "/calrissian/input-data/r-123.cwl", + "--message", + "Howdy Earth!", + ), + "volume_mounts": [ + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/input-data", + "name": "calrissian-input-data", + "read_only": True, + } + ), + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/tmpout", + "name": "calrissian-tmpout", + "read_only": False, + } + ), + dirty_equals.IsPartialDict( + { + "mount_path": "/calrissian/output-data", + "name": "calrissian-output-data", + "read_only": False, + } + ), + ], + } + ) + ], + "volumes": [ + dirty_equals.IsPartialDict( + { + "name": "calrissian-input-data", + "persistent_volume_claim": {"claim_name": "calrissian-input-data", "read_only": True}, + } + ), + dirty_equals.IsPartialDict( + { + "name": "calrissian-tmpout", + "persistent_volume_claim": {"claim_name": "calrissian-tmpout", "read_only": False}, + } + ), + dirty_equals.IsPartialDict( + { + "name": "calrissian-output-data", + "persistent_volume_claim": {"claim_name": "calrissian-output-data", "read_only": False}, + } + ), + ], } - ), - "status": None, - } + ) From 8e44197d8d323fdd6033b577fd0c6e61add00888 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Dec 2024 12:18:03 +0100 Subject: [PATCH 13/18] Issue #936 improve naming (and uniqueness) in CalrissianJobLauncher --- openeogeotrellis/deploy/kube.py | 10 ++--- openeogeotrellis/integrations/calrissian.py | 31 +++++++++++---- openeogeotrellis/util/runtime.py | 21 +++++++++- tests/integrations/test_calrissian.py | 37 +++++++++++------- tests/util/test_runtime.py | 43 ++++++++++++++++++++- 5 files changed, 115 insertions(+), 27 deletions(-) diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index 1b2e520d4..ae8924d39 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -27,6 +27,7 @@ from openeogeotrellis.config import get_backend_config from openeogeotrellis.deploy import get_socket from openeogeotrellis.job_registry import ZkJobRegistry +from openeogeotrellis.util.runtime import get_job_id, get_request_id log = logging.getLogger(__name__) @@ -137,12 +138,10 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): import kubernetes.config from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher - request_id = FlaskRequestCorrelationIdLogging.get_request_id() - name_base = request_id[:20] - # TODO: better place to load this config? kubernetes.config.load_incluster_config() - launcher = CalrissianJobLauncher(name_base=name_base) + + launcher = CalrissianJobLauncher.from_context() cwl_content = textwrap.dedent( """ @@ -166,9 +165,10 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): stdout: output.txt """ ) + correlation_id = get_job_id(default=None) or get_request_id(default=None) cwl_arguments = [ "--message", - f"Hello {name}, greetings from {request_id}.", + f"Hello {name}, greetings from {correlation_id}.", ] results = launcher.run_cwl_workflow( diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 62f9e8043..56dea84cc 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -1,3 +1,4 @@ +from __future__ import annotations import base64 import dataclasses import gzip @@ -12,6 +13,7 @@ from openeo_driver.utils import generate_unique_id from openeogeotrellis.config import get_backend_config +from openeogeotrellis.util.runtime import get_job_id, get_request_id from openeogeotrellis.utils import s3_client _log = logging.getLogger(__name__) @@ -54,7 +56,7 @@ def __init__( ): self._namespace = namespace or get_backend_config().calrissian_namespace assert self._namespace - self._name_base = name_base or generate_unique_id(prefix="cal") + self._name_base = name_base or generate_unique_id(prefix="cal")[:20] self._s3_bucket = s3_bucket or get_backend_config().calrissian_bucket _log.info(f"CalrissianJobLauncher {self._namespace=} {self._name_base=}") @@ -79,6 +81,21 @@ def __init__( # TODO: config for this? self._security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000) + @staticmethod + def from_context() -> CalrissianJobLauncher: + """ + Factory for creating a CalrissianJobLauncher from the current context + (e.g. using job_id/request_id for base name). + """ + correlation_id = get_job_id(default=None) or get_request_id(default=None) + name_base = correlation_id[:20] if correlation_id else None + return CalrissianJobLauncher(name_base=name_base) + + def _build_unique_name(self, infix: str) -> str: + """Build unique name from base name, an infix and something random, to be used for k8s resources""" + suffix = generate_unique_id(date_prefix=False)[:8] + return f"{self._name_base}-{infix}-{suffix}" + def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernetes.client.V1Job, str]: """ Create a k8s manifest for a Calrissian input staging job. @@ -88,14 +105,13 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete - k8s job manifest - path to the CWL file in the input volume. """ - name = f"{self._name_base}-cal-input" - _log.info("Creating input staging job manifest: {name=}") + name = self._build_unique_name(infix="cal-inp") + _log.info(f"Creating input staging job manifest: {name=}") # Serialize CWL content to string that is safe to pass as command line argument cwl_serialized = base64.b64encode(cwl_content.encode("utf8")).decode("ascii") - # TODO: ensure isolation of different CWLs (e.g. req/job id is not enough). Include content hash # TODO: cleanup procedure of these CWL files? - cwl_path = str(Path(self._volume_input.mount_path) / f"{self._name_base}.cwl") + cwl_path = str(Path(self._volume_input.mount_path) / f"{name}.cwl") _log.info( f"create_input_staging_job_manifest creating {cwl_path=} from {cwl_content[:32]=} through {cwl_serialized[:32]=}" ) @@ -126,7 +142,8 @@ def create_input_staging_job_manifest(self, cwl_content: str) -> Tuple[kubernete kubernetes.client.V1Volume( name=self._volume_input.name, persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource( - claim_name=self._volume_input.claim_name, read_only=False + claim_name=self._volume_input.claim_name, + read_only=False, ), ) ], @@ -155,7 +172,7 @@ def create_cwl_job_manifest( - relative output directory (inside the output volume) """ # TODO: name must be unique per invocation of this method, not just per instance/request/job. - name = f"{self._name_base}-cal-cwl" + name = self._build_unique_name(infix="cal-cwl") _log.info(f"Creating CWL job manifest: {name=}") container_image = get_backend_config().calrissian_image diff --git a/openeogeotrellis/util/runtime.py b/openeogeotrellis/util/runtime.py index 86f2504d4..9d4520659 100644 --- a/openeogeotrellis/util/runtime.py +++ b/openeogeotrellis/util/runtime.py @@ -1,5 +1,9 @@ import os from typing import Union, Type +import inspect + + +from openeo_driver.util.logging import FlaskRequestCorrelationIdLogging ENV_VAR_OPENEO_BATCH_JOB_ID = "OPENEO_BATCH_JOB_ID" @@ -9,7 +13,7 @@ def _is_exception_like(value) -> bool: return isinstance(value, Exception) or (isinstance(value, type) and issubclass(value, Exception)) -def get_job_id(*, default: Union[str, None, Exception, Type[Exception]] = None) -> Union[str, None]: +def get_job_id(*, default: Union[None, str, Exception, Type[Exception]] = None) -> Union[str, None]: """ Get job id from batch job context, or a default/exception if not in batch job context. @@ -22,3 +26,18 @@ def get_job_id(*, default: Union[str, None, Exception, Type[Exception]] = None) def in_batch_job_context() -> bool: return bool(get_job_id(default=None)) + + +def get_request_id(*, default: Union[None, str, Exception, Type[Exception]] = None) -> Union[str, None]: + """ + Get webapp request id from request context, + or a default/exception if not in request context. + """ + kwargs = {} + if "default" in inspect.signature(FlaskRequestCorrelationIdLogging.get_request_id).parameters: + # TODO #936 remove this temporary adapter when openeo-driver dependency is fully updated (to >=0.122.0) + kwargs = {"default": default} + request_id = FlaskRequestCorrelationIdLogging.get_request_id(**kwargs) + if _is_exception_like(request_id): + raise request_id + return request_id diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py index 00b0eba8e..8d07b568c 100644 --- a/tests/integrations/test_calrissian.py +++ b/tests/integrations/test_calrissian.py @@ -1,24 +1,34 @@ +from unittest import mock import dirty_equals import kubernetes.client - +import pytest from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher, CalrissianS3Result +@pytest.fixture +def generate_unique_id_mock() -> str: + # TODO: move this mock fixture to a more generic place + with mock.patch("openeo_driver.utils.uuid") as uuid: + fake_uuid = "0123456789abcdef0123456789abcdef" + uuid.uuid4.return_value.hex = fake_uuid + yield fake_uuid + + class TestCalrissianJobLauncher: - def test_create_input_staging_job_manifest(self): - launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-123") + def test_create_input_staging_job_manifest(self, generate_unique_id_mock): + launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-1234") manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content="class: Dummy") - assert cwl_path == "/calrissian/input-data/r-123.cwl" + assert cwl_path == "/calrissian/input-data/r-1234-cal-inp-01234567.cwl" assert isinstance(manifest, kubernetes.client.V1Job) manifest_dict = manifest.to_dict() assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( { - "name": "r-123-cal-input", + "name": "r-1234-cal-inp-01234567", "namespace": "calrissian-test", } ) @@ -37,7 +47,7 @@ def test_create_input_staging_job_manifest(self): "command": ["/bin/sh"], "args": [ "-c", - "set -euxo pipefail; echo 'Y2xhc3M6IER1bW15' | base64 -d > /calrissian/input-data/r-123.cwl", + "set -euxo pipefail; echo 'Y2xhc3M6IER1bW15' | base64 -d > /calrissian/input-data/r-1234-cal-inp-01234567.cwl", ], "volume_mounts": [ dirty_equals.IsPartialDict( @@ -62,21 +72,22 @@ def test_create_input_staging_job_manifest(self): } ) - def test_create_cwl_job_manifest(self): + def test_create_cwl_job_manifest(self, generate_unique_id_mock): launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-123") manifest, output_dir = launcher.create_cwl_job_manifest( - cwl_path="/calrissian/input-data/r-123.cwl", cwl_arguments=["--message", "Howdy Earth!"] + cwl_path="/calrissian/input-data/r-1234-cal-inp-01234567.cwl", + cwl_arguments=["--message", "Howdy Earth!"], ) - assert output_dir == "r-123-cal-cwl" + assert output_dir == "r-123-cal-cwl-01234567" assert isinstance(manifest, kubernetes.client.V1Job) manifest_dict = manifest.to_dict() assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( { - "name": "r-123-cal-cwl", + "name": "r-123-cal-cwl-01234567", "namespace": "calrissian-test", } ) @@ -90,14 +101,14 @@ def test_create_cwl_job_manifest(self): "containers": [ dirty_equals.IsPartialDict( { - "name": "r-123-cal-cwl", + "name": "r-123-cal-cwl-01234567", "command": ["calrissian"], "args": dirty_equals.Contains( "--tmp-outdir-prefix", "/calrissian/tmpout/", "--outdir", - "/calrissian/output-data/r-123-cal-cwl", - "/calrissian/input-data/r-123.cwl", + "/calrissian/output-data/r-123-cal-cwl-01234567", + "/calrissian/input-data/r-1234-cal-inp-01234567.cwl", "--message", "Howdy Earth!", ), diff --git a/tests/util/test_runtime.py b/tests/util/test_runtime.py index 71a9fe33c..33fa6d9ec 100644 --- a/tests/util/test_runtime.py +++ b/tests/util/test_runtime.py @@ -1,6 +1,9 @@ +import dirty_equals +import flask import pytest -from openeogeotrellis.util.runtime import get_job_id, ENV_VAR_OPENEO_BATCH_JOB_ID +from openeo_driver.util.logging import FlaskRequestCorrelationIdLogging +from openeogeotrellis.util.runtime import get_job_id, ENV_VAR_OPENEO_BATCH_JOB_ID, get_request_id def test_get_job_id_basic(): @@ -26,3 +29,41 @@ def test_get_job_id_with_exception(monkeypatch, exception): monkeypatch.setenv(ENV_VAR_OPENEO_BATCH_JOB_ID, "j-123") assert get_job_id(default=exception) == "j-123" + + +def test_get_request_id_no_flask(): + assert get_request_id() is None + assert get_request_id(default=None) is None + assert get_request_id(default="nope") == "nope" + + with pytest.raises(RuntimeError): + get_request_id(default=RuntimeError) + + with pytest.raises(RuntimeError): + get_request_id(default=RuntimeError("nope!")) + + +def test_get_request_id_in_flask(): + results = [] + + app = flask.Flask(__name__) + + @app.before_request + def before_request(): + FlaskRequestCorrelationIdLogging.before_request() + + @app.route("/hello") + def hello(): + results.append(get_request_id()) + results.append(get_request_id(default="nope")) + results.append(get_request_id(default=RuntimeError)) + return "Hello world" + + with app.test_client() as client: + client.get("/hello") + + assert results == [ + dirty_equals.IsStr(regex="r-[0-9a-f]+"), + dirty_equals.IsStr(regex="r-[0-9a-f]+"), + dirty_equals.IsStr(regex="r-[0-9a-f]+"), + ] From 001328cae867fbcdd3939bb1c03615f72a91f9be Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Dec 2024 15:35:11 +0100 Subject: [PATCH 14/18] Issue #936 code/import style cleanups --- openeogeotrellis/deploy/kube.py | 2 ++ openeogeotrellis/integrations/calrissian.py | 7 +++---- openeogeotrellis/util/runtime.py | 5 ++--- tests/integrations/test_calrissian.py | 8 ++++++-- tests/util/test_runtime.py | 8 ++++++-- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/openeogeotrellis/deploy/kube.py b/openeogeotrellis/deploy/kube.py index ae8924d39..42d975d99 100644 --- a/openeogeotrellis/deploy/kube.py +++ b/openeogeotrellis/deploy/kube.py @@ -53,6 +53,7 @@ def main(): ) from pyspark import SparkContext + log.info("starting spark context") SparkContext.getOrCreate() @@ -136,6 +137,7 @@ def _cwl_demo(args: ProcessArgs, env: EvalEnv): # TODO: move this imports to top-level? import kubernetes.config + from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher # TODO: better place to load this config? diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 56dea84cc..02d3a8ed1 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -1,17 +1,16 @@ from __future__ import annotations + import base64 import dataclasses -import gzip import logging -import textwrap -import time from pathlib import Path from typing import Dict, List, Optional, Tuple, Union import kubernetes.client +import time + from openeo.util import ContextTimer from openeo_driver.utils import generate_unique_id - from openeogeotrellis.config import get_backend_config from openeogeotrellis.util.runtime import get_job_id, get_request_id from openeogeotrellis.utils import s3_client diff --git a/openeogeotrellis/util/runtime.py b/openeogeotrellis/util/runtime.py index 9d4520659..bcc9472bd 100644 --- a/openeogeotrellis/util/runtime.py +++ b/openeogeotrellis/util/runtime.py @@ -1,7 +1,6 @@ -import os -from typing import Union, Type import inspect - +import os +from typing import Type, Union from openeo_driver.util.logging import FlaskRequestCorrelationIdLogging diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py index 8d07b568c..5a9d39dd3 100644 --- a/tests/integrations/test_calrissian.py +++ b/tests/integrations/test_calrissian.py @@ -1,8 +1,13 @@ from unittest import mock + import dirty_equals import kubernetes.client import pytest -from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher, CalrissianS3Result + +from openeogeotrellis.integrations.calrissian import ( + CalrissianJobLauncher, + CalrissianS3Result, +) @pytest.fixture @@ -15,7 +20,6 @@ def generate_unique_id_mock() -> str: class TestCalrissianJobLauncher: - def test_create_input_staging_job_manifest(self, generate_unique_id_mock): launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-1234") diff --git a/tests/util/test_runtime.py b/tests/util/test_runtime.py index 33fa6d9ec..35084ad02 100644 --- a/tests/util/test_runtime.py +++ b/tests/util/test_runtime.py @@ -1,9 +1,13 @@ import dirty_equals import flask import pytest - from openeo_driver.util.logging import FlaskRequestCorrelationIdLogging -from openeogeotrellis.util.runtime import get_job_id, ENV_VAR_OPENEO_BATCH_JOB_ID, get_request_id + +from openeogeotrellis.util.runtime import ( + ENV_VAR_OPENEO_BATCH_JOB_ID, + get_job_id, + get_request_id, +) def test_get_job_id_basic(): From f79838582537e9eff20fa4d5683091299cdfeef2 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 20 Dec 2024 13:40:14 +0100 Subject: [PATCH 15/18] Issue #936 add test for CalrissianS3Result --- openeogeotrellis/config/config.py | 1 + tests/integrations/test_calrissian.py | 24 ++++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/openeogeotrellis/config/config.py b/openeogeotrellis/config/config.py index 630df1d16..c93f88265 100644 --- a/openeogeotrellis/config/config.py +++ b/openeogeotrellis/config/config.py @@ -141,6 +141,7 @@ class GpsBackendConfig(OpenEoBackendConfig): s1backscatter_elev_geoid: Optional[str] = os.environ.get("OPENEO_S1BACKSCATTER_ELEV_GEOID") + # TODO: generically named config for a single bucket: not future/feature-proof (support for multiple buckets) s3_bucket_name: str = os.environ.get("SWIFT_BUCKET", "OpenEO-data") fuse_mount_batchjob_s3_bucket: bool = smart_bool(os.environ.get("FUSE_MOUNT_BATCHJOB_S3_BUCKET", False)) diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py index 5a9d39dd3..93661be8b 100644 --- a/tests/integrations/test_calrissian.py +++ b/tests/integrations/test_calrissian.py @@ -3,6 +3,8 @@ import dirty_equals import kubernetes.client import pytest +import moto +import boto3 from openeogeotrellis.integrations.calrissian import ( CalrissianJobLauncher, @@ -164,3 +166,25 @@ def test_create_cwl_job_manifest(self, generate_unique_id_mock): ], } ) + + +class TestCalrissianS3Result: + @pytest.fixture + def s3_output(self): + with moto.mock_aws(): + s3 = boto3.client("s3") + bucket = "the-bucket" + s3.create_bucket(Bucket=bucket) + key = "path/to/output.txt" + s3.put_object(Bucket=bucket, Key=key, Body="Howdy, Earth!") + yield bucket, key + + def test_read(self, s3_output): + bucket, key = s3_output + result = CalrissianS3Result(s3_bucket=bucket, s3_key=key) + assert result.read() == b"Howdy, Earth!" + + def test_read_encoding(self, s3_output): + bucket, key = s3_output + result = CalrissianS3Result(s3_bucket=bucket, s3_key=key) + assert result.read(encoding="utf-8") == "Howdy, Earth!" From d0246fcff18ea41bd76c8375fca38b547d45ac34 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 20 Dec 2024 15:03:28 +0100 Subject: [PATCH 16/18] Issue #936 add tests about k8s interactions --- openeogeotrellis/integrations/calrissian.py | 5 +- tests/integrations/test_calrissian.py | 87 +++++++++++++++++++-- 2 files changed, 84 insertions(+), 8 deletions(-) diff --git a/openeogeotrellis/integrations/calrissian.py b/openeogeotrellis/integrations/calrissian.py index 02d3a8ed1..e5aa01d83 100644 --- a/openeogeotrellis/integrations/calrissian.py +++ b/openeogeotrellis/integrations/calrissian.py @@ -58,7 +58,7 @@ def __init__( self._name_base = name_base or generate_unique_id(prefix="cal")[:20] self._s3_bucket = s3_bucket or get_backend_config().calrissian_bucket - _log.info(f"CalrissianJobLauncher {self._namespace=} {self._name_base=}") + _log.info(f"CalrissianJobLauncher.__init__: {self._namespace=} {self._name_base=} {self._s3_bucket=}") self._backoff_limit = backoff_limit self._volume_input = VolumeInfo( @@ -272,13 +272,14 @@ def launch_job_and_wait( _log.info( f"Created CWL job {job.metadata.name=} {job.metadata.namespace=} {job.metadata.creation_timestamp=} {job.metadata.uid=}" ) + # TODO: add assertions here about successful job creation? # Track job status (active polling). final_status = None with ContextTimer() as timer: while timer.elapsed() < timeout: job: kubernetes.client.V1Job = k8s_batch.read_namespaced_job(name=job_name, namespace=self._namespace) - _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {job.status=}") + _log.info(f"CWL job {job_name=} {timer.elapsed()=:.2f} {job.status.to_dict()=}") if job.status.conditions: if any(c.type == "Failed" and c.status == "True" for c in job.status.conditions): final_status = "failed" diff --git a/tests/integrations/test_calrissian.py b/tests/integrations/test_calrissian.py index 93661be8b..ee3202c3d 100644 --- a/tests/integrations/test_calrissian.py +++ b/tests/integrations/test_calrissian.py @@ -1,10 +1,11 @@ +from typing import Dict from unittest import mock +import boto3 import dirty_equals import kubernetes.client -import pytest import moto -import boto3 +import pytest from openeogeotrellis.integrations.calrissian import ( CalrissianJobLauncher, @@ -14,6 +15,7 @@ @pytest.fixture def generate_unique_id_mock() -> str: + """Fixture to fix the UUID used in `generate_unique_id`""" # TODO: move this mock fixture to a more generic place with mock.patch("openeo_driver.utils.uuid") as uuid: fake_uuid = "0123456789abcdef0123456789abcdef" @@ -22,8 +24,10 @@ def generate_unique_id_mock() -> str: class TestCalrissianJobLauncher: + NAMESPACE = "test-calrissian" + def test_create_input_staging_job_manifest(self, generate_unique_id_mock): - launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-1234") + launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-1234") manifest, cwl_path = launcher.create_input_staging_job_manifest(cwl_content="class: Dummy") @@ -35,7 +39,7 @@ def test_create_input_staging_job_manifest(self, generate_unique_id_mock): assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( { "name": "r-1234-cal-inp-01234567", - "namespace": "calrissian-test", + "namespace": self.NAMESPACE, } ) assert manifest_dict["spec"] == dirty_equals.IsPartialDict( @@ -79,7 +83,7 @@ def test_create_input_staging_job_manifest(self, generate_unique_id_mock): ) def test_create_cwl_job_manifest(self, generate_unique_id_mock): - launcher = CalrissianJobLauncher(namespace="calrissian-test", name_base="r-123") + launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-123") manifest, output_dir = launcher.create_cwl_job_manifest( cwl_path="/calrissian/input-data/r-1234-cal-inp-01234567.cwl", @@ -94,7 +98,7 @@ def test_create_cwl_job_manifest(self, generate_unique_id_mock): assert manifest_dict["metadata"] == dirty_equals.IsPartialDict( { "name": "r-123-cal-cwl-01234567", - "namespace": "calrissian-test", + "namespace": self.NAMESPACE, } ) assert manifest_dict["spec"] == dirty_equals.IsPartialDict( @@ -167,6 +171,77 @@ def test_create_cwl_job_manifest(self, generate_unique_id_mock): } ) + @pytest.fixture() + def k8_pvc_api(self): + """Mock for PVC API in kubernetes.client.CoreV1Api""" + pvc_to_volume_name = { + "calrissian-output-data": "1234-abcd-5678-efgh", + } + + def read_namespaced_persistent_volume_claim(name: str, namespace: str): + assert namespace == self.NAMESPACE + return kubernetes.client.V1PersistentVolumeClaim( + spec=kubernetes.client.V1PersistentVolumeClaimSpec(volume_name=pvc_to_volume_name[name]) + ) + + with mock.patch("kubernetes.client.CoreV1Api") as CoreV1Api: + CoreV1Api.return_value.read_namespaced_persistent_volume_claim = read_namespaced_persistent_volume_claim + yield + + def test_get_output_volume_name(self, k8_pvc_api): + launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-123") + assert launcher.get_output_volume_name() == "1234-abcd-5678-efgh" + + @pytest.fixture() + def k8s_batch_api(self): + """mock for kubernetes.client.BatchV1Api""" + + class BatchV1Api: + def __init__(self): + self.jobs: Dict[str, kubernetes.client.V1Job] = {} + + def create_namespaced_job(self, namespace: str, body: kubernetes.client.V1Job): + assert body.metadata.namespace == namespace + job = kubernetes.client.V1Job(metadata=body.metadata) + self.jobs[job.metadata.name] = job + return job + + def read_namespaced_job(self, name: str, namespace: str): + assert name in self.jobs + assert self.jobs[name].metadata.namespace == namespace + return kubernetes.client.V1Job( + metadata=self.jobs[name].metadata, + status=kubernetes.client.V1JobStatus( + # TODO: way to specify timeline of job conditions? + conditions=[kubernetes.client.V1JobCondition(type="Complete", status="True")] + ), + ) + + with mock.patch("kubernetes.client.BatchV1Api", new=BatchV1Api): + yield + + def test_launch_job_and_wait_basic(self, k8s_batch_api, caplog): + launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-456") + job_manifest = kubernetes.client.V1Job( + metadata=kubernetes.client.V1ObjectMeta(name="cal-123", namespace=self.NAMESPACE) + ) + result = launcher.launch_job_and_wait(manifest=job_manifest) + assert isinstance(result, kubernetes.client.V1Job) + + assert caplog.messages[-1] == dirty_equals.IsStr(regex=".*job_name='cal-123'.*final_status='complete'.*") + + def test_run_cwl_workflow_basic(self, k8_pvc_api, k8s_batch_api, generate_unique_id_mock, caplog): + launcher = CalrissianJobLauncher(namespace=self.NAMESPACE, name_base="r-456", s3_bucket="test-bucket") + res = launcher.run_cwl_workflow( + cwl_content="class: Dummy", cwl_arguments=["--message", "Howdy Earth!"], output_paths=["output.txt"] + ) + assert res == { + "output.txt": CalrissianS3Result( + s3_bucket="test-bucket", + s3_key="1234-abcd-5678-efgh/r-456-cal-cwl-01234567/output.txt", + ), + } + class TestCalrissianS3Result: @pytest.fixture From aaf5622cc602473ba927ada78a9c30945731a4b9 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 20 Dec 2024 17:46:53 +0100 Subject: [PATCH 17/18] Issue #936 drop temp adapter for openeo_driver<0.122 --- openeogeotrellis/util/runtime.py | 6 +----- setup.py | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/openeogeotrellis/util/runtime.py b/openeogeotrellis/util/runtime.py index bcc9472bd..721857082 100644 --- a/openeogeotrellis/util/runtime.py +++ b/openeogeotrellis/util/runtime.py @@ -32,11 +32,7 @@ def get_request_id(*, default: Union[None, str, Exception, Type[Exception]] = No Get webapp request id from request context, or a default/exception if not in request context. """ - kwargs = {} - if "default" in inspect.signature(FlaskRequestCorrelationIdLogging.get_request_id).parameters: - # TODO #936 remove this temporary adapter when openeo-driver dependency is fully updated (to >=0.122.0) - kwargs = {"default": default} - request_id = FlaskRequestCorrelationIdLogging.get_request_id(**kwargs) + request_id = FlaskRequestCorrelationIdLogging.get_request_id(default=default) if _is_exception_like(request_id): raise request_id return request_id diff --git a/setup.py b/setup.py index 17a133d70..e396e8396 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ tests_require=tests_require, install_requires=[ "openeo>=0.33.0", - "openeo_driver>=0.121.0.dev", + "openeo_driver>=0.122.0.dev", 'pyspark==3.4.2; python_version>="3.8"', 'pyspark>=2.3.1,<2.4.0; python_version<"3.8"', 'geopyspark==0.4.7+openeo', From 15ac6d608fd412aacdad43623e8e9f5106ba8a0d Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 19 Dec 2024 09:52:13 +0100 Subject: [PATCH 18/18] Issue #936 bump version and add changelog entry --- CHANGELOG.md | 5 +++++ openeogeotrellis/_version.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b27a5cbab..6a0e85bc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,11 @@ without compromising stable operations. ## Unreleased + +## 0.56.0 + +- Initial support for CWL based processes with Calrissian on a Kubernetes cluster ([#936](https://github.com/Open-EO/openeo-geopyspark-driver/issues/936)) + ## 0.55.0 - Support `file_metadata` format option to set file-specific metadata on `GTiff` output assets ([#970](https://github.com/Open-EO/openeo-geopyspark-driver/issues/970)) diff --git a/openeogeotrellis/_version.py b/openeogeotrellis/_version.py index ea75999ec..86bcd289d 100644 --- a/openeogeotrellis/_version.py +++ b/openeogeotrellis/_version.py @@ -1 +1 @@ -__version__ = "0.55.0a1" +__version__ = "0.56.0a1"