From 95e75418d84ae01ee28ea3a2fe0113e61db886c3 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Thu, 5 Sep 2024 13:40:09 -0400 Subject: [PATCH 1/4] Move dataset type registration from PreExecInit to PipelineGraph. --- python/lsst/ctrl/mpexec/preExecInit.py | 64 ++----------------- ...test_execution_storage_class_conversion.py | 2 +- 2 files changed, 8 insertions(+), 58 deletions(-) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 1961641d..54c3836c 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -41,12 +41,9 @@ # ----------------------------- # Imports for other modules -- # ----------------------------- -from lsst.daf.butler import DatasetRef, DatasetType -from lsst.daf.butler.registry import ConflictingDefinitionError, MissingDatasetTypeError -from lsst.pipe.base.automatic_connection_constants import ( - PACKAGES_INIT_OUTPUT_NAME, - PACKAGES_INIT_OUTPUT_STORAGE_CLASS, -) +from lsst.daf.butler import DatasetRef +from lsst.daf.butler.registry import ConflictingDefinitionError +from lsst.pipe.base.automatic_connection_constants import PACKAGES_INIT_OUTPUT_NAME from lsst.utils.packages import Packages if TYPE_CHECKING: @@ -410,57 +407,10 @@ def transaction(self) -> Iterator[None]: def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None: # docstring inherited - missing_dataset_types: set[str] = set() - dataset_types = [node.dataset_type for node in graph.pipeline_graph.dataset_types.values()] - dataset_types.append( - DatasetType( - PACKAGES_INIT_OUTPUT_NAME, self.butler.dimensions.empty, PACKAGES_INIT_OUTPUT_STORAGE_CLASS - ) - ) - for dataset_type in dataset_types: - # Resolving the PipelineGraph when building the QuantumGraph should - # have already guaranteed that this is the registry dataset type - # and that all references to it use compatible storage classes, - # so we don't need another check for compatibility here; if the - # dataset type doesn't match the registry that's already a problem. - if registerDatasetTypes: - _LOG.debug("Registering DatasetType %s with registry", dataset_type.name) - try: - self.full_butler.registry.registerDatasetType(dataset_type) - except ConflictingDefinitionError: - expected = self.full_butler.registry.getDatasetType(dataset_type.name) - raise ConflictingDefinitionError( - f"DatasetType definition in registry has changed since the QuantumGraph was built: " - f"{dataset_type} (graph) != {expected} (registry)." - ) - else: - _LOG.debug("Checking DatasetType %s against registry", dataset_type.name) - try: - expected = self.full_butler.registry.getDatasetType(dataset_type.name) - except MissingDatasetTypeError: - # Likely means that --register-dataset-types is forgotten, - # but we could also get here if there is a prerequisite - # input that is optional and none were found in this repo; - # that is not an error. And we don't bother to check if - # they are optional here, since the fact that we were able - # to make the QG says that they were, since there couldn't - # have been any datasets if the dataset types weren't - # registered. - if not graph.pipeline_graph.dataset_types[dataset_type.name].is_prerequisite: - missing_dataset_types.add(dataset_type.name) - continue - if expected != dataset_type: - raise ConflictingDefinitionError( - f"DatasetType definition in registry has changed since the QuantumGraph was built: " - f"{dataset_type} (graph) != {expected} (registry)." - ) - if missing_dataset_types: - plural = "s" if len(missing_dataset_types) != 1 else "" - raise MissingDatasetTypeError( - f"Missing dataset type definition{plural}: {', '.join(missing_dataset_types)}. " - "Dataset types have to be registered with either `butler register-dataset-type` or " - "passing `--register-dataset-types` option to `pipetask run`." - ) + if registerDatasetTypes: + graph.pipeline_graph.register_dataset_types(self.full_butler) + else: + graph.pipeline_graph.check_dataset_type_registrations(self.full_butler) class PreExecInitLimited(PreExecInitBase): diff --git a/tests/test_execution_storage_class_conversion.py b/tests/test_execution_storage_class_conversion.py index e90f0fb4..1cabb818 100644 --- a/tests/test_execution_storage_class_conversion.py +++ b/tests/test_execution_storage_class_conversion.py @@ -292,7 +292,7 @@ def test_registry_changed(self): ) with self.assertRaisesRegex( lsst.daf.butler.registry.ConflictingDefinitionError, - ".*definition in registry has changed.*StructuredDataDict.*TaskMetadataLike.*", + ".*_mock_StructuredDataDict.*is inconsistent with.*TaskMetadataLike.*", ): executor.run(register_dataset_types=True, save_versions=False) From 8deafa97c71cac75559c7e5b04d684d8826c6ce1 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 6 Sep 2024 11:44:47 -0400 Subject: [PATCH 2/4] Move logic for making QBB for pre-exec-init to QG class. --- python/lsst/ctrl/mpexec/cmdLineFwk.py | 40 ++------------------------- 1 file changed, 2 insertions(+), 38 deletions(-) diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index 492d1f66..c02706f8 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -48,7 +48,6 @@ Butler, CollectionType, Config, - DatasetId, DatasetType, DimensionConfig, DimensionUniverse, @@ -58,7 +57,6 @@ Registry, ) from lsst.daf.butler.datastore.cache_manager import DatastoreCacheManager -from lsst.daf.butler.datastore.record_data import DatastoreRecordData from lsst.daf.butler.direct_butler import DirectButler from lsst.daf.butler.registry import MissingCollectionError, RegistryDefaults from lsst.daf.butler.registry.wildcards import CollectionWildcard @@ -956,42 +954,8 @@ def preExecInitQBB(self, task_factory: TaskFactory, args: SimpleNamespace) -> No # but we need datastore records for initInputs, and those are only # available from Quanta, so load the whole thing. qgraph = QuantumGraph.loadUri(args.qgraph, graphID=args.qgraph_id) - universe = qgraph.universe - - # Collect all init input/output dataset IDs. - predicted_inputs: set[DatasetId] = set() - predicted_outputs: set[DatasetId] = set() - for taskDef in qgraph.iterTaskGraph(): - if (refs := qgraph.initInputRefs(taskDef)) is not None: - predicted_inputs.update(ref.id for ref in refs) - if (refs := qgraph.initOutputRefs(taskDef)) is not None: - predicted_outputs.update(ref.id for ref in refs) - predicted_outputs.update(ref.id for ref in qgraph.globalInitOutputRefs()) - # remove intermediates from inputs - predicted_inputs -= predicted_outputs - - # Very inefficient way to extract datastore records from quantum graph, - # we have to scan all quanta and look at their datastore records. - datastore_records: dict[str, DatastoreRecordData] = {} - for quantum_node in qgraph: - for store_name, records in quantum_node.quantum.datastore_records.items(): - subset = records.subset(predicted_inputs) - if subset is not None: - datastore_records.setdefault(store_name, DatastoreRecordData()).update(subset) - - dataset_types = {dstype.name: dstype for dstype in qgraph.registryDatasetTypes()} - - # Make butler from everything. - butler = QuantumBackedButler.from_predicted( - config=args.butler_config, - predicted_inputs=predicted_inputs, - predicted_outputs=predicted_outputs, - dimensions=universe, - datastore_records=datastore_records, - search_paths=args.config_search_path, - dataset_types=dataset_types, - ) - + # Make QBB. + butler = qgraph.make_init_qbb(args.butler_config, config_search_paths=args.config_search_path) # Save all InitOutputs, configs, etc. preExecInit = PreExecInitLimited(butler, task_factory) preExecInit.initialize(qgraph) From a2d2a8b29effefd59929171c7b905ffe868d7414 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 6 Sep 2024 12:37:34 -0400 Subject: [PATCH 3/4] Use new QG method for PreExecInit.saveConfigs implementation. This changes the exception types raised (to be more consistent with what saveInitOutputs already raised), but these weren't really recoverable errors so this shouldn't break anything. --- python/lsst/ctrl/mpexec/preExecInit.py | 79 +++----------------------- 1 file changed, 7 insertions(+), 72 deletions(-) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 54c3836c..34f33c2a 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -180,36 +180,7 @@ def saveInitOutputs(self, graph: QuantumGraph) -> None: new data. """ _LOG.debug("Will save InitOutputs for all tasks") - for taskDef in self._task_iter(graph): - init_input_refs = graph.initInputRefs(taskDef) or [] - task = self.taskFactory.makeTask( - graph.pipeline_graph.tasks[taskDef.label], self.butler, init_input_refs - ) - for name in taskDef.connections.initOutputs: - attribute = getattr(taskDef.connections, name) - init_output_refs = graph.initOutputRefs(taskDef) or [] - init_output_ref, obj_from_store = self._find_dataset(init_output_refs, attribute.name) - if init_output_ref is None: - raise ValueError(f"Cannot find dataset reference for init output {name} in a graph") - init_output_var = getattr(task, name) - - if obj_from_store is not None: - _LOG.debug( - "Retrieving InitOutputs for task=%s key=%s dsTypeName=%s", task, name, attribute.name - ) - obj_from_store = self.butler.get(init_output_ref) - # Types are supposed to be identical. - # TODO: Check that object contents is identical too. - if type(obj_from_store) is not type(init_output_var): - raise TypeError( - f"Stored initOutput object type {type(obj_from_store)} " - "is different from task-generated type " - f"{type(init_output_var)} for task {taskDef}" - ) - else: - _LOG.debug("Saving InitOutputs for task=%s key=%s", taskDef.label, name) - # This can still raise if there is a concurrent write. - self.butler.put(init_output_var, init_output_ref) + graph.write_init_outputs(self.butler, skip_existing=self.extendRun) def saveConfigs(self, graph: QuantumGraph) -> None: """Write configurations for pipeline tasks to butler or check that @@ -222,49 +193,13 @@ def saveConfigs(self, graph: QuantumGraph) -> None: Raises ------ - TypeError - Raised if existing object in butler is different from new data. - Exception - Raised if ``extendRun`` is `False` and datasets already exists. - Content of a butler collection should not be changed if exception - is raised. + ConflictingDefinitionError + Raised if existing object in butler is different from new data, or + if ``extendRun`` is `False` and datasets already exists. + Content of a butler collection should not be changed if this + exception is raised. """ - - def logConfigMismatch(msg: str) -> None: - """Log messages about configuration mismatch. - - Parameters - ---------- - msg : `str` - Log message to use. - """ - _LOG.fatal("Comparing configuration: %s", msg) - - _LOG.debug("Will save Configs for all tasks") - # start transaction to rollback any changes on exceptions - with self.transaction(): - for taskDef in self._task_iter(graph): - # Config dataset ref is stored in task init outputs, but it - # may be also be missing. - task_output_refs = graph.initOutputRefs(taskDef) - if task_output_refs is None: - continue - - config_ref, old_config = self._find_dataset(task_output_refs, taskDef.configDatasetName) - if config_ref is None: - continue - - if old_config is not None: - if not taskDef.config.compare(old_config, shortcut=False, output=logConfigMismatch): - raise TypeError( - f"Config does not match existing task config {taskDef.configDatasetName!r} in " - "butler; tasks configurations must be consistent within the same run collection" - ) - else: - _LOG.debug( - "Saving Config for task=%s dataset type=%s", taskDef.label, taskDef.configDatasetName - ) - self.butler.put(taskDef.config, config_ref) + graph.write_configs(self.butler, compare_existing=self.extendRun) def savePackageVersions(self, graph: QuantumGraph) -> None: """Write versions of software packages to butler. From 3226412f9a9a8a586925810da4737e2cebe43244 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Fri, 6 Sep 2024 14:13:35 -0400 Subject: [PATCH 4/4] Use new QG method for PreExecInit.savePackageVersion implementation. This again changes the exception types raised (to be more consistent with what saveInitOutputs already raised), but these weren't really recoverable errors so this shouldn't break anything. --- python/lsst/ctrl/mpexec/preExecInit.py | 139 +------------------------ 1 file changed, 4 insertions(+), 135 deletions(-) diff --git a/python/lsst/ctrl/mpexec/preExecInit.py b/python/lsst/ctrl/mpexec/preExecInit.py index 34f33c2a..ba102222 100644 --- a/python/lsst/ctrl/mpexec/preExecInit.py +++ b/python/lsst/ctrl/mpexec/preExecInit.py @@ -34,54 +34,19 @@ # ------------------------------- import abc import logging -from collections.abc import Iterable, Iterator -from contextlib import contextmanager -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING # ----------------------------- # Imports for other modules -- # ----------------------------- -from lsst.daf.butler import DatasetRef -from lsst.daf.butler.registry import ConflictingDefinitionError -from lsst.pipe.base.automatic_connection_constants import PACKAGES_INIT_OUTPUT_NAME -from lsst.utils.packages import Packages if TYPE_CHECKING: from lsst.daf.butler import Butler, LimitedButler - from lsst.pipe.base import QuantumGraph, TaskDef, TaskFactory + from lsst.pipe.base import QuantumGraph, TaskFactory _LOG = logging.getLogger(__name__) -class MissingReferenceError(Exception): - """Exception raised when resolved reference is missing from graph.""" - - pass - - -def _compare_packages(old_packages: Packages, new_packages: Packages) -> None: - """Compare two versions of Packages. - - Parameters - ---------- - old_packages : `Packages` - Previously recorded package versions. - new_packages : `Packages` - New set of package versions. - - Raises - ------ - TypeError - Raised if parameters are inconsistent. - """ - diff = new_packages.difference(old_packages) - if diff: - versions_str = "; ".join(f"{pkg}: {diff[pkg][1]} vs {diff[pkg][0]}" for pkg in diff) - raise TypeError(f"Package versions mismatch: ({versions_str})") - else: - _LOG.debug("new packages are consistent with old") - - class PreExecInitBase(abc.ABC): """Common part of the implementation of PreExecInit classes that does not depend on Butler type. @@ -91,14 +56,13 @@ class PreExecInitBase(abc.ABC): butler : `~lsst.daf.butler.LimitedButler` Butler to use. taskFactory : `lsst.pipe.base.TaskFactory` - Task factory. + Ignored and accepted for backwards compatibility. extendRun : `bool` Whether extend run parameter is in use. """ def __init__(self, butler: LimitedButler, taskFactory: TaskFactory, extendRun: bool): self.butler = butler - self.taskFactory = taskFactory self.extendRun = extendRun def initialize( @@ -214,96 +178,7 @@ def savePackageVersions(self, graph: QuantumGraph) -> None: TypeError Raised if existing object in butler is incompatible with new data. """ - packages = Packages.fromSystem() - _LOG.debug("want to save packages: %s", packages) - - # start transaction to rollback any changes on exceptions - with self.transaction(): - # Packages dataset ref is stored in graph's global init outputs, - # but it may be also be missing. - - packages_ref, old_packages = self._find_dataset( - graph.globalInitOutputRefs(), PACKAGES_INIT_OUTPUT_NAME - ) - if packages_ref is None: - return - - if old_packages is not None: - # Note that because we can only detect python modules that have - # been imported, the stored list of products may be more or - # less complete than what we have now. What's important is - # that the products that are in common have the same version. - _compare_packages(old_packages, packages) - # Update the old set of packages in case we have more packages - # that haven't been persisted. - extra = packages.extra(old_packages) - if extra: - _LOG.debug("extra packages: %s", extra) - old_packages.update(packages) - # have to remove existing dataset first, butler has no - # replace option. - self.butler.pruneDatasets([packages_ref], unstore=True, purge=True) - self.butler.put(old_packages, packages_ref) - else: - self.butler.put(packages, packages_ref) - - def _find_dataset( - self, refs: Iterable[DatasetRef], dataset_type: str - ) -> tuple[DatasetRef | None, Any | None]: - """Find a ref with a given dataset type name in a list of references - and try to retrieve its data from butler. - - Parameters - ---------- - refs : `~collections.abc.Iterable` [ `~lsst.daf.butler.DatasetRef` ] - References to check for matching dataset type. - dataset_type : `str` - Name of a dataset type to look for. - - Returns - ------- - ref : `~lsst.daf.butler.DatasetRef` or `None` - Dataset reference or `None` if there is no matching dataset type. - data : `Any` - An existing object extracted from butler, `None` if ``ref`` is - `None` or if there is no existing object for that reference. - """ - ref: DatasetRef | None = None - for ref in refs: - if ref.datasetType.name == dataset_type: - break - else: - return None, None - - try: - data = self.butler.get(ref) - if data is not None and not self.extendRun: - # It must not exist unless we are extending run. - raise ConflictingDefinitionError(f"Dataset {ref} already exists in butler") - except (LookupError, FileNotFoundError): - data = None - return ref, data - - def _task_iter(self, graph: QuantumGraph) -> Iterator[TaskDef]: - """Iterate over TaskDefs in a graph, return only tasks that have one or - more associated quanta. - """ - for taskDef in graph.iterTaskGraph(): - if graph.getNumberOfQuantaForTask(taskDef) > 0: - yield taskDef - - @contextmanager - def transaction(self) -> Iterator[None]: - """Context manager for transaction. - - Default implementation has no transaction support. - - Yields - ------ - `None` - No transaction support. - """ - yield + graph.write_packages(self.butler, compare_existing=self.extendRun) class PreExecInit(PreExecInitBase): @@ -334,12 +209,6 @@ def __init__(self, butler: Butler, taskFactory: TaskFactory, extendRun: bool = F "with a default output RUN collection." ) - @contextmanager - def transaction(self) -> Iterator[None]: - # dosctring inherited - with self.full_butler.transaction(): - yield - def initializeDatasetTypes(self, graph: QuantumGraph, registerDatasetTypes: bool = False) -> None: # docstring inherited if registerDatasetTypes: