Skip to content

Commit

Permalink
Merge pull request #29 from lsst/tickets/DM-42127
Browse files Browse the repository at this point in the history
DM-42127: Add htcondor support for bps report --return-exit-codes
  • Loading branch information
mxk62 authored Feb 1, 2024
2 parents de23a71 + b1a50bc commit f425817
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black
rev: 23.12.0
rev: 24.1.1
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -24,7 +24,7 @@ repos:
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.1.8
rev: v0.1.15
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-42127.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added plugin support for reporting error exit codes with ``bps report``.
91 changes: 76 additions & 15 deletions python/lsst/ctrl/bps/htcondor/htcondor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,15 @@ def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thr
_LOG.debug("job_ids = %s", job_ids)
return job_ids

def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_global=False):
def report(
self,
wms_workflow_id=None,
user=None,
hist=0,
pass_thru=None,
is_global=False,
return_exit_codes=False,
):
"""Return run information based upon given constraints.
Parameters
Expand All @@ -382,6 +390,13 @@ def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_glo
If set, all job queues (and their histories) will be queried for
job information. Defaults to False which means that only the local
job queue will be queried.
return_exit_codes : `bool`, optional
If set, return exit codes related to jobs with a
non-success status. Defaults to False, which means that only
the summary state is returned.
Only applicable in the context of a WMS with associated
handlers to return exit codes from jobs.
Returns
-------
Expand Down Expand Up @@ -541,9 +556,9 @@ def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_cla
out_prefix,
)
if "post" not in final_htjob.dagcmds:
final_htjob.dagcmds[
"post"
] = f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN"
final_htjob.dagcmds["post"] = (
f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN"
)
htc_workflow.dag.add_final_job(final_htjob)
elif final and isinstance(final, GenericWorkflow):
raise NotImplementedError("HTCondor plugin does not support a workflow as the final job")
Expand Down Expand Up @@ -1191,7 +1206,7 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs):
id and the value is a collection of report information for that run.
"""
_LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id])
dag_job = jobs[wms_workflow_id]
dag_job = jobs.pop(wms_workflow_id)
report = WmsRunReport(
wms_id=f"{dag_job['ClusterId']}.{dag_job['ProcId']}",
global_wms_id=dag_job.get("GlobalJobId", "MISS"),
Expand All @@ -1207,24 +1222,28 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs):
jobs=[],
total_number_jobs=dag_job["total_jobs"],
job_state_counts=dag_job["state_counts"],
exit_code_summary=_get_exit_code_summary(jobs),
)

for job_id, job_info in jobs.items():
try:
if job_info["ClusterId"] != int(float(wms_workflow_id)):
job_report = WmsJobReport(
wms_id=job_id,
name=job_info.get("DAGNodeName", job_id),
label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])),
state=_htc_status_to_wms_state(job_info),
)
if job_report.label == "init":
job_report.label = "pipetaskInit"
report.jobs.append(job_report)
job_report = WmsJobReport(
wms_id=job_id,
name=job_info.get("DAGNodeName", job_id),
label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])),
state=_htc_status_to_wms_state(job_info),
)
if job_report.label == "init":
job_report.label = "pipetaskInit"
report.jobs.append(job_report)
except KeyError as ex:
_LOG.error("Job missing key '%s': %s", str(ex), job_info)
raise

# Add the removed entry to restore the original content of the dictionary.
# The ordering of keys will be changed permanently though.
jobs.update({wms_workflow_id: dag_job})

run_reports = {report.wms_id: report}
_LOG.debug("_create_detailed_report: run_reports = %s", run_reports)
return run_reports
Expand Down Expand Up @@ -1392,6 +1411,48 @@ def _get_run_summary(job):
return summary


def _get_exit_code_summary(jobs):
"""Get the exit code summary for a run.
Parameters
----------
jobs : `dict` [`str`, `dict` [`str`, Any]]
Mapping HTCondor job id to job information.
Returns
-------
summary : `dict` [`str`, `list` [`int`]]
Jobs' exit codes per job label.
"""
summary = {}
for job_id, job_ad in jobs.items():
job_label = job_ad["bps_job_label"]
summary.setdefault(job_label, [])
try:
exit_code = 0
job_status = job_ad["JobStatus"]
match job_status:
case JobStatus.COMPLETED:
exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"]
case JobStatus.HELD:
exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["HoldReasonCode"]
case (
JobStatus.IDLE
| JobStatus.RUNNING
| JobStatus.REMOVED
| JobStatus.TRANSFERRING_OUTPUT
| JobStatus.SUSPENDED
):
pass
case _:
_LOG.debug("Unknown 'JobStatus' value ('%d') in classad for job '%d'", job_status, job_id)
if exit_code != 0:
summary[job_label].append(exit_code)
except KeyError as ex:
_LOG.debug("Attribute '%s' not found in the classad for job '%s'", ex, job_id)
return summary


def _get_state_counts_from_jobs(wms_workflow_id, jobs):
"""Count number of jobs per WMS state.
Expand Down
90 changes: 90 additions & 0 deletions tests/test_htcondor_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# This file is part of ctrl_bps_htcondor.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import unittest

import htcondor
from lsst.ctrl.bps.htcondor.htcondor_service import _get_exit_code_summary


class TestGetExitCodeSummary(unittest.TestCase):
    """Check the exit code summary built from a set of job classads."""

    def setUp(self):
        # The fixture covers the IDLE, RUNNING, REMOVED, COMPLETED, HELD,
        # TRANSFERRING_OUTPUT, and SUSPENDED statuses. COMPLETED and HELD
        # appear twice to exercise both the exit-code/hold-code and the
        # exit-signal paths.
        status = htcondor.JobStatus
        self.jobs = {
            "1.0": {"bps_job_label": "foo", "JobStatus": status.IDLE},
            "2.0": {"bps_job_label": "foo", "JobStatus": status.RUNNING},
            "3.0": {"bps_job_label": "foo", "JobStatus": status.REMOVED},
            "4.0": {
                "bps_job_label": "bar",
                "JobStatus": status.COMPLETED,
                "ExitBySignal": False,
                "ExitCode": 0,
            },
            "5.0": {
                "bps_job_label": "bar",
                "JobStatus": status.COMPLETED,
                "ExitBySignal": False,
                "ExitCode": 1,
            },
            "6.0": {
                "bps_job_label": "baz",
                "JobStatus": status.HELD,
                "ExitBySignal": True,
                "ExitSignal": 11,
            },
            "7.0": {
                "bps_job_label": "baz",
                "JobStatus": status.HELD,
                "ExitBySignal": False,
                "HoldReasonCode": 42,
            },
            "8.0": {"bps_job_label": "qux", "JobStatus": status.TRANSFERRING_OUTPUT},
            "9.0": {"bps_job_label": "qux", "JobStatus": status.SUSPENDED},
        }

    def tearDown(self):
        """Nothing to clean up; the fixture lives only in memory."""

    def test(self):
        # Every label must be present; only non-zero codes are listed, in
        # the order the jobs appear in the fixture.
        summary = _get_exit_code_summary(self.jobs)
        self.assertEqual(summary, {"foo": [], "bar": [1], "baz": [11, 42], "qux": []})

0 comments on commit f425817

Please sign in to comment.