Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-84 Convert async route to sync routes #43797

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
path="/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def list_backfills(
def list_backfills(
dag_id: str,
limit: QueryLimit,
offset: QueryOffset,
Expand Down Expand Up @@ -81,7 +81,7 @@ async def list_backfills(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_backfill(
def get_backfill(
backfill_id: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -102,7 +102,7 @@ async def get_backfill(
]
),
)
async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -125,7 +125,7 @@ async def pause_backfill(*, backfill_id, session: Annotated[Session, Depends(get
]
),
)
async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand All @@ -147,7 +147,7 @@ async def unpause_backfill(*, backfill_id, session: Annotated[Session, Depends(g
]
),
)
async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(get_session)]):
b: Backfill = session.get(Backfill, backfill_id)
if not b:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Could not find backfill with id {backfill_id}")
Expand Down Expand Up @@ -194,7 +194,7 @@ async def cancel_backfill(*, backfill_id, session: Annotated[Session, Depends(ge
]
),
)
async def create_backfill(
def create_backfill(
backfill_request: BackfillPostBody,
):
from_date = timezone.coerce_datetime(backfill_request.from_date)
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def delete_connection(
def delete_connection(
connection_id: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -64,7 +64,7 @@ async def delete_connection(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_connection(
def get_connection(
connection_id: str,
session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
Expand All @@ -85,7 +85,7 @@ async def get_connection(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_connections(
def get_connections(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -125,7 +125,7 @@ async def get_connections(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_409_CONFLICT]
),
)
async def post_connection(
def post_connection(
post_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
) -> ConnectionResponse:
Expand Down Expand Up @@ -156,7 +156,7 @@ async def post_connection(
]
),
)
async def patch_connection(
def patch_connection(
connection_id: str,
patch_body: ConnectionBody,
session: Annotated[Session, Depends(get_session)],
Expand Down
8 changes: 4 additions & 4 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
]
),
)
async def get_dag_run(
def get_dag_run(
dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]
) -> DAGRunResponse:
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))
Expand All @@ -75,7 +75,7 @@ async def get_dag_run(
]
),
)
async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]):
def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Session, Depends(get_session)]):
"""Delete a DAG Run entry."""
dag_run = session.scalar(select(DagRun).filter_by(dag_id=dag_id, run_id=dag_run_id))

