Skip to content

Commit

Permalink
Merge branch 'astronomer:main' into feature/duckdb
Browse files Browse the repository at this point in the history
  • Loading branch information
prithvijitguha authored Mar 2, 2025
2 parents 990a9f2 + ddea39c commit cf09f17
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
1 change: 1 addition & 0 deletions cosmos/airflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def create_task_metadata(
}

if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES:
args["on_warning_callback"] = on_warning_callback
exclude_detached_tests_if_needed(node, args, detached_from_parent)
task_id, args = _get_task_id_and_args(
node, args, use_task_group, normalize_task_id, "build", include_resource_type=True
Expand Down
30 changes: 29 additions & 1 deletion cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,36 @@ class DbtBuildLocalOperator(DbtBuildMixin, DbtLocalBaseOperator):

template_fields: Sequence[str] = DbtLocalBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
def __init__(self, *args: Any, on_warning_callback: Callable[..., Any] | None = None, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.on_warning_callback = on_warning_callback
self.extract_issues: Callable[..., tuple[list[str], list[str]]]

def _handle_warnings(self, result: FullOutputSubprocessResult | dbtRunnerResult, context: Context) -> None:
"""
Handles warnings by extracting log issues, creating additional context, and calling the
on_warning_callback with the updated context.
:param result: The result object from the build and run command.
:param context: The original airflow context in which the build and run command was executed.
"""
if self.invocation_mode == InvocationMode.SUBPROCESS:
self.extract_issues = extract_freshness_warn_msg
elif self.invocation_mode == InvocationMode.DBT_RUNNER:
self.extract_issues = dbt_runner.extract_message_by_status

test_names, test_results = self.extract_issues(result)

warning_context = dict(context)
warning_context["test_names"] = test_names
warning_context["test_results"] = test_results

self.on_warning_callback and self.on_warning_callback(warning_context)

def execute(self, context: Context, **kwargs: Any) -> None:
result = self.build_and_run_cmd(context=context, cmd_flags=self.add_cmd_flags())
if self.on_warning_callback:
self._handle_warnings(result, context)


class DbtLSLocalOperator(DbtLSMixin, DbtLocalBaseOperator):
Expand Down
12 changes: 11 additions & 1 deletion tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,17 @@ def test_run_test_operator_with_callback(invocation_mode, failing_test_dbt_proje
on_warning_callback=on_warning_callback,
invocation_mode=invocation_mode,
)
run_operator >> test_operator

build_operator = DbtBuildLocalOperator(
profile_config=mini_profile_config,
project_dir=failing_test_dbt_project,
task_id="build",
append_env=True,
on_warning_callback=on_warning_callback,
invocation_mode=invocation_mode,
)

run_operator >> build_operator >> test_operator
run_test_dag(dag)
assert on_warning_callback.called

Expand Down

0 comments on commit cf09f17

Please sign in to comment.