Skip to content

Commit

Permalink
Merge pull request #437 from lsst/tickets/DM-41605
Browse files Browse the repository at this point in the history
DM-41605: Command-line aggregator for pipetask report
  • Loading branch information
eigerx authored Oct 16, 2024
2 parents d975a85 + 0639af2 commit a403324
Show file tree
Hide file tree
Showing 3 changed files with 586 additions and 24 deletions.
7 changes: 7 additions & 0 deletions doc/changes/DM-41605.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Add functionality to aggregate multiple `QuantumProvenanceGraph.Summary`
objects into one `Summary` for a wholistic report.

While the `QuantumProvenanceGraph` was designed to resolve processing over
dataquery-identified groups, `QuantumProvenanceGraph.aggregate` is designed to
combine multiple group-level reports into one which totals the successes,
issues and failures over the same section of pipeline.
72 changes: 71 additions & 1 deletion python/lsst/pipe/base/quantum_provenance_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,24 @@ def add_quantum_info(self, info: QuantumInfo, butler: Butler, do_store_logs: boo
case unrecognized_state:
raise AssertionError(f"Unrecognized quantum status {unrecognized_state!r}")

def add_data_id_group(self, other_summary: TaskSummary) -> None:
"""Add information from a `TaskSummary` over one dataquery-identified
group to another, as part of aggregating `Summary` reports.
Parameters
----------
other_summary : `TaskSummary`
`TaskSummary` to aggregate.
"""
self.n_successful += other_summary.n_successful
self.n_blocked += other_summary.n_blocked
self.n_unknown += other_summary.n_unknown
self.n_expected += other_summary.n_expected

self.wonky_quanta.extend(other_summary.wonky_quanta)
self.recovered_quanta.extend(other_summary.recovered_quanta)
self.failed_quanta.extend(other_summary.failed_quanta)


class CursedDatasetSummary(pydantic.BaseModel):
"""A summary of all the relevant information on a cursed dataset."""
Expand Down Expand Up @@ -549,7 +567,7 @@ class DatasetTypeSummary(pydantic.BaseModel):
runs.
"""

producer: str
producer: str = ""
"""The name of the task which produced this dataset.
"""

Expand Down Expand Up @@ -626,6 +644,37 @@ def add_dataset_info(self, info: DatasetInfo, producer_info: QuantumInfo) -> Non
case unrecognized_state:
raise AssertionError(f"Unrecognized dataset status {unrecognized_state!r}")

def add_data_id_group(self, other_summary: DatasetTypeSummary) -> None:
"""Add information from a `DatasetTypeSummary` over one
dataquery-identified group to another, as part of aggregating `Summary`
reports.
Parameters
----------
other_summary : `DatasetTypeSummary`
`DatasetTypeSummary` to aggregate.
"""
if self.producer and other_summary.producer:
# Guard against empty string
if self.producer != other_summary.producer:
_LOG.warning(
"Producer for dataset type is not consistent: %r != %r.",
self.producer,
other_summary.producer,
)
_LOG.warning("Ignoring %r.", other_summary.producer)
else:
if other_summary.producer and not self.producer:
self.producer = other_summary.producer

self.n_visible += other_summary.n_visible
self.n_shadowed += other_summary.n_shadowed
self.n_predicted_only += other_summary.n_predicted_only
self.n_expected += other_summary.n_expected

self.cursed_datasets.extend(other_summary.cursed_datasets)
self.unsuccessful_datasets.extend(other_summary.unsuccessful_datasets)


class Summary(pydantic.BaseModel):
"""A summary of the contents of the QuantumProvenanceGraph, including
Expand All @@ -641,6 +690,27 @@ class Summary(pydantic.BaseModel):
"""Summaries for the datasets.
"""

@classmethod
def aggregate(cls, summaries: Sequence[Summary]) -> Summary:
"""Combine summaries from disjoint data id groups into an overall
summary of common tasks and datasets. Intended for use when the same
pipeline has been run over all groups.
Parameters
----------
summaries : `Sequence[Summary]`
Sequence of all `Summary` objects to aggregate.
"""
result = cls()
for summary in summaries:
for label, task_summary in summary.tasks.items():
result_task_summary = result.tasks.setdefault(label, TaskSummary())
result_task_summary.add_data_id_group(task_summary)
for dataset_type, dataset_type_summary in summary.datasets.items():
result_dataset_summary = result.datasets.setdefault(dataset_type, DatasetTypeSummary())
result_dataset_summary.add_data_id_group(dataset_type_summary)
return result


class QuantumProvenanceGraph:
"""A set of already-run, merged quantum graphs with provenance
Expand Down
Loading

0 comments on commit a403324

Please sign in to comment.