From 6d1438c7f9b9a67d94b85f1b9c08972cfd5f4495 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Tue, 10 Oct 2023 12:27:20 -0700 Subject: [PATCH] Fix final mypy linter failures --- python/lsst/pipe/base/execution_reports.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/python/lsst/pipe/base/execution_reports.py b/python/lsst/pipe/base/execution_reports.py index e0d5e9029..d38e5e93b 100644 --- a/python/lsst/pipe/base/execution_reports.py +++ b/python/lsst/pipe/base/execution_reports.py @@ -31,7 +31,7 @@ import itertools import logging import uuid -from collections.abc import Iterable +from collections.abc import Iterable, Mapping from typing import Any import networkx @@ -174,7 +174,7 @@ def inspect_quantum( self, quantum_node: QuantumNode, status_graph: networkx.DiGraph, - refs: Iterable[DatasetRef], + refs: Mapping[str, Mapping[uuid.UUID, DatasetRef]], metadata_name: str, log_name: str, ) -> None: @@ -189,7 +189,7 @@ def inspect_quantum( The quantum graph produced by QuantumGraphExecutionReport.make_reports which steps through the quantum graph of a run and logs the status of each quantum. - refs: `Iterable[DatasetRef]` + refs: `Mapping[str, Mapping[uuid.UUID, DatasetRef]]` The DatasetRefs of each of the DatasetTypes produced by the task. Includes initialization, intermediate and output data products. metadata_name: `str` @@ -212,6 +212,7 @@ def inspect_quantum( for upstream_dataset_id in status_graph.predecessors(quantum_node.nodeId) for upstream_quantum_id in status_graph.predecessors(upstream_dataset_id) ): + assert quantum.dataId is not None self.failed_upstream[quantum_node.nodeId] = quantum.dataId else: self.failed[quantum_node.nodeId] = log_ref @@ -253,7 +254,7 @@ def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]: """ failed_quanta = {} for node_id, log_ref in self.failed.items(): - quantum_info = {"data_id": log_ref.dataId.byName()} + quantum_info: dict[str, Any] = {"data_id": log_ref.dataId.byName()} if logs: try: log = butler.get(log_ref) @@ -365,12 +366,13 @@ def make_reports( report: `QuantumGraphExecutionReport` The TaskExecutionReport for each task in the quantum graph. """ - refs = {} + refs = {} # type: dict[str, Any] status_graph = networkx.DiGraph() if not isinstance(graph, QuantumGraph): qg = QuantumGraph.loadUri(graph) else: qg = graph + assert qg.metadata is not None, "Saved QGs always have metadata." collection = qg.metadata["output_run"] report = cls() task_defs = list(qg.iterTaskGraph()) @@ -397,6 +399,8 @@ def make_reports( for task_def in qg.iterTaskGraph(): task_report = TaskExecutionReport() + if task_def.logOutputDatasetName is None: + raise RuntimeError("QG must have log outputs to use execution reports.") for node in qg.getNodesForTask(task_def): task_report.inspect_quantum( node, @@ -414,7 +418,7 @@ def __str__(self) -> str: def lookup_quantum_data_id( graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID] -) -> list[DataCoordinate]: +) -> list[DataCoordinate | None]: """Look up a dataId from a quantum graph and a list of quantum graph nodeIDs. @@ -422,7 +426,7 @@ def lookup_quantum_data_id( ---------- graph_uri: `ResourcePathExpression` URI of the quantum graph of the run. - nodes: `Iterable[uuid.UUID]` + nodes: `Sequence[uuid.UUID]` Quantum graph nodeID. Returns