Skip to content

Commit

Permalink
moving _generate_queued_event_where_clause to assets.py
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh committed Nov 15, 2024
1 parent 9a7574c commit c1a7eed
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
24 changes: 23 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,40 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.utils import format_datetime
from airflow.api_fastapi.core_api.datamodels.assets import (
AssetCollectionResponse,
AssetEventCollectionResponse,
AssetEventResponse,
AssetResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.asset import AssetDagRunQueue, AssetEvent, AssetModel

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
uri: str | None = None,
before: str | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < format_datetime(before))
return where_clause


@assets_router.get(
"/",
responses=create_openapi_http_exception_doc([401, 403, 404]),
Expand Down
23 changes: 1 addition & 22 deletions airflow/api_fastapi/core_api/routes/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.common.utils import format_datetime
from airflow.api_fastapi.core_api.datamodels.dags import (
DAGCollectionResponse,
DAGDetailsResponse,
Expand All @@ -55,34 +54,14 @@
QueuedEventResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.routes.public.assets import _generate_queued_event_where_clause
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models import DAG, DagModel, DagTag
from airflow.models.asset import AssetDagRunQueue, AssetModel

dags_router = AirflowRouter(tags=["DAG"], prefix="/dags")


def _generate_queued_event_where_clause(
*,
dag_id: str | None = None,
uri: str | None = None,
before: str | None = None,
) -> list:
"""Get AssetDagRunQueue where clause."""
where_clause = []
if dag_id is not None:
where_clause.append(AssetDagRunQueue.target_dag_id == dag_id)
if uri is not None:
where_clause.append(
AssetDagRunQueue.asset_id.in_(
select(AssetModel.id).where(AssetModel.uri == uri),
),
)
if before is not None:
where_clause.append(AssetDagRunQueue.created_at < format_datetime(before))
return where_clause


@dags_router.get("/")
def get_dags(
limit: QueryLimit,
Expand Down

0 comments on commit c1a7eed

Please sign in to comment.