Skip to content

Commit

Permalink
Make SerializedDatasetRefContainerV1 public
Browse files Browse the repository at this point in the history
  • Loading branch information
timj committed Oct 30, 2024
1 parent bb9bab4 commit 8b8c250
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 158 deletions.
176 changes: 174 additions & 2 deletions python/lsst/daf/butler/_dataset_ref.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,26 @@
"DatasetIdGenEnum",
"DatasetRef",
"SerializedDatasetRef",
"SerializedDatasetRefContainerV1",
"SerializedDatasetRefContainers",
]

import enum
import logging
import sys
import uuid
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable
from typing import (
TYPE_CHECKING,
Annotated,
Any,
ClassVar,
Literal,
Protocol,
Self,
TypeAlias,
runtime_checkable,
)

import pydantic
from lsst.utils.classes import immutable
Expand All @@ -50,7 +63,13 @@
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import DataCoordinate, DimensionGroup, DimensionUniverse, SerializedDataCoordinate
from .dimensions import (
DataCoordinate,
DimensionGroup,
DimensionUniverse,
SerializedDataCoordinate,
SerializedDataId,
)
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars

Expand All @@ -63,6 +82,9 @@
DatasetDatastoreRecords: TypeAlias = Mapping[str, list[StoredDatastoreItemInfo]]


_LOG = logging.getLogger(__name__)


class AmbiguousDatasetError(Exception):
"""Raised when a `DatasetRef` is not resolved but should be.
Expand Down Expand Up @@ -864,3 +886,153 @@ class associated with the dataset type of the other ref can be
Cannot be changed after a `DatasetRef` is constructed.
"""


class MinimalistSerializableDatasetRef(pydantic.BaseModel):
"""Minimal information needed to define a DatasetRef.
The ID is not included and is presumed to be the key to a mapping
to this information.
"""

model_config = pydantic.ConfigDict(frozen=True)

dataset_type_name: str
"""Name of the dataset type."""

run: str
"""Name of the RUN collection."""

data_id: SerializedDataId
"""Data coordinate of this dataset."""


class SerializedDatasetRefContainer(pydantic.BaseModel):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

model_config = pydantic.ConfigDict(extra="allow", frozen=True)
container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

container_version: Literal["V1"] = "V1"

universe_version: int
"""Dimension universe version."""

universe_namespace: str
"""Dimension universe namespace."""

dataset_types: dict[str, SerializedDatasetType]
"""Dataset types indexed by their name."""

compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef]
"""Minimal dataset ref information indexed by UUID."""

def __len__(self) -> int:
"""Return the number of datasets in the container."""
return len(self.compact_refs)

@classmethod
def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
"""Construct a serializable form from a list of `DatasetRef`.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to include in the container.
"""
# The serialized DatasetRef contains a lot of duplicated information.
# We also want to drop dimension records and assume that the records
# are already in the registry.
universe: DimensionUniverse | None = None
dataset_types: dict[str, SerializedDatasetType] = {}
compact_refs: dict[uuid.UUID, MinimalistSerializableDatasetRef] = {}
for ref in refs:
simple_ref = ref.to_simple()
dataset_type = simple_ref.datasetType
assert dataset_type is not None # For mypy
if universe is None:
universe = ref.datasetType.dimensions.universe
if (name := dataset_type.name) not in dataset_types:
dataset_types[name] = dataset_type
data_id = simple_ref.dataId
assert data_id is not None # For mypy
compact_refs[simple_ref.id] = MinimalistSerializableDatasetRef(
dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId
)
if universe:
universe_version = universe.version
universe_namespace = universe.namespace
else:
# No refs so no universe.
universe_version = 0
universe_namespace = "unknown"
return cls(
universe_version=universe_version,
universe_namespace=universe_namespace,
dataset_types=dataset_types,
compact_refs=compact_refs,
)

def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
"""Construct the original `DatasetRef`.
Parameters
----------
universe : `DimensionUniverse`
The universe to use when constructing the `DatasetRef`.
Returns
-------
refs : `list` [ `DatasetRef` ]
The `DatasetRef` that were serialized.
"""
if not self.compact_refs:
return []

if universe.namespace != self.universe_namespace:
raise RuntimeError(
f"Can not convert to refs in universe {universe.namespace} that were created from "
f"universe {self.universe_namespace}"
)

if universe.version != self.universe_version:
_LOG.warning(
"Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
"Serialized with version %d but asked to use version %d.",
self.universe_version,
universe.version,
)

# Reconstruct the DatasetType objects.
dataset_types = {
name: DatasetType.from_simple(dtype, universe=universe)
for name, dtype in self.dataset_types.items()
}
refs: list[DatasetRef] = []
for id_, minimal in self.compact_refs.items():
simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id)
data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
ref = DatasetRef(
id=id_,
run=minimal.run,
datasetType=dataset_types[minimal.dataset_type_name],
dataId=data_id,
)
refs.append(ref)
return refs


SerializedDatasetRefContainers: TypeAlias = Annotated[
SerializedDatasetRefContainerV1,
pydantic.Field(discriminator="container_version"),
]
164 changes: 8 additions & 156 deletions python/lsst/daf/butler/datastores/file_datastore/retrieve_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,169 +34,21 @@
import uuid
import zipfile
from collections.abc import Iterable
from typing import Annotated, ClassVar, Literal, Protocol, Self, TypeAlias
from typing import ClassVar, Literal, Protocol, Self

