diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index e9e93673b8af..3b61e0fedb4d 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -49,7 +49,7 @@ path="/", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def list_backfills( +def list_backfills( dag_id: str, limit: QueryLimit, offset: QueryOffset, @@ -81,7 +81,7 @@ async def list_backfills( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_backfill( +def get_backfill( backfill_id: str, session: Annotated[Session, Depends(get_session)], ): @@ -102,7 +102,7 @@ async def get_backfill( ] ), ) -async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): +def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): b = session.get(Backfill, backfill_id) if not b: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}") @@ -125,7 +125,7 @@ async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get ] ), ) -async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): +def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): b = session.get(Backfill, backfill_id) if not b: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}") @@ -147,7 +147,7 @@ async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(g ] ), ) -async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): +def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]): b: Backfill = session.get(Backfill, backfill_id) if not b: raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}") @@ -194,7 +194,7 @@ async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(ge ] ), ) -async def create_backfill( +def create_backfill( backfill_request: BackfillPostBody, ): from_date = timezone.coerce_datetime(backfill_request.from_date) diff --git a/airflow/api_fastapi/core_api/routes/public/connections.py b/airflow/api_fastapi/core_api/routes/public/connections.py index b1b6fb4abeb6..fd378601385a 100644 --- a/airflow/api_fastapi/core_api/routes/public/connections.py +++ b/airflow/api_fastapi/core_api/routes/public/connections.py @@ -43,7 +43,7 @@ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def delete_connection( +def delete_connection( connection_id: str, session: Annotated[Session, Depends(get_session)], ): @@ -64,7 +64,7 @@ async def delete_connection( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_connection( +def get_connection( connection_id: str, session: Annotated[Session, Depends(get_session)], ) -> ConnectionResponse: @@ -85,7 +85,7 @@ async def get_connection( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_connections( +def get_connections( limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -125,7 +125,7 @@ async def get_connections( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_409_CONFLICT] ), ) -async def post_connection( +def post_connection( post_body: ConnectionBody, session: Annotated[Session, Depends(get_session)], ) -> ConnectionResponse: @@ -156,7 +156,7 @@ async def post_connection( ] ), ) -async def patch_connection( +def patch_connection( connection_id: str, patch_body: ConnectionBody, session: Annotated[Session, Depends(get_session)], diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 7f41573b1dba..b05ed2ba1138 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -50,7 +50,7 @@ ] ), ) -async def get_dag_run( +def get_dag_run( dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)] ) -> DAGRunResponse: dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) @@ -75,7 +75,7 @@ async def get_dag_run( ] ), ) -async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]): +def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]): """Delete a DAG Run entry.""" dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id)) @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio ] ), ) -async def patch_dag_run_state( +def patch_dag_run_state( dag_id: str, dag_run_id: str, patch_body: DAGRunPatchBody, @@ -138,6 +138,6 @@ async def patch_dag_run_state( else: set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True) - dag_run = session.get(DagRun, dag_run.id) + session.refresh(dag_run) return DAGRunResponse.model_validate(dag_run, from_attributes=True) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_sources.py b/airflow/api_fastapi/core_api/routes/public/dag_sources.py index 3cf046f5b757..1d008e9ad2d1 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_sources.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_sources.py @@ -55,7 +55,7 @@ }, response_model=DAGSourceResponse, ) -async def get_dag_source( +def get_dag_source( file_token: str, session: Annotated[Session, Depends(get_session)], request: Request, diff --git a/airflow/api_fastapi/core_api/routes/public/dag_stats.py b/airflow/api_fastapi/core_api/routes/public/dag_stats.py index e4bc1c05b16d..5d773703224b 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_stats.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_stats.py @@ -50,7 +50,7 @@ ] ), ) -async def get_dag_stats( +def get_dag_stats( session: Annotated[Session, Depends(get_session)], dag_ids: QueryDagIdsFilter, ) -> DagStatsCollectionResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_warning.py b/airflow/api_fastapi/core_api/routes/public/dag_warning.py index f445fb0afcf5..a981fd9ed58a 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_warning.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_warning.py @@ -48,7 +48,7 @@ "/dagWarnings", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def list_dag_warnings( +def list_dag_warnings( dag_id: QueryDagIdInDagWarningFilter, warning_type: QueryWarningTypeFilter, limit: QueryLimit, diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index a36c391e55dc..9a9583ed22a1 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -58,7 +58,7 @@ @dags_router.get("/") -async def get_dags( +def get_dags( limit: QueryLimit, offset: QueryOffset, tags: QueryTagsFilter, @@ -101,7 +101,7 @@ async def get_dags( "/tags", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def get_dag_tags( +def get_dag_tags( limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -142,9 +142,7 @@ async def get_dag_tags( ] ), ) -async def get_dag( - dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request -) -> DAGResponse: +def get_dag(dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request) -> DAGResponse: """Get basic information about a DAG.""" dag: DAG = request.app.state.dag_bag.get_dag(dag_id) if not dag: @@ -172,7 +170,7 @@ async def get_dag( ] ), ) -async def get_dag_details( +def get_dag_details( dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request ) -> DAGDetailsResponse: """Get details of DAG.""" @@ -202,7 +200,7 @@ async def get_dag_details( ] ), ) -async def patch_dag( +def patch_dag( dag_id: str, patch_body: DAGPatchBody, session: Annotated[Session, Depends(get_session)], @@ -241,7 +239,7 @@ async def patch_dag( ] ), ) -async def patch_dags( +def patch_dags( patch_body: DAGPatchBody, limit: QueryLimit, offset: QueryOffset, @@ -301,7 +299,7 @@ async def patch_dags( ] ), ) -async def delete_dag( +def delete_dag( dag_id: str, session: Annotated[Session, Depends(get_session)], ) -> Response: diff --git a/airflow/api_fastapi/core_api/routes/public/event_logs.py b/airflow/api_fastapi/core_api/routes/public/event_logs.py index 462c26696957..510846b67c47 100644 --- a/airflow/api_fastapi/core_api/routes/public/event_logs.py +++ b/airflow/api_fastapi/core_api/routes/public/event_logs.py @@ -49,7 +49,7 @@ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_event_log( +def get_event_log( event_log_id: int, session: Annotated[Session, Depends(get_session)], ) -> EventLogResponse: @@ -66,7 +66,7 @@ async def get_event_log( "/", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def get_event_logs( +def get_event_logs( limit: QueryLimit, offset: QueryOffset, session: Annotated[Session, Depends(get_session)], diff --git a/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow/api_fastapi/core_api/routes/public/import_error.py index 9007d6ff891b..a067a7608fe7 100644 --- a/airflow/api_fastapi/core_api/routes/public/import_error.py +++ b/airflow/api_fastapi/core_api/routes/public/import_error.py @@ -47,7 +47,7 @@ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_import_error( +def get_import_error( import_error_id: int, session: Annotated[Session, Depends(get_session)], ) -> ImportErrorResponse: @@ -66,7 +66,7 @@ async def get_import_error( "/", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def get_import_errors( +def get_import_errors( limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ diff --git a/airflow/api_fastapi/core_api/routes/public/monitor.py b/airflow/api_fastapi/core_api/routes/public/monitor.py index e5d746e15c24..d855e7bdebf4 100644 --- a/airflow/api_fastapi/core_api/routes/public/monitor.py +++ b/airflow/api_fastapi/core_api/routes/public/monitor.py @@ -25,6 +25,6 @@ @monitor_router.get("/health") -async def get_health() -> HealthInfoSchema: +def get_health() -> HealthInfoSchema: airflow_health_status = get_airflow_health() return HealthInfoSchema.model_validate(airflow_health_status) diff --git a/airflow/api_fastapi/core_api/routes/public/plugins.py b/airflow/api_fastapi/core_api/routes/public/plugins.py index c264e748373c..516344e9b3a2 100644 --- a/airflow/api_fastapi/core_api/routes/public/plugins.py +++ b/airflow/api_fastapi/core_api/routes/public/plugins.py @@ -26,7 +26,7 @@ @plugins_router.get("/") -async def get_plugins( +def get_plugins( limit: QueryLimit, offset: QueryOffset, ) -> PluginCollectionResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/pools.py b/airflow/api_fastapi/core_api/routes/public/pools.py index 99389e0bd6e9..5b58c83d8aa2 100644 --- a/airflow/api_fastapi/core_api/routes/public/pools.py +++ b/airflow/api_fastapi/core_api/routes/public/pools.py @@ -51,7 +51,7 @@ ] ), ) -async def delete_pool( +def delete_pool( pool_name: str, session: Annotated[Session, Depends(get_session)], ): @@ -71,7 +71,7 @@ async def delete_pool( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_pool( +def get_pool( pool_name: str, session: Annotated[Session, Depends(get_session)], ) -> PoolResponse: @@ -89,7 +89,7 @@ async def get_pool( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_pools( +def get_pools( limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -127,7 +127,7 @@ async def get_pools( ] ), ) -async def patch_pool( +def patch_pool( pool_name: str, patch_body: PoolPatchBody, session: Annotated[Session, Depends(get_session)], @@ -170,7 +170,7 @@ async def patch_pool( status_code=status.HTTP_201_CREATED, responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def post_pool( +def post_pool( post_body: PoolPostBody, session: Annotated[Session, Depends(get_session)], ) -> PoolResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/providers.py b/airflow/api_fastapi/core_api/routes/public/providers.py index 6c01578dd5f6..0386404f420d 100644 --- a/airflow/api_fastapi/core_api/routes/public/providers.py +++ b/airflow/api_fastapi/core_api/routes/public/providers.py @@ -40,7 +40,7 @@ def _provider_mapper(provider: ProviderInfo) -> ProviderResponse: @providers_router.get("/") -async def get_providers( +def get_providers( limit: QueryLimit, offset: QueryOffset, ) -> ProviderCollectionResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py index be76d1922548..9d74ed273111 100644 --- a/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -60,7 +60,7 @@ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_task_instance( +def get_task_instance( dag_id: str, dag_run_id: str, task_id: str, session: Annotated[Session, Depends(get_session)] ) -> TaskInstanceResponse: """Get task instance.""" @@ -236,7 +236,7 @@ async def get_task_instance_dependencies( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_mapped_task_instance( +def get_mapped_task_instance( dag_id: str, dag_run_id: str, task_id: str, diff --git a/airflow/api_fastapi/core_api/routes/public/variables.py b/airflow/api_fastapi/core_api/routes/public/variables.py index 6ed680cd7bc2..ac1be6e6edb0 100644 --- a/airflow/api_fastapi/core_api/routes/public/variables.py +++ b/airflow/api_fastapi/core_api/routes/public/variables.py @@ -42,7 +42,7 @@ [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def delete_variable( +def delete_variable( variable_key: str, session: Annotated[Session, Depends(get_session)], ): @@ -59,7 +59,7 @@ async def delete_variable( [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND] ), ) -async def get_variable( +def get_variable( variable_key: str, session: Annotated[Session, Depends(get_session)], ) -> VariableResponse: @@ -78,7 +78,7 @@ async def get_variable( "/", responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def get_variables( +def get_variables( limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -121,7 +121,7 @@ async def get_variables( ] ), ) -async def patch_variable( +def patch_variable( variable_key: str, patch_body: VariableBody, session: Annotated[Session, Depends(get_session)], @@ -154,7 +154,7 @@ async def patch_variable( status_code=status.HTTP_201_CREATED, responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]), ) -async def post_variable( +def post_variable( post_body: VariableBody, session: Annotated[Session, Depends(get_session)], ) -> VariableResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/version.py b/airflow/api_fastapi/core_api/routes/public/version.py index 218e0b90702d..b2ca6fef6ae2 100644 --- a/airflow/api_fastapi/core_api/routes/public/version.py +++ b/airflow/api_fastapi/core_api/routes/public/version.py @@ -26,7 +26,7 @@ @version_router.get("/") -async def get_version() -> VersionInfo: +def get_version() -> VersionInfo: """Get version information.""" airflow_version = airflow.__version__ git_version = get_airflow_git_version() diff --git a/airflow/api_fastapi/core_api/routes/ui/assets.py b/airflow/api_fastapi/core_api/routes/ui/assets.py index 3b98e4f59a3c..b9c07124277f 100644 --- a/airflow/api_fastapi/core_api/routes/ui/assets.py +++ b/airflow/api_fastapi/core_api/routes/ui/assets.py @@ -31,7 +31,7 @@ @assets_router.get("/next_run_assets/{dag_id}", include_in_schema=False) -async def next_run_assets( +def next_run_assets( dag_id: str, request: Request, session: Annotated[Session, Depends(get_session)], diff --git a/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow/api_fastapi/core_api/routes/ui/dags.py index 665373734bb9..14d0c72458d1 100644 --- a/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -50,7 +50,7 @@ @dags_router.get("/recent_dag_runs", include_in_schema=False, response_model_exclude_none=True) -async def recent_dag_runs( +def recent_dag_runs( limit: QueryLimit, offset: QueryOffset, tags: QueryTagsFilter, diff --git a/airflow/api_fastapi/core_api/routes/ui/dashboard.py b/airflow/api_fastapi/core_api/routes/ui/dashboard.py index 3cb39e4e8b21..ada47f2fe853 100644 --- a/airflow/api_fastapi/core_api/routes/ui/dashboard.py +++ b/airflow/api_fastapi/core_api/routes/ui/dashboard.py @@ -44,7 +44,7 @@ include_in_schema=False, responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]), ) -async def historical_metrics( +def historical_metrics( start_date: DateTimeQuery, end_date: DateTimeQuery, session: Annotated[Session, Depends(get_session)], diff --git a/airflow/api_fastapi/execution_api/routes/health.py b/airflow/api_fastapi/execution_api/routes/health.py index e0d51e3c7145..c8d903815dc6 100644 --- a/airflow/api_fastapi/execution_api/routes/health.py +++ b/airflow/api_fastapi/execution_api/routes/health.py @@ -23,5 +23,5 @@ @health_router.get("/health") -async def health() -> dict: +def health() -> dict: return {"status": "healthy"} diff --git a/airflow/api_fastapi/execution_api/routes/task_instance.py b/airflow/api_fastapi/execution_api/routes/task_instance.py index ddf4055d4d01..8f6331f5a599 100644 --- a/airflow/api_fastapi/execution_api/routes/task_instance.py +++ b/airflow/api_fastapi/execution_api/routes/task_instance.py @@ -56,7 +56,7 @@ status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload for the state transition"}, }, ) -async def ti_update_state( +def ti_update_state( task_instance_id: UUID, ti_patch_payload: Annotated[schemas.TIStateUpdate, Body()], session: Annotated[Session, Depends(get_session)], @@ -144,7 +144,7 @@ async def ti_update_state( status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Invalid payload for the state transition"}, }, ) -async def ti_heartbeat( +def ti_heartbeat( task_instance_id: UUID, ti_payload: schemas.TIHeartbeatInfo, session: Annotated[Session, Depends(get_session)], diff --git a/airflow/settings.py b/airflow/settings.py index 89c77a2abfe3..9a1801ccc41c 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -459,6 +459,12 @@ def configure_orm(disable_connection_pool=False, pool_class=None): else: connect_args = {} + if os.environ.get("AIRFLOW__CORE__UNIT_TEST_MODE") == "True" and os.environ.get("BACKEND") == "sqlite": + # FastAPI runs sync endpoints in a separate thread. SQLite does note allow + # to use objects created in another threads by default. Allowing that in test + # to so the `test` thread and the tested endpoints can use common objects. + connect_args["check_same_thread"] = False + engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) mask_secret(engine.url.password) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index dfd48af2fa53..6c48cece798d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -156,7 +156,7 @@ def test_patch_dag_run(self, test_client, dag_id, run_id, state, response_state) assert body["state"] == response_state @pytest.mark.parametrize( - "query_params,patch_body, expected_status_code", + "query_params, patch_body, expected_status_code", [ ({"update_mask": ["state"]}, {"state": DagRunState.SUCCESS}, 200), ({}, {"state": DagRunState.SUCCESS}, 200), diff --git a/tests/core/test_sqlalchemy_config.py b/tests/core/test_sqlalchemy_config.py index 1ce879683400..4f0a3e843a73 100644 --- a/tests/core/test_sqlalchemy_config.py +++ b/tests/core/test_sqlalchemy_config.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +import os from unittest.mock import patch import pytest @@ -59,7 +60,7 @@ def test_configure_orm_with_default_values( settings.configure_orm() mock_create_engine.assert_called_once_with( settings.SQL_ALCHEMY_CONN, - connect_args={}, + connect_args={} if os.environ["BACKEND"] != "sqlite" else {"check_same_thread": False}, encoding="utf-8", max_overflow=10, pool_pre_ping=True,