Skip to content

Commit

Permalink
Add class documentation (draft),rename execution_reports and remove f…
Browse files Browse the repository at this point in the history
…rom __init__.py
  • Loading branch information
eigerx committed Sep 29, 2023
1 parent d10f945 commit 31b3000
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 11 deletions.
1 change: 0 additions & 1 deletion python/lsst/pipe/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR
from ._check_qg_outputs import *
from ._dataset_handle import *
from ._instrument import *
from ._observation_dimension_packer import *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = (
"QuantumGraphExecutionReport",
"TaskExecutionReport",
"DatasetTypeExecutionReport",
"lookup_quantum_data_id",
)

import dataclasses
import itertools
import logging
Expand All @@ -38,19 +45,44 @@

@dataclasses.dataclass
class DatasetTypeExecutionReport:
"""A report on the number of produced datasets as well as the status of
missing datasets based on metadata.
A DatasetTypeExecutionReport is created for each DatasetType in a
TaskExecutionReport.
Parameters
----------
missing_failed: `set`
Datasets not produced because their quanta failed directly in this run.
missing_not_produced: `dict`
A mapping from DatasetRef of missing datasets which were not produced
because their inputs were not produced or not found to a bool:
whether predicted inputs to this task were not produced.
missing_upstream_failed: `set`
Datasets not produced due to an upstream failure
n_produced: `int`
Count of datasets produced.
See Also
--------
TaskExecutionReport
QuantumGraphExecutionReport
"""

missing_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced because their quanta failed directly in this run.
"""Datasets not produced because their quanta failed in this run (`set`).
"""
missing_not_produced: dict[DatasetRef, bool] = dataclasses.field(default_factory=dict)
"""Datasets not produced because their inputs were not produced or not
found
"""Missing datasets which were not produced due either missing inputs or a
failure in finding inputs (`dict`).
bool: were predicted inputs produced?
"""
# bool: predicted inputs to this task were not produced
missing_upstream_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced due to an upstream failure
"""Datasets not produced due to an upstream failure (`set`).
"""
n_produced: int = 0
"""Counts of datasets produced by this run.
"""Count of datasets produced (`int`).
"""

def to_summary_dict(self) -> dict[str, Any]:
Expand Down Expand Up @@ -84,16 +116,39 @@ def handle_produced_dataset(self, output_ref: DatasetRef, status_graph: networkx

@dataclasses.dataclass
class TaskExecutionReport:
"""A report on the status and content of a task in an executed quantum
graph.
Use task metadata to identify and inspect failures and report on output
datasets.
Parameters
----------
failed: `dict`
A mapping from quantum data ID to log dataset reference for quanta that
failed directly in this run.
failed_upstream: `dict`
A mapping of data IDs of quanta that were not attempted due to an
upstream failure.
output_datasets: `dict`
Missing and produced outputs of each DatasetType.
See Also
--------
QuantumGraphExecutionReport
DatasetTypeExecutionReport
"""

failed: dict[uuid.UUID, DatasetRef] = dataclasses.field(default_factory=dict)
"""A mapping from quantum data ID to log dataset reference for quanta that
failed directly in this run.
failed directly in this run (`dict`).
"""
failed_upstream: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict)
"""A mapping of data IDs of quanta that were not attempted due to an
upstream failure.
upstream failure (`dict`).
"""
output_datasets: dict[str, DatasetTypeExecutionReport] = dataclasses.field(default_factory=dict)
"""Reports of the missing and produced outputs of each DatasetType
"""Missing and produced outputs of each DatasetType (`dict`).
"""

def inspect_quantum(self, quantum_node, status_graph: networkx.DiGraph, refs, metadata_name, log_name):
Expand Down Expand Up @@ -149,7 +204,28 @@ def __str__(self) -> str:

@dataclasses.dataclass
class QuantumGraphExecutionReport:
"""A report on the execution of a quantum graph.
Report the detailed status of each failure; whether tasks were not run,
data is missing from upstream failures, or specific errors occurred during
task execution (and report the errors). Contains a count of expected,
produced DatasetTypes for each task. This report can be output as a
dictionary or a yaml file.
Parameters
----------
tasks: `dict`
A dictionary of TaskExecutionReports by pipetask
See Also
--------
TaskExecutionReport
DatasetTypeExecutionReport
"""

tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)
"""A dictionary of TaskExecutionReports by pipetask (`dict`).
"""

def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
return {task: report.to_summary_dict(butler, logs=logs) for task, report in self.tasks.items()}
Expand Down Expand Up @@ -211,6 +287,21 @@ def __str__(self) -> str:
return "\n".join(f"{tasklabel}:{report}" for tasklabel, report in self.tasks.items())


def lookup_quantum_dataId(graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]):
def lookup_quantum_data_id(graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]):
"""Look up a dataId from a quantum graph and a list of quantum graph
nodeIDs.
Parameters
----------
graph_uri: `ResourcePathExpression`
URI of the quantum graph of the run.
nodes: `Iterable[uuid.UUID]`
Quantum graph nodeID.
Returns
-------
A list of human-readable dataIDs which map to the nodeIDs on the quantum
graph at graph_uri.
"""
qg = QuantumGraph.loadUri(graph_uri, nodes=nodes)
return [qg.getQuantumNodeByNodeId(node).quantum.dataId for node in nodes]

0 comments on commit 31b3000

Please sign in to comment.