From 67ad843c49dd3327e85f06c9f6ce30df09ed3f74 Mon Sep 17 00:00:00 2001 From: Orion Eiger Date: Fri, 1 Dec 2023 15:09:32 -0800 Subject: [PATCH] Add human-readable option to report summary dictionaries --- doc/changes/DM-41606.feature.md | 1 + python/lsst/pipe/base/execution_reports.py | 49 ++++++++++++++++++---- 2 files changed, 42 insertions(+), 8 deletions(-) create mode 100644 doc/changes/DM-41606.feature.md diff --git a/doc/changes/DM-41606.feature.md b/doc/changes/DM-41606.feature.md new file mode 100644 index 000000000..396f0f2e4 --- /dev/null +++ b/doc/changes/DM-41606.feature.md @@ -0,0 +1 @@ +Add human-readable option to report summary dictionaries. diff --git a/python/lsst/pipe/base/execution_reports.py b/python/lsst/pipe/base/execution_reports.py index dcbb34c5c..2a9ec1dbd 100644 --- a/python/lsst/pipe/base/execution_reports.py +++ b/python/lsst/pipe/base/execution_reports.py @@ -197,7 +197,9 @@ def inspect_quantum( else: dataset_type_report.n_produced += 1 - def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[str, Any]: + def to_summary_dict( + self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False + ) -> dict[str, Any]: """Summarize the results of the TaskExecutionReport in a dictionary. Parameters @@ -206,6 +208,9 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st The Butler used for this report. do_store_logs : `bool` Store the logs in the summary dictionary. + human_readable : `bool` + Store more human-readable information to be printed out to the + command-line. Returns ------- @@ -220,10 +225,17 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st - n_quanta_blocked: The number of quanta which failed due to upstream failures. - n_succeded: The number of quanta which succeeded. + And possibly, if human-readable is passed: + - errors: A dictionary of data ids associated with each error + message. If `human-readable` and `do_store_logs`, this is stored + here. Otherwise, if `do_store_logs`, it is stored in + `failed_quanta` keyed by the quantum graph node id. + """ failed_quanta = {} for node_id, log_ref in self.failed.items(): - quantum_info: dict[str, Any] = {"data_id": dict(log_ref.dataId.required)} + data_ids = dict(log_ref.dataId.required) + quantum_info: dict[str, Any] = {"data_id": data_ids} if do_store_logs: try: log = butler.get(log_ref) @@ -235,7 +247,17 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st quantum_info["error"] = [ record.message for record in log if record.levelno >= logging.ERROR ] - failed_quanta[str(node_id)] = quantum_info + if human_readable: + failed_quanta["data_id"] = data_ids + return { + "outputs": {name: r.to_summary_dict() for name, r in self.output_datasets.items()}, + "failed_quanta": failed_quanta, + "n_quanta_blocked": len(self.blocked), + "n_succeeded": self.n_succeeded, + "errors": quantum_info, + } + else: + failed_quanta[str(node_id)] = quantum_info return { "outputs": {name: r.to_summary_dict() for name, r in self.output_datasets.items()}, "failed_quanta": failed_quanta, @@ -274,7 +296,9 @@ class QuantumGraphExecutionReport: tasks: dict[str, TaskExecutionReport] = dataclasses.field(default_factory=dict) """A dictionary of TaskExecutionReports by task label (`dict`).""" - def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[str, Any]: + def to_summary_dict( + self, butler: Butler, do_store_logs: bool = True, human_readable: bool = False + ) -> dict[str, Any]: """Summarize the results of the `QuantumGraphExecutionReport` in a dictionary. @@ -284,6 +308,9 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st The Butler used for this report. do_store_logs : `bool` Store the logs in the summary dictionary. + human_readable : `bool` + Store more human-readable information to be printed out to the + command-line. Returns ------- @@ -291,10 +318,16 @@ def to_summary_dict(self, butler: Butler, do_store_logs: bool = True) -> dict[st A dictionary containing a summary of a `TaskExecutionReport` for each task in the quantum graph. """ - return { - task: report.to_summary_dict(butler, do_store_logs=do_store_logs) - for task, report in self.tasks.items() - } + if human_readable: + return { + task: report.to_summary_dict(butler, do_store_logs=do_store_logs, human_readable=True) + for task, report in self.tasks.items() + } + else: + return { + task: report.to_summary_dict(butler, do_store_logs=do_store_logs) + for task, report in self.tasks.items() + } def write_summary_yaml(self, butler: Butler, filename: str, do_store_logs: bool = True) -> None: """Take the dictionary from