Skip to content

Commit

Permalink
Add generic methods for retrieving job summary
Browse files Browse the repository at this point in the history
  • Loading branch information
mxk62 committed Apr 22, 2024
1 parent 8242931 commit 9725a20
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 10 deletions.
37 changes: 28 additions & 9 deletions python/lsst/ctrl/bps/bps_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"""Classes and functions used in reporting run status.
"""

__all__ = ["BaseRunReport", "DetailedRunReport", "SummaryRunReport", "ExitCodesReport"]
__all__ = ["BaseRunReport", "DetailedRunReport", "SummaryRunReport", "ExitCodesReport", "get_job_summary"]

import abc
import logging
Expand Down Expand Up @@ -193,13 +193,8 @@ def add(self, run_report, use_global_id=False):
total.append(sum(by_label_expected.values()) if by_label_expected else run_report.total_number_jobs)
self._table.add_row(total)

# Use the provided job summary. If it doesn't exist, compile it from
# information about individual jobs.
if run_report.job_summary:
job_summary = run_report.job_summary
elif run_report.jobs:
job_summary = compile_job_summary(run_report.jobs)
else:
job_summary = get_job_summary(run_report)
if job_summary is None:
id_ = run_report.global_wms_id if use_global_id else run_report.wms_id
self._msg = f"WARNING: Job summary for run '{id_}' not available, report maybe incomplete."
return
Expand Down Expand Up @@ -282,6 +277,30 @@ def __str__(self):
return str("\n".join(lines))


def get_job_summary(run_report):
    """Produce job summary based on the provided run report.

    Parameters
    ----------
    run_report : `lsst.ctrl.bps.WmsRunReport`
        Information for single run.

    Returns
    -------
    summary : `dict` [`str`, `dict` [`lsst.ctrl.bps.WmsStates`, `int`]] | None
        The summary of the execution statuses for each job label in the run.
        For each job label, execution statuses are mapped to number of jobs
        having a given status. None if the job summary could not be produced.
    """
    # Prefer the summary already attached to the run report; compile one from
    # the individual job records only when no (non-empty) summary is present.
    if run_report.job_summary:
        summary = run_report.job_summary
    elif run_report.jobs:
        summary = compile_job_summary(run_report.jobs)
    else:
        # Neither a summary nor job records: nothing to report.
        summary = None
    return summary


def compile_job_summary(jobs):
"""Compile job summary from information available for individual jobs.
Expand All @@ -292,7 +311,7 @@ def compile_job_summary(jobs):
Returns
-------
job_summary : `dict` [`str`, dict` [`lsst.ctrl.bps.WmsState`, `int`]]
job_summary : `dict` [`str`, dict` [`lsst.ctrl.bps.WmsStates`, `int`]]
The summary of the execution statuses for each job label in the run.
For each job label, execution statuses are mapped to number of jobs
having a given status.
Expand Down
59 changes: 58 additions & 1 deletion python/lsst/ctrl/bps/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from lsst.utils import doImport

from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport
from .bps_reports import DetailedRunReport, ExitCodesReport, SummaryRunReport, get_job_summary
from .wms_service import WmsStates

_LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -141,3 +141,60 @@ def report(wms_service, run_id, user, hist_days, pass_thru, is_global=False, ret
if message:
print(message)
print("\n")


def summarize(wms_service, run_id, hist_days=2, pass_thru=None, is_global=False):
    """Provide a job summary for the given run.

    Parameters
    ----------
    wms_service : `str`
        Name of the class representing a plugin/service for the given WMS.
    run_id : `str`
        A run id the report will be restricted to.
    hist_days : `int`, optional
        If non-zero, the records of the completed jobs from the given number
        of past days will be queried additionally to the records of
        the currently running jobs.
    pass_thru : `str`, optional
        A string to pass directly to the WMS service class.
    is_global : `bool`, optional
        If set, all available job queues will be queried for job information.
        Defaults to False which means that only a local job queue will be
        queried for information.

        Only applicable in the context of a WMS using distributed job queues
        (e.g., HTCondor).

    Returns
    -------
    summary : `dict` [`str`, `dict` [`lsst.ctrl.bps.WmsStates`, `int`]] | None
        The summary of the execution statuses for each job label in the run.
        For each job label, execution statuses are mapped to number of jobs
        having a given status. None if the job summary could not be produced.

    Notes
    -----
    When no summary can be produced, a warning is logged; any message
    returned by the WMS service is appended to that warning.
    """
    # Keep the instance under its own name so ``wms_service`` continues to
    # refer to the class name that was passed in.
    wms_service_class = doImport(wms_service)
    service = wms_service_class({})
    wms_reports, wms_message = service.report(
        run_id, user=None, hist=hist_days, pass_thru=pass_thru, is_global=is_global
    )

    message = ""
    params = []
    try:
        # Only one report is used: the last one in the returned list.
        wms_report = wms_reports.pop()
    except IndexError:
        summary = None
        params.append(run_id)
        message = "No records found for job id '%s'"
    else:
        summary = get_job_summary(wms_report)
        if summary is None:
            params.append(wms_report.global_wms_id if is_global else wms_report.wms_id)
            message = "Job summary for run '%s' not available"
    if message:
        # Append whatever the WMS service said, if anything, to the warning.
        if wms_message:
            params.append(wms_message)
            message += ": %s"
        _LOG.warning(message, *params)
    return summary

0 comments on commit 9725a20

Please sign in to comment.