Skip to content

Commit

Permalink
PENG-2193 Added slurm job state to submission payload
Browse files Browse the repository at this point in the history
Now, when Jobs are submitted by the jobbergate agent, the slurm job info
(like job_state, job_info, and reason) is immediately fetched from
scontrol. The payload to the Jobbergate API for the newly submitted job now
includes the slurm job info.

Consequently, the slurm job state info is available immediately at job
submission time instead of requiring a full cycle of the jobbergate
agent to update the job.
  • Loading branch information
dusktreader committed Apr 8, 2024
1 parent 073ded9 commit 5d3ea9b
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 12 deletions.
15 changes: 11 additions & 4 deletions jobbergate-agent/jobbergate_agent/jobbergate/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
from tempfile import TemporaryDirectory

from buzz import DoExceptParams
from jobbergate_core.tools.sbatch import SubmissionHandler, inject_sbatch_params
from jobbergate_core.tools.sbatch import SubmissionHandler, InfoHandler, inject_sbatch_params
from loguru import logger

from jobbergate_agent.clients.cluster_api import backend_client as jobbergate_api_client
from jobbergate_agent.jobbergate.constants import FileType
from jobbergate_agent.jobbergate.schemas import JobScriptFile, PendingJobSubmission
from jobbergate_agent.jobbergate.schemas import JobScriptFile, PendingJobSubmission, SlurmJobData
from jobbergate_agent.settings import SETTINGS
from jobbergate_agent.utils.exception import JobbergateApiError, JobSubmissionError, handle_errors_async
from jobbergate_agent.utils.logging import log_error
from jobbergate_agent.utils.user_mapper import SlurmUserMapper, manufacture
from jobbergate_agent.jobbergate.update import fetch_job_data


async def retrieve_submission_file(file: JobScriptFile) -> str:
Expand Down Expand Up @@ -118,7 +119,7 @@ async def fetch_pending_submissions() -> list[PendingJobSubmission]:
return pending_job_submissions


