diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index f507b03ac..0f135bfa5 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -1,5 +1,6 @@ from __future__ import annotations +from datetime import timedelta from typing import Any, Callable, Union from airflow.models import BaseOperator @@ -135,6 +136,8 @@ def create_task_metadata( dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, + model_timeout: bool = False, + model_sla: bool = False, ) -> TaskMetadata | None: """ Create the metadata that will be used to instantiate the Airflow Task used to run the Dbt node. @@ -166,6 +169,11 @@ def create_task_metadata( task_id = f"{node.name}_run" if use_task_group is True: task_id = "run" + if model_timeout and "model_timeout" in node.config.keys(): + logger.error(f'model_timeout: {node.config["model_timeout"]} in values') + args["execution_timeout"] = timedelta(seconds=int(node.config["model_timeout"])) + if model_sla and "model_sla" in node.config.keys(): + args["sla"] = timedelta(seconds=int(node.config["model_sla"])) elif node.resource_type == DbtResourceType.SOURCE: if (source_rendering_behavior == SourceRenderingBehavior.NONE) or ( source_rendering_behavior == SourceRenderingBehavior.WITH_TESTS_OR_FRESHNESS @@ -217,6 +225,8 @@ def generate_task_or_group( source_rendering_behavior: SourceRenderingBehavior, test_indirect_selection: TestIndirectSelection, on_warning_callback: Callable[..., Any] | None, + model_timeout: bool, + model_sla: bool, **kwargs: Any, ) -> BaseOperator | TaskGroup | None: task_or_group: BaseOperator | TaskGroup | None = None @@ -234,6 +244,8 @@ def generate_task_or_group( dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, + model_timeout=model_timeout, + model_sla=model_sla, ) # In most cases, we'll map one DBT node to one Airflow task @@ -335,6 +347,8 @@ def build_airflow_graph( node_converters = render_config.node_converters or {} test_behavior = render_config.test_behavior source_rendering_behavior = render_config.source_rendering_behavior + model_timeout = render_config.model_timeout + model_sla = render_config.model_sla tasks_map = {} task_or_group: TaskGroup | BaseOperator @@ -356,6 +370,8 @@ def build_airflow_graph( source_rendering_behavior=source_rendering_behavior, test_indirect_selection=test_indirect_selection, on_warning_callback=on_warning_callback, + model_timeout=model_timeout, + model_sla=model_sla, node=node, ) if task_or_group is not None: diff --git a/cosmos/config.py b/cosmos/config.py index 516a6787b..0275189c4 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -80,6 +80,8 @@ class RenderConfig: enable_mock_profile: bool = True source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE airflow_vars_to_purge_dbt_ls_cache: list[str] = field(default_factory=list) + model_timeout: bool = False + model_sla: bool = False def __post_init__(self, dbt_project_path: str | Path | None) -> None: if self.env_vars: