From 53060b78eacaa799990bddff8318950b7cc347c4 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Wed, 31 Jan 2024 11:18:07 -0500
Subject: [PATCH 1/4] Switch back to butler.registry query system in QG
 generation.

This is temporary: the switch to the new query system was premature, but
that switch is still in the works.
---
 .../all_dimensions_quantum_graph_builder.py   | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py b/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
index 6761d6a22..48ee5238c 100644
--- a/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
+++ b/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
@@ -52,7 +52,8 @@
 )

 if TYPE_CHECKING:
-    from lsst.daf.butler import Butler, DataCoordinateQueryResults, DimensionGroup
+    from lsst.daf.butler import Butler, DimensionGroup
+    from lsst.daf.butler.registry.queries import DataCoordinateQueryResults
     from lsst.utils.logging import LsstLogAdapter

     from .pipeline_graph import DatasetTypeNode, PipelineGraph, TaskNode
@@ -230,7 +231,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
                 # to find these.
                 count = 0
                 try:
-                    for ref in data_ids.find_datasets(dataset_type_node.name, self.input_collections):
+                    for ref in data_ids.findDatasets(dataset_type_node.name, self.input_collections):
                         self.existing_datasets.inputs[
                             DatasetKey(dataset_type_node.name, ref.dataId.required_values)
                         ] = ref
@@ -247,7 +248,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
                 # that we might skip...
                 count = 0
                 try:
-                    for ref in data_ids.find_datasets(dataset_type_node.name, self.skip_existing_in):
+                    for ref in data_ids.findDatasets(dataset_type_node.name, self.skip_existing_in):
                         key = DatasetKey(dataset_type_node.name, ref.dataId.required_values)
                         self.existing_datasets.outputs_for_skip[key] = ref
                         count += 1
@@ -267,7 +268,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
                 # previous block).
                 count = 0
                 try:
-                    for ref in data_ids.find_datasets(dataset_type_node.name, [self.output_run]):
+                    for ref in data_ids.findDatasets(dataset_type_node.name, [self.output_run]):
                         self.existing_datasets.outputs_in_the_way[
                             DatasetKey(dataset_type_node.name, ref.dataId.required_values)
                         ] = ref
@@ -337,7 +338,7 @@ def _find_followup_datasets(self, query: _AllDimensionsQuery, skeleton: QuantumG
             # IDs to the datasets we're looking for.
             count = 0
             try:
-                query_results = data_ids.find_related_datasets(
+                query_results = data_ids.findRelatedDatasets(
                     finder.dataset_type_node.dataset_type, self.input_collections
                 )
             except MissingDatasetTypeError:
@@ -457,7 +458,7 @@ def from_builder(
         result.query_args = {
             "dimensions": dimensions,
             "where": builder.where,
-            "data_id": result.subgraph.data_id,
+            "dataId": result.subgraph.data_id,
             "bind": builder.bind,
         }
         if builder.dataset_query_constraint == DatasetQueryConstraintVariant.ALL:
@@ -492,15 +493,15 @@ def from_builder(
             )
         builder.log.verbose("Querying for data IDs with arguments:")
         builder.log.verbose(" dimensions=%s,", list(result.query_args["dimensions"].names))
-        builder.log.verbose(" data_id=%s,", dict(result.query_args["data_id"].required))
+        builder.log.verbose(" dataId=%s,", dict(result.query_args["dataId"].required))
         if result.query_args["where"]:
             builder.log.verbose(" where=%s,", repr(result.query_args["where"]))
         if "datasets" in result.query_args:
             builder.log.verbose(" datasets=%s,", list(result.query_args["datasets"]))
         if "collections" in result.query_args:
             builder.log.verbose(" collections=%s,", list(result.query_args["collections"]))
-        with builder.butler._query() as query:
-            with query.data_ids(**result.query_args).materialize() as common_data_ids:
+        with builder.butler.registry.caching_context():
+            with builder.butler.registry.queryDataIds(**result.query_args).materialize() as common_data_ids:
                 builder.log.debug("Expanding data IDs.")
                 result.common_data_ids = common_data_ids.expanded()
                 yield result
@@ -527,7 +528,7 @@ def log_failure(self, log: LsstLogAdapter) -> None:
         # so they can read it more easily and copy and paste into
         # a Python terminal.
         log.critical(" dimensions=%s,", list(self.query_args["dimensions"].names))
-        log.critical(" data_id=%s,", dict(self.query_args["data_id"].required))
+        log.critical(" dataId=%s,", dict(self.query_args["dataId"].required))
        if self.query_args["where"]:
            log.critical(" where=%s,", repr(self.query_args["where"]))
        if "datasets" in self.query_args:
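
For context, the registry-based pattern this patch restores composes roughly as in the
sketch below. The repository path, dimensions, data ID, `where` expression, dataset type,
and collection names are illustrative stand-ins, not values from the pipeline; only the
method names (`caching_context`, `queryDataIds`, `materialize`, `expanded`,
`findDatasets`) come from the diff itself.

# A minimal sketch of the butler.registry query pattern the patch reverts to.
# All data-ID values, dataset types, and collections here are hypothetical.
from lsst.daf.butler import Butler

butler = Butler("some/repo")
with butler.registry.caching_context():
    # One big data-ID query over the union of the relevant dimensions.
    data_ids = butler.registry.queryDataIds(
        ["visit", "detector"],
        dataId={"instrument": "HSC"},
        where="visit > 100",
    )
    # materialize() holds the results in a temporary table so that the
    # follow-up dataset searches below do not re-run the full query.
    with data_ids.materialize() as common_data_ids:
        common_data_ids = common_data_ids.expanded()  # attach dimension records
        for ref in common_data_ids.findDatasets("raw", ["HSC/raw/all"]):
            ...  # record existing inputs, outputs to skip, etc.
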
From ae8bc5ba15a007bb1b5e4025c0affc789c7d8fd5 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Wed, 31 Jan 2024 11:28:51 -0500
Subject: [PATCH 2/4] Reformat with latest black.

---
 python/lsst/pipe/base/_quantumContext.py          | 14 ++++++++------
 python/lsst/pipe/base/_task_metadata.py           |  6 ++----
 .../base/all_dimensions_quantum_graph_builder.py  |  6 +++---
 python/lsst/pipe/base/connectionTypes.py          |  6 +++---
 .../pipe/base/pipeline_graph/_mapping_views.py    |  6 ++----
 python/lsst/pipe/base/prerequisite_helpers.py     |  6 +++---
 6 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/python/lsst/pipe/base/_quantumContext.py b/python/lsst/pipe/base/_quantumContext.py
index ea53b9098..306d4fb26 100644
--- a/python/lsst/pipe/base/_quantumContext.py
+++ b/python/lsst/pipe/base/_quantumContext.py
@@ -226,12 +226,14 @@ def _put(self, value: Any, ref: DatasetRef) -> None:

     def get(
         self,
-        dataset: InputQuantizedConnection
-        | list[DatasetRef | None]
-        | list[DeferredDatasetRef | None]
-        | DatasetRef
-        | DeferredDatasetRef
-        | None,
+        dataset: (
+            InputQuantizedConnection
+            | list[DatasetRef | None]
+            | list[DeferredDatasetRef | None]
+            | DatasetRef
+            | DeferredDatasetRef
+            | None
+        ),
     ) -> Any:
         """Fetch data from the butler.

diff --git a/python/lsst/pipe/base/_task_metadata.py b/python/lsst/pipe/base/_task_metadata.py
index 25cf816fc..79501a0ab 100644
--- a/python/lsst/pipe/base/_task_metadata.py
+++ b/python/lsst/pipe/base/_task_metadata.py
@@ -47,11 +47,9 @@ class PropertySetLike(Protocol):
     ``PropertySet`` to a `TaskMetadata`.
     """

-    def paramNames(self, topLevelOnly: bool = True) -> Collection[str]:
-        ...
+    def paramNames(self, topLevelOnly: bool = True) -> Collection[str]: ...

-    def getArray(self, name: str) -> Any:
-        ...
+    def getArray(self, name: str) -> Any: ...


 def _isListLike(v: Any) -> bool:
diff --git a/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py b/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
index 48ee5238c..aa5fce025 100644
--- a/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
+++ b/python/lsst/pipe/base/all_dimensions_quantum_graph_builder.py
@@ -384,9 +384,9 @@ class _AllDimensionsQuery:
     subgraph: PipelineGraph
     """Graph of this subset of the pipeline."""

-    grouped_by_dimensions: dict[
-        DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]
-    ] = dataclasses.field(default_factory=dict)
+    grouped_by_dimensions: dict[DimensionGroup, tuple[dict[str, TaskNode], dict[str, DatasetTypeNode]]] = (
+        dataclasses.field(default_factory=dict)
+    )
     """The tasks and dataset types of this subset of the pipeline, grouped
     by their dimensions.

diff --git a/python/lsst/pipe/base/connectionTypes.py b/python/lsst/pipe/base/connectionTypes.py
index 18d13e522..ad378222a 100644
--- a/python/lsst/pipe/base/connectionTypes.py
+++ b/python/lsst/pipe/base/connectionTypes.py
@@ -385,9 +385,9 @@ class PrerequisiteInput(BaseInput):
     - Prerequisite inputs may be optional (regular inputs are never optional).
     """

-    lookupFunction: Callable[
-        [DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]
-    ] | None = None
+    lookupFunction: (
+        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
+    ) = None

     _connection_type_set: ClassVar[str] = "prerequisiteInputs"

diff --git a/python/lsst/pipe/base/pipeline_graph/_mapping_views.py b/python/lsst/pipe/base/pipeline_graph/_mapping_views.py
index 4c2ac4eea..dcbfc2ef2 100644
--- a/python/lsst/pipe/base/pipeline_graph/_mapping_views.py
+++ b/python/lsst/pipe/base/pipeline_graph/_mapping_views.py
@@ -186,12 +186,10 @@ def is_resolved(self, key: str) -> bool:
         return super().__getitem__(key) is not None

     @overload
-    def get_if_resolved(self, key: str) -> DatasetTypeNode | None:
-        ...  # pragma: nocover
+    def get_if_resolved(self, key: str) -> DatasetTypeNode | None: ...  # pragma: nocover

     @overload
-    def get_if_resolved(self, key: str, default: _T) -> DatasetTypeNode | _T:
-        ...  # pragma: nocover
+    def get_if_resolved(self, key: str, default: _T) -> DatasetTypeNode | _T: ...  # pragma: nocover

     def get_if_resolved(self, key: str, default: Any = None) -> DatasetTypeNode | Any:
         """Get a node or return a default if it has not been resolved.
diff --git a/python/lsst/pipe/base/prerequisite_helpers.py b/python/lsst/pipe/base/prerequisite_helpers.py
index 35ae2b1bd..5fa40b599 100644
--- a/python/lsst/pipe/base/prerequisite_helpers.py
+++ b/python/lsst/pipe/base/prerequisite_helpers.py
@@ -204,9 +204,9 @@ def __init__(
         dataset type.
     """

-    lookup_function: Callable[
-        [DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]
-    ] | None
+    lookup_function: (
+        Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None
+    )
    """A task-provided callback for finding these datasets.

    If this is not `None`, it must be used to ensure correct behavior.
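
Both changes visible in this patch come from black's 2024 stable style (presumably black
24.1, given the date): ellipsis-only stub bodies now stay on the signature line, and
annotations too long for one line are wrapped by parenthesizing the whole annotation
instead of splitting the subscript. A toy reproduction with made-up names:

# Illustrates the two style changes; FetcherLike and lookup_function are
# invented examples, not names from the repository.
from collections.abc import Callable, Sequence
from typing import Protocol


class FetcherLike(Protocol):
    # Old style put the ellipsis body on its own line:
    #     def fetch(self, name: str) -> bytes:
    #         ...
    # The new style collapses it onto the signature:
    def fetch(self, name: str) -> bytes: ...


# Old style split the subscript of a too-long annotation:
#     lookup_function: Callable[
#         [str, Sequence[int]], bytes
#     ] | None = None
# The new style parenthesizes the whole annotation instead:
lookup_function: (
    Callable[[str, Sequence[int], Sequence[str], Sequence[bytes], Sequence[float]], bytes] | None
) = None
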
From c2ac72dd3ecc2cc80a954c0c6a87e004b6bb7514 Mon Sep 17 00:00:00 2001
From: Jim Bosch
Date: Wed, 31 Jan 2024 11:37:14 -0500
Subject: [PATCH 3/4] Drop flake8 lint action.

It's disagreeing with black again, and with both black and ruff on, we
don't need it.
---
 .github/workflows/lint.yaml | 16 ----------------
 1 file changed, 16 deletions(-)
 delete mode 100644 .github/workflows/lint.yaml

diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
deleted file mode 100644
index 6c463eed5..000000000
--- a/.github/workflows/lint.yaml
+++ /dev/null
@@ -1,16 +0,0 @@
-name: lint
-
-on:
-  push:
-    branches:
-      - main
-  pull_request:
-
-jobs:
-  call-workflow:
-    uses: lsst/rubin_workflows/.github/workflows/lint.yaml@main
-  ruff:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v3
-      - uses: chartboost/ruff-action@v1
""" - lookup_function: Callable[ - [DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef] - ] | None + lookup_function: ( + Callable[[DatasetType, Registry, DataCoordinate, Sequence[str]], Iterable[DatasetRef]] | None + ) """A task-provided callback for finding these datasets. If this is not `None`, it must be used to ensure correct behavior. From c2ac72dd3ecc2cc80a954c0c6a87e004b6bb7514 Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Wed, 31 Jan 2024 11:37:14 -0500 Subject: [PATCH 3/4] Drop flake8 lint action. It's disagreeing with black again, and with both black and ruff on, we don't need it. --- .github/workflows/lint.yaml | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 .github/workflows/lint.yaml diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml deleted file mode 100644 index 6c463eed5..000000000 --- a/.github/workflows/lint.yaml +++ /dev/null @@ -1,16 +0,0 @@ -name: lint - -on: - push: - branches: - - main - pull_request: - -jobs: - call-workflow: - uses: lsst/rubin_workflows/.github/workflows/lint.yaml@main - ruff: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - uses: chartboost/ruff-action@v1 From 6aea5bc4b6ea2ffef53cc2a32b26966ecf2637ea Mon Sep 17 00:00:00 2001 From: Jim Bosch Date: Wed, 31 Jan 2024 17:26:41 -0500 Subject: [PATCH 4/4] Fix log message in QG generation. --- doc/changes/DM-42737.bugfix.md | 1 + python/lsst/pipe/base/quantum_graph_builder.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 doc/changes/DM-42737.bugfix.md diff --git a/doc/changes/DM-42737.bugfix.md b/doc/changes/DM-42737.bugfix.md new file mode 100644 index 000000000..afedadd37 --- /dev/null +++ b/doc/changes/DM-42737.bugfix.md @@ -0,0 +1 @@ +Fix an incorrect count of previously-successful quanta in `QuantumGraphBuilder` logging. diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index a0a55755b..64169033f 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -570,7 +570,7 @@ def _resolve_task_quanta(self, task_node: TaskNode, skeleton: QuantumGraphSkelet if no_work_quanta: message_terms.append(f"{len(no_work_quanta)} had no work to do") if skipped_quanta: - message_terms.append(f"{len(no_work_quanta)} previously succeeded") + message_terms.append(f"{len(skipped_quanta)} previously succeeded") message_parenthetical = f" ({', '.join(message_terms)})" if message_terms else "" if remaining_quanta: self.log.info(