diff --git a/doc/changes/DM-41962.bugfix.md b/doc/changes/DM-41962.bugfix.md
new file mode 100644
index 00000000..5aaa7587
--- /dev/null
+++ b/doc/changes/DM-41962.bugfix.md
@@ -0,0 +1,4 @@
+Fix a storage class bug in registering dataset types in ``pipetask run``.
+
+Prior to this fix, a pipeline that associated multiple storage classes with the same dataset type could leave the registered dataset type's storage class nondeterministic in regular ``pipetask run`` execution (but not in quantum-backed butler execution).
+Registration now follows the rules set by ``PipelineGraph``, in which the definition in the task that produces the dataset wins.
diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py
index 1e00ca7d..94d190f9 100644
--- a/python/lsst/ctrl/mpexec/preExecInit.py
+++ b/python/lsst/ctrl/mpexec/preExecInit.py
@@ -44,6 +44,7 @@
 from lsst.daf.butler import DatasetRef, DatasetType
 from lsst.daf.butler.registry import ConflictingDefinitionError
 from lsst.pipe.base import PipelineDatasetTypes
+from lsst.pipe.base import automatic_connection_constants as acc
 from lsst.utils.packages import Packages
 
 if TYPE_CHECKING:
@@ -389,14 +390,38 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool
         pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline(
             pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True
         )
-
-        for datasetTypes, is_input in (
+        # The "registry dataset types" saved with the QG have had their storage
+        # classes carefully resolved by PipelineGraph, whereas the dataset
+        # types from PipelineDatasetTypes are a mess because it uses
+        # NamedValueSet, which ignores storage classes.  PipelineDatasetTypes
+        # will be fully removed here (and deprecated everywhere) on DM-40441.
+        # Note that these "registry dataset types" include dataset types that
+        # are not actually registered yet; they're the PipelineGraph's
+        # determination of what _should_ be registered.
+        registry_storage_classes = {
+            dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes()
+        }
+        registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS
+        dataset_types: Iterable[DatasetType]
+        for dataset_types, is_input in (
             (pipelineDatasetTypes.initIntermediates, True),
             (pipelineDatasetTypes.initOutputs, False),
             (pipelineDatasetTypes.intermediates, True),
             (pipelineDatasetTypes.outputs, False),
         ):
-            self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input)
+            dataset_types = [
+                (
+                    # The registry dataset types do not include components, but
+                    # we don't support storage class overrides for those in
+                    # other contexts anyway, and custom-built QGs may not have
+                    # the registry dataset types field populated at all.
+                    dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name])
+                    if dataset_type.name in registry_storage_classes
+                    else dataset_type
+                )
+                for dataset_type in dataset_types
+            ]
+            self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input)
 
     def _register_output_dataset_types(
         self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool
diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py
index 344d344f..6a7fe01b 100644
--- a/tests/test_simple_pipeline_executor.py
+++ b/tests/test_simple_pipeline_executor.py
@@ -165,7 +165,12 @@ def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, s
         return executor
 
     def _test_logs(self, log_output, input_type_a, output_type_a, input_type_b, output_type_b):
-        """Check the expected input types received by tasks A and B"""
+        """Check the expected input types received by tasks A and B.
+
+        Note that these are the types as seen from the perspective of the task,
+        so they must be consistent with the task's connections, but may not be
+        consistent with the registry dataset types.
+        """
         all_logs = "\n".join(log_output)
         self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs)
         self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs)
@@ -191,12 +196,6 @@ def test_from_pipeline(self):
 
     def test_from_pipeline_intermediates_differ(self):
         """Run pipeline but intermediates definition in registry differs."""
-        executor = self._configure_pipeline(
-            NoDimensionsTestTask.ConfigClass,
-            NoDimensionsTestTask.ConfigClass,
-            storageClass_b="TaskMetadataLike",
-        )
-
         # Pre-define the "intermediate" storage class to be something that is
         # like a dict but is not a dict.  This will fail unless storage
         # class conversion is supported in put and get.
@@ -207,7 +206,11 @@
         self.butler.registry.registerDatasetType(
             DatasetType(
                 storageClass="TaskMetadataLike",
             )
         )
-
+        executor = self._configure_pipeline(
+            NoDimensionsTestTask.ConfigClass,
+            NoDimensionsTestTask.ConfigClass,
+            storageClass_b="TaskMetadataLike",
+        )
         with self.assertLogs("lsst", level="INFO") as cm:
             quanta = executor.run(register_dataset_types=True, save_versions=False)
         # A dict is given to task a without change.
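Review note (not part of the patch): the `preExecInit.py` change above normalizes each dataset type's storage class to the one the `PipelineGraph` resolved before registration. Below is a minimal standalone sketch of that same normalization step. `FakeDatasetType` and all example names here are hypothetical stand-ins so the sketch runs without a data repository; only the `overrideStorageClass` / `storageClass_name` usage mirrors how the patch uses the real `lsst.daf.butler.DatasetType` API.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FakeDatasetType:
    """Hypothetical stand-in for lsst.daf.butler.DatasetType."""

    name: str
    storageClass_name: str

    def overrideStorageClass(self, storage_class: str) -> "FakeDatasetType":
        # Return a copy with a different storage class, like the real API.
        return replace(self, storageClass_name=storage_class)


# Pretend these came from graph.registryDatasetTypes(): the PipelineGraph's
# resolved definitions, where the producing task's storage class wins.
registry_dataset_types = [FakeDatasetType("intermediate", "StructuredDataDict")]
registry_storage_classes = {dt.name: dt.storageClass_name for dt in registry_dataset_types}

# Pretend this came from PipelineDatasetTypes, where the storage class may be
# whichever definition happened to land in the NamedValueSet first.
candidates = [FakeDatasetType("intermediate", "TaskMetadataLike")]

# The normalization applied in the patch: prefer the resolved storage class
# when the name is known, and pass unknown names through unchanged.
normalized = [
    dt.overrideStorageClass(registry_storage_classes[dt.name])
    if dt.name in registry_storage_classes
    else dt
    for dt in candidates
]
assert normalized[0].storageClass_name == "StructuredDataDict"
```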
@@ -221,17 +224,11 @@ def test_from_pipeline_intermediates_differ(self):
         self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata")
         self.assertEqual(len(quanta), 2)
-        self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1})
-        self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
+        self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
+        self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))
 
     def test_from_pipeline_output_differ(self):
         """Run pipeline but output definition in registry differs."""
-        executor = self._configure_pipeline(
-            NoDimensionsTestTask.ConfigClass,
-            NoDimensionsTestTask.ConfigClass,
-            storageClass_a="TaskMetadataLike",
-        )
-
         # Pre-define the "output" storage class to be something that is
         # like a dict but is not a dict.  This will fail unless storage
         # class conversion is supported in put and get.
@@ -242,16 +239,21 @@
         self.butler.registry.registerDatasetType(
             DatasetType(
                 storageClass="TaskMetadataLike",
             )
         )
-
+        executor = self._configure_pipeline(
+            NoDimensionsTestTask.ConfigClass,
+            NoDimensionsTestTask.ConfigClass,
+            storageClass_a="TaskMetadataLike",
+        )
         with self.assertLogs("lsst", level="INFO") as cm:
             quanta = executor.run(register_dataset_types=True, save_versions=False)
-        # a has been told to return a TaskMetadata but will convert to dict.
+        # a has been told to return a TaskMetadata, but it will be converted
+        # to a dict when read by b.
         # b returns a dict and that is converted to TaskMetadata on put.
         self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict")
         self.assertEqual(len(quanta), 2)
-        self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
-        self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2})
+        self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1}))
+        self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2}))
 
     def test_from_pipeline_input_differ(self):
         """Run pipeline but input definition in registry differs."""
@@ -267,8 +269,11 @@ def test_from_pipeline_input_differ(self):
         self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1})
         self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2})
 
-    def test_from_pipeline_incompatible(self):
-        """Run pipeline but definitions are not compatible."""
+    def test_from_pipeline_inconsistent_dataset_types(self):
+        """Generate the QG (by initializing the executor), then register the
+        dataset type with a storage class different from the one the QG
+        predicted, to make sure execution fails as it should.
+        """
         executor = self._configure_pipeline(
             NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass
         )
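Review note (not part of the patch): the rewritten assertions above lean on `TaskMetadata` comparing equal to, and round-tripping, plain dicts. A minimal sketch of that behavior, reusing the test's own values; `from_dict` and `to_dict` are the same methods the test already calls, so nothing beyond `lsst.pipe.base` is assumed.

```python
from lsst.pipe.base import TaskMetadata

# Build a TaskMetadata from a plain mapping and round-trip it back; this is
# the conversion the butler relies on when moving between the dict and
# TaskMetadata storage classes in the tests above.
md = TaskMetadata.from_dict({"zero": 0, "one": 1})
assert md.to_dict() == {"zero": 0, "one": 1}
```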