Expand All @@ -99,7 +99,7 @@ async def delete_dag_run(dag_id: str, dag_run_id: str, session: Annotated[Sessio
]
),
)
async def patch_dag_run_state(
def patch_dag_run_state(
dag_id: str,
dag_run_id: str,
patch_body: DAGRunPatchBody,
Expand Down Expand Up @@ -138,6 +138,6 @@ async def patch_dag_run_state(
else:
set_dag_run_state_to_failed(dag=dag, run_id=dag_run.run_id, commit=True)

dag_run = session.get(DagRun, dag_run.id)
session.refresh(dag_run)

return DAGRunResponse.model_validate(dag_run, from_attributes=True)
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
},
response_model=DAGSourceResponse,
)
async def get_dag_source(
def get_dag_source(
file_token: str,
session: Annotated[Session, Depends(get_session)],
request: Request,
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/dag_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
]
),
)
async def get_dag_stats(
def get_dag_stats(
session: Annotated[Session, Depends(get_session)],
dag_ids: QueryDagIdsFilter,
) -> DagStatsCollectionResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"/dagWarnings",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def list_dag_warnings(
def list_dag_warnings(
dag_id: QueryDagIdInDagWarningFilter,
warning_type: QueryWarningTypeFilter,
limit: QueryLimit,
Expand Down
16 changes: 7 additions & 9 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@


@dags_router.get("/")
async def get_dags(
def get_dags(
limit: QueryLimit,
offset: QueryOffset,
tags: QueryTagsFilter,
Expand Down Expand Up @@ -101,7 +101,7 @@ async def get_dags(
"/tags",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_dag_tags(
def get_dag_tags(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -142,9 +142,7 @@ async def get_dag_tags(
]
),
)
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGResponse:
def get_dag(dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request) -> DAGResponse:
"""Get basic information about a DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
Expand Down Expand Up @@ -172,7 +170,7 @@ async def get_dag(
]
),
)
async def get_dag_details(
def get_dag_details(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGDetailsResponse:
"""Get details of DAG."""
Expand Down Expand Up @@ -202,7 +200,7 @@ async def get_dag_details(
]
),
)
async def patch_dag(
def patch_dag(
dag_id: str,
patch_body: DAGPatchBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -241,7 +239,7 @@ async def patch_dag(
]
),
)
async def patch_dags(
def patch_dags(
patch_body: DAGPatchBody,
limit: QueryLimit,
offset: QueryOffset,
Expand Down Expand Up @@ -301,7 +299,7 @@ async def patch_dags(
]
),
)
async def delete_dag(
def delete_dag(
dag_id: str,
session: Annotated[Session, Depends(get_session)],
) -> Response:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/event_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_event_log(
def get_event_log(
event_log_id: int,
session: Annotated[Session, Depends(get_session)],
) -> EventLogResponse:
Expand All @@ -66,7 +66,7 @@ async def get_event_log(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_event_logs(
def get_event_logs(
limit: QueryLimit,
offset: QueryOffset,
session: Annotated[Session, Depends(get_session)],
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/import_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_import_error(
def get_import_error(
import_error_id: int,
session: Annotated[Session, Depends(get_session)],
) -> ImportErrorResponse:
Expand All @@ -66,7 +66,7 @@ async def get_import_error(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_import_errors(
def get_import_errors(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@


@monitor_router.get("/health")
async def get_health() -> HealthInfoSchema:
def get_health() -> HealthInfoSchema:
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@


@plugins_router.get("/")
async def get_plugins(
def get_plugins(
limit: QueryLimit,
offset: QueryOffset,
) -> PluginCollectionResponse:
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
]
),
)
async def delete_pool(
def delete_pool(
pool_name: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -71,7 +71,7 @@ async def delete_pool(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_pool(
def get_pool(
pool_name: str,
session: Annotated[Session, Depends(get_session)],
) -> PoolResponse:
Expand All @@ -89,7 +89,7 @@ async def get_pool(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_pools(
def get_pools(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -127,7 +127,7 @@ async def get_pools(
]
),
)
async def patch_pool(
def patch_pool(
pool_name: str,
patch_body: PoolPatchBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -170,7 +170,7 @@ async def patch_pool(
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def post_pool(
def post_pool(
post_body: PoolPostBody,
session: Annotated[Session, Depends(get_session)],
) -> PoolResponse:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _provider_mapper(provider: ProviderInfo) -> ProviderResponse:


@providers_router.get("/")
async def get_providers(
def get_providers(
limit: QueryLimit,
offset: QueryOffset,
) -> ProviderCollectionResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_task_instance(
def get_task_instance(
dag_id: str, dag_run_id: str, task_id: str, session: Annotated[Session, Depends(get_session)]
) -> TaskInstanceResponse:
"""Get task instance."""
Expand Down Expand Up @@ -70,7 +70,7 @@ async def get_task_instance(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_mapped_task_instance(
def get_mapped_task_instance(
dag_id: str,
dag_run_id: str,
task_id: str,
Expand Down
10 changes: 5 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def delete_variable(
def delete_variable(
variable_key: str,
session: Annotated[Session, Depends(get_session)],
):
Expand All @@ -59,7 +59,7 @@ async def delete_variable(
[status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN, status.HTTP_404_NOT_FOUND]
),
)
async def get_variable(
def get_variable(
variable_key: str,
session: Annotated[Session, Depends(get_session)],
) -> VariableResponse:
Expand All @@ -78,7 +78,7 @@ async def get_variable(
"/",
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def get_variables(
def get_variables(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
Expand Down Expand Up @@ -121,7 +121,7 @@ async def get_variables(
]
),
)
async def patch_variable(
def patch_variable(
variable_key: str,
patch_body: VariableBody,
session: Annotated[Session, Depends(get_session)],
Expand Down Expand Up @@ -154,7 +154,7 @@ async def patch_variable(
status_code=status.HTTP_201_CREATED,
responses=create_openapi_http_exception_doc([status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]),
)
async def post_variable(
def post_variable(
post_body: VariableBody,
session: Annotated[Session, Depends(get_session)],
) -> VariableResponse:
Expand Down
Loading