Skip to content

Commit

Permalink
Airflow Operator - dry_run and support for constructor to create job_…
Browse files Browse the repository at this point in the history
…request (#3889)

* Operator now supports dry_run, which will render job_request created, but will not submit it to Armada.
* job_request, can now be callable - so users can provide more complex function to render it based on xcom data and jinja environment.
* This will be next Armada Airflow Operator release (1.0.2).
  • Loading branch information
masipauskas authored Aug 23, 2024
1 parent 537852a commit b006b7b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
11 changes: 8 additions & 3 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
Expand All @@ -33,7 +33,7 @@ and handles job cancellation if the Airflow task is killed.
* **armada_queue** (*str*) –


* **job_request** (*JobSubmitRequestItem*) –
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) –


* **job_set_prefix** (*Optional**[**str**]*) –
Expand All @@ -57,6 +57,9 @@ and handles job cancellation if the Airflow task is killed.
* **job_acknowledgement_timeout** (*int*) –


* **dry_run** (*bool*) –



#### execute(context)
Submits the job to Armada and polls for completion.
Expand Down Expand Up @@ -138,7 +141,7 @@ Initializes a new ArmadaOperator.
* **armada_queue** (*str*) – The name of the Armada queue to which the job will be submitted.


* **job_request** (*JobSubmitRequestItem*) – The job to be submitted to Armada.
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) – The job to be submitted to Armada.


* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name.
Expand All @@ -162,6 +165,8 @@ for asynchronous execution.
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


Expand Down
26 changes: 23 additions & 3 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import datetime
import os
import time
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Callable, Dict, Optional, Sequence, Tuple

import jinja2
from airflow.configuration import conf
Expand Down Expand Up @@ -79,7 +79,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:param armada_queue: The name of the Armada queue to which the job will be submitted.
:type armada_queue: str
:param job_request: The job to be submitted to Armada.
:type job_request: JobSubmitRequestItem
:type job_request: JobSubmitRequestItem | \
Callable[[Context, jinja2.Environment], JobSubmitRequestItem]
:param job_set_prefix: A string to prepend to the jobSet name.
:type job_set_prefix: Optional[str]
:param lookout_url_template: Template for creating lookout links. If not specified
Expand All @@ -98,6 +99,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
"""

Expand All @@ -106,7 +109,10 @@ def __init__(
name: str,
channel_args: GrpcChannelArgs,
armada_queue: str,
job_request: JobSubmitRequestItem,
job_request: (
JobSubmitRequestItem
| Callable[[Context, jinja2.Environment], JobSubmitRequestItem]
),
job_set_prefix: Optional[str] = "",
lookout_url_template: Optional[str] = None,
poll_interval: int = 30,
Expand All @@ -116,6 +122,7 @@ def __init__(
"operators", "default_deferrable", fallback=True
),
job_acknowledgement_timeout: int = 5 * 60,
dry_run: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -131,6 +138,7 @@ def __init__(
self.k8s_token_retriever = k8s_token_retriever
self.deferrable = deferrable
self.job_acknowledgement_timeout = job_acknowledgement_timeout
self.dry_run = dry_run
self.job_context = None

if self.container_logs and self.k8s_token_retriever is None:
Expand All @@ -153,6 +161,13 @@ def execute(self, context) -> None:

self._annotate_job_request(context, self.job_request)

if self.dry_run:
self.log.info(
f"Running in dry_run mode. job_set_id: {self.job_set_id} \n"
f"{self.job_request}"
)
return

# Submit job or reattach to previously submitted job.
# Always do this synchronously.
self.job_context = self._reattach_or_submit_job(
Expand Down Expand Up @@ -183,6 +198,11 @@ def render_template_fields(
:param context: Airflow Context dict wi1th values to apply on content
:param jinja_env: jinja’s environment to use for rendering.
"""
if callable(self.job_request):
if not jinja_env:
jinja_env = self.get_template_env()
self.job_request = self.job_request(context, jinja_env)

self.job_request = MessageToDict(
self.job_request, preserving_proto_field_name=True
)
Expand Down
2 changes: 1 addition & 1 deletion third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "armada_airflow"
version = "1.0.1"
version = "1.0.2"
description = "Armada Airflow Operator"
readme='README.md'
authors = [{name = "Armada-GROSS", email = "[email protected]"}]
Expand Down

0 comments on commit b006b7b

Please sign in to comment.