
Commit

Merge branch 'main' into feature/paginated-generic-transfer
dabla authored Jan 17, 2025
2 parents 84b75e9 + 9984dcd commit b0f56c5
Showing 146 changed files with 4,850 additions and 2,938 deletions.
2 changes: 1 addition & 1 deletion .github/actions/install-pre-commit/action.yml
@@ -24,7 +24,7 @@ inputs:
default: "3.9"
uv-version:
description: 'uv version to use'
default: "0.5.14" # Keep this comment to allow automatic replacement of uv version
default: "0.5.20" # Keep this comment to allow automatic replacement of uv version
pre-commit-version:
description: 'pre-commit version to use'
default: "4.0.1" # Keep this comment to allow automatic replacement of pre-commit version
2 changes: 1 addition & 1 deletion Dockerfile
@@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.14
ARG AIRFLOW_UV_VERSION=0.5.20
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
20 changes: 12 additions & 8 deletions Dockerfile.ci
@@ -872,6 +872,15 @@ function environment_initialization() {
fi
}

function handle_mount_sources() {
if [[ ${MOUNT_SOURCES=} == "remove" ]]; then
echo
echo "${COLOR_BLUE}Mounted sources are removed, cleaning up mounted dist-info files${COLOR_RESET}"
echo
rm -rf /usr/local/lib/python${PYTHON_MAJOR_MINOR_VERSION}/site-packages/apache_airflow*.dist-info/
fi
}

function determine_airflow_to_use() {
USE_AIRFLOW_VERSION="${USE_AIRFLOW_VERSION:=""}"
if [[ ${USE_AIRFLOW_VERSION} == "" && ${USE_PACKAGES_FROM_DIST=} != "true" ]]; then
@@ -885,12 +894,6 @@ function determine_airflow_to_use() {
mkdir -p "${AIRFLOW_SOURCES}"/logs/
mkdir -p "${AIRFLOW_SOURCES}"/tmp/
else
if [[ ${USE_AIRFLOW_VERSION} =~ 2\.[7-8].* && ${TEST_TYPE} == "Providers[fab]" ]]; then
echo
echo "${COLOR_YELLOW}Skipping FAB tests on Airflow 2.7 and 2.8 because of FAB incompatibility with them${COLOR_RESET}"
echo
exit 0
fi
if [[ ${CLEAN_AIRFLOW_INSTALLATION=} == "true" ]]; then
echo
echo "${COLOR_BLUE}Uninstalling all packages first${COLOR_RESET}"
@@ -1050,7 +1053,7 @@ function start_webserver_with_examples(){
echo
echo "${COLOR_BLUE}Parsing example dags${COLOR_RESET}"
echo
airflow scheduler --num-runs 100
airflow dags reserialize
echo "Example dags parsing finished"
echo "Create admin user"
airflow users create -u admin -p admin -f Thor -l Administrator -r Admin -e [email protected]
@@ -1074,6 +1077,7 @@ function start_webserver_with_examples(){
echo "${COLOR_BLUE}Airflow webserver started${COLOR_RESET}"
}

handle_mount_sources
determine_airflow_to_use
environment_initialization
check_boto_upgrade
@@ -1264,7 +1268,7 @@ COPY --from=scripts common.sh install_packaging_tools.sh install_additional_depe
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.14
ARG AIRFLOW_UV_VERSION=0.5.20
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
5 changes: 4 additions & 1 deletion airflow/api/common/delete_dag.py
@@ -75,7 +75,10 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session =
# This handles the case when the dag_id is changed in the file
session.execute(
delete(ParseImportError)
.where(ParseImportError.filename == dag.fileloc)
.where(
ParseImportError.filename == dag.fileloc,
ParseImportError.bundle_name == dag.bundle_name,
)
.execution_options(synchronize_session="fetch")
)

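The delete_dag.py change above narrows the bulk delete of import errors from matching the filename alone to matching the (filename, bundle_name) pair. Below is a minimal, runnable sketch of that multi-condition ORM delete, using a simplified stand-in model rather than Airflow's real ParseImportError:

```python
# Sketch only: multiple conditions passed to .where() are ANDed, so the delete
# removes import errors for the file *and* bundle, not for every bundle that
# happens to contain a file with the same path.
from sqlalchemy import Column, Integer, String, create_engine, delete
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class ParseError(Base):  # simplified stand-in for Airflow's ParseImportError
    __tablename__ = "parse_error"
    id = Column(Integer, primary_key=True)
    filename = Column(String)
    bundle_name = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            ParseError(filename="/dags/a.py", bundle_name="main"),
            ParseError(filename="/dags/a.py", bundle_name="other"),
        ]
    )
    session.commit()

    session.execute(
        delete(ParseError)
        .where(ParseError.filename == "/dags/a.py", ParseError.bundle_name == "main")
        .execution_options(synchronize_session="fetch")
    )
    session.commit()
    print(session.query(ParseError).count())  # 1 -- only the "main" bundle row was deleted
```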
27 changes: 21 additions & 6 deletions airflow/api_connexion/endpoints/import_error_endpoint.py
@@ -19,7 +19,7 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING

from sqlalchemy import func, select
from sqlalchemy import func, select, tuple_

from airflow.api_connexion import security
from airflow.api_connexion.exceptions import NotFound, PermissionDenied
@@ -61,7 +61,9 @@ def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) ->
readable_dag_ids = security.get_readable_dags()
file_dag_ids = {
dag_id[0]
for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all()
for dag_id in session.query(DagModel.dag_id)
.filter(DagModel.fileloc == error.filename, DagModel.bundle_name == error.bundle_name)
.all()
}

# Can the user read any DAGs in the file?
@@ -98,9 +100,17 @@ def get_import_errors(
if not can_read_all_dags:
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
readable_dag_ids = security.get_readable_dags()
dagfiles_stmt = select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
query = query.where(ParseImportError.filename.in_(dagfiles_stmt))
count_query = count_query.where(ParseImportError.filename.in_(dagfiles_stmt))
dagfiles_stmt = session.execute(
select(DagModel.fileloc, DagModel.bundle_name)
.distinct()
.where(DagModel.dag_id.in_(readable_dag_ids))
).all()
query = query.where(
tuple_(ParseImportError.filename, ParseImportError.bundle_name or None).in_(dagfiles_stmt)
)
count_query = count_query.where(
tuple_(ParseImportError.filename, ParseImportError.bundle_name).in_(dagfiles_stmt)
)

total_entries = session.scalars(count_query).one()
import_errors = session.scalars(query.offset(offset).limit(limit)).all()
@@ -109,7 +119,12 @@
for import_error in import_errors:
# Check if user has read access to all the DAGs defined in the file
file_dag_ids = (
session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all()
session.query(DagModel.dag_id)
.filter(
DagModel.fileloc == import_error.filename,
DagModel.bundle_name == import_error.bundle_name,
)
.all()
)
requests: Sequence[IsAuthorizedDagRequest] = [
{
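The import_error_endpoint.py change above scopes import-error visibility to (filename, bundle_name) pairs by using SQLAlchemy's tuple_(...).in_(...) composite filter. Here is a small, self-contained sketch of that pattern, with stand-in models in place of DagModel and ParseImportError (it assumes a SQLite build with row-value support, 3.15 or newer):

```python
# Sketch only: collect the (fileloc, bundle_name) pairs the user may read, then
# keep just the import errors whose (filename, bundle_name) matches one of them.
from sqlalchemy import Column, Integer, String, create_engine, select, tuple_
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class DagFile(Base):  # stand-in for DagModel
    __tablename__ = "dag_file"
    id = Column(Integer, primary_key=True)
    fileloc = Column(String)
    bundle_name = Column(String)


class ImportErr(Base):  # stand-in for ParseImportError
    __tablename__ = "import_err"
    id = Column(Integer, primary_key=True)
    filename = Column(String)
    bundle_name = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            DagFile(fileloc="/dags/a.py", bundle_name="main"),
            ImportErr(filename="/dags/a.py", bundle_name="main"),
            ImportErr(filename="/dags/a.py", bundle_name="other"),
        ]
    )
    session.commit()

    readable_pairs = [
        tuple(row)
        for row in session.execute(
            select(DagFile.fileloc, DagFile.bundle_name).distinct()
        ).all()
    ]
    stmt = select(ImportErr).where(
        tuple_(ImportErr.filename, ImportErr.bundle_name).in_(readable_pairs)
    )
    print([(e.filename, e.bundle_name) for e in session.scalars(stmt)])
    # [('/dags/a.py', 'main')] -- the "other" bundle's error is filtered out
```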
4 changes: 4 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
@@ -3370,6 +3370,10 @@ components:
type: string
readOnly: true
description: The filename
bundle_name:
type: string
readOnly: true
description: The bundle name
stack_trace:
type: string
readOnly: true
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/error_schema.py
@@ -35,6 +35,7 @@ class Meta:
import_error_id = auto_field("id", dump_only=True)
timestamp = auto_field(format="iso", dump_only=True)
filename = auto_field(dump_only=True)
bundle_name = auto_field(dump_only=True)
stack_trace = auto_field("stacktrace", dump_only=True)


1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/datamodels/import_error.py
@@ -31,6 +31,7 @@ class ImportErrorResponse(BaseModel):
id: int = Field(alias="import_error_id")
timestamp: datetime
filename: str
bundle_name: str
stacktrace: str = Field(alias="stack_trace")


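The datamodel change above adds bundle_name to the FastAPI ImportErrorResponse, whose other fields map Python attribute names to wire names through aliases (import_error_id, stack_trace). A short sketch of how such an aliased Pydantic model serializes; populate_by_name and the sample values are assumptions for illustration only:

```python
# Sketch only: attribute names differ from serialized field names via aliases.
from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class ImportErrorSketch(BaseModel):
    # Allow constructing by attribute name in this example.
    model_config = ConfigDict(populate_by_name=True)

    id: int = Field(alias="import_error_id")
    timestamp: datetime
    filename: str
    bundle_name: str
    stacktrace: str = Field(alias="stack_trace")


resp = ImportErrorSketch(
    id=1,
    timestamp=datetime(2025, 1, 17),
    filename="/dags/a.py",
    bundle_name="main",
    stacktrace="Traceback ...",
)
# Dumping by alias produces the wire-level field names.
print(resp.model_dump(by_alias=True))
```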
1 change: 0 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/plugins.py
@@ -72,7 +72,6 @@ class PluginResponse(BaseModel):
global_operator_extra_links: list[str]
operator_extra_links: list[str]
source: Annotated[str, BeforeValidator(coerce_to_string)]
ti_deps: list[Annotated[str, BeforeValidator(coerce_to_string)]]
listeners: list[str]
timetables: list[str]

10 changes: 4 additions & 6 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -8440,6 +8440,9 @@ components:
filename:
type: string
title: Filename
bundle_name:
type: string
title: Bundle Name
stack_trace:
type: string
title: Stack Trace
@@ -8448,6 +8451,7 @@ components:
- import_error_id
- timestamp
- filename
- bundle_name
- stack_trace
title: ImportErrorResponse
description: Import Error Response.
@@ -8695,11 +8699,6 @@ components:
source:
type: string
title: Source
ti_deps:
items:
type: string
type: array
title: Ti Deps
listeners:
items:
type: string
@@ -8721,7 +8720,7 @@
- global_operator_extra_links
- operator_extra_links
- source
- ti_deps
- listeners
- timetables
title: PluginResponse
44 changes: 34 additions & 10 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -38,11 +38,13 @@
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
from airflow.models.backfill import (
AlreadyRunningBackfill,
Backfill,
BackfillDagRun,
DagNoScheduleException,
_create_backfill,
_do_dry_run,
)
@@ -209,6 +211,17 @@ def create_backfill(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{backfill_request.dag_id} has no schedule",
)

except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {backfill_request.dag_id}",
)


@backfills_router.post(
@@ -227,14 +240,25 @@ def create_backfill_dry_run(
from_date = timezone.coerce_datetime(body.from_date)
to_date = timezone.coerce_datetime(body.to_date)

backfills_dry_run = _do_dry_run(
dag_id=body.dag_id,
from_date=from_date,
to_date=to_date,
reverse=body.run_backwards,
reprocess_behavior=body.reprocess_behavior,
session=session,
)
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]
try:
backfills_dry_run = _do_dry_run(
dag_id=body.dag_id,
from_date=from_date,
to_date=to_date,
reverse=body.run_backwards,
reprocess_behavior=body.reprocess_behavior,
session=session,
)
backfills = [DryRunBackfillResponse(logical_date=d) for d in backfills_dry_run]

return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))
return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run))
except DagNotFound:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Could not find dag {body.dag_id}",
)
except DagNoScheduleException:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"{body.dag_id} has no schedule",
)
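The backfills.py change above wraps the backfill service calls in try/except and translates domain exceptions into HTTP status codes (404 for a missing DAG, 409 for a DAG without a schedule). A minimal sketch of the same pattern, with simplified stand-ins for the exception classes and the service call:

```python
# Sketch only: map domain-level exceptions to HTTP errors at the route boundary.
from fastapi import FastAPI, HTTPException, status

app = FastAPI()


class DagNotFound(Exception):
    """Stand-in for airflow.exceptions.DagNotFound."""


class DagNoScheduleException(Exception):
    """Stand-in for the backfill model's DagNoScheduleException."""


def create_backfill_in_db(dag_id: str) -> dict:
    # Placeholder for the real backfill creation; it raises the exceptions above.
    if dag_id == "missing":
        raise DagNotFound()
    if dag_id == "unscheduled":
        raise DagNoScheduleException()
    return {"dag_id": dag_id, "state": "running"}


@app.post("/backfills")
def create_backfill(dag_id: str):
    try:
        return create_backfill_in_db(dag_id)
    except DagNotFound:
        # The caller referenced a DAG that does not exist -> 404.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Could not find dag {dag_id}",
        )
    except DagNoScheduleException:
        # Backfilling a DAG without a schedule conflicts with its configuration -> 409.
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"{dag_id} has no schedule",
        )
```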
1 change: 1 addition & 0 deletions airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -74,6 +74,7 @@ def get_import_errors(
"id",
"timestamp",
"filename",
"bundle_name",
"stacktrace",
],
ParseImportError,
1 change: 1 addition & 0 deletions airflow/cli/cli_config.py
@@ -858,6 +858,7 @@ def string_lower_type(val):
help="The option name",
)

# lint
ARG_LINT_CONFIG_SECTION = Arg(
("--section",),
help="The section name(s) to lint in the airflow config.",
12 changes: 11 additions & 1 deletion airflow/cli/commands/local_commands/standalone_command.py
@@ -30,6 +30,7 @@
from airflow.configuration import conf
from airflow.executors import executor_constants
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.job import most_recent_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -76,6 +77,12 @@ def run(self):
command=["scheduler"],
env=env,
)
self.subcommands["dag-processor"] = SubCommand(
self,
name="dag-processor",
command=["dag-processor"],
env=env,
)
self.subcommands["webserver"] = SubCommand(
self,
name="webserver",
@@ -147,6 +154,7 @@ def print_output(self, name: str, output):
"fastapi-api": "magenta",
"webserver": "green",
"scheduler": "blue",
"dag-processor": "yellow",
"triggerer": "cyan",
"standalone": "white",
}
@@ -169,6 +177,7 @@ def calculate_env(self):
We override some settings as part of being standalone.
"""
env = dict(os.environ)
env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "True"

# Make sure we're using a local executor flavour
executor_class, _ = ExecutorLoader.import_default_executor_cls()
@@ -205,6 +214,7 @@ def is_ready(self):
return (
self.port_open(self.web_server_port)
and self.job_running(SchedulerJobRunner)
and self.job_running(DagProcessorJobRunner)
and self.job_running(TriggererJobRunner)
)

@@ -228,7 +238,7 @@ def job_running(self, job_runner_class: type[BaseJobRunner]):
"""
Check if the given job name is running and heartbeating correctly.
Used to tell if scheduler is alive.
Used to tell if a component is alive.
"""
recent = most_recent_job(job_runner_class.job_type)
if not recent:
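The standalone_command.py change above runs a dedicated dag-processor subcommand and forces it on by exporting AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR before spawning the subprocesses. Airflow reads configuration overrides from environment variables named AIRFLOW__{SECTION}__{KEY}; a tiny sketch of building such an override follows (the helper name is invented for illustration):

```python
# Sketch only: construct an Airflow config override as an environment variable.
import os


def airflow_env_override(section: str, key: str, value: str) -> dict[str, str]:
    """Return the AIRFLOW__{SECTION}__{KEY} environment override for one option."""
    return {f"AIRFLOW__{section.upper()}__{key.upper()}": value}


env = dict(os.environ)
env.update(airflow_env_override("scheduler", "standalone_dag_processor", "True"))
assert env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] == "True"
```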
22 changes: 18 additions & 4 deletions airflow/cli/commands/remote_commands/config_command.py
@@ -122,14 +122,21 @@ def message(self) -> str:
# core
ConfigChange(
config=ConfigParameter("core", "check_slas"),
suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in "
"future",
suggestion="The SLA feature is removed in Airflow 3.0, to be replaced with Airflow Alerts in future",
),
ConfigChange(
config=ConfigParameter("core", "strict_asset_uri_validation"),
suggestion="Asset URI with a defined scheme will now always be validated strictly, "
config=ConfigParameter("core", "strict_dataset_uri_validation"),
suggestion="Dataset URI with a defined scheme will now always be validated strictly, "
"raising a hard error on validation failure.",
),
ConfigChange(
config=ConfigParameter("core", "dataset_manager_class"),
renamed_to=ConfigParameter("core", "asset_manager_class"),
),
ConfigChange(
config=ConfigParameter("core", "dataset_manager_kwargs"),
renamed_to=ConfigParameter("core", "asset_manager_kwargs"),
),
ConfigChange(
config=ConfigParameter("core", "worker_precheck"),
renamed_to=ConfigParameter("celery", "worker_precheck"),
@@ -237,6 +244,9 @@ def message(self) -> str:
ConfigChange(
config=ConfigParameter("webserver", "allow_raw_html_descriptions"),
),
ConfigChange(
config=ConfigParameter("webserver", "cookie_samesite"),
),
ConfigChange(
config=ConfigParameter("webserver", "update_fab_perms"),
renamed_to=ConfigParameter("fab", "update_fab_perms"),
@@ -317,6 +327,10 @@ def message(self) -> str:
config=ConfigParameter("scheduler", "statsd_custom_client_path"),
renamed_to=ConfigParameter("metrics", "statsd_custom_client_path"),
),
ConfigChange(
config=ConfigParameter("scheduler", "dag_dir_list_interval"),
renamed_to=ConfigParameter("dag_bundles", "refresh_interval"),
),
# celery
ConfigChange(
config=ConfigParameter("celery", "stalled_task_timeout"),
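The config_command.py change above extends the table of removed and renamed options that Airflow's config checks report on. A simplified sketch of how such a rename table can be modelled and consulted; ConfigParameter and ConfigChange here are stand-ins, not the real classes:

```python
# Sketch only: a minimal removed/renamed-option table and the messages it produces.
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class ConfigParameter:  # stand-in for the real ConfigParameter
    section: str
    option: str


@dataclass(frozen=True)
class ConfigChange:  # stand-in for the real ConfigChange
    config: ConfigParameter
    renamed_to: ConfigParameter | None = None
    suggestion: str = ""

    def message(self) -> str:
        if self.renamed_to:
            return (
                f"[{self.config.section}] {self.config.option} has been renamed to "
                f"[{self.renamed_to.section}] {self.renamed_to.option}."
            )
        return f"[{self.config.section}] {self.config.option} has been removed. {self.suggestion}".strip()


CONFIGS_CHANGES = [
    ConfigChange(
        config=ConfigParameter("scheduler", "dag_dir_list_interval"),
        renamed_to=ConfigParameter("dag_bundles", "refresh_interval"),
    ),
    ConfigChange(
        config=ConfigParameter("webserver", "cookie_samesite"),
    ),
]

for change in CONFIGS_CHANGES:
    print(change.message())
```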
Some files were not shown because too many files changed in this diff.
