DM-37163: A manifest checker on the workflow output data #374
Codecov Report
Additional details and impacted files
@@ Coverage Diff @@
## main #374 +/- ##
==========================================
+ Coverage 82.26% 82.62% +0.35%
==========================================
Files 90 92 +2
Lines 10185 10323 +138
Branches 1913 1945 +32
==========================================
+ Hits 8379 8529 +150
+ Misses 1478 1452 -26
- Partials 328 342 +14
Some general comments that didn't fit on any particular line:
- Standard practice for us is to put the Jira ticket into "In Review" and include links to all PRs as a comment there (or at least check to see if Jira linked them already - but it always misses a few of our packages, including pipe_base).
- I think you need to add a new `automodapi` entry to `doc/lsst.pipe.base/index.rst` for this new module (you'll see entries for other modules and subpackages you can copy there). You can then run `package-docs build` and then open `doc/_build/html/index.html` in a web browser to see if the docs look as you'd expect.
- While I know we're relying on ci_middleware for most of the testing, it'd be good to at least run these functions in `pipe_base` itself. The hard part is manufacturing the QG and a butler to test with, but the `lsst.pipe.base.tests.simpleQGraph.makeSimpleQGraph` function does both. Could you add a test that calls that with no arguments, runs `make_reports` and `to_summary_dict`, and looks at the results? Since that QG won't have actually been run, I expect it to look like everything failed, and since we've got better tests in ci_middleware that's okay.
- There are a number of failed linting checks from GitHub Actions that you need to resolve. Some of my PR comments will deal with some of them, but I doubt that will take care of them all. Feel free to ask for help interpreting any you don't understand.
python/lsst/pipe/base/__init__.py
Outdated
@@ -1,4 +1,5 @@
from . import automatic_connection_constants, connectionTypes, pipeline_graph, pipelineIR
from ._check_qg_outputs import *
Now that we've settled on names for the classes, we should rename the module to match; how about `execution_reports`?
I've left out the underscore because I also think we should remove it from this `__init__.py`, and instead expect users to do
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
(etc.)
That's a bit more verbose, but since the vast majority of `lsst.pipe.base` imports aren't going to involve this module, it's best to not have all of its import-time logic executed all the time.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

Suggested change:
__all__ = (
    "QuantumGraphExecutionReport",
    "TaskExecutionReport",
    "DatasetTypeExecutionReport",
    "lookup_quantum_data_id",
)
All modules should have an `__all__` entry. Among other things, that tells Sphinx which things should appear in the documentation.
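As a rough illustration of what `__all__` controls, the sketch below builds a throwaway in-memory module (the name `demo_reports` and both classes are hypothetical, not part of pipe_base) and shows that a star import only picks up the names listed in `__all__`. It demonstrates the star-import behaviour only; how documentation tooling consumes `__all__` is not shown here.

```python
import sys
import types

# Source for a throwaway module; "demo_reports" and its classes are
# hypothetical stand-ins, not real pipe_base names.
source = '''
__all__ = ("PublicReport",)


class PublicReport:
    """Listed in __all__, so a star import exports it."""


class HelperReport:
    """Not listed in __all__, so a star import skips it."""
'''

module = types.ModuleType("demo_reports")
exec(source, module.__dict__)
sys.modules["demo_reports"] = module

# Run a star import in a fresh namespace and see which names arrive.
namespace: dict = {}
exec("from demo_reports import *", namespace)
exported = sorted(name for name in namespace if not name.startswith("__"))
print(exported)
```

`HelperReport` is still reachable as `demo_reports.HelperReport`; it is only left out of the star import.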
"""Datasets not produced because their inputs were not produced or not | ||
found
"""
# bool: predicted inputs to this task were not produced
Should merge this code comment into the docstring above.

@dataclasses.dataclass
class DatasetTypeExecutionReport:
Needs a class docstring.
"""Counts of datasets produced by this run.
"""

def to_summary_dict(self) -> dict[str, Any]:
Needs a docstring.

@dataclasses.dataclass
class QuantumGraphExecutionReport:
Needs a class docstring.
class QuantumGraphExecutionReport:
    tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)

    def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
Needs a docstring.
def to_summary_dict(self, butler: Butler, logs: bool = True) -> dict[str, Any]:
    return {task: report.to_summary_dict(butler, logs=logs) for task, report in self.tasks.items()}

def write_summary_yaml(self, butler: Butler, filename: str, logs: bool = True) -> None:
Needs a docstring.
dataset_type.name, collections=collection, findFirst=False
)
}
for taskDef in qg.iterTaskGraph():
Since pretty much everything in this file is snake_case, `taskDef` should be, too (also below).
return "\n".join(f"{tasklabel}:{report}" for tasklabel, report in self.tasks.items())

def lookup_quantum_dataId(graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]):
Needs a docstring and a `-> list[DataCoordinate]` return type annotation.
31b3000 to 2eeed43
Ok, I've written some documentation, and I should be able to add a test pretty soon. I'm confused, though, as to why mypy is failing. I don't understand what I can do to avoid this issue:
quantum = quantum_node.quantum
(metadata_ref,) = quantum.outputs[metadata_name]
(log_ref,) = quantum.outputs[log_name]
if metadata_ref.id not in refs[metadata_name]:
The mypy warning there is because you are using `refs[metadata_name]` but `Iterable` does not implement `__getitem__`. Maybe you should be using `collections.abc.Sequence` instead of `Iterable`?
It's actually `Mapping[str, Mapping[uuid.UUID, DatasetRef]]` that we need (see comment above).
for upstream_dataset_id in status_graph.predecessors(quantum_node.nodeId)
for upstream_quantum_id in status_graph.predecessors(upstream_dataset_id)
):
    self.failed_upstream[quantum_node.nodeId] = quantum.dataId
The mypy warning here is telling you that `Quantum` does not require a `dataId` be defined. That means that this can return `None`, but you have declared that `failed_upstream` cannot hold `None` and must always hold a `DataCoordinate`.
I think to appease mypy you will need to do something like
if quantum.dataId is not None:
    self.failed_upstream[...] = ...
or else (because you know this must be impossible)
assert quantum.dataId is not None
(you know this must be impossible since you are getting the quantum from a graph but mypy can't know that).
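The two narrowing options above can be sketched side by side. This is a minimal illustration with hypothetical stand-in classes (`FakeQuantum`, `FakeTaskReport`) and plain dictionaries in place of `DataCoordinate`; it is not the pipe_base implementation.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass, field


@dataclass
class FakeQuantum:
    """Hypothetical stand-in for Quantum: dataId is allowed to be None."""

    dataId: dict[str, int] | None = None


@dataclass
class FakeTaskReport:
    """Hypothetical stand-in holding only the failed_upstream mapping."""

    failed_upstream: dict[uuid.UUID, dict[str, int]] = field(default_factory=dict)

    def record_with_guard(self, node_id: uuid.UUID, quantum: FakeQuantum) -> None:
        # Option 1: skip quanta without a data ID; the type checker narrows
        # the Optional inside the if block.
        if quantum.dataId is not None:
            self.failed_upstream[node_id] = quantum.dataId

    def record_with_assert(self, node_id: uuid.UUID, quantum: FakeQuantum) -> None:
        # Option 2: state the invariant; the type checker narrows after the
        # assert, and a violation fails loudly at runtime.
        assert quantum.dataId is not None
        self.failed_upstream[node_id] = quantum.dataId


report = FakeTaskReport()
node_id = uuid.uuid4()
report.record_with_guard(node_id, FakeQuantum(dataId=None))
count_after_guard = len(report.failed_upstream)
report.record_with_assert(node_id, FakeQuantum(dataId={"visit": 1}))
```

The guard silently drops the entry when the data ID is missing, while the assert treats a missing data ID as a bug, which matches the "this must be impossible" reasoning.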
ad6b3cd to 36e87ae
I've added line comments with suggestions for changes that I think will address all the mypy errors.
self,
quantum_node: QuantumNode,
status_graph: networkx.DiGraph,
refs: Iterable[DatasetRef],
This mypy error:
python/lsst/pipe/base/execution_reports.py:404: error: Argument 3 to "inspect_quantum" of "TaskExecutionReport" has incompatible type "dict[str, dict[UUID, DatasetRef]]"; expected "Iterable[DatasetRef]" [arg-type]
should be fixed here, and I think that'll address a lot of other ones: we're passing a nested dictionary keyed first by dataset type and then by UUID here, and then inside the function we're expecting that, but the annotation is wrong.
It should be:
Suggested change: replace `refs: Iterable[DatasetRef],` with `refs: Mapping[str, Mapping[uuid.UUID, DatasetRef]],`
You will also have to import `Mapping` from `collections.abc` at the top.
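A small sketch of why the nested `Mapping` annotation fits the way `refs` is used. `FakeRef`, `index_refs`, and `was_produced` are hypothetical names introduced for illustration; the real `DatasetRef` has a richer interface.

```python
from __future__ import annotations

import uuid
from collections.abc import Iterable, Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeRef:
    """Hypothetical stand-in for DatasetRef."""

    id: uuid.UUID
    dataset_type: str


def index_refs(refs: Iterable[FakeRef]) -> dict[str, dict[uuid.UUID, FakeRef]]:
    """Build the nested lookup: dataset type name, then dataset UUID."""
    nested: dict[str, dict[uuid.UUID, FakeRef]] = {}
    for ref in refs:
        nested.setdefault(ref.dataset_type, {})[ref.id] = ref
    return nested


def was_produced(ref: FakeRef, refs: Mapping[str, Mapping[uuid.UUID, FakeRef]]) -> bool:
    """Indexing by name requires Mapping; a bare Iterable has no __getitem__."""
    return ref.id in refs.get(ref.dataset_type, {})


existing = FakeRef(uuid.uuid4(), "task_metadata")
missing = FakeRef(uuid.uuid4(), "task_metadata")
found = index_refs([existing])
produced_flags = (was_produced(existing, found), was_produced(missing, found))
```

Annotating the parameter as `Mapping` rather than `dict` keeps the function read-only from the type checker's point of view while still accepting the plain dictionaries built by the caller.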
""" | ||
failed_quanta = {}
for node_id, log_ref in self.failed.items():
    quantum_info = {"data_id": log_ref.dataId.byName()}
Suggested change: replace the last line with `quantum_info: dict[str, Any] = {"data_id": log_ref.dataId.byName()}`
Some of the mypy errors are coming from it trying too hard to infer the value type of this dictionary as `str` because that's what the value type of this first element is. `Any` tells it to not expect the value types to be consistent at all.
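A minimal sketch of the inference issue, using a hypothetical helper `summarize_failure` with made-up keys. Without the explicit annotation, a type checker such as mypy infers the dictionary's value type from the first entry and then rejects values of any other type.

```python
from __future__ import annotations

from typing import Any


def summarize_failure(data_id: dict[str, int], messages: list[str]) -> dict[str, Any]:
    """Hypothetical helper mixing value types in one dictionary."""
    # With a bare `quantum_info = {"data_id": data_id}`, mypy would infer
    # dict[str, dict[str, int]] and flag the list assignment below.
    quantum_info: dict[str, Any] = {"data_id": data_id}
    quantum_info["error"] = messages
    return quantum_info


info = summarize_failure({"visit": 42}, ["Traceback ...", "ValueError: bad input"])
```

The annotation changes nothing at runtime; it only tells the checker that heterogeneous values are intended.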
    qg = QuantumGraph.loadUri(graph)
else:
    qg = graph
collection = qg.metadata["output_run"]
Suggested change: insert the following line before `collection = qg.metadata["output_run"]`
assert qg.metadata is not None, "Saved QGs always have metadata."
This will address another mypy error.

for task_def in qg.iterTaskGraph():
    task_report = TaskExecutionReport()
    for node in qg.getNodesForTask(task_def):
Suggested change: insert the following before `for node in qg.getNodesForTask(task_def):`
if task_node.logOutputDatasetName is not None:
    raise RuntimeError(
        "QG must have log outputs to use execution reports."
    )
This fixes another mypy error.
When you refer to `task_node` I'm not seeing that variable here. Do we want to use `node` like in `qg.getNodesForTask(task_def)`? Or should it come from somewhere else?
Oops, the suggestion should have been `task_def.logOutputDatasetName`.
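A sketch of the guard with the corrected variable name, assuming the intent is to raise when a task has no log output configured (so the check here is `is None`, which is an assumption about intent, not something the thread states). `FakeTaskDef` and `require_log_output` are hypothetical stand-ins.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeTaskDef:
    """Hypothetical stand-in for a task definition with an optional log dataset."""

    label: str
    logOutputDatasetName: str | None = None


def require_log_output(task_def: FakeTaskDef) -> str:
    """Return the log dataset name, or raise if the task has none."""
    # Assumed intent: a missing log dataset name is the error case.
    if task_def.logOutputDatasetName is None:
        raise RuntimeError("QG must have log outputs to use execution reports.")
    # After the check, a type checker treats the attribute as a plain str.
    return task_def.logOutputDatasetName


log_name = require_log_output(FakeTaskDef("isr", "isr_log"))
try:
    require_log_output(FakeTaskDef("isr"))
except RuntimeError as err:
    message = str(err)
```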

def lookup_quantum_data_id(
    graph_uri: ResourcePathExpression, nodes: Iterable[uuid.UUID]
) -> list[DataCoordinate]:
Suggested change: replace `) -> list[DataCoordinate]:` with `) -> list[DataCoordinate | None]:`
Fixes another mypy issue.
6d1438c to cabb4e2
missing datasets based on metadata.

A DatasetTypeExecutionReport is created for each DatasetType in a
TaskExecutionReport.
If you add single backticks around `DatasetTypeExecutionReport`, `DatasetType`, and `TaskExecutionReport`, those will turn into links to the class docs for each. And that arguably makes the "See Also" section here unnecessary.
missing_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced because their quanta failed directly in this
run (`set`).
"""
Suggested change: add a blank line after the closing `"""`.
Should have a blank line between the docstring of one attribute and the definition of the next.
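For reference, the layout being asked for looks roughly like this. `ExampleDatasetReport` and its fields are hypothetical and use `str` in place of `DatasetRef`; only the spacing convention is the point.

```python
from __future__ import annotations

import dataclasses


@dataclasses.dataclass
class ExampleDatasetReport:
    """Hypothetical report used only to show the attribute-docstring layout."""

    produced: set[str] = dataclasses.field(default_factory=set)
    """Datasets produced by this run (`set`)."""

    missing_failed: set[str] = dataclasses.field(default_factory=set)
    """Datasets not produced because their quanta failed (`set`)."""


report = ExampleDatasetReport()
report.produced.add("calexp")
field_names = [f.name for f in dataclasses.fields(ExampleDatasetReport)]
```

The string literals after each field are ordinary expression statements at runtime, so the blank lines affect only readability and how documentation tools associate each docstring with its attribute.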
"""Missing datasets which were not produced due either missing inputs or a | ||
failure in finding inputs (`dict`).
bool: were predicted inputs produced?
"""
Suggested change: add a blank line after the closing `"""`.
"""
missing_upstream_failed: set[DatasetRef] = dataclasses.field(default_factory=set)
"""Datasets not produced due to an upstream failure (`set`).
"""
Suggested change: add a blank line after the closing `"""`.
-------
A count of the DatasetTypes with each outcome; the number of produced,
missing_failed, missing_not_produced, and missing_upstream_failed
DatasetTypes. See above for attribute descriptions.
`Returns` sections should be formatted like `Parameters`, with a named entry, a `:`, and a type, and then a description like this. It's a little weird given that the "name" of a return value never appears in the code, but that's how it is.
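A sketch of a docstring laid out that way, on a hypothetical function `count_outcomes` (not part of the PR). The return value gets a name, a colon, a type, and an indented description, mirroring the `Parameters` entries.

```python
from __future__ import annotations


def count_outcomes(produced: set[str], missing_failed: set[str]) -> dict[str, int]:
    """Count datasets with each outcome.

    Parameters
    ----------
    produced : `set` [ `str` ]
        Names of datasets that were produced.
    missing_failed : `set` [ `str` ]
        Names of datasets whose quanta failed.

    Returns
    -------
    summary : `dict` [ `str`, `int` ]
        Number of datasets with each outcome, keyed by outcome name.
    """
    return {"produced": len(produced), "missing_failed": len(missing_failed)}


summary = count_outcomes({"a", "b"}, {"c"})
```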
failed: `bool`
    Whether the task associated with the missing dataset failed.
status_graph: `networkx.DiGraph`
    The quantum graph produced by TaskExecutionReport.inspect_quantum
Suggested change: put backticks around `TaskExecutionReport.inspect_quantum` in the last line.

Parameters
----------
output_ref: `DatasetRef`
Might need a space before the `:` here (and throughout the file). At least that's how I usually see it, and I'm not sure if Sphinx cares or not.
I'm pretty sure that it also has to be:
`~lsst.daf.butler.DatasetRef`
because `DatasetRef` is not a pipe_base class.
failed: dict[uuid.UUID, DatasetRef] = dataclasses.field(default_factory=dict)
"""A mapping from quantum data ID to log dataset reference for quanta that
failed directly in this run (`dict`).
"""
Suggested change: add a blank line after the closing `"""`.
failed_upstream: dict[uuid.UUID, DataCoordinate] = dataclasses.field(default_factory=dict)
"""A mapping of data IDs of quanta that were not attempted due to an
upstream failure (`dict`).
"""
Suggested change: add a blank line after the closing `"""`.
The quantum graph produced by
QuantumGraphExecutionReport.make_reports which steps through the
quantum graph of a run and logs the status of each quantum.
refs: `Mapping[str, Mapping[uuid.UUID, DatasetRef]]`
Suggested change: replace the `refs` line with
refs: `~collections.abc.Mapping` [ `str`, `~collections.abc.Mapping` [ \
    `uuid.UUID`, `~lsst.daf.butler.DatasetRef` ] ]
Unfortunately you can't put backticks around the whole thing; each symbol needs its own backticks. And symbols like `Mapping` and `DatasetRef` need to be fully qualified if they're not in `lsst.pipe.base`.
Parameters
----------
tasks: `dict`
    A dictionary of TaskExecutionReports by pipetask
Suggested change: replace the last line with "A dictionary of TaskExecutionReports by task label."
Formally `pipetask` is the command-line tool only, but we don't want to say `PipelineTask` here either if we want to be precise, since you could configure the same task multiple times in one pipeline, and it's really one labeled configuration we're talking about here.
"""

tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict)
"""A dictionary of TaskExecutionReports by pipetask (`dict`).
Suggested change: replace "by pipetask" with "by task label" in the docstring line above.
e0d54de to 2dc01d1
Document functions (draft)
2dc01d1 to ea43222
Checklist
doc/changes