Rewrite command-lines to filter dataset types by collection summary
timj committed Aug 19, 2024
1 parent fdee788 commit 1ebd5e5
Showing 7 changed files with 65 additions and 22 deletions.
python/lsst/daf/butler/_butler_collections.py (12 additions, 0 deletions)

@@ -403,3 +403,15 @@ def x_remove(self, name: str) -> None:
         be deleted or redefined first.
         """
         raise NotImplementedError()
+
+    def _filter_dataset_types(
+        self, dataset_types: Iterable[str], collections: Iterable[CollectionInfo]
+    ) -> Iterable[str]:
+        dataset_types_set = set(dataset_types)
+        collection_dataset_types: set[str] = set()
+        for info in collections:
+            if info.dataset_types is None:
+                raise RuntimeError("Can only filter by collections if include_summary was True")
+            collection_dataset_types.update(info.dataset_types)
+        dataset_types_set = dataset_types_set.intersection(collection_dataset_types)
+        return dataset_types_set
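
The helper is easiest to see in isolation. Below is a minimal, self-contained sketch of the same intersection logic; the CollectionInfo stand-in is hypothetical and models only the fields the filter touches, not the real lsst.daf.butler class.

    from collections.abc import Iterable
    from dataclasses import dataclass

    @dataclass
    class CollectionInfo:
        # Stand-in: dataset_types is None unless summaries were requested.
        name: str
        dataset_types: frozenset[str] | None = None

    def filter_dataset_types(
        dataset_types: Iterable[str], collections: Iterable[CollectionInfo]
    ) -> set[str]:
        # Union every collection's summary, then intersect with the request.
        summary: set[str] = set()
        for info in collections:
            if info.dataset_types is None:
                raise RuntimeError("Can only filter by collections if include_summary was True")
            summary.update(info.dataset_types)
        return set(dataset_types) & summary

    infos = [
        CollectionInfo("run/a", frozenset({"raw", "calexp"})),
        CollectionInfo("run/b", frozenset({"bias"})),
    ]
    print(filter_dataset_types(["raw", "bias", "flat"], infos))  # {'raw', 'bias'} (set order may vary)

Because a summary records which dataset types a collection can contain, anything outside the intersection is guaranteed to match nothing, so it is safe to drop before querying.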
python/lsst/daf/butler/script/queryDataIds.py (11 additions, 2 deletions)

@@ -169,8 +169,17 @@ def queryDataIds(
     if datasets:
         # Need to constrain results based on dataset type and collection.
         query_collections = collections or "*"
-
-        expanded_collections = butler.collections.x_query(query_collections)
+        collections_info = butler.collections.x_query_info(query_collections, include_summary=True)
+        expanded_collections = [info.name for info in collections_info]
+        dataset_types = list(
+            butler.collections._filter_dataset_types([dt.name for dt in dataset_types], collections_info)
+        )
+        if not dataset_types:
+            return (
+                None,
+                f"No datasets of type {datasets!r} existed in the specified "
+                f"collections {','.join(expanded_collections)}.",
+            )
 
         sub_query = query.join_dataset_search(dataset_types.pop(0), collections=expanded_collections)
         for dt in dataset_types:
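
The same three-step shape recurs in queryDimensionRecords, retrieveArtifacts, and transferDatasets below. A hedged sketch of the pattern, assuming the x_query_info and _filter_dataset_types interfaces shown in this commit; the helper name resolve_query_inputs is hypothetical:

    def resolve_query_inputs(butler, datasets, collections):
        query_collections = collections or "*"
        # include_summary=True populates CollectionInfo.dataset_types,
        # which _filter_dataset_types requires.
        infos = butler.collections.x_query_info(query_collections, include_summary=True)
        expanded = [info.name for info in infos]
        names = [dt.name for dt in butler.registry.queryDatasetTypes(datasets)]
        filtered = list(butler.collections._filter_dataset_types(names, infos))
        # filtered may be empty; each caller short-circuits in its own way
        # (a (None, message) tuple here, a bare None in queryDimensionRecords).
        return expanded, filtered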
python/lsst/daf/butler/script/queryDatasets.py (20 additions, 11 deletions)

@@ -27,6 +27,7 @@
 from __future__ import annotations
 
 import dataclasses
+import logging
 from collections import defaultdict
 from collections.abc import Iterable, Iterator
 from typing import TYPE_CHECKING
@@ -42,6 +43,9 @@
 from lsst.resources import ResourcePath
 
 
+_LOG = logging.getLogger(__name__)
+
+
 @dataclasses.dataclass(frozen=True)
 class _RefInfo:
     datasetRef: DatasetRef
@@ -204,6 +208,7 @@ def getTables(self) -> list[AstropyTable]:
 
         return [table.getAstropyTable(datasetTypeName) for datasetTypeName, table in tables.items()]
 
+    # @profile
     def getDatasets(self) -> Iterator[DatasetRef]:
         """Get the datasets as a list.
@@ -219,23 +224,27 @@ def getDatasets(self) -> Iterator[DatasetRef]:
         # dataset types and loop over the dataset types executing a new
         # query each time.
         dataset_types: set[str] = {d.name for d in self.butler.registry.queryDatasetTypes(datasetTypes)}
+        n_dataset_types = len(dataset_types)
         with self.butler._query() as query:
-            # If there is more than one dataset type being requested include
-            # the summary information so we can pre-filter. With only one
-            # let query.datasets work it out.
-            include_summary = True if len(dataset_types) > 1 else False
+            # Expand the collections query and include summary information.
             query_collections_info = self.butler.collections.x_query_info(
-                query_collections, include_summary=include_summary
+                query_collections, include_summary=True
             )
             query_collections = [c.name for c in query_collections_info]
 
             # Only iterate over dataset types that are relevant for the query.
-            if include_summary:
-                collection_dataset_types: set[str] = set()
-                for info in query_collections_info:
-                    assert info.dataset_types is not None  # For mypy.
-                    collection_dataset_types.update(info.dataset_types)
-                dataset_types = dataset_types.intersection(collection_dataset_types)
+            dataset_types = self.butler.collections._filter_dataset_types(
+                dataset_types, query_collections_info
+            )
+
+            if (n_filtered := len(dataset_types)) != n_dataset_types:
+                _LOG.info("Filtered %d dataset types down to %d", n_dataset_types, n_filtered)
+            elif n_dataset_types == 0:
+                _LOG.info("The given dataset type, %s, is not known to this butler.", datasetTypes)
+            else:
+                _LOG.info(
+                    "Processing %d dataset type%s", n_dataset_types, "" if n_dataset_types == 1 else "s"
+                )
 
             # Accumulate over dataset types.
             for dt in sorted(dataset_types):
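
One consequence of this rewrite is that include_summary=True is now unconditional: without it, CollectionInfo.dataset_types is None and the new helper raises. A small sketch of that failure mode, assuming a Butler instance named butler:

    infos = butler.collections.x_query_info("*", include_summary=False)
    try:
        butler.collections._filter_dataset_types({"raw"}, infos)
    except RuntimeError as exc:
        print(exc)  # Can only filter by collections if include_summary was True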
python/lsst/daf/butler/script/queryDimensionRecords.py (7 additions, 2 deletions)

@@ -80,8 +80,13 @@ def queryDimensionRecords(
 
     if datasets:
         query_collections = collections or "*"
-        expanded_collections = butler.collections.x_query(query_collections)
-        dataset_types = list(butler.registry.queryDatasetTypes(datasets))
+        collections_info = butler.collections.x_query_info(query_collections, include_summary=True)
+        expanded_collections = [info.name for info in collections_info]
+        dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes(datasets)]
+        dataset_types = list(butler.collections._filter_dataset_types(dataset_types, collections_info))
+
+        if not dataset_types:
+            return None
 
         sub_query = query.join_dataset_search(dataset_types.pop(0), collections=expanded_collections)
         for dt in dataset_types:
python/lsst/daf/butler/script/removeRuns.py (5 additions, 3 deletions)

@@ -88,12 +88,14 @@ def _getCollectionInfo(
     butler = Butler.from_config(repo)
     try:
         collections = butler.collections.x_query_info(
-            collection, CollectionType.RUN, include_chains=False, include_parents=True
+            collection, CollectionType.RUN, include_chains=False, include_parents=True, include_summary=True
         )
     except MissingCollectionError:
         # Act as if no collections matched.
         collections = []
-    dataset_types = butler.registry.queryDatasetTypes(...)
+    dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes(...)]
+    dataset_types = butler.collections._filter_dataset_types(dataset_types, collections)
+
     runs = []
     datasets: dict[str, int] = defaultdict(int)
     for collection_info in collections:
@@ -104,7 +106,7 @@
                 results = query.datasets(dt, collections=collection_info.name)
                 count = results.count(exact=False)
                 if count:
-                    datasets[dt.name] += count
+                    datasets[dt] += count
 
     return runs, {k: datasets[k] for k in sorted(datasets.keys())}
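
Note that dataset_types now holds plain str names rather than DatasetType objects, which is why the counting dict is keyed by dt instead of dt.name. A hedged sketch of the accumulation, with literal names standing in for the filtered types and a constant standing in for results.count(exact=False):

    from collections import defaultdict

    datasets: dict[str, int] = defaultdict(int)
    for run in ("run/a", "run/b"):      # collection_info.name values
        for dt in ("raw", "calexp"):    # filtered dataset type names
            count = 1                   # stands in for the per-run query count
            if count:
                datasets[dt] += count
    print({k: datasets[k] for k in sorted(datasets)})  # {'calexp': 2, 'raw': 2}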
python/lsst/daf/butler/script/retrieveArtifacts.py (4 additions, 2 deletions)

@@ -90,10 +90,12 @@ def retrieveArtifacts(
 
     # Need to store in list so we can count the number to give some feedback
     # to caller.
-    dataset_types = butler.registry.queryDatasetTypes(query_types)
+    dataset_types = [dt.name for dt in butler.registry.queryDatasetTypes(query_types)]
     refs: list[DatasetRef] = []
     with butler._query() as query:
-        expanded_collections = butler.collections.x_query(query_collections)
+        collections_info = butler.collections.x_query_info(query_collections, include_summary=True)
+        expanded_collections = [info.name for info in collections_info]
+        dataset_types = list(butler.collections._filter_dataset_types(dataset_types, collections_info))
         for dt in dataset_types:
             results = query.datasets(dt, collections=expanded_collections, find_first=find_first)
             if where:
python/lsst/daf/butler/script/transferDatasets.py (6 additions, 2 deletions)

@@ -80,10 +80,14 @@ def transferDatasets(
 
     dataset_type_expr = dataset_type or ...
     collections_expr: tuple[str, ...] = collections or ("*",)
 
-    dataset_types = source_butler.registry.queryDatasetTypes(dataset_type_expr)
+    dataset_types = [dt.name for dt in source_butler.registry.queryDatasetTypes(dataset_type_expr)]
     source_refs: list[DatasetRef] = []
     with source_butler._query() as query:
-        query_collections = source_butler.collections.x_query(collections_expr)
+        query_collections_info = source_butler.collections.x_query_info(
+            collections_expr, include_summary=True
+        )
+        query_collections = [info.name for info in query_collections_info]
+        dataset_types = source_butler.collections._filter_dataset_types(dataset_types, query_collections_info)
         # Loop over dataset types and accumulate.
         for dt in dataset_types:
             results = query.datasets(dt, collections=query_collections, find_first=find_first)
