Merge pull request #1102 from lsst/u/jbosch/DM-46799/fixed
DM-46799: Reapply after revert with bugfix.
TallJimbo authored Oct 18, 2024
2 parents bde5552 + e0205d6 commit 0ccfb04
Showing 21 changed files with 2,045 additions and 1,957 deletions.
10 changes: 4 additions & 6 deletions python/lsst/daf/butler/datastores/fileDatastore.py
@@ -226,10 +226,10 @@ def setConfigRoot(cls, root: str, config: Config, full: Config, overwrite: bool
)

@classmethod
def makeTableSpec(cls, datasetIdColumnType: type) -> ddl.TableSpec:
def makeTableSpec(cls) -> ddl.TableSpec:
return ddl.TableSpec(
fields=[
ddl.FieldSpec(name="dataset_id", dtype=datasetIdColumnType, primaryKey=True),
ddl.FieldSpec(name="dataset_id", dtype=ddl.GUID, primaryKey=True),
ddl.FieldSpec(name="path", dtype=String, length=256, nullable=False),
ddl.FieldSpec(name="formatter", dtype=String, length=128, nullable=False),
ddl.FieldSpec(name="storage_class", dtype=String, length=64, nullable=False),
@@ -274,9 +274,7 @@ def __init__(
self._opaque_table_name = self.config["records", "table"]
try:
# Storage of paths and formatters, keyed by dataset_id
self._table = bridgeManager.opaque.register(
self._opaque_table_name, self.makeTableSpec(bridgeManager.datasetIdColumnType)
)
self._table = bridgeManager.opaque.register(self._opaque_table_name, self.makeTableSpec())
# Interface to Registry.
self._bridge = bridgeManager.register(self.name)
except ReadOnlyDatabaseError:
@@ -2855,7 +2853,7 @@ def _cast_storage_class(self, ref: DatasetRef) -> DatasetRef:

def get_opaque_table_definitions(self) -> Mapping[str, DatastoreOpaqueTable]:
# Docstring inherited from the base class.
return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(ddl.GUID), StoredFileInfo)}
return {self._opaque_table_name: DatastoreOpaqueTable(self.makeTableSpec(), StoredFileInfo)}


def _to_file_info_payload(
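The fileDatastore.py change above drops the datasetIdColumnType parameter, so the opaque file-records table always keys on a GUID-typed dataset_id and both register() and get_opaque_table_definitions() can call makeTableSpec() with no arguments. Below is a minimal, self-contained sketch of that pattern; FieldSpec and TableSpec here are simplified stand-ins for the ddl classes, not the real lsst.daf.butler API.

import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpec:
    # Simplified stand-in for ddl.FieldSpec.
    name: str
    dtype: type
    primaryKey: bool = False
    length: int | None = None
    nullable: bool = True


@dataclass(frozen=True)
class TableSpec:
    # Simplified stand-in for ddl.TableSpec.
    fields: tuple[FieldSpec, ...]


def make_table_spec() -> TableSpec:
    # After this commit the dataset_id column type is fixed (GUID/UUID),
    # so callers no longer pass a datasetIdColumnType argument.
    return TableSpec(
        fields=(
            FieldSpec(name="dataset_id", dtype=uuid.UUID, primaryKey=True),
            FieldSpec(name="path", dtype=str, length=256, nullable=False),
        )
    )


spec = make_table_spec()
assert spec.fields[0].dtype is uuid.UUID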
9 changes: 6 additions & 3 deletions python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -422,7 +422,7 @@ def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str]

def get_dataset_type(self, name: str) -> DatasetType:
# Docstring inherited
return self.managers.datasets[name].datasetType
return self.managers.datasets.get_dataset_type(name)

def get_default_collections(self) -> tuple[str, ...]:
# Docstring inherited.
@@ -1133,7 +1133,6 @@ def _join_dataset_search(
fields : `~collections.abc.Set` [ `str` ]
Dataset fields to include.
"""
storage = self.managers.datasets[resolved_search.name]
# The next two asserts will need to be dropped (and the implications
# dealt with instead) if materializations start having dataset fields.
assert (
@@ -1142,7 +1141,11 @@ def _join_dataset_search(
assert (
resolved_search.name not in joiner.timespans
), "Dataset timespan has unexpectedly already been joined in."
joiner.join(storage.make_query_joiner(resolved_search.collection_records, fields))
joiner.join(
self.managers.datasets.make_query_joiner(
self.get_dataset_type(resolved_search.name), resolved_search.collection_records, fields
)
)


@dataclasses.dataclass
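In _driver.py the driver now resolves dataset types and builds the dataset-query joiner through the manager itself rather than through a per-dataset-type storage object. The toy sketch below only illustrates the shape of that call pattern; the class and method bodies are invented stand-ins, not the real DatasetRecordStorageManager API.

from dataclasses import dataclass


@dataclass(frozen=True)
class DatasetType:
    # Stand-in for the butler DatasetType, reduced to a name.
    name: str


class ToyDatasetManager:
    """Stand-in manager: one dataset-type lookup plus one joiner factory."""

    def __init__(self) -> None:
        self._types = {"raw": DatasetType("raw")}

    def get_dataset_type(self, name: str) -> DatasetType:
        return self._types[name]

    def make_query_joiner(
        self, dataset_type: DatasetType, collections: tuple[str, ...], fields: set[str]
    ) -> str:
        # The real method returns a QueryJoiner; a string stands in here.
        return f"join {dataset_type.name} over {collections} selecting {sorted(fields)}"


manager = ToyDatasetManager()
joiner = manager.make_query_joiner(manager.get_dataset_type("raw"), ("run/a",), {"dataset_id"})
print(joiner)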
6 changes: 3 additions & 3 deletions python/lsst/daf/butler/queries/_general_query_results.py
@@ -118,19 +118,19 @@ def iter_tuples(self, *dataset_types: DatasetType) -> Iterator[GeneralResultTupl
Structure containing data coordinate, refs, and a copy of the row.
"""
all_dimensions = self._spec.dimensions
dataset_keys: list[tuple[DimensionGroup, str, str]] = []
dataset_keys: list[tuple[DatasetType, DimensionGroup, str, str]] = []
for dataset_type in dataset_types:
dimensions = dataset_type.dimensions
id_key = f"{dataset_type.name}.dataset_id"
run_key = f"{dataset_type.name}.run"
dataset_keys.append((dimensions, id_key, run_key))
dataset_keys.append((dataset_type, dimensions, id_key, run_key))
for row in self:
values = tuple(
row[key] for key in itertools.chain(all_dimensions.required, all_dimensions.implied)
)
data_coordinate = DataCoordinate.from_full_values(all_dimensions, values)
refs = []
for dimensions, id_key, run_key in dataset_keys:
for dataset_type, dimensions, id_key, run_key in dataset_keys:
values = tuple(row[key] for key in itertools.chain(dimensions.required, dimensions.implied))
data_id = DataCoordinate.from_full_values(dimensions, values)
refs.append(DatasetRef(dataset_type, data_id, row[run_key], id=row[id_key]))
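The iter_tuples change keeps the DatasetType object in each dataset_keys entry so a DatasetRef can be built per row without a second lookup. Here is a toy version of that inner loop, using plain dicts and tuples in place of the butler row and DatasetRef types.

# Each dataset_keys entry now carries the dataset type alongside its column
# keys; the loop unpacks all four and builds one ref per dataset type per row.
rows = [
    {"detector": 1, "raw.dataset_id": "uuid-1", "raw.run": "run/a"},
]
dataset_keys = [("raw", ("detector",), "raw.dataset_id", "raw.run")]  # (type, dims, id key, run key)
for row in rows:
    refs = [
        # Real code constructs DatasetRef(dataset_type, data_id, row[run_key], id=row[id_key]).
        (dataset_type, tuple(row[d] for d in dims), row[run_key], row[id_key])
        for dataset_type, dims, id_key, run_key in dataset_keys
    ]
    print(refs)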
5 changes: 5 additions & 0 deletions python/lsst/daf/butler/queries/result_specs.py
@@ -221,6 +221,11 @@ class GeneralResultSpec(ResultSpecBase):
def find_first_dataset(self) -> str | None:
# Docstring inherited.
if self.find_first:
if len(self.dataset_fields) != 1:
raise InvalidQueryError(
"General query with find_first=True cannot have results from multiple "
"dataset searches."
)
(dataset_type,) = self.dataset_fields.keys()
return dataset_type
return None
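The result_specs.py hunk adds a guard: a general query with find_first=True is only well defined when exactly one dataset search contributes fields. A self-contained sketch of that check follows; InvalidQueryError here is a local stand-in for the butler exception class.

class InvalidQueryError(RuntimeError):
    # Stand-in for the butler's InvalidQueryError.
    pass


def find_first_dataset(dataset_fields: dict[str, set[str]], find_first: bool) -> str | None:
    if find_first:
        if len(dataset_fields) != 1:
            raise InvalidQueryError(
                "General query with find_first=True cannot have results from multiple dataset searches."
            )
        (dataset_type,) = dataset_fields.keys()
        return dataset_type
    return None


assert find_first_dataset({"raw": {"dataset_id"}}, find_first=True) == "raw"
assert find_first_dataset({"raw": set(), "calexp": set()}, find_first=False) is None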
7 changes: 1 addition & 6 deletions python/lsst/daf/butler/queries/tree/_query_tree.py
@@ -217,7 +217,7 @@ def join_materialization(self, key: MaterializationKey, dimensions: DimensionGro
)

def join_dataset(self, dataset_type: str, search: DatasetSearch) -> QueryTree:
"""Return a new tree joins in a search for a dataset.
"""Return a new tree that joins in a search for a dataset.
Parameters
----------
@@ -231,11 +231,6 @@ def join_dataset(self, dataset_type: str, search: DatasetSearch) -> QueryTree:
-------
result : `QueryTree`
A new tree that joins in the dataset search.
Notes
-----
If this dataset type was already joined in, the new `DatasetSearch`
replaces the old one.
"""
if existing := self.datasets.get(dataset_type):
assert existing == search, "Dataset search should be new or the same."
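With the Notes paragraph removed, join_dataset no longer promises to replace an existing search; the new assert requires any repeated join of the same dataset type to use an identical DatasetSearch. A tiny sketch of that contract, with plain dicts and strings standing in for QueryTree and DatasetSearch:

def join_dataset(datasets: dict[str, str], dataset_type: str, search: str) -> dict[str, str]:
    # Re-joining a dataset type is only allowed with the same search;
    # the old "replace the previous search" behaviour is gone.
    if (existing := datasets.get(dataset_type)) is not None:
        assert existing == search, "Dataset search should be new or the same."
        return dict(datasets)
    return {**datasets, dataset_type: search}


tree = join_dataset({}, "raw", "collections=('run/a',)")
tree = join_dataset(tree, "raw", "collections=('run/a',)")  # same search: no-op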
23 changes: 16 additions & 7 deletions python/lsst/daf/butler/registry/_caching_context.py
@@ -27,19 +27,19 @@

from __future__ import annotations

__all__ = ["CachingContext"]
__all__ = ["CachingContext", "GenericCachingContext"]

from typing import TYPE_CHECKING
from typing import Generic, TypeAlias, TypeVar

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache

if TYPE_CHECKING:
from .interfaces import DatasetRecordStorage
_T = TypeVar("_T")
_U = TypeVar("_U")


class CachingContext:
class GenericCachingContext(Generic[_T, _U]):
"""Collection of caches for various types of records retrieved from
database.
@@ -54,10 +54,16 @@ class is passed to the relevant managers that can use it to query or
Dataset type cache is always enabled for now, this avoids the need for
explicitly enabling caching in pipetask executors.
`GenericCachingContext` is generic over two kinds of opaque dataset type
data, with the expectation that most code will use the ``CachingContext``
type alias (which resolves to `GenericCachingContext[object, object]`);
the `DatasetRecordStorageManager` can then cast this to a
`GenericCachingContext` with the actual opaque data types it uses.
"""

def __init__(self) -> None:
self._dataset_types: DatasetTypeCache[DatasetRecordStorage] = DatasetTypeCache()
self._dataset_types: DatasetTypeCache[_T, _U] = DatasetTypeCache()
self._collection_records: CollectionRecordCache | None = None
self._collection_summaries: CollectionSummaryCache | None = None
self._depth = 0
@@ -103,6 +109,9 @@ def collection_summaries(self) -> CollectionSummaryCache | None:
return self._collection_summaries

@property
def dataset_types(self) -> DatasetTypeCache[DatasetRecordStorage]:
def dataset_types(self) -> DatasetTypeCache[_T, _U]:
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""
return self._dataset_types


CachingContext: TypeAlias = GenericCachingContext[object, object]
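The docstring above describes the intended usage: generic-agnostic code holds a CachingContext (the GenericCachingContext[object, object] alias) and the dataset manager casts it to the parametrization it actually stores. A compressed, self-contained sketch of that pattern follows; the attribute names and concrete opaque types are invented for illustration only.

from typing import Generic, TypeAlias, TypeVar, cast

_T = TypeVar("_T")
_U = TypeVar("_U")


class GenericCachingContext(Generic[_T, _U]):
    # Reduced to the two dataset-type mappings for the sketch.
    def __init__(self) -> None:
        self.by_name: dict[str, _T] = {}
        self.by_dimensions: dict[frozenset[str], _U] = {}


CachingContext: TypeAlias = GenericCachingContext[object, object]


def manager_code(caching_context: CachingContext) -> None:
    # The manager knows its concrete opaque types and narrows the context.
    ctx = cast(GenericCachingContext[tuple[str, int], list[str]], caching_context)
    ctx.by_name["raw"] = ("raw", 1)
    ctx.by_dimensions[frozenset({"instrument", "detector"})] = ["dynamic_table"]


manager_code(CachingContext())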
133 changes: 107 additions & 26 deletions python/lsst/daf/butler/registry/_dataset_type_cache.py
@@ -33,69 +33,98 @@
from typing import Generic, TypeVar

from .._dataset_type import DatasetType
from ..dimensions import DimensionGroup

_T = TypeVar("_T")
_U = TypeVar("_U")


class DatasetTypeCache(Generic[_T]):
class DatasetTypeCache(Generic[_T, _U]):
"""Cache for dataset types.
Notes
-----
This class caches mapping of dataset type name to a corresponding
`DatasetType` instance. Registry manager also needs to cache corresponding
"storage" instance, so this class allows storing additional opaque object
along with the dataset type.
This cache is a pair of mappings with different kinds of keys:
In come contexts (e.g. ``resolve_wildcard``) a full list of dataset types
- the `DatasetType` itself is cached by name, as is some opaque data used
only by a `DatasetRecordStorageManager` implementation;
- additional opaque data (also used only by `DatasetRecordStorageManager`
implementations can be cached by the dimensions dataset types (i.e. a
`DimensionGroup`).
`DatasetTypeCache` is generic over these two opaque data types.
In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types
is needed. To signify that cache content can be used in such contexts,
cache defines special ``full`` flag that needs to be set by client.
cache defines a special ``full`` flag that needs to be set by client. The
``dimensions_full`` flag similarly reports whether all per-dimension-group
state is present in the cache.
"""

def __init__(self) -> None:
self._cache: dict[str, tuple[DatasetType, _T | None]] = {}
self._by_name_cache: dict[str, tuple[DatasetType, _T]] = {}
self._by_dimensions_cache: dict[DimensionGroup, _U] = {}
self._full = False
self._dimensions_full = False

@property
def full(self) -> bool:
"""`True` if cache holds all known dataset types (`bool`)."""
return self._full

def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None:
@property
def dimensions_full(self) -> bool:
"""`True` if cache holds all known dataset type dimensions (`bool`)."""
return self._dimensions_full

def add(self, dataset_type: DatasetType, extra: _T) -> None:
"""Add one record to the cache.
Parameters
----------
dataset_type : `DatasetType`
Dataset type, replaces any existing dataset type with the same
name.
extra : `Any`, optional
extra : `Any`
Additional opaque object stored with this dataset type.
"""
self._cache[dataset_type.name] = (dataset_type, extra)

def set(self, data: Iterable[DatasetType | tuple[DatasetType, _T | None]], *, full: bool = False) -> None:
self._by_name_cache[dataset_type.name] = (dataset_type, extra)

def set(
self,
data: Iterable[tuple[DatasetType, _T]],
*,
full: bool = False,
dimensions_data: Iterable[tuple[DimensionGroup, _U]] | None = None,
dimensions_full: bool = False,
) -> None:
"""Replace cache contents with the new set of dataset types.
Parameters
----------
data : `~collections.abc.Iterable`
Sequence of `DatasetType` instances or tuples of `DatasetType` and
an extra opaque object.
full : `bool`
Sequence of tuples of `DatasetType` and an extra opaque object.
full : `bool`, optional
If `True` then ``data`` contains all known dataset types.
dimensions_data : `~collections.abc.Iterable`, optional
Sequence of tuples of `DimensionGroup` and an extra opaque object.
dimensions_full : `bool`, optional
If `True` then ``data`` contains all known dataset type dimensions.
"""
self.clear()
for item in data:
if isinstance(item, DatasetType):
item = (item, None)
self._cache[item[0].name] = item
self._by_name_cache[item[0].name] = item
self._full = full
if dimensions_data is not None:
self._by_dimensions_cache.update(dimensions_data)
self._dimensions_full = dimensions_full

def clear(self) -> None:
"""Remove everything from the cache."""
self._cache = {}
self._by_name_cache = {}
self._by_dimensions_cache = {}
self._full = False
self._dimensions_full = False

def discard(self, name: str) -> None:
"""Remove named dataset type from the cache.
@@ -105,7 +134,7 @@ def discard(self, name: str) -> None:
name : `str`
Name of the dataset type to remove.
"""
self._cache.pop(name, None)
self._by_name_cache.pop(name, None)

def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
"""Return cached info given dataset type name.
@@ -122,9 +151,9 @@ def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
cache.
extra : `Any` or `None`
Cached opaque data, `None` is returned if the name is not in the
cache or no extra info was stored for this dataset type.
cache.
"""
item = self._cache.get(name)
item = self._by_name_cache.get(name)
if item is None:
return (None, None)
return item
@@ -143,20 +172,72 @@ def get_dataset_type(self, name: str) -> DatasetType | None:
Cached dataset type, `None` is returned if the name is not in the
cache.
"""
item = self._cache.get(name)
item = self._by_name_cache.get(name)
if item is None:
return None
return item[0]

def items(self) -> Iterator[tuple[DatasetType, _T | None]]:
def items(self) -> Iterator[tuple[DatasetType, _T]]:
"""Return iterator for the set of items in the cache, can only be
used if `full` is true.
Returns
-------
iter : `~collections.abc.Iterator`
Iterator over tuples of `DatasetType` and opaque data.
Raises
------
RuntimeError
Raised if ``self.full`` is `False`.
"""
if not self._full:
raise RuntimeError("cannot call items() if cache is not full")
return iter(self._cache.values())
return iter(self._by_name_cache.values())

def add_by_dimensions(self, dimensions: DimensionGroup, extra: _U) -> None:
"""Add information about a set of dataset type dimensions to the cache.
Parameters
----------
dimensions : `DimensionGroup`
Dimensions of one or more dataset types.
extra : `Any`
Additional opaque object stored with these dimensions.
"""
self._by_dimensions_cache[dimensions] = extra

def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None:
"""Get information about a set of dataset type dimensions.
Parameters
----------
dimensions : `DimensionGroup`
Dimensions of one or more dataset types.
Returns
-------
extra : `Any` or `None`
Additional opaque object stored with these dimensions, or `None` if
these dimensions are not present in the cache.
"""
return self._by_dimensions_cache.get(dimensions)

def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, _U]]:
"""Return iterator for all dimensions-keyed data in the cache.
This can only be called if `dimensions_full` is `True`.
Returns
-------
iter : `~collections.abc.Iterator`
Iterator over tuples of `DimensionGroup` and opaque data.
Raises
------
RuntimeError
Raised if ``self.dimensions_full`` is `False`.
"""
if not self._dimensions_full:
raise RuntimeError("cannot call by_dimensions_items() if cache does not have full dimensions.")
return iter(self._by_dimensions_cache.items())
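Taken together, the new DatasetTypeCache keeps two mappings with independent "full" flags: one keyed by dataset type name and one keyed by dimensions. Below is a toy class mirroring that structure together with a short usage example; it is a simplified stand-in, not the real implementation.

from collections.abc import Iterator


class ToyDatasetTypeCache:
    """Toy mirror of the two-level cache: by-name and by-dimensions."""

    def __init__(self) -> None:
        self._by_name: dict[str, tuple[str, object]] = {}
        self._by_dimensions: dict[frozenset[str], object] = {}
        self.full = False
        self.dimensions_full = False

    def add(self, name: str, extra: object) -> None:
        self._by_name[name] = (name, extra)

    def add_by_dimensions(self, dimensions: frozenset[str], extra: object) -> None:
        self._by_dimensions[dimensions] = extra

    def items(self) -> Iterator[tuple[str, object]]:
        if not self.full:
            raise RuntimeError("cannot call items() if cache is not full")
        return iter(self._by_name.values())


cache = ToyDatasetTypeCache()
cache.add("raw", extra="static-tables-handle")
cache.add_by_dimensions(frozenset({"instrument", "detector"}), "dynamic-table-handle")
cache.full = True  # caller asserts it has loaded every dataset type
print(list(cache.items()))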