async def mark_as_submitted(job_submission_id: int, slurm_job_id: int):
async def mark_as_submitted(job_submission_id: int, slurm_job_id: int, slurm_job_data: SlurmJobData):
"""
Mark job_submission as submitted in the Jobbergate API.
"""
Expand All @@ -133,6 +134,9 @@ async def mark_as_submitted(job_submission_id: int, slurm_job_id: int):
json=dict(
id=job_submission_id,
slurm_job_id=slurm_job_id,
slurm_job_state=slurm_job_data.job_state,
slurm_job_info=slurm_job_data.job_info,
slurm_job_state_reason=slurm_job_data.state_reason,
),
)
response.raise_for_status()
Expand Down Expand Up @@ -256,6 +260,8 @@ async def submit_pending_jobs():
"""
logger.debug("Started submitting pending jobs...")

info_handler = InfoHandler(scontrol_path=SETTINGS.SCONTROL_PATH)

logger.debug("Building user-mapper")
user_mapper = manufacture()

Expand All @@ -271,7 +277,8 @@ async def submit_pending_jobs():
re_raise=False,
):
slurm_job_id = await submit_job_script(pending_job_submission, user_mapper)
slurm_job_data: SlurmJobData = await fetch_job_data(slurm_job_id, info_handler)

await mark_as_submitted(pending_job_submission.id, slurm_job_id)
await mark_as_submitted(pending_job_submission.id, slurm_job_id, slurm_job_data)

logger.debug("...Finished submitting pending jobs")
42 changes: 36 additions & 6 deletions jobbergate-agent/tests/jobbergate/test_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pytest
import respx

from jobbergate_agent.jobbergate.schemas import JobScriptFile, PendingJobSubmission
from jobbergate_agent.jobbergate.schemas import JobScriptFile, PendingJobSubmission, SlurmJobData
from jobbergate_agent.jobbergate.submit import (
fetch_pending_submissions,
get_job_script_file,
Expand Down Expand Up @@ -323,10 +323,23 @@ async def test_mark_as_submitted__success():
update_route = respx.post(f"{SETTINGS.BASE_API_URL}/jobbergate/job-submissions/agent/submitted")
update_route.mock(return_value=httpx.Response(status_code=200))

await mark_as_submitted(1, 111)
await mark_as_submitted(
1,
111,
SlurmJobData(
job_state="RUNNING",
job_info="{}",
)
)
assert update_route.called
last_request = update_route.calls.last.request
assert json.loads(last_request.content) == dict(id=1, slurm_job_id=111)
assert json.loads(last_request.content) == dict(
id=1,
slurm_job_id=111,
slurm_job_state="RUNNING",
slurm_job_info="{}",
slurm_job_state_reason=None,
)


@pytest.mark.asyncio
Expand All @@ -344,7 +357,14 @@ async def test_mark_as_submitted__raises_JobbergateApiError_if_the_response_is_n
JobbergateApiError,
match="Could not mark job submission 1 as submitted",
):
await mark_as_submitted(1, 111)
await mark_as_submitted(
1,
111,
SlurmJobData(
job_state="RUNNING",
job_info="{}",
)
)
assert update_route.called


Expand Down Expand Up @@ -668,6 +688,16 @@ def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int):
"jobbergate_agent.jobbergate.submit.mark_as_submitted", side_effect=_mocked_mark_as_submitted
)

mocked_sbatch = mock.MagicMock()
mocker.patch("jobbergate_agent.jobbergate.submit.InfoHandler", return_value=mocked_sbatch)
mocked_fetch = mocker.patch(
"jobbergate_agent.jobbergate.submit.fetch_job_data",
return_value=SlurmJobData(
job_state="RUNNING",
job_info="{}",
),
)

test_mapper = manufacture()

await submit_pending_jobs()
Expand All @@ -683,8 +713,8 @@ def _mocked_mark_as_submitted(job_submission_id: int, slurm_job_id: int):

mock_mark.assert_has_calls(
[
mocker.call(1, 11),
mocker.call(2, 22),
mocker.call(1, 11, SlurmJobData(job_state="RUNNING", job_info="{}")),
mocker.call(2, 22, SlurmJobData(job_state="RUNNING", job_info="{}")),
]
)
assert mock_mark.call_count == 2
3 changes: 3 additions & 0 deletions jobbergate-api/jobbergate_api/apps/job_submissions/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ async def job_submissions_agent_submitted(
await secure_services.crud.job_submission.update(
submitted_request.id,
slurm_job_id=submitted_request.slurm_job_id,
slurm_job_state=submitted_request.slurm_job_state,
slurm_job_info=submitted_request.slurm_job_info,
report_message=submitted_request.slurm_job_state_reason,
status=JobSubmissionStatus.SUBMITTED,
)
return FastAPIResponse(status_code=status.HTTP_202_ACCEPTED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ class JobSubmissionAgentSubmittedRequest(BaseModel):
"""Request model for marking JobSubmission instances as SUBMITTED."""

id: int
slurm_job_id: Optional[NonNegativeInt]
slurm_job_id: NonNegativeInt
slurm_job_state: SlurmJobState
slurm_job_info: str
slurm_job_state_reason: Optional[str]

class Config:
schema_extra = job_submission_meta_mapper
Expand Down
16 changes: 15 additions & 1 deletion jobbergate-api/tests/apps/job_submissions/test_routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,8 @@ async def test_job_submissions_agent_submitted__success(
payload = dict(
id=inserted_job_submission_id,
slurm_job_id=111,
slurm_job_state=SlurmJobState.RUNNING,
slurm_job_info="Fake slurm job info",
)

inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_EDIT, client_id="dummy-client")
Expand All @@ -1299,6 +1301,9 @@ async def test_job_submissions_agent_submitted__success(
assert instance.id == inserted_job_submission_id
assert instance.status == JobSubmissionStatus.SUBMITTED
assert instance.slurm_job_id == payload["slurm_job_id"]
assert instance.slurm_job_state == payload["slurm_job_state"]
assert instance.slurm_job_info == payload["slurm_job_info"]
assert instance.report_message is None


async def test_job_submissions_agent_submitted__fails_if_status_is_not_CREATED(
Expand Down Expand Up @@ -1330,6 +1335,8 @@ async def test_job_submissions_agent_submitted__fails_if_status_is_not_CREATED(
payload = dict(
id=inserted_job_submission_id,
slurm_job_id=111,
slurm_job_state=SlurmJobState.RUNNING,
slurm_job_info="Fake slurm job info",
)

inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_EDIT, client_id="dummy-client")
Expand All @@ -1351,7 +1358,12 @@ async def test_job_submissions_agent_submitted__fails_if_token_does_not_carry_cl
inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_EDIT)
response = await client.put(
"/jobbergate/job-submissions/agent/1",
json=dict(status=JobSubmissionStatus.SUBMITTED, slurm_job_id=111),
json=dict(
status=JobSubmissionStatus.SUBMITTED,
slurm_job_id=111,
slurm_job_state=SlurmJobState.RUNNING,
slurm_job_info="Fake slurm job info",
),
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "Checked expressions failed: Access token does not contain\\n 1: client_id" in response.text
Expand Down Expand Up @@ -1383,6 +1395,8 @@ async def test_job_submissions_agent_submitted__fails_if_client_id_does_not_matc
payload = dict(
id=inserted_job_submission_id,
slurm_job_id=111,
slurm_job_state=SlurmJobState.RUNNING,
slurm_job_info="Fake slurm job info",
)

inject_security_header("[email protected]", Permissions.JOB_SUBMISSIONS_EDIT, client_id="stupid-client")
Expand Down

0 comments on commit 5d3ea9b

Please sign in to comment.