Merge pull request #875 from lsst/tickets/DM-40053
DM-40053: Add Datastore records to DatasetRef and use them in butler.get
andy-slac authored Oct 17, 2023
2 parents 73579f0 + 2a1b9c1 commit 828e697
Showing 18 changed files with 801 additions and 362 deletions.
26 changes: 23 additions & 3 deletions python/lsst/daf/butler/_butler.py
@@ -237,6 +237,10 @@ def __init__(
self._datastore = Datastore.fromConfig(
self._config, self._registry.getDatastoreBridgeManager(), butlerRoot=butlerRoot
)
# TODO: Once datastore drops dependency on registry we can
# construct datastore first and pass opaque tables to registry
# constructor.
self._registry.make_datastore_tables(self._datastore.get_opaque_table_definitions())
self.storageClasses = StorageClassFactory()
self.storageClasses.addFromConfig(self._config)
except Exception:
@@ -1004,6 +1008,7 @@ def _findDatasetRef(
collections: Any = None,
predict: bool = False,
run: str | None = None,
datastore_records: bool = False,
**kwargs: Any,
) -> DatasetRef:
"""Shared logic for methods that start with a search for a dataset in
@@ -1029,6 +1034,8 @@
run : `str`, optional
Run collection name to use for creating `DatasetRef` for predicted
datasets. Only used if ``predict`` is `True`.
datastore_records : `bool`, optional
If `True` add datastore records to returned `DatasetRef`.
**kwargs
Additional keyword arguments used to augment or construct a
`DataId`. See `DataId` parameters.
@@ -1055,6 +1062,9 @@
if isinstance(datasetRefOrType, DatasetRef):
if collections is not None:
warnings.warn("Collections should not be specified with DatasetRef", stacklevel=3)
# May need to retrieve datastore records if requested.
if datastore_records and datasetRefOrType._datastore_records is None:
datasetRefOrType = self._registry.get_datastore_records(datasetRefOrType)
return datasetRefOrType
timespan: Timespan | None = None

@@ -1081,7 +1091,13 @@
)
# Always lookup the DatasetRef, even if one is given, to ensure it is
# present in the current collection.
ref = self._registry.findDataset(datasetType, dataId, collections=collections, timespan=timespan)
ref = self._registry.findDataset(
datasetType,
dataId,
collections=collections,
timespan=timespan,
datastore_records=datastore_records,
)
if ref is None:
if predict:
if run is None:
@@ -1102,7 +1118,9 @@
# registry definition. The DatasetRef must therefore be recreated
# using the user definition such that the expected type is
# returned.
ref = DatasetRef(datasetType, ref.dataId, run=ref.run, id=ref.id)
ref = DatasetRef(
datasetType, ref.dataId, run=ref.run, id=ref.id, datastore_records=ref._datastore_records
)

return ref

@@ -1419,7 +1437,9 @@ def get(
``exposure`` is a temporal dimension.
"""
log.debug("Butler get: %s, dataId=%s, parameters=%s", datasetRefOrType, dataId, parameters)
ref = self._findDatasetRef(datasetRefOrType, dataId, collections=collections, **kwargs)
ref = self._findDatasetRef(
datasetRefOrType, dataId, collections=collections, datastore_records=True, **kwargs
)
return self._datastore.get(ref, parameters=parameters, storageClass=storageClass)

def getURIs(
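
With these changes, `butler.get` resolves the `DatasetRef` together with its datastore records, so the datastore can locate the stored artifacts without a second registry round trip. A minimal usage sketch follows; the repository path, collection, and data ID are placeholders and not part of this commit.

from lsst.daf.butler import Butler

# Placeholder repository path and collection.
butler = Butler("/path/to/repo", collections="HSC/defaults")

# get() now calls _findDatasetRef(..., datastore_records=True), so the ref
# handed to the datastore already carries its datastore records.
calexp = butler.get("calexp", instrument="HSC", visit=903334, detector=16)

# A pre-resolved DatasetRef still works: _findDatasetRef fetches any missing
# records via registry.get_datastore_records().
ref = butler.registry.findDataset(
    "calexp", instrument="HSC", visit=903334, detector=16, collections="HSC/defaults"
)
calexp = butler.get(ref)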
81 changes: 74 additions & 7 deletions python/lsst/daf/butler/_dataset_ref.py
@@ -28,6 +28,7 @@

__all__ = [
"AmbiguousDatasetError",
"DatasetDatastoreRecords",
"DatasetId",
"DatasetIdFactory",
"DatasetIdGenEnum",
@@ -38,8 +39,8 @@
import enum
import sys
import uuid
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, TypeAlias, runtime_checkable
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any, ClassVar, Literal, Protocol, TypeAlias, runtime_checkable

import pydantic
from lsst.daf.butler._compat import PYDANTIC_V2, _BaseModelCompat
@@ -49,6 +50,7 @@
from ._config_support import LookupKey
from ._dataset_type import DatasetType, SerializedDatasetType
from ._named import NamedKeyDict
from .datastore.stored_file_info import StoredDatastoreItemInfo
from .dimensions import DataCoordinate, DimensionGraph, DimensionUniverse, SerializedDataCoordinate
from .json import from_json_pydantic, to_json_pydantic
from .persistence_context import PersistenceContextVars
@@ -57,6 +59,10 @@
from ._storage_class import StorageClass
from .registry import Registry

# Per-dataset records grouped by opaque table name, usually there is just one
# opaque table.
DatasetDatastoreRecords: TypeAlias = Mapping[str, Iterable[StoredDatastoreItemInfo]]
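
For illustration, a value of this type pairs each opaque table name with the datastore records describing a single dataset. The table name below is hypothetical; in practice the values are concrete `StoredDatastoreItemInfo` subclasses such as `StoredFileInfo`.

# Illustrative only; not part of this commit.
example_records: DatasetDatastoreRecords = {
    "file_datastore_records": [],  # StoredDatastoreItemInfo instances for one dataset
}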


class AmbiguousDatasetError(Exception):
"""Raised when a `DatasetRef` is not resolved but should be.
@@ -176,6 +182,10 @@ def makeDatasetId(
# This is constant, so don't recreate a set for each instance
_serializedDatasetRefFieldsSet = {"id", "datasetType", "dataId", "run", "component"}

# Serialized representation of StoredDatastoreItemInfo collection, first item
# is the record class name.
_DatastoreRecords: TypeAlias = tuple[str, list[Mapping[str, Any]]]


class SerializedDatasetRef(_BaseModelCompat):
"""Simplified model of a `DatasetRef` suitable for serialization."""
@@ -185,6 +195,8 @@
dataId: SerializedDataCoordinate | None = None
run: StrictStr | None = None
component: StrictStr | None = None
datastore_records: Mapping[str, _DatastoreRecords] | None = None
"""Maps opaque table name to datastore records."""

if PYDANTIC_V2:
# Can not use "after" validator since in some cases the validator
@@ -225,6 +237,7 @@ def direct(
datasetType: dict[str, Any] | None = None,
dataId: dict[str, Any] | None = None,
component: str | None = None,
datastore_records: Mapping[str, _DatastoreRecords] | None = None,
) -> SerializedDatasetRef:
"""Construct a `SerializedDatasetRef` directly without validators.
@@ -251,6 +264,7 @@
dataId=serialized_dataId,
run=sys.intern(run),
component=component,
datastore_records=datastore_records,
)

return node
@@ -306,6 +320,7 @@ class DatasetRef:
"datasetType",
"dataId",
"run",
"_datastore_records",
)

def __init__(
@@ -317,6 +332,7 @@
id: DatasetId | None = None,
conform: bool = True,
id_generation_mode: DatasetIdGenEnum = DatasetIdGenEnum.UNIQUE,
datastore_records: DatasetDatastoreRecords | None = None,
):
self.datasetType = datasetType
if conform:
@@ -332,6 +348,7 @@
.makeDatasetId(self.run, self.datasetType, self.dataId, id_generation_mode)
.int
)
self._datastore_records = datastore_records

@property
def id(self) -> DatasetId:
@@ -413,11 +430,19 @@ def to_simple(self, minimal: bool = False) -> SerializedDatasetRef:
simple["component"] = self.datasetType.component()
return SerializedDatasetRef(**simple)

datastore_records: Mapping[str, _DatastoreRecords] | None = None
if self._datastore_records is not None:
datastore_records = {}
for opaque_name, records in self._datastore_records.items():
class_name, record_dicts = StoredDatastoreItemInfo.to_records(records)
datastore_records[opaque_name] = class_name, list(record_dicts)

return SerializedDatasetRef(
datasetType=self.datasetType.to_simple(minimal=minimal),
dataId=self.dataId.to_simple(),
run=self.run,
id=self.id,
datastore_records=datastore_records,
)

@classmethod
@@ -512,7 +537,21 @@ def from_simple(
f"Encountered with {simple!r}{dstr}."
)

newRef = cls(datasetType, dataId, id=simple.id, run=simple.run)
# rebuild datastore records
datastore_records: DatasetDatastoreRecords | None = None
if simple.datastore_records is not None:
datastore_records = {}
for opaque_name, (class_name, records) in simple.datastore_records.items():
infos = StoredDatastoreItemInfo.from_records(class_name, records)
datastore_records[opaque_name] = infos

newRef = cls(
datasetType,
dataId,
id=simple.id,
run=simple.run,
datastore_records=datastore_records,
)
if cache is not None:
cache[key] = newRef
return newRef
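
A short round-trip sketch of the serialization path above. The dataset type, storage class, and run are made up to keep the example self-contained, and the ref carries no datastore records; when records are attached they are written out via `StoredDatastoreItemInfo.to_records()` and rebuilt by `from_records()` as shown in the hunks above.

from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType("example", universe.empty, "StructuredDataDict")
ref = DatasetRef(dataset_type, DataCoordinate.makeEmpty(universe), run="some/run")

simple = ref.to_simple()  # SerializedDatasetRef, including datastore_records when set
roundtrip = DatasetRef.from_simple(simple, universe=universe)
assert roundtrip.id == ref.id and roundtrip.run == ref.run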
@@ -527,16 +566,20 @@ def _unpickle(
dataId: DataCoordinate,
id: DatasetId,
run: str,
datastore_records: DatasetDatastoreRecords | None,
) -> DatasetRef:
"""Create new `DatasetRef`.
A custom factory method for use by `__reduce__` as a workaround for
its lack of support for keyword arguments.
"""
return cls(datasetType, dataId, id=id, run=run)
return cls(datasetType, dataId, id=id, run=run, datastore_records=datastore_records)

def __reduce__(self) -> tuple:
return (self._unpickle, (self.datasetType, self.dataId, self.id, self.run))
return (
self._unpickle,
(self.datasetType, self.dataId, self.id, self.run, self._datastore_records),
)
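
Because `__reduce__` now forwards `_datastore_records` to `_unpickle`, any attached records also survive pickling. A tiny sketch, reusing the same made-up ref construction as in the previous example.

import pickle

from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType("example", universe.empty, "StructuredDataDict")
ref = DatasetRef(dataset_type, DataCoordinate.makeEmpty(universe), run="some/run")

clone = pickle.loads(pickle.dumps(ref))
assert clone.id == ref.id  # datastore records, when present, come along too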

def __deepcopy__(self, memo: dict) -> DatasetRef:
# DatasetRef is recursively immutable; see note in @immutable
@@ -559,7 +602,12 @@ def expanded(self, dataId: DataCoordinate) -> DatasetRef:
"""
assert dataId == self.dataId
return DatasetRef(
datasetType=self.datasetType, dataId=dataId, id=self.id, run=self.run, conform=False
datasetType=self.datasetType,
dataId=dataId,
id=self.id,
run=self.run,
conform=False,
datastore_records=self._datastore_records,
)

def isComponent(self) -> bool:
@@ -669,7 +717,12 @@ def makeCompositeRef(self) -> DatasetRef:
# Assume that the data ID does not need to be standardized
# and should match whatever this ref already has.
return DatasetRef(
self.datasetType.makeCompositeDatasetType(), self.dataId, id=self.id, run=self.run, conform=False
self.datasetType.makeCompositeDatasetType(),
self.dataId,
id=self.id,
run=self.run,
conform=False,
datastore_records=self._datastore_records,
)

def makeComponentRef(self, name: str) -> DatasetRef:
@@ -695,6 +748,7 @@ def makeComponentRef(self, name: str) -> DatasetRef:
id=self.id,
run=self.run,
conform=False,
datastore_records=self._datastore_records,
)

def overrideStorageClass(self, storageClass: str | StorageClass) -> DatasetRef:
@@ -720,6 +774,7 @@ def replace(
id: DatasetId | None = None,
run: str | None = None,
storage_class: str | StorageClass | None = None,
datastore_records: DatasetDatastoreRecords | None | Literal[False] = False,
) -> DatasetRef:
"""Create a new `DatasetRef` from this one, but with some modified
attributes.
@@ -734,12 +789,17 @@
storage_class : `str` or `StorageClass` or `None`.
The new storage class. If not `None`, replaces existing storage
class.
datastore_records : `DatasetDatastoreRecords` or `None`
New datastore records. If `None` remove all records. By default
datastore records are preserved.
Returns
-------
modified : `DatasetRef`
A new dataset reference with updated attributes.
"""
if datastore_records is False:
datastore_records = self._datastore_records
if storage_class is None:
datasetType = self.datasetType
else:
@@ -755,6 +815,7 @@
run=run,
id=id,
conform=False,
datastore_records=datastore_records,
)
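
The default value `False` acts as a sentinel meaning "keep whatever records the ref already has", leaving `None` free to mean "drop them". A brief sketch, again with a made-up ref.

from lsst.daf.butler import DataCoordinate, DatasetRef, DatasetType, DimensionUniverse

universe = DimensionUniverse()
dataset_type = DatasetType("example", universe.empty, "StructuredDataDict")
ref = DatasetRef(dataset_type, DataCoordinate.makeEmpty(universe), run="some/run")

same = ref.replace()                            # default sentinel: records preserved
stripped = ref.replace(datastore_records=None)  # records explicitly removed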

def is_compatible_with(self, ref: DatasetRef) -> bool:
@@ -821,3 +882,9 @@ class associated with the dataset type of the other ref can be
Cannot be changed after a `DatasetRef` is constructed.
"""

datastore_records: DatasetDatastoreRecords | None
"""Optional datastore records (`DatasetDatastoreRecords`).
Cannot be changed after a `DatasetRef` is constructed.
"""
15 changes: 7 additions & 8 deletions python/lsst/daf/butler/_formatter.py
@@ -34,29 +34,28 @@
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Iterator, Mapping, Set
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias

from lsst.utils.introspection import get_full_type_name

from ._config import Config
from ._config_support import LookupKey, processLookupConfigs
from ._dataset_ref import DatasetRef
from ._dataset_type import DatasetType
from ._file_descriptor import FileDescriptor
from ._location import Location
from ._storage_class import StorageClass
from .dimensions import DimensionUniverse
from .mapping_factory import MappingFactory

log = logging.getLogger(__name__)

# Define a new special type for functions that take "entity"
Entity = DatasetType | DatasetRef | StorageClass | str


if TYPE_CHECKING:
from ._dataset_ref import DatasetRef
from ._dataset_type import DatasetType
from ._storage_class import StorageClass
from .dimensions import DataCoordinate

# Define a new special type for functions that take "entity"
Entity: TypeAlias = DatasetType | DatasetRef | StorageClass | str


class Formatter(metaclass=ABCMeta):
"""Interface for reading and writing Datasets.
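
The `Entity` alias is now defined only under `TYPE_CHECKING`, which works because annotations are not evaluated at runtime. A generic sketch of that pattern; the names below are stand-ins rather than butler code.

from __future__ import annotations

from typing import TYPE_CHECKING, TypeAlias

if TYPE_CHECKING:
    # Only type checkers see this block; the name does not exist at runtime.
    Entity: TypeAlias = int | str


def describe(entity: Entity) -> str:
    # Postponed evaluation of annotations means Entity is never looked up here.
    return repr(entity)


print(describe(42))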