Skip to content

Commit

Permalink
Merge branch 'issue936-calrissian'
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 20, 2024
2 parents ab268a8 + 15ac6d6 commit 167e029
Show file tree
Hide file tree
Showing 11 changed files with 777 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ without compromising stable operations.

## Unreleased


## 0.56.0

- Initial support for CWL based processes with Calrissian on a Kubernetes cluster ([#936](https://github.com/Open-EO/openeo-geopyspark-driver/issues/936))

## 0.55.0

- Support `file_metadata` format option to set file-specific metadata on `GTiff` output assets ([#970](https://github.com/Open-EO/openeo-geopyspark-driver/issues/970))
Expand Down
2 changes: 1 addition & 1 deletion openeogeotrellis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.55.0a1"
__version__ = "0.56.0a1"
8 changes: 8 additions & 0 deletions openeogeotrellis/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class GpsBackendConfig(OpenEoBackendConfig):

s1backscatter_elev_geoid: Optional[str] = os.environ.get("OPENEO_S1BACKSCATTER_ELEV_GEOID")

# TODO: generically named config for a single bucket: not future/feature-proof (support for multiple buckets)
s3_bucket_name: str = os.environ.get("SWIFT_BUCKET", "OpenEO-data")

fuse_mount_batchjob_s3_bucket: bool = smart_bool(os.environ.get("FUSE_MOUNT_BATCHJOB_S3_BUCKET", False))
Expand Down Expand Up @@ -246,3 +247,10 @@ class GpsBackendConfig(OpenEoBackendConfig):
gdalinfo_python_call: bool = False
gdalinfo_use_subprocess: bool = True # TODO: Only keep one gdalinfo on true
gdalinfo_use_python_subprocess: bool = False

# TODO: replace these temp default with None (or better defaults)
calrissian_namespace: Optional[str] = "calrissian-demo-project"
# TODO: proper calrissian image? Official one doesn't work due to https://github.com/Duke-GCB/calrissian/issues/124#issuecomment-947008286
# calrissian_image: Optional[str] = "ghcr.io/duke-gcb/calrissian/calrissian:0.17.1"
calrissian_image: Optional[str] = "registry.stag.warsaw.openeo.dataspace.copernicus.eu/rand/calrissian:latest"
calrissian_bucket: Optional[str] = "calrissian"
1 change: 1 addition & 0 deletions openeogeotrellis/deploy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

def load_custom_processes(logger=_log, _name="custom_processes"):
"""Try loading optional `custom_processes` module"""
# TODO: use backend_config instead of env var
if path := os.environ.get("OPENEO_CUSTOM_PROCESSES"):
# Directly load custom processes from OPENEO_CUSTOM_PROCESSES
logger.debug(f"load_custom_processes: trying exec loading {path!r}")
Expand Down
89 changes: 87 additions & 2 deletions openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,30 @@

import logging
import os

import re
import textwrap

from openeo_driver.processes import ProcessArgs
from openeo_driver.ProcessGraphDeserializer import (
ENV_DRY_RUN_TRACER,
ProcessSpec,
non_standard_process,
)
from openeo_driver.server import run_gunicorn
from openeo_driver.util.logging import get_logging_config, setup_logging, LOG_HANDLER_STDERR_JSON
from openeo_driver.util.logging import (
LOG_HANDLER_STDERR_JSON,
FlaskRequestCorrelationIdLogging,
get_logging_config,
setup_logging,
)
from openeo_driver.utils import EvalEnv
from openeo_driver.views import build_app

from openeogeotrellis import deploy
from openeogeotrellis.config import get_backend_config
from openeogeotrellis.deploy import get_socket
from openeogeotrellis.job_registry import ZkJobRegistry
from openeogeotrellis.util.runtime import get_job_id, get_request_id

log = logging.getLogger(__name__)

Expand All @@ -37,6 +53,7 @@ def main():
)

from pyspark import SparkContext

log.info("starting spark context")
SparkContext.getOrCreate()

Expand Down Expand Up @@ -98,5 +115,73 @@ def on_started():
)


@non_standard_process(
ProcessSpec(id="_cwl_demo", description="Proof-of-concept process to run CWL based processing.")
.param(name="name", description="Name to greet", schema={"type": "string"}, required=False)
.returns(description="data", schema={})
)
def _cwl_demo(args: ProcessArgs, env: EvalEnv):
"""Proof of concept openEO process to run CWL based processing"""
name = args.get_optional(
"name",
default="World",
validator=ProcessArgs.validator_generic(
# TODO: helper to create regex based validator
lambda n: bool(re.fullmatch("^[a-zA-Z]+$", n)),
error_message="Must be a simple name, but got {actual!r}.",
),
)

if env.get(ENV_DRY_RUN_TRACER):
return "dummy"

# TODO: move this imports to top-level?
import kubernetes.config

from openeogeotrellis.integrations.calrissian import CalrissianJobLauncher

# TODO: better place to load this config?
kubernetes.config.load_incluster_config()

launcher = CalrissianJobLauncher.from_context()

cwl_content = textwrap.dedent(
"""
cwlVersion: v1.0
class: CommandLineTool
baseCommand: echo
requirements:
- class: DockerRequirement
dockerPull: debian:stretch-slim
inputs:
message:
type: string
default: "Hello World"
inputBinding:
position: 1
outputs:
output_file:
type: File
outputBinding:
glob: output.txt
stdout: output.txt
"""
)
correlation_id = get_job_id(default=None) or get_request_id(default=None)
cwl_arguments = [
"--message",
f"Hello {name}, greetings from {correlation_id}.",
]

results = launcher.run_cwl_workflow(
cwl_content=cwl_content,
cwl_arguments=cwl_arguments,
output_paths=["output.txt"],
)

return results["output.txt"].read(encoding="utf8")



if __name__ == '__main__':
main()
Loading

0 comments on commit 167e029

Please sign in to comment.