Skip to content

Commit

Permalink
Support storage class conversions of components in PipelineGraph.
Browse files Browse the repository at this point in the history
This can leave the component DatasetType/Ref seen by the
component-reading task with an unexpected parentStorageClass, but
we hope the task won't care.
  • Loading branch information
TallJimbo committed Sep 4, 2024
1 parent b007c62 commit c904bdf
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
1 change: 1 addition & 0 deletions doc/changes/DM-46064.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Storage class conversions of component dataset types are now supported in pipelines.
28 changes: 15 additions & 13 deletions python/lsst/pipe/base/pipeline_graph/_edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from collections.abc import Callable, Mapping, Sequence
from typing import Any, ClassVar, Self, TypeVar

from lsst.daf.butler import DatasetRef, DatasetType, DimensionUniverse
from lsst.daf.butler import DatasetRef, DatasetType, DimensionUniverse, StorageClassFactory
from lsst.daf.butler.registry import MissingDatasetTypeError
from lsst.utils.classes import immutable

Expand Down Expand Up @@ -396,21 +396,15 @@ def diff(self: ReadEdge, other: ReadEdge, connection_type: str = "connection") -
def adapt_dataset_type(self, dataset_type: DatasetType) -> DatasetType:
# Docstring inherited.
if self.component is not None:
assert (
self.storage_class_name == dataset_type.storageClass.allComponents()[self.component].name
), "components with storage class overrides are not supported"
return dataset_type.makeComponentDatasetType(self.component)
dataset_type = dataset_type.makeComponentDatasetType(self.component)

Check warning on line 399 in python/lsst/pipe/base/pipeline_graph/_edges.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeline_graph/_edges.py#L399

Added line #L399 was not covered by tests
if self.storage_class_name != dataset_type.storageClass_name:
return dataset_type.overrideStorageClass(self.storage_class_name)
return dataset_type

def adapt_dataset_ref(self, ref: DatasetRef) -> DatasetRef:
# Docstring inherited.
if self.component is not None:
assert (
self.storage_class_name == ref.datasetType.storageClass.allComponents()[self.component].name
), "components with storage class overrides are not supported"
return ref.makeComponentRef(self.component)
ref = ref.makeComponentRef(self.component)

Check warning on line 407 in python/lsst/pipe/base/pipeline_graph/_edges.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeline_graph/_edges.py#L407

Added line #L407 was not covered by tests
if self.storage_class_name != ref.datasetType.storageClass_name:
return ref.overrideStorageClass(self.storage_class_name)
return ref
Expand Down Expand Up @@ -618,13 +612,21 @@ def report_current_origin() -> str:
f"which does not include component {self.component!r} "
f"as requested by task {self.task_label!r}."
)
if all_current_components[self.component].name != self.storage_class_name:
# Note that we can't actually make a fully-correct DatasetType
# for the component the task wants, because we don't have the
# parent storage class.
current_component = all_current_components[self.component]

Check warning on line 618 in python/lsst/pipe/base/pipeline_graph/_edges.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/pipe/base/pipeline_graph/_edges.py#L618

