From dd2f1470a54e3b9eb0e9e465480dceb1cfc0424b Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 1 Dec 2023 13:23:01 -0500 Subject: [PATCH 1/4] Use storage classes from QG in PreExecInit. PipelineDatasetTypes does not preserve storage classes (that's why it's being deprecated). --- doc/changes/DM-41962.bugfix.md | 4 ++++ python/lsst/ctrl/mpexec/preExecInit.py | 30 +++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 doc/changes/DM-41962.bugfix.md diff --git a/doc/changes/DM-41962.bugfix.md b/doc/changes/DM-41962.bugfix.md new file mode 100644 index 00000000..5aaa7587 --- /dev/null +++ b/doc/changes/DM-41962.bugfix.md @@ -0,0 +1,4 @@ +Fix a storage class bug in registering dataset types in ``pipetask run``. + +Prior to this fix, the presence of multiple storage classes being associated with the same dataset type in a pipeline could cause the registered dataset type's storage class to be random and nondeterministic in regular `pipetask run` execution (but not quantum-backed butler execution). +It now follows the rules set by `PipelineGraph`, in which the definition in the task that produces the dataset wins. 
diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 1e00ca7d..90718296 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -44,6 +44,7 @@ from lsst.daf.butler import DatasetRef, DatasetType from lsst.daf.butler.registry import ConflictingDefinitionError from lsst.pipe.base import PipelineDatasetTypes +from lsst.pipe.base import automatic_connection_constants as acc from lsst.utils.packages import Packages if TYPE_CHECKING: @@ -389,14 +390,37 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool pipelineDatasetTypes = PipelineDatasetTypes.fromPipeline( pipeline, registry=self.full_butler.registry, include_configs=True, include_packages=True ) - - for datasetTypes, is_input in ( + # The "registry dataset types" saved with the QG have had their storage + # classes carefully resolved by PipelineGraph, whereas the dataset + # types from PipelineDatasetTypes are a mess because it uses + # NamedValueSet and that ignores storage classes. It will be fully + # removed here (and deprecated everywhere) on DM-40441. + # Note that these "registry dataset types" include dataset types that + # are not actually registered yet; they're the PipelineGraph's + # determination of what _should_ be registered. 
+ registry_storage_classes = { + dataset_type.name: dataset_type.storageClass_name for dataset_type in graph.registryDatasetTypes() + } + registry_storage_classes[acc.PACKAGES_INIT_OUTPUT_NAME] = acc.PACKAGES_INIT_OUTPUT_STORAGE_CLASS + dataset_types: Iterable[DatasetType] + for dataset_types, is_input in ( (pipelineDatasetTypes.initIntermediates, True), (pipelineDatasetTypes.initOutputs, False), (pipelineDatasetTypes.intermediates, True), (pipelineDatasetTypes.outputs, False), ): - self._register_output_dataset_types(registerDatasetTypes, datasetTypes, is_input) + dataset_types = [ + ( + # The registry dataset types do not include components, + # but we don't support storage class overrides for those + # in other contexts anyway. + dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name]) + if not dataset_type.isComponent() + else dataset_type + ) + for dataset_type in dataset_types + ] + self._register_output_dataset_types(registerDatasetTypes, dataset_types, is_input) def _register_output_dataset_types( self, registerDatasetTypes: bool, datasetTypes: Iterable[DatasetType], is_input: bool From ae2ae8553913aba1547f0bf5eb9fdc7fca2e4151 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 1 Dec 2023 15:35:03 -0500 Subject: [PATCH 2/4] Fix unit test that was looking for the wrong behavior. First problem was just that the test assumed it could register a dataset type after the QG was made and not run into trouble when running that QG. Second problem was that it was just expecting something other than what its own code comments suggested, which was also contrary to correct behavior. 
--- tests/test_simple_pipeline_executor.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index 344d344f..ec9bf58a 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -226,12 +226,6 @@ def test_from_pipeline_intermediates_differ(self): def test_from_pipeline_output_differ(self): """Run pipeline but output definition in registry differs.""" - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_a="TaskMetadataLike", - ) - # Pre-define the "output" storage class to be something that is # like a dict but is not a dict. This will fail unless storage # class conversion is supported in put and get. @@ -243,14 +237,21 @@ def test_from_pipeline_output_differ(self): ) ) + executor = self._configure_pipeline( + NoDimensionsTestTask.ConfigClass, + NoDimensionsTestTask.ConfigClass, + storageClass_a="TaskMetadataLike", + ) + with self.assertLogs("lsst", level="INFO") as cm: quanta = executor.run(register_dataset_types=True, save_versions=False) - # a has been told to return a TaskMetadata but will convert to dict. + # a has been told to return a TaskMetadata but this will convert to + # dict on read by b. # b returns a dict and that is converted to TaskMetadata on put. 
self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict") self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) + self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1}) self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2}) def test_from_pipeline_input_differ(self): From 1a059970a1bb3c336a50d3405c1a395ca65e509e Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 1 Dec 2023 16:15:22 -0500 Subject: [PATCH 3/4] Clean up storage class conversion tests. This includes: - Using TaskMetadata.from_dict instead of to_dict in comparisons to make the intent clearer. - Documenting what the log-inspection utility method actually tests. - Making one test (test_from_pipeline_intermediates_differ) registering a dataset type before building a QG because that's what's needed in general for correctness (even though it didn't matter here). - Renaming and re-documenting another test (test_from_pipeline_inconsistent_dataset_types) where we were actually testing that building the QG and then changing a dataset type out from under it can be a problem. --- tests/test_simple_pipeline_executor.py | 36 ++++++++++++++------------ 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/tests/test_simple_pipeline_executor.py b/tests/test_simple_pipeline_executor.py index ec9bf58a..6a7fe01b 100644 --- a/tests/test_simple_pipeline_executor.py +++ b/tests/test_simple_pipeline_executor.py @@ -165,7 +165,12 @@ def _configure_pipeline(self, config_a_cls, config_b_cls, storageClass_a=None, s return executor def _test_logs(self, log_output, input_type_a, output_type_a, input_type_b, output_type_b): - """Check the expected input types received by tasks A and B""" + """Check the expected input types received by tasks A and B. 
+ + Note that these are the types as seen from the perspective of the task, + so they must be consistent with the task's connections, but may not be + consistent with the registry dataset types. + """ all_logs = "\n".join(log_output) self.assertIn(f"lsst.a:Run method given data of type: {input_type_a}", all_logs) self.assertIn(f"lsst.b:Run method given data of type: {input_type_b}", all_logs) @@ -191,12 +196,6 @@ def test_from_pipeline(self): def test_from_pipeline_intermediates_differ(self): """Run pipeline but intermediates definition in registry differs.""" - executor = self._configure_pipeline( - NoDimensionsTestTask.ConfigClass, - NoDimensionsTestTask.ConfigClass, - storageClass_b="TaskMetadataLike", - ) - # Pre-define the "intermediate" storage class to be something that is # like a dict but is not a dict. This will fail unless storage # class conversion is supported in put and get. @@ -207,7 +206,11 @@ def test_from_pipeline_intermediates_differ(self): storageClass="TaskMetadataLike", ) ) - + executor = self._configure_pipeline( + NoDimensionsTestTask.ConfigClass, + NoDimensionsTestTask.ConfigClass, + storageClass_b="TaskMetadataLike", + ) with self.assertLogs("lsst", level="INFO") as cm: quanta = executor.run(register_dataset_types=True, save_versions=False) # A dict is given to task a without change. 
@@ -221,8 +224,8 @@ def test_from_pipeline_intermediates_differ(self): self._test_logs(cm.output, "dict", "dict", "dict", "lsst.pipe.base.TaskMetadata") self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2}) + self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1})) + self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2})) def test_from_pipeline_output_differ(self): """Run pipeline but output definition in registry differs.""" @@ -236,13 +239,11 @@ def test_from_pipeline_output_differ(self): storageClass="TaskMetadataLike", ) ) - executor = self._configure_pipeline( NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass, storageClass_a="TaskMetadataLike", ) - with self.assertLogs("lsst", level="INFO") as cm: quanta = executor.run(register_dataset_types=True, save_versions=False) # a has been told to return a TaskMetadata but this will convert to @@ -251,8 +252,8 @@ def test_from_pipeline_output_differ(self): self._test_logs(cm.output, "dict", "lsst.pipe.base.TaskMetadata", "dict", "dict") self.assertEqual(len(quanta), 2) - self.assertEqual(self.butler.get("intermediate").to_dict(), {"zero": 0, "one": 1}) - self.assertEqual(self.butler.get("output").to_dict(), {"zero": 0, "one": 1, "two": 2}) + self.assertEqual(self.butler.get("intermediate"), TaskMetadata.from_dict({"zero": 0, "one": 1})) + self.assertEqual(self.butler.get("output"), TaskMetadata.from_dict({"zero": 0, "one": 1, "two": 2})) def test_from_pipeline_input_differ(self): """Run pipeline but input definition in registry differs.""" @@ -268,8 +269,11 @@ def test_from_pipeline_input_differ(self): self.assertEqual(self.butler.get("intermediate"), {"zero": 0, "one": 1}) self.assertEqual(self.butler.get("output"), {"zero": 0, "one": 1, "two": 2}) - def 
test_from_pipeline_incompatible(self): - """Run pipeline but definitions are not compatible.""" + def test_from_pipeline_inconsistent_dataset_types(self): + """Generate the QG (by initializing the executor), then register the + dataset type with a different storage class than the QG should have + predicted, to make sure execution fails as it should. + """ executor = self._configure_pipeline( NoDimensionsTestTask.ConfigClass, NoDimensionsTestTask.ConfigClass ) From 2e33486468511d2e13e74a49f7a69f26fc659324 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 1 Dec 2023 20:58:43 -0500 Subject: [PATCH 4/4] Guard against QGs that don't populate registryDatasetTypes. We have custom QG builders in the wild and they're not as well-behaved as the main one; guard against them until we can fix them. --- python/lsst/ctrl/mpexec/preExecInit.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 90718296..94d190f9 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -411,11 +411,12 @@ def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool ): dataset_types = [ ( - # The registry dataset types do not include components, - # but we don't support storage class overrides for those - # in other contexts anyway. + # The registry dataset types do not include components, but + # we don't support storage class overrides for those in + # other contexts anyway, and custom-built QGs may not have + # the registry dataset types field populated at all. dataset_type.overrideStorageClass(registry_storage_classes[dataset_type.name]) - if not dataset_type.isComponent() + if dataset_type.name in registry_storage_classes else dataset_type ) for dataset_type in dataset_types