from lsst.daf.butler import DatasetIdFactory, DatasetRef
from lsst.daf.butler import (
DatasetIdFactory,
DatasetRef,
SerializedDatasetRefContainers,
SerializedDatasetRefContainerV1,
)
from lsst.daf.butler.datastore.stored_file_info import SerializedStoredFileInfo
from lsst.resources import ResourcePath, ResourcePathExpression
from pydantic import BaseModel, ConfigDict, Field

from ..._dataset_type import DatasetType, SerializedDatasetType
from ...dimensions import DataCoordinate, DimensionUniverse, SerializedDataCoordinate, SerializedDataId
from pydantic import BaseModel

_LOG = logging.getLogger(__name__)


class MinimalistDatasetRef(BaseModel):
"""Minimal information needed to define a DatasetRef.
The ID is not included and is presumed to be the key to a mapping
to this information.
"""

model_config = ConfigDict(frozen=True)

dataset_type_name: str
"""Name of the dataset type."""

run: str
"""Name of the RUN collection."""

data_id: SerializedDataId
"""Data coordinate of this dataset."""


class SerializedDatasetRefContainer(BaseModel):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

model_config = ConfigDict(extra="allow", frozen=True)
container_version: str


class SerializedDatasetRefContainerV1(SerializedDatasetRefContainer):
"""Serializable model for a collection of DatasetRef.
Dimension records are not included.
"""

container_version: Literal["V1"] = "V1"

universe_version: int
"""Dimension universe version."""

universe_namespace: str
"""Dimension universe namespace."""

dataset_types: dict[str, SerializedDatasetType]
"""Dataset types indexed by their name."""

compact_refs: dict[uuid.UUID, MinimalistDatasetRef]
"""Minimal dataset ref information indexed by UUID."""

def __len__(self) -> int:
"""Return the number of datasets in the container."""
return len(self.compact_refs)

@classmethod
def from_refs(cls, refs: Iterable[DatasetRef]) -> Self:
"""Construct a serializable form from a list of `DatasetRef`.
Parameters
----------
refs : `~collections.abc.Iterable` [ `DatasetRef` ]
The datasets to include in the container.
"""
# The serialized DatasetRef contains a lot of duplicated information.
# We also want to drop dimension records and assume that the records
# are already in the registry.
universe: DimensionUniverse | None = None
dataset_types: dict[str, SerializedDatasetType] = {}
compact_refs: dict[uuid.UUID, MinimalistDatasetRef] = {}
for ref in refs:
simple_ref = ref.to_simple()
dataset_type = simple_ref.datasetType
assert dataset_type is not None # For mypy
if universe is None:
universe = ref.datasetType.dimensions.universe
if (name := dataset_type.name) not in dataset_types:
dataset_types[name] = dataset_type
data_id = simple_ref.dataId
assert data_id is not None # For mypy
compact_refs[simple_ref.id] = MinimalistDatasetRef(
dataset_type_name=name, run=simple_ref.run, data_id=data_id.dataId
)
if universe:
universe_version = universe.version
universe_namespace = universe.namespace
else:
# No refs so no universe.
universe_version = 0
universe_namespace = "unknown"
return cls(
universe_version=universe_version,
universe_namespace=universe_namespace,
dataset_types=dataset_types,
compact_refs=compact_refs,
)

def to_refs(self, universe: DimensionUniverse) -> list[DatasetRef]:
"""Construct the original `DatasetRef`.
Parameters
----------
universe : `DimensionUniverse`
The universe to use when constructing the `DatasetRef`.
Returns
-------
refs : `list` [ `DatasetRef` ]
The `DatasetRef` that were serialized.
"""
if not self.compact_refs:
return []

if universe.namespace != self.universe_namespace:
raise RuntimeError(
f"Can not convert to refs in universe {universe.namespace} that were created from "
f"universe {self.universe_namespace}"
)

if universe.version != self.universe_version:
_LOG.warning(
"Universe mismatch when attempting to reconstruct DatasetRef from serialized form. "
"Serialized with version %d but asked to use version %d.",
self.universe_version,
universe.version,
)

# Reconstruct the DatasetType objects.
dataset_types = {
name: DatasetType.from_simple(dtype, universe=universe)
for name, dtype in self.dataset_types.items()
}
refs: list[DatasetRef] = []
for id_, minimal in self.compact_refs.items():
simple_data_id = SerializedDataCoordinate(dataId=minimal.data_id)
data_id = DataCoordinate.from_simple(simple=simple_data_id, universe=universe)
ref = DatasetRef(
id=id_,
run=minimal.run,
datasetType=dataset_types[minimal.dataset_type_name],
dataId=data_id,
)
refs.append(ref)
return refs


SerializedDatasetRefContainers: TypeAlias = Annotated[
SerializedDatasetRefContainerV1,
Field(discriminator="container_version"),
]


class ArtifactIndexInfo(BaseModel):
"""Information related to an artifact in an index."""

Expand Down
Loading

0 comments on commit 8b8c250

Please sign in to comment.