Added line #L618 was not covered by tests
if (
current_component.name != self.storage_class_name
and not StorageClassFactory()
.getStorageClass(self.storage_class_name)
.can_convert(current_component)
):
raise IncompatibleDatasetTypeError(
f"Dataset type '{self.parent_dataset_type_name}.{self.component}' has storage class "
f"{all_current_components[self.component].name!r} "
f"(from {report_current_origin()}), which does not match "
f"{self.storage_class_name!r}, as requested by task {self.task_label!r}. "
"Note that storage class conversions of components are not supported."
f"(from {report_current_origin()}), which cannot be converted to "
f"{self.storage_class_name!r}, as requested by task {self.task_label!r}."
)
return current, is_initial_query_constraint, is_prerequisite
else:
Expand Down
46 changes: 41 additions & 5 deletions tests/test_pipeline_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,11 +1022,11 @@ def _have_example_storage_classes() -> bool:
"""Check whether some storage classes work as expected.
Given that these have registered converters, it shouldn't actually be
necessary to import be able to those types in order to determine that
they're convertible, but the storage class machinery is implemented such
that types that can't be imported can't be converted, and while that's
inconvenient here it's totally fine in non-testing scenarios where you only
care about a storage class if you can actually use it.
necessary to import those types in order to determine that they're
convertible, but the storage class machinery is implemented such that types
that can't be imported can't be converted, and while that's inconvenient
here it's totally fine in non-testing scenarios where you only care about a
storage class if you can actually use it.
"""
getter = StorageClassFactory().getStorageClass
return (
Expand Down Expand Up @@ -1416,6 +1416,42 @@ def test_component_resolved_by_output(self) -> None:
self.assertEqual(graph.dataset_types["d"].generalize_ref(a_ref), ref)
self.assertEqual(graph.dataset_types["d"].generalize_ref(b_ref), ref)

@unittest.skipUnless(
_have_example_storage_classes(), "Arrow/Astropy/Pandas storage classes are not available."
)
def test_component_storage_class_converted(self) -> None:
"""Test successful resolution of a component dataset type due to
an output connection referencing the parent dataset type, but with a
different (convertible) storage class.
"""
self.a_config.outputs["o"] = DynamicConnectionConfig(dataset_type_name="d", storage_class="DataFrame")
self.b_config.inputs["i"] = DynamicConnectionConfig(

Check warning on line 1428 in tests/test_pipeline_graph.py

View check run for this annotation

Codecov / codecov/patch

tests/test_pipeline_graph.py#L1427-L1428

Added lines #L1427 - L1428 were not covered by tests
dataset_type_name="d.schema", storage_class="ArrowSchema"
)
graph = self.make_graph()
output_parent_dataset_type = DatasetType("d", self.dimensions.empty, get_mock_name("DataFrame"))
graph.resolve(MockRegistry(self.dimensions, {}))
self.assertEqual(graph.dataset_types["d"].dataset_type, output_parent_dataset_type)
a_o = graph.tasks["a"].outputs["o"]
b_i = graph.tasks["b"].inputs["i"]
self.assertEqual(b_i.dataset_type_name, "d.schema")
self.assertEqual(a_o.adapt_dataset_type(output_parent_dataset_type), output_parent_dataset_type)
self.assertEqual(

Check warning on line 1439 in tests/test_pipeline_graph.py

View check run for this annotation

Codecov / codecov/patch

tests/test_pipeline_graph.py#L1431-L1439

Added lines #L1431 - L1439 were not covered by tests
# We don't really want to compare the full dataset type here,
# because that's going to include a parentStorageClass that may or
# may not make sense.
b_i.adapt_dataset_type(output_parent_dataset_type).storageClass_name,
get_mock_name("ArrowSchema"),
)
data_id = DataCoordinate.make_empty(self.dimensions)
ref = DatasetRef(output_parent_dataset_type, data_id, run="r")
a_ref = a_o.adapt_dataset_ref(ref)
b_ref = b_i.adapt_dataset_ref(ref)
self.assertEqual(a_ref, ref)
self.assertEqual(b_ref.datasetType.storageClass_name, get_mock_name("ArrowSchema"))
self.assertEqual(graph.dataset_types["d"].generalize_ref(a_ref), ref)
self.assertEqual(graph.dataset_types["d"].generalize_ref(b_ref), ref)

Check warning on line 1453 in tests/test_pipeline_graph.py

View check run for this annotation

Codecov / codecov/patch

tests/test_pipeline_graph.py#L1446-L1453

Added lines #L1446 - L1453 were not covered by tests

@unittest.skipUnless(
_have_example_storage_classes(), "Arrow/Astropy/Pandas storage classes are not available."
)
Expand Down

0 comments on commit c904bdf

Please sign in to comment.