Skip to content

Commit

Permalink
Standard provider python operator (apache#42081)
Browse files Browse the repository at this point in the history
* add core.time provider

* add source-date-epoch

* change core to essentials

* revert external task sensor location

* add provider to airflow_providers_bug_report list

* change new provider name to standard

* add integration

* revert hatch_build

* move examples back to airflow core

* change sensors example dags paths

* remove init

* revert howto docs

* change provider as not-ready

* Move python operator to standard provider

* fix test failures

* move python operator inside standard/operators

* rebase and update python imports

* keep standard provider in hatch_build

* update selective checks and dependencies

* update imports and dependency file

* fix compat import errors

* fix selective_checks and imports from compat

* remove imports from compat for pythonoperator

* update run_tests.py python branching test paths

* add setup method in python operator tests

* check imports in python operator tests

* fix imports and move python virtual env scripts inside standard provider

* update selective checks tests and imports in test_python

* move python_virtualenv tests inside standard provider

* fix pre-commit static checks

* use compat provider for standard provider imports

* fix _SERIALIZERS ModuleNotFoundError and static checks

* import _SERIALIZERS for AIRFLOW_2_10 or higher versions

* fix ruff format

* fix static checks in test_dag

* fix test_provide_context_does_not_fail failure: add TypeError to pytest assert

---------

Co-authored-by: romsharon98 <[email protected]>
  • Loading branch information
gopidesupavan and romsharon98 authored Oct 31, 2024
1 parent 5886016 commit 06088a3
Show file tree
Hide file tree
Showing 119 changed files with 399 additions and 283 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ repos:
files: >
(?x)
^providers/src/airflow/providers/.*\.py$
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py
exclude: ^.*/.*_vendor/|providers/src/airflow/providers/standard/operators/bash.py|providers/src/airflow/providers/standard/operators/python.py
- id: check-get-lineage-collector-providers
language: python
name: Check providers import hook lineage code from compat
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/branch_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchExternalPythonOperator
from airflow.providers.standard.operators.python import BranchExternalPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/branch_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.standard.operators.python import BranchPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/branch_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchPythonVirtualenvOperator
from airflow.providers.standard.operators.python import BranchPythonVirtualenvOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ExternalPythonOperator
from airflow.providers.standard.operators.python import ExternalPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Callable, Sequence

from airflow.decorators.base import DecoratedOperator, task_decorator_factory
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.python import PythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import PythonVirtualenvOperator
from airflow.providers.standard.operators.python import PythonVirtualenvOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import TYPE_CHECKING, Callable, ClassVar, Sequence

from airflow.decorators.base import get_unique_task_id, task_decorator_factory
from airflow.sensors.python import PythonSensor
from airflow.providers.standard.sensors.python import PythonSensor

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
2 changes: 1 addition & 1 deletion airflow/decorators/short_circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.standard.operators.python import ShortCircuitOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

from airflow import DAG
from airflow.assets import Asset, AssetAlias
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.python import PythonOperator

with DAG(
dag_id="asset_s3_bucket_producer_with_no_taskflow",
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

import pendulum

from airflow.operators.python import is_venv_installed
from airflow.providers.standard.operators.python import is_venv_installed

if is_venv_installed():
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
from airflow.providers.standard.operators.python import (
BranchExternalPythonOperator,
BranchPythonOperator,
BranchPythonVirtualenvOperator,
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_branch_operator_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import pendulum

from airflow.operators.python import is_venv_installed
from airflow.providers.standard.operators.python import is_venv_installed

if is_venv_installed():
from airflow.decorators import task
Expand Down
6 changes: 3 additions & 3 deletions airflow/example_dags/example_python_context_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def print_context() -> str:
"""Print the Airflow context."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand All @@ -60,7 +60,7 @@ def print_context_venv() -> str:
"""Print the Airflow context in venv."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand All @@ -77,7 +77,7 @@ def print_context_external() -> str:
"""Print the Airflow context in external python."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand Down
12 changes: 8 additions & 4 deletions airflow/example_dags/example_python_context_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import pendulum

from airflow import DAG
from airflow.operators.python import ExternalPythonOperator, PythonOperator, PythonVirtualenvOperator
from airflow.providers.standard.operators.python import (
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
)

SOME_EXTERNAL_PYTHON = sys.executable

Expand All @@ -44,7 +48,7 @@ def print_context() -> str:
"""Print the Airflow context."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand All @@ -58,7 +62,7 @@ def print_context_venv() -> str:
"""Print the Airflow context in venv."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand All @@ -74,7 +78,7 @@ def print_context_external() -> str:
"""Print the Airflow context in external python."""
from pprint import pprint

from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

context = get_current_context()
pprint(context)
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import is_venv_installed
from airflow.providers.standard.operators.python import is_venv_installed

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import (
from airflow.providers.standard.operators.python import (
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.providers.standard.sensors.python import PythonSensor
from airflow.providers.standard.sensors.time import TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.python import PythonSensor
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.weekday import WeekDay

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.standard.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.providers.standard.operators.python import PythonOperator

# [END import_module]

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_api_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import is_venv_installed
from airflow.providers.standard.operators.python import is_venv_installed

log = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/tutorial_taskflow_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.standard.operators.python import get_current_context

# [END import_module]

Expand Down
10 changes: 5 additions & 5 deletions dev/breeze/src/airflow_breeze/utils/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ def get_excluded_provider_args(python_version: str) -> list[str]:
"Always": ["tests/always"],
"API": ["tests/api", "tests/api_connexion", "tests/api_internal", "tests/api_fastapi"],
"BranchPythonVenv": [
"tests/operators/test_python.py::TestBranchPythonVirtualenvOperator",
"providers/tests/standard/operators/test_python.py::TestBranchPythonVirtualenvOperator",
],
"BranchExternalPython": [
"tests/operators/test_python.py::TestBranchExternalPythonOperator",
"providers/tests/standard/operators/test_python.py::TestBranchExternalPythonOperator",
],
"CLI": ["tests/cli"],
"Core": [
Expand All @@ -150,20 +150,20 @@ def get_excluded_provider_args(python_version: str) -> list[str]:
"tests/utils",
],
"ExternalPython": [
"tests/operators/test_python.py::TestExternalPythonOperator",
"providers/tests/standard/operators/test_python.py::TestExternalPythonOperator",
],
"Integration": ["tests/integration"],
# Operators test type excludes Virtualenv/External tests - they have their own test types
"Operators": ["tests/operators", "--exclude-virtualenv-operator", "--exclude-external-python-operator"],
# this one is mysteriously failing dill serialization. It could be removed once
# https://github.com/pytest-dev/pytest/issues/10845 is fixed
"PlainAsserts": [
"tests/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context",
"providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context",
"--assert=plain",
],
"Providers": ["providers/tests"],
"PythonVenv": [
"tests/operators/test_python.py::TestPythonVirtualenvOperator",
"providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator",
],
"Serialization": [
"tests/serialization",
Expand Down
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/utils/selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ def __hash__(self):
)

PYTHON_OPERATOR_FILES = [
r"^airflow/operators/python.py",
r"^tests/operators/test_python.py",
r"^providers/src/providers/standard/operators/python.py",
r"^providers/tests/standard/operators/test_python.py",
]

TEST_TYPE_MATCHES = HashableDict(
Expand Down
10 changes: 5 additions & 5 deletions dev/breeze/tests/test_pytest_args_for_test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
(
"PlainAsserts",
[
"tests/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context",
"providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator::test_airflow_context",
"--assert=plain",
],
False,
Expand All @@ -123,28 +123,28 @@
(
"PythonVenv",
[
"tests/operators/test_python.py::TestPythonVirtualenvOperator",
"providers/tests/standard/operators/test_python.py::TestPythonVirtualenvOperator",
],
False,
),
(
"BranchPythonVenv",
[
"tests/operators/test_python.py::TestBranchPythonVirtualenvOperator",
"providers/tests/standard/operators/test_python.py::TestBranchPythonVirtualenvOperator",
],
False,
),
(
"ExternalPython",
[
"tests/operators/test_python.py::TestExternalPythonOperator",
"providers/tests/standard/operators/test_python.py::TestExternalPythonOperator",
],
False,
),
(
"BranchExternalPython",
[
"tests/operators/test_python.py::TestBranchExternalPythonOperator",
"providers/tests/standard/operators/test_python.py::TestBranchExternalPythonOperator",
],
False,
),
Expand Down
Loading

0 comments on commit 06088a3

Please sign in to comment.