From 9725a20b6df3376859137f3e753d1a132c8056c0 Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Mon, 22 Apr 2024 12:32:02 -0400 Subject: [PATCH] Add generic methods for retrieving job summary --- python/lsst/ctrl/bps/bps_reports.py | 37 +++++++++++++----- python/lsst/ctrl/bps/report.py | 59 ++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/python/lsst/ctrl/bps/bps_reports.py b/python/lsst/ctrl/bps/bps_reports.py index ee48dcfe..9e8f1132 100644 --- a/python/lsst/ctrl/bps/bps_reports.py +++ b/python/lsst/ctrl/bps/bps_reports.py @@ -28,7 +28,7 @@ """Classes and functions used in reporting run status. """ -__all__ = ["BaseRunReport", "DetailedRunReport", "SummaryRunReport", "ExitCodesReport"] +__all__ = ["BaseRunReport", "DetailedRunReport", "SummaryRunReport", "ExitCodesReport", "get_job_summary"] import abc import logging @@ -193,13 +193,8 @@ def add(self, run_report, use_global_id=False): total.append(sum(by_label_expected.values()) if by_label_expected else run_report.total_number_jobs) self._table.add_row(total) - # Use the provided job summary. If it doesn't exist, compile it from - # information about individual jobs. - if run_report.job_summary: - job_summary = run_report.job_summary - elif run_report.jobs: - job_summary = compile_job_summary(run_report.jobs) - else: + job_summary = get_job_summary(run_report) + if job_summary is None: id_ = run_report.global_wms_id if use_global_id else run_report.wms_id self._msg = f"WARNING: Job summary for run '{id_}' not available, report maybe incomplete." return @@ -282,6 +277,30 @@ def __str__(self): return str("\n".join(lines)) +def get_job_summary(run_report): + """Produce job summary based on the provided run report. + + Parameters + ---------- + run_report : `lsst.ctrl.bps.WmsRunReport` + Information for single run. + + Returns + ------- + summary : `dict` [`str`, `dict` [`lsst.ctrl.bps.WmsStates`, `int`]] | None + The summary of the execution statuses for each job label in the run. + For each job label, execution statuses are mapped to number of jobs + having a given status. None if the job summary could not be produced. + """ + if run_report.job_summary: + summary = run_report.job_summary + elif run_report.jobs: + summary = compile_job_summary(run_report.jobs) + else: + summary = None + return summary + + def compile_job_summary(jobs): """Compile job summary from information available for individual jobs. @@ -292,7 +311,7 @@ def compile_job_summary(jobs): Returns ------- - job_summary : `dict` [`str`, dict` [`lsst.ctrl.bps.WmsState`, `int`]] + job_summary : `dict` [`str`, dict` [`lsst.ctrl.bps.WmsStates`, `int`]] The summary of the execution statuses for each job label in the run. For each job label, execution statuses are mapped to number of jobs having a given status. diff --git a/python/lsst/ctrl/bps/report.py b/python/lsst/ctrl/bps/report.py index 965325b9..6704a9d8 100644 --- a/python/lsst/ctrl/bps/report.py +++ b/python/lsst/ctrl/bps/report.py @@ -35,7 +35,7 @@ from lsst.utils import doImport -from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport +from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport, get_job_summary from .wms_service import WmsStates _LOG = logging.getLogger(__name__) @@ -141,3 +141,60 @@ def report(wms_service, run_id, user, hist_days, pass_thru, is_global=False, ret if message: print(message) print("\n") + + +def summarize(wms_service, run_id, hist_days=2, pass_thru=None, is_global=False): + """Provide a job summary for the given run. + + Parameters + ---------- + wms_service : `str` + Name of the class representing a plugin/service for the given WMS. + run_id : `str` + A run id the report will be restricted to. + hist_days : `int`, optional + If non-zero, the records of the completed jobs from the given number + of past days will be queried additionally to the records of + the currently running jobs. + pass_thru : `str`, optional + A string to pass directly to the WMS service class. + is_global : `bool`, optional + If set, all available job queues will be queried for job information. + Defaults to False which means that only a local job queue will be + queried for information. + + Only applicable in the context of a WMS using distributed job queues + (e.g., HTCondor). + + Returns + ------- + summary : `dict` [`str`, `dict` [`lsst.ctrl.bps.WmsState`, `int`]] | None + The summary of the execution statuses for each job label in the run. + For each job label, execution statuses are mapped to number of jobs + having a given status. None if the job summary could not be produced. + """ + wms_service_class = doImport(wms_service) + wms_service = wms_service_class({}) + wms_reports, wms_message = wms_service.report( + run_id, user=None, hist=hist_days, pass_thru=pass_thru, is_global=is_global + ) + + message = "" + params = [] + try: + wms_report = wms_reports.pop() + except IndexError: + summary = None + params.append(run_id) + message = "No records found for job id '%s'" + else: + summary = get_job_summary(wms_report) + if summary is None: + params.append(wms_report.global_wms_id if is_global else wms_report.wms_id) + message = "Job summary for run '%s' not available" + if message: + if wms_message: + params.append(wms_message) + message += ": %s" + _LOG.warning(message, *params) + return summary