From 06088a3abcbb46533e74de360746db766d50cf66 Mon Sep 17 00:00:00 2001 From: GPK Date: Thu, 31 Oct 2024 10:44:46 +0000 Subject: [PATCH] Standard provider python operator (#42081) * add core.time provider * add source-date-epoch * change core to essentials * revert external task sensor location * add provider to airflow_providers_bug_report list * change new provider name to standard * add integration * revert hatch_build * move examples back to airflow core * change sensors example dags paths * remove init * revert howto docs * change provider as not-ready * Move python operator to standard provider * fix test failures * move python operator inside standard/operators * rebase and update python imports * keep standard provider in hatch_build * update selective checks and dependencies * update imports and dependency file * fix compat import errors * fix selective_checks and imports from compat * remove imports from compat for pythonoperator * update run_tests.py python branching test paths * add setup method in python operator tests * check imports in python operator tests * fix imports and move python virtual env scripts inside standard provider * update selective checks tests and imports in test_python * move python_virtualenv tests inside standard provider * fix pre-commit static checks * use compat provider for standard provider imports * fix _SERIALIZERS ModuleNotFoundError and static checks * import _SERIALIZERS for AIRFLOW_2_10 or higher versions * fix ruff format * fix static checks in test_dag * fix test_provide_context_does_not_fail failure: add TypeError to pytest assert --------- Co-authored-by: romsharon98 --- .pre-commit-config.yaml | 2 +- airflow/decorators/branch_external_python.py | 2 +- airflow/decorators/branch_python.py | 2 +- airflow/decorators/branch_virtualenv.py | 2 +- airflow/decorators/external_python.py | 2 +- airflow/decorators/python.py | 2 +- airflow/decorators/python_virtualenv.py | 2 +- airflow/decorators/sensor.py | 2 +- 
airflow/decorators/short_circuit.py | 2 +- .../example_asset_alias_with_no_taskflow.py | 2 +- .../example_dags/example_branch_operator.py | 4 +- .../example_branch_operator_decorator.py | 2 +- .../example_python_context_decorator.py | 6 +- .../example_python_context_operator.py | 12 ++- .../example_dags/example_python_decorator.py | 2 +- .../example_dags/example_python_operator.py | 2 +- airflow/example_dags/example_sensors.py | 2 +- .../example_short_circuit_operator.py | 2 +- airflow/example_dags/tutorial_dag.py | 2 +- .../tutorial_taskflow_api_virtualenv.py | 2 +- .../tutorial_taskflow_templates.py | 2 +- .../src/airflow_breeze/utils/run_tests.py | 10 +-- .../airflow_breeze/utils/selective_checks.py | 4 +- .../tests/test_pytest_args_for_test_types.py | 10 +-- dev/breeze/tests/test_selective_checks.py | 40 ++++----- dev/perf/dags/sql_perf_dag.py | 2 +- .../guides/user.rst | 4 +- .../dynamic-task-mapping.rst | 2 +- docs/apache-airflow/best-practices.rst | 16 ++-- docs/apache-airflow/core-concepts/dags.rst | 2 +- .../core-concepts/operators.rst | 4 +- docs/apache-airflow/howto/operator/python.rst | 24 +++--- .../operators-and-hooks-ref.rst | 4 +- docs/apache-airflow/tutorial/taskflow.rst | 2 +- docs/exts/templates/openlineage.rst.jinja2 | 2 +- generated/provider_dependencies.json | 15 +++- .../providers/amazon/aws/operators/appflow.py | 2 +- .../providers/apache/beam/hooks/beam.py | 2 +- .../apache/spark/decorators/pyspark.py | 2 +- .../celery/executors/celery_executor_utils.py | 5 +- .../common/compat/standard/__init__.py | 16 ++++ .../common/compat/standard/operators.py | 54 +++++++++++++ .../providers/common/compat/standard/utils.py | 31 +++++++ .../providers/docker/decorators/docker.py | 4 +- .../edge/example_dags/integration_test.py | 2 +- .../cloud/utils/mlengine_operator_utils.py | 2 +- .../providers/openlineage/provider.yaml | 2 +- .../providers/snowflake/operators/snowpark.py | 2 +- .../providers/standard}/operators/python.py | 81 +++++++------------ 
.../airflow/providers/standard/provider.yaml | 3 +- .../providers/standard}/sensors/python.py | 0 .../providers/standard/utils/__init__.py | 16 ++++ .../standard}/utils/python_virtualenv.py | 11 --- .../utils/python_virtualenv_script.jinja2 | 4 +- .../log_handlers/test_log_handlers.py | 3 +- .../utils/test_mlengine_operator_utils.py | 3 +- .../tests/openlineage/extractors/test_base.py | 4 +- .../openlineage/extractors/test_manager.py | 3 +- .../openlineage/extractors/test_python.py | 3 +- .../openlineage/plugins/test_listener.py | 3 +- .../tests/openlineage/plugins/test_utils.py | 4 +- .../tests/openlineage/utils/test_utils.py | 13 +-- .../tests/standard}/operators/test_python.py | 44 +++++----- .../tests/standard}/sensors/test_python.py | 4 +- providers/tests/standard/utils/__init__.py | 16 ++++ .../standard}/utils/test_python_virtualenv.py | 10 +-- .../tests/system/amazon/aws/example_s3.py | 2 +- .../system/amazon/aws/example_sagemaker.py | 2 +- .../kafka/example_dag_event_listener.py | 6 +- .../apache/kafka/example_dag_hello_kafka.py | 8 +- .../system/docker/example_docker_copy_data.py | 2 +- .../example_elasticsearch_query.py | 2 +- .../cloud/cloud_batch/example_cloud_batch.py | 2 +- .../cloud/cloud_run/example_cloud_run.py | 2 +- .../google/cloud/gcs/example_gcs_to_gcs.py | 2 +- tests/core/test_core.py | 2 +- tests/core/test_example_dags_system.py | 2 +- tests/core/test_sentry.py | 2 +- tests/dags/test_assets.py | 2 +- tests/dags/test_cli_triggered_dags.py | 2 +- tests/dags/test_dag_xcom_openlineage.py | 2 +- tests/dags/test_dagrun_fast_follow.py | 2 +- tests/dags/test_future_start_date.py | 2 +- tests/dags/test_invalid_param.py | 2 +- tests/dags/test_invalid_param2.py | 2 +- tests/dags/test_invalid_param3.py | 2 +- tests/dags/test_invalid_param4.py | 2 +- tests/dags/test_logging_in_dag.py | 2 +- tests/dags/test_mapped_classic.py | 2 +- tests/dags/test_mark_state.py | 2 +- tests/dags/test_on_failure_callback.py | 2 +- tests/dags/test_task_view_type_check.py | 
2 +- tests/dags/test_valid_param.py | 2 +- tests/dags/test_valid_param2.py | 2 +- .../test_impersonation_custom.py | 2 +- tests/decorators/test_python.py | 2 +- tests/jobs/test_local_task_job.py | 2 +- tests/jobs/test_triggerer_job.py | 2 +- tests/models/test_backfill.py | 2 +- tests/models/test_baseoperatormeta.py | 2 +- tests/models/test_cleartasks.py | 2 +- tests/models/test_dag.py | 2 +- tests/models/test_dagbag.py | 2 +- tests/models/test_dagrun.py | 4 +- tests/models/test_mappedoperator.py | 6 +- tests/models/test_renderedtifields.py | 2 +- tests/models/test_taskinstance.py | 6 +- tests/models/test_xcom_arg.py | 2 +- tests/sensors/test_external_task_sensor.py | 2 +- .../serialization/test_serialized_objects.py | 2 +- .../deps/test_not_previously_skipped_dep.py | 2 +- tests/utils/log/test_log_reader.py | 2 +- tests/utils/test_db_cleanup.py | 2 +- tests/utils/test_dot_renderer.py | 6 +- tests/utils/test_edgemodifier.py | 2 +- tests/utils/test_log_handlers.py | 2 +- tests/utils/test_task_group.py | 3 +- tests/www/views/test_views_rendered.py | 3 +- tests_common/test_utils/compat.py | 4 + 119 files changed, 399 insertions(+), 283 deletions(-) create mode 100644 providers/src/airflow/providers/common/compat/standard/__init__.py create mode 100644 providers/src/airflow/providers/common/compat/standard/operators.py create mode 100644 providers/src/airflow/providers/common/compat/standard/utils.py rename {airflow => providers/src/airflow/providers/standard}/operators/python.py (95%) rename {airflow => providers/src/airflow/providers/standard}/sensors/python.py (100%) create mode 100644 providers/src/airflow/providers/standard/utils/__init__.py rename {airflow => providers/src/airflow/providers/standard}/utils/python_virtualenv.py (93%) rename {airflow => providers/src/airflow/providers/standard}/utils/python_virtualenv_script.jinja2 (95%) rename {tests => providers/tests/standard}/operators/test_python.py (98%) rename {tests => 
providers/tests/standard}/sensors/test_python.py (95%) create mode 100644 providers/tests/standard/utils/__init__.py rename {tests => providers/tests/standard}/utils/test_python_virtualenv.py (92%) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 78a5b3645c01..20c7a293bff0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -724,7 +724,7 @@ repos: files: > (?x) ^providers/src/airflow/providers/.*\.py$ - exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py + exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py - id: check-get-lineage-collector-providers language: python name: Check providers import hook lineage code from compat diff --git a/airflow/decorators/branch_external_python.py b/airflow/decorators/branch_external_python.py index 2902a47c6774..dbba01034ff5 100644 --- a/airflow/decorators/branch_external_python.py +++ b/airflow/decorators/branch_external_python.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import BranchExternalPythonOperator +from airflow.providers.standard.operators.python import BranchExternalPythonOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/branch_python.py b/airflow/decorators/branch_python.py index 31750ef657a9..3d955a480b75 100644 --- a/airflow/decorators/branch_python.py +++ b/airflow/decorators/branch_python.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import BranchPythonOperator +from airflow.providers.standard.operators.python import BranchPythonOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git 
a/airflow/decorators/branch_virtualenv.py b/airflow/decorators/branch_virtualenv.py index c96638ee2024..c2c39cfe58d3 100644 --- a/airflow/decorators/branch_virtualenv.py +++ b/airflow/decorators/branch_virtualenv.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import BranchPythonVirtualenvOperator +from airflow.providers.standard.operators.python import BranchPythonVirtualenvOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/external_python.py b/airflow/decorators/external_python.py index 2d8e2603f94d..e57fccac141a 100644 --- a/airflow/decorators/external_python.py +++ b/airflow/decorators/external_python.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import ExternalPythonOperator +from airflow.providers.standard.operators.python import ExternalPythonOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index 7a890cf86227..b65a4a966700 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, Callable, Sequence from airflow.decorators.base import DecoratedOperator, task_decorator_factory -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index d0eb93a0d7aa..869d61692d11 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from 
airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import PythonVirtualenvOperator +from airflow.providers.standard.operators.python import PythonVirtualenvOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/sensor.py b/airflow/decorators/sensor.py index 9ee4eeb2a79c..5d409c2d599d 100644 --- a/airflow/decorators/sensor.py +++ b/airflow/decorators/sensor.py @@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Callable, ClassVar, Sequence from airflow.decorators.base import get_unique_task_id, task_decorator_factory -from airflow.sensors.python import PythonSensor +from airflow.providers.standard.sensors.python import PythonSensor if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/decorators/short_circuit.py b/airflow/decorators/short_circuit.py index c964ed6bb75f..89fa6ac5ac6d 100644 --- a/airflow/decorators/short_circuit.py +++ b/airflow/decorators/short_circuit.py @@ -20,7 +20,7 @@ from airflow.decorators.base import task_decorator_factory from airflow.decorators.python import _PythonDecoratedOperator -from airflow.operators.python import ShortCircuitOperator +from airflow.providers.standard.operators.python import ShortCircuitOperator if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator diff --git a/airflow/example_dags/example_asset_alias_with_no_taskflow.py b/airflow/example_dags/example_asset_alias_with_no_taskflow.py index 3293f7e45bb9..c9b04d66d2f6 100644 --- a/airflow/example_dags/example_asset_alias_with_no_taskflow.py +++ b/airflow/example_dags/example_asset_alias_with_no_taskflow.py @@ -37,7 +37,7 @@ from airflow import DAG from airflow.assets import Asset, AssetAlias -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( dag_id="asset_s3_bucket_producer_with_no_taskflow", diff --git 
a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 492d6315d1f2..db79ac0c211a 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -29,12 +29,12 @@ import pendulum -from airflow.operators.python import is_venv_installed +from airflow.providers.standard.operators.python import is_venv_installed if is_venv_installed(): from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator - from airflow.operators.python import ( + from airflow.providers.standard.operators.python import ( BranchExternalPythonOperator, BranchPythonOperator, BranchPythonVirtualenvOperator, diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index 59cb3b291947..6777db38a9b6 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -30,7 +30,7 @@ import pendulum -from airflow.operators.python import is_venv_installed +from airflow.providers.standard.operators.python import is_venv_installed if is_venv_installed(): from airflow.decorators import task diff --git a/airflow/example_dags/example_python_context_decorator.py b/airflow/example_dags/example_python_context_decorator.py index 497ee08e17ce..9cfc31875742 100644 --- a/airflow/example_dags/example_python_context_decorator.py +++ b/airflow/example_dags/example_python_context_decorator.py @@ -45,7 +45,7 @@ def print_context() -> str: """Print the Airflow context.""" from pprint import pprint - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) @@ -60,7 +60,7 @@ def print_context_venv() -> str: """Print the Airflow context in venv.""" from pprint import pprint - from airflow.operators.python import get_current_context + 
from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) @@ -77,7 +77,7 @@ def print_context_external() -> str: """Print the Airflow context in external python.""" from pprint import pprint - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) diff --git a/airflow/example_dags/example_python_context_operator.py b/airflow/example_dags/example_python_context_operator.py index f1b76c527cfd..4dc9383dd06d 100644 --- a/airflow/example_dags/example_python_context_operator.py +++ b/airflow/example_dags/example_python_context_operator.py @@ -28,7 +28,11 @@ import pendulum from airflow import DAG -from airflow.operators.python import ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator +from airflow.providers.standard.operators.python import ( + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, +) SOME_EXTERNAL_PYTHON = sys.executable @@ -44,7 +48,7 @@ def print_context() -> str: """Print the Airflow context.""" from pprint import pprint - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) @@ -58,7 +62,7 @@ def print_context_venv() -> str: """Print the Airflow context in venv.""" from pprint import pprint - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) @@ -74,7 +78,7 @@ def print_context_external() -> str: """Print the Airflow context in external python.""" from pprint import pprint - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context context = get_current_context() pprint(context) diff --git 
a/airflow/example_dags/example_python_decorator.py b/airflow/example_dags/example_python_decorator.py index 264fc4333349..62de60fd6987 100644 --- a/airflow/example_dags/example_python_decorator.py +++ b/airflow/example_dags/example_python_decorator.py @@ -30,7 +30,7 @@ import pendulum from airflow.decorators import dag, task -from airflow.operators.python import is_venv_installed +from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index a1ebb84ddff0..8452dfb9358b 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -30,7 +30,7 @@ import pendulum from airflow.models.dag import DAG -from airflow.operators.python import ( +from airflow.providers.standard.operators.python import ( ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator, diff --git a/airflow/example_dags/example_sensors.py b/airflow/example_dags/example_sensors.py index f63908385810..52b1b84e223e 100644 --- a/airflow/example_dags/example_sensors.py +++ b/airflow/example_dags/example_sensors.py @@ -24,11 +24,11 @@ from airflow.models.dag import DAG from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.sensors.bash import BashSensor +from airflow.providers.standard.sensors.python import PythonSensor from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync from airflow.providers.standard.sensors.weekday import DayOfWeekSensor from airflow.sensors.filesystem import FileSensor -from airflow.sensors.python import PythonSensor from airflow.utils.trigger_rule import TriggerRule from airflow.utils.weekday import WeekDay diff --git a/airflow/example_dags/example_short_circuit_operator.py 
b/airflow/example_dags/example_short_circuit_operator.py index 3941ff17f95a..5ffab8a14bc0 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -24,7 +24,7 @@ from airflow.models.baseoperator import chain from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import ShortCircuitOperator +from airflow.providers.standard.operators.python import ShortCircuitOperator from airflow.utils.trigger_rule import TriggerRule with DAG( diff --git a/airflow/example_dags/tutorial_dag.py b/airflow/example_dags/tutorial_dag.py index 553b194fef0d..0e4f5086efc9 100644 --- a/airflow/example_dags/tutorial_dag.py +++ b/airflow/example_dags/tutorial_dag.py @@ -33,7 +33,7 @@ from airflow.models.dag import DAG # Operators; we need this to operate! -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator # [END import_module] diff --git a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py index 3860876e6e68..357a1e7b290a 100644 --- a/airflow/example_dags/tutorial_taskflow_api_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_virtualenv.py @@ -21,7 +21,7 @@ from datetime import datetime from airflow.decorators import dag, task -from airflow.operators.python import is_venv_installed +from airflow.providers.standard.operators.python import is_venv_installed log = logging.getLogger(__name__) diff --git a/airflow/example_dags/tutorial_taskflow_templates.py b/airflow/example_dags/tutorial_taskflow_templates.py index 925f60524b5e..19206bff572a 100644 --- a/airflow/example_dags/tutorial_taskflow_templates.py +++ b/airflow/example_dags/tutorial_taskflow_templates.py @@ -22,7 +22,7 @@ import pendulum from airflow.decorators import dag, task -from airflow.operators.python import get_current_context +from 
airflow.providers.standard.operators.python import get_current_context # [END import_module] diff --git a/dev/breeze/src/airflow_breeze/utils/run_tests.py b/dev/breeze/src/airflow_breeze/utils/run_tests.py index 81cd0671dd8f..f80998af8455 100644 --- a/dev/breeze/src/airflow_breeze/utils/run_tests.py +++ b/dev/breeze/src/airflow_breeze/utils/run_tests.py @@ -135,10 +135,10 @@ def get_excluded_provider_args(python_version: str) -> list[str]: "Always": ["tests/always"], "API": ["tests/api", "tests/api_connexion", "tests/api_internal", "tests/api_fastapi"], "BranchPythonVenv": [ - "tests/operators/test_python.py::TestBranchPythonVirtualenvOperator", + "providers/tests/standard/operators/test_python.py::TestBranchPythonVirtualenvOperator", ], "BranchExternalPython": [ - "tests/operators/test_python.py::TestBranchExternalPythonOperator", + "providers/tests/standard/operators/test_python.py::TestBranchExternalPythonOperator", ], "CLI": ["tests/cli"], "Core": [ @@ -150,7 +150,7 @@ def get_excluded_provider_args(python_version: str) -> list[str]: "tests/utils", ], "ExternalPython": [ - "tests/operators/test_python.py::TestExternalPythonOperator", + "providers/tests/standard/operators/test_python.py::TestExternalPythonOperator", ], "Integration": ["tests/integration"], # Operators test type excludes Virtualenv/External tests - they have their own test types @@ -158,12 +158,12 @@ def get_excluded_provider_args(python_version: str) -> list[str]: # this one is mysteriously failing dill serialization. 
It could be removed once # https://github.com/pytest-dev/pytest/issues/10845 is fixed "PlainAsserts": [ - "tests/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context", + "providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context", "--assert=plain", ], "Providers": ["providers/tests"], "PythonVenv": [ - "tests/operators/test_python.py::TestPythonVirtualenvOperator", + "providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator", ], "Serialization": [ "tests/serialization", diff --git a/dev/breeze/src/airflow_breeze/utils/selective_checks.py b/dev/breeze/src/airflow_breeze/utils/selective_checks.py index 799e7ec3fbe2..7a1b802fa509 100644 --- a/dev/breeze/src/airflow_breeze/utils/selective_checks.py +++ b/dev/breeze/src/airflow_breeze/utils/selective_checks.py @@ -276,8 +276,8 @@ def __hash__(self): ) PYTHON_OPERATOR_FILES = [ - r"^airflow/operators/python.py", - r"^tests/operators/test_python.py", + r"^providers/src/airflow/providers/standard/operators/python.py", + r"^providers/tests/standard/operators/test_python.py", ] TEST_TYPE_MATCHES = HashableDict( diff --git a/dev/breeze/tests/test_pytest_args_for_test_types.py b/dev/breeze/tests/test_pytest_args_for_test_types.py index de636a9893d2..e149db971d7e 100644 --- a/dev/breeze/tests/test_pytest_args_for_test_types.py +++ b/dev/breeze/tests/test_pytest_args_for_test_types.py @@ -110,7 +110,7 @@ ( "PlainAsserts", [ - "tests/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context", + "providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context", "--assert=plain", ], False, @@ -123,28 +123,28 @@ ( "PythonVenv", [ - "tests/operators/test_python.py::TestPythonVirtualenvOperator", + "providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator", ], False, ), ( "BranchPythonVenv", [ - "tests/operators/test_python.py::TestBranchPythonVirtualenvOperator", +
"providers/tests/standard/operators/test_python.py::TestBranchPythonVirtualenvOperator", ], False, ), ( "ExternalPython", [ - "tests/operators/test_python.py::TestExternalPythonOperator", + "providers/tests/standard/operators/test_python.py::TestExternalPythonOperator", ], False, ), ( "BranchExternalPython", [ - "tests/operators/test_python.py::TestBranchExternalPythonOperator", + "providers/tests/standard/operators/test_python.py::TestBranchExternalPythonOperator", ], False, ), diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index 95ecac07b9df..a6b7963848c0 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -271,7 +271,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): ), ( pytest.param( - ("airflow/operators/python.py",), + ("providers/src/airflow/providers/standard/operators/python.py",), { "affected-providers-list-as-string": None, "all-python-versions": "['3.9']", @@ -284,16 +284,14 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-tests": "true", "run-amazon-tests": "false", "docs-build": "true", - "skip-pre-commits": "check-provider-yaml-valid,identity,lint-helm-chart,mypy-airflow,mypy-dev," + "skip-pre-commits": "identity,lint-helm-chart,mypy-airflow,mypy-dev," "mypy-docs,mypy-providers,mypy-task-sdk,ts-compile-format-lint-ui,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always BranchExternalPython BranchPythonVenv " - "ExternalPython Operators PythonVenv", - "providers-test-types-list-as-string": "", - "separate-test-types-list-as-string": "Always BranchExternalPython BranchPythonVenv " - "ExternalPython Operators PythonVenv", + "parallel-test-types-list-as-string": "Always Providers[common.compat,standard]", + "providers-test-types-list-as-string": "Providers[common.compat,standard]", + 
"separate-test-types-list-as-string": "Always Providers[common.compat] Providers[standard]", "needs-mypy": "true", - "mypy-checks": "['mypy-airflow']", + "mypy-checks": "['mypy-providers']", }, id="Only Python tests", ) @@ -364,7 +362,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): pytest.param( ("providers/tests/apache/beam/file.py",), { - "affected-providers-list-as-string": "apache.beam google", + "affected-providers-list-as-string": "apache.beam common.compat google", "all-python-versions": "['3.9']", "all-python-versions-list-as-string": "3.9", "python-versions": "['3.9']", @@ -381,9 +379,9 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): ), "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Providers[apache.beam] Providers[google]", - "providers-test-types-list-as-string": "Providers[apache.beam] Providers[google]", - "separate-test-types-list-as-string": "Always Providers[apache.beam] Providers[google]", + "parallel-test-types-list-as-string": "Always Providers[apache.beam,common.compat] Providers[google]", + "providers-test-types-list-as-string": "Providers[apache.beam,common.compat] Providers[google]", + "separate-test-types-list-as-string": "Always Providers[apache.beam] Providers[common.compat] Providers[google]", "needs-mypy": "true", "mypy-checks": "['mypy-providers']", }, @@ -660,9 +658,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", "run-amazon-tests": "true", - "parallel-test-types-list-as-string": "Always Providers[amazon] " - "Providers[apache.hive,cncf.kubernetes,common.compat,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", + "parallel-test-types-list-as-string": "Always Providers[amazon] 
Providers[apache.hive,cncf.kubernetes,common.compat,common.sql,exasol,ftp,http,imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-checks": "['mypy-providers']", }, @@ -712,9 +708,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "ts-compile-format-lint-ui,ts-compile-format-lint-www", "run-kubernetes-tests": "false", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Providers[amazon] " - "Providers[apache.hive,cncf.kubernetes,common.compat,common.sql,exasol,ftp,http," - "imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", + "parallel-test-types-list-as-string": "Always Providers[amazon] Providers[apache.hive,cncf.kubernetes,common.compat,common.sql,exasol,ftp,http,imap,microsoft.azure,mongo,mysql,openlineage,postgres,salesforce,ssh,teradata] Providers[google]", "needs-mypy": "true", "mypy-checks": "['mypy-providers']", }, @@ -751,7 +745,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): pytest.param( ("providers/src/airflow/providers/standard/operators/bash.py",), { - "affected-providers-list-as-string": "standard", + "affected-providers-list-as-string": "common.compat standard", "all-python-versions": "['3.9']", "all-python-versions-list-as-string": "3.9", "python-versions": "['3.9']", @@ -766,14 +760,14 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "skip-pre-commits": "identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk," "ts-compile-format-lint-ui,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Core Providers[standard] Serialization", + "parallel-test-types-list-as-string": "Always Core Providers[common.compat,standard] Serialization", "needs-mypy": "true", "mypy-checks": "['mypy-providers']", }, 
id="Providers standard tests and Serialization tests to run when airflow bash.py changed", ), pytest.param( - ("providers/tests/standard/operators/bash.py",), + ("providers/src/airflow/providers/standard/operators/bash.py",), { "affected-providers-list-as-string": None, "all-python-versions": "['3.9']", @@ -785,12 +779,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str): "needs-helm-tests": "false", "run-tests": "true", "run-amazon-tests": "false", - "docs-build": "false", + "docs-build": "true", "run-kubernetes-tests": "false", "skip-pre-commits": "identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk," "ts-compile-format-lint-ui,ts-compile-format-lint-www", "upgrade-to-newer-dependencies": "false", - "parallel-test-types-list-as-string": "Always Core Providers[standard] Serialization", + "parallel-test-types-list-as-string": "Always Core Providers[common.compat,standard] Serialization", "needs-mypy": "true", "mypy-checks": "['mypy-providers']", }, diff --git a/dev/perf/dags/sql_perf_dag.py b/dev/perf/dags/sql_perf_dag.py index 09ac9b0a53c2..bfa28ea6b814 100644 --- a/dev/perf/dags/sql_perf_dag.py +++ b/dev/perf/dags/sql_perf_dag.py @@ -19,7 +19,7 @@ from datetime import datetime, timedelta from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator default_args = { "owner": "Airflow", diff --git a/docs/apache-airflow-providers-openlineage/guides/user.rst b/docs/apache-airflow-providers-openlineage/guides/user.rst index 4f95253e1f24..9098da623445 100644 --- a/docs/apache-airflow-providers-openlineage/guides/user.rst +++ b/docs/apache-airflow-providers-openlineage/guides/user.rst @@ -257,13 +257,13 @@ full import paths of Airflow Operators to disable as ``disabled_for_operators`` [openlineage] transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"} - 
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.operators.python.PythonOperator' + disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator' ``AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS`` environment variable is an equivalent. .. code-block:: ini - AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.operators.python.PythonOperator' + AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.providers.standard.operators.python.PythonOperator' Full Task Info ^^^^^^^^^^^^^^ diff --git a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst index fd7d57078543..c01ab7b1cf46 100644 --- a/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst +++ b/docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst @@ -196,7 +196,7 @@ Since the template is rendered after the main execution block, it is possible to .. code-block:: python - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context @task(map_index_template="{{ my_variable }}") diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 466f546ff714..0f4b77001f20 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -228,7 +228,7 @@ Imagine this code: .. code-block:: python from airflow import DAG - from airflow.operators.python import PythonOperator + from airflow.providers.standard.operators.python import PythonOperator import pendulum @@ -260,7 +260,7 @@ What you can do to check it is add some print statements to the code you want to .. 
code-block:: python from airflow import DAG - from airflow.operators.python import PythonOperator + from airflow.providers.standard.operators.python import PythonOperator import pendulum @@ -891,7 +891,7 @@ of Airflow, or even that dependencies of several of your Custom Operators introd There are a number of strategies that can be employed to mitigate the problem. And while dealing with dependency conflict in custom operators is difficult, it's actually quite a bit easier when it comes to -using :class:`airflow.operators.python.PythonVirtualenvOperator` or :class:`airflow.operators.python.ExternalPythonOperator` +using :class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` or :class:`airflow.providers.standard.operators.python.ExternalPythonOperator` - either directly using classic "operator" approach or by using tasks decorated with ``@task.virtualenv`` or ``@task.external_python`` decorators if you use TaskFlow. @@ -905,7 +905,7 @@ This is simplest to use and most limited strategy. The PythonVirtualenvOperator create a virtualenv that your Python callable function will execute in. In the modern TaskFlow approach described in :doc:`/tutorial/taskflow`. this also can be done with decorating your callable with ``@task.virtualenv`` decorator (recommended way of using the operator). -Each :class:`airflow.operators.python.PythonVirtualenvOperator` task can +Each :class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` task can have its own independent Python virtualenv (dynamically created every time the task is run) and can specify fine-grained set of requirements that need to be installed for that task to execute. @@ -953,7 +953,7 @@ There are certain limitations and overhead introduced by this operator: that running tasks will still interfere with each other - for example subsequent tasks executed on the same worker might be affected by previous tasks creating/modifying files etc. 
-You can see detailed examples of using :class:`airflow.operators.python.PythonVirtualenvOperator` in +You can see detailed examples of using :class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` in :ref:`Taskflow Virtualenv example ` @@ -963,11 +963,11 @@ Using ExternalPythonOperator .. versionadded:: 2.4 A bit more involved but with significantly less overhead, security, stability problems is to use the -:class:`airflow.operators.python.ExternalPythonOperator``. In the modern +:class:`airflow.providers.standard.operators.python.ExternalPythonOperator``. In the modern TaskFlow approach described in :doc:`/tutorial/taskflow`. this also can be done with decorating your callable with ``@task.external_python`` decorator (recommended way of using the operator). It requires, however, that you have a pre-existing, immutable Python environment, that is prepared upfront. -Unlike in :class:`airflow.operators.python.PythonVirtualenvOperator` you cannot add new dependencies +Unlike in :class:`airflow.providers.standard.operators.python.PythonVirtualenvOperator` you cannot add new dependencies to such pre-existing environment. All dependencies you need should be added upfront in your environment and available in all the workers in case your Airflow runs in a distributed environment. @@ -1021,7 +1021,7 @@ after your DevOps/System Admin teams deploy your new dependencies in pre-existin The nice thing about this is that you can switch the decorator back at any time and continue developing it "dynamically" with ``PythonVirtualenvOperator``. 
-You can see detailed examples of using :class:`airflow.operators.python.ExternalPythonOperator` in +You can see detailed examples of using :class:`airflow.providers.standard.operators.python.ExternalPythonOperator` in :ref:`Taskflow External Python example ` Using DockerOperator or Kubernetes Pod Operator diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index 0b4261e80a71..ae4da4ab18c6 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -362,7 +362,7 @@ The ``@task.branch`` can also be used with XComs allowing branching context to d If you wish to implement your own operators with branching functionality, you can inherit from :class:`~airflow.operators.branch.BaseBranchOperator`, which behaves similarly to ``@task.branch`` decorator but expects you to provide an implementation of the method ``choose_branch``. .. note:: - The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator. + The ``@task.branch`` decorator is recommended over directly instantiating :class:`~airflow.providers.standard.operators.python.BranchPythonOperator` in a DAG. The latter should generally only be subclassed to implement a custom operator. As with the callable for ``@task.branch``, this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. 
It can also return None to skip all downstream task:: diff --git a/docs/apache-airflow/core-concepts/operators.rst b/docs/apache-airflow/core-concepts/operators.rst index 482fa40cb3aa..e721faa97560 100644 --- a/docs/apache-airflow/core-concepts/operators.rst +++ b/docs/apache-airflow/core-concepts/operators.rst @@ -29,12 +29,12 @@ An Operator is conceptually a template for a predefined :doc:`Task `, tha Airflow has a very extensive set of operators available, with some built-in to the core or pre-installed providers. Some popular operators from core include: - :class:`~airflow.providers.standard.operators.bash.BashOperator` - executes a bash command -- :class:`~airflow.operators.python.PythonOperator` - calls an arbitrary Python function +- :class:`~airflow.providers.standard.operators.python.PythonOperator` - calls an arbitrary Python function - :class:`~airflow.operators.email.EmailOperator` - sends an email - Use the ``@task`` decorator to execute an arbitrary Python function. It doesn't support rendering jinja templates passed as arguments. .. note:: - The ``@task`` decorator is recommended over the classic :class:`~airflow.operators.python.PythonOperator` + The ``@task`` decorator is recommended over the classic :class:`~airflow.providers.standard.operators.python.PythonOperator` to execute Python callables with no template rendering in its arguments. For a list of all core operators, see: :doc:`Core Operators and Hooks Reference `. diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index e68a257bcd67..5d06bfa3d3e1 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -22,7 +22,7 @@ PythonOperator ============== -Use the :class:`~airflow.operators.python.PythonOperator` to execute Python callables. +Use the :class:`~airflow.providers.standard.operators.python.PythonOperator` to execute Python callables. .. 
tip:: The ``@task`` decorator is recommended over the classic ``PythonOperator`` to execute Python callables. @@ -138,7 +138,7 @@ In this case, the type hint can be used for static analysis. PythonVirtualenvOperator ======================== -Use the :class:`~airflow.operators.python.PythonVirtualenvOperator` decorator to execute Python callables +Use the :class:`~airflow.providers.standard.operators.python.PythonVirtualenvOperator` decorator to execute Python callables inside a new Python virtual environment. The ``virtualenv`` package needs to be installed in the environment that runs Airflow (as optional dependency ``pip install apache-airflow[virtualenv] --constraint ...``). @@ -187,7 +187,7 @@ If you want the context related to datetime objects like ``data_interval_start`` The Python function body defined to be executed is cut out of the DAG into a temporary file w/o surrounding code. As in the examples you need to add all imports again and you can not rely on variables from the global Python context. - If you want to pass variables into the classic :class:`~airflow.operators.python.PythonVirtualenvOperator` use + If you want to pass variables into the classic :class:`~airflow.providers.standard.operators.python.PythonVirtualenvOperator` use ``op_args`` and ``op_kwargs``. If additional parameters for package installation are needed pass them in via the ``pip_install_options`` parameter or use a @@ -294,7 +294,7 @@ environment, there is no need for ``activation`` of the environment. Merely usin automatically activates it. In both examples below ``PATH_TO_PYTHON_BINARY`` is such a path, pointing to the executable Python binary. -Use the :class:`~airflow.operators.python.ExternalPythonOperator` to execute Python callables inside a +Use the :class:`~airflow.providers.standard.operators.python.ExternalPythonOperator` to execute Python callables inside a pre-defined environment. The virtualenv package should be preinstalled in the environment where Python is run. 
In case ``dill`` is used, it has to be preinstalled in the environment (the same version that is installed in main Airflow environment). @@ -339,7 +339,7 @@ If you want the context related to datetime objects like ``data_interval_start`` The Python function body defined to be executed is cut out of the DAG into a temporary file w/o surrounding code. As in the examples you need to add all imports again and you can not rely on variables from the global Python context. - If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use + If you want to pass variables into the classic :class:`~airflow.providers.standard.operators.python.ExternalPythonOperator` use ``op_args`` and ``op_kwargs``. Templating @@ -377,7 +377,7 @@ You can use ``Context`` under the same conditions as ``PythonVirtualenvOperator` PythonBranchOperator ==================== -Use the :class:`~airflow.operators.python.PythonBranchOperator` to execute Python :ref:`branching ` +Use the :class:`~airflow.providers.standard.operators.python.PythonBranchOperator` to execute Python :ref:`branching ` tasks. .. tip:: @@ -414,8 +414,8 @@ Argument passing and templating options are the same as with :ref:`howto/operato BranchPythonVirtualenvOperator ============================== -Use the :class:`~airflow.operators.python.BranchPythonVirtualenvOperator` decorator to execute Python :ref:`branching ` -tasks and is a hybrid of the :class:`~airflow.operators.python.PythonBranchOperator` with execution in a virtual environment. +Use the :class:`~airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator` decorator to execute Python :ref:`branching ` +tasks and is a hybrid of the :class:`~airflow.providers.standard.operators.python.PythonBranchOperator` with execution in a virtual environment. .. 
tip:: The ``@task.branch_virtualenv`` decorator is recommended over the classic @@ -451,8 +451,8 @@ Argument passing and templating options are the same as with :ref:`howto/operato BranchExternalPythonOperator ============================ -Use the :class:`~airflow.operators.python.BranchExternalPythonOperator` to execute Python :ref:`branching ` -tasks and is a hybrid of the :class:`~airflow.operators.python.PythonBranchOperator` with execution in an +Use the :class:`~airflow.providers.standard.operators.python.BranchExternalPythonOperator` to execute Python :ref:`branching ` +tasks and is a hybrid of the :class:`~airflow.providers.standard.operators.python.PythonBranchOperator` with execution in an external Python environment. .. tip:: @@ -490,7 +490,7 @@ Argument passing and templating options are the same as with :ref:`howto/operato ShortCircuitOperator ==================== -Use the :class:`~airflow.operators.python.ShortCircuitOperator` to control whether a pipeline continues +Use the :class:`~airflow.providers.standard.operators.python.ShortCircuitOperator` to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained. The evaluation of this condition and truthy value is done via the output of a callable. If the @@ -570,7 +570,7 @@ Argument passing and templating options are the same as with :ref:`howto/operato PythonSensor ============ -The :class:`~airflow.sensors.python.PythonSensor` executes an arbitrary callable and waits for its return +The :class:`~airflow.providers.standard.sensors.python.PythonSensor` executes an arbitrary callable and waits for its return value to be True. .. 
tip:: diff --git a/docs/apache-airflow/operators-and-hooks-ref.rst b/docs/apache-airflow/operators-and-hooks-ref.rst index 655551705e41..026d206ea5f0 100644 --- a/docs/apache-airflow/operators-and-hooks-ref.rst +++ b/docs/apache-airflow/operators-and-hooks-ref.rst @@ -68,7 +68,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.operators.latest_only` - - * - :mod:`airflow.operators.python` + * - :mod:`airflow.providers.standard.operators.python` - :doc:`How to use ` * - :mod:`airflow.operators.trigger_dagrun` @@ -91,7 +91,7 @@ For details see: :doc:`apache-airflow-providers:operators-and-hooks-ref/index`. * - :mod:`airflow.sensors.filesystem` - :ref:`How to use ` - * - :mod:`airflow.sensors.python` + * - :mod:`airflow.providers.standard.sensors.python` - :ref:`How to use ` * - :mod:`airflow.sensors.weekday` diff --git a/docs/apache-airflow/tutorial/taskflow.rst b/docs/apache-airflow/tutorial/taskflow.rst index 37d0d93f5999..72b5ac82dd04 100644 --- a/docs/apache-airflow/tutorial/taskflow.rst +++ b/docs/apache-airflow/tutorial/taskflow.rst @@ -619,7 +619,7 @@ method. .. code-block:: python - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context def some_function_in_your_library(): diff --git a/docs/exts/templates/openlineage.rst.jinja2 b/docs/exts/templates/openlineage.rst.jinja2 index 5f7ee49e7c12..9a134ef47eca 100644 --- a/docs/exts/templates/openlineage.rst.jinja2 +++ b/docs/exts/templates/openlineage.rst.jinja2 @@ -22,7 +22,7 @@ At the moment, two core operators supports OpenLineage. These operators function capable of running any code, which might limit the extent of lineage extraction. To enhance the extraction of lineage information, operators can utilize the hooks listed below that support OpenLineage. 
-- :class:`~airflow.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`) +- :class:`~airflow.providers.standard.operators.python.PythonOperator` (via :class:`airflow.providers.openlineage.extractors.python.PythonExtractor`) - :class:`~airflow.providers.standard.operators.bash.BashOperator` (via :class:`airflow.providers.openlineage.extractors.bash.BashExtractor`) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 4a8ec90c7cdb..058f7c00063e 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -81,6 +81,7 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ + "common.compat", "google" ], "excluded-python-versions": [ @@ -285,7 +286,8 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ - "cncf.kubernetes" + "cncf.kubernetes", + "common.compat" ], "excluded-python-versions": [], "state": "ready" @@ -396,7 +398,8 @@ "devel-deps": [], "plugins": [], "cross-providers-deps": [ - "openlineage" + "openlineage", + "standard" ], "excluded-python-versions": [], "state": "ready" @@ -516,7 +519,9 @@ ], "devel-deps": [], "plugins": [], - "cross-providers-deps": [], + "cross-providers-deps": [ + "common.compat" + ], "excluded-python-versions": [], "state": "ready" }, @@ -532,7 +537,9 @@ "plugin-class": "airflow.providers.edge.plugins.edge_executor_plugin.EdgeExecutorPlugin" } ], - "cross-providers-deps": [], + "cross-providers-deps": [ + "common.compat" + ], "excluded-python-versions": [], "state": "not-ready" }, diff --git a/providers/src/airflow/providers/amazon/aws/operators/appflow.py b/providers/src/airflow/providers/amazon/aws/operators/appflow.py index 4aced5bea0c1..a8f81b28b033 100644 --- a/providers/src/airflow/providers/amazon/aws/operators/appflow.py +++ b/providers/src/airflow/providers/amazon/aws/operators/appflow.py @@ -21,11 +21,11 @@ from typing import TYPE_CHECKING, cast from airflow.exceptions import 
AirflowException -from airflow.operators.python import ShortCircuitOperator from airflow.providers.amazon.aws.hooks.appflow import AppflowHook from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator from airflow.providers.amazon.aws.utils import datetime_to_epoch_ms from airflow.providers.amazon.aws.utils.mixins import AwsBaseHookMixin, AwsHookParams, aws_template_fields +from airflow.providers.common.compat.standard.operators import ShortCircuitOperator if TYPE_CHECKING: from mypy_boto3_appflow.type_defs import ( diff --git a/providers/src/airflow/providers/apache/beam/hooks/beam.py b/providers/src/airflow/providers/apache/beam/hooks/beam.py index d772bb47329a..9b5d216c3dc0 100644 --- a/providers/src/airflow/providers/apache/beam/hooks/beam.py +++ b/providers/src/airflow/providers/apache/beam/hooks/beam.py @@ -37,8 +37,8 @@ from airflow.exceptions import AirflowConfigException, AirflowException from airflow.hooks.base import BaseHook +from airflow.providers.common.compat.standard.utils import prepare_virtualenv from airflow.providers.google.go_module_utils import init_module, install_dependencies -from airflow.utils.python_virtualenv import prepare_virtualenv if TYPE_CHECKING: import logging diff --git a/providers/src/airflow/providers/apache/spark/decorators/pyspark.py b/providers/src/airflow/providers/apache/spark/decorators/pyspark.py index c460d09f4f27..c44d37d0e092 100644 --- a/providers/src/airflow/providers/apache/spark/decorators/pyspark.py +++ b/providers/src/airflow/providers/apache/spark/decorators/pyspark.py @@ -22,8 +22,8 @@ from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.hooks.base import BaseHook -from airflow.operators.python import PythonOperator from airflow.providers.apache.spark.hooks.spark_connect import SparkConnectHook +from airflow.providers.common.compat.standard.operators import PythonOperator if TYPE_CHECKING: from airflow.utils.context import Context diff 
--git a/providers/src/airflow/providers/celery/executors/celery_executor_utils.py b/providers/src/airflow/providers/celery/executors/celery_executor_utils.py index a7aa6a87ea2c..28630dd1659b 100644 --- a/providers/src/airflow/providers/celery/executors/celery_executor_utils.py +++ b/providers/src/airflow/providers/celery/executors/celery_executor_utils.py @@ -111,12 +111,13 @@ def on_celery_import_modules(*args, **kwargs): import airflow.jobs.local_task_job_runner import airflow.macros - import airflow.operators.python try: import airflow.providers.standard.operators.bash + import airflow.providers.standard.operators.python except ImportError: - import airflow.operators.bash # noqa: F401 + import airflow.operators.bash + import airflow.operators.python # noqa: F401 with contextlib.suppress(ImportError): import numpy # noqa: F401 diff --git a/providers/src/airflow/providers/common/compat/standard/__init__.py b/providers/src/airflow/providers/common/compat/standard/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/providers/src/airflow/providers/common/compat/standard/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/providers/src/airflow/providers/common/compat/standard/operators.py b/providers/src/airflow/providers/common/compat/standard/operators.py new file mode 100644 index 000000000000..1b319e1ff9f9 --- /dev/null +++ b/providers/src/airflow/providers/common/compat/standard/operators.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow import __version__ as AIRFLOW_VERSION + +if TYPE_CHECKING: + from airflow.providers.standard.operators.python import ( + _SERIALIZERS, + PythonOperator, + ShortCircuitOperator, + get_current_context, + ) +else: + try: + from airflow.providers.standard.operators.python import ( + _SERIALIZERS, + PythonOperator, + ShortCircuitOperator, + get_current_context, + ) + except ModuleNotFoundError: + from packaging.version import Version + + _IS_AIRFLOW_2_10_OR_HIGHER = Version(Version(AIRFLOW_VERSION).base_version) >= Version("2.10.0") + + from airflow.operators.python import ( + PythonOperator, + ShortCircuitOperator, + get_current_context, + ) + + if _IS_AIRFLOW_2_10_OR_HIGHER: + from airflow.operators.python import _SERIALIZERS + + +__all__ = ["PythonOperator", "_SERIALIZERS", "ShortCircuitOperator", "get_current_context"] diff --git a/providers/src/airflow/providers/common/compat/standard/utils.py b/providers/src/airflow/providers/common/compat/standard/utils.py new file mode 100644 index 000000000000..bfa263d1be94 --- /dev/null +++ b/providers/src/airflow/providers/common/compat/standard/utils.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script +else: + try: + from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script + except ModuleNotFoundError: + from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script + + +__all__ = ["write_python_script", "prepare_virtualenv"] diff --git a/providers/src/airflow/providers/docker/decorators/docker.py b/providers/src/airflow/providers/docker/decorators/docker.py index 9812e5fc5748..0439599257b1 100644 --- a/providers/src/airflow/providers/docker/decorators/docker.py +++ b/providers/src/airflow/providers/docker/decorators/docker.py @@ -24,8 +24,8 @@ from airflow.decorators.base import DecoratedOperator, task_decorator_factory from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning +from airflow.providers.common.compat.standard.utils import write_python_script from airflow.providers.docker.operators.docker import DockerOperator -from airflow.utils.python_virtualenv import write_python_script if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator @@ -34,7 +34,7 @@ Serializer = Literal["pickle", "dill", "cloudpickle"] try: - from airflow.operators.python import _SERIALIZERS + from airflow.providers.common.compat.standard.operators import _SERIALIZERS except ImportError: import logging diff --git a/providers/src/airflow/providers/edge/example_dags/integration_test.py b/providers/src/airflow/providers/edge/example_dags/integration_test.py index 0aad61d354c0..fb56be3b482d 100644 --- a/providers/src/airflow/providers/edge/example_dags/integration_test.py +++ b/providers/src/airflow/providers/edge/example_dags/integration_test.py @@ -33,7 +33,7 @@ from 
airflow.models.param import Param from airflow.models.variable import Variable from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.common.compat.standard.operators import PythonOperator try: from airflow.providers.standard.operators.bash import BashOperator diff --git a/providers/src/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/providers/src/airflow/providers/google/cloud/utils/mlengine_operator_utils.py index 0c94e11c741c..eb128ef28572 100644 --- a/providers/src/airflow/providers/google/cloud/utils/mlengine_operator_utils.py +++ b/providers/src/airflow/providers/google/cloud/utils/mlengine_operator_utils.py @@ -28,9 +28,9 @@ import dill from airflow.exceptions import AirflowException -from airflow.operators.python import PythonOperator from airflow.providers.apache.beam.hooks.beam import BeamRunnerType from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator +from airflow.providers.common.compat.standard.operators import PythonOperator from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.operators.mlengine import MLEngineStartBatchPredictionJobOperator diff --git a/providers/src/airflow/providers/openlineage/provider.yaml b/providers/src/airflow/providers/openlineage/provider.yaml index 5c04f6686351..e89b08e85936 100644 --- a/providers/src/airflow/providers/openlineage/provider.yaml +++ b/providers/src/airflow/providers/openlineage/provider.yaml @@ -87,7 +87,7 @@ config: full import paths of Operators to disable. 
type: string example: "airflow.providers.standard.operators.bash.BashOperator; - airflow.operators.python.PythonOperator" + airflow.providers.standard.operators.python.PythonOperator" default: "" version_added: 1.1.0 selective_enable: diff --git a/providers/src/airflow/providers/snowflake/operators/snowpark.py b/providers/src/airflow/providers/snowflake/operators/snowpark.py index 1635eebaa348..81ed876f3f67 100644 --- a/providers/src/airflow/providers/snowflake/operators/snowpark.py +++ b/providers/src/airflow/providers/snowflake/operators/snowpark.py @@ -19,7 +19,7 @@ from typing import Any, Callable, Collection, Mapping, Sequence -from airflow.operators.python import PythonOperator, get_current_context +from airflow.providers.common.compat.standard.operators import PythonOperator, get_current_context from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook from airflow.providers.snowflake.utils.snowpark import inject_session_into_op_kwargs diff --git a/airflow/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py similarity index 95% rename from airflow/operators/python.py rename to providers/src/airflow/providers/standard/operators/python.py index 3d40ad2c8450..4d908f6ba680 100644 --- a/airflow/operators/python.py +++ b/providers/src/airflow/providers/standard/operators/python.py @@ -33,10 +33,12 @@ from functools import cache from pathlib import Path from tempfile import TemporaryDirectory -from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence +from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast import lazy_object_proxy +from packaging.version import Version +from airflow import __version__ as airflow_version from airflow.exceptions import ( AirflowConfigException, AirflowException, @@ -49,6 +51,7 @@ from airflow.models.taskinstance import _CURRENT_CONTEXT from airflow.models.variable import Variable from 
airflow.operators.branch import BranchMixIn +from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script from airflow.settings import _ENABLE_AIP_44 from airflow.typing_compat import Literal from airflow.utils import hashlib_wrapper @@ -56,12 +59,16 @@ from airflow.utils.file import get_unique_dag_module_name from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters from airflow.utils.process_utils import execute_in_subprocess -from airflow.utils.python_virtualenv import prepare_virtualenv, write_python_script from airflow.utils.session import create_session log = logging.getLogger(__name__) +AIRFLOW_VERSION = Version(airflow_version) +AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0") + if TYPE_CHECKING: + from pendulum.datetime import DateTime + from airflow.serialization.enums import Encoding from airflow.utils.context import Context @@ -77,38 +84,6 @@ def is_venv_installed() -> bool: return False -def task(python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs): - """ - Use :func:`airflow.decorators.task` instead, this is deprecated. - - Calls ``@task.python`` and allows users to turn a Python function into - an Airflow task. - - :param python_callable: A reference to an object that is callable - :param op_kwargs: a dictionary of keyword arguments that will get unpacked - in your function (templated) - :param op_args: a list of positional arguments that will get unpacked when - calling your callable (templated) - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. 
- """ - # To maintain backwards compatibility, we import the task object into this file - # This prevents breakages in dags that use `from airflow.operators.python import task` - from airflow.decorators.python import python_task - - warnings.warn( - """airflow.operators.python.task is deprecated. Please use the following instead - - from airflow.decorators import task - @task - def my_task()""", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs) - - @cache def _parse_version_info(text: str) -> tuple[int, int, int, str, int]: """Parse python version info from a text.""" @@ -210,13 +185,6 @@ def __init__( show_return_value_in_logs: bool = True, **kwargs, ) -> None: - if kwargs.get("provide_context"): - warnings.warn( - "provide_context is deprecated as of 2.0 and is no longer required", - RemovedInAirflow3Warning, - stacklevel=2, - ) - kwargs.pop("provide_context", None) super().__init__(**kwargs) if not callable(python_callable): raise AirflowException("`python_callable` param must be callable") @@ -332,12 +300,20 @@ def get_tasks_to_skip(): self.log.debug("Downstream task IDs %s", to_skip := list(get_tasks_to_skip())) self.log.info("Skipping downstream tasks") + if AIRFLOW_V_3_0_PLUS: + self.skip( + dag_run=dag_run, + tasks=to_skip, + map_index=context["ti"].map_index, + ) + else: + self.skip( + dag_run=dag_run, + tasks=to_skip, + execution_date=cast("DateTime", dag_run.execution_date), # type: ignore[call-arg] + map_index=context["ti"].map_index, + ) - self.skip( - dag_run=dag_run, - tasks=to_skip, - map_index=context["ti"].map_index, - ) self.log.info("Done.") # returns the result of the super execute method as it is instead of returning None return condition @@ -414,6 +390,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): "prev_start_date_success", "prev_end_date_success", } + AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = { "macros", "conf", @@ -421,7 
+398,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta): "dag_run", "task", "params", - "triggering_asset_events", + "triggering_asset_events" if AIRFLOW_V_3_0_PLUS else "triggering_dataset_events", } def __init__( @@ -729,12 +706,10 @@ def __init__( f"Sys version: {sys.version_info}. Virtual environment version: {python_version}" ) if python_version is not None and not isinstance(python_version, str): - warnings.warn( - "Passing non-string types (e.g. int or float) as python_version " - "is deprecated. Please use string value instead.", - RemovedInAirflow3Warning, - stacklevel=2, + raise AirflowException( + "Passing non-string types (e.g. int or float) as python_version not supported" ) + if not is_venv_installed(): raise AirflowException("PythonVirtualenvOperator requires virtualenv, please install it.") if use_airflow_context and (not expect_airflow and not system_site_packages): @@ -1165,7 +1140,7 @@ def my_task(**context): .. code:: python - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context def my_task(): diff --git a/providers/src/airflow/providers/standard/provider.yaml b/providers/src/airflow/providers/standard/provider.yaml index 6231e6c3d53b..426a3a67871a 100644 --- a/providers/src/airflow/providers/standard/provider.yaml +++ b/providers/src/airflow/providers/standard/provider.yaml @@ -44,6 +44,7 @@ operators: - airflow.providers.standard.operators.datetime - airflow.providers.standard.operators.weekday - airflow.providers.standard.operators.bash + - airflow.providers.standard.operators.python - airflow.providers.standard.operators.generic_transfer sensors: @@ -54,7 +55,7 @@ sensors: - airflow.providers.standard.sensors.time - airflow.providers.standard.sensors.weekday - airflow.providers.standard.sensors.bash - + - airflow.providers.standard.sensors.python hooks: - integration-name: Standard python-modules: diff --git a/airflow/sensors/python.py 
b/providers/src/airflow/providers/standard/sensors/python.py similarity index 100% rename from airflow/sensors/python.py rename to providers/src/airflow/providers/standard/sensors/python.py diff --git a/providers/src/airflow/providers/standard/utils/__init__.py b/providers/src/airflow/providers/standard/utils/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/providers/src/airflow/providers/standard/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/utils/python_virtualenv.py b/providers/src/airflow/providers/standard/utils/python_virtualenv.py similarity index 93% rename from airflow/utils/python_virtualenv.py rename to providers/src/airflow/providers/standard/utils/python_virtualenv.py index 3fa0077a9661..00615d70f0f1 100644 --- a/airflow/utils/python_virtualenv.py +++ b/providers/src/airflow/providers/standard/utils/python_virtualenv.py @@ -21,13 +21,11 @@ import os import sys -import warnings from pathlib import Path import jinja2 from jinja2 import select_autoescape -from airflow.utils.decorators import remove_task_decorator as _remove_task_decorator from airflow.utils.process_utils import execute_in_subprocess @@ -62,15 +60,6 @@ def _generate_pip_conf(conf_file: Path, index_urls: list[str]) -> None: conf_file.write_text(f"[global]\n{pip_conf_options}") -def remove_task_decorator(python_source: str, task_decorator_name: str) -> str: - warnings.warn( - "Import remove_task_decorator from airflow.utils.decorators instead", - DeprecationWarning, - stacklevel=2, - ) - return _remove_task_decorator(python_source, task_decorator_name) - - def prepare_virtualenv( venv_directory: str, python_bin: str, diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/providers/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 similarity index 95% rename from airflow/utils/python_virtualenv_script.jinja2 rename to providers/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 index 22d68acd755b..54c8412d1d8e 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/providers/src/airflow/providers/standard/utils/python_virtualenv_script.jinja2 @@ -69,7 +69,7 @@ if len(sys.argv) > 5: import json from types import ModuleType - from airflow.operators import python as airflow_python + from airflow.providers.standard.operators import python as airflow_python from airflow.serialization.serialized_objects import BaseSerialization @@ -85,7 +85,7 @@ if len(sys.argv) > 
5: MockPython = _MockPython("MockPython") - sys.modules["airflow.operators.python"] = MockPython + sys.modules["airflow.providers.standard.operators.python"] = MockPython {% endif %} try: diff --git a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py index 45b68d015e4b..f896af6cfe95 100644 --- a/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py +++ b/providers/tests/cncf/kubernetes/log_handlers/test_log_handlers.py @@ -32,7 +32,6 @@ from airflow.models.dag import DAG from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance -from airflow.operators.python import PythonOperator from airflow.utils.log.file_task_handler import ( FileTaskHandler, ) @@ -42,7 +41,7 @@ from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType -from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS, PythonOperator from tests_common.test_utils.config import conf_vars if AIRFLOW_V_3_0_PLUS: diff --git a/providers/tests/google/cloud/utils/test_mlengine_operator_utils.py b/providers/tests/google/cloud/utils/test_mlengine_operator_utils.py index 47c7d8d3ec3d..aae62afb002e 100644 --- a/providers/tests/google/cloud/utils/test_mlengine_operator_utils.py +++ b/providers/tests/google/cloud/utils/test_mlengine_operator_utils.py @@ -26,11 +26,12 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import DAG -from airflow.operators.python import PythonOperator from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.utils.mlengine_operator_utils import create_evaluate_ops +from tests_common.test_utils.compat import PythonOperator + TASK_PREFIX = "test-task-prefix" TASK_PREFIX_PREDICTION = 
TASK_PREFIX + "-prediction" TASK_PREFIX_SUMMARY = TASK_PREFIX + "-summary" diff --git a/providers/tests/openlineage/extractors/test_base.py b/providers/tests/openlineage/extractors/test_base.py index 15c96ac67553..aa1a2467a832 100644 --- a/providers/tests/openlineage/extractors/test_base.py +++ b/providers/tests/openlineage/extractors/test_base.py @@ -26,7 +26,6 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.taskinstance import TaskInstanceState -from airflow.operators.python import PythonOperator from airflow.providers.openlineage.extractors.base import ( BaseExtractor, DefaultExtractor, @@ -35,11 +34,12 @@ from airflow.providers.openlineage.extractors.manager import ExtractorManager from airflow.providers.openlineage.extractors.python import PythonExtractor +from tests_common.test_utils.compat import PythonOperator + if TYPE_CHECKING: from openlineage.client.facet_v2 import RunFacet pytestmark = pytest.mark.db_test - INPUTS = [Dataset(namespace="database://host:port", name="inputtable")] OUTPUTS = [Dataset(namespace="database://host:port", name="inputtable")] RUN_FACETS: dict[str, RunFacet] = { diff --git a/providers/tests/openlineage/extractors/test_manager.py b/providers/tests/openlineage/extractors/test_manager.py index 726d7262c9b6..adba74fd4408 100644 --- a/providers/tests/openlineage/extractors/test_manager.py +++ b/providers/tests/openlineage/extractors/test_manager.py @@ -33,13 +33,12 @@ from airflow.lineage.entities import Column, File, Table, User from airflow.models.baseoperator import BaseOperator from airflow.models.taskinstance import TaskInstance -from airflow.operators.python import PythonOperator from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.extractors.manager import ExtractorManager from airflow.providers.openlineage.utils.utils import Asset from airflow.utils.state import State -from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS +from 
tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, PythonOperator if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/tests/openlineage/extractors/test_python.py b/providers/tests/openlineage/extractors/test_python.py index 367eb0711dea..6f3ca48c5187 100644 --- a/providers/tests/openlineage/extractors/test_python.py +++ b/providers/tests/openlineage/extractors/test_python.py @@ -28,10 +28,9 @@ from airflow import DAG from airflow.exceptions import AirflowProviderDeprecationWarning -from airflow.operators.python import PythonOperator from airflow.providers.openlineage.extractors.python import PythonExtractor -from tests_common.test_utils.compat import BashOperator +from tests_common.test_utils.compat import BashOperator, PythonOperator pytestmark = pytest.mark.db_test diff --git a/providers/tests/openlineage/plugins/test_listener.py b/providers/tests/openlineage/plugins/test_listener.py index 1c17b5841674..79149324e551 100644 --- a/providers/tests/openlineage/plugins/test_listener.py +++ b/providers/tests/openlineage/plugins/test_listener.py @@ -32,14 +32,13 @@ from airflow.models import DAG, DagRun, TaskInstance from airflow.models.baseoperator import BaseOperator -from airflow.operators.python import PythonOperator from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter from airflow.providers.openlineage.plugins.facets import AirflowDebugRunFacet from airflow.providers.openlineage.plugins.listener import OpenLineageListener from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage from airflow.utils.state import DagRunState, State -from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS +from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS, PythonOperator from tests_common.test_utils.config import conf_vars if AIRFLOW_V_3_0_PLUS: diff --git a/providers/tests/openlineage/plugins/test_utils.py 
b/providers/tests/openlineage/plugins/test_utils.py index 12f7ed32a825..624bdecb5b45 100644 --- a/providers/tests/openlineage/plugins/test_utils.py +++ b/providers/tests/openlineage/plugins/test_utils.py @@ -50,8 +50,10 @@ from airflow.utils.types import DagRunTriggeredByType BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash" +PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python" if not AIRFLOW_V_2_10_PLUS: BASH_OPERATOR_PATH = "airflow.operators.bash" + PYTHON_OPERATOR_PATH = "airflow.operators.python" class SafeStrDict(dict): @@ -283,7 +285,7 @@ def test_is_operator_disabled(mock_disabled_operators): mock_disabled_operators.return_value = { f"{BASH_OPERATOR_PATH}.BashOperator", - "airflow.operators.python.PythonOperator", + f"{PYTHON_OPERATOR_PATH}.PythonOperator", } assert is_operator_disabled(op) is True diff --git a/providers/tests/openlineage/utils/test_utils.py b/providers/tests/openlineage/utils/test_utils.py index cc8c20ac0a28..f4e286331a51 100644 --- a/providers/tests/openlineage/utils/test_utils.py +++ b/providers/tests/openlineage/utils/test_utils.py @@ -28,7 +28,6 @@ from airflow.models.mappedoperator import MappedOperator from airflow.models.taskinstance import TaskInstance, TaskInstanceState from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.providers.openlineage.plugins.facets import AirflowDagRunFacet, AirflowJobFacet from airflow.providers.openlineage.utils.utils import ( _get_task_groups_details, @@ -44,12 +43,14 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.types import DagRunType -from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator +from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator, PythonOperator from tests_common.test_utils.mock_operators import MockOperator BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash" +PYTHON_OPERATOR_PATH = 
"airflow.providers.standard.operators.python" if not AIRFLOW_V_2_10_PLUS: BASH_OPERATOR_PATH = "airflow.operators.bash" + PYTHON_OPERATOR_PATH = "airflow.operators.python" class CustomOperatorForTest(BashOperator): @@ -97,7 +98,7 @@ def test_get_airflow_job_facet(): "downstream_task_ids": ["section_1.task_3"], }, "section_1.task_3": { - "operator": "airflow.operators.python.PythonOperator", + "operator": f"{PYTHON_OPERATOR_PATH}.PythonOperator", "task_group": "section_1", "emits_ol_events": True, "ui_color": "#ffefeb", @@ -351,7 +352,7 @@ def sum_values(values: list[int]) -> int: ], }, "task_2": { - "operator": "airflow.operators.python.PythonOperator", + "operator": f"{PYTHON_OPERATOR_PATH}.PythonOperator", "task_group": None, "emits_ol_events": True, "ui_color": PythonOperator.ui_color, @@ -416,7 +417,7 @@ def sum_values(values: list[int]) -> int: ], }, "section_1.task_3": { - "operator": "airflow.operators.python.PythonOperator", + "operator": f"{PYTHON_OPERATOR_PATH}.PythonOperator", "task_group": "section_1", "emits_ol_events": True, "ui_color": PythonOperator.ui_color, @@ -440,7 +441,7 @@ def sum_values(values: list[int]) -> int: "downstream_task_ids": [], }, "section_1.section_2.section_3.task_12": { - "operator": "airflow.operators.python.PythonOperator", + "operator": f"{PYTHON_OPERATOR_PATH}.PythonOperator", "task_group": "section_1.section_2.section_3", "emits_ol_events": True, "ui_color": PythonOperator.ui_color, diff --git a/tests/operators/test_python.py b/providers/tests/standard/operators/test_python.py similarity index 98% rename from tests/operators/test_python.py rename to providers/tests/standard/operators/test_python.py index b759e9a60054..1fd422d6c4f6 100644 --- a/tests/operators/test_python.py +++ b/providers/tests/standard/operators/test_python.py @@ -19,6 +19,7 @@ import copy import logging +import logging.config import os import pickle import re @@ -38,6 +39,7 @@ import pytest from slugify import slugify +from 
airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.decorators import task_group from airflow.exceptions import ( AirflowException, @@ -48,7 +50,7 @@ from airflow.models.dag import DAG from airflow.models.taskinstance import TaskInstance, clear_task_instances, set_current_context from airflow.operators.empty import EmptyOperator -from airflow.operators.python import ( +from airflow.providers.standard.operators.python import ( BranchExternalPythonOperator, BranchPythonOperator, BranchPythonVirtualenvOperator, @@ -60,10 +62,10 @@ _PythonVersionInfo, get_current_context, ) +from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv from airflow.settings import _ENABLE_AIP_44 from airflow.utils import timezone from airflow.utils.context import AirflowContextDeprecationWarning, Context -from airflow.utils.python_virtualenv import prepare_virtualenv from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.trigger_rule import TriggerRule @@ -76,6 +78,7 @@ if AIRFLOW_V_3_0_PLUS: from airflow.utils.types import DagRunTriggeredByType + if TYPE_CHECKING: from airflow.models.dagrun import DagRun @@ -189,6 +192,9 @@ def render_templates(self, fn, **kwargs): class TestPythonOperator(BasePythonTest): opcls = PythonOperator + def setup_method(self): + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + @pytest.fixture(autouse=True) def setup_tests(self): self.run = False @@ -293,7 +299,9 @@ def func(custom, dag): assert 1 == custom, "custom should be 1" assert dag is not None, "dag should be set" - with pytest.warns(RemovedInAirflow3Warning): + error_message = "Invalid arguments were passed to PythonOperator \\(task_id: task_test-provide-context-does-not-fail\\). 
Invalid arguments were:\n\\*\\*kwargs: {'provide_context': True}" + + with pytest.raises((TypeError, AirflowException), match=error_message): self.run_as_task(func, op_kwargs={"custom": 1}, provide_context=True) def test_context_with_conflicting_op_args(self): @@ -353,7 +361,7 @@ def func(): def test_python_operator_has_default_logger_name(self): python_operator = PythonOperator(task_id="task", python_callable=partial(int, 2)) - logger_name: str = "airflow.task.operators.airflow.operators.python.PythonOperator" + logger_name: str = "airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator" assert python_operator.log.name == logger_name def test_custom_logger_name_is_correctly_set(self): @@ -1022,7 +1030,7 @@ def f(): @USE_AIRFLOW_CONTEXT_MARKER def test_current_context(self): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context from airflow.utils.context import Context context = get_current_context() @@ -1038,7 +1046,7 @@ def f(): @USE_AIRFLOW_CONTEXT_MARKER def test_current_context_not_found_error(self): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context get_current_context() return [] @@ -1060,7 +1068,7 @@ def test_current_context_airflow_not_found_error(self): error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False." 
def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context get_current_context() return [] @@ -1073,7 +1081,7 @@ def f(): @USE_AIRFLOW_CONTEXT_MARKER def test_use_airflow_context_touch_other_variables(self): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context from airflow.utils.context import Context context = get_current_context() @@ -1081,8 +1089,6 @@ def f(): error_msg = f"Expected Context, got {type(context)}" raise TypeError(error_msg) - from airflow.operators.python import PythonOperator # noqa: F401 - return [] ti = self.run_as_task(f, return_ti=True, multiple_outputs=False, use_airflow_context=True) @@ -1091,7 +1097,7 @@ def f(): @pytest.mark.skipif(_ENABLE_AIP_44, reason="AIP-44 is enabled") def test_use_airflow_context_without_aip_44_error(self): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context get_current_context() return [] @@ -1124,7 +1130,7 @@ def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): return kwargs @mock.patch("shutil.which") - @mock.patch("airflow.operators.python.importlib") + @mock.patch("airflow.providers.standard.operators.python.importlib") def test_virtualenv_not_installed(self, importlib_mock, which_mock): which_mock.return_value = None importlib_mock.util.find_spec.return_value = None @@ -1270,7 +1276,7 @@ def f(): self.run_as_operator(f, requirements="requirements.txt", system_site_packages=False) - @mock.patch("airflow.operators.python.prepare_virtualenv") + @mock.patch("airflow.providers.standard.operators.python.prepare_virtualenv") def test_pip_install_options(self, mocked_prepare_virtualenv): def f(): import funcsigs # noqa: F401 @@ -1335,10 +1341,7 @@ def f(): return raise RuntimeError - with pytest.warns( - RemovedInAirflow3Warning, 
match="Passing non-string types.*python_version is deprecated" - ): - self.run_as_task(f, python_version=3, serializer=serializer, requirements=extra_requirements) + self.run_as_task(f, python_version="3", serializer=serializer, requirements=extra_requirements) def test_with_default(self): def f(a): @@ -1524,7 +1527,7 @@ def f( @USE_AIRFLOW_CONTEXT_MARKER def test_current_context_system_site_packages(self, session): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context from airflow.utils.context import Context context = get_current_context() @@ -1554,6 +1557,9 @@ def f(): class TestExternalPythonOperator(BaseTestPythonVirtualenvOperator): opcls = ExternalPythonOperator + def setup_method(self): + logging.config.dictConfig(DEFAULT_LOGGING_CONFIG) + @staticmethod def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): kwargs["python"] = sys.executable @@ -1867,7 +1873,7 @@ def default_kwargs(*, python_version=DEFAULT_PYTHON_VERSION, **kwargs): @USE_AIRFLOW_CONTEXT_MARKER def test_current_context_system_site_packages(self, session): def f(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context from airflow.utils.context import Context context = get_current_context() diff --git a/tests/sensors/test_python.py b/providers/tests/standard/sensors/test_python.py similarity index 95% rename from tests/sensors/test_python.py rename to providers/tests/standard/sensors/test_python.py index 0a898b339108..2d5dc570031d 100644 --- a/tests/sensors/test_python.py +++ b/providers/tests/standard/sensors/test_python.py @@ -23,10 +23,10 @@ import pytest from airflow.exceptions import AirflowSensorTimeout +from airflow.providers.standard.sensors.python import PythonSensor from airflow.sensors.base import PokeReturnValue -from airflow.sensors.python import PythonSensor -from 
tests.operators.test_python import BasePythonTest +from providers.tests.standard.operators.test_python import BasePythonTest pytestmark = pytest.mark.db_test diff --git a/providers/tests/standard/utils/__init__.py b/providers/tests/standard/utils/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/providers/tests/standard/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/utils/test_python_virtualenv.py b/providers/tests/standard/utils/test_python_virtualenv.py similarity index 92% rename from tests/utils/test_python_virtualenv.py rename to providers/tests/standard/utils/test_python_virtualenv.py index 38cda4854baf..b5f8321d1bda 100644 --- a/tests/utils/test_python_virtualenv.py +++ b/providers/tests/standard/utils/test_python_virtualenv.py @@ -23,8 +23,8 @@ import pytest +from airflow.providers.standard.utils.python_virtualenv import _generate_pip_conf, prepare_virtualenv from airflow.utils.decorators import remove_task_decorator -from airflow.utils.python_virtualenv import _generate_pip_conf, prepare_virtualenv class TestPrepareVirtualenv: @@ -60,7 +60,7 @@ def test_generate_pip_conf( for term in unexpected_pip_conf_content: assert term not in generated_conf - @mock.patch("airflow.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_should_create_virtualenv(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( venv_directory="/VENV", python_bin="pythonVER", system_site_packages=False, requirements=[] @@ -70,7 +70,7 @@ def test_should_create_virtualenv(self, mock_execute_in_subprocess): [sys.executable, "-m", "virtualenv", "/VENV", "--python=pythonVER"] ) - @mock.patch("airflow.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( venv_directory="/VENV", python_bin="pythonVER", system_site_packages=True, requirements=[] @@ -80,7 +80,7 @@ def test_should_create_virtualenv_with_system_packages(self, mock_execute_in_sub [sys.executable, "-m", "virtualenv", "/VENV", "--system-site-packages", "--python=pythonVER"] ) - @mock.patch("airflow.utils.python_virtualenv.execute_in_subprocess") + 
@mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_pip_install_options(self, mock_execute_in_subprocess): pip_install_options = ["--no-deps"] python_bin = prepare_virtualenv( @@ -99,7 +99,7 @@ def test_pip_install_options(self, mock_execute_in_subprocess): ["/VENV/bin/pip", "install", *pip_install_options, "apache-beam[gcp]"] ) - @mock.patch("airflow.utils.python_virtualenv.execute_in_subprocess") + @mock.patch("airflow.providers.standard.utils.python_virtualenv.execute_in_subprocess") def test_should_create_virtualenv_with_extra_packages(self, mock_execute_in_subprocess): python_bin = prepare_virtualenv( venv_directory="/VENV", diff --git a/providers/tests/system/amazon/aws/example_s3.py b/providers/tests/system/amazon/aws/example_s3.py index a9c93fdd5beb..30ca003d6ccb 100644 --- a/providers/tests/system/amazon/aws/example_s3.py +++ b/providers/tests/system/amazon/aws/example_s3.py @@ -20,7 +20,6 @@ from airflow.models.baseoperator import chain from airflow.models.dag import DAG -from airflow.operators.python import BranchPythonOperator from airflow.providers.amazon.aws.operators.s3 import ( S3CopyObjectOperator, S3CreateBucketOperator, @@ -35,6 +34,7 @@ S3PutBucketTaggingOperator, ) from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor, S3KeysUnchangedSensor +from airflow.providers.standard.operators.python import BranchPythonOperator from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder diff --git a/providers/tests/system/amazon/aws/example_sagemaker.py b/providers/tests/system/amazon/aws/example_sagemaker.py index 3098ac647ea7..ca0c92ed4628 100644 --- a/providers/tests/system/amazon/aws/example_sagemaker.py +++ b/providers/tests/system/amazon/aws/example_sagemaker.py @@ -28,7 +28,6 @@ from airflow.decorators import task from airflow.models.baseoperator import chain from airflow.models.dag import DAG -from 
airflow.operators.python import get_current_context from airflow.providers.amazon.aws.operators.s3 import ( S3CreateBucketOperator, S3CreateObjectOperator, @@ -52,6 +51,7 @@ SageMakerTransformSensor, SageMakerTuningSensor, ) +from airflow.providers.standard.operators.python import get_current_context from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder, prune_logs diff --git a/providers/tests/system/apache/kafka/example_dag_event_listener.py b/providers/tests/system/apache/kafka/example_dag_event_listener.py index 734cb9c49554..2cabdde66eb3 100644 --- a/providers/tests/system/apache/kafka/example_dag_event_listener.py +++ b/providers/tests/system/apache/kafka/example_dag_event_listener.py @@ -26,12 +26,12 @@ # Connections needed for this example dag to finish from airflow.models import Connection +from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator +from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor # This is just for setting up connections in the demo - you should use standard # methods for setting these connections in production -from airflow.operators.python import PythonOperator -from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator -from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import db diff --git a/providers/tests/system/apache/kafka/example_dag_hello_kafka.py b/providers/tests/system/apache/kafka/example_dag_hello_kafka.py index 88fef458ea26..0759a4565277 100644 --- a/providers/tests/system/apache/kafka/example_dag_hello_kafka.py +++ b/providers/tests/system/apache/kafka/example_dag_hello_kafka.py @@ -22,14 +22,14 @@ from datetime import datetime, timedelta from airflow import DAG - -# This is just for setting up connections in the demo - 
you should use standard -# methods for setting these connections in production -from airflow.operators.python import PythonOperator from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor +# This is just for setting up connections in the demo - you should use standard +# methods for setting these connections in production +from airflow.providers.standard.operators.python import PythonOperator + default_args = { "owner": "airflow", "depend_on_past": False, diff --git a/providers/tests/system/docker/example_docker_copy_data.py b/providers/tests/system/docker/example_docker_copy_data.py index 584b16b8e096..7da16636ca36 100644 --- a/providers/tests/system/docker/example_docker_copy_data.py +++ b/providers/tests/system/docker/example_docker_copy_data.py @@ -32,9 +32,9 @@ from docker.types import Mount from airflow import models -from airflow.operators.python import ShortCircuitOperator from airflow.providers.docker.operators.docker import DockerOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import ShortCircuitOperator ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "docker_sample_copy_data" diff --git a/providers/tests/system/elasticsearch/example_elasticsearch_query.py b/providers/tests/system/elasticsearch/example_elasticsearch_query.py index f4450b8e335c..a5744bffcdd8 100644 --- a/providers/tests/system/elasticsearch/example_elasticsearch_query.py +++ b/providers/tests/system/elasticsearch/example_elasticsearch_query.py @@ -25,8 +25,8 @@ from airflow import models from airflow.decorators import task -from airflow.operators.python import PythonOperator from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook, ElasticsearchSQLHook +from 
airflow.providers.standard.operators.python import PythonOperator ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "elasticsearch_dag" diff --git a/providers/tests/system/google/cloud/cloud_batch/example_cloud_batch.py b/providers/tests/system/google/cloud/cloud_batch/example_cloud_batch.py index 322c6cd7ecea..901579a9966d 100644 --- a/providers/tests/system/google/cloud/cloud_batch/example_cloud_batch.py +++ b/providers/tests/system/google/cloud/cloud_batch/example_cloud_batch.py @@ -27,13 +27,13 @@ from google.cloud import batch_v1 from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.google.cloud.operators.cloud_batch import ( CloudBatchDeleteJobOperator, CloudBatchListJobsOperator, CloudBatchListTasksOperator, CloudBatchSubmitJobOperator, ) +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID diff --git a/providers/tests/system/google/cloud/cloud_run/example_cloud_run.py b/providers/tests/system/google/cloud/cloud_run/example_cloud_run.py index 368d835ddb6a..4aee6882171e 100644 --- a/providers/tests/system/google/cloud/cloud_run/example_cloud_run.py +++ b/providers/tests/system/google/cloud/cloud_run/example_cloud_run.py @@ -27,7 +27,6 @@ from google.cloud.run_v2.types import k8s_min from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.google.cloud.operators.cloud_run import ( CloudRunCreateJobOperator, CloudRunDeleteJobOperator, @@ -35,6 +34,7 @@ CloudRunListJobsOperator, CloudRunUpdateJobOperator, ) +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") diff --git a/providers/tests/system/google/cloud/gcs/example_gcs_to_gcs.py 
b/providers/tests/system/google/cloud/gcs/example_gcs_to_gcs.py index c4c812868efa..48f4657dd091 100644 --- a/providers/tests/system/google/cloud/gcs/example_gcs_to_gcs.py +++ b/providers/tests/system/google/cloud/gcs/example_gcs_to_gcs.py @@ -29,7 +29,6 @@ from airflow.decorators import task from airflow.models.baseoperator import chain from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.google.cloud.operators.gcs import ( GCSCreateBucketOperator, GCSDeleteBucketOperator, @@ -39,6 +38,7 @@ from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.trigger_rule import TriggerRule from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID diff --git a/tests/core/test_core.py b/tests/core/test_core.py index bfe106ef06e9..21b5037ed17d 100644 --- a/tests/core/test_core.py +++ b/tests/core/test_core.py @@ -26,8 +26,8 @@ from airflow.models import TaskInstance from airflow.models.baseoperator import BaseOperator from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.timezone import datetime from airflow.utils.types import DagRunType diff --git a/tests/core/test_example_dags_system.py b/tests/core/test_example_dags_system.py index dfa6b81027b2..5724240eaebf 100644 --- a/tests/core/test_example_dags_system.py +++ b/tests/core/test_example_dags_system.py @@ -24,11 +24,11 @@ from sqlalchemy import select from airflow.models import DagRun -from airflow.operators.python import PythonOperator from airflow.utils.module_loading 
import import_string from airflow.utils.state import DagRunState from airflow.utils.trigger_rule import TriggerRule +from tests_common.test_utils.compat import PythonOperator from tests_common.test_utils.system_tests import get_test_run from tests_common.test_utils.system_tests_class import SystemTest diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py index a8cb04577b4e..7ec82fd5a56f 100644 --- a/tests/core/test_sentry.py +++ b/tests/core/test_sentry.py @@ -26,7 +26,7 @@ from sentry_sdk import configure_scope from sentry_sdk.transport import Transport -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import timezone from airflow.utils.module_loading import import_string from airflow.utils.state import State diff --git a/tests/dags/test_assets.py b/tests/dags/test_assets.py index 2f2917dd6693..30a6e3f147a5 100644 --- a/tests/dags/test_assets.py +++ b/tests/dags/test_assets.py @@ -22,8 +22,8 @@ from airflow.assets import Asset from airflow.exceptions import AirflowFailException, AirflowSkipException from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator skip_task_dag_asset = Asset("s3://dag_with_skip_task/output_1.txt", extra={"hi": "bye"}) fail_task_dag_asset = Asset("s3://dag_with_fail_task/output_1.txt", extra={"hi": "bye"}) diff --git a/tests/dags/test_cli_triggered_dags.py b/tests/dags/test_cli_triggered_dags.py index 4dad87c94754..530c80f3f7ca 100644 --- a/tests/dags/test_cli_triggered_dags.py +++ b/tests/dags/test_cli_triggered_dags.py @@ -20,7 +20,7 @@ from datetime import timedelta from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.timezone import datetime 
DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_dag_xcom_openlineage.py b/tests/dags/test_dag_xcom_openlineage.py index 6fec2c219094..ae2ef72f7ecb 100644 --- a/tests/dags/test_dag_xcom_openlineage.py +++ b/tests/dags/test_dag_xcom_openlineage.py @@ -20,7 +20,7 @@ import datetime from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator dag = DAG( dag_id="test_dag_xcom_openlineage", diff --git a/tests/dags/test_dagrun_fast_follow.py b/tests/dags/test_dagrun_fast_follow.py index 1053869d81ed..de9e44bd44a1 100644 --- a/tests/dags/test_dagrun_fast_follow.py +++ b/tests/dags/test_dagrun_fast_follow.py @@ -20,7 +20,7 @@ from datetime import datetime from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_future_start_date.py b/tests/dags/test_future_start_date.py index dadfbff600f6..b72d2865fd33 100644 --- a/tests/dags/test_future_start_date.py +++ b/tests/dags/test_future_start_date.py @@ -23,7 +23,7 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator exec_date = pendulum.datetime(2021, 1, 1) fut_start_date = pendulum.datetime(2021, 2, 1) diff --git a/tests/dags/test_invalid_param.py b/tests/dags/test_invalid_param.py index 9f5fdfa05f66..fb0d3c854d12 100644 --- a/tests/dags/test_invalid_param.py +++ b/tests/dags/test_invalid_param.py @@ -20,7 +20,7 @@ from airflow.models.dag import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_invalid_param", diff --git a/tests/dags/test_invalid_param2.py 
b/tests/dags/test_invalid_param2.py index 7ed9d5c443c1..69ffda442301 100644 --- a/tests/dags/test_invalid_param2.py +++ b/tests/dags/test_invalid_param2.py @@ -20,7 +20,7 @@ from airflow import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_invalid_param2", diff --git a/tests/dags/test_invalid_param3.py b/tests/dags/test_invalid_param3.py index 67cb1bbff7ce..a8017a3402b6 100644 --- a/tests/dags/test_invalid_param3.py +++ b/tests/dags/test_invalid_param3.py @@ -20,7 +20,7 @@ from airflow import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_invalid_param3", diff --git a/tests/dags/test_invalid_param4.py b/tests/dags/test_invalid_param4.py index 7c6354af0dd6..bbfc7e970c51 100644 --- a/tests/dags/test_invalid_param4.py +++ b/tests/dags/test_invalid_param4.py @@ -20,7 +20,7 @@ from airflow import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_invalid_param4", diff --git a/tests/dags/test_logging_in_dag.py b/tests/dags/test_logging_in_dag.py index 2a1a0a095bc5..e56c4d4a7431 100644 --- a/tests/dags/test_logging_in_dag.py +++ b/tests/dags/test_logging_in_dag.py @@ -20,7 +20,7 @@ import logging from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.timezone import datetime logger = logging.getLogger(__name__) diff --git a/tests/dags/test_mapped_classic.py b/tests/dags/test_mapped_classic.py index fec7e98a8934..4dcdd357feca 100644 --- a/tests/dags/test_mapped_classic.py +++ b/tests/dags/test_mapped_classic.py @@ -20,7 +20,7 @@ from 
airflow.decorators import task from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator @task diff --git a/tests/dags/test_mark_state.py b/tests/dags/test_mark_state.py index da520552e1c0..3ce67b7cef20 100644 --- a/tests/dags/test_mark_state.py +++ b/tests/dags/test_mark_state.py @@ -22,7 +22,7 @@ from time import sleep from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import utcnow diff --git a/tests/dags/test_on_failure_callback.py b/tests/dags/test_on_failure_callback.py index f6765a369809..1e0f276e4aee 100644 --- a/tests/dags/test_on_failure_callback.py +++ b/tests/dags/test_on_failure_callback.py @@ -21,8 +21,8 @@ from airflow.exceptions import AirflowFailException from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_task_view_type_check.py b/tests/dags/test_task_view_type_check.py index f3414d4ac3fe..e5e68fde9088 100644 --- a/tests/dags/test_task_view_type_check.py +++ b/tests/dags/test_task_view_type_check.py @@ -26,7 +26,7 @@ from datetime import datetime from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator logger = logging.getLogger(__name__) DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/dags/test_valid_param.py b/tests/dags/test_valid_param.py index 67a6b4e81491..afa0f98ce21d 100644 --- a/tests/dags/test_valid_param.py +++ b/tests/dags/test_valid_param.py @@ -20,7 +20,7 @@ from airflow 
import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_valid_param", diff --git a/tests/dags/test_valid_param2.py b/tests/dags/test_valid_param2.py index 9767357d3b3e..d59d6278c3a7 100644 --- a/tests/dags/test_valid_param2.py +++ b/tests/dags/test_valid_param2.py @@ -20,7 +20,7 @@ from airflow import DAG from airflow.models.param import Param -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator with DAG( "test_valid_param2", diff --git a/tests/dags_corrupted/test_impersonation_custom.py b/tests/dags_corrupted/test_impersonation_custom.py index 03a6e3ef7d27..832c1c0820ba 100644 --- a/tests/dags_corrupted/test_impersonation_custom.py +++ b/tests/dags_corrupted/test_impersonation_custom.py @@ -29,7 +29,7 @@ from fake_datetime import FakeDatetime from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator DEFAULT_DATE = datetime(2016, 1, 1) diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 0e030dcbdcb7..44fa54d7dd80 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -40,7 +40,7 @@ from airflow.utils.types import DagRunType from airflow.utils.xcom import XCOM_RETURN_KEY -from tests.operators.test_python import BasePythonTest +from providers.tests.standard.operators.test_python import BasePythonTest from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS if AIRFLOW_V_3_0_PLUS: diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 0120e71daf7e..7ff6fb1f391a 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -44,7 +44,7 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import 
TaskInstance from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.task.task_runner.standard_task_runner import StandardTaskRunner from airflow.utils import timezone from airflow.utils.net import get_hostname diff --git a/tests/jobs/test_triggerer_job.py b/tests/jobs/test_triggerer_job.py index 3ad03b0c35b7..939c210dfe75 100644 --- a/tests/jobs/test_triggerer_job.py +++ b/tests/jobs/test_triggerer_job.py @@ -36,7 +36,7 @@ from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.triggers.base import TriggerEvent from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger from airflow.triggers.testing import FailureTrigger, SuccessTrigger diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index c7bfb3697096..a67590e91a57 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -35,7 +35,7 @@ _cancel_backfill, _create_backfill, ) -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.ti_deps.dep_context import DepContext from airflow.utils import timezone from airflow.utils.state import DagRunState, TaskInstanceState diff --git a/tests/models/test_baseoperatormeta.py b/tests/models/test_baseoperatormeta.py index 5244e86b2c38..6f27b4daac87 100644 --- a/tests/models/test_baseoperatormeta.py +++ b/tests/models/test_baseoperatormeta.py @@ -27,7 +27,7 @@ from airflow.decorators import task from airflow.exceptions import AirflowException, AirflowRescheduleException, AirflowSkipException from airflow.models.baseoperator import BaseOperator, ExecutorSafeguard -from 
airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import timezone from airflow.utils.state import DagRunState, State diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py index 6f7fb49ea45c..2afccf8b4677 100644 --- a/tests/models/test_cleartasks.py +++ b/tests/models/test_cleartasks.py @@ -30,7 +30,7 @@ from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskreschedule import TaskReschedule from airflow.operators.empty import EmptyOperator -from airflow.sensors.python import PythonSensor +from airflow.providers.standard.sensors.python import PythonSensor from airflow.utils.session import create_session from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.types import DagRunType diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 4c1d8a67960c..218f635ad91d 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -67,8 +67,8 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance as TI from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import TaskGroup from airflow.sdk.definitions.contextmanager import TaskGroupContext from airflow.security import permissions diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py index f563c72f5451..63949fe9c160 100644 --- a/tests/models/test_dagbag.py +++ b/tests/models/test_dagbag.py @@ -629,7 +629,7 @@ def test_timeout_dag_errors_are_import_errors(self, tmp_path, caplog): import time import airflow -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator time.sleep(1) 
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index e310d9ac2f86..e8889fb10223 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -37,8 +37,8 @@ from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator, ShortCircuitOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator, ShortCircuitOperator from airflow.serialization.serialized_objects import SerializedDAG from airflow.stats import Stats from airflow.triggers.base import StartTriggerArgs @@ -2012,7 +2012,7 @@ def test_ti_scheduling_mapped_zero_length(dag_maker, session): @pytest.mark.parametrize("trigger_rule", [TriggerRule.ALL_DONE, TriggerRule.ALL_SUCCESS]) def test_mapped_task_upstream_failed(dag_maker, session, trigger_rule): - from airflow.operators.python import PythonOperator + from airflow.providers.standard.operators.python import PythonOperator with dag_maker(session=session) as dag: diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py index 98fa304917b1..16f80a567559 100644 --- a/tests/models/test_mappedoperator.py +++ b/tests/models/test_mappedoperator.py @@ -36,7 +36,7 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.taskmap import TaskMap from airflow.models.xcom_arg import XComArg -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.state import TaskInstanceState from airflow.utils.task_group import TaskGroup from airflow.utils.task_instance_session import set_current_task_instance_session @@ -645,7 +645,7 @@ def execute(self, context): def _create_mapped_with_name_template_taskflow(*, task_id, map_names, template): - from airflow.operators.python import 
get_current_context + from airflow.providers.standard.operators.python import get_current_context @task(task_id=task_id, map_index_template=template) def task1(map_name): @@ -671,7 +671,7 @@ def execute(self, context): def _create_named_map_index_renders_on_failure_taskflow(*, task_id, map_names, template): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context @task(task_id=task_id, map_index_template=template) def task1(map_name): diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index 81421dca1711..62f4be3e7405 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -33,8 +33,8 @@ from airflow.decorators import task as task_decorator from airflow.models import DagRun, Variable from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.task_instance_session import set_current_task_instance_session from airflow.utils.timezone import datetime diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index ef213f438377..ccd19ad3272f 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -77,10 +77,10 @@ from airflow.models.xcom import LazyXComSelectSequence, XCom from airflow.notifications.basenotifier import BaseNotifier from airflow.operators.empty import EmptyOperator -from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import BranchPythonOperator, PythonOperator +from airflow.providers.standard.sensors.python import PythonSensor from airflow.sensors.base import 
BaseSensorOperator -from airflow.sensors.python import PythonSensor from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG from airflow.settings import TIMEZONE, TracebackSessionForTests from airflow.stats import Stats @@ -3828,7 +3828,7 @@ def f(*args, **kwargs): @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_get_current_context_works_in_template(self, dag_maker): def user_defined_macro(): - from airflow.operators.python import get_current_context + from airflow.providers.standard.operators.python import get_current_context get_current_context() diff --git a/tests/models/test_xcom_arg.py b/tests/models/test_xcom_arg.py index f38414ef3001..c12ec305f239 100644 --- a/tests/models/test_xcom_arg.py +++ b/tests/models/test_xcom_arg.py @@ -19,8 +19,8 @@ import pytest from airflow.models.xcom_arg import XComArg -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.types import NOTSET from tests_common.test_utils.db import clear_db_dags, clear_db_runs diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index bd4d827b3fcf..43911c1a41d4 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -36,8 +36,8 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.time import TimeSensor from airflow.sensors.external_task import ( ExternalTaskMarker, diff --git a/tests/serialization/test_serialized_objects.py 
b/tests/serialization/test_serialized_objects.py index 1a20cc797594..81ba206af467 100644 --- a/tests/serialization/test_serialized_objects.py +++ b/tests/serialization/test_serialized_objects.py @@ -49,7 +49,7 @@ from airflow.models.tasklog import LogTemplate from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.pydantic.asset import AssetEventPydantic, AssetPydantic from airflow.serialization.pydantic.dag import DagModelPydantic, DagTagPydantic diff --git a/tests/ti_deps/deps/test_not_previously_skipped_dep.py b/tests/ti_deps/deps/test_not_previously_skipped_dep.py index 810e556a9fbc..3d7adca3b97d 100644 --- a/tests/ti_deps/deps/test_not_previously_skipped_dep.py +++ b/tests/ti_deps/deps/test_not_previously_skipped_dep.py @@ -22,7 +22,7 @@ from airflow.models import DagRun, TaskInstance from airflow.operators.empty import EmptyOperator -from airflow.operators.python import BranchPythonOperator +from airflow.providers.standard.operators.python import BranchPythonOperator from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep from airflow.utils.state import State diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index b760e69fd7e1..fd8731be5d96 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -31,7 +31,7 @@ from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models.tasklog import LogTemplate -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.timetables.base import DataInterval from 
airflow.utils import timezone from airflow.utils.log.log_reader import TaskLogReader diff --git a/tests/utils/test_db_cleanup.py b/tests/utils/test_db_cleanup.py index c957f77a94b2..c05e0ceb5050 100644 --- a/tests/utils/test_db_cleanup.py +++ b/tests/utils/test_db_cleanup.py @@ -32,7 +32,7 @@ from airflow.exceptions import AirflowException from airflow.models import DagModel, DagRun, TaskInstance -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils import timezone from airflow.utils.db_cleanup import ( ARCHIVE_TABLE_PREFIX, diff --git a/tests/utils/test_dot_renderer.py b/tests/utils/test_dot_renderer.py index d8b80c0ba3f4..0bf6f75fa6d0 100644 --- a/tests/utils/test_dot_renderer.py +++ b/tests/utils/test_dot_renderer.py @@ -24,7 +24,6 @@ from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.serialization.dag_dependency import DagDependency from airflow.utils import dot_renderer, timezone from airflow.utils.state import State @@ -33,6 +32,11 @@ from tests_common.test_utils.compat import BashOperator from tests_common.test_utils.db import clear_db_dags +try: + from airflow.providers.standard.operators.python import PythonOperator +except ImportError: + from airflow.operators.python import PythonOperator # type: ignore[no-redef,attr-defined] + START_DATE = timezone.utcnow() pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] diff --git a/tests/utils/test_edgemodifier.py b/tests/utils/test_edgemodifier.py index 89644180c27d..41c08f4c2d60 100644 --- a/tests/utils/test_edgemodifier.py +++ b/tests/utils/test_edgemodifier.py @@ -23,7 +23,7 @@ from airflow.models.dag import DAG from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator +from 
airflow.providers.standard.operators.python import PythonOperator from airflow.utils.edgemodifier import Label from airflow.utils.task_group import TaskGroup from airflow.www.views import dag_edges diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 190197e310de..781e3d853308 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -40,7 +40,7 @@ from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.models.trigger import Trigger -from airflow.operators.python import PythonOperator +from airflow.providers.standard.operators.python import PythonOperator from airflow.utils.log.file_task_handler import ( FileTaskHandler, LogType, diff --git a/tests/utils/test_task_group.py b/tests/utils/test_task_group.py index b973a1f615e0..b0fd7fa6c183 100644 --- a/tests/utils/test_task_group.py +++ b/tests/utils/test_task_group.py @@ -34,12 +34,11 @@ from airflow.models.dag import DAG from airflow.models.xcom_arg import XComArg from airflow.operators.empty import EmptyOperator -from airflow.operators.python import PythonOperator from airflow.utils.dag_edges import dag_edges from airflow.utils.task_group import TaskGroup, task_group_to_dict from tests.models import DEFAULT_DATE -from tests_common.test_utils.compat import BashOperator +from tests_common.test_utils.compat import BashOperator, PythonOperator def make_task(name, type_="classic"): diff --git a/tests/www/views/test_views_rendered.py b/tests/www/views/test_views_rendered.py index 822e657da200..00f2a0c7d163 100644 --- a/tests/www/views/test_views_rendered.py +++ b/tests/www/views/test_views_rendered.py @@ -28,14 +28,13 @@ from airflow.models.dag import DAG from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.variable import Variable -from airflow.operators.python import PythonOperator from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils 
import timezone from airflow.utils.session import create_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType -from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS, BashOperator +from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS, BashOperator, PythonOperator from tests_common.test_utils.db import ( clear_db_dags, clear_db_runs, diff --git a/tests_common/test_utils/compat.py b/tests_common/test_utils/compat.py index 42c566c5d50b..6f788c30dcca 100644 --- a/tests_common/test_utils/compat.py +++ b/tests_common/test_utils/compat.py @@ -53,14 +53,18 @@ try: from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.generic_transfer import GenericTransfer + from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.bash import BashSensor from airflow.providers.standard.sensors.date_time import DateTimeSensor + from airflow.providers.standard.utils.python_virtualenv import write_python_script except ImportError: # Compatibility for Airflow < 2.10.* from airflow.operators.bash import BashOperator # type: ignore[no-redef,attr-defined] from airflow.operators.generic_transfer import GenericTransfer # type: ignore[no-redef,attr-defined] + from airflow.operators.python import PythonOperator # type: ignore[no-redef,attr-defined] from airflow.sensors.bash import BashSensor # type: ignore[no-redef,attr-defined] from airflow.sensors.date_time import DateTimeSensor # type: ignore[no-redef,attr-defined] + from airflow.utils.python_virtualenv import write_python_script # type: ignore[no-redef,attr-defined] if TYPE_CHECKING: