From c1a7eed0fdf8ef396eb0a77db5eb90033f2ec654 Mon Sep 17 00:00:00 2001 From: Amogh Date: Fri, 15 Nov 2024 12:20:03 +0530 Subject: [PATCH] moving _generate_queued_event_where_clause to assets.py --- .../core_api/routes/public/assets.py | 24 ++++++++++++++++++- .../core_api/routes/public/dags.py | 23 +----------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 67218c471615..12b4bbe9e7a3 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -37,6 +37,7 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter +from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.assets import ( AssetCollectionResponse, AssetEventCollectionResponse, @@ -44,11 +45,32 @@ AssetResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.models.asset import AssetEvent, AssetModel +from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") +def _generate_queued_event_where_clause( + *, + dag_id: str | None = None, + uri: str | None = None, + before: str | None = None, +) -> list: + """Get AssetDagRunQueue where clause.""" + where_clause = [] + if dag_id is not None: + where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) + if uri is not None: + where_clause.append( + AssetDagRunQueue.asset_id.in_( + select(AssetModel.id).where(AssetModel.uri == uri), + ), + ) + if before is not None: + where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) + return where_clause + + @assets_router.get( "/", responses=create_openapi_http_exception_doc([401, 403, 404]), diff --git a/airflow/api_fastapi/core_api/routes/public/dags.py b/airflow/api_fastapi/core_api/routes/public/dags.py index 7796bc08b77e..d9fb6c28ad26 100644 --- a/airflow/api_fastapi/core_api/routes/public/dags.py +++ b/airflow/api_fastapi/core_api/routes/public/dags.py @@ -44,7 +44,6 @@ SortParam, ) from airflow.api_fastapi.common.router import AirflowRouter -from airflow.api_fastapi.common.utils import format_datetime from airflow.api_fastapi.core_api.datamodels.dags import ( DAGCollectionResponse, DAGDetailsResponse, @@ -55,6 +54,7 @@ QueuedEventResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause from airflow.exceptions import AirflowException, DagNotFound from airflow.models import DAG, DagModel, DagTag from airflow.models.asset import AssetDagRunQueue, AssetModel @@ -62,27 +62,6 @@ dags_router = AirflowRouter(tags=["DAG"], prefix="/dags") -def _generate_queued_event_where_clause( - *, - dag_id: str | None = None, - uri: str | None = None, - before: str | None = None, -) -> list: - """Get AssetDagRunQueue where clause.""" - where_clause = [] - if dag_id is not None: - where_clause.append(AssetDagRunQueue.target_dag_id == dag_id) - if uri is not None: - where_clause.append( - AssetDagRunQueue.asset_id.in_( - select(AssetModel.id).where(AssetModel.uri == uri), - ), - ) - if before is not None: - where_clause.append(AssetDagRunQueue.created_at < format_datetime(before)) - return where_clause - - @dags_router.get("/") def get_dags( limit: QueryLimit,