From f8466e0fc5b56cc7aaf6ca4284002ca8ece7ec2c Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Tue, 9 Jan 2024 14:40:02 -0500 Subject: [PATCH 1/8] Add a new flag to report() method Added a new flag to the report() method of HTCondorService class that controls if the non-zero exit codes should be included in the report. --- .../lsst/ctrl/bps/htcondor/htcondor_service.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/python/lsst/ctrl/bps/htcondor/htcondor_service.py b/python/lsst/ctrl/bps/htcondor/htcondor_service.py index 826b1e2..b0f7658 100644 --- a/python/lsst/ctrl/bps/htcondor/htcondor_service.py +++ b/python/lsst/ctrl/bps/htcondor/htcondor_service.py @@ -365,7 +365,15 @@ def list_submitted_jobs(self, wms_id=None, user=None, require_bps=True, pass_thr _LOG.debug("job_ids = %s", job_ids) return job_ids - def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_global=False): + def report( + self, + wms_workflow_id=None, + user=None, + hist=0, + pass_thru=None, + is_global=False, + return_exit_codes=False, + ): """Return run information based upon given constraints. Parameters @@ -382,6 +390,13 @@ def report(self, wms_workflow_id=None, user=None, hist=0, pass_thru=None, is_glo If set, all job queues (and their histories) will be queried for job information. Defaults to False which means that only the local job queue will be queried. + return_exit_codes : `bool`, optional + If set, return exit codes related to jobs with a + non-success status. Defaults to False, which means that only + the summary state is returned. + + Only applicable in the context of a WMS with associated + handlers to return exit codes from jobs. Returns ------- From b31bcc10664ca6058a370ac4a821e407221d9ded Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Mon, 29 Jan 2024 14:33:49 -0500 Subject: [PATCH 2/8] Allow the plugin to report job exit codes --- .../ctrl/bps/htcondor/htcondor_service.py | 53 +++++++++++++++---- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/python/lsst/ctrl/bps/htcondor/htcondor_service.py b/python/lsst/ctrl/bps/htcondor/htcondor_service.py index b0f7658..ddfd871 100644 --- a/python/lsst/ctrl/bps/htcondor/htcondor_service.py +++ b/python/lsst/ctrl/bps/htcondor/htcondor_service.py @@ -1207,6 +1207,7 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs): """ _LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id]) dag_job = jobs[wms_workflow_id] + task_jobs = {job_id: job_ad for job_id, job_ad in jobs.items() if job_id != wms_workflow_id} report = WmsRunReport( wms_id=f"{dag_job['ClusterId']}.{dag_job['ProcId']}", global_wms_id=dag_job.get("GlobalJobId", "MISS"), @@ -1222,20 +1223,20 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs): jobs=[], total_number_jobs=dag_job["total_jobs"], job_state_counts=dag_job["state_counts"], + exit_code_summary=_get_exit_code_summary(task_jobs), ) - for job_id, job_info in jobs.items(): + for job_id, job_info in task_jobs.items(): try: - if job_info["ClusterId"] != int(float(wms_workflow_id)): - job_report = WmsJobReport( - wms_id=job_id, - name=job_info.get("DAGNodeName", job_id), - label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])), - state=_htc_status_to_wms_state(job_info), - ) - if job_report.label == "init": - job_report.label = "pipetaskInit" - report.jobs.append(job_report) + job_report = WmsJobReport( + wms_id=job_id, + name=job_info.get("DAGNodeName", job_id), + label=job_info.get("bps_job_label", pegasus_name_to_label(job_info["DAGNodeName"])), + state=_htc_status_to_wms_state(job_info), + ) + if job_report.label == "init": + job_report.label = "pipetaskInit" + report.jobs.append(job_report) except KeyError as ex: _LOG.error("Job missing key '%s': %s", str(ex), job_info) raise @@ -1407,6 +1408,36 @@ def _get_run_summary(job): return summary +def _get_exit_code_summary(jobs): + """Get the exit code summary for a run. + + Parameters + ---------- + jobs : `dict` [`str`, `dict` [`str`, Any]] + Mapping HTCondor job id to job information. + + Returns + ------- + summary : `dict` [`str`, `list` [`int`]] + Jobs' exit codes per job label. + """ + summary = {} + for job_id, job_ad in jobs.items(): + job_label = job_ad["bps_job_label"] + summary.setdefault(job_label, []) + try: + if job_ad["JobStatus"] in {JobStatus.COMPLETED, JobStatus.HELD}: + try: + exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"] + except KeyError: + _LOG.debug("Cannot determine exit code for job '%s'", job_id) + else: + summary[job_label].append(exit_code) + except KeyError: + _LOG.debug("Attribute 'JobStatus' not found in classad for job '%s'", job_id) + return summary + + def _get_state_counts_from_jobs(wms_workflow_id, jobs): """Count number of jobs per WMS state. From 974401926ce9550d833feaeb0e2f5e2ec857c2d9 Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Mon, 29 Jan 2024 16:04:07 -0500 Subject: [PATCH 3/8] Describe changes in the news item --- doc/changes/DM-42127.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changes/DM-42127.feature.rst diff --git a/doc/changes/DM-42127.feature.rst b/doc/changes/DM-42127.feature.rst new file mode 100644 index 0000000..7be9ac2 --- /dev/null +++ b/doc/changes/DM-42127.feature.rst @@ -0,0 +1 @@ +Added plugin support for reporting error exit codes with ``bps report``. From 9be40a44796a4cbaac055cb7e01837cc1a0c3c21 Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Mon, 29 Jan 2024 18:44:58 -0500 Subject: [PATCH 4/8] Update .pre-commit-config.yaml Updated pre-commit config file to use the most recent Black. --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 94c65a7..0504cd0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -9,7 +9,7 @@ repos: - id: trailing-whitespace - id: check-toml - repo: https://github.com/psf/black - rev: 23.12.0 + rev: 24.1.1 hooks: - id: black # It is recommended to specify the latest version of Python @@ -24,7 +24,7 @@ repos: name: isort (python) - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.1.8 + rev: v0.1.15 hooks: - id: ruff - repo: https://github.com/numpy/numpydoc From ec941751aebaa3a76d45fd0222e8a8fbed895b18 Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Mon, 29 Jan 2024 18:46:28 -0500 Subject: [PATCH 5/8] Fix formatting using the most recent Black --- python/lsst/ctrl/bps/htcondor/htcondor_service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/lsst/ctrl/bps/htcondor/htcondor_service.py b/python/lsst/ctrl/bps/htcondor/htcondor_service.py index ddfd871..cceca4f 100644 --- a/python/lsst/ctrl/bps/htcondor/htcondor_service.py +++ b/python/lsst/ctrl/bps/htcondor/htcondor_service.py @@ -556,9 +556,9 @@ def from_generic_workflow(cls, config, generic_workflow, out_prefix, service_cla out_prefix, ) if "post" not in final_htjob.dagcmds: - final_htjob.dagcmds[ - "post" - ] = f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN" + final_htjob.dagcmds["post"] = ( + f"{os.path.dirname(__file__)}/final_post.sh {final.name} $DAG_STATUS $RETURN" + ) htc_workflow.dag.add_final_job(final_htjob) elif final and isinstance(final, GenericWorkflow): raise NotImplementedError("HTCondor plugin does not support a workflow as the final job") From b458dab6495ceb16bc63f2f55e78fd5cff74fa1a Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Wed, 31 Jan 2024 20:37:46 -0500 Subject: [PATCH 6/8] Optimize memory usage when storing error codes --- python/lsst/ctrl/bps/htcondor/htcondor_service.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/lsst/ctrl/bps/htcondor/htcondor_service.py b/python/lsst/ctrl/bps/htcondor/htcondor_service.py index cceca4f..93fadcc 100644 --- a/python/lsst/ctrl/bps/htcondor/htcondor_service.py +++ b/python/lsst/ctrl/bps/htcondor/htcondor_service.py @@ -1206,8 +1206,7 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs): id and the value is a collection of report information for that run. """ _LOG.debug("_create_detailed_report: id = %s, job = %s", wms_workflow_id, jobs[wms_workflow_id]) - dag_job = jobs[wms_workflow_id] - task_jobs = {job_id: job_ad for job_id, job_ad in jobs.items() if job_id != wms_workflow_id} + dag_job = jobs.pop(wms_workflow_id) report = WmsRunReport( wms_id=f"{dag_job['ClusterId']}.{dag_job['ProcId']}", global_wms_id=dag_job.get("GlobalJobId", "MISS"), @@ -1223,10 +1222,10 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs): jobs=[], total_number_jobs=dag_job["total_jobs"], job_state_counts=dag_job["state_counts"], - exit_code_summary=_get_exit_code_summary(task_jobs), + exit_code_summary=_get_exit_code_summary(jobs), ) - for job_id, job_info in task_jobs.items(): + for job_id, job_info in jobs.items(): try: job_report = WmsJobReport( wms_id=job_id, @@ -1241,6 +1240,10 @@ def _create_detailed_report_from_jobs(wms_workflow_id, jobs): _LOG.error("Job missing key '%s': %s", str(ex), job_info) raise + # Add the removed entry to restore the original content of the dictionary. + # The ordering of keys will be change permanently though. + jobs.update({wms_workflow_id: dag_job}) + run_reports = {report.wms_id: report} _LOG.debug("_create_detailed_report: run_reports = %s", run_reports) return run_reports From 7e5a97a9742ce5fdbfc10805f32f90ea68a7f510 Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Wed, 31 Jan 2024 20:41:22 -0500 Subject: [PATCH 7/8] Refactor _get_exit_code_summary() Modified the function responsible for gathering exit codes to omit error codes for jobs that finished successfully (i.e. returned 0 exit code) to align the plugin behavior with the one already implemented in the PanDA plugin. --- .../ctrl/bps/htcondor/htcondor_service.py | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/python/lsst/ctrl/bps/htcondor/htcondor_service.py b/python/lsst/ctrl/bps/htcondor/htcondor_service.py index 93fadcc..3aaeec1 100644 --- a/python/lsst/ctrl/bps/htcondor/htcondor_service.py +++ b/python/lsst/ctrl/bps/htcondor/htcondor_service.py @@ -1429,15 +1429,27 @@ def _get_exit_code_summary(jobs): job_label = job_ad["bps_job_label"] summary.setdefault(job_label, []) try: - if job_ad["JobStatus"] in {JobStatus.COMPLETED, JobStatus.HELD}: - try: + exit_code = 0 + job_status = job_ad["JobStatus"] + match job_status: + case JobStatus.COMPLETED: exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["ExitCode"] - except KeyError: - _LOG.debug("Cannot determine exit code for job '%s'", job_id) - else: - summary[job_label].append(exit_code) - except KeyError: - _LOG.debug("Attribute 'JobStatus' not found in classad for job '%s'", job_id) + case JobStatus.HELD: + exit_code = job_ad["ExitSignal"] if job_ad["ExitBySignal"] else job_ad["HoldReasonCode"] + case ( + JobStatus.IDLE + | JobStatus.RUNNING + | JobStatus.REMOVED + | JobStatus.TRANSFERRING_OUTPUT + | JobStatus.SUSPENDED + ): + pass + case _: + _LOG.debug("Unknown 'JobStatus' value ('%d') in classad for job '%d'", job_status, job_id) + if exit_code != 0: + summary[job_label].append(exit_code) + except KeyError as ex: + _LOG.debug("Attribute '%s' not found in the classad for job '%s'", ex, job_id) return summary From b1a50bc6cea5960b3a5f549f8c6372369befce7a Mon Sep 17 00:00:00 2001 From: Mikolaj Kowalik Date: Wed, 31 Jan 2024 20:39:50 -0500 Subject: [PATCH 8/8] Add a unit test for _get_exit_code_summary() --- tests/test_htcondor_service.py | 90 ++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 tests/test_htcondor_service.py diff --git a/tests/test_htcondor_service.py b/tests/test_htcondor_service.py new file mode 100644 index 0000000..0baf236 --- /dev/null +++ b/tests/test_htcondor_service.py @@ -0,0 +1,90 @@ +# This file is part of ctrl_bps_htcondor. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This software is dual licensed under the GNU General Public License and also +# under a 3-clause BSD license. Recipients may choose which of these licenses +# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, +# respectively. If you choose the GPL option then the following text applies +# (but note that there is still no warranty even if you opt for BSD instead): +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import unittest + +import htcondor +from lsst.ctrl.bps.htcondor.htcondor_service import _get_exit_code_summary + + +class TestGetExitCodeSummary(unittest.TestCase): + """Unit tests for function responsible for creating exit code summary.""" + + def setUp(self): + self.jobs = { + "1.0": { + "JobStatus": htcondor.JobStatus.IDLE, + "bps_job_label": "foo", + }, + "2.0": { + "JobStatus": htcondor.JobStatus.RUNNING, + "bps_job_label": "foo", + }, + "3.0": { + "JobStatus": htcondor.JobStatus.REMOVED, + "bps_job_label": "foo", + }, + "4.0": { + "ExitCode": 0, + "ExitBySignal": False, + "JobStatus": htcondor.JobStatus.COMPLETED, + "bps_job_label": "bar", + }, + "5.0": { + "ExitCode": 1, + "ExitBySignal": False, + "JobStatus": htcondor.JobStatus.COMPLETED, + "bps_job_label": "bar", + }, + "6.0": { + "ExitBySignal": True, + "ExitSignal": 11, + "JobStatus": htcondor.JobStatus.HELD, + "bps_job_label": "baz", + }, + "7.0": { + "ExitBySignal": False, + "HoldReasonCode": 42, + "JobStatus": htcondor.JobStatus.HELD, + "bps_job_label": "baz", + }, + "8.0": { + "JobStatus": htcondor.JobStatus.TRANSFERRING_OUTPUT, + "bps_job_label": "qux", + }, + "9.0": { + "JobStatus": htcondor.JobStatus.SUSPENDED, + "bps_job_label": "qux", + }, + } + + def tearDown(self): + pass + + def test(self): + actual = _get_exit_code_summary(self.jobs) + expected = {"foo": [], "bar": [1], "baz": [11, 42], "qux": []} + self.assertEqual(actual, expected)