Skip to content

Commit

Permalink
Merge pull request #54 from lsst/tickets/DM-48752
Browse files Browse the repository at this point in the history
DM-48752: Improve reporting of provisioning job.
  • Loading branch information
MichelleGower authored Feb 26, 2025
2 parents c7631ea + e3e5c0c commit eb99b16
Show file tree
Hide file tree
Showing 35 changed files with 3,641 additions and 85 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ repos:
rev: "v1.8.0"
hooks:
- id: numpydoc-validation
exclude: tests/data/.*
1 change: 1 addition & 0 deletions doc/changes/DM-48752.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved reporting of provisioning job in atypical situations.
26 changes: 26 additions & 0 deletions doc/lsst.ctrl.bps.htcondor/userguide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ from files. So, the detailed report can distinguish between failed and
deleted jobs, and thus will show ``D`` in the flag column for a running
workflow if there is a deleted job.

Rarely, a detailed report may warn about job submission issues. For example:

.. code-block:: bash
Warn: Job submission issues (last: 01/30/25 10:36:57)
A job submission issue could be intermittent or not. It may cause
problems with the status or counts in the reports. To get more information
about the submission issue, look in the ``*.dag.dagman.out`` file for
errors, in particular lines containing ``submit attempt failed``.

Occasionally, some jobs are put on hold by HTCondor. To see the reason why
jobs are being held, use

Expand Down Expand Up @@ -276,12 +287,27 @@ Look for the line starting with "Provisioning job status". For example
calibrate 0 0 1 0 0 0 0 0 0 0 0 1
finalJob 0 0 1 0 0 0 0 0 0 0 0 1
If the provisioning job status is UNREADY, check the end of the report to see
if there is a warning about submission issues. There may be a temporary problem.
Check the ``*.dag.dagman.out`` in run submit directory for errors, in
particular for ``ERROR: submit attempt failed``.

If the provisioning job status is HELD, the hold reason will appear in parentheses.

The service job managing the glideins will be automatically canceled once the
workflow is completed. However, the existing glideins will be left for
HTCondor to shut them down once they remain inactive for the period specified
by ``provisioningMaxIdleTime`` (default value: 15 min., see below) or maximum
wall time is reached.

The provisioning job is expected to run as long as the workflow. If the job
dies, the job status will be `FAILED`. If the job just completed successfully,
the job status will be `SUCCEEDED` with a message saying it ended early (which
may or may not cause a problem since existing glideins could remain running).
To get more information about either of these cases, check the job output
and error files in the `jobs/provisioningJob` subdirectory.


If the automatic provisioning of the resources is enabled, the script that the
service job is supposed to run in order to provide the required resources *must
be* defined by the ``provisioningScript`` setting in the ``provisioning``
Expand Down
9 changes: 9 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ convention = "numpy"
# not fit on one line.
add-ignore = ["D107", "D105", "D102", "D100", "D200", "D205", "D400", "D104"]

[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
]

[tool.ruff]
target-version = "py311"
line-length = 110
Expand Down
113 changes: 81 additions & 32 deletions python/lsst/ctrl/bps/htcondor/htcondor_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
read_dag_log,
read_dag_status,
read_node_status,
summary_from_dag,
summarize_dag,
write_dag_info,
)
from .provisioner import Provisioner
Expand Down Expand Up @@ -154,7 +154,7 @@ def prepare(self, config, generic_workflow, out_prefix=None):
if enable_provisioning:
provisioner = Provisioner(config)
provisioner.configure()
provisioner.prepare("provisioning_job.bash", prefix=out_prefix)
provisioner.prepare("provisioningJob.bash", prefix=out_prefix)
provisioner.provision(workflow.dag)

