remove soft_fail (apache#41710)
raphaelauv authored Aug 25, 2024
1 parent eb6ea6c commit 1613e9e
Showing 62 changed files with 225 additions and 795 deletions.
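Every file in this commit follows the same shape: the per-sensor soft_fail guard (the TODO comment, the "if self.soft_fail:" check and the raise of AirflowSkipException) is deleted, and the sensor simply raises its original exception. The TODO comments tied those guards to supporting Airflow 2.7.1 and below, so removing them presumably means the providers now rely on newer Airflow's sensor machinery to turn failures into skips when soft_fail is set. A minimal before/after sketch of the pattern, with hypothetical helper names and a hypothetical "failed" status literal standing in for the real hook constants:

from airflow.exceptions import AirflowException, AirflowSkipException


def failure_path_before(status: str, job, soft_fail: bool) -> None:
    # Old pattern: each sensor decided skip-vs-fail itself.
    if status == "failed":
        message = f"Job failed: \n{job}"
        if soft_fail:
            raise AirflowSkipException(message)
        raise AirflowException(message)


def failure_path_after(status: str, job) -> None:
    # New pattern: raise unconditionally; soft_fail is no longer handled locally.
    if status == "failed":
        message = f"Job failed: \n{job}"
        raise AirflowException(message)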
8 changes: 1 addition & 7 deletions airflow/providers/airbyte/sensors/airbyte.py
@@ -24,7 +24,7 @@
from typing import TYPE_CHECKING, Any, Literal, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.sensors.base import BaseSensorOperator
@@ -93,16 +93,10 @@ def poke(self, context: Context) -> bool:
status = job.json()["status"]

if status == hook.FAILED:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Job failed: \n{job}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
elif status == hook.CANCELLED:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Job was cancelled: \n{job}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
elif status == hook.SUCCEEDED:
self.log.info("Job %s completed successfully.", self.airbyte_job_id)
8 changes: 1 addition & 7 deletions airflow/providers/alibaba/cloud/sensors/oss_key.py
@@ -23,7 +23,7 @@

from deprecated.classic import deprecated

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
from airflow.sensors.base import BaseSensorOperator

@@ -73,23 +73,17 @@ def poke(self, context: Context):
parsed_url = urlsplit(self.bucket_key)
if self.bucket_name is None:
if parsed_url.netloc == "":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = "If key is a relative path from root, please provide a bucket_name"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
self.bucket_name = parsed_url.netloc
self.bucket_key = parsed_url.path.lstrip("/")
else:
if parsed_url.scheme != "" or parsed_url.netloc != "":
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = (
"If bucket_name is provided, bucket_key"
" should be relative path from root"
" level, rather than a full oss:// url"
)
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

self.log.info("Poking for key : oss://%s/%s", self.bucket_name, self.bucket_key)
4 changes: 1 addition & 3 deletions airflow/providers/amazon/aws/sensors/batch.py
@@ -23,7 +23,7 @@
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.batch_client import BatchClientHook
from airflow.providers.amazon.aws.triggers.batch import BatchJobTrigger
from airflow.sensors.base import BaseSensorOperator
@@ -265,6 +265,4 @@ def poke(self, context: Context) -> bool:
return False

message = f"AWS Batch job queue failed. AWS Batch job queue status: {status}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
8 changes: 4 additions & 4 deletions airflow/providers/amazon/aws/sensors/ecs.py
@@ -35,7 +35,7 @@
from airflow.utils.context import Context


def _check_failed(current_state, target_state, failure_states, soft_fail: bool) -> None:
def _check_failed(current_state, target_state, failure_states) -> None:
if (current_state != target_state) and (current_state in failure_states):
raise AirflowException(
f"Terminal state reached. Current state: {current_state}, Expected state: {target_state}"
@@ -86,7 +86,7 @@ def poke(self, context: Context):
cluster_state = EcsClusterStates(self.hook.get_cluster_state(cluster_name=self.cluster_name))

self.log.info("Cluster state: %s, waiting for: %s", cluster_state, self.target_state)
_check_failed(cluster_state, self.target_state, self.failure_states, self.soft_fail)
_check_failed(cluster_state, self.target_state, self.failure_states)

return cluster_state == self.target_state

@@ -132,7 +132,7 @@ def poke(self, context: Context):
)

self.log.info("Task Definition state: %s, waiting for: %s", task_definition_state, self.target_state)
_check_failed(task_definition_state, self.target_state, [self.failure_states], self.soft_fail)
_check_failed(task_definition_state, self.target_state, [self.failure_states])
return task_definition_state == self.target_state


@@ -172,5 +172,5 @@ def poke(self, context: Context):
task_state = EcsTaskStates(self.hook.get_task_state(cluster=self.cluster, task=self.task))

self.log.info("Task state: %s, waiting for: %s", task_state, self.target_state)
_check_failed(task_state, self.target_state, self.failure_states, self.soft_fail)
_check_failed(task_state, self.target_state, self.failure_states)
return task_state == self.target_state
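Across the three ECS sensors above, the diff narrows the shared _check_failed helper by dropping its soft_fail parameter at every call site. Based on the lines shown in the first hunk, the simplified helper presumably reads roughly like this (a sketch, not the verbatim module):

from airflow.exceptions import AirflowException


def _check_failed(current_state, target_state, failure_states) -> None:
    # Fail hard when a terminal non-target state is reached; skip-on-soft_fail
    # is no longer decided inside this helper.
    if (current_state != target_state) and (current_state in failure_states):
        raise AirflowException(
            f"Terminal state reached. Current state: {current_state}, Expected state: {target_state}"
        )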
6 changes: 1 addition & 5 deletions airflow/providers/amazon/aws/sensors/glue.py
@@ -21,7 +21,7 @@
from typing import TYPE_CHECKING, Any, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJobHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.triggers.glue import (
@@ -177,8 +177,6 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None

if event["status"] != "success":
message = f"Error: AWS Glue data quality ruleset evaluation run: {event}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

self.hook.validate_evaluation_run_results(
@@ -300,8 +298,6 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None

if event["status"] != "success":
message = f"Error: AWS Glue data quality recommendation run: {event}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

if self.show_results:
5 changes: 1 addition & 4 deletions airflow/providers/apache/flink/sensors/flink_kubernetes.py
@@ -21,7 +21,7 @@

from kubernetes import client

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base import BaseSensorOperator

@@ -125,10 +125,7 @@ def poke(self, context: Context) -> bool:
if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES:
self._log_driver(application_state, response)
if application_state in self.FAILURE_STATES:
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
message = f"Flink application failed with state: {application_state}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
elif application_state in self.SUCCESS_STATES:
self.log.info("Flink application ended successfully")
9 changes: 0 additions & 9 deletions airflow/providers/celery/sensors/celery_queue.py
@@ -21,7 +21,6 @@

from celery.app import control

from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
@@ -73,13 +72,5 @@ def poke(self, context: Context) -> bool:

return reserved == 0 and scheduled == 0 and active == 0
except KeyError:
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
message = f"Could not locate Celery queue {self.celery_queue}"
if self.soft_fail:
raise AirflowSkipException(message)
raise KeyError(message)
except Exception as err:
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException from err
raise
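With 0 additions and 9 deletions, this sensor loses both the soft_fail guard in the KeyError branch and the whole trailing except-Exception block, which existed only to convert unexpected errors into skips. Judging from the remaining context lines, the tail of poke presumably collapses to roughly this (an excerpt-style sketch, elided where the diff does not show the code):

        try:
            ...  # inspect reserved, scheduled and active tasks for the queue
            return reserved == 0 and scheduled == 0 and active == 0
        except KeyError:
            message = f"Could not locate Celery queue {self.celery_queue}"
            raise KeyError(message)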
@@ -22,7 +22,7 @@

from kubernetes import client

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from airflow.sensors.base import BaseSensorOperator

@@ -125,10 +125,7 @@ def poke(self, context: Context) -> bool:
self._log_driver(application_state, response)

if application_state in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Spark application failed with state: {application_state}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
elif application_state in self.SUCCESS_STATES:
self.log.info("Spark application ended successfully")
14 changes: 1 addition & 13 deletions airflow/providers/common/sql/sensors/sql.py
@@ -18,7 +18,7 @@

from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.sensors.base import BaseSensorOperator
@@ -97,10 +97,7 @@ def poke(self, context: Context) -> bool:
records = hook.get_records(self.sql, self.parameters)
if not records:
if self.fail_on_empty:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = "No rows returned, raising as per fail_on_empty flag"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
else:
return False
@@ -109,25 +106,16 @@
if self.failure is not None:
if callable(self.failure):
if self.failure(first_cell):
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Failure criteria met. self.failure({first_cell}) returned True"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
else:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"self.failure is present, but not callable -> {self.failure}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

if self.success is not None:
if callable(self.success):
return self.success(first_cell)
else:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"self.success is present, but not callable -> {self.success}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
return bool(first_cell)
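Taken together, the three hunks above leave SqlSensor.poke raising AirflowException directly on the fail_on_empty, failure and success validation paths. A condensed sketch of the resulting logic (slightly restructured for brevity, not the verbatim file):

        records = hook.get_records(self.sql, self.parameters)
        if not records:
            if self.fail_on_empty:
                raise AirflowException("No rows returned, raising as per fail_on_empty flag")
            return False
        first_cell = records[0][0]
        if self.failure is not None:
            if not callable(self.failure):
                raise AirflowException(f"self.failure is present, but not callable -> {self.failure}")
            if self.failure(first_cell):
                raise AirflowException(f"Failure criteria met. self.failure({first_cell}) returned True")
        if self.success is not None:
            if not callable(self.success):
                raise AirflowException(f"self.success is present, but not callable -> {self.success}")
            return self.success(first_cell)
        return bool(first_cell)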
5 changes: 1 addition & 4 deletions airflow/providers/common/sql/sensors/sql.pyi
@@ -32,10 +32,7 @@ Definition of the public interface for airflow.providers.common.sql.sensors.sql
isort:skip_file
"""
from _typeshed import Incomplete
from airflow.exceptions import (
AirflowException as AirflowException,
AirflowSkipException as AirflowSkipException,
)
from airflow.exceptions import AirflowException as AirflowException
from airflow.hooks.base import BaseHook as BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook as DbApiHook
from airflow.sensors.base import BaseSensorOperator as BaseSensorOperator
14 changes: 1 addition & 13 deletions airflow/providers/databricks/sensors/databricks_partition.py
@@ -26,7 +26,7 @@

from databricks.sql.utils import ParamEscaper

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
@@ -182,10 +182,7 @@ def _generate_partition_query(
partition_columns = self._sql_sensor(f"DESCRIBE DETAIL {table_name}")[0][7]
self.log.debug("Partition columns: %s", partition_columns)
if len(partition_columns) < 1:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Table {table_name} does not have partitions"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

formatted_opts = ""
@@ -207,17 +204,11 @@
f"""{partition_col}{self.partition_operator}{self.escaper.escape_item(partition_value)}"""
)
else:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Column {partition_col} not part of table partitions: {partition_columns}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
else:
# Raises exception if the table does not have any partitions.
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = "No partitions specified to check with the sensor."
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
formatted_opts = f"{prefix} {joiner_val.join(output_list)} {suffix}"
self.log.debug("Formatted options: %s", formatted_opts)
Expand All @@ -231,8 +222,5 @@ def poke(self, context: Context) -> bool:
if partition_result:
return True
else:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Specified partition(s): {self.partitions} were not found."
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
5 changes: 1 addition & 4 deletions airflow/providers/databricks/sensors/databricks_sql.py
@@ -23,7 +23,7 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.common.sql.hooks.sql import fetch_all_handler
from airflow.providers.databricks.hooks.databricks_sql import DatabricksSqlHook
from airflow.sensors.base import BaseSensorOperator
@@ -117,13 +117,10 @@ def hook(self) -> DatabricksSqlHook:
def _get_results(self) -> bool:
"""Use the Databricks SQL hook and run the specified SQL query."""
if not (self._http_path or self._sql_warehouse_name):
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = (
"Databricks SQL warehouse/cluster configuration missing. Please specify either"
" http_path or sql_warehouse_name."
)
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
hook = self.hook
sql_result = hook.run(
5 changes: 1 addition & 4 deletions airflow/providers/datadog/sensors/datadog.py
@@ -21,7 +21,7 @@

from datadog import api

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException
from airflow.providers.datadog.hooks.datadog import DatadogHook
from airflow.sensors.base import BaseSensorOperator

@@ -89,10 +89,7 @@ def poke(self, context: Context) -> bool:

if isinstance(response, dict) and response.get("status", "ok") != "ok":
self.log.error("Unexpected Datadog result: %s", response)
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
message = "Datadog returned unexpected result"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)

if self.response_check:
11 changes: 1 addition & 10 deletions airflow/providers/dbt/cloud/sensors/dbt.py
@@ -24,7 +24,7 @@
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
from airflow.providers.dbt.cloud.utils.openlineage import generate_openlineage_events_from_dbt_cloud_run
@@ -93,17 +93,11 @@ def poke(self, context: Context) -> bool:
job_run_status = self.hook.get_job_run_status(run_id=self.run_id, account_id=self.account_id)

if job_run_status == DbtCloudJobRunStatus.ERROR.value:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Job run {self.run_id} has failed."
if self.soft_fail:
raise AirflowSkipException(message)
raise DbtCloudJobRunException(message)

if job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f"Job run {self.run_id} has been cancelled."
if self.soft_fail:
raise AirflowSkipException(message)
raise DbtCloudJobRunException(message)

return job_run_status == DbtCloudJobRunStatus.SUCCESS.value
@@ -141,9 +135,6 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
execution was successful.
"""
if event["status"] in ["error", "cancelled"]:
message = f"Error in dbt: {event['message']}"
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException()
self.log.info(event["message"])
return int(event["run_id"])
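In the deferrable path above, the deleted lines include the message construction ("Error in dbt: ...") along with the guard, so per the remaining context lines the completion handler presumably ends up as roughly:

        if event["status"] in ["error", "cancelled"]:
            raise AirflowException()
        self.log.info(event["message"])
        return int(event["run_id"])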