Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task sla and timeout support. #1317

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from datetime import timedelta
from typing import Any, Callable, Union

from airflow.models import BaseOperator
Expand Down Expand Up @@ -135,6 +136,8 @@ def create_task_metadata(
dbt_dag_task_group_identifier: str,
use_task_group: bool = False,
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE,
model_timeout: bool = False,
model_sla: bool = False,
) -> TaskMetadata | None:
"""
Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node.
Expand Down Expand Up @@ -166,6 +169,11 @@ def create_task_metadata(
task_id = f"{node.name}_run"
if use_task_group is True:
task_id = "run"
if model_timeout and "model_timeout" in node.config.keys():
logger.error(f'model_timeout: {node.config["model_timeout"]} in values')
args["execution_timeout"] = timedelta(seconds=int(node.config["model_timeout"]))
if model_sla and "model_sla" in node.config.keys():
args["sla"] = timedelta(seconds=int(node.config["model_sla"]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Airflow SLAs are known to have few issues.

They mentioned in AF 3 meeting notes that they would be dropping SLAs of AF 3 & come up with something better in later versions in AF 3+. I am thinking if we should we hold on adding SLA support here? WDYT?

It was agreed that for now, we will mark it for removal in Airflow 3.0 and adding it back in 3.1. We will assess again in the coming dev calls if something changes. Elad has also offered to help if needed.

Copy link
Contributor

@pankajkoti pankajkoti Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues with the current SLA implementation are outlined here: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=247828059#AIP57RefactorSLAFeature-ProblemsintheCurrentState.

I’m concerned that if we add SLA support in Cosmos now, any problems stemming from the underlying implementation could lead users to report it as a Cosmos error rather than an Airflow issue. Additionally, since SLAs are set to be removed in Airflow 3 and are undergoing a planned refactor per this AIP, it may be best to hold off on adding this for now, IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand well. I’ll consider SLA support to be out of scope. I wasn’t aware of the upcoming features planned for Airflow 3.0. Thank you for letting me know.

elif node.resource_type == DbtResourceType.SOURCE:
if (source_rendering_behavior == SourceRenderingBehavior.NONE) or (
source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS
Expand Down Expand Up @@ -217,6 +225,8 @@ def generate_task_or_group(
source_rendering_behavior: SourceRenderingBehavior,
test_indirect_selection: TestIndirectSelection,
on_warning_callback: Callable[..., Any] | None,
model_timeout: bool,
model_sla: bool,
**kwargs: Any,
) -> BaseOperator | TaskGroup | None:
task_or_group: BaseOperator | TaskGroup | None = None
Expand All @@ -234,6 +244,8 @@ def generate_task_or_group(
dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group),
use_task_group=use_task_group,
source_rendering_behavior=source_rendering_behavior,
model_timeout=model_timeout,
model_sla=model_sla,
)

# In most cases, we'll map one DBT node to one Airflow task
Expand Down Expand Up @@ -335,6 +347,8 @@ def build_airflow_graph(
node_converters = render_config.node_converters or {}
test_behavior = render_config.test_behavior
source_rendering_behavior = render_config.source_rendering_behavior
model_timeout = render_config.model_timeout
model_sla = render_config.model_sla
tasks_map = {}
task_or_group: TaskGroup | BaseOperator

Expand All @@ -356,6 +370,8 @@ def build_airflow_graph(
source_rendering_behavior=source_rendering_behavior,
test_indirect_selection=test_indirect_selection,
on_warning_callback=on_warning_callback,
model_timeout=model_timeout,
model_sla=model_sla,
node=node,
)
if task_or_group is not None:
Expand Down
2 changes: 2 additions & 0 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class RenderConfig:
enable_mock_profile: bool = True
source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE
airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list)
model_timeout: bool = False
model_sla: bool = False
Comment on lines +83 to +84
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR adds two parameters, but I’m unsure if it’s advisable to add too many parameters to Cosmos without careful consideration.

I’d like to request a review. What do you think? If you have any preferable ideas, please let me know. I’ll make adjustments accordingly and add documentation and tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering of whether we need this config. Can we not directly use timeouts if they're specified & otherwise not set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You’re absolutely right. Instead of adding parameters, it would have been better to set Timeout and SLA automatically only when metadata is present.

I wonder why I didn't think of it... thank you very much.


def __post_init__(self, dbt_project_path: str | Path | None) -> None:
if self.env_vars:
Expand Down
Loading