From 9a12c91cec3e2e7b5a3d6a671b81e81de256549f Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 27 Sep 2024 13:50:26 -0400 Subject: [PATCH] Rework dataset manager and dataset type caching. The dataset type cache now holds just the dataset type definition and dataset type ID by name, with the SQLAlchemy table objects instead cached by DimensionGroup so they can be shared by multiple dataset types. DatasetRecordStorage has been removed (both the base class and its sole implementation) - its methods had already been moved to DatasetRecordStorageManager, and it no longer works as the opaque thing to put in the cache. Instead there's a new subpackage-private DynamicTables class that is cached by DimensionGroup (this is where the lazy loading of SQLAlchemy table objects now happens), and a module-private _DatasetRecordStorage struct that just combines that with the dataset type and its ID, to make it more convenient to pass all of that around. I also threw in some changes to the insert/import implementations because I started trying to reduce the degree to which DatasetRecordStorage was being passed things that were either totally unused or assumed (without checking) to have some value. I quickly realized that this problem is ubiquitous (especially with storage classes) and should be a separate ticket, but I've kept what I already did since I think it's a step in the right direction. --- .../daf/butler/direct_query_driver/_driver.py | 2 +- .../daf/butler/registry/_caching_context.py | 23 +- .../butler/registry/_dataset_type_cache.py | 133 ++++- .../datasets/byDimensions/__init__.py | 1 - .../datasets/byDimensions/_manager.py | 525 ++++++++++-------- .../datasets/byDimensions/_storage.py | 115 ---- .../registry/datasets/byDimensions/tables.py | 154 ++++- .../butler/registry/interfaces/_datasets.py | 158 +++--- .../lsst/daf/butler/registry/sql_registry.py | 47 +- 9 files changed, 653 insertions(+), 505 deletions(-) delete mode 100644 python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py diff --git a/python/lsst/daf/butler/direct_query_driver/_driver.py b/python/lsst/daf/butler/direct_query_driver/_driver.py index 5d5e21da62..069f43e77b 100644 --- a/python/lsst/daf/butler/direct_query_driver/_driver.py +++ b/python/lsst/daf/butler/direct_query_driver/_driver.py @@ -422,7 +422,7 @@ def explain_no_results(self, tree: qt.QueryTree, execute: bool) -> Iterable[str] def get_dataset_type(self, name: str) -> DatasetType: # Docstring inherited - return self.managers.datasets[name].datasetType + return self.managers.datasets.get_dataset_type(name) def get_default_collections(self) -> tuple[str, ...]: # Docstring inherited.
diff --git a/python/lsst/daf/butler/registry/_caching_context.py b/python/lsst/daf/butler/registry/_caching_context.py index 9a8461a312..81362df826 100644 --- a/python/lsst/daf/butler/registry/_caching_context.py +++ b/python/lsst/daf/butler/registry/_caching_context.py @@ -27,19 +27,19 @@ from __future__ import annotations -__all__ = ["CachingContext"] +__all__ = ["CachingContext", "GenericCachingContext"] -from typing import TYPE_CHECKING +from typing import Generic, TypeAlias, TypeVar from ._collection_record_cache import CollectionRecordCache from ._collection_summary_cache import CollectionSummaryCache from ._dataset_type_cache import DatasetTypeCache -if TYPE_CHECKING: - from .interfaces import DatasetRecordStorage +_T = TypeVar("_T") +_U = TypeVar("_U") -class CachingContext: +class GenericCachingContext(Generic[_T, _U]): """Collection of caches for various types of records retrieved from database. @@ -54,10 +54,16 @@ class is passed to the relevant managers that can use it to query or Dataset type cache is always enabled for now, this avoids the need for explicitly enabling caching in pipetask executors. + + `GenericCachingContext` is generic over two kinds of opaque dataset type + data, with the expectation that most code will use the ``CachingContext`` + type alias (which resolves to `GenericCachingContext[object, object]`); + the `DatasetRecordStorageManager` can then cast this to a + `GenericCachingContext` with the actual opaque data types it uses. """ def __init__(self) -> None: - self._dataset_types: DatasetTypeCache[DatasetRecordStorage] = DatasetTypeCache() + self._dataset_types: DatasetTypeCache[_T, _U] = DatasetTypeCache() self._collection_records: CollectionRecordCache | None = None self._collection_summaries: CollectionSummaryCache | None = None self._depth = 0 @@ -103,6 +109,9 @@ def collection_summaries(self) -> CollectionSummaryCache | None: return self._collection_summaries @property - def dataset_types(self) -> DatasetTypeCache[DatasetRecordStorage]: + def dataset_types(self) -> DatasetTypeCache[_T, _U]: """Cache for dataset types, never disabled (`DatasetTypeCache`).""" return self._dataset_types + + +CachingContext: TypeAlias = GenericCachingContext[object, object] diff --git a/python/lsst/daf/butler/registry/_dataset_type_cache.py b/python/lsst/daf/butler/registry/_dataset_type_cache.py index 3f1665dfa3..ab43c80ca5 100644 --- a/python/lsst/daf/butler/registry/_dataset_type_cache.py +++ b/python/lsst/daf/butler/registry/_dataset_type_cache.py @@ -33,35 +33,51 @@ from typing import Generic, TypeVar from .._dataset_type import DatasetType +from ..dimensions import DimensionGroup _T = TypeVar("_T") +_U = TypeVar("_U") -class DatasetTypeCache(Generic[_T]): +class DatasetTypeCache(Generic[_T, _U]): """Cache for dataset types. Notes ----- - This class caches mapping of dataset type name to a corresponding - `DatasetType` instance. Registry manager also needs to cache corresponding - "storage" instance, so this class allows storing additional opaque object - along with the dataset type. + This cache is a pair of mappings with different kinds of keys: - In come contexts (e.g. ``resolve_wildcard``) a full list of dataset types + - the `DatasetType` itself is cached by name, as is some opaque data used + only by a `DatasetRecordStorageManager` implementation; + - additional opaque data (also used only by `DatasetRecordStorageManager` + implementations) can be cached by the dimensions of dataset types (i.e. a + `DimensionGroup`).
+ + `DatasetTypeCache` is generic over these two opaque data types. + + In some contexts (e.g. ``resolve_wildcard``) a full list of dataset types is needed. To signify that cache content can be used in such contexts, - cache defines special ``full`` flag that needs to be set by client. + cache defines a special ``full`` flag that needs to be set by client. The + ``dimensions_full`` flag similarly reports whether all per-dimension-group + state is present in the cache. """ def __init__(self) -> None: - self._cache: dict[str, tuple[DatasetType, _T | None]] = {} + self._by_name_cache: dict[str, tuple[DatasetType, _T]] = {} + self._by_dimensions_cache: dict[DimensionGroup, _U] = {} self._full = False + self._dimensions_full = False @property def full(self) -> bool: """`True` if cache holds all known dataset types (`bool`).""" return self._full - def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None: + @property + def dimensions_full(self) -> bool: + """`True` if cache holds all known dataset type dimensions (`bool`).""" + return self._dimensions_full + + def add(self, dataset_type: DatasetType, extra: _T) -> None: """Add one record to the cache. Parameters @@ -69,33 +85,46 @@ def add(self, dataset_type: DatasetType, extra: _T | None = None) -> None: dataset_type : `DatasetType` Dataset type, replaces any existing dataset type with the same name. - extra : `Any`, optional + extra : `Any` Additional opaque object stored with this dataset type. """ - self._cache[dataset_type.name] = (dataset_type, extra) - - def set(self, data: Iterable[DatasetType | tuple[DatasetType, _T | None]], *, full: bool = False) -> None: + self._by_name_cache[dataset_type.name] = (dataset_type, extra) + + def set( + self, + data: Iterable[tuple[DatasetType, _T]], + *, + full: bool = False, + dimensions_data: Iterable[tuple[DimensionGroup, _U]] | None = None, + dimensions_full: bool = False, + ) -> None: """Replace cache contents with the new set of dataset types. Parameters ---------- data : `~collections.abc.Iterable` - Sequence of `DatasetType` instances or tuples of `DatasetType` and - an extra opaque object. - full : `bool` + Sequence of tuples of `DatasetType` and an extra opaque object. + full : `bool`, optional If `True` then ``data`` contains all known dataset types. + dimensions_data : `~collections.abc.Iterable`, optional + Sequence of tuples of `DimensionGroup` and an extra opaque object. + dimensions_full : `bool`, optional + If `True` then ``data`` contains all known dataset type dimensions. """ self.clear() for item in data: - if isinstance(item, DatasetType): - item = (item, None) - self._cache[item[0].name] = item + self._by_name_cache[item[0].name] = item self._full = full + if dimensions_data is not None: + self._by_dimensions_cache.update(dimensions_data) + self._dimensions_full = dimensions_full def clear(self) -> None: """Remove everything from the cache.""" - self._cache = {} + self._by_name_cache = {} + self._by_dimensions_cache = {} self._full = False + self._dimensions_full = False def discard(self, name: str) -> None: """Remove named dataset type from the cache. @@ -105,7 +134,7 @@ def discard(self, name: str) -> None: name : `str` Name of the dataset type to remove. """ - self._cache.pop(name, None) + self._by_name_cache.pop(name, None) def get(self, name: str) -> tuple[DatasetType | None, _T | None]: """Return cached info given dataset type name. @@ -122,9 +151,9 @@ def get(self, name: str) -> tuple[DatasetType | None, _T | None]: cache. 
extra : `Any` or `None` Cached opaque data, `None` is returned if the name is not in the - cache or no extra info was stored for this dataset type. + cache. """ - item = self._cache.get(name) + item = self._by_name_cache.get(name) if item is None: return (None, None) return item @@ -143,15 +172,20 @@ def get_dataset_type(self, name: str) -> DatasetType | None: Cached dataset type, `None` is returned if the name is not in the cache. """ - item = self._cache.get(name) + item = self._by_name_cache.get(name) if item is None: return None return item[0] - def items(self) -> Iterator[tuple[DatasetType, _T | None]]: + def items(self) -> Iterator[tuple[DatasetType, _T]]: """Return iterator for the set of items in the cache, can only be used if `full` is true. + Returns + ------- + iter : `~collections.abc.Iterator` + Iterator over tuples of `DatasetType` and opaque data. + Raises ------ RuntimeError @@ -159,4 +193,51 @@ def items(self) -> Iterator[tuple[DatasetType, _T | None]]: """ if not self._full: raise RuntimeError("cannot call items() if cache is not full") - return iter(self._cache.values()) + return iter(self._by_name_cache.values()) + + def add_by_dimensions(self, dimensions: DimensionGroup, extra: _U) -> None: + """Add information about a set of dataset type dimensions to the cache. + + Parameters + ---------- + dimensions : `DimensionGroup` + Dimensions of one or more dataset types. + extra : `Any` + Additional opaque object stored with these dimensions. + """ + self._by_dimensions_cache[dimensions] = extra + + def get_by_dimensions(self, dimensions: DimensionGroup) -> _U | None: + """Get information about a set of dataset type dimensions. + + Parameters + ---------- + dimensions : `DimensionGroup` + Dimensions of one or more dataset types. + + Returns + ------- + extra : `Any` or `None` + Additional opaque object stored with these dimensions, or `None` if + these dimensions are not present in the cache. + """ + return self._by_dimensions_cache.get(dimensions) + + def by_dimensions_items(self) -> Iterator[tuple[DimensionGroup, _U]]: + """Return iterator for all dimensions-keyed data in the cache. + + This can only be called if `dimensions_full` is `True`. + + Returns + ------- + iter : `~collections.abc.Iterator` + Iterator over tuples of `DimensionGroup` and opaque data. + + Raises + ------ + RuntimeError + Raised if ``self.dimensions_full`` is `False`. + """ + if not self._dimensions_full: + raise RuntimeError("cannot call by_dimensions_items() if cache does not have full dimensions.") + return iter(self._by_dimensions_cache.items()) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/__init__.py b/python/lsst/daf/butler/registry/datasets/byDimensions/__init__.py index 26fd39b361..3697e36509 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/__init__.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/__init__.py @@ -26,4 +26,3 @@ # along with this program. If not, see . 
from ._manager import * -from ._storage import * diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py index 0f950daeee..251b4ab799 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/_manager.py @@ -21,27 +21,19 @@ from ...._exceptions import CollectionTypeError, MissingDatasetTypeError from ...._exceptions_legacy import DatasetTypeError from ...._timespan import Timespan -from ....dimensions import DataCoordinate, DimensionUniverse +from ....dimensions import DataCoordinate, DimensionGroup, DimensionUniverse from ....direct_query_driver import QueryBuilder, QueryJoiner # new query system, server+direct only from ....queries import tree as qt # new query system, both clients + server +from ..._caching_context import CachingContext, GenericCachingContext from ..._collection_summary import CollectionSummary from ..._exceptions import ConflictingDefinitionError, DatasetTypeExpressionError, OrphanedRecordError -from ...interfaces import DatasetRecordStorage, DatasetRecordStorageManager, RunRecord, VersionTuple +from ...interfaces import DatasetRecordStorageManager, RunRecord, VersionTuple from ...queries import SqlQueryContext # old registry query system from ...wildcards import DatasetTypeWildcard -from ._storage import ByDimensionsDatasetRecordStorageUUID from .summaries import CollectionSummaryManager -from .tables import ( - addDatasetForeignKey, - makeCalibTableName, - makeCalibTableSpec, - makeStaticTableSpecs, - makeTagTableName, - makeTagTableSpec, -) +from .tables import DynamicTables, addDatasetForeignKey, makeStaticTableSpecs, makeTagTableSpec if TYPE_CHECKING: - from ..._caching_context import CachingContext from ...interfaces import ( CollectionManager, CollectionRecord, @@ -63,36 +55,49 @@ _LOG = logging.getLogger(__name__) -class MissingDatabaseTableError(RuntimeError): - """Exception raised when a table is not found in a database.""" - - @dataclasses.dataclass class _DatasetTypeRecord: """Contents of a single dataset type record.""" dataset_type: DatasetType dataset_type_id: int + dimensions_key: int tag_table_name: str calib_table_name: str | None + def make_dynamic_tables(self) -> DynamicTables: + return DynamicTables( + self.dataset_type.dimensions, self.dimensions_key, self.tag_table_name, self.calib_table_name + ) -class _SpecTableFactory: - """Factory for `sqlalchemy.schema.Table` instances that builds table - instances using provided `ddl.TableSpec` definition and verifies that - table exists in the database. - """ + def update_dynamic_tables(self, current: DynamicTables) -> DynamicTables: + assert self.dimensions_key == current.dimensions_key + assert self.tag_table_name == current.tags_name + if self.calib_table_name is not None: + if current.calibs_name is not None: + assert self.calib_table_name == current.calibs_name + else: + # Some previously-cached dataset type had the same dimensions + # but was not a calibration. + current.calibs_name = self.calib_table_name + # If some previously-cached dataset type was a calibration but this + # one isn't, we don't want to forget the calibs table. 
+ return current - def __init__(self, db: Database, name: str, spec: ddl.TableSpec): - self._db = db - self._name = name - self._spec = spec - def __call__(self) -> sqlalchemy.schema.Table: - table = self._db.getExistingTable(self._name, self._spec) - if table is None: - raise MissingDatabaseTableError(f"Table {self._name} is missing from database schema.") - return table +@dataclasses.dataclass +class _DatasetRecordStorage: + """Information cached about a dataset type. + + This combines information cached with different keys - the dataset type + and its ID are cached by name, while the tables are cached by the dataset + types dimensions (and hence shared with other dataset types that have the + same dimensions). + """ + + dataset_type: DatasetType + dataset_type_id: int + dynamic_tables: DynamicTables class ByDimensionsDatasetRecordStorageManagerUUID(DatasetRecordStorageManager): @@ -150,7 +155,7 @@ def __init__( self._dimensions = dimensions self._static = static self._summaries = summaries - self._caching_context = caching_context + self._caching_context = cast(GenericCachingContext[int, DynamicTables], caching_context) self._use_astropy_ingest_date = self.ingest_date_dtype() is ddl.AstropyTimeNsecTai self._run_key_column = collections.getRunForeignKeyName() @@ -276,32 +281,7 @@ def refresh(self) -> None: if self._caching_context.dataset_types is not None: self._caching_context.dataset_types.clear() - def _make_storage(self, record: _DatasetTypeRecord) -> ByDimensionsDatasetRecordStorageUUID: - """Create storage instance for a dataset type record.""" - tags_spec = makeTagTableSpec(record.dataset_type.dimensions, type(self._collections)) - tags_table_factory = _SpecTableFactory(self._db, record.tag_table_name, tags_spec) - calibs_table_factory = None - if record.calib_table_name is not None: - calibs_spec = makeCalibTableSpec( - record.dataset_type.dimensions, - type(self._collections), - self._db.getTimespanRepresentation(), - ) - calibs_table_factory = _SpecTableFactory(self._db, record.calib_table_name, calibs_spec) - storage = ByDimensionsDatasetRecordStorageUUID( - db=self._db, - datasetType=record.dataset_type, - static=self._static, - summaries=self._summaries, - tags_table_factory=tags_table_factory, - calibs_table_factory=calibs_table_factory, - dataset_type_id=record.dataset_type_id, - collections=self._collections, - use_astropy_ingest_date=self.ingest_date_dtype() is ddl.AstropyTimeNsecTai, - ) - return storage - - def remove(self, name: str) -> None: + def remove_dataset_type(self, name: str) -> None: # Docstring inherited from DatasetRecordStorageManager. compositeName, componentName = DatasetType.splitDatasetTypeName(name) if componentName is not None: @@ -320,87 +300,61 @@ def remove(self, name: str) -> None: # not need to be fast. self.refresh() - def find(self, name: str) -> ByDimensionsDatasetRecordStorageUUID | None: + def get_dataset_type(self, name: str) -> DatasetType: # Docstring inherited from DatasetRecordStorageManager. - if self._caching_context.dataset_types is not None: - _, storage = self._caching_context.dataset_types.get(name) - if storage is not None: - return cast(ByDimensionsDatasetRecordStorageUUID, storage) - else: - # On the first cache miss populate the cache with complete list - # of dataset types (if it was not done yet). 
- if not self._caching_context.dataset_types.full: - self._fetch_dataset_types() - # Try again - _, storage = self._caching_context.dataset_types.get(name) - if storage is not None: - return cast(ByDimensionsDatasetRecordStorageUUID, storage) - record = self._fetch_dataset_type_record(name) - if record is not None: - storage = self._make_storage(record) - if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.add(storage.datasetType, storage) - return storage - else: - return None + return self._find_storage(name).dataset_type - def register(self, datasetType: DatasetType) -> bool: + def register_dataset_type(self, dataset_type: DatasetType) -> bool: # Docstring inherited from DatasetRecordStorageManager. - if datasetType.isComponent(): + # + # This is one of three places where we populate the dataset type cache. + # See the comment in _fetch_dataset_types for how these are related and + # invariants they must maintain. + # + if dataset_type.isComponent(): raise ValueError( - f"Component dataset types can not be stored in registry. Rejecting {datasetType.name}" + f"Component dataset types can not be stored in registry. Rejecting {dataset_type.name}" ) - record = self._fetch_dataset_type_record(datasetType.name) + record = self._fetch_dataset_type_record(dataset_type.name) if record is None: - dimensionsKey = self._dimensions.save_dimension_group(datasetType.dimensions) - tagTableName = makeTagTableName(dimensionsKey) - self._db.ensureTableExists( - tagTableName, - makeTagTableSpec(datasetType.dimensions, type(self._collections)), - ) - calibTableName = makeCalibTableName(dimensionsKey) if datasetType.isCalibration() else None - if calibTableName is not None: - self._db.ensureTableExists( - calibTableName, - makeCalibTableSpec( - datasetType.dimensions, - type(self._collections), - self._db.getTimespanRepresentation(), - ), + if ( + dynamic_tables := self._caching_context.dataset_types.get_by_dimensions( + dataset_type.dimensions + ) + ) is None: + dimensions_key = self._dimensions.save_dimension_group(dataset_type.dimensions) + dynamic_tables = DynamicTables.from_dimensions_key( + dataset_type.dimensions, dimensions_key, dataset_type.isCalibration() ) + dynamic_tables.create(self._db, type(self._collections)) row, inserted = self._db.sync( self._static.dataset_type, - keys={"name": datasetType.name}, + keys={"name": dataset_type.name}, compared={ - "dimensions_key": dimensionsKey, + "dimensions_key": dynamic_tables.dimensions_key, # Force the storage class to be loaded to ensure it # exists and there is no typo in the name. 
- "storage_class": datasetType.storageClass.name, + "storage_class": dataset_type.storageClass.name, }, extra={ - "tag_association_table": tagTableName, - "calibration_association_table": calibTableName, + "tag_association_table": dynamic_tables.tags_name, + "calibration_association_table": ( + dynamic_tables.calibs_name if dataset_type.isCalibration() else None + ), }, returning=["id", "tag_association_table"], ) # Make sure that cache is updated if self._caching_context.dataset_types is not None and row is not None: - record = _DatasetTypeRecord( - dataset_type=datasetType, - dataset_type_id=row["id"], - tag_table_name=tagTableName, - calib_table_name=calibTableName, - ) - storage = self._make_storage(record) - self._caching_context.dataset_types.add(datasetType, storage) + self._caching_context.dataset_types.add(dataset_type, row["id"]) + self._caching_context.dataset_types.add_by_dimensions(dataset_type.dimensions, dynamic_tables) else: - if datasetType != record.dataset_type: + if dataset_type != record.dataset_type: raise ConflictingDefinitionError( - f"Given dataset type {datasetType} is inconsistent " + f"Given dataset type {dataset_type} is inconsistent " f"with database definition {record.dataset_type}." ) inserted = False - return bool(inserted) def resolve_wildcard( @@ -418,8 +372,12 @@ def resolve_wildcard( "Component dataset types are not supported in Registry methods; use DatasetRef or " "DatasetType methods to obtain components from parents instead." ) - if (found_storage := self.find(parent_name)) is not None: - resolved_dataset_type = found_storage.datasetType + try: + resolved_dataset_type = self.get_dataset_type(parent_name) + except MissingDatasetTypeError: + if missing is not None: + missing.append(name) + else: if dataset_type is not None: if dataset_type.is_compatible_with(resolved_dataset_type): # Prefer the given dataset type to enable storage class @@ -431,8 +389,6 @@ def resolve_wildcard( f"not compatible with the registered type {resolved_dataset_type}." ) result.append(resolved_dataset_type) - elif missing is not None: - missing.append(name) if wildcard.patterns is ...: if explicit_only: raise TypeError( @@ -454,6 +410,11 @@ def resolve_wildcard( def getDatasetRef(self, id: DatasetId) -> DatasetRef | None: # Docstring inherited from DatasetRecordStorageManager. + # + # This is one of three places where we populate the dataset type cache. + # See the comment in _fetch_dataset_types for how these are related and + # invariants they must maintain. + # sql = ( sqlalchemy.sql.select( self._static.dataset.columns.dataset_type_id, @@ -470,24 +431,34 @@ def getDatasetRef(self, id: DatasetId) -> DatasetRef | None: return None run = row[self._run_key_column] record = self._record_from_row(row) - storage: DatasetRecordStorage | None = None + dynamic_tables: DynamicTables | None = None if self._caching_context.dataset_types is not None: - _, storage = self._caching_context.dataset_types.get(record.dataset_type.name) - if storage is None: - storage = self._make_storage(record) + _, dataset_type_id = self._caching_context.dataset_types.get(record.dataset_type.name) + if dataset_type_id is None: + if self._caching_context.dataset_types is not None: + self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id) + else: + assert record.dataset_type_id == dataset_type_id, "Two IDs for the same dataset type name!" 
+ dynamic_tables = self._caching_context.dataset_types.get_by_dimensions( + record.dataset_type.dimensions + ) + if dynamic_tables is None: + dynamic_tables = record.make_dynamic_tables() if self._caching_context.dataset_types is not None: - self._caching_context.dataset_types.add(record.dataset_type, storage) - assert isinstance(storage, ByDimensionsDatasetRecordStorageUUID) + self._caching_context.dataset_types.add_by_dimensions( + record.dataset_type.dimensions, dynamic_tables + ) if record.dataset_type.dimensions: # This query could return multiple rows (one for each tagged # collection the dataset is in, plus one for its run collection), # and we don't care which of those we get. + tags_table = dynamic_tables.tags(self._db, type(self._collections)) data_id_sql = ( - storage.tags.select() + tags_table.select() .where( sqlalchemy.sql.and_( - storage.tags.columns.dataset_id == id, - storage.tags.columns.dataset_type_id == storage.dataset_type_id, + tags_table.columns.dataset_id == id, + tags_table.columns.dataset_type_id == record.dataset_type_id, ) ) .limit(1) @@ -535,6 +506,7 @@ def _record_from_row(self, row: Mapping) -> _DatasetTypeRecord: return _DatasetTypeRecord( dataset_type=datasetType, dataset_type_id=row["id"], + dimensions_key=row["dimensions_key"], tag_table_name=row["tag_association_table"], calib_table_name=calibTableName, ) @@ -544,6 +516,28 @@ def _dataset_type_from_row(self, row: Mapping) -> DatasetType: def _fetch_dataset_types(self) -> list[DatasetType]: """Fetch list of all defined dataset types.""" + # This is one of three places where we populate the dataset type cache: + # + # - This method handles almost all requests for dataset types that + # should already exist. It always marks the cache as "full" in both + # dataset type names and dimensions. + # + # - register_dataset_type handles the case where the dataset type might + # not exist yet. Since it can only add a single dataset type, it + # never changes whether the cache is full. + # + # - getDatasetRef is a special case for a dataset type that should + # already exist, but is looked up via a dataset ID rather than its + # name. It also never changes whether the cache is full, and it's + # handled separately, essentially as an optimization: we can fetch a + # single dataset type definition record in a join when we query for + # the dataset type based on the dataset ID, and this is better than + # blindly fetching all dataset types in a separate query. + # + # In all three cases, we require that the per-dimensions data be cached + # whenever a dataset type is added to the cache by name, to reduce the + # number of possible states the cache can be in and minimize the number + # of queries. if self._caching_context.dataset_types is not None: if self._caching_context.dataset_types.full: return [dataset_type for dataset_type, _ in self._caching_context.dataset_types.items()] @@ -552,10 +546,57 @@ records = [self._record_from_row(row) for row in sql_rows] # Cache everything and specify that cache is complete.
if self._caching_context.dataset_types is not None: - cache_data = [(record.dataset_type, self._make_storage(record)) for record in records] - self._caching_context.dataset_types.set(cache_data, full=True) + cache_data: list[tuple[DatasetType, int]] = [] + cache_dimensions_data: dict[DimensionGroup, DynamicTables] = {} + for record in records: + cache_data.append((record.dataset_type, record.dataset_type_id)) + if (dynamic_tables := cache_dimensions_data.get(record.dataset_type.dimensions)) is None: + cache_dimensions_data[record.dataset_type.dimensions] = record.make_dynamic_tables() + else: + record.update_dynamic_tables(dynamic_tables) + self._caching_context.dataset_types.set( + cache_data, full=True, dimensions_data=cache_dimensions_data.items(), dimensions_full=True + ) return [record.dataset_type for record in records] + def _find_storage(self, name: str) -> _DatasetRecordStorage: + """Find a dataset type and the extra information needed to work with + it, utilizing and populating the cache as needed. + """ + if self._caching_context.dataset_types is not None: + dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name) + if dataset_type is not None: + tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions) + assert ( + dataset_type_id is not None and tables is not None + ), "Dataset type cache population is incomplete." + return _DatasetRecordStorage( + dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables + ) + else: + # On the first cache miss populate the cache with complete list + # of dataset types (if it was not done yet). + if not self._caching_context.dataset_types.full: + self._fetch_dataset_types() + # Try again + dataset_type, dataset_type_id = self._caching_context.dataset_types.get(name) + if dataset_type is not None: + tables = self._caching_context.dataset_types.get_by_dimensions(dataset_type.dimensions) + assert ( + dataset_type_id is not None and tables is not None + ), "Dataset type cache population is incomplete." + return _DatasetRecordStorage( + dataset_type=dataset_type, dataset_type_id=dataset_type_id, dynamic_tables=tables + ) + record = self._fetch_dataset_type_record(name) + if record is not None: + if self._caching_context.dataset_types is not None: + self._caching_context.dataset_types.add(record.dataset_type, record.dataset_type_id) + return _DatasetRecordStorage( + record.dataset_type, record.dataset_type_id, record.make_dynamic_tables() + ) + raise MissingDatasetTypeError(f"Dataset type {name!r} does not exist.") + def getCollectionSummary(self, collection: CollectionRecord) -> CollectionSummary: # Docstring inherited from DatasetRecordStorageManager. summaries = self._summaries.fetch_summaries([collection], None, self._dataset_type_from_row) @@ -582,14 +623,14 @@ def ingest_date_dtype(self) -> type: def insert( self, - dataset_type: DatasetType, + dataset_type_name: str, run: RunRecord, data_ids: Iterable[DataCoordinate], id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, ) -> list[DatasetRef]: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: - raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") + if (storage := self._find_storage(dataset_type_name)) is None: + raise MissingDatasetTypeError(f"Dataset type {dataset_type_name!r} has not been registered.") # Current timestamp, type depends on schema version. 
Use microsecond # precision for astropy time to keep things consistent with # TIMESTAMP(6) SQL type. @@ -606,11 +647,13 @@ def insert( data_id_list: list[DataCoordinate] = [] rows = [] summary = CollectionSummary() - for dataId in summary.add_data_ids_generator(dataset_type, data_ids): + for dataId in summary.add_data_ids_generator(storage.dataset_type, data_ids): data_id_list.append(dataId) rows.append( { - "id": self._id_maker.makeDatasetId(run.name, dataset_type, dataId, id_generation_mode), + "id": self._id_maker.makeDatasetId( + run.name, storage.dataset_type, dataId, id_generation_mode + ), "dataset_type_id": storage.dataset_type_id, self._run_key_column: run.key, "ingest_date": timestamp, @@ -639,11 +682,11 @@ def insert( for dataId, row in zip(data_id_list, rows, strict=True) ] # Insert those rows into the tags table. - self._db.insert(storage.tags, *tagsRows) + self._db.insert(storage.dynamic_tables.tags(self._db, type(self._collections)), *tagsRows) return [ DatasetRef( - datasetType=dataset_type, + datasetType=storage.dataset_type, dataId=dataId, id=row["id"], run=run.name, @@ -653,13 +696,17 @@ def insert( def import_( self, - dataset_type: DatasetType, + dataset_type_name: str, run: RunRecord, - datasets: Iterable[DatasetRef], + data_ids: Mapping[DatasetId, DataCoordinate], ) -> list[DatasetRef]: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: - raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") + if not data_ids: + # Just in case an empty mapping is provided we want to avoid + # adding dataset type to summary tables. + return [] + if (storage := self._find_storage(dataset_type_name)) is None: + raise MissingDatasetTypeError(f"Dataset type {dataset_type_name!r} has not been registered.") # Current timestamp, type depends on schema version. if self._use_astropy_ingest_date: # Astropy `now()` precision should be the same as `now()` which @@ -667,21 +714,10 @@ def import_( timestamp = sqlalchemy.sql.literal(astropy.time.Time.now(), type_=ddl.AstropyTimeNsecTai) else: timestamp = sqlalchemy.sql.literal(datetime.datetime.now(datetime.UTC)) - - # Iterate over data IDs, transforming a possibly-single-pass iterable - # into a dict. - data_ids: dict[DatasetId, DataCoordinate] = {} - summary = CollectionSummary() - for dataset in summary.add_datasets_generator(datasets): - data_ids[dataset.id] = dataset.dataId - - if not data_ids: - # Just in case an empty collection is provided we want to avoid - # adding dataset type to summary tables. - return [] - # We'll insert all new rows into a temporary table - table_spec = makeTagTableSpec(dataset_type.dimensions, type(self._collections), constraints=False) + table_spec = makeTagTableSpec( + storage.dataset_type.dimensions, type(self._collections), constraints=False + ) collection_fkey_name = self._collections.getCollectionForeignKeyName() proto_ags_row = { "dataset_type_id": storage.dataset_type_id, @@ -694,18 +730,15 @@ def import_( with self._db.transaction(for_temp_tables=True), self._db.temporary_table(table_spec) as tmp_tags: # store all incoming data in a temporary table self._db.insert(tmp_tags, *tmpRows) - # There are some checks that we want to make for consistency # of the new datasets with existing ones. self._validate_import(storage, tmp_tags, run) - # Before we merge temporary table into dataset/tags we need to # drop datasets which are already there (and do not conflict). 
self._db.deleteWhere( tmp_tags, tmp_tags.columns.dataset_id.in_(sqlalchemy.sql.select(self._static.dataset.columns.id)), ) - # Copy it into dataset table, need to re-label some columns. self._db.insert( self._static.dataset, select=sqlalchemy.sql.select( tmp_tags.columns.dataset_id.label("id"), tmp_tags.columns.dataset_type_id, timestamp.label("ingest_date"), ), ) - + refs = [ + DatasetRef( + datasetType=storage.dataset_type, + id=dataset_id, + dataId=dataId, + run=run.name, + ) + for dataset_id, dataId in data_ids.items() + ] # Update the summary tables for this collection in case this # is the first time this dataset type or these governor values # will be inserted there. + summary = CollectionSummary() + summary.add_datasets(refs) self._summaries.update(run, [storage.dataset_type_id], summary) - - # Copy it into tags table. - self._db.insert(storage.tags, select=tmp_tags.select()) - - # Return refs in the same order as in the input list. - return [ - DatasetRef( - datasetType=dataset_type, - id=dataset_id, - dataId=dataId, - run=run.name, + # Copy from temp table into tags table. + self._db.insert( + storage.dynamic_tables.tags(self._db, type(self._collections)), select=tmp_tags.select() ) - for dataset_id, dataId in data_ids.items() - ] + return refs def _validate_import( - self, storage: ByDimensionsDatasetRecordStorageUUID, tmp_tags: sqlalchemy.schema.Table, run: RunRecord + self, storage: _DatasetRecordStorage, tmp_tags: sqlalchemy.schema.Table, run: RunRecord ) -> None: """Validate imported refs against existing datasets. Parameters ---------- - storage : `ByDimensionsDatasetRecordStorageUUID` + storage : `_DatasetRecordStorage` Struct that holds the tables and ID for a dataset type. tmp_tags : `sqlalchemy.schema.Table` Temporary table with new datasets and the same schema as tags table. run : `RunRecord` The record object describing the `~CollectionType.RUN` collection. Raises ------ ConflictingDefinitionError Raise if new datasets conflict with existing ones. """ dataset = self._static.dataset - tags = storage.tags + tags = storage.dynamic_tables.tags(self._db, type(self._collections)) collection_fkey_name = self._collections.getCollectionForeignKeyName() # Check that existing datasets have the same dataset type and @@ -792,15 +826,15 @@ ) else: raise ConflictingDefinitionError( - f"Dataset {row.dataset_id} was provided with type {storage.datasetType.name!r} " + f"Dataset {row.dataset_id} was provided with type {storage.dataset_type.name!r} " f"in run {new_run!r}, but was already defined with type ID {row.dataset_type_id} " f"in run {run!r}." ) else: raise ConflictingDefinitionError( f"Dataset {row.dataset_id} was provided with type ID {row.new_dataset_type_id} " - f"in run {new_run!r}, but was already defined with type {storage.datasetType.name!r} " - f"in run {run!r}." + f"in run {new_run!r}, but was already defined with type " + f"{storage.dataset_type.name!r} in run {run!r}." ) # Check that matching dataset in tags table has the same DataId.
@@ -809,10 +843,10 @@ def _validate_import( tags.columns.dataset_id, tags.columns.dataset_type_id.label("type_id"), tmp_tags.columns.dataset_type_id.label("new_type_id"), - *[tags.columns[dim] for dim in storage.datasetType.dimensions.required], + *[tags.columns[dim] for dim in storage.dataset_type.dimensions.required], *[ tmp_tags.columns[dim].label(f"new_{dim}") - for dim in storage.datasetType.dimensions.required + for dim in storage.dataset_type.dimensions.required ], ) .select_from(tags.join(tmp_tags, tags.columns.dataset_id == tmp_tags.columns.dataset_id)) @@ -821,7 +855,7 @@ def _validate_import( tags.columns.dataset_type_id != tmp_tags.columns.dataset_type_id, *[ tags.columns[dim] != tmp_tags.columns[dim] - for dim in storage.datasetType.dimensions.required + for dim in storage.dataset_type.dimensions.required ], ) ) @@ -838,7 +872,7 @@ def _validate_import( # Check that matching run+dataId have the same dataset ID. query = ( sqlalchemy.sql.select( - *[tags.columns[dim] for dim in storage.datasetType.dimensions.required], + *[tags.columns[dim] for dim in storage.dataset_type.dimensions.required], tags.columns.dataset_id, tmp_tags.columns.dataset_id.label("new_dataset_id"), tags.columns[collection_fkey_name], @@ -852,7 +886,7 @@ def _validate_import( tags.columns[collection_fkey_name] == tmp_tags.columns[collection_fkey_name], *[ tags.columns[dim] == tmp_tags.columns[dim] - for dim in storage.datasetType.dimensions.required + for dim in storage.dataset_type.dimensions.required ], ), ) @@ -863,11 +897,11 @@ def _validate_import( with self._db.query(query) as result: # only include the first one in the exception message if (row := result.first()) is not None: - data_id = {dim: getattr(row, dim) for dim in storage.datasetType.dimensions.required} + data_id = {dim: getattr(row, dim) for dim in storage.dataset_type.dimensions.required} existing_collection = self._collections[getattr(row, collection_fkey_name)].name new_collection = self._collections[getattr(row, f"new_{collection_fkey_name}")].name raise ConflictingDefinitionError( - f"Dataset with type {storage.datasetType.name!r} and data ID {data_id} " + f"Dataset with type {storage.dataset_type.name!r} and data ID {data_id} " f"has ID {row.dataset_id} in existing collection {existing_collection!r} " f"but ID {row.new_dataset_id} in new collection {new_collection!r}." ) @@ -886,7 +920,7 @@ def associate( self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] ) -> None: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") if collection.type is not CollectionType.TAGGED: raise CollectionTypeError( @@ -907,13 +941,13 @@ def associate( # inserted there. self._summaries.update(collection, [storage.dataset_type_id], summary) # Update the tag table itself. - self._db.replace(storage.tags, *rows) + self._db.replace(storage.dynamic_tables.tags(self._db, type(self._collections)), *rows) def disassociate( self, dataset_type: DatasetType, collection: CollectionRecord, datasets: Iterable[DatasetRef] ) -> None: # Docstring inherited from DatasetRecordStorageManager. 
- if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") if collection.type is not CollectionType.TAGGED: raise CollectionTypeError( @@ -927,7 +961,11 @@ def disassociate( } for dataset in datasets ] - self._db.delete(storage.tags, ["dataset_id", self._collections.getCollectionForeignKeyName()], *rows) + self._db.delete( + storage.dynamic_tables.tags(self._db, type(self._collections)), + ["dataset_id", self._collections.getCollectionForeignKeyName()], + *rows, + ) def certify( self, @@ -938,10 +976,10 @@ def certify( context: SqlQueryContext, ) -> None: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") - if storage.calibs is None: - raise CollectionTypeError( + if not dataset_type.isCalibration(): + raise DatasetTypeError( f"Cannot certify datasets of type {dataset_type.name!r}, for which " "DatasetType.isCalibration() is False." ) @@ -975,11 +1013,12 @@ # inserted there. self._summaries.update(collection, [storage.dataset_type_id], summary) # Update the association table itself. + calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) if TimespanReprClass.hasExclusionConstraint(): # Rely on database constraint to enforce invariants; we just # reraise the exception for consistency across DB engines. try: - self._db.insert(storage.calibs, *rows) + self._db.insert(calibs_table, *rows) except sqlalchemy.exc.IntegrityError as err: raise ConflictingDefinitionError( f"Validity range conflict certifying datasets of type {dataset_type.name!r} " @@ -994,7 +1033,7 @@ # could invalidate our checking before we finish the inserts. We # use a SAVEPOINT in case there is an outer transaction that a # failure here should not roll back. - with self._db.transaction(lock=[storage.calibs], savepoint=True): + with self._db.transaction(lock=[calibs_table], savepoint=True): # Enter SqlQueryContext in case we need to use a temporary # table to include the give data IDs in the query. Note that # by doing this inside the transaction, we make sure it doesn't @@ -1011,7 +1050,7 @@ f"[{timespan.begin}, {timespan.end})." ) # Proceed with the insert. - self._db.insert(storage.calibs, *rows) + self._db.insert(calibs_table, *rows) def decertify( self, @@ -1023,11 +1062,11 @@ def decertify( context: SqlQueryContext, ) -> None: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") - if storage.calibs is None: - raise CollectionTypeError( - f"Cannot decertify datasets of type {dataset_type.name!r}, for which " + if not dataset_type.isCalibration(): + raise DatasetTypeError( + f"Cannot decertify datasets of type {dataset_type.name!r}, for which " "DatasetType.isCalibration() is False." ) if collection.type is not CollectionType.CALIBRATION: @@ -1058,7 +1097,8 @@ rows_to_insert = [] # Acquire a table lock to ensure there are no concurrent writes # between the SELECT and the DELETE and INSERT queries based on it.
- with self._db.transaction(lock=[storage.calibs], savepoint=True): + calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + with self._db.transaction(lock=[calibs_table], savepoint=True): # Enter SqlQueryContext in case we need to use a temporary table to # include the give data IDs in the query (see similar block in # certify for details). @@ -1080,8 +1120,8 @@ def decertify( TimespanReprClass.update(diff_timespan, result=new_insert_row.copy()) ) # Run the DELETE and INSERT queries. - self._db.delete(storage.calibs, ["id"], *rows_to_delete) - self._db.insert(storage.calibs, *rows_to_insert) + self._db.delete(calibs_table, ["id"], *rows_to_delete) + self._db.insert(calibs_table, *rows_to_insert) def _build_calib_overlap_query( self, @@ -1114,7 +1154,7 @@ def make_relation( context: SqlQueryContext, ) -> Relation: # Docstring inherited from DatasetRecordStorageManager. - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") collection_types = {collection.type for collection in collections} assert CollectionType.CHAINED not in collection_types, "CHAINED collections must be flattened." @@ -1144,19 +1184,19 @@ def make_relation( tag_relation: Relation | None = None calib_relation: Relation | None = None if collection_types != {CollectionType.CALIBRATION}: + tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)) # We'll need a subquery for the tags table if any of the given # collections are not a CALIBRATION collection. This intentionally # also fires when the list of collections is empty as a way to # create a dummy subquery that we know will fail. # We give the table an alias because it might appear multiple times # in the same query, for different dataset types. - tags_parts = sql.Payload[LogicalColumn](storage.tags.alias(f"{dataset_type.name}_tags")) + tags_parts = sql.Payload[LogicalColumn](tags_table.alias(f"{dataset_type.name}_tags")) if "timespan" in columns: tags_parts.columns_available[DatasetColumnTag(dataset_type.name, "timespan")] = ( TimespanReprClass.fromLiteral(Timespan(None, None)) ) tag_relation = self._finish_single_relation( - dataset_type, storage, tags_parts, columns, @@ -1172,10 +1212,8 @@ def make_relation( # If at least one collection is a CALIBRATION collection, we'll # need a subquery for the calibs table, and could include the # timespan as a result or constraint. - assert ( - storage.calibs is not None - ), "DatasetTypes with isCalibration() == False can never be found in a CALIBRATION collection." 
- calibs_parts = sql.Payload[LogicalColumn](storage.calibs.alias(f"{dataset_type.name}_calibs")) + calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) + calibs_parts = sql.Payload[LogicalColumn](calibs_table.alias(f"{dataset_type.name}_calibs")) if "timespan" in columns: calibs_parts.columns_available[DatasetColumnTag(dataset_type.name, "timespan")] = ( TimespanReprClass.from_columns(calibs_parts.from_clause.columns) @@ -1189,7 +1227,6 @@ def make_relation( calibs_parts.from_clause.columns.id ) calib_relation = self._finish_single_relation( - dataset_type, storage, calibs_parts, columns, @@ -1215,8 +1252,7 @@ def make_relation( def _finish_single_relation( self, - dataset_type: DatasetType, - storage: ByDimensionsDatasetRecordStorageUUID, + storage: _DatasetRecordStorage, payload: sql.Payload[LogicalColumn], requested_columns: Set[str], collections: Sequence[tuple[CollectionRecord, int]], @@ -1229,8 +1265,6 @@ def _finish_single_relation( Parameters ---------- - dataset_type : `DatasetType` - Type of dataset to query for. storage : `ByDimensionsDatasetRecordStorageUUID` Struct that holds the tables and ID for the dataset type. payload : `lsst.daf.relation.sql.Payload` @@ -1257,29 +1291,35 @@ def _finish_single_relation( if len(collections) == 1: payload.where.append(collection_col == collections[0][0].key) if "collection" in requested_columns: - payload.columns_available[DatasetColumnTag(dataset_type.name, "collection")] = ( + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "collection")] = ( sqlalchemy.sql.literal(collections[0][0].key) ) else: assert collections, "The no-collections case should be in calling code for better diagnostics." payload.where.append(collection_col.in_([collection.key for collection, _ in collections])) if "collection" in requested_columns: - payload.columns_available[DatasetColumnTag(dataset_type.name, "collection")] = collection_col + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "collection")] = ( + collection_col + ) # Add rank if requested as a CASE-based calculation the collection # column. if "rank" in requested_columns: - payload.columns_available[DatasetColumnTag(dataset_type.name, "rank")] = sqlalchemy.sql.case( - {record.key: rank for record, rank in collections}, - value=collection_col, + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "rank")] = ( + sqlalchemy.sql.case( + {record.key: rank for record, rank in collections}, + value=collection_col, + ) ) # Add more column definitions, starting with the data ID. - for dimension_name in dataset_type.dimensions.required: + for dimension_name in storage.dataset_type.dimensions.required: payload.columns_available[DimensionKeyColumnTag(dimension_name)] = payload.from_clause.columns[ dimension_name ] # We can always get the dataset_id from the tags/calibs table. if "dataset_id" in requested_columns: - payload.columns_available[DatasetColumnTag(dataset_type.name, "dataset_id")] = dataset_id_col + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "dataset_id")] = ( + dataset_id_col + ) # It's possible we now have everything we need, from just the # tags/calibs table. The things we might need to get from the static # dataset table are the run key and the ingest date. @@ -1290,18 +1330,18 @@ def _finish_single_relation( # know that if we find the dataset in that collection, # then that's the datasets's run; we don't need to # query for it. 
- payload.columns_available[DatasetColumnTag(dataset_type.name, "run")] = ( + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "run")] = ( sqlalchemy.sql.literal(collections[0][0].key) ) else: - payload.columns_available[DatasetColumnTag(dataset_type.name, "run")] = ( + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "run")] = ( self._static.dataset.columns[self._run_key_column] ) need_static_table = True # Ingest date can only come from the static table. if "ingest_date" in requested_columns: need_static_table = True - payload.columns_available[DatasetColumnTag(dataset_type.name, "ingest_date")] = ( + payload.columns_available[DatasetColumnTag(storage.dataset_type.name, "ingest_date")] = ( self._static.dataset.columns.ingest_date ) # If we need the static table, join it in via dataset_id and @@ -1319,7 +1359,7 @@ def _finish_single_relation( leaf = context.sql_engine.make_leaf( payload.columns_available.keys(), payload=payload, - name=dataset_type.name, + name=storage.dataset_type.name, parameters={record.name: rank for record, rank in collections}, ) return leaf @@ -1327,7 +1367,7 @@ def _finish_single_relation( def make_query_joiner( self, dataset_type: DatasetType, collections: Sequence[CollectionRecord], fields: Set[str] ) -> QueryJoiner: - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") # This method largely mimics `make_relation`, but it uses the new query # system primitives instead of the old one. In terms of the SQL @@ -1375,10 +1415,10 @@ def make_query_joiner( # create a dummy subquery that we know will fail. # We give the table an alias because it might appear multiple times # in the same query, for different dataset types. + tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)) tags_builder = self._finish_query_builder( - dataset_type, storage, - QueryJoiner(self._db, storage.tags.alias(f"{dataset_type.name}_tags")).to_builder(columns), + QueryJoiner(self._db, tags_table.alias(f"{dataset_type.name}_tags")).to_builder(columns), [record for record in collections if record.type is not CollectionType.CALIBRATION], fields, ) @@ -1391,12 +1431,10 @@ def make_query_joiner( # If at least one collection is a CALIBRATION collection, we'll # need a subquery for the calibs table, and could include the # timespan as a result or constraint. - assert ( - storage.calibs is not None - ), "DatasetTypes with isCalibration() == False can never be found in a CALIBRATION collection." 
- calibs_table = storage.calibs.alias(f"{dataset_type.name}_calibs") + calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)).alias( + f"{dataset_type.name}_calibs" + ) calibs_builder = self._finish_query_builder( - dataset_type, storage, QueryJoiner(self._db, calibs_table).to_builder(columns), [record for record in collections if record.type is CollectionType.CALIBRATION], @@ -1423,8 +1461,7 @@ def make_query_joiner( def _finish_query_builder( self, - dataset_type: DatasetType, - storage: ByDimensionsDatasetRecordStorageUUID, + storage: _DatasetRecordStorage, sql_projection: QueryBuilder, collections: Sequence[CollectionRecord], fields: Set[str], @@ -1441,11 +1478,11 @@ def _finish_query_builder( ) dataset_id_col = sql_projection.joiner.from_clause.c.dataset_id collection_col = sql_projection.joiner.from_clause.c[self._collections.getCollectionForeignKeyName()] - fields_provided = sql_projection.joiner.fields[dataset_type.name] + fields_provided = sql_projection.joiner.fields[storage.dataset_type.name] # We always constrain and optionally retrieve the collection(s) via the # tags/calibs table. if "collection_key" in fields: - sql_projection.joiner.fields[dataset_type.name]["collection_key"] = collection_col + sql_projection.joiner.fields[storage.dataset_type.name]["collection_key"] = collection_col if len(collections) == 1: only_collection_record = collections[0] sql_projection.joiner.where(collection_col == only_collection_record.key) @@ -1471,7 +1508,7 @@ def _finish_query_builder( collections, collection_col ) # Add more column definitions, starting with the data ID. - sql_projection.joiner.extract_dimensions(dataset_type.dimensions.required) + sql_projection.joiner.extract_dimensions(storage.dataset_type.dimensions.required) # We can always get the dataset_id from the tags/calibs table, even if # could also get it from the 'static' dataset table. if "dataset_id" in fields: @@ -1550,7 +1587,7 @@ def _finish_query_builder( def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: # Docstring inherited. - if (storage := self.find(dataset_type.name)) is None: + if (storage := self._find_storage(dataset_type.name)) is None: raise MissingDatasetTypeError(f"Dataset type {dataset_type.name!r} has not been registered.") with self._db.transaction(): # The main issue here is consistency in the presence of concurrent @@ -1567,15 +1604,17 @@ def refresh_collection_summaries(self, dataset_type: DatasetType) -> None: # Query datasets tables for associated collections. 
column_name = self._collections.getCollectionForeignKeyName() + tags_table = storage.dynamic_tables.tags(self._db, type(self._collections)) query: sqlalchemy.sql.expression.SelectBase = ( - sqlalchemy.select(storage.tags.columns[column_name]) - .where(storage.tags.columns.dataset_type_id == storage.dataset_type_id) + sqlalchemy.select(tags_table.columns[column_name]) + .where(tags_table.columns.dataset_type_id == storage.dataset_type_id) .distinct() ) - if (calibs := storage.calibs) is not None: + if dataset_type.isCalibration(): + calibs_table = storage.dynamic_tables.calibs(self._db, type(self._collections)) query2 = ( - sqlalchemy.select(calibs.columns[column_name]) - .where(calibs.columns.dataset_type_id == storage.dataset_type_id) + sqlalchemy.select(calibs_table.columns[column_name]) + .where(calibs_table.columns.dataset_type_id == storage.dataset_type_id) .distinct() ) query = sqlalchemy.sql.expression.union(query, query2) diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py b/python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py deleted file mode 100644 index 33de9e8fcf..0000000000 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py +++ /dev/null @@ -1,115 +0,0 @@ -# This file is part of daf_butler. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This software is dual licensed under the GNU General Public License and also -# under a 3-clause BSD license. Recipients may choose which of these licenses -# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, -# respectively. If you choose the GPL option then the following text applies -# (but note that there is still no warranty even if you opt for BSD instead): -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - - -from __future__ import annotations - -__all__ = ("ByDimensionsDatasetRecordStorageUUID",) - -from collections.abc import Callable -from typing import TYPE_CHECKING - -import sqlalchemy - -from ...._dataset_type import DatasetType -from ...interfaces import DatasetRecordStorage - -if TYPE_CHECKING: - from ...interfaces import CollectionManager, Database - from .summaries import CollectionSummaryManager - from .tables import StaticDatasetTablesTuple - - -class ByDimensionsDatasetRecordStorageUUID(DatasetRecordStorage): - """Dataset record storage implementation paired with - `ByDimensionsDatasetRecordStorageManagerUUID`; see that class for more - information. - - Instances of this class should never be constructed directly; use - `DatasetRecordStorageManager.register` instead. - - Parameters - ---------- - datasetType : `DatasetType` - The dataset type to use. - db : `Database` - Database connection. - dataset_type_id : `int` - Dataset type identifier. 
- collections : `CollectionManager` - The collection manager. - static : `StaticDatasetTablesTuple` - Unknown. - summaries : `CollectionSummaryManager` - Collection summary manager. - tags_table_factory : `~collections.abc.Callable` - Factory for creating tags tables. - use_astropy_ingest_date : `bool` - Whether to use Astropy for ingest date. - calibs_table_factory : `~collections.abc.Callable` - Factory for creating calibration tables. - """ - - def __init__( - self, - *, - datasetType: DatasetType, - db: Database, - dataset_type_id: int, - collections: CollectionManager, - static: StaticDatasetTablesTuple, - summaries: CollectionSummaryManager, - tags_table_factory: Callable[[], sqlalchemy.schema.Table], - use_astropy_ingest_date: bool, - calibs_table_factory: Callable[[], sqlalchemy.schema.Table] | None, - ): - super().__init__(datasetType=datasetType) - self.dataset_type_id = dataset_type_id - self._db = db - self._collections = collections - self._static = static - self._summaries = summaries - self._tags_table_factory = tags_table_factory - self._calibs_table_factory = calibs_table_factory - self._runKeyColumn = collections.getRunForeignKeyName() - self._use_astropy = use_astropy_ingest_date - self._tags_table: sqlalchemy.schema.Table | None = None - self._calibs_table: sqlalchemy.schema.Table | None = None - - @property - def tags(self) -> sqlalchemy.schema.Table: - if self._tags_table is None: - self._tags_table = self._tags_table_factory() - return self._tags_table - - @property - def calibs(self) -> sqlalchemy.schema.Table | None: - if self._calibs_table is None: - if self._calibs_table_factory is None: - return None - self._calibs_table = self._calibs_table_factory() - return self._calibs_table diff --git a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py index 661160fe47..7528253d63 100644 --- a/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py +++ b/python/lsst/daf/butler/registry/datasets/byDimensions/tables.py @@ -27,8 +27,6 @@ from __future__ import annotations -from .... import ddl - __all__ = ( "addDatasetForeignKey", "makeCalibTableName", @@ -44,13 +42,18 @@ import sqlalchemy +from .... import ddl from ....dimensions import DimensionGroup, DimensionUniverse, GovernorDimension, addDimensionForeignKey from ....timespan_database_representation import TimespanDatabaseRepresentation -from ...interfaces import CollectionManager, VersionTuple +from ...interfaces import CollectionManager, Database, VersionTuple DATASET_TYPE_NAME_LENGTH = 128 +class MissingDatabaseTableError(RuntimeError): + """Exception raised when a table is not found in a database.""" + + StaticDatasetTablesTuple = namedtuple( "StaticDatasetTablesTuple", [ @@ -444,3 +447,148 @@ def makeCalibTableSpec( index.extend(fieldSpec.name for fieldSpec in tsFieldSpecs) tableSpec.indexes.add(ddl.IndexSpec(*index)) # type: ignore return tableSpec + + +class DynamicTables: + """A struct that holds the "dynamic" tables common to dataset types that + share the same dimensions. + + Parameters + ---------- + dimensions : `DimensionGroup` + Dimensions of the dataset types that use these tables. + dimensions_key : `int` + Integer key used to persist this dimension group in the database and + name the associated tables. + tags_name : `str` + Name of the "tags" table that associates datasets with data IDs in + RUN and TAGGED collections. 
+ calibs_name : `str` or `None` + Name of the "calibs" table that associates datasets with data IDs and + timespans in CALIBRATION collections. This is `None` if none of the + dataset types (or at least none of those seen by this client) are + calibrations. + """ + + def __init__( + self, dimensions: DimensionGroup, dimensions_key: int, tags_name: str, calibs_name: str | None + ): + self._dimensions = dimensions + self.dimensions_key = dimensions_key + self.tags_name = tags_name + self.calibs_name = calibs_name + self._tags_table: sqlalchemy.Table | None = None + self._calibs_table: sqlalchemy.Table | None = None + + @classmethod + def from_dimensions_key( + cls, dimensions: DimensionGroup, dimensions_key: int, is_calibration: bool + ) -> DynamicTables: + """Construct with table names generated from the dimension key. + + Parameters + ---------- + dimensions : `DimensionGroup` + Dimensions of the dataset types that use these tables. + dimensions_key : `int` + Integer key used to persist this dimension group in the database + and name the associated tables. + is_calibration : `bool` + Whether any of the dataset types that use these tables are + calibrations. + + Returns + ------- + dynamic_tables : `DynamicTables` + Struct that holds tables for a group of dataset types. + """ + return cls( + dimensions, + dimensions_key=dimensions_key, + tags_name=makeTagTableName(dimensions_key), + calibs_name=makeCalibTableName(dimensions_key) if is_calibration else None, + ) + + def create(self, db: Database, collections: type[CollectionManager]) -> None: + """Create the tables if they don't already exist. + + Parameters + ---------- + db : `Database` + Database interface. + collections : `type` [ `CollectionManager` ] + Manager class for collections; used to create foreign key columns + for collections. + """ + if self._tags_table is None: + self._tags_table = db.ensureTableExists( + self.tags_name, + makeTagTableSpec(self._dimensions, collections), + ) + if self.calibs_name is not None and self._calibs_table is None: + self._calibs_table = db.ensureTableExists( + self.calibs_name, + makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()), + ) + + def tags(self, db: Database, collections: type[CollectionManager]) -> sqlalchemy.Table: + """Return the "tags" table that associates datasets with data IDs in + TAGGED and RUN collections. + + This method caches its result the first time it is called (and assumes + the arguments it is given never change). + + Parameters + ---------- + db : `Database` + Database interface. + collections : `type` [ `CollectionManager` ] + Manager class for collections; used to create foreign key columns + for collections. + + Returns + ------- + table : `sqlalchemy.Table` + SQLAlchemy table object. + """ + if self._tags_table is None: + spec = makeTagTableSpec(self._dimensions, collections) + table = db.getExistingTable(self.tags_name, spec) + if table is None: + raise MissingDatabaseTableError(f"Table {self.tags_name!r} is missing from database schema.") + self._tags_table = table + return self._tags_table + + def calibs(self, db: Database, collections: type[CollectionManager]) -> sqlalchemy.Table: + """Return the "calibs" table that associates datasets with data IDs and + timespans in CALIBRATION collections. + + This method caches its result the first time it is called (and assumes + the arguments it is given never change). It may only be called if the + dataset type is calibration. + + Parameters + ---------- + db : `Database` + Database interface. 
+ collections : `type` [ `CollectionManager` ] + Manager class for collections; used to create foreign key columns + for collections. + + Returns + ------- + table : `sqlalchemy.Table` + SQLAlchemy table object. + """ + assert ( + self.calibs_name is not None + ), "Dataset type should be checked to be calibration by calling code." + if self._calibs_table is None: + spec = makeCalibTableSpec(self._dimensions, collections, db.getTimespanRepresentation()) + table = db.getExistingTable(self.calibs_name, spec) + if table is None: + raise MissingDatabaseTableError( + f"Table {self.calibs_name!r} is missing from database schema." + ) + self._calibs_table = table + return self._calibs_table diff --git a/python/lsst/daf/butler/registry/interfaces/_datasets.py b/python/lsst/daf/butler/registry/interfaces/_datasets.py index 56b43f06ad..98767009db 100644 --- a/python/lsst/daf/butler/registry/interfaces/_datasets.py +++ b/python/lsst/daf/butler/registry/interfaces/_datasets.py @@ -29,9 +29,9 @@ from ... import ddl -__all__ = ("DatasetRecordStorageManager", "DatasetRecordStorage") +__all__ = ("DatasetRecordStorageManager",) -from abc import ABC, abstractmethod +from abc import abstractmethod from collections.abc import Iterable, Mapping, Sequence, Set from typing import TYPE_CHECKING, Any @@ -39,7 +39,7 @@ from ..._dataset_ref import DatasetId, DatasetIdGenEnum, DatasetRef from ..._dataset_type import DatasetType -from ..._exceptions import MissingDatasetTypeError +from ..._exceptions import DatasetTypeError, DatasetTypeNotSupportedError from ..._timespan import Timespan from ...dimensions import DataCoordinate from ._versioning import VersionedExtension, VersionTuple @@ -54,24 +54,6 @@ from ._dimensions import DimensionRecordStorageManager -class DatasetRecordStorage(ABC): - """An interface that manages the records associated with a particular - `DatasetType`. - - Parameters - ---------- - datasetType : `DatasetType` - Dataset type whose records this object manages. - """ - - def __init__(self, datasetType: DatasetType): - self.datasetType = datasetType - - datasetType: DatasetType - """Dataset type whose records this object manages (`DatasetType`). - """ - - class DatasetRecordStorageManager(VersionedExtension): """An interface that manages the tables that describe datasets. @@ -207,64 +189,78 @@ def refresh(self) -> None: """ raise NotImplementedError() - def __getitem__(self, name: str) -> DatasetRecordStorage: - """Return the object that provides access to the records associated - with the given `DatasetType` name. + @abstractmethod + def get_dataset_type(self, name: str) -> DatasetType: + """Look up a dataset type by name. - This is simply a convenience wrapper for `find` that raises `KeyError` - when the dataset type is not found. + Parameters + ---------- + name : `str` + Name of a parent dataset type. Returns ------- - records : `DatasetRecordStorage` + dataset_type : `DatasetType` The object representing the records for the given dataset type. Raises ------ - KeyError + MissingDatasetTypeError Raised if there is no dataset type with the given name. - - Notes - ----- - Dataset types registered by another client of the same repository since - the last call to `initialize` or `refresh` may not be found. 
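Since the table objects are now keyed by dimensions rather than by dataset type, every dataset type with the same dimensions resolves to the same tags/calibs tables. The following sketch shows that sharing with a plain dict standing in for the real per-dimension-group cache; the _tables_for helper, its arguments, and the dict are illustrative names only, and DynamicTables is internal to the byDimensions subpackage:

    from lsst.daf.butler.dimensions import DimensionGroup
    from lsst.daf.butler.registry.datasets.byDimensions.tables import DynamicTables

    _tables_by_dimensions: dict[DimensionGroup, DynamicTables] = {}

    def _tables_for(dimensions: DimensionGroup, dimensions_key: int, is_calibration: bool) -> DynamicTables:
        """Return the shared DynamicTables for a dimension group (illustrative helper only)."""
        tables = _tables_by_dimensions.get(dimensions)
        if tables is None:
            # Table names are derived from the integer dimensions key; the
            # SQLAlchemy objects themselves are resolved lazily: create() makes
            # them at registration time, while tags()/calibs() reflect tables
            # that already exist in the database schema.
            tables = DynamicTables.from_dimensions_key(dimensions, dimensions_key, is_calibration)
            _tables_by_dimensions[dimensions] = tables
        return tables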
""" - result = self.find(name) - if result is None: - raise MissingDatasetTypeError(f"Dataset type with name '{name}' not found.") - return result + raise NotImplementedError() - @abstractmethod - def find(self, name: str) -> DatasetRecordStorage | None: - """Return an object that provides access to the records associated with - the given `DatasetType` name, if one exists. + def conform_exact_dataset_type(self, dataset_type: DatasetType | str) -> DatasetType: + """Conform a value that may be a dataset type or dataset type name to + just the dataset type name, while checking that the dataset type is not + a component and (if a `DatasetType` instance is given) has the exact + same definition in the registry. Parameters ---------- - name : `str` - Name of the dataset type. + dataset_type : `str` or `DatasetType` + Dataset type object or name. Returns ------- - records : `DatasetRecordStorage` or `None` - The object representing the records for the given dataset type, or - `None` if there are no records for that dataset type. + dataset_type : `DatasetType` + The corresponding registered dataset type. - Notes - ----- - Dataset types registered by another client of the same repository since - the last call to `initialize` or `refresh` may not be found. + Raises + ------ + DatasetTypeError + Raised if ``dataset_type`` is a component, or if its definition + does not exactly match the registered dataset type. + MissingDatasetTypeError + Raised if this dataset type is not registered at all. """ - raise NotImplementedError() + if isinstance(dataset_type, DatasetType): + dataset_type_name = dataset_type.name + given_dataset_type = dataset_type + else: + dataset_type_name = dataset_type + given_dataset_type = None + parent_name, component = DatasetType.splitDatasetTypeName(dataset_type_name) + if component is not None: + raise DatasetTypeNotSupportedError( + f"Component dataset {dataset_type_name!r} is not supported in this context." + ) + registered_dataset_type = self.get_dataset_type(dataset_type_name) + if given_dataset_type is not None and registered_dataset_type != given_dataset_type: + raise DatasetTypeError( + f"Given dataset type {given_dataset_type} is not identical to the " + f"registered one {registered_dataset_type}." + ) + return registered_dataset_type @abstractmethod - def register(self, datasetType: DatasetType) -> bool: + def register_dataset_type(self, dataset_type: DatasetType) -> bool: """Ensure that this `Registry` can hold records for the given `DatasetType`, creating new tables as necessary. Parameters ---------- - datasetType : `DatasetType` + dataset_type : `DatasetType` Dataset type for which a table should created (as necessary) and an associated `DatasetRecordStorage` returned. @@ -281,7 +277,7 @@ def register(self, datasetType: DatasetType) -> bool: raise NotImplementedError() @abstractmethod - def remove(self, name: str) -> None: + def remove_dataset_type(self, name: str) -> None: """Remove the dataset type. Parameters @@ -389,7 +385,7 @@ def ingest_date_dtype(self) -> type: @abstractmethod def insert( self, - dataset_type: DatasetType, + dataset_type_name: str, run: RunRecord, data_ids: Iterable[DataCoordinate], id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE, @@ -398,8 +394,8 @@ def insert( Parameters ---------- - dataset_type : `DatasetType` - Type of the datasets to insert. Storage class is ignored. + dataset_type_name : `str` + Name of the dataset type. 
run : `RunRecord` The record object describing the `~CollectionType.RUN` collection these datasets will be associated with. @@ -423,38 +419,26 @@ def insert( @abstractmethod def import_( self, - dataset_type: DatasetType, + dataset_type_name: str, run: RunRecord, - datasets: Iterable[DatasetRef], + data_ids: Mapping[DatasetId, DataCoordinate], ) -> list[DatasetRef]: """Insert one or more dataset entries into the database. Parameters ---------- - dataset_type : `DatasetType` - Type of all datasets. Storage class is ignored. + dataset_type_name : `str` + Name of the dataset type. run : `RunRecord` The record object describing the `~CollectionType.RUN` collection these datasets will be associated with. - datasets : `~collections.abc.Iterable` of `DatasetRef` - Datasets to be inserted. Datasets can specify ``id`` attribute - which will be used for inserted datasets. All dataset IDs must - have the same type (`int` or `uuid.UUID`), if type of dataset IDs - does not match type supported by this class then IDs will be - ignored and new IDs will be generated by backend. + data_ids : `~collections.abc.Mapping` + Mapping from dataset ID to data ID. Returns ------- datasets : `list` [ `DatasetRef` ] References to the inserted or existing datasets. - - Notes - ----- - The ``datasetType`` and ``run`` attributes of datasets are supposed to - be identical across all datasets but this is not checked and it should - be enforced by higher level registry code. This method does not need - to use those attributes from datasets, only ``dataId`` and ``id`` are - relevant. """ raise NotImplementedError() @@ -484,8 +468,8 @@ def associate( The record object describing the collection. ``collection.type`` must be `~CollectionType.TAGGED`. datasets : `~collections.abc.Iterable` [ `DatasetRef` ] - Datasets to be associated. All datasets must be resolved and have - the same `DatasetType` as ``dataset_type``. + Datasets to be associated. All datasets must have the same + `DatasetType` as ``dataset_type``, but this is not checked. Notes ----- @@ -512,8 +496,8 @@ def disassociate( The record object describing the collection. ``collection.type`` must be `~CollectionType.TAGGED`. datasets : `~collections.abc.Iterable` [ `DatasetRef` ] - Datasets to be disassociated. All datasets must be resolved and - have the same `DatasetType` as ``self``. + Datasets to be disassociated. All datasets must have the same + `DatasetType` as ``dataset_type``, but this is not checked. """ raise NotImplementedError() @@ -537,8 +521,8 @@ def certify( The record object describing the collection. ``collection.type`` must be `~CollectionType.CALIBRATION`. datasets : `~collections.abc.Iterable` [ `DatasetRef` ] - Datasets to be associated. All datasets must be resolved and have - the same `DatasetType` as ``self``. + Datasets to be associated. All datasets must have the same + `DatasetType` as ``dataset_type``, but this is not checked. timespan : `Timespan` The validity range for these datasets within the collection. context : `SqlQueryContext` @@ -551,10 +535,11 @@ def certify( Raised if the collection already contains a different dataset with the same `DatasetType` and data ID and an overlapping validity range. + DatasetTypeError + Raised if ``dataset_type.isCalibration() is False``. CollectionTypeError Raised if - ``collection.type is not CollectionType.CALIBRATION`` or if - ``self.datasetType.isCalibration() is False``. + ``collection.type is not CollectionType.CALIBRATION``. 
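A sketch of the reworked manager-level insert call; insert_raw, its parameters, and the "raw" dataset type are placeholder names, and the data IDs are assumed to have been expanded by higher-level registry code before reaching the manager:

    from collections.abc import Iterable

    from lsst.daf.butler import DatasetIdGenEnum, DatasetRef
    from lsst.daf.butler.dimensions import DataCoordinate
    from lsst.daf.butler.registry.interfaces import DatasetRecordStorageManager, RunRecord

    def insert_raw(
        manager: DatasetRecordStorageManager,
        run_record: RunRecord,
        expanded_data_ids: Iterable[DataCoordinate],
    ) -> list[DatasetRef]:
        # The dataset type is now identified by name; the manager resolves its
        # definition and ID internally.
        return list(
            manager.insert(
                "raw",
                run_record,
                expanded_data_ids,
                id_generation_mode=DatasetIdGenEnum.UNIQUE,
            )
        )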
""" raise NotImplementedError() @@ -593,8 +578,11 @@ def decertify( Raises ------ + DatasetTypeError + Raised if ``dataset_type.isCalibration() is False``. CollectionTypeError - Raised if ``collection.type is not CollectionType.CALIBRATION``. + Raised if + ``collection.type is not CollectionType.CALIBRATION``. """ raise NotImplementedError() @@ -606,7 +594,7 @@ def make_relation( columns: Set[str], context: SqlQueryContext, ) -> Relation: - """Return a `sql.Relation` that represents a query for for this + """Return a `sql.Relation` that represents a query for this `DatasetType` in one or more collections. Parameters diff --git a/python/lsst/daf/butler/registry/sql_registry.py b/python/lsst/daf/butler/registry/sql_registry.py index ff5cdac326..bc87681fcf 100644 --- a/python/lsst/daf/butler/registry/sql_registry.py +++ b/python/lsst/daf/butler/registry/sql_registry.py @@ -739,7 +739,7 @@ def registerDatasetType(self, datasetType: DatasetType) -> bool: This method cannot be called within transactions, as it needs to be able to perform its own transaction to be concurrent. """ - return self._managers.datasets.register(datasetType) + return self._managers.datasets.register_dataset_type(datasetType) def removeDatasetType(self, name: str | tuple[str, ...]) -> None: """Remove the named `DatasetType` from the registry. @@ -780,7 +780,7 @@ def removeDatasetType(self, name: str | tuple[str, ...]) -> None: _LOG.info("Dataset type %r not defined", datasetTypeExpression) else: for datasetType in datasetTypes: - self._managers.datasets.remove(datasetType.name) + self._managers.datasets.remove_dataset_type(datasetType.name) _LOG.info("Removed dataset type %r", datasetType.name) def getDatasetType(self, name: str) -> DatasetType: @@ -807,11 +807,11 @@ def getDatasetType(self, name: str) -> DatasetType: other registry operations do not. """ parent_name, component = DatasetType.splitDatasetTypeName(name) - storage = self._managers.datasets[parent_name] + parent_dataset_type = self._managers.datasets.get_dataset_type(parent_name) if component is None: - return storage.datasetType + return parent_dataset_type else: - return storage.datasetType.makeComponentDatasetType(component) + return parent_dataset_type.makeComponentDatasetType(component) def supportsIdGenerationMode(self, mode: DatasetIdGenEnum) -> bool: """Test whether the given dataset ID generation mode is supported by @@ -1044,8 +1044,7 @@ def insertDatasets( lsst.daf.butler.registry.MissingCollectionError Raised if ``run`` does not exist in the registry. 
""" - if not isinstance(datasetType, DatasetType): - datasetType = self.getDatasetType(datasetType) + datasetType = self._managers.datasets.conform_exact_dataset_type(datasetType) if run is None: if self.defaults.run is None: raise NoDefaultCollectionError( @@ -1070,7 +1069,7 @@ def insertDatasets( ] try: refs = list( - self._managers.datasets.insert(datasetType, runRecord, expandedDataIds, idGenerationMode) + self._managers.datasets.insert(datasetType.name, runRecord, expandedDataIds, idGenerationMode) ) if self._managers.obscore: self._managers.obscore.add_datasets(refs) @@ -1167,20 +1166,17 @@ def _importDatasets( ) assert isinstance(runRecord, RunRecord) - progress = Progress("daf.butler.Registry.insertDatasets", level=logging.DEBUG) + progress = Progress("daf.butler.Registry.importDatasets", level=logging.DEBUG) if expand: - expandedDatasets = [ - dataset.expanded(self.expandDataId(dataset.dataId, dimensions=datasetType.dimensions)) + data_ids = { + dataset.id: self.expandDataId(dataset.dataId, dimensions=datasetType.dimensions) for dataset in progress.wrap(datasets, f"Expanding {datasetType.name} data IDs") - ] + } else: - expandedDatasets = [ - DatasetRef(datasetType, dataset.dataId, id=dataset.id, run=dataset.run, conform=True) - for dataset in datasets - ] + data_ids = {dataset.id: dataset.dataId for dataset in datasets} try: - refs = list(self._managers.datasets.import_(datasetType, runRecord, expandedDatasets)) + refs = list(self._managers.datasets.import_(datasetType.name, runRecord, data_ids)) if self._managers.obscore: self._managers.obscore.add_datasets(refs) except sqlalchemy.exc.IntegrityError as err: @@ -1348,10 +1344,11 @@ def certify(self, collection: str, refs: Iterable[DatasetRef], timespan: Timespa Raised if the collection already contains a different dataset with the same `DatasetType` and data ID and an overlapping validity range. - lsst.daf.butler.registry.CollectionTypeError - Raised if ``collection`` is not a `~CollectionType.CALIBRATION` - collection or if one or more datasets are of a dataset type for - which `DatasetType.isCalibration` returns `False`. + DatasetTypeError + Raised if ``ref.datasetType.isCalibration() is False`` for any ref. + CollectionTypeError + Raised if + ``collection.type is not CollectionType.CALIBRATION``. """ progress = Progress("lsst.daf.butler.Registry.certify", level=logging.DEBUG) collectionRecord = self._managers.collections.find(collection) @@ -1397,9 +1394,11 @@ def decertify( Raises ------ - lsst.daf.butler.registry.CollectionTypeError - Raised if ``collection`` is not a `~CollectionType.CALIBRATION` - collection or if ``datasetType.isCalibration() is False``. + DatasetTypeError + Raised if ``datasetType.isCalibration() is False``. + CollectionTypeError + Raised if + ``collection.type is not CollectionType.CALIBRATION``. """ collectionRecord = self._managers.collections.find(collection) if isinstance(datasetType, str):