with time_this(
Expand Down Expand Up @@ -1317,9 +1317,9 @@ def _create_detailed_report_from_jobs(
job_state_counts=dag_ad.get("state_counts", state_counts),
exit_code_summary=_get_exit_code_summary(jobs),
)

specific_info = WmsSpecificInfo()
for job_id, job_ad in jobs.items():
if not is_service_job(job_id):
if not is_service_job(job_ad):
try:
job_report = WmsJobReport(
wms_id=job_id,
Expand All @@ -1334,33 +1334,85 @@ def _create_detailed_report_from_jobs(
_LOG.error("Job missing key '%s': %s", str(ex), job_ad)
raise
else:
job_label = job_ad.get("bps_job_label")
if job_label is None:
_LOG.warning("Service job with id '%s': missing label, no action taken", job_id)
elif job_label == dag_ad.get("bps_provisioning_job", "MISS"):
report.specific_info = WmsSpecificInfo()
job_status = _htc_status_to_wms_state(job_ad)
if job_status == WmsStates.DELETED:
if "Reason" in job_ad and "Removed by DAGMan" in job_ad["Reason"]:
job_status = WmsStates.SUCCEEDED
report.specific_info.add_message(
template="Provisioning job status: {status}",
context={"status": job_status.name},
)
else:
_LOG.warning(
"Service job with id '%s' (label '%s'): no handler, no action taken", job_id, job_label
)
_LOG.debug(
"Found service job: id='%s', name='%s', label='%s', NodeStatus='%s', JobStatus='%s'",
job_id,
job_ad["DAGNodeName"],
job_ad.get("bps_job_label", "MISS"),
job_ad.get("NodeStatus", "MISS"),
job_ad.get("JobStatus", "MISS"),
)
_add_service_job_specific_info(job_ad, specific_info)

if specific_info:
report.specific_info = specific_info

# Add the removed entry to restore the original content of the dictionary.
# The ordering of keys will be change permanently though.
jobs.update({wms_workflow_id: dag_ad})

# Workflow will exit with non-zero DAG_STATUS if problem with
# any of the wms jobs. So change FAILED to SUCCEEDED if all
# payload jobs SUCCEEDED.
if report.total_number_jobs == report.job_state_counts[WmsStates.SUCCEEDED]:
report.state = WmsStates.SUCCEEDED

run_reports = {report.wms_id: report}
_LOG.debug("_create_detailed_report: run_reports = %s", run_reports)
return run_reports


def _add_service_job_specific_info(job_ad: dict[str, Any], specific_info: WmsSpecificInfo) -> None:
"""Generate report information for service job.
Parameters
----------
job_ad : `dict` [`str`, `Any`]
Provisioning job information.
specific_info : `lsst.ctrl.bps.WmsSpecificInfo`
Where to add message.
"""
status_details = ""
job_status = _htc_status_to_wms_state(job_ad)

# Service jobs in queue are deleted when DAG is done.
# To get accurate status, need to check other info.
if (
job_status == WmsStates.DELETED
and "Reason" in job_ad
and (
"Removed by DAGMan" in job_ad["Reason"]
or "removed because <OtherJobRemoveRequirements = DAGManJobId =?=" in job_ad["Reason"]
or "DAG is exiting and writing rescue file." in job_ad["Reason"]
)
):
if "HoldReason" in job_ad:
# HoldReason exists even if released, so check.
if "job_released_time" in job_ad and job_ad["job_held_time"] < job_ad["job_released_time"]:
# If released, assume running until deleted.
job_status = WmsStates.SUCCEEDED
status_details = ""
else:
# If job held when deleted by DAGMan, still want to
# report hold reason
status_details = f"(Job was held for the following reason: {job_ad['HoldReason']})"

else:
job_status = WmsStates.SUCCEEDED
elif job_status == WmsStates.SUCCEEDED:
status_details = "(Note: Finished before workflow.)"
elif job_status == WmsStates.HELD:
status_details = f"({job_ad['HoldReason']})"

template = "Status of {job_name}: {status} {status_details}"
context = {
"job_name": job_ad["DAGNodeName"],
"status": job_status.name,
"status_details": status_details,
}
specific_info.add_message(template=template, context=context)


def _summary_report(user, hist, pass_thru, schedds=None):
"""Gather run information to be used in generating summary reports.
Expand Down Expand Up @@ -1509,7 +1561,7 @@ def _get_run_summary(job):
"""
summary = job.get("bps_job_summary", job.get("bps_run_summary", None))
if not summary:
summary, _ = summary_from_dag(job["Iwd"])
summary, _, _ = summarize_dag(job["Iwd"])
if not summary:
_LOG.warning("Could not get run summary for htcondor job: %s", job)
_LOG.debug("_get_run_summary: summary=%s", summary)
Expand Down Expand Up @@ -1587,7 +1639,7 @@ def _get_state_counts_from_jobs(
"""
state_counts = dict.fromkeys(WmsStates, 0)
for job_id, job_ad in jobs.items():
if job_id != wms_workflow_id and not is_service_job(job_id):
if job_id != wms_workflow_id and not is_service_job(job_ad):
state_counts[_htc_status_to_wms_state(job_ad)] += 1
total_counted = sum(state_counts.values())

Expand Down Expand Up @@ -2143,13 +2195,13 @@ def _gather_site_values(config, compute_site):
return site_values


def is_service_job(job_id: str) -> bool:
def is_service_job(job_ad: dict[str, Any]) -> bool:
"""Determine if a job is a service one.
Parameters
----------
job_id : str
HTCondor job id.
job_ad : `dict` [`str`, Any]
Information about an HTCondor job.
Returns
-------
Expand All @@ -2159,10 +2211,7 @@ def is_service_job(job_id: str) -> bool:
Notes
-----
At the moment, HTCondor does not provide a native way to distinguish
between payload and service jobs in the workflow. As a result, the current
implementation depends entirely on the logic that is used in
:py:func:`read_node_status()` (service jobs are given ids with ClusterId=0
and ProcId=some integer). If it changes, this function needs to be
updated too.
between payload and service jobs in the workflow. This code depends
on read_node_status adding bps_job_type.
"""
return int(float(job_id)) == 0
return job_ad.get("bps_job_type", "MISSING") == "service"
Loading

0 comments on commit eb99b16

Please sign in to comment.