From 31b3000ed526c4a7e340e8b0e31e7d8b71215c9b Mon Sep 17 00:00:00 2001
From: Orion Eiger
Date: Fri, 29 Sep 2023 16:34:37 -0700
Subject: [PATCH] Add class documentation (draft), rename execution_reports and remove from __init__.py

---
 python/lsst/pipe/base/__init__.py | 1 -
 ...eck_qg_outputs.py => execution_reports.py} | 111 ++++++++++++++++--
 2 files changed, 101 insertions(+), 11 deletions(-)
 rename python/lsst/pipe/base/{_check_qg_outputs.py => execution_reports.py} (73%)

diff --git a/python/lsst/pipe/base/__init__.py b/python/lsst/pipe/base/__init__.py
index 27b4d146f..64b8220d7 100644
--- a/python/lsst/pipe/base/__init__.py
+++ b/python/lsst/pipe/base/__init__.py
@@ -1,5 +1,4 @@
 from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR
-from ._check_qg_outputs import *
 from ._dataset_handle import *
 from ._instrument import *
 from ._observation_dimension_packer import *
diff --git a/python/lsst/pipe/base/_check_qg_outputs.py b/python/lsst/pipe/base/execution_reports.py
similarity index 73%
rename from python/lsst/pipe/base/_check_qg_outputs.py
rename to python/lsst/pipe/base/execution_reports.py
index 3bdeb34c2..49be5c546 100644
--- a/python/lsst/pipe/base/_check_qg_outputs.py
+++ b/python/lsst/pipe/base/execution_reports.py
@@ -20,6 +20,13 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from __future__ import annotations
 
+__all__ = (
+    "QuantumGraphExecutionReport",
+    "TaskExecutionReport",
+    "DatasetTypeExecutionReport",
+    "lookup_quantum_data_id",
+)
+
 import dataclasses
 import itertools
 import logging
@@ -38,19 +45,44 @@
 
 @dataclasses.dataclass
 class DatasetTypeExecutionReport:
+    """A report on the number of datasets produced for one dataset type, and
+    on the status of its missing datasets, based on task metadata.
+
+    A `DatasetTypeExecutionReport` is created for each `DatasetType` in a
+    `TaskExecutionReport`.
+
+    Parameters
+    ----------
+    missing_failed : `set` [`DatasetRef`]
+        Datasets not produced because their quanta failed directly in this
+        run.
+    missing_not_produced : `dict` [`DatasetRef`, `bool`]
+        Missing datasets which were not produced because their inputs were not
+        produced or not found, mapped to `True` if the predicted inputs to the
+        quantum were not produced.
+    missing_upstream_failed : `set` [`DatasetRef`]
+        Datasets not produced due to an upstream failure.
+    n_produced : `int`
+        Count of datasets produced by this run.
+
+    See Also
+    --------
+    TaskExecutionReport
+    QuantumGraphExecutionReport
+    """
+
     missing_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
-    """Datasets not produced because their quanta failed directly in this run.
+    """Datasets not produced because their quanta failed directly in this run
+    (`set`).
     """
     missing_not_produced: dict[DatasetRef, bool] = dataclasses.field(default_factory=dict)
-    """Datasets not produced because their inputs were not produced or not
-    found
+    """Missing datasets which were not produced because their inputs were not
+    produced or not found, mapped to `True` if the predicted inputs to the
+    quantum were not produced (`dict`).
     """
-    # bool: predicted inputs to this task were not produced
     missing_upstream_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
-    """Datasets not produced due to an upstream failure
+    """Datasets not produced due to an upstream failure (`set`).
     """
     n_produced: int = 0
-    """Counts of datasets produced by this run.
+    """Count of datasets produced by this run (`int`).
""" def to_summary_dict(self) -> dict[str, Any]: @@ -84,16 +116,39 @@ def handle_produced_dataset(self, output_ref: DatasetRef, status_graph: networkx @dataclasses.dataclass class TaskExecutionReport: + """A report on the status and content of a task in an executed quantum + graph. + + Use task metadata to identify and inspect failures and report on output + datasets. + + Parameters + ---------- + failed: `dict` + A mapping from quantum data ID to log dataset reference for quanta that + failed directly in this run. + failed_upstream: `dict` + A mapping of data IDs of quanta that were not attempted due to an + upstream failure. + output_datasets: `dict` + Missing and produced outputs of each DatasetType. + + See Also + -------- + QuantumGraphExecutionReport + DatasetTypeExecutionReport + """ + failed: dict[uuid.UUID, DatasetRef] = dataclasses.field(default_factory=dict) """A mapping from quantum data ID to log dataset reference for quanta that - failed directly in this run. + failed directly in this run (`dict`). """ failed_upstream: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict) """A mapping of data IDs of quanta that were not attempted due to an - upstream failure. + upstream failure (`dict`). """ output_datasets: dict[str, DatasetTypeExecutionReport] = dataclasses.field(default_factory=dict) - """Reports of the missing and produced outputs of each DatasetType + """Missing and produced outputs of each DatasetType (`dict`). """ def inspect_quantum(self, quantum_node, status_graph: networkx.DiGraph, refs, metadata_name, log_name): @@ -149,7 +204,28 @@ def __str__(self) -> str: @dataclasses.dataclass class QuantumGraphExecutionReport: + """A report on the execution of a quantum graph. + + Report the detailed status of each failure; whether tasks were not run, + data is missing from upstream failures, or specific errors occurred during + task execution (and report the errors). Contains a count of expected, + produced DatasetTypes for each task. This report can be output as a + dictionary or a yaml file. + + Parameters + ---------- + tasks: `dict` + A dictionary of TaskExecutionReports by pipetask + + See Also + -------- + TaskExecutionReport + DatasetTypeExecutionReport + """ + tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict) + """A dictionary of TaskExecutionReports by pipetask (`dict`). + """ def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]: return {task: report.to_summary_dict(butler, logs=logs) for task, report in self.tasks.items()} @@ -211,6 +287,21 @@ def __str__(self) -> str: return "\n".join(f"{tasklabel}:{report}" for tasklabel, report in self.tasks.items()) -def lookup_quantum_dataId(graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]): +def lookup_quantum_data_id(graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]): + """Look up a dataId from a quantum graph and a list of quantum graph + nodeIDs. + + Parameters + ---------- + graph_uri: `ResourcePathExpression` + URI of the quantum graph of the run. + nodes: `Iterable[uuid.UUID]` + Quantum graph nodeID. + + Returns + ------- + A list of human-readable dataIDs which map to the nodeIDs on the quantum + graph at graph_uri. + """ qg = QuantumGraph.loadUri(graph_uri, nodes=nodes) return [qg.getQuantumNodeByNodeId(node).quantum.dataId for node in nodes]