_LOG = logging.getLogger(__name__)


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
    """Minimal information needed to define a DatasetRef.

    The dataset ID is deliberately omitted: it is expected to be the key of
    a mapping whose value is this model.
    """

    model_config = pydantic.ConfigDict(frozen=True)

    dataset_type_name: str
    """Name of the dataset type."""

    run: str
    """Name of the RUN collection."""

    data_id: SerializedDataId
    """Data coordinate of this dataset."""


class SerializedDatasetRefContainer(pydantic.BaseModel):
    """Serializable model for a collection of DatasetRef.

    Dimension records are not included.
    """

    # extra="allow" lets newer container versions round-trip through this
    # base model without losing their additional fields.
    model_config = pydantic.ConfigDict(extra="allow", frozen=True)
    container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
    """Serializable model for a collection of DatasetRef.

    Dimension records are not included.
    """

    container_version: Literal["V1"] = "V1"

    universe_version: int
    """Dimension universe version."""

    universe_namespace: str
    """Dimension universe namespace."""

    dataset_types: dict[str, SerializedDatasetType]
    """Dataset types indexed by their name."""

    compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
    """Minimal dataset ref information indexed by UUID."""

    def __len__(self) -> int:
        """Return the number of datasets in the container."""
        return len(self.compact_refs)

    @classmethod
    def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
        """Construct a serializable form from a list of `DatasetRef`.

        Parameters
        ----------
        refs : `~collections.abc.Iterable` [ `DatasetRef` ]
            The datasets to include in the container.
        """
        # A fully serialized DatasetRef repeats the dataset type for every
        # dataset. Factor the dataset types out, keyed by name, and drop
        # dimension records on the assumption that the records are already
        # present in the registry.
        universe: DimensionUniverse | None = None
        dataset_types: dict[str, SerializedDatasetType] = {}
        compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
        for ref in refs:
            serialized = ref.to_simple()
            serialized_type = serialized.datasetType
            assert serialized_type is not None  # For mypy
            if universe is None:
                # All refs are assumed to share one universe; record the
                # first one encountered.
                universe = ref.datasetType.dimensions.universe
            # First occurrence of a dataset type name wins.
            dataset_types.setdefault(serialized_type.name, serialized_type)
            serialized_data_id = serialized.dataId
            assert serialized_data_id is not None  # For mypy
            compact_refs[serialized.id] = MinimalistSerializableDatasetRef(
                dataset_type_name=serialized_type.name,
                run=serialized.run,
                data_id=serialized_data_id.dataId,
            )
        if universe:
            universe_version = universe.version
            universe_namespace = universe.namespace
        else:
            # No refs were supplied so there is no universe to record.
            universe_version = 0
            universe_namespace = "unknown"
        return cls(
            universe_version=universe_version,
            universe_namespace=universe_namespace,
            dataset_types=dataset_types,
            compact_refs=compact_refs,
        )

    def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
        """Construct the original `DatasetRef`.

        Parameters
        ----------
        universe : `DimensionUniverse`
            The universe to use when constructing the `DatasetRef`.

        Returns
        -------
        refs : `list` [ `DatasetRef` ]
            The `DatasetRef` that were serialized.
        """
        if not self.compact_refs:
            return []

        # A namespace mismatch is fatal; a version mismatch is only warned
        # about because the relevant dimensions may still be compatible.
        if universe.namespace != self.universe_namespace:
            raise RuntimeError(
                f"Can not convert to refs in universe {universe.namespace} that were created from "
                f"universe {self.universe_namespace}"
            )

        if universe.version != self.universe_version:
            _LOG.warning(
                "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
                "Serialized with version %d but asked to use version %d.",
                self.universe_version,
                universe.version,
            )

        # Reconstruct the shared DatasetType objects once, then expand each
        # compact entry back into a full DatasetRef.
        dataset_types: dict[str, DatasetType] = {}
        for type_name, serialized_type in self.dataset_types.items():
            dataset_types[type_name] = DatasetType.from_simple(serialized_type, universe=universe)

        return [
            DatasetRef(
                id=id_,
                run=minimal.run,
                datasetType=dataset_types[minimal.dataset_type_name],
                dataId=DataCoordinate.from_simple(
                    simple=SerializedDataCoordinate(dataId=minimal.data_id),
                    universe=universe,
                ),
            )
            for id_, minimal in self.compact_refs.items()
        ]


SerializedDatasetRefContainers: TypeAlias = Annotated[
    SerializedDatasetRefContainerV1,
    pydantic.Field(discriminator="container_version"),
]
- - The ID is not included and is presumed to be the key to a mapping - to this information. - """ - - model_config = ConfigDict(frozen=True) - - dataset_type_name: str - """Name of the dataset type.""" - - run: str - """Name of the RUN collection.""" - - data_id: SerializedDataId - """Data coordinate of this dataset.""" - - -class SerializedDatasetRefContainer(BaseModel): - """Serializable model for a collection of DatasetRef. - - Dimension records are not included. - """ - - model_config = ConfigDict(extra="allow", frozen=True) - container_version: str - - -class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer): - """Serializable model for a collection of DatasetRef. - - Dimension records are not included. - """ - - container_version: Literal["V1"] = "V1" - - universe_version: int - """Dimension universe version.""" - - universe_namespace: str - """Dimension universe namespace.""" - - dataset_types: dict[str, SerializedDatasetType] - """Dataset types indexed by their name.""" - - compact_refs: dict[uuid.UUID, MinimalistDatasetRef] - """Minimal dataset ref information indexed by UUID.""" - - def __len__(self) -> int: - """Return the number of datasets in the container.""" - return len(self.compact_refs) - - @classmethod - def from_refs(cls, refs: Iterable[DatasetRef]) -> Self: - """Construct a serializable form from a list of `DatasetRef`. - - Parameters - ---------- - refs : `~collections.abc.Iterable` [ `DatasetRef` ] - The datasets to include in the container. - """ - # The serialized DatasetRef contains a lot of duplicated information. - # We also want to drop dimension records and assume that the records - # are already in the registry. 
- universe: DimensionUniverse | None = None - dataset_types: dict[str, SerializedDatasetType] = {} - compact_refs: dict[uuid.UUID, MinimalistDatasetRef] = {} - for ref in refs: - simple_ref = ref.to_simple() - dataset_type = simple_ref.datasetType - assert dataset_type is not None # For mypy - if universe is None: - universe = ref.datasetType.dimensions.universe - if (name := dataset_type.name) not in dataset_types: - dataset_types[name] = dataset_type - data_id = simple_ref.dataId - assert data_id is not None # For mypy - compact_refs[simple_ref.id] = MinimalistDatasetRef( - dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId - ) - if universe: - universe_version = universe.version - universe_namespace = universe.namespace - else: - # No refs so no universe. - universe_version = 0 - universe_namespace = "unknown" - return cls( - universe_version=universe_version, - universe_namespace=universe_namespace, - dataset_types=dataset_types, - compact_refs=compact_refs, - ) - - def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]: - """Construct the original `DatasetRef`. - - Parameters - ---------- - universe : `DimensionUniverse` - The universe to use when constructing the `DatasetRef`. - - Returns - ------- - refs : `list` [ `DatasetRef` ] - The `DatasetRef` that were serialized. - """ - if not self.compact_refs: - return [] - - if universe.namespace != self.universe_namespace: - raise RuntimeError( - f"Can not convert to refs in universe {universe.namespace} that were created from " - f"universe {self.universe_namespace}" - ) - - if universe.version != self.universe_version: - _LOG.warning( - "Universe mismatch when attempting to reconstruct DatasetRef from serialized form. " - "Serialized with version %d but asked to use version %d.", - self.universe_version, - universe.version, - ) - - # Reconstruct the DatasetType objects. 
- dataset_types = { - name: DatasetType.from_simple(dtype, universe=universe) - for name, dtype in self.dataset_types.items() - } - refs: list[DatasetRef] = [] - for id_, minimal in self.compact_refs.items(): - simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id) - data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe) - ref = DatasetRef( - id=id_, - run=minimal.run, - datasetType=dataset_types[minimal.dataset_type_name], - dataId=data_id, - ) - refs.append(ref) - return refs - - -SerializedDatasetRefContainers: TypeAlias = Annotated[ - SerializedDatasetRefContainerV1, - Field(discriminator="container_version"), -] - - class ArtifactIndexInfo(BaseModel): """Information related to an artifact in an index.""" diff --git a/tests/test_datasets.py b/tests/test_datasets.py index df5b9725bd..0e8123254e 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -38,6 +38,7 @@ DimensionConfig, DimensionUniverse, FileDataset, + SerializedDatasetRefContainerV1, StorageClass, StorageClassFactory, ) @@ -744,6 +745,16 @@ def testFileDataset(self) -> None: with self.assertRaises(ValueError): FileDataset(path="other.yaml", refs=[ref, ref2]) + def test_container(self) -> None: + ref1 = DatasetRef(self.datasetType, self.dataId, run="somerun") + ref2 = ref1.replace(run="somerun2") + + container = SerializedDatasetRefContainerV1.from_refs([ref1, ref2]) + self.assertEqual(len(container), 2) + + new_refs = container.to_refs(universe=self.universe) + self.assertEqual(new_refs, [ref1, ref2]) + class ZipIndexTestCase(unittest.TestCase): """Test that a ZipIndex can be read."""