From 2f34be4d7405dd0067606a668773c00a8cd52063 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 16 Jul 2023 01:26:34 +0200 Subject: [PATCH 1/4] Improve the query count getting in Airflow API --- airflow/api_connexion/endpoints/dag_endpoint.py | 8 ++++---- airflow/api_connexion/endpoints/dag_run_endpoint.py | 12 +++++++++--- .../api_connexion/endpoints/dag_warning_endpoint.py | 6 +++--- airflow/api_connexion/endpoints/dataset_endpoint.py | 4 ++-- .../endpoints/task_instance_endpoint.py | 13 ++++++------- airflow/api_connexion/endpoints/xcom_endpoint.py | 6 +++--- airflow/api_connexion/parameters.py | 9 ++++++++- airflow/www/views.py | 13 ++++++------- 8 files changed, 41 insertions(+), 30 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 55eb971c08b0..6119d68be494 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -22,14 +22,14 @@ from connexion import NoContent from flask import g, request from marshmallow import ValidationError -from sqlalchemy import func, select, update +from sqlalchemy import select, update from sqlalchemy.orm import Session from sqlalchemy.sql.expression import or_ from airflow import DAG from airflow.api_connexion import security from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count from airflow.api_connexion.schemas.dag_schema import ( DAGCollection, dag_detail_schema, @@ -99,7 +99,7 @@ def get_dags( cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags] dags_query = dags_query.where(or_(*cond)) - total_entries = session.scalar(select(func.count()).select_from(dags_query)) + total_entries = get_query_count(dags_query, session=session) dags_query = apply_sorting(dags_query, order_by, {}, allowed_attrs) dags = session.scalars(dags_query.offset(offset).limit(limit)).all() @@ -159,7 +159,7 @@ def patch_dags(limit, session, offset=0, only_active=True, tags=None, dag_id_pat cond = [DagModel.tags.any(DagTag.name == tag) for tag in tags] dags_query = dags_query.where(or_(*cond)) - total_entries = session.scalar(select(func.count()).select_from(dags_query)) + total_entries = get_query_count(dags_query, session=session) dags = session.scalars(dags_query.order_by(DagModel.dag_id).offset(offset).limit(limit)).all() diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index f62b28273f7b..cd27dc0f8cf7 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -23,7 +23,7 @@ from flask import g from flask_login import current_user from marshmallow import ValidationError -from sqlalchemy import delete, func, or_, select +from sqlalchemy import delete, or_, select from sqlalchemy.orm import Session from sqlalchemy.sql import Select @@ -35,7 +35,13 @@ from airflow.api_connexion import security from airflow.api_connexion.endpoints.request_dict import get_json_request_dict from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_datetime, format_parameters +from airflow.api_connexion.parameters import ( + apply_sorting, + check_limit, + format_datetime, + format_parameters, + get_query_count, +) from airflow.api_connexion.schemas.dag_run_schema import ( DAGRunCollection, clear_dagrun_form_schema, @@ -166,7 +172,7 @@ def _fetch_dag_runs( if updated_at_lte: query = query.where(DagRun.updated_at <= updated_at_lte) - total_entries = session.scalar(select(func.count()).select_from(query)) + total_entries = get_query_count(query, session=session) to_replace = {"dag_run_id": "run_id"} allowed_filter_attrs = [ "id", diff --git a/airflow/api_connexion/endpoints/dag_warning_endpoint.py b/airflow/api_connexion/endpoints/dag_warning_endpoint.py index 66aa1184a192..dd5f8dd94805 100644 --- a/airflow/api_connexion/endpoints/dag_warning_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_warning_endpoint.py @@ -16,11 +16,11 @@ # under the License. from __future__ import annotations -from sqlalchemy import func, select +from sqlalchemy import select from sqlalchemy.orm import Session from airflow.api_connexion import security -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count from airflow.api_connexion.schemas.dag_warning_schema import ( DagWarningCollection, dag_warning_collection_schema, @@ -54,7 +54,7 @@ def get_dag_warnings( query = query.where(DagWarningModel.dag_id == dag_id) if warning_type: query = query.where(DagWarningModel.warning_type == warning_type) - total_entries = session.scalar(select(func.count()).select_from(query)) + total_entries = get_query_count(query, session=session) query = apply_sorting(query=query, order_by=order_by, allowed_attrs=allowed_filter_attrs) dag_warnings = session.scalars(query.offset(offset).limit(limit)).all() return dag_warning_collection_schema.dump( diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py index 9f4fa443b99d..62c15634fb14 100644 --- a/airflow/api_connexion/endpoints/dataset_endpoint.py +++ b/airflow/api_connexion/endpoints/dataset_endpoint.py @@ -21,7 +21,7 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count from airflow.api_connexion.schemas.dataset_schema import ( DatasetCollection, DatasetEventCollection, @@ -112,7 +112,7 @@ def get_dataset_events( query = query.options(subqueryload(DatasetEvent.created_dagruns)) - total_entries = session.scalar(select(func.count()).select_from(query)) + total_entries = get_query_count(query, session=session) query = apply_sorting(query, order_by, {}, allowed_attrs) events = session.scalars(query.offset(offset).limit(limit)).all() return dataset_event_collection_schema.dump( diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 2496433a26d7..6ddc0f59d0f3 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -19,7 +19,7 @@ from typing import Any, Iterable, TypeVar from marshmallow import ValidationError -from sqlalchemy import and_, func, or_, select +from sqlalchemy import and_, or_, select from sqlalchemy.exc import MultipleResultsFound from sqlalchemy.orm import Session, joinedload from sqlalchemy.sql import ClauseElement, Select @@ -27,7 +27,7 @@ from airflow.api_connexion import security from airflow.api_connexion.endpoints.request_dict import get_json_request_dict from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import format_datetime, format_parameters +from airflow.api_connexion.parameters import format_datetime, format_parameters, get_query_count from airflow.api_connexion.schemas.task_instance_schema import ( TaskInstanceCollection, TaskInstanceReferenceCollection, @@ -196,7 +196,7 @@ def get_mapped_task_instances( ) # 0 can mean a mapped TI that expanded to an empty list, so it is not an automatic 404 - unfiltered_total_count = session.execute(select(func.count("*")).select_from(base_query)).scalar() + unfiltered_total_count = get_query_count(base_query, session=session) if unfiltered_total_count == 0: dag = get_airflow_app().dag_bag.get_dag(dag_id) if not dag: @@ -229,7 +229,7 @@ def get_mapped_task_instances( base_query = _apply_array_filter(base_query, key=TI.queue, values=queue) # Count elements before joining extra columns - total_entries = session.execute(select(func.count("*")).select_from(base_query)).scalar() + total_entries = get_query_count(base_query, session=session) # Add SLA miss entry_query = ( @@ -355,8 +355,7 @@ def get_task_instances( base_query = _apply_array_filter(base_query, key=TI.queue, values=queue) # Count elements before joining extra columns - count_query = select(func.count("*")).select_from(base_query) - total_entries = session.execute(count_query).scalar() + total_entries = get_query_count(base_query, session=session) # Add join entry_query = ( @@ -420,7 +419,7 @@ def get_task_instances_batch(session: Session = NEW_SESSION) -> APIResponse: base_query = _apply_array_filter(base_query, key=TI.queue, values=data["queue"]) # Count elements before joining extra columns - total_entries = session.execute(select(func.count("*")).select_from(base_query)).scalar() + total_entries = get_query_count(base_query, session=session) # Add join base_query = base_query.join( SlaMiss, diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index d95a3d15fce6..708eae57b232 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -19,12 +19,12 @@ import copy from flask import g -from sqlalchemy import and_, func, select +from sqlalchemy import and_, select from sqlalchemy.orm import Session from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import check_limit, format_parameters +from airflow.api_connexion.parameters import check_limit, format_parameters, get_query_count from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema from airflow.api_connexion.types import APIResponse from airflow.models import DagRun as DR, XCom @@ -75,7 +75,7 @@ def get_xcom_entries( if xcom_key is not None: query = query.where(XCom.key == xcom_key) query = query.order_by(DR.execution_date, XCom.task_id, XCom.dag_id, XCom.key) - total_entries = session.execute(select(func.count()).select_from(query)).scalar() + total_entries = get_query_count(query, session=session) query = session.scalars(query.offset(offset).limit(limit)) return xcom_collection_schema.dump(XComCollection(xcom_entries=query, total_entries=total_entries)) diff --git a/airflow/api_connexion/parameters.py b/airflow/api_connexion/parameters.py index f4f55cfecd37..0c4e1b91af13 100644 --- a/airflow/api_connexion/parameters.py +++ b/airflow/api_connexion/parameters.py @@ -21,8 +21,9 @@ from functools import wraps from typing import Any, Callable, Container, TypeVar, cast +import sqlalchemy.orm from pendulum.parsing import ParserError -from sqlalchemy import text +from sqlalchemy import func, select, text from sqlalchemy.sql import Select from airflow.api_connexion.exceptions import BadRequest @@ -125,3 +126,9 @@ def apply_sorting( else: order_by = f"{lstriped_orderby} asc" return query.order_by(text(order_by)) + + +def get_query_count(query_stmt: sqlalchemy.sql.selectable.Select, session: sqlalchemy.orm.Session) -> int: + """Get count of query.""" + count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) + return session.scalar(count_stmt) diff --git a/airflow/www/views.py b/airflow/www/views.py index 85375a518b3f..6a9648b1cb8b 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -83,6 +83,7 @@ set_dag_run_state_to_success, set_state, ) +from airflow.api_connexion.parameters import get_query_count from airflow.configuration import AIRFLOW_CONFIG, auth_manager, conf from airflow.datasets import Dataset from airflow.exceptions import ( @@ -759,7 +760,7 @@ def index(self): dags_query = dags_query.where(DagModel.tags.any(DagTag.name.in_(arg_tags_filter))) dags_query = dags_query.where(DagModel.dag_id.in_(filter_dag_ids)) - filtered_dag_count = session.scalar(select(func.count()).select_from(dags_query)) + filtered_dag_count = get_query_count(dags_query, session=session) if filtered_dag_count == 0 and len(arg_tags_filter): flash( "No matching DAG tags found.", @@ -811,8 +812,8 @@ def index(self): status_count_active = is_paused_count.get(False, 0) status_count_paused = is_paused_count.get(True, 0) - status_count_running = session.scalar(select(func.count()).select_from(running_dags)) - status_count_failed = session.scalar(select(func.count()).select_from(failed_dags)) + status_count_running = get_query_count(running_dags, session=session) + status_count_failed = get_query_count(failed_dags, session=session) all_dags_count = status_count_active + status_count_paused if arg_status_filter == "active": @@ -951,9 +952,7 @@ def _iter_parsed_moved_data_table_names(): .where(Log.event == "robots") .where(Log.dttm > (utcnow() - datetime.timedelta(days=7))) ) - robots_file_access_count = session.scalar( - select(func.count()).select_from(robots_file_access_count) - ) + robots_file_access_count = get_query_count(robots_file_access_count, session=session) if robots_file_access_count > 0: flash( Markup( @@ -4171,7 +4170,7 @@ def audit_log(self, dag_id: str, session: Session = NEW_SESSION): arg_sorting_direction = request.args.get("sorting_direction", default="desc") logs_per_page = PAGE_SIZE - audit_logs_count = session.scalar(select(func.count()).select_from(query)) + audit_logs_count = get_query_count(query, session=session) num_of_pages = int(math.ceil(audit_logs_count / float(logs_per_page))) start = current_page * logs_per_page From 6dc40170918ba2106dd2f6f8a8e44957d7aa46e5 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 16 Jul 2023 11:54:37 +0200 Subject: [PATCH 2/4] Move get_query_count to utils.db --- airflow/api_connexion/endpoints/dag_endpoint.py | 3 ++- airflow/api_connexion/endpoints/dag_run_endpoint.py | 2 +- airflow/api_connexion/endpoints/dag_warning_endpoint.py | 3 ++- airflow/api_connexion/endpoints/dataset_endpoint.py | 3 ++- .../api_connexion/endpoints/task_instance_endpoint.py | 3 ++- airflow/api_connexion/endpoints/xcom_endpoint.py | 3 ++- airflow/api_connexion/parameters.py | 9 +-------- airflow/utils/db.py | 7 +++++++ airflow/www/views.py | 2 +- 9 files changed, 20 insertions(+), 15 deletions(-) diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 6119d68be494..a301699b380d 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -29,7 +29,7 @@ from airflow import DAG from airflow.api_connexion import security from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dag_schema import ( DAGCollection, dag_detail_schema, @@ -41,6 +41,7 @@ from airflow.models.dag import DagModel, DagTag from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index cd27dc0f8cf7..490923c6c381 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -40,7 +40,6 @@ check_limit, format_datetime, format_parameters, - get_query_count, ) from airflow.api_connexion.schemas.dag_run_schema import ( DAGRunCollection, @@ -63,6 +62,7 @@ from airflow.models import DagModel, DagRun from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.log.action_logger import action_event_from_permission from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState diff --git a/airflow/api_connexion/endpoints/dag_warning_endpoint.py b/airflow/api_connexion/endpoints/dag_warning_endpoint.py index dd5f8dd94805..1031d532a00f 100644 --- a/airflow/api_connexion/endpoints/dag_warning_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_warning_endpoint.py @@ -20,7 +20,7 @@ from sqlalchemy.orm import Session from airflow.api_connexion import security -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dag_warning_schema import ( DagWarningCollection, dag_warning_collection_schema, @@ -28,6 +28,7 @@ from airflow.api_connexion.types import APIResponse from airflow.models.dagwarning import DagWarning as DagWarningModel from airflow.security import permissions +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py index 62c15634fb14..71012b59a2e7 100644 --- a/airflow/api_connexion/endpoints/dataset_endpoint.py +++ b/airflow/api_connexion/endpoints/dataset_endpoint.py @@ -21,7 +21,7 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dataset_schema import ( DatasetCollection, DatasetEventCollection, @@ -32,6 +32,7 @@ from airflow.api_connexion.types import APIResponse from airflow.models.dataset import DatasetEvent, DatasetModel from airflow.security import permissions +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 6ddc0f59d0f3..55db7ef8b937 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -27,7 +27,7 @@ from airflow.api_connexion import security from airflow.api_connexion.endpoints.request_dict import get_json_request_dict from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import format_datetime, format_parameters, get_query_count +from airflow.api_connexion.parameters import format_datetime, format_parameters from airflow.api_connexion.schemas.task_instance_schema import ( TaskInstanceCollection, TaskInstanceReferenceCollection, @@ -48,6 +48,7 @@ from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState, TaskInstanceState diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 708eae57b232..2ea4db79f7aa 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -24,13 +24,14 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import check_limit, format_parameters from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema from airflow.api_connexion.types import APIResponse from airflow.models import DagRun as DR, XCom from airflow.security import permissions from airflow.settings import conf from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/parameters.py b/airflow/api_connexion/parameters.py index 0c4e1b91af13..f4f55cfecd37 100644 --- a/airflow/api_connexion/parameters.py +++ b/airflow/api_connexion/parameters.py @@ -21,9 +21,8 @@ from functools import wraps from typing import Any, Callable, Container, TypeVar, cast -import sqlalchemy.orm from pendulum.parsing import ParserError -from sqlalchemy import func, select, text +from sqlalchemy import text from sqlalchemy.sql import Select from airflow.api_connexion.exceptions import BadRequest @@ -126,9 +125,3 @@ def apply_sorting( else: order_by = f"{lstriped_orderby} asc" return query.order_by(text(order_by)) - - -def get_query_count(query_stmt: sqlalchemy.sql.selectable.Select, session: sqlalchemy.orm.Session) -> int: - """Get count of query.""" - count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) - return session.scalar(count_stmt) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index eb4c863dc3e3..eb94792aa509 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -45,6 +45,7 @@ from alembic.runtime.environment import EnvironmentContext from alembic.script import ScriptDirectory from sqlalchemy.orm import Query, Session + from sqlalchemy.sql.selectable import Select from airflow.models.base import Base from airflow.models.connection import Connection @@ -1872,3 +1873,9 @@ def get_sqla_model_classes(): return [mapper.class_ for mapper in Base.registry.mappers] except AttributeError: return Base._decl_class_registry.values() + + +def get_query_count(query_stmt: Select, session: Session) -> int: + """Get count of query.""" + count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) + return session.scalar(count_stmt) diff --git a/airflow/www/views.py b/airflow/www/views.py index 6a9648b1cb8b..d175b7be5079 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -83,7 +83,6 @@ set_dag_run_state_to_success, set_state, ) -from airflow.api_connexion.parameters import get_query_count from airflow.configuration import AIRFLOW_CONFIG, auth_manager, conf from airflow.datasets import Dataset from airflow.exceptions import ( @@ -118,6 +117,7 @@ from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.dag_edges import dag_edges from airflow.utils.dates import infer_time_unit, scale_time_units +from airflow.utils.db import get_query_count from airflow.utils.docs import get_doc_url_for_provider, get_docs_url from airflow.utils.helpers import alchemy_to_dict, exactly_one from airflow.utils.log import secrets_masker From 3f54d437763c677af2e45b8b717e1139c8529223 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 17 Jul 2023 22:15:01 +0200 Subject: [PATCH 3/4] Add a comment to explain why we reset order_by --- airflow/utils/db.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index eb94792aa509..0bddb78b3811 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1877,5 +1877,7 @@ def get_sqla_model_classes(): def get_query_count(query_stmt: Select, session: Session) -> int: """Get count of query.""" + # Remove ORDER BY clause from the subquery statement since it's unnecessary for count + # in order to improve the query performance. count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) return session.scalar(count_stmt) From dd172b6abd3ee0c47124db96e0b877de2574c89f Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Wed, 19 Jul 2023 00:24:50 +0200 Subject: [PATCH 4/4] Update airflow/utils/db.py Co-authored-by: Tzu-ping Chung --- airflow/utils/db.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 0bddb78b3811..862b7dfc7fd5 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1876,8 +1876,14 @@ def get_sqla_model_classes(): def get_query_count(query_stmt: Select, session: Session) -> int: - """Get count of query.""" - # Remove ORDER BY clause from the subquery statement since it's unnecessary for count - # in order to improve the query performance. + """Get count of query. + + A SELECT COUNT() FROM is issued against the subquery built from the + given statement. The ORDER BY clause is stripped from the statement + since it's unnecessary for COUNT, and can impact query planning and + degrade performance. + + :meta private: + """ count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) return session.scalar(count_stmt)