ASP-2220-data-normalization-agent #71

Closed
wants to merge 5 commits
5 changes: 5 additions & 0 deletions CHANGELOG.rst
@@ -7,6 +7,11 @@ This file keeps track of all notable changes to the Cluster Agent.
Unreleased
----------

+* Modified the interface with Jobbergate-API to address the new data model on the back-end
+
+2.2.3 2023-07-02
+---------------------
+
* Added support to set slurm restd version dynamically

2.2.2 2023-02-28
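As context for the changelog entry above: with the new back-end data model, a pending job submission carries nested job-script file metadata rather than inline file contents. An illustrative payload shape, with all values invented rather than taken from a real API response, might look like this:

```python
# Illustrative only: approximate shape of a pending job submission under the
# new data model, inferred from the schema changes later in this diff.
pending_submission_example = {
    "id": 1,
    "name": "sub1",
    "owner_email": "[email protected]",
    "execution_directory": None,
    "execution_parameters": {},
    "job_script": {
        "files": [
            {
                "parent_id": 11,
                "filename": "application.sh",
                "file_type": "ENTRYPOINT",
            },
        ],
    },
}
```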
10 changes: 6 additions & 4 deletions cluster_agent/jobbergate/api.py
@@ -22,7 +22,7 @@ async def fetch_pending_submissions() -> List[PendingJobSubmission]:
response = await backend_client.get("/jobbergate/job-submissions/agent/pending")
response.raise_for_status()
pending_job_submissions = [
-            PendingJobSubmission(**pjs) for pjs in response.json()
+            PendingJobSubmission(**pjs) for pjs in response.json().get("items", [])
]

logger.debug(f"Retrieved {len(pending_job_submissions)} pending job submissions")
@@ -39,7 +39,9 @@ async def fetch_active_submissions() -> List[ActiveJobSubmission]:
):
response = await backend_client.get("jobbergate/job-submissions/agent/active")
response.raise_for_status()
-        active_job_submissions = [ActiveJobSubmission(**ajs) for ajs in response.json()]
+        active_job_submissions = [
+            ActiveJobSubmission(**ajs) for ajs in response.json().get("items", [])
+        ]

logger.debug(f"Retrieved {len(active_job_submissions)} active job submissions")
return active_job_submissions
@@ -60,7 +62,7 @@ async def mark_as_submitted(job_submission_id: int, slurm_job_id: int):
response = await backend_client.put(
f"jobbergate/job-submissions/agent/{job_submission_id}",
json=dict(
-                new_status=JobSubmissionStatus.SUBMITTED,
+                status=JobSubmissionStatus.SUBMITTED,
slurm_job_id=slurm_job_id,
),
)
@@ -111,6 +113,6 @@ async def update_status(
):
response = await backend_client.put(
f"jobbergate/job-submissions/agent/{job_submission_id}",
-        json=dict(new_status=status, report_message=report_message),
+        json=dict(status=status, report_message=report_message),
)
response.raise_for_status()
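The `.get("items", [])` changes above indicate that the list endpoints now wrap their results in a paginated envelope instead of returning a bare JSON array. A minimal sketch of the difference, assuming an envelope of the form `{"items": [...]}` (any other envelope keys are an assumption, since the diff does not show them):

```python
from typing import Any, Dict, List


def unpack_items(payload: Any) -> List[Dict[str, Any]]:
    """Sketch: accept both the old bare-list shape and the new envelope shape."""
    if isinstance(payload, list):
        # Old API shape: a bare JSON array of job submissions.
        return payload
    # New API shape: {"items": [...]} plus (assumed) pagination metadata.
    return payload.get("items", [])


# Example usage with invented payloads:
old_style = [{"id": 1}]
new_style = {"items": [{"id": 1}], "total": 1}  # "total" is hypothetical
assert unpack_items(old_style) == unpack_items(new_style) == [{"id": 1}]
```

The agent itself only handles the new envelope shape; the bare-list fallback here is purely for illustration of what changed.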
7 changes: 7 additions & 0 deletions cluster_agent/jobbergate/constants.py
@@ -3,6 +3,13 @@
from typing import DefaultDict


+class FileType(str, Enum):
+    """File type enum."""
+
+    ENTRYPOINT = "ENTRYPOINT"
+    SUPPORT = "SUPPORT"
+
+
class JobSubmissionStatus(str, Enum):
"""
Enumeration of possible job_submission statuses.
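A small sketch of how the new `FileType` enum distinguishes the entrypoint from supporting files; the `files` list is invented example data, and the enum is repeated here only to keep the snippet self-contained:

```python
from enum import Enum


class FileType(str, Enum):
    """Copy of the enum added above, repeated so the example runs standalone."""

    ENTRYPOINT = "ENTRYPOINT"
    SUPPORT = "SUPPORT"


files = [
    {"filename": "helper.py", "file_type": "SUPPORT"},
    {"filename": "application.sh", "file_type": "ENTRYPOINT"},
]

# Because FileType subclasses str, raw strings from an API payload compare
# equal to the enum members.
entrypoints = [f for f in files if f["file_type"] == FileType.ENTRYPOINT]
assert [f["filename"] for f in entrypoints] == ["application.sh"]
```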
32 changes: 20 additions & 12 deletions cluster_agent/jobbergate/schemas.py
@@ -3,16 +3,26 @@

import pydantic

-from cluster_agent.jobbergate.constants import JobSubmissionStatus, status_map
+from cluster_agent.jobbergate.constants import FileType, JobSubmissionStatus, status_map


-class JobScriptFiles(pydantic.BaseModel, extra=pydantic.Extra.ignore):
-    """
-    Model containing job-script files.
-    """
-
-    main_file_path: Path
-    files: Dict[Path, str]
+class JobScriptFile(pydantic.BaseModel, extra=pydantic.Extra.ignore):
+    """Model for the job_script_files field of the JobScript resource."""
+
+    parent_id: int
+    filename: str
+    file_type: FileType
+
+    @property
+    def path(self) -> str:
+        return f"/jobbergate/job-scripts/{self.parent_id}/upload/{self.filename}"
+
+
+class JobScript(pydantic.BaseModel, extra=pydantic.Extra.ignore):
+    """Model to match database for the JobScript resource."""
+
+    files: List[JobScriptFile]


class PendingJobSubmission(pydantic.BaseModel, extra=pydantic.Extra.ignore):
@@ -22,13 +32,11 @@ class PendingJobSubmission(pydantic.BaseModel, extra=pydantic.Extra.ignore):
"""

id: int
-    job_submission_name: str
-    job_submission_owner_email: str
+    name: str
+    owner_email: str
execution_directory: Optional[Path]
execution_parameters: Dict[str, Any] = pydantic.Field(default_factory=dict)
-    job_script_name: str
-    application_name: str
-    job_script_files: JobScriptFiles
+    job_script: JobScript


class ActiveJobSubmission(pydantic.BaseModel, extra=pydantic.Extra.ignore):
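A minimal sketch of how the new models parse the nested payload and how `JobScriptFile.path` composes the download route. It assumes pydantic v1 (which the models above appear to target) and uses invented sample values:

```python
from typing import List

import pydantic


class JobScriptFile(pydantic.BaseModel):
    """Simplified copy of the model above, kept standalone for the example."""

    parent_id: int
    filename: str
    file_type: str  # the real model uses the FileType enum

    @property
    def path(self) -> str:
        return f"/jobbergate/job-scripts/{self.parent_id}/upload/{self.filename}"


class JobScript(pydantic.BaseModel):
    """Simplified copy of the JobScript model above."""

    files: List[JobScriptFile]


# Invented payload in the new nested shape:
job_script = JobScript.parse_obj(
    {"files": [{"parent_id": 11, "filename": "application.sh", "file_type": "ENTRYPOINT"}]}
)
assert job_script.files[0].path == "/jobbergate/job-scripts/11/upload/application.sh"
```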
48 changes: 22 additions & 26 deletions cluster_agent/jobbergate/submit.py
@@ -4,6 +4,7 @@
from buzz import handle_errors
from loguru import logger

+from cluster_agent.identity.cluster_api import backend_client
from cluster_agent.identity.slurm_user.factory import manufacture
from cluster_agent.identity.slurm_user.mappers import SlurmUserMapper
from cluster_agent.identity.slurmrestd import backend_client as slurmrestd_client
@@ -13,7 +14,7 @@
fetch_pending_submissions,
mark_as_submitted,
)
-from cluster_agent.jobbergate.constants import JobSubmissionStatus
+from cluster_agent.jobbergate.constants import FileType, JobSubmissionStatus
from cluster_agent.jobbergate.schemas import (
PendingJobSubmission,
SlurmJobParams,
@@ -30,23 +31,6 @@
from cluster_agent.utils.logging import log_error


-def get_job_script(pending_job_submission: PendingJobSubmission) -> str:
-    """
-    Get the job script from a PendingJobSubmission object.
-    Raise JobSubmissionError if no job script is found or if its empty.
-    """
-    job_script = pending_job_submission.job_script_files.files.get(
-        pending_job_submission.job_script_files.main_file_path, ""
-    )
-
-    JobSubmissionError.require_condition(
-        bool(job_script),
-        "Could not find an executable script in retrieved job script data.",
-    )
-
-    return job_script
-
-
def unpack_error_from_slurm_response(response: SlurmSubmitResponse) -> str:
"""
Unpack the error message from the response of a slurmrestd request.
@@ -86,36 +70,48 @@ async def submit_job_script(
raise_exc_class=JobSubmissionError,
do_except=notify_submission_rejected.report_error,
):
-        email = pending_job_submission.job_submission_owner_email
-        name = pending_job_submission.application_name
+        email = pending_job_submission.owner_email
+        name = pending_job_submission.name
mapper_class_name = user_mapper.__class__.__name__
logger.debug(
f"Fetching username for email {email} with mapper {mapper_class_name}"
)
username = await user_mapper.find_username(email)
logger.debug(f"Using local slurm user {username} for job submission")

-        job_script = get_job_script(pending_job_submission)
-
submit_dir = (
pending_job_submission.execution_directory
or SETTINGS.DEFAULT_SLURM_WORK_DIR
)

-        for path, file_content in pending_job_submission.job_script_files.files.items():
-            local_script_path = submit_dir / path
+        job_script = None
+
+        for metadata in pending_job_submission.job_script.files:
+            local_script_path = submit_dir / metadata.filename
            local_script_path.parent.mkdir(parents=True, exist_ok=True)
-            local_script_path.write_text(file_content)
+
+            response = await backend_client.get(metadata.path)
+            response.raise_for_status()
+            local_script_path.write_bytes(response.content)
+
+            if metadata.file_type == FileType.ENTRYPOINT:
+                job_script = response.content.decode("utf-8")

logger.debug(f"Copied job script file to {local_script_path}")

+        JobSubmissionError.require_condition(
+            job_script,
+            "Could not find an executable script in retrieved job script data.",
+        )

async with handle_errors_async(
"Failed to extract Slurm parameters",
raise_exc_class=SlurmParameterParserError,
do_except=notify_submission_rejected.report_error,
):
job_parameters = get_job_parameters(
pending_job_submission.execution_parameters,
-            name=pending_job_submission.application_name,
+            name=name,
current_working_directory=submit_dir,
standard_output=submit_dir / f"{name}.out",
standard_error=submit_dir / f"{name}.err",
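A condensed, synchronous sketch of the new file handling in `submit_job_script`: each file's metadata is used to download its content, the bytes are written under the submit directory, and the decoded `ENTRYPOINT` content becomes the job script. The `FileMeta` dataclass and the `download` callable are stand-ins for `JobScriptFile` and the authenticated `backend_client`, not real agent code:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional


@dataclass
class FileMeta:
    """Stand-in for JobScriptFile: filename, type, and the API download route."""

    filename: str
    file_type: str  # "ENTRYPOINT" or "SUPPORT"
    path: str  # e.g. "/jobbergate/job-scripts/11/upload/application.sh"


def materialize_job_script(
    files: List[FileMeta],
    submit_dir: Path,
    download: Callable[[str], bytes],
) -> str:
    """Write every job-script file under submit_dir and return the entrypoint text.

    Mirrors the loop added above, minus the async client and error reporting.
    """
    job_script: Optional[str] = None
    for meta in files:
        local_path = submit_dir / meta.filename
        local_path.parent.mkdir(parents=True, exist_ok=True)
        content = download(meta.path)  # fetch the file body from the API
        local_path.write_bytes(content)  # keep an on-disk copy for Slurm
        if meta.file_type == "ENTRYPOINT":
            job_script = content.decode("utf-8")
    if not job_script:
        raise RuntimeError(
            "Could not find an executable script in retrieved job script data."
        )
    return job_script
```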
2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@

here = dirname(__file__)

_VERSION = "2.2.2"
_VERSION = "2.2.3"

setup(
name="ovs-cluster-agent",
24 changes: 12 additions & 12 deletions tests/jobbergate/conftest.py
@@ -25,25 +25,25 @@ def dummy_template_source():


@pytest.fixture
-def dummy_job_script_files(dummy_template_source):
-    return {
-        "main_file_path": "application.sh",
-        "files": {"application.sh": dummy_template_source},
-    }
+def dummy_job_script_files():
+    return [
+        {
+            "parent_id": 1,
+            "filename": "application.sh",
+            "file_type": "ENTRYPOINT",
+        },
+    ]


@pytest.fixture
-def dummy_pending_job_submission_data(dummy_job_script_files):
+def dummy_pending_job_submission_data(dummy_job_script_files, tmp_path):
"""
Provide a fixture that returns a dict that is compatible with PendingJobSubmission.
"""
return dict(
id=1,
-        job_submission_name="sub1",
-        job_submission_owner_email="[email protected]",
-        job_script_id=11,
-        job_script_name="script1",
-        job_script_files=dummy_job_script_files,
-        application_name="app1",
+        name="sub1",
+        owner_email="[email protected]",
+        job_script={"files": dummy_job_script_files},
slurm_job_id=13,
)
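A hedged sketch of a test that could exercise these fixtures against the new schema; it assumes the `PendingJobSubmission` and `JobScriptFile` models shown earlier in this diff and is not part of the changed files:

```python
from cluster_agent.jobbergate.schemas import PendingJobSubmission


def test_pending_job_submission_parses_new_shape(dummy_pending_job_submission_data):
    """The fixture dict should validate straight into the new schema."""
    submission = PendingJobSubmission(**dummy_pending_job_submission_data)
    assert submission.name == "sub1"
    assert submission.job_script.files[0].filename == "application.sh"
    assert submission.job_script.files[0].path == (
        "/jobbergate/job-scripts/1/upload/application.sh"
    )
```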