Rework dataset manager and dataset type caching.
The dataset type cache now holds just the dataset type definition and
dataset ID by name, with the SQLAlchemy table objects instead cached
by DimensionGroup so they can be shared by multiple dataset types.

DatasetRecordStorage has been removed (both the base class and its
sole implementation): its methods had already been moved to
DatasetRecordStorageManager, and it no longer serves as the opaque
object to put in the cache.  Instead there is a new subpackage-private
DynamicTables class that is cached by DimensionGroup (this is where
the lazy loading of SQLAlchemy table objects now happens), and a
module-private _DatasetRecordStorage struct that just combines that
with the dataset type and its ID, to make it more convenient to pass
all of that around.
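
Roughly, the new arrangement has the shape sketched below.  This is not
part of the actual change - it is a simplified illustration, and the
names DynamicTablesSketch, _DatasetRecordStorageSketch, and load_table
are placeholders, not the real daf_butler classes or APIs:

    from collections.abc import Callable
    from dataclasses import dataclass

    import sqlalchemy

    from lsst.daf.butler import DatasetType
    from lsst.daf.butler.dimensions import DimensionGroup


    @dataclass
    class DynamicTablesSketch:
        """Stand-in for the subpackage-private DynamicTables class:
        SQLAlchemy table objects keyed by the dimensions they serve,
        loaded lazily and shared by every dataset type with those
        dimensions.
        """

        dimensions: DimensionGroup
        # Hypothetical factory standing in for however the real class
        # obtains its tables from the database layer.
        load_table: Callable[[str], sqlalchemy.Table]
        _tags_table: sqlalchemy.Table | None = None

        def tags(self) -> sqlalchemy.Table:
            # Lazy loading: only touch the database layer on first use.
            if self._tags_table is None:
                self._tags_table = self.load_table("tags")
            return self._tags_table


    @dataclass
    class _DatasetRecordStorageSketch:
        """Stand-in for the module-private _DatasetRecordStorage struct:
        it just bundles the per-dataset-type pieces with the shared
        tables so they are convenient to pass around together.
        """

        dataset_type: DatasetType
        dataset_type_id: int
        tables: DynamicTablesSketch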

I also threw in some changes to the insert/import implementations
because I started trying to reduce the degree to which
DatasetRecordStorage was being passed things that were either totally
unused or assumed (without checking) to have some value.  I quickly
realized that this problem is ubiquitous (especially with storage
classes) and should be a separate ticket, but I've kept what I already
did since I think it's a step in the right direction.
TallJimbo committed Oct 16, 2024
1 parent 781bb32 commit 9a12c91
Showing 9 changed files with 653 additions and 505 deletions.
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/direct_query_driver/_driver.py
@@ -422,7 +422,7 @@ def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str]

def get_dataset_type(self, name: str) -> DatasetType:
# Docstring inherited
return self.managers.datasets[name].datasetType
return self.managers.datasets.get_dataset_type(name)

def get_default_collections(self) -> tuple[str, ...]:
# Docstring inherited.
23 changes: 16 additions & 7 deletions python/lsst/daf/butler/registry/_caching_context.py
@@ -27,19 +27,19 @@

from __future__ import annotations

__all__ = ["CachingContext"]
__all__ = ["CachingContext", "GenericCachingContext"]

from typing import TYPE_CHECKING
from typing import Generic, TypeAlias, TypeVar

from ._collection_record_cache import CollectionRecordCache
from ._collection_summary_cache import CollectionSummaryCache
from ._dataset_type_cache import DatasetTypeCache

if TYPE_CHECKING:
from .interfaces import DatasetRecordStorage
_T = TypeVar("_T")
_U = TypeVar("_U")


class CachingContext:
class GenericCachingContext(Generic[_T, _U]):
"""Collection of caches for various types of records retrieved from
database.
@@ -54,10 +54,16 @@ class is passed to the relevant managers that can use it to query or
Dataset type cache is always enabled for now, this avoids the need for
explicitly enabling caching in pipetask executors.
`GenericCachingContext` is generic over two kinds of opaque dataset type
data, with the expectation that most code will use the ``CachingContext``
type alias (which resolves to `GenericCachingContext[object, object]`);
the `DatasetRecordStorageManager` can then cast this to a
`GenericCachingContext` with the actual opaque data types it uses.
"""

def __init__(self) -> None:
self._dataset_types: DatasetTypeCache[DatasetRecordStorage] = DatasetTypeCache()
self._dataset_types: DatasetTypeCache[_T, _U] = DatasetTypeCache()
self._collection_records: CollectionRecordCache | None = None
self._collection_summaries: CollectionSummaryCache | None = None
self._depth = 0
@@ -103,6 +109,9 @@ def collection_summaries(self) -> CollectionSummaryCache | None:
return self._collection_summaries

@property
def dataset_types(self) -> DatasetTypeCache[DatasetRecordStorage]:
def dataset_types(self) -> DatasetTypeCache[_T, _U]:
"""Cache for dataset types, never disabled (`DatasetTypeCache`)."""
return self._dataset_types


CachingContext: TypeAlias = GenericCachingContext[object, object]
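
A minimal sketch of the alias-plus-cast pattern described in the new
docstring.  ByNameData and ByDimensionsData are placeholder payload
types, not part of the real manager API, and the private import path is
shown only for illustration:

    from typing import cast

    from lsst.daf.butler.registry._caching_context import (
        CachingContext,
        GenericCachingContext,
    )


    # Hypothetical opaque payload types a datasets-manager implementation
    # might use; the real manager defines its own private structures.
    class ByNameData: ...


    class ByDimensionsData: ...


    # Most code only sees the type-erased alias...
    context: CachingContext = CachingContext()

    # ...while a manager implementation narrows it to the concrete opaque
    # types it actually stores, so its internal code stays fully typed.
    typed_context = cast(
        GenericCachingContext[ByNameData, ByDimensionsData], context
    )
    # typed_context.dataset_types is now treated as a
    # DatasetTypeCache[ByNameData, ByDimensionsData] by the type checker.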
133 changes: 107 additions & 26 deletions python/lsst/daf/butler/registry/_dataset_type_cache.py
@@ -33,69 +33,98 @@
from typing import Generic, TypeVar

from .._dataset_type import DatasetType
from ..dimensions import DimensionGroup

_T = TypeVar("_T")
_U = TypeVar("_U")


class DatasetTypeCache(Generic[_T]):
class DatasetTypeCache(Generic[_T, _U]):
"""Cache for dataset types.
Notes
-----
This class caches mapping of dataset type name to a corresponding
`DatasetType` instance. Registry manager also needs to cache corresponding
"storage" instance, so this class allows storing additional opaque object
along with the dataset type.
This cache is a pair of mappings with different kinds of keys:
In come contexts (e.g. ``resolve_wildcard``) a full list of dataset types
- the `DatasetType` itself is cached by name, as is some opaque data used
only by a `DatasetRecordStorageManager` implementation;
- additional opaque data (also used only by `DatasetRecordStorageManager`
implementations) can be cached by the dimensions of the dataset types
(i.e. a `DimensionGroup`).
`DatasetTypeCache` is generic over these two opaque data types.
In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types
is needed. To signify that cache content can be used in such contexts,
cache defines special ``full`` flag that needs to be set by client.
cache defines a special ``full`` flag that needs to be set by client. The
``dimensions_full`` flag similarly reports whether all per-dimension-group
state is present in the cache.
"""

def __init__(self) -> None:
self._cache: dict[str, tuple[DatasetType, _T | None]] = {}
self._by_name_cache: dict[str, tuple[DatasetType, _T]] = {}
self._by_dimensions_cache: dict[DimensionGroup, _U] = {}
self._full = False
self._dimensions_full = False

@property
def full(self) -> bool:
"""`True` if cache holds all known dataset types (`bool`)."""
return self._full

def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None:
@property
def dimensions_full(self) -> bool:
"""`True` if cache holds all known dataset type dimensions (`bool`)."""
return self._dimensions_full

def add(self, dataset_type: DatasetType, extra: _T) -> None:
"""Add one record to the cache.
Parameters
----------
dataset_type : `DatasetType`
Dataset type, replaces any existing dataset type with the same
name.
extra : `Any`, optional
extra : `Any`
Additional opaque object stored with this dataset type.
"""
self._cache[dataset_type.name] = (dataset_type, extra)

def set(self, data: Iterable[DatasetType | tuple[DatasetType, _T | None]], *, full: bool = False) -> None:
self._by_name_cache[dataset_type.name] = (dataset_type, extra)

def set(
self,
data: Iterable[tuple[DatasetType, _T]],
*,
full: bool = False,
dimensions_data: Iterable[tuple[DimensionGroup, _U]] | None = None,
dimensions_full: bool = False,
) -> None:
"""Replace cache contents with the new set of dataset types.
Parameters
----------
data : `~collections.abc.Iterable`
Sequence of `DatasetType` instances or tuples of `DatasetType` and
an extra opaque object.
full : `bool`
Sequence of tuples of `DatasetType` and an extra opaque object.
full : `bool`, optional
If `True` then ``data`` contains all known dataset types.
dimensions_data : `~collections.abc.Iterable`, optional
Sequence of tuples of `DimensionGroup` and an extra opaque object.
dimensions_full : `bool`, optional
If `True` then ``data`` contains all known dataset type dimensions.
"""
self.clear()
for item in data:
if isinstance(item, DatasetType):
item = (item, None)
self._cache[item[0].name] = item
self._by_name_cache[item[0].name] = item
self._full = full
if dimensions_data is not None:
self._by_dimensions_cache.update(dimensions_data)
self._dimensions_full = dimensions_full

def clear(self) -> None:
"""Remove everything from the cache."""
self._cache = {}
self._by_name_cache = {}
self._by_dimensions_cache = {}
self._full = False
self._dimensions_full = False

def discard(self, name: str) -> None:
"""Remove named dataset type from the cache.
@@ -105,7 +134,7 @@ def discard(self, name: str) -> None:
name : `str`
Name of the dataset type to remove.
"""
self._cache.pop(name, None)
self._by_name_cache.pop(name, None)

def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
"""Return cached info given dataset type name.
@@ -122,9 +151,9 @@ def get(self, name: str) -> tuple[DatasetType | None, _T | None]:
cache.
extra : `Any` or `None`
Cached opaque data, `None` is returned if the name is not in the
cache or no extra info was stored for this dataset type.
cache.
"""
item = self._cache.get(name)
item = self._by_name_cache.get(name)
if item is None:
return (None, None)
return item
@@ -143,20 +172,72 @@ def get_dataset_type(self, name: str) -> DatasetType | None:
Cached dataset type, `None` is returned if the name is not in the
cache.
"""
item = self._cache.get(name)
item = self._by_name_cache.get(name)
if item is None:
return None
return item[0]

def items(self) -> Iterator[tuple[DatasetType, _T | None]]:
def items(self) -> Iterator[tuple[DatasetType, _T]]:
"""Return iterator for the set of items in the cache, can only be
used if `full` is true.
Returns
-------
iter : `~collections.abc.Iterator`
Iterator over tuples of `DatasetType` and opaque data.
Raises
------
RuntimeError
Raised if ``self.full`` is `False`.
"""
if not self._full:
raise RuntimeError("cannot call items() if cache is not full")
return iter(self._cache.values())
return iter(self._by_name_cache.values())

def add_by_dimensions(self, dimensions: DimensionGroup, extra: _U) -> None:
"""Add information about a set of dataset type dimensions to the cache.
Parameters
----------
dimensions : `DimensionGroup`
Dimensions of one or more dataset types.
extra : `Any`
Additional opaque object stored with these dimensions.
"""
self._by_dimensions_cache[dimensions] = extra

def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None:
"""Get information about a set of dataset type dimensions.
Parameters
----------
dimensions : `DimensionGroup`
Dimensions of one or more dataset types.
Returns
-------
extra : `Any` or `None`
Additional opaque object stored with these dimensions, or `None` if
these dimensions are not present in the cache.
"""
return self._by_dimensions_cache.get(dimensions)

def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, _U]]:
"""Return iterator for all dimensions-keyed data in the cache.
This can only be called if `dimensions_full` is `True`.
Returns
-------
iter : `~collections.abc.Iterator`
Iterator over tuples of `DimensionGroup` and opaque data.
Raises
------
RuntimeError
Raised if ``self.dimensions_full`` is `False`.
"""
if not self._dimensions_full:
raise RuntimeError("cannot call by_dimensions_items() if cache does not have full dimensions.")
return iter(self._by_dimensions_cache.items())
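
Taken together, the by-name and by-dimensions halves of the cache might
be exercised as in the usage sketch below.  This is illustrative only:
the dataset type, the int/str payload types, and the private import path
are made up for the example and are not what the real
DatasetRecordStorageManager stores:

    from lsst.daf.butler import DatasetType
    from lsst.daf.butler.dimensions import DimensionUniverse
    from lsst.daf.butler.registry._dataset_type_cache import DatasetTypeCache

    universe = DimensionUniverse()
    # Illustrative dataset type; the name, dimensions, and storage class
    # are just example values.
    calexp = DatasetType(
        "calexp",
        universe.conform(["instrument", "visit", "detector"]),
        "StructuredDataDict",
    )

    # Opaque payloads are int (per-dataset-type ID) and str
    # (per-dimension-group table info) purely for illustration.
    cache: DatasetTypeCache[int, str] = DatasetTypeCache()

    # By-name mapping: the DatasetType plus its opaque per-type data.
    cache.add(calexp, extra=42)

    # By-dimensions mapping: opaque data shared by every dataset type
    # with the same dimensions.
    cache.add_by_dimensions(calexp.dimensions, "tables-for-visit-detector")

    dataset_type, type_id = cache.get("calexp")
    tables = cache.get_by_dimensions(calexp.dimensions)

    # items() / by_dimensions_items() are only legal once the client has
    # marked the cache complete, e.g. via set(..., full=True, ...).
    if cache.full:
        for dt, extra in cache.items():
            ...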
@@ -26,4 +26,3 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from ._manager import *
from ._storage import *