Initial implementation of a general query result (DM-45429)
Registry method `queryDatasetAssociations` is reimplemented (for both
direct and remote butler) to use the new query system and the new
general query result class.
andy-slac committed Aug 5, 2024
1 parent a2130ee commit 90aadc1
Showing 9 changed files with 317 additions and 76 deletions.
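
As a rough usage sketch of the reimplemented method (the repository path, dataset type, and collection name below are hypothetical; `DatasetAssociation`'s `ref`/`collection`/`timespan` attributes are the existing public API), the call site is unchanged:

from lsst.daf.butler import Butler

butler = Butler("repo")  # hypothetical repository path
# Same public signature as before; only the implementation underneath changed.
for assoc in butler.registry.queryDatasetAssociations(
    "calexp", collections=["some/collection"]  # hypothetical names
):
    print(assoc.ref, assoc.collection, assoc.timespan)
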
21 changes: 4 additions & 17 deletions python/lsst/daf/butler/direct_butler/_direct_butler.py
@@ -2195,32 +2195,19 @@ def dimensions(self) -> DimensionUniverse:
         # Docstring inherited.
         return self._registry.dimensions
 
-    @contextlib.contextmanager
-    def _query(self) -> Iterator[Query]:
+    def _query(self) -> contextlib.AbstractContextManager[Query]:
         # Docstring inherited.
-        with self._query_driver(self._registry.defaults.collections, self.registry.defaults.dataId) as driver:
-            yield Query(driver)
+        return self._registry._query()
 
-    @contextlib.contextmanager
     def _query_driver(
         self,
         default_collections: Iterable[str],
         default_data_id: DataCoordinate,
-    ) -> Iterator[DirectQueryDriver]:
+    ) -> contextlib.AbstractContextManager[DirectQueryDriver]:
         """Set up a QueryDriver instance for use with this Butler. Although
         this is marked as a private method, it is also used by Butler server.
         """
-        with self._caching_context():
-            driver = DirectQueryDriver(
-                self._registry._db,
-                self.dimensions,
-                self._registry._managers,
-                self._registry.dimension_record_cache,
-                default_collections=default_collections,
-                default_data_id=default_data_id,
-            )
-            with driver:
-                yield driver
+        return self._registry._query_driver(default_collections, default_data_id)
 
     def _preload_cache(self) -> None:
         """Immediately load caches that are used for common operations."""
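
The refactor above leans on a small typing pattern: a method that previously used `@contextlib.contextmanager` can instead declare a return type of `contextlib.AbstractContextManager` and simply hand back a context manager built elsewhere. A minimal, self-contained sketch of that pattern (generic names, not from this codebase):

import contextlib
from collections.abc import Iterator


class Inner:
    @contextlib.contextmanager
    def open(self) -> Iterator[str]:
        # Generator-based context manager that owns the resource.
        yield "resource"


class Outer:
    def __init__(self) -> None:
        self._inner = Inner()

    def open(self) -> contextlib.AbstractContextManager[str]:
        # No decorator needed: delegate and return the context manager as-is.
        return self._inner.open()


with Outer().open() as value:
    print(value)  # prints "resource"
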
3 changes: 3 additions & 0 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -79,6 +79,7 @@
     DataCoordinateResultPageConverter,
     DatasetRefResultPageConverter,
     DimensionRecordResultPageConverter,
+    GeneralResultPageConverter,
     ResultPageConverter,
     ResultPageConverterContext,
 )
@@ -271,6 +272,8 @@ def _create_result_page_converter(self, spec: ResultSpec, builder: QueryBuilder)
                 return DatasetRefResultPageConverter(
                     spec, self.get_dataset_type(spec.dataset_type_name), context
                 )
+            case GeneralResultSpec():
+                return GeneralResultPageConverter(spec, context)
             case _:
                 raise NotImplementedError(f"Result type '{spec.result_type}' not yet implemented")

@@ -30,7 +30,7 @@
 from abc import abstractmethod
 from collections.abc import Iterable
 from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import sqlalchemy
 
@@ -50,9 +50,16 @@
     DataCoordinateResultPage,
     DatasetRefResultPage,
     DimensionRecordResultPage,
+    GeneralResultPage,
     ResultPage,
 )
-from ..queries.result_specs import DataCoordinateResultSpec, DatasetRefResultSpec, DimensionRecordResultSpec
+from ..queries.result_specs import (
+    DataCoordinateResultSpec,
+    DatasetRefResultSpec,
+    DimensionRecordResultSpec,
+    GeneralResultSpec,
+)
+from ..timespan_database_representation import TimespanDatabaseRepresentation
 
 if TYPE_CHECKING:
     from ..registry.interfaces import Database
@@ -310,3 +317,81 @@ def convert(self, row: sqlalchemy.Row) -> dict[str, DimensionRecord]: # numpydoc
        the dimensions in the database row.
        """
        return {name: converter.convert(row) for name, converter in self._record_converters.items()}


class GeneralResultPageConverter(ResultPageConverter):  # numpydoc ignore=PR01
    """Converts raw SQL rows into pages of `GeneralResult` query results."""

    def __init__(self, spec: GeneralResultSpec, ctx: ResultPageConverterContext) -> None:
        self.spec = spec

        result_columns = spec.get_result_columns()
        self.converters: list[_GeneralColumnConverter] = []
        for column in result_columns:
            column_name = qt.ColumnSet.get_qualified_name(column.logical_table, column.field)
            if column.field == TimespanDatabaseRepresentation.NAME:
                self.converters.append(_TimespanGeneralColumnConverter(column_name, ctx.db))
            else:
                self.converters.append(_DefaultGeneralColumnConverter(column_name))

    def convert(self, raw_rows: Iterable[sqlalchemy.Row]) -> GeneralResultPage:
        rows = [tuple(cvt.convert(row) for cvt in self.converters) for row in raw_rows]
        return GeneralResultPage(spec=self.spec, rows=rows)


class _GeneralColumnConverter:
    """Interface for converting one or more columns in a result row to a
    single column value in the output row.
    """

    @abstractmethod
    def convert(self, row: sqlalchemy.Row) -> Any:
        """Convert one or more columns in the row into a single value.

        Parameters
        ----------
        row : `sqlalchemy.Row`
            Row of values.

        Returns
        -------
        value : `Any`
            Result of the conversion.
        """
        raise NotImplementedError()


class _DefaultGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that returns a column value without conversion.

    Parameters
    ----------
    name : `str`
        Column name.
    """

    def __init__(self, name: str):
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        return row._mapping[self.name]


class _TimespanGeneralColumnConverter(_GeneralColumnConverter):
    """Converter that extracts a timespan from the row.

    Parameters
    ----------
    name : `str`
        Column name or prefix.
    db : `Database`
        Database instance.
    """

    def __init__(self, name: str, db: Database):
        self.timespan_class = db.getTimespanRepresentation()
        self.name = name

    def convert(self, row: sqlalchemy.Row) -> Any:
        timespan = self.timespan_class.extract(row._mapping, self.name)
        return timespan
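
To illustrate the converter pattern above, a standalone sketch (in-memory SQLite and made-up column names) of how name-based `row._mapping` access turns a SQLAlchemy `Row` into one output tuple, mirroring `GeneralResultPageConverter.convert`:

import sqlalchemy

engine = sqlalchemy.create_engine("sqlite://")
with engine.connect() as connection:
    row = connection.execute(
        sqlalchemy.text("SELECT 1 AS dataset_id, 'run1' AS run")
    ).one()
    # Name-based access, as in _DefaultGeneralColumnConverter.convert.
    names = ["dataset_id", "run"]
    print(tuple(row._mapping[name] for name in names))  # (1, 'run1')
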
1 change: 1 addition & 0 deletions python/lsst/daf/butler/queries/__init__.py
@@ -29,4 +29,5 @@
 from ._data_coordinate_query_results import *
 from ._dataset_query_results import *
 from ._dimension_record_query_results import *
+from ._general_query_results import *
 from ._query import *
125 changes: 125 additions & 0 deletions python/lsst/daf/butler/queries/_general_query_results.py
@@ -0,0 +1,125 @@
# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("GeneralQueryResults",)

from collections.abc import Iterator
from typing import Any, final

from .._dataset_ref import DatasetRef
from .._dataset_type import DatasetType
from ..dimensions import DataCoordinate, DimensionGroup
from ._base import QueryResultsBase
from .driver import QueryDriver
from .result_specs import GeneralResultSpec
from .tree import QueryTree, ResultColumn


@final
class GeneralQueryResults(QueryResultsBase):
    """A query for general, tabular results.

    Parameters
    ----------
    driver : `QueryDriver`
        Implementation object that knows how to actually execute queries.
    tree : `QueryTree`
        Description of the query as a tree of joins and column expressions.
        The instance returned directly by the `Butler._query` entry point
        should be constructed via `make_identity_query_tree`.
    spec : `GeneralResultSpec`
        Specification of the query result rows, including output columns,
        ordering, and slicing.

    Notes
    -----
    This class should never be constructed directly by users; use `Query`
    methods instead.
    """

    def __init__(self, driver: QueryDriver, tree: QueryTree, spec: GeneralResultSpec):
        spec.validate_tree(tree)
        super().__init__(driver, tree)
        self._spec = spec

    def __iter__(self) -> Iterator[dict[ResultColumn, Any]]:
        """Iterate over result rows.

        Yields
        ------
        row_dict : `dict` [`ResultColumn`, `Any`]
            Result row as a dictionary; the keys are `ResultColumn` instances.
        """
        for page in self._driver.execute(self._spec, self._tree):
            columns = tuple(page.spec.get_result_columns())
            for row in page.rows:
                yield dict(zip(columns, row))

    def iter_refs(self, dataset_type: DatasetType) -> Iterator[tuple[DatasetRef, dict[ResultColumn, Any]]]:
        """Iterate over result rows, yielding a `DatasetRef` constructed from
        each row together with the original row.

        Parameters
        ----------
        dataset_type : `DatasetType`
            Type of the dataset to return.

        Yields
        ------
        dataset_ref : `DatasetRef`
            Dataset reference.
        row_dict : `dict` [`ResultColumn`, `Any`]
            Result row as a dictionary; the keys are `ResultColumn` instances.
        """
        dimensions = dataset_type.dimensions
        id_key = ResultColumn(logical_table=dataset_type.name, field="dataset_id")
        run_key = ResultColumn(logical_table=dataset_type.name, field="run")
        data_id_keys = [ResultColumn(logical_table=element, field=None) for element in dimensions.required]
        for row in self:
            values = tuple(row[key] for key in data_id_keys)
            data_id = DataCoordinate.from_required_values(dimensions, values)
            ref = DatasetRef(dataset_type, data_id, row[run_key], id=row[id_key])
            yield ref, row

    @property
    def dimensions(self) -> DimensionGroup:
        # Docstring inherited
        return self._spec.dimensions

    def count(self, *, exact: bool = True, discard: bool = False) -> int:
        # Docstring inherited.
        return self._driver.count(self._tree, self._spec, exact=exact, discard=discard)

    def _copy(self, tree: QueryTree, **kwargs: Any) -> GeneralQueryResults:
        # Docstring inherited.
        return GeneralQueryResults(self._driver, tree, self._spec.model_copy(update=kwargs))

    def _get_datasets(self) -> frozenset[str]:
        # Docstring inherited.
        return frozenset(self._spec.dataset_fields)
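
A hedged sketch of how this class might be consumed; the `query` and `dataset_type` objects are assumed to come from the `Butler._query` context and the registry, and the collection name is made up (see `dataset_associations` in `_query.py` below):

# Raw rows are dicts keyed by ResultColumn (see __iter__ above).
results = query.dataset_associations(dataset_type, collections=["some/run"])
for row in results:
    for column, value in row.items():
        print(column.logical_table, column.field, value)

# Or reconstruct DatasetRefs while keeping access to the extra columns.
for ref, row in results.iter_refs(dataset_type):
    print(ref.id, ref.run)
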

40 changes: 39 additions & 1 deletion python/lsst/daf/butler/queries/_query.py
@@ -43,10 +43,16 @@
 from ._data_coordinate_query_results import DataCoordinateQueryResults
 from ._dataset_query_results import DatasetRefQueryResults
 from ._dimension_record_query_results import DimensionRecordQueryResults
+from ._general_query_results import GeneralQueryResults
 from .convert_args import convert_where_args
 from .driver import QueryDriver
 from .expression_factory import ExpressionFactory
-from .result_specs import DataCoordinateResultSpec, DatasetRefResultSpec, DimensionRecordResultSpec
+from .result_specs import (
+    DataCoordinateResultSpec,
+    DatasetRefResultSpec,
+    DimensionRecordResultSpec,
+    GeneralResultSpec,
+)
 from .tree import DatasetSearch, Predicate, QueryTree, make_identity_query_tree


@@ -287,6 +293,38 @@ def dimension_records(self, element: str) -> DimensionRecordQueryResults:
        result_spec = DimensionRecordResultSpec(element=self._driver.universe[element])
        return DimensionRecordQueryResults(self._driver, tree, result_spec)

    def dataset_associations(
        self,
        dataset_type: DatasetType,
        collections: Iterable[str],
    ) -> GeneralQueryResults:
        """Iterate over dataset-collection combinations where the dataset is
        in the collection.

        Parameters
        ----------
        dataset_type : `DatasetType`
            A dataset type object.
        collections : `~collections.abc.Iterable` [`str`]
            Names of the collections to search. Chained collections are
            ignored.

        Returns
        -------
        result : `GeneralQueryResults`
            Query result that can be iterated over. The result includes all
            columns needed to construct `DatasetRef`, plus ``collection`` and
            ``timespan`` columns.
        """
        _, _, query = self._join_dataset_search_impl(dataset_type, collections)
        result_spec = GeneralResultSpec(
            dimensions=dataset_type.dimensions,
            dimension_fields={},
            dataset_fields={dataset_type.name: {"dataset_id", "run", "collection", "timespan"}},
            find_first=False,
        )
        return GeneralQueryResults(self._driver, tree=query._tree, spec=result_spec)

    def materialize(
        self,
        *,
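
Putting the pieces together, an end-to-end sketch of the new code path (repository path, dataset type, and collection names are hypothetical, and `Butler.get_dataset_type` is assumed as the lookup helper; `_query` is private, which is why `queryDatasetAssociations` remains the public entry point):

from lsst.daf.butler import Butler
from lsst.daf.butler.queries.tree import ResultColumn

butler = Butler("repo")  # hypothetical repository path
dataset_type = butler.get_dataset_type("calexp")  # hypothetical dataset type
with butler._query() as query:
    results = query.dataset_associations(dataset_type, collections=["some/run"])
    timespan_key = ResultColumn(logical_table=dataset_type.name, field="timespan")
    for ref, row in results.iter_refs(dataset_type):
        print(ref, row[timespan_key])
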
3 changes: 2 additions & 1 deletion python/lsst/daf/butler/queries/driver.py
@@ -116,7 +116,8 @@ class GeneralResultPage:

     spec: GeneralResultSpec
 
-    # Raw tabular data, with columns in the same order as spec.columns.
+    # Raw tabular data, with columns in the same order as
+    # spec.get_result_columns().
     rows: list[tuple[Any, ...]]


