diff --git a/.gitignore b/.gitignore index dd41de446..114814b0c 100644 --- a/.gitignore +++ b/.gitignore @@ -111,3 +111,5 @@ site/ # local dotenv files *.env + +*.msgpack diff --git a/jobbergate-api/CHANGELOG.md b/jobbergate-api/CHANGELOG.md index 8b0c0e1b1..7be49beff 100644 --- a/jobbergate-api/CHANGELOG.md +++ b/jobbergate-api/CHANGELOG.md @@ -4,6 +4,8 @@ This file keeps track of all notable changes to jobbergate-api ## Unreleased +- Implemented logic for receiving job metrics from the agent and making them available for a client [[PENG-2456](https://sharing.clickup.com/t/h/c/18022949/PENG-2456/43YCG8RCBBQJELQ)] + ## 5.4.0 -- 2024-11-18 - Added clone capability to job submissions so they can be resubmitted to the cluster when needed [PENG-1676, ASP-4597] diff --git a/jobbergate-api/Makefile b/jobbergate-api/Makefile index 36ff8efa3..2da5b96be 100644 --- a/jobbergate-api/Makefile +++ b/jobbergate-api/Makefile @@ -46,6 +46,7 @@ clean: ## Clean up build artifacts and other junk @rm -fr build/ @rm -fr dist/ @rm -fr *.egg-info + @find . -name '*.msgpack' -delete # Recipe stolen from: https://gist.github.com/prwhite/8168133?permalink_comment_id=4160123#gistcomment-4160123 help: ## Show help message diff --git a/jobbergate-api/alembic/versions/20241128_162020--99c3877d0f10_create_job_metric_table.py b/jobbergate-api/alembic/versions/20241128_162020--99c3877d0f10_create_job_metric_table.py new file mode 100644 index 000000000..878c0c020 --- /dev/null +++ b/jobbergate-api/alembic/versions/20241128_162020--99c3877d0f10_create_job_metric_table.py @@ -0,0 +1,426 @@ +"""create job metric table + +Revision ID: 99c3877d0f10 +Revises: 815022877cfe +Create Date: 2024-11-28 16:20:20.533111 + +""" + +import time +from datetime import datetime, timezone +from textwrap import dedent + +import sqlalchemy as sa +from sqlalchemy.types import TypeDecorator, DateTime + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "99c3877d0f10" +down_revision = "815022877cfe" +branch_labels = None +depends_on = None + + +create_materialized_view_template = dedent( + """ + CREATE MATERIALIZED VIEW {view_name} + WITH (timescaledb.continuous) AS + SELECT job_submission_id, + node_host, + time_bucket('{time_bucket}', time) AS bucket, + AVG(cpu_frequency) AS cpu_frequency, + SUM(cpu_time) AS cpu_time, + AVG(cpu_utilization) AS cpu_utilization, + AVG(gpu_memory) AS gpu_memory, + AVG(gpu_utilization) AS gpu_utilization, + SUM(page_faults) AS page_faults, + AVG(memory_rss) AS memory_rss, + AVG(memory_virtual) AS memory_virtual, + SUM(disk_read) AS disk_read, + SUM(disk_write) AS disk_write + FROM job_submission_metrics + GROUP BY job_submission_id, node_host, bucket + WITH NO DATA + """ +) + +add_continuous_aggregate_policy_template = dedent( + """ + SELECT add_continuous_aggregate_policy('{mat_view_name}', + start_offset => INTERVAL '{start_offset}', + end_offset => INTERVAL '{end_offset}', + schedule_interval => INTERVAL '{schedule_interval}', + initial_start => '{initial_start}') + """ +) + +remove_continuous_aggregate_policy_template = "SELECT remove_continuous_aggregate_policy('{mat_view_name}')" + +drop_materialized_view_template = "DROP MATERIALIZED VIEW {view_name}" + + +class TimestampInt(TypeDecorator): + impl = DateTime(timezone=True) + + def process_bind_param(self, value: int | None, dialect: sa.Dialect) -> datetime | None: + if value is not None: + return datetime.fromtimestamp(value, tz=timezone.utc) + return value + + def process_result_value(self, value: datetime | None, dialect: sa.Dialect) -> int | None: + if value is not None: + return int(value.timestamp()) + return value + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "job_submission_metrics", + sa.Column( + "time", + TimestampInt(), + nullable=False, + ), + sa.Column("job_submission_id", sa.Integer(), nullable=False), + sa.Column("slurm_job_id", sa.Integer(), nullable=False), + sa.Column("node_host", sa.String(), nullable=False), + sa.Column("step", sa.Integer(), nullable=False), + sa.Column("task", sa.Integer(), nullable=False), + sa.Column("cpu_frequency", sa.Float(), nullable=False), + sa.Column("cpu_time", sa.Float(), nullable=False), + sa.Column("cpu_utilization", sa.Float(), nullable=False), + sa.Column("gpu_memory", sa.Integer(), nullable=False), + sa.Column("gpu_utilization", sa.Float(), nullable=False), + sa.Column("page_faults", sa.Integer(), nullable=False), + sa.Column("memory_rss", sa.Integer(), nullable=False), + sa.Column("memory_virtual", sa.Integer(), nullable=False), + sa.Column("disk_read", sa.Integer(), nullable=False), + sa.Column("disk_write", sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint("time", "job_submission_id", "node_host", "step", "task"), + sa.ForeignKeyConstraint(["job_submission_id"], ["job_submissions.id"], ondelete="CASCADE"), + ) + op.create_index( + "idx_job_submission_metrics_time", + "job_submission_metrics", + ["time"], + unique=False, + ) + op.create_index( + "idx_node_host_step_task", + "job_submission_metrics", + ["node_host", "step", "task"], + unique=False, + ) + op.create_index( + op.f("ix_job_submission_metrics_node_host"), + "job_submission_metrics", + ["node_host"], + unique=False, + ) + op.create_index( + op.f("ix_job_submission_metrics_step"), + "job_submission_metrics", + ["step"], + unique=False, + ) + op.create_index( + op.f("ix_job_submission_metrics_task"), + "job_submission_metrics", + ["task"], + unique=False, + ) + op.execute(sa.text("SELECT create_hypertable('job_submission_metrics', by_range('time'))")) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_10_seconds_by_node", + time_bucket="10 seconds", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_seconds_by_node", + start_offset="20 seconds", + end_offset="0 seconds", + schedule_interval="10 seconds", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_10_seconds_all_nodes", + time_bucket="10 seconds", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_seconds_all_nodes", + start_offset="20 seconds", + end_offset="0 seconds", + schedule_interval="10 seconds", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_minute_by_node", + time_bucket="1 minute", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_minute_by_node", + start_offset="2 minutes", + end_offset="0 seconds", + schedule_interval="1 minute", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_minute_all_nodes", + time_bucket="1 minute", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_minute_all_nodes", + start_offset="2 minutes", + end_offset="0 seconds", + 
schedule_interval="1 minute", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_10_minutes_by_node", + time_bucket="10 minutes", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_minutes_by_node", + start_offset="20 minutes", + end_offset="0 seconds", + schedule_interval="10 minutes", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_10_minutes_all_nodes", + time_bucket="10 minutes", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_minutes_all_nodes", + start_offset="20 minutes", + end_offset="0 seconds", + schedule_interval="10 minutes", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_hour_by_node", + time_bucket="1 hour", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_hour_by_node", + start_offset="2 hours", + end_offset="0 seconds", + schedule_interval="1 hour", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_hour_all_nodes", + time_bucket="1 hour", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_hour_all_nodes", + start_offset="2 hours", + end_offset="0 seconds", + schedule_interval="1 hour", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_week_by_node", + time_bucket="1 week", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_week_by_node", + start_offset="2 weeks", + end_offset="0 seconds", + schedule_interval="1 week", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + op.execute( + sa.text( + create_materialized_view_template.format( + view_name="metrics_nodes_mv_1_week_all_nodes", + time_bucket="1 week", + ) + ) + ) + op.execute( + sa.text( + add_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_week_all_nodes", + start_offset="2 weeks", + end_offset="0 seconds", + schedule_interval="1 week", + initial_start=datetime.now(timezone.utc).isoformat(), + ) + ) + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index(op.f("ix_job_submission_metrics_task"), table_name="job_submission_metrics") + op.drop_index(op.f("ix_job_submission_metrics_step"), table_name="job_submission_metrics") + op.drop_index(op.f("ix_job_submission_metrics_node_host"), table_name="job_submission_metrics") + op.drop_index("idx_node_host_step_task", table_name="job_submission_metrics") + op.drop_index("idx_job_submission_metrics_time", table_name="job_submission_metrics") + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_week_by_node" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_hour_by_node" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_minutes_by_node" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_minute_by_node" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_seconds_by_node" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_week_all_nodes" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_hour_all_nodes" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_minutes_all_nodes" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_1_minute_all_nodes" + ) + ) + ) + op.execute( + sa.text( + remove_continuous_aggregate_policy_template.format( + mat_view_name="metrics_nodes_mv_10_seconds_all_nodes" + ) + ) + ) + op.execute(sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_week_by_node"))) + op.execute(sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_hour_by_node"))) + op.execute( + sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_10_minutes_by_node")) + ) + op.execute(sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_minute_by_node"))) + op.execute( + sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_10_seconds_by_node")) + ) + op.execute(sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_week_all_nodes"))) + op.execute(sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_hour_all_nodes"))) + op.execute( + sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_10_minutes_all_nodes")) + ) + op.execute( + sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_1_minute_all_nodes")) + ) + op.execute( + sa.text(drop_materialized_view_template.format(view_name="metrics_nodes_mv_10_seconds_all_nodes")) + ) + op.drop_table("job_submission_metrics") + # ### end Alembic commands ### diff --git a/jobbergate-api/dev_tools/__init__.py b/jobbergate-api/dev_tools/__init__.py index 2485d11eb..f3a79ac85 100644 --- a/jobbergate-api/dev_tools/__init__.py +++ b/jobbergate-api/dev_tools/__init__.py @@ -4,11 +4,12 @@ import typer -from dev_tools import db, dev_server, show_env +from dev_tools import db, dev_server, show_env, metrics app = typer.Typer() app.command(name="dev-server")(dev_server.dev_server) 
app.command(name="show-env")(show_env.show_env) +app.command(name="generate-metrics")(metrics.generate_metrics) app.add_typer(db.app, name="db") diff --git a/jobbergate-api/dev_tools/metrics.py b/jobbergate-api/dev_tools/metrics.py new file mode 100644 index 000000000..cce40f4bf --- /dev/null +++ b/jobbergate-api/dev_tools/metrics.py @@ -0,0 +1,128 @@ +""" +Provide command for generating dummy job metrics for testing purposes. +""" + +import random +from collections.abc import Iterator +from datetime import datetime +from typing import cast, Generator, get_args, Literal, TypedDict + +import typer +import msgpack + +app = typer.Typer() + + +INFLUXDB_MEASUREMENT = Literal[ + "CPUFrequency", + "CPUTime", + "CPUUtilization", + "GPUMemMB", + "GPUUtilization", + "Pages", + "RSS", + "ReadMB", + "VMSize", + "WriteMB", +] + +INFLUXDB_MEASUREMENT_TYPES = { + "CPUFrequency": int, + "CPUTime": float, + "CPUUtilization": float, + "GPUMemMB": int, + "GPUUtilization": float, + "Pages": int, + "RSS": int, + "ReadMB": int, + "VMSize": int, + "WriteMB": int, +} + + +class InfluxDBMeasure(TypedDict): + """ + Map each entry in the generator returned by InfluxDBClient(...).query(...).get_points(). + """ + + time: int + host: str + job: str + step: str + task: str + value: float + measurement: INFLUXDB_MEASUREMENT + + +def _generate_influxdb_data( + num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int +) -> Generator[InfluxDBMeasure, None, None]: + current_time = int(datetime.now().timestamp()) + + for _ in range(num_points_per_measurement): + for host in range(1, num_hosts + 1): + for job in range(1, num_jobs + 1): + for step in range(1, num_steps + 1): + for task in range(1, num_tasks + 1): + for measurement in get_args(INFLUXDB_MEASUREMENT): + yield { + "time": current_time, + "host": f"host_{host}", + "job": str(job), + "step": str(step), + "task": str(task), + "value": INFLUXDB_MEASUREMENT_TYPES[measurement](random.random() * 100), + "measurement": measurement, + } + current_time += 10 + + +def _aggregate_influxdb_data( + data_points: Iterator[InfluxDBMeasure], +) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]: + measurement_names = get_args(INFLUXDB_MEASUREMENT) + default_measurements: dict[str, float] = {measurement: 0.0 for measurement in measurement_names} + + aggregated_data: dict[tuple[int, str, str, str], dict[str, float]] = {} + + for measure in data_points: + key = (measure["time"], measure["host"], measure["step"], measure["task"]) + + # aggregate measurements lazily to avoid creating a new dict for each point + if key not in aggregated_data: + aggregated_data[key] = default_measurements.copy() + aggregated_data[key][measure["measurement"]] = measure["value"] + + return cast( + list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]], + [ + ( + time, + host, + step, + task, + *(aggregated_data[(time, host, step, task)][measurement] for measurement in measurement_names), + ) + for (time, host, step, task) in aggregated_data + ], + ) + + +@app.command() +def generate_metrics( + num_points_per_measurement: int = typer.Option(5, help="Number of data points per measurement."), + num_hosts: int = typer.Option(5, help="Number of hosts to generate metrics for."), + num_jobs: int = typer.Option(5, help="Number of jobs to generate metrics for."), + num_steps: int = typer.Option(5, help="Number of steps to generate metrics for."), + num_tasks: int = typer.Option(5, 
help="Number of tasks to generate metrics for."), + path: str = typer.Option("dummy_metrics.msgpack", help="Path to write the generated metrics."), +): + """ + Generate dummy job metrics for a given job submission. + """ + data_points = _generate_influxdb_data(num_points_per_measurement, num_hosts, num_jobs, num_steps, num_tasks) + aggregated_data = _aggregate_influxdb_data(data_points) + binary_data: bytes = msgpack.packb(aggregated_data) + + with open(path, "wb") as f: + f.write(binary_data) diff --git a/jobbergate-api/jobbergate_api/apps/job_submissions/constants.py b/jobbergate-api/jobbergate_api/apps/job_submissions/constants.py index e1ede9d8b..016cb644e 100644 --- a/jobbergate-api/jobbergate_api/apps/job_submissions/constants.py +++ b/jobbergate-api/jobbergate_api/apps/job_submissions/constants.py @@ -2,6 +2,7 @@ Describe constants for the job_submissions module. """ +import enum from dataclasses import dataclass from auto_name_enum import AutoNameEnum, auto @@ -209,3 +210,46 @@ class SlurmJobStateDetails: is_abort_status=True, ), } + + +class JobSubmissionMetricSampleRate(enum.IntEnum): + """ + Defines the set of possible sample rates for job submission metrics. + + All values are in seconds. + """ + + ten_seconds = 10 + one_minute = 60 + ten_minutes = 600 + one_hour = 3600 + one_week = 604800 + + +class JobSubmissionMetricAggregateNames(AutoNameEnum): + """ + An enumeration representing different time intervals for aggregating job submission metrics. + + Attributes: + metrics_nodes_mv_1_week_by_node: Aggregation of metrics by node over a 1-week period. + metrics_nodes_mv_1_hour_by_node: Aggregation of metrics by node over a 1-hour period. + metrics_nodes_mv_10_minutes_by_node: Aggregation of metrics by node over a 10-minute period. + metrics_nodes_mv_1_minute_by_node: Aggregation of metrics by node over a 1-minute period. + metrics_nodes_mv_10_seconds_by_node: Aggregation of metrics by node over a 10-second period. + metrics_nodes_mv_1_week_all_nodes: Aggregation of metrics for all nodes over a 1-week period. + metrics_nodes_mv_1_hour_all_nodes: Aggregation of metrics for all nodes over a 1-hour period. + metrics_nodes_mv_10_minutes_all_nodes: Aggregation of metrics for all nodes over a 10-minute period. + metrics_nodes_mv_1_minute_all_nodes: Aggregation of metrics for all nodes over a 1-minute period. + metrics_nodes_mv_10_seconds_all_nodes: Aggregation of metrics for all nodes over a 10-second period. 
+ """ + + metrics_nodes_mv_1_week_by_node = auto() + metrics_nodes_mv_1_hour_by_node = auto() + metrics_nodes_mv_10_minutes_by_node = auto() + metrics_nodes_mv_1_minute_by_node = auto() + metrics_nodes_mv_10_seconds_by_node = auto() + metrics_nodes_mv_1_week_all_nodes = auto() + metrics_nodes_mv_1_hour_all_nodes = auto() + metrics_nodes_mv_10_minutes_all_nodes = auto() + metrics_nodes_mv_1_minute_all_nodes = auto() + metrics_nodes_mv_10_seconds_all_nodes = auto() diff --git a/jobbergate-api/jobbergate_api/apps/job_submissions/helpers.py b/jobbergate-api/jobbergate_api/apps/job_submissions/helpers.py new file mode 100644 index 000000000..24fc5fd73 --- /dev/null +++ b/jobbergate-api/jobbergate_api/apps/job_submissions/helpers.py @@ -0,0 +1,124 @@ +"""Core helper functions for job submissions.""" + +from collections.abc import Iterable +from math import ceil +from textwrap import dedent +from typing import Any, assert_never, Type + +from loguru import logger + +from jobbergate_api.apps.job_submissions.constants import ( + JobSubmissionMetricSampleRate, + JobSubmissionMetricAggregateNames, +) + + +def validate_job_metric_upload_input( + data: Any, expected_types: tuple[Type[Any], ...] +) -> Iterable[tuple[Any]]: + """Validate if the input data of job metric upload is correct once decoded. + + It will brute force apply the expected types to the data and raise an error in case it fails. + + Args: + data (Iterable[list[Any] | tuple[Any]]): The decoded data, which should be a list of lists or tuples, + where each inner list or tuple contains the data for a single job metric upload. + expected_types (tuple[Type[Any], ...]): A tuple of types that each element in the inner lists or tuples + should match. + + Returns: + Iterable[list[Any] | tuple[Any]]: The validated data. + """ + + def _force_cast(object: Any, expected_type: Type[Any]) -> Any: + return expected_type(object) + + if not isinstance(data, list): + raise ValueError("Decoded data must be a list.") + if not all(isinstance(x, (list, tuple)) for x in data): + raise ValueError("All elements of the inner data must be a Sequence") + if not all(len(x) == len(expected_types) for x in data): + raise ValueError("Every iterable in `data` must match the length of `expected_types`.") + # postgres limits the number of query params to 2**15 - 1 + # https://www.postgresql.org/docs/17/protocol-message-formats.html + if len(expected_types) * len(data) > 2**15 - 1: + raise ValueError( + "The maximum number of elements has been exceeded. " "Maximum is {}, received {}".format( + ceil((2**15 - 1) / len(expected_types)), len(data) + ) + ) + for idx, aggregated_data in enumerate(data): + # for each element in the inner list or tuple, force apply the expected type + try: + data[idx] = tuple( + _force_cast(data, expected_types[idx]) for idx, data in enumerate(aggregated_data) + ) + except Exception as e: + logger.error(f"Failed to cast data to expected types: {e}") + raise ValueError("Failed to cast data to expected types.") + return data + + +def build_job_metric_aggregation_query(node: str | None, sample_rate: JobSubmissionMetricSampleRate) -> str: + """ + Build a SQL query string to aggregate job metrics based on the provided node and sample rate. + + Args: + node (str | None): The node host identifier. If None, the query will aggregate metrics for all nodes. + sample_rate (JobSubmissionMetricSampleRate): The sample rate for the metrics aggregation. Determines the view name to use. + + Returns: + str: The SQL query string for aggregating job metrics. 
+ """ + if node is not None: + where_statement = "WHERE job_submission_id = :job_submission_id AND node_host = :node_host" + match sample_rate: + case JobSubmissionMetricSampleRate.ten_seconds: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_seconds_by_node + case JobSubmissionMetricSampleRate.one_minute: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_minute_by_node + case JobSubmissionMetricSampleRate.ten_minutes: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_minutes_by_node + case JobSubmissionMetricSampleRate.one_hour: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_hour_by_node + case JobSubmissionMetricSampleRate.one_week: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_week_by_node + case _ as unreachable: + assert_never(unreachable) + else: + where_statement = "WHERE job_submission_id = :job_submission_id" + match sample_rate: + case JobSubmissionMetricSampleRate.ten_seconds: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_seconds_all_nodes + case JobSubmissionMetricSampleRate.one_minute: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_minute_all_nodes + case JobSubmissionMetricSampleRate.ten_minutes: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_minutes_all_nodes + case JobSubmissionMetricSampleRate.one_hour: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_hour_all_nodes + case JobSubmissionMetricSampleRate.one_week: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_week_all_nodes + case _ as unreachable: + assert_never(unreachable) + + return dedent( + f""" + SELECT bucket, + node_host, + cpu_frequency, + cpu_time, + cpu_utilization, + gpu_memory, + gpu_utilization, + page_faults, + memory_rss, + memory_virtual, + disk_read, + disk_write + FROM {view_name} + {where_statement} + AND bucket >= :start_time + AND bucket <= :end_time + ORDER BY bucket + """ + ) diff --git a/jobbergate-api/jobbergate_api/apps/job_submissions/models.py b/jobbergate-api/jobbergate_api/apps/job_submissions/models.py index 209ebe4ac..16bdaa6d1 100644 --- a/jobbergate-api/jobbergate_api/apps/job_submissions/models.py +++ b/jobbergate-api/jobbergate_api/apps/job_submissions/models.py @@ -3,14 +3,16 @@ """ from __future__ import annotations +from datetime import datetime, timezone -from sqlalchemy import ARRAY, Enum, ForeignKey, Integer, String +from sqlalchemy import ARRAY, Dialect, Enum, ForeignKey, Integer, String, Float, Index, PrimaryKeyConstraint from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload from sqlalchemy.sql.expression import Select +from sqlalchemy.types import DateTime, TypeDecorator from jobbergate_api.apps.job_scripts.models import JobScript as JobScriptModel from jobbergate_api.apps.job_submissions.constants import JobSubmissionStatus, SlurmJobState -from jobbergate_api.apps.models import Base, CrudMixin +from jobbergate_api.apps.models import Base, CrudMixin, CommonMixin from jobbergate_api.safe_types import JobScript @@ -64,6 +66,11 @@ class JobSubmission(CrudMixin, Base): back_populates="submissions", lazy="raise", ) + metrics: Mapped[list["JobSubmissionMetric"]] = relationship( + "JobSubmissionMetric", + back_populates="job_submission", + lazy="raise", + ) @classmethod def searchable_fields(cls): @@ -98,3 +105,87 @@ def include_parent(cls, query: Select) -> Select: Include custom options on a query to eager load parent data. 
""" return query.options(selectinload(cls.job_script)) + + @classmethod + def include_metrics(cls, query: Select) -> Select: + """ + Include custom options on a query to eager load metrics. + """ + return query.options(selectinload(cls.metrics)) + + +class TimestampInt(TypeDecorator): + impl = DateTime(timezone=True) + + def process_bind_param(self, value: int | None, dialect: Dialect) -> datetime | None: + if value is not None: + return datetime.fromtimestamp(value, tz=timezone.utc) + return value + + def process_result_value(self, value: datetime | None, dialect: Dialect) -> int | None: + if value is not None: + return int(value.timestamp()) + return value + + +class JobSubmissionMetric(CommonMixin, Base): + """ + Job submission metric table definition. + + Attributes: + time: The time the metric was recorded. + job_submission_id: The id of the job submission this metric is for. + slurm_job_id: The id of the job in the slurm queue. + node_host: The node on which the metric was recorded. + step: The step for which the metric was recorded. + task: The task for which the metric was recorded. + cpu_frequency: The CPU frequency at the time. + cpu_time: The CPU time (system + user) consumed at the time. + cpu_utilization: The CPU utilization (% of available) consumed at the time. + gpu_memory: The GPU memory consumed at the time (in MB). + gpu_utilization: The GPU utilizaiton (% of availavble) consumed at the time. + page_faults: The number of page faults at the time. + memory_rss: The resident set size of memory consumed at the time (in MB). + memory_virtual: The virtual memory allocated at the time (in MB). + disc_read: The amount of data read from disk at the time (in MB). + disk_write: The amount of data written to disk at the time (in MB). + """ + + time: Mapped[int] = mapped_column(TimestampInt, nullable=False, index=True) + job_submission_id: Mapped[int] = mapped_column( + Integer, + ForeignKey("job_submissions.id", ondelete="CASCADE"), + nullable=False, + ) + slurm_job_id: Mapped[int] = mapped_column(Integer, nullable=False) + node_host: Mapped[str] = mapped_column(String, nullable=False, index=True) + step: Mapped[int] = mapped_column(Integer, nullable=False, index=True) + task: Mapped[int] = mapped_column(Integer, nullable=False, index=True) + cpu_frequency: Mapped[float] = mapped_column(Float, nullable=False) + cpu_time: Mapped[float] = mapped_column(Float, nullable=False) + cpu_utilization: Mapped[float] = mapped_column(Float, nullable=False) + gpu_memory: Mapped[int] = mapped_column(Integer, nullable=False) + gpu_utilization: Mapped[float] = mapped_column(Float, nullable=False) + page_faults: Mapped[int] = mapped_column(Integer, nullable=False) + memory_rss: Mapped[int] = mapped_column(Integer, nullable=False) + memory_virtual: Mapped[int] = mapped_column(Integer, nullable=False) + disk_read: Mapped[int] = mapped_column(Integer, nullable=False) + disk_write: Mapped[int] = mapped_column(Integer, nullable=False) + + __table_args__ = ( + PrimaryKeyConstraint("time", "job_submission_id", "node_host", "step", "task"), + Index("idx_node_host_step_task", "node_host", "step", "task"), + ) + + job_submission: Mapped[JobSubmission] = relationship( + "JobSubmission", + back_populates="metrics", + lazy="raise", + ) + + @classmethod + def include_parent(cls, query: Select) -> Select: + """ + Include custom options on a query to eager load parent data. 
+ """ + return query.options(selectinload(cls.job_submission)) diff --git a/jobbergate-api/jobbergate_api/apps/job_submissions/routers.py b/jobbergate-api/jobbergate_api/apps/job_submissions/routers.py index f52b7afac..ca22952d7 100644 --- a/jobbergate-api/jobbergate_api/apps/job_submissions/routers.py +++ b/jobbergate-api/jobbergate_api/apps/job_submissions/routers.py @@ -2,17 +2,27 @@ Router for the JobSubmission resource. """ +from datetime import datetime, timedelta, timezone from typing import Any -from fastapi import APIRouter, Depends, HTTPException, Path, Query +from fastapi import APIRouter, Depends, HTTPException, Path, Query, Body from fastapi import Response as FastAPIResponse from fastapi import status from fastapi_pagination import Page from loguru import logger +from jobbergate_api.apps.job_submissions.models import JobSubmissionMetric +from sqlalchemy import select, insert, text as sa_text +from sqlalchemy.sql.functions import max +import msgpack +from sqlalchemy.exc import IntegrityError from jobbergate_api.apps.constants import FileType from jobbergate_api.apps.dependencies import SecureService, secure_services -from jobbergate_api.apps.job_submissions.constants import JobSubmissionStatus, slurm_job_state_details +from jobbergate_api.apps.job_submissions.constants import ( + JobSubmissionStatus, + slurm_job_state_details, + JobSubmissionMetricSampleRate, +) from jobbergate_api.apps.job_submissions.schemas import ( ActiveJobSubmission, JobSubmissionAgentRejectedRequest, @@ -23,6 +33,13 @@ JobSubmissionListView, JobSubmissionUpdateRequest, PendingJobSubmission, + JobSubmissionAgentMetricsRequest, + JobSubmissionAgentMaxTimes, + JobSubmissionMetricSchema, +) +from jobbergate_api.apps.job_submissions.helpers import ( + validate_job_metric_upload_input, + build_job_metric_aggregation_query, ) from jobbergate_api.apps.permissions import Permissions, can_bypass_ownership_check from jobbergate_api.apps.schemas import ListParams @@ -442,3 +459,171 @@ async def job_submissions_agent_active( client_id=client_id, ) return pages + + +@router.get( + "/agent/metrics/{job_submission_id}", + description="Endpoint to get metrics for a job submission", + response_model=JobSubmissionAgentMetricsRequest, + tags=["Agent", "Metrics"], +) +async def job_submissions_agent_metrics( + job_submission_id: int, + secure_services: SecureService = Depends( + secure_services( + Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ, commit=False, ensure_client_id=True + ) + ), +): + """Get the max times for the tuple (node_host, step, task) of a job submission.""" + logger.debug(f"Agent is requesting metrics for job submission {job_submission_id}") + + query = ( + select( + max(JobSubmissionMetric.time).label("max_time"), + JobSubmissionMetric.node_host, + JobSubmissionMetric.step, + JobSubmissionMetric.task, + ) + .where(JobSubmissionMetric.job_submission_id == job_submission_id) + .group_by( + JobSubmissionMetric.node_host, + JobSubmissionMetric.step, + JobSubmissionMetric.task, + ) + ) + + result = await secure_services.session.execute(query) + + return JobSubmissionAgentMetricsRequest( + job_submission_id=job_submission_id, + max_times=[JobSubmissionAgentMaxTimes.model_validate(row) for row in result.all()], + ) + + +@router.put( + "/agent/metrics/{job_submission_id}", + description="Endpoint to upload metrics for a job submission", + responses={ + status.HTTP_204_NO_CONTENT: {"description": "Metrics uploaded successfully"}, + status.HTTP_400_BAD_REQUEST: {"description": "Either invalid metrics data or 
duplicate metrics data"}, + }, + tags=["Agent", "Metrics"], +) +async def job_submissions_agent_metrics_upload( + job_submission_id: int, + body: bytes = Body(..., description="The binary data to upload"), + secure_services: SecureService = Depends( + secure_services( + Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_UPDATE, ensure_client_id=True, commit=True + ) + ), +): + """Upload metrics for a job submission.""" + logger.debug(f"Agent is uploading metrics for job submission {job_submission_id}") + + logger.debug(f"Getting slurm_job_id of job submission {job_submission_id}") + job_submission = await secure_services.crud.job_submission.get( + job_submission_id, ensure_attributes={"client_id": secure_services.identity_payload.client_id} + ) + slurm_job_id = job_submission.slurm_job_id + logger.debug(f"Got slurm_job_id {slurm_job_id}") + + logger.debug("Decoding binary data") + data = msgpack.unpackb(body) + + logger.debug("Asserting the decoded binary data structure") + try: + data = validate_job_metric_upload_input( + data, (int, str, int, int, float, float, float, int, float, int, int, int, int, int) + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e), + ) + else: + logger.debug("Decoded data is valid") + + logger.debug("Inserting metrics into the database") + query = insert(JobSubmissionMetric).values( + [ + { + "time": data_point[0], + "node_host": data_point[1], + "step": data_point[2], + "task": data_point[3], + "cpu_frequency": data_point[4], + "cpu_time": data_point[5], + "cpu_utilization": data_point[6], + "gpu_memory": data_point[7], + "gpu_utilization": data_point[8], + "page_faults": data_point[9], + "memory_rss": data_point[10], + "memory_virtual": data_point[11], + "disk_read": data_point[12], + "disk_write": data_point[13], + "job_submission_id": job_submission_id, + "slurm_job_id": slurm_job_id, + } + for data_point in data + ] + ) + try: + await secure_services.session.execute(query) + except IntegrityError as e: + logger.error(f"Failed to insert metrics: {e.args}") + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Failed to insert metrics", + ) + + return FastAPIResponse(status_code=status.HTTP_204_NO_CONTENT) + + +@router.get( + "/{job_submission_id}/metrics", + description="Endpoint to get metrics for a job submission", + response_model=list[JobSubmissionMetricSchema], + tags=["Metrics"], +) +async def job_submissions_metrics( + job_submission_id: int, + node: str | None = Query( + None, description="Filter by node_host. If omitted, metrics will be gathered over all nodes." + ), + start_time: datetime = Query( + datetime.now(tz=timezone.utc) - timedelta(hours=1), + description="Start time for the metrics query. Defaults to one hour ago.", + ), + sample_rate: JobSubmissionMetricSampleRate = Query( + JobSubmissionMetricSampleRate.ten_minutes, description="Sample rate in seconds for the metrics query." + ), + end_time: datetime | None = Query( + None, + description="End time for the metrics query. 
If omitted, assume the window to be up to the present.", + ), + secure_services: SecureService = Depends( + secure_services(Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ, commit=False) + ), +): + """Get the metrics for a job submission.""" + logger.debug(f"Getting metrics for job submission {job_submission_id}") + if end_time is not None and end_time < start_time: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="End time must be greater than the start time.", + ) + end_time = end_time or datetime.now(tz=timezone.utc) + + query = build_job_metric_aggregation_query(node, sample_rate) + query_params = { + "job_submission_id": job_submission_id, + "start_time": start_time, + "end_time": end_time, + } + if node is not None: + query_params["node_host"] = node + + result = await secure_services.session.execute(sa_text(query), query_params) + return [JobSubmissionMetricSchema.from_iterable(row, skip_optional=True) for row in result.fetchall()] diff --git a/jobbergate-api/jobbergate_api/apps/job_submissions/schemas.py b/jobbergate-api/jobbergate_api/apps/job_submissions/schemas.py index 06cc074a8..a3d5da9fd 100644 --- a/jobbergate-api/jobbergate_api/apps/job_submissions/schemas.py +++ b/jobbergate-api/jobbergate_api/apps/job_submissions/schemas.py @@ -2,7 +2,9 @@ JobSubmission resource schema. """ -from typing import Optional +from typing import Optional, Self +from datetime import datetime +from collections.abc import Iterable from pydantic import BaseModel, ConfigDict, Field, NonNegativeInt, field_validator @@ -248,3 +250,69 @@ class JobSubmissionAgentUpdateRequest(BaseModel): slurm_job_state_reason: Optional[str] = None model_config = ConfigDict(json_schema_extra=job_submission_meta_mapper) + + +class JobSubmissionAgentMaxTimes(BaseModel): + """Model for the max_times field of the JobSubmissionMetricsMaxResponse.""" + + max_time: int + node_host: str + step: int + task: int + + model_config = ConfigDict(from_attributes=True, extra="ignore") + + +class JobSubmissionAgentMetricsRequest(BaseModel): + """Request model for updating JobSubmission instances.""" + + job_submission_id: int + max_times: list[JobSubmissionAgentMaxTimes] + + +class JobSubmissionMetricSchema(BaseModel): + """Model for the JobSubmissionMetric resource. + + Both `step` and `task` are optional fields, as they are not relevant when the metrics are + queried over all the nodes. As well as, all measurements are both `int` and `float` due to + the aggregation done by the time series database over time. For better understanding on the + math behind the aggregation, please refer to the alembic revision ``99c3877d0f10``. 
+ """ + + time: int | datetime + node_host: str + step: Optional[int] = None + task: Optional[int] = None + cpu_frequency: int | float + cpu_time: float + cpu_utilization: float + gpu_memory: int | float + gpu_utilization: float + page_faults: int | float + memory_rss: int | float + memory_virtual: int | float + disk_read: int | float + disk_write: int | float + + model_config = ConfigDict(from_attributes=True, extra="ignore") + + @field_validator("time", mode="before") + @classmethod + def validate_time(cls, v: int | datetime) -> int: + """Ensure that the time is always an int.""" + if isinstance(v, datetime): + return int(v.timestamp()) + return v + + @classmethod + def from_iterable(cls, iterable: Iterable, skip_optional: bool = False) -> Self: + """Convert an iterable containing the fields of the model to an instance of the model.""" + if skip_optional: + fields = list(field_name for field_name, field in cls.model_fields.items() if field.is_required()) + else: + fields = list(cls.model_fields.keys()) + + if len(fields) != len(list(iterable)): + raise ValueError("The iterable must have the same length as the model fields.") + + return cls(**{field: value for field, value in zip(fields, iterable)}) diff --git a/jobbergate-api/poetry.lock b/jobbergate-api/poetry.lock index 806f8e6e4..9d606f619 100644 --- a/jobbergate-api/poetry.lock +++ b/jobbergate-api/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. [[package]] name = "aio-pika" @@ -1424,6 +1424,79 @@ files = [ {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, ] +[[package]] +name = "msgpack" +version = "1.1.0" +description = "MessagePack serializer" +optional = false +python-versions = ">=3.8" +files = [ + {file = "msgpack-1.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7ad442d527a7e358a469faf43fda45aaf4ac3249c8310a82f0ccff9164e5dccd"}, + {file = "msgpack-1.1.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:74bed8f63f8f14d75eec75cf3d04ad581da6b914001b474a5d3cd3372c8cc27d"}, + {file = "msgpack-1.1.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:914571a2a5b4e7606997e169f64ce53a8b1e06f2cf2c3a7273aa106236d43dd5"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c921af52214dcbb75e6bdf6a661b23c3e6417f00c603dd2070bccb5c3ef499f5"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d8ce0b22b890be5d252de90d0e0d119f363012027cf256185fc3d474c44b1b9e"}, + {file = "msgpack-1.1.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:73322a6cc57fcee3c0c57c4463d828e9428275fb85a27aa2aa1a92fdc42afd7b"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:e1f3c3d21f7cf67bcf2da8e494d30a75e4cf60041d98b3f79875afb5b96f3a3f"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:64fc9068d701233effd61b19efb1485587560b66fe57b3e50d29c5d78e7fef68"}, + {file = "msgpack-1.1.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:42f754515e0f683f9c79210a5d1cad631ec3d06cea5172214d2176a42e67e19b"}, + {file = "msgpack-1.1.0-cp310-cp310-win32.whl", hash = "sha256:3df7e6b05571b3814361e8464f9304c42d2196808e0119f55d0d3e62cd5ea044"}, + {file = "msgpack-1.1.0-cp310-cp310-win_amd64.whl", hash = 
"sha256:685ec345eefc757a7c8af44a3032734a739f8c45d1b0ac45efc5d8977aa4720f"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3d364a55082fb2a7416f6c63ae383fbd903adb5a6cf78c5b96cc6316dc1cedc7"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:79ec007767b9b56860e0372085f8504db5d06bd6a327a335449508bbee9648fa"}, + {file = "msgpack-1.1.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:6ad622bf7756d5a497d5b6836e7fc3752e2dd6f4c648e24b1803f6048596f701"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e59bca908d9ca0de3dc8684f21ebf9a690fe47b6be93236eb40b99af28b6ea6"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5e1da8f11a3dd397f0a32c76165cf0c4eb95b31013a94f6ecc0b280c05c91b59"}, + {file = "msgpack-1.1.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:452aff037287acb1d70a804ffd022b21fa2bb7c46bee884dbc864cc9024128a0"}, + {file = "msgpack-1.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8da4bf6d54ceed70e8861f833f83ce0814a2b72102e890cbdfe4b34764cdd66e"}, + {file = "msgpack-1.1.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:41c991beebf175faf352fb940bf2af9ad1fb77fd25f38d9142053914947cdbf6"}, + {file = "msgpack-1.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:a52a1f3a5af7ba1c9ace055b659189f6c669cf3657095b50f9602af3a3ba0fe5"}, + {file = "msgpack-1.1.0-cp311-cp311-win32.whl", hash = "sha256:58638690ebd0a06427c5fe1a227bb6b8b9fdc2bd07701bec13c2335c82131a88"}, + {file = "msgpack-1.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd2906780f25c8ed5d7b323379f6138524ba793428db5d0e9d226d3fa6aa1788"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:d46cf9e3705ea9485687aa4001a76e44748b609d260af21c4ceea7f2212a501d"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5dbad74103df937e1325cc4bfeaf57713be0b4f15e1c2da43ccdd836393e2ea2"}, + {file = "msgpack-1.1.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:58dfc47f8b102da61e8949708b3eafc3504509a5728f8b4ddef84bd9e16ad420"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4676e5be1b472909b2ee6356ff425ebedf5142427842aa06b4dfd5117d1ca8a2"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:17fb65dd0bec285907f68b15734a993ad3fc94332b5bb21b0435846228de1f39"}, + {file = "msgpack-1.1.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a51abd48c6d8ac89e0cfd4fe177c61481aca2d5e7ba42044fd218cfd8ea9899f"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2137773500afa5494a61b1208619e3871f75f27b03bcfca7b3a7023284140247"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:398b713459fea610861c8a7b62a6fec1882759f308ae0795b5413ff6a160cf3c"}, + {file = "msgpack-1.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:06f5fd2f6bb2a7914922d935d3b8bb4a7fff3a9a91cfce6d06c13bc42bec975b"}, + {file = "msgpack-1.1.0-cp312-cp312-win32.whl", hash = "sha256:ad33e8400e4ec17ba782f7b9cf868977d867ed784a1f5f2ab46e7ba53b6e1e1b"}, + {file = "msgpack-1.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:115a7af8ee9e8cddc10f87636767857e7e3717b7a2e97379dc2054712693e90f"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_10_13_universal2.whl", hash = 
"sha256:071603e2f0771c45ad9bc65719291c568d4edf120b44eb36324dcb02a13bfddf"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:0f92a83b84e7c0749e3f12821949d79485971f087604178026085f60ce109330"}, + {file = "msgpack-1.1.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:4a1964df7b81285d00a84da4e70cb1383f2e665e0f1f2a7027e683956d04b734"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:59caf6a4ed0d164055ccff8fe31eddc0ebc07cf7326a2aaa0dbf7a4001cd823e"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0907e1a7119b337971a689153665764adc34e89175f9a34793307d9def08e6ca"}, + {file = "msgpack-1.1.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:65553c9b6da8166e819a6aa90ad15288599b340f91d18f60b2061f402b9a4915"}, + {file = "msgpack-1.1.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7a946a8992941fea80ed4beae6bff74ffd7ee129a90b4dd5cf9c476a30e9708d"}, + {file = "msgpack-1.1.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:4b51405e36e075193bc051315dbf29168d6141ae2500ba8cd80a522964e31434"}, + {file = "msgpack-1.1.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b4c01941fd2ff87c2a934ee6055bda4ed353a7846b8d4f341c428109e9fcde8c"}, + {file = "msgpack-1.1.0-cp313-cp313-win32.whl", hash = "sha256:7c9a35ce2c2573bada929e0b7b3576de647b0defbd25f5139dcdaba0ae35a4cc"}, + {file = "msgpack-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:bce7d9e614a04d0883af0b3d4d501171fbfca038f12c77fa838d9f198147a23f"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c40ffa9a15d74e05ba1fe2681ea33b9caffd886675412612d93ab17b58ea2fec"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1ba6136e650898082d9d5a5217d5906d1e138024f836ff48691784bbe1adf96"}, + {file = "msgpack-1.1.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e0856a2b7e8dcb874be44fea031d22e5b3a19121be92a1e098f46068a11b0870"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:471e27a5787a2e3f974ba023f9e265a8c7cfd373632247deb225617e3100a3c7"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:646afc8102935a388ffc3914b336d22d1c2d6209c773f3eb5dd4d6d3b6f8c1cb"}, + {file = "msgpack-1.1.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:13599f8829cfbe0158f6456374e9eea9f44eee08076291771d8ae93eda56607f"}, + {file = "msgpack-1.1.0-cp38-cp38-win32.whl", hash = "sha256:8a84efb768fb968381e525eeeb3d92857e4985aacc39f3c47ffd00eb4509315b"}, + {file = "msgpack-1.1.0-cp38-cp38-win_amd64.whl", hash = "sha256:879a7b7b0ad82481c52d3c7eb99bf6f0645dbdec5134a4bddbd16f3506947feb"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:53258eeb7a80fc46f62fd59c876957a2d0e15e6449a9e71842b6d24419d88ca1"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:7e7b853bbc44fb03fbdba34feb4bd414322180135e2cb5164f20ce1c9795ee48"}, + {file = "msgpack-1.1.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f3e9b4936df53b970513eac1758f3882c88658a220b58dcc1e39606dccaaf01c"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46c34e99110762a76e3911fc923222472c9d681f1094096ac4102c18319e6468"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:8a706d1e74dd3dea05cb54580d9bd8b2880e9264856ce5068027eed09680aa74"}, + {file = "msgpack-1.1.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:534480ee5690ab3cbed89d4c8971a5c631b69a8c0883ecfea96c19118510c846"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:8cf9e8c3a2153934a23ac160cc4cba0ec035f6867c8013cc6077a79823370346"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:3180065ec2abbe13a4ad37688b61b99d7f9e012a535b930e0e683ad6bc30155b"}, + {file = "msgpack-1.1.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:c5a91481a3cc573ac8c0d9aace09345d989dc4a0202b7fcb312c88c26d4e71a8"}, + {file = "msgpack-1.1.0-cp39-cp39-win32.whl", hash = "sha256:f80bc7d47f76089633763f952e67f8214cb7b3ee6bfa489b3cb6a84cfac114cd"}, + {file = "msgpack-1.1.0-cp39-cp39-win_amd64.whl", hash = "sha256:4d1b7ff2d6146e16e8bd665ac726a89c74163ef8cd39fa8c1087d4e52d3a2325"}, + {file = "msgpack-1.1.0.tar.gz", hash = "sha256:dd432ccc2c72b914e4cb77afce64aab761c1137cc698be3984eee260bcb2896e"}, +] + [[package]] name = "multidict" version = "6.0.5" @@ -2687,7 +2760,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\""} mypy = {version = ">=0.910", optional = true, markers = "extra == \"mypy\""} typing-extensions = ">=4.6.0" @@ -4082,4 +4155,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "e960edf0714e9baa7339a16f82f7c3ff3b943270d15268ca4f3c70fd03cbcaee" +content-hash = "2abee4350a4f83a78cb890ed6ef7a15dbb649d6260d0c4f56cbb36e09e8af9da" diff --git a/jobbergate-api/pyproject.toml b/jobbergate-api/pyproject.toml index 4d927c58d..92ab084f0 100644 --- a/jobbergate-api/pyproject.toml +++ b/jobbergate-api/pyproject.toml @@ -48,6 +48,7 @@ yarl = "^1.7.2" auto-name-enum = "^2.0.0" aio-pika = "^8.3.0" pydantic-settings = "^2.2.1" +msgpack = "^1.1.0" [tool.poetry.group.dev.dependencies] asgi-lifespan = "^1.0.1" diff --git a/jobbergate-api/tests/apps/job_submissions/test_helpers.py b/jobbergate-api/tests/apps/job_submissions/test_helpers.py new file mode 100644 index 000000000..ef85d2d32 --- /dev/null +++ b/jobbergate-api/tests/apps/job_submissions/test_helpers.py @@ -0,0 +1,230 @@ +"""Core module for testing the helper functions of the job submissions app.""" + +from math import ceil +from textwrap import dedent + +import pytest + +from jobbergate_api.apps.job_submissions.constants import ( + JobSubmissionMetricAggregateNames, + JobSubmissionMetricSampleRate, +) +from jobbergate_api.apps.job_submissions.helpers import ( + build_job_metric_aggregation_query, + validate_job_metric_upload_input, +) + + +class TestValidateJobMetricUploadInput: + """ + Test suite for the `validate_job_metric_upload_input` function. + + This test suite contains various test cases to validate the behavior of the + `validate_job_metric_upload_input` function under different scenarios. 
The + function is expected to validate and process input data, ensuring that the + data conforms to the expected types and structure. + + Test Cases: + - `test_validate_job_metric_upload_input_valid_data`: Tests the function with valid data. + - `test_validate_job_metric_upload_input_invalid_data_type`: Tests that the function raises a ValueError for invalid data types. + - `test_validate_job_metric_upload_input_invalid_inner_type`: Tests that the function raises a ValueError for invalid inner types. + - `test_validate_job_metric_upload_input_invalid_length`: Tests that the function raises a ValueError when the length of any iterable in `data` does not match the length of `expected_types`. + - `test_validate_job_metric_upload_input_exceeds_max_elements`: Tests that the function raises a ValueError when the input data exceeds the maximum number of allowed elements. + - `test_validate_job_metric_upload_input_cast_failure`: Tests that the function raises a ValueError when data cannot be cast to the expected types. + - `test_validate_job_metric_upload_input_cast_success`: Tests that the function successfully casts data to the expected types. + """ + + def test_validate_job_metric_upload_input_valid_data(self): + """ + Test the validate_job_metric_upload_input function with valid data. + + This test checks if the function correctly validates and processes input data + when provided with valid data types. + + The input data consists of a list of lists, where each inner list contains + an integer, a string, and a float. The expected types for these values are + (int, str, float). + """ + data = [[1, "test", 3.5], [2, "example", 4.5]] + expected_types = (int, str, float) + result = validate_job_metric_upload_input(data, expected_types) + assert result == [(1, "test", 3.5), (2, "example", 4.5)] + + def test_validate_job_metric_upload_input_invalid_data_type(self): + """ + Test that `validate_job_metric_upload_input` raises a ValueError when provided with data of an invalid type. + + This test checks that the function correctly identifies and rejects data that is not of the expected type, i.e. list. + """ + data = "invalid data type" + expected_types = (int, str, float) + with pytest.raises(ValueError, match="Decoded data must be a list."): + validate_job_metric_upload_input(data, expected_types) + + def test_validate_job_metric_upload_input_invalid_inner_type(self): + """ + Test that `validate_job_metric_upload_input` raises a ValueError when the input data contains an invalid inner type. + + This test checks that the function correctly identifies and raises an error when the input data is not a list of lists or tuples. + + The input data contains a mix of valid and invalid types: + - A list with elements of types int, str, and float. + - A string which is an invalid inner type. + + The expected types for the elements are specified as a tuple: (int, str, float). + + The test expects a ValueError to be raised with the message "All elements of the inner data must be a Sequence". + """ + data = [[1, "test", 3.5], "invalid inner type"] + expected_types = (int, str, float) + with pytest.raises(ValueError, match="All elements of the inner data must be a Sequence"): + validate_job_metric_upload_input(data, expected_types) + + def test_validate_job_metric_upload_input_invalid_length(self): + """ + Test that `validate_job_metric_upload_input` raises a ValueError when the length of any + iterable in `data` does not match the length of `expected_types`. 
+ + This test case provides a `data` list containing iterables of different lengths and an + `expected_types` tuple. It asserts that a ValueError is raised with the appropriate error message. + """ + data = [[1, "test"], [2, "example", 4.5]] + expected_types = (int, str, float) + with pytest.raises( + ValueError, match="Every iterable in `data` must match the length of `expected_types`." + ): + validate_job_metric_upload_input(data, expected_types) + + @pytest.mark.parametrize("num_of_elements", [3, 8, 83]) + def test_validate_job_metric_upload_input_exceeds_max_elements(self, num_of_elements: int): + """ + Test that the `validate_job_metric_upload_input` function raises a `ValueError` + when the input data exceeds the maximum number of allowed elements. + + The test creates a list of lists `data` where each sublist contains three identical + integers. The length of `data` is set to exceed the maximum allowed elements. + The `expected_types` tuple specifies the expected types for each element in the sublists. + + The test asserts that a `ValueError` is raised with the message "The maximum number of elements + has been exceeded." when `validate_job_metric_upload_input` is called with the `data` and + `expected_types` arguments. + + Postgres limits the number of query params to 2**15 - 1. Considering that each + element in the sublists will generate len(data) * len(expected_types) query params, the maximum + number of elements is calculated as ceil((2**15 - 1) / num_of_elements). + [Reference](https://www.postgresql.org/docs/17/protocol-message-formats.html) + """ + data = [[i] * num_of_elements for i in range(ceil((2**15 - 1) / num_of_elements) + 1)] + expected_types = [int] * num_of_elements + with pytest.raises( + ValueError, + match=( + "The maximum number of elements has been exceeded. " "Maximum is {}, received {}".format( + ceil((2**15 - 1) / num_of_elements), len(data) + ) + ), + ): + validate_job_metric_upload_input(data, expected_types) + + def test_validate_job_metric_upload_input_cast_failure(self): + """ + Test that `validate_job_metric_upload_input` raises a ValueError when data cannot be cast to the expected types. + + This test provides a list of data where one of the elements cannot be cast to the expected type (float). + It expects the function to raise a ValueError with a specific error message. + """ + data = [[1, "test", "not a float"]] + expected_types = (int, str, float) + with pytest.raises(ValueError, match="Failed to cast data to expected types."): + validate_job_metric_upload_input(data, expected_types) + + def test_validate_job_metric_upload_input_cast_success(self): + """ + Test that `validate_job_metric_upload_input` successfully casts data to the expected types. + + This test provides a list of data where one of the elements cannot be cast to the expected type (float). + It expects the function to raise a ValueError with a specific error message. + """ + data = [[1, "test", "3.5"]] + expected_types = (int, str, float) + result = validate_job_metric_upload_input(data, expected_types) + assert result == [(1, "test", 3.5)] + + +class TestBuildJobMetricAggregationQuery: + """ + Test suite for the `build_job_metric_aggregation_query` function. + + This test suite contains various test cases to validate the behavior of the + `build_job_metric_aggregation_query` function under different scenarios. The + function is expected to build a SQL query string based on the provided node and sample rate. 
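+    The generated SQL selects from the aggregate view matching the requested sample rate and filters
+    with bound parameters (:job_submission_id, :start_time, :end_time, and :node_host when a node is
+    given) rather than interpolated values.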
+ + Test Cases: + - `test_build_query_with_node`: Tests the function with a specific node. + - `test_build_query_without_node`: Tests the function without a specific node. + - `test_build_query_invalid_sample_rate`: Tests that the function raises an error for an invalid sample rate. + """ + + def test_build_query_with_node(self): + """ + Test the `build_job_metric_aggregation_query` function with a specific node. + + This test checks if the function correctly builds a SQL query string when provided with a node. + """ + node = "node1" + sample_rate = JobSubmissionMetricSampleRate.ten_seconds + expected_query = dedent( + f""" + SELECT bucket, + node_host, + cpu_frequency, + cpu_time, + cpu_utilization, + gpu_memory, + gpu_utilization, + page_faults, + memory_rss, + memory_virtual, + disk_read, + disk_write + FROM {JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_seconds_by_node} + WHERE job_submission_id = :job_submission_id AND node_host = :node_host + AND bucket >= :start_time + AND bucket <= :end_time + ORDER BY bucket + """ + ) + result = build_job_metric_aggregation_query(node, sample_rate) + assert result == expected_query + + def test_build_query_without_node(self): + """ + Test the `build_job_metric_aggregation_query` function without a specific node. + + This test checks if the function correctly builds a SQL query string when no node is provided. + """ + node = None + sample_rate = JobSubmissionMetricSampleRate.one_minute + expected_query = dedent( + f""" + SELECT bucket, + node_host, + cpu_frequency, + cpu_time, + cpu_utilization, + gpu_memory, + gpu_utilization, + page_faults, + memory_rss, + memory_virtual, + disk_read, + disk_write + FROM {JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_minute_all_nodes} + WHERE job_submission_id = :job_submission_id + AND bucket >= :start_time + AND bucket <= :end_time + ORDER BY bucket + """ + ) + result = build_job_metric_aggregation_query(node, sample_rate) + assert result == expected_query diff --git a/jobbergate-api/tests/apps/job_submissions/test_routers.py b/jobbergate-api/tests/apps/job_submissions/test_routers.py index 284526496..8ebf4c5bd 100644 --- a/jobbergate-api/tests/apps/job_submissions/test_routers.py +++ b/jobbergate-api/tests/apps/job_submissions/test_routers.py @@ -2,20 +2,65 @@ Tests for the /job-submissions/ endpoint. """ +import itertools import json -from datetime import datetime +import random +import uuid +from datetime import datetime, timezone, timedelta +from textwrap import dedent +from unittest import mock import pytest +import msgpack from fastapi import status +from sqlalchemy import insert, select -from jobbergate_api.apps.job_submissions.constants import JobSubmissionStatus, SlurmJobState +from jobbergate_api.apps.job_submissions.constants import ( + JobSubmissionStatus, + SlurmJobState, + JobSubmissionMetricAggregateNames, + JobSubmissionMetricSampleRate, +) from jobbergate_api.apps.permissions import Permissions from jobbergate_api.rabbitmq_notification import rabbitmq_connect +from jobbergate_api.apps.job_submissions.models import JobSubmissionMetric +from jobbergate_api.apps.job_submissions.schemas import JobSubmissionMetricSchema, JobSubmissionAgentMaxTimes # Not using the synth_session fixture in a route that needs the database is unsafe pytest.mark.usefixtures("synth_session") +def generate_job_submission_metric_columns(base_time: int, num_rows: int = 5) -> list[tuple]: + """ + Generate a list of JobSubmissionMetric objects for a given job_submission_id. 
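+    Each generated tuple follows the column order used by the metrics inserts below: time, node_host,
+    step, task, cpu_frequency, cpu_time, cpu_utilization, gpu_memory, gpu_utilization, page_faults,
+    memory_rss, memory_virtual, disk_read, disk_write.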
+ + For simplicity, generate a list of JobSubmissionMetric objects with random values for each field + and all matching the same tuple (job_submission_id, node_host, step, task). + """ + node_host = str(uuid.uuid4()) + step = random.randint(0, 100) + task = random.randint(0, 100) + return [ + ( + base_time + i, + node_host, + step, + task, + random.uniform(0, 5), + random.uniform(0, 5), + random.uniform(0, 5), + random.randint(0, 5), + random.uniform(0, 5), + random.randint(0, 5), + random.randint(0, 5), + random.randint(0, 5), + random.randint(0, 5), + random.randint(0, 5), + ) + for i in range(num_rows) + ] + + @pytest.mark.parametrize("permission", (Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_CREATE)) async def test_create_job_submission__on_site_submission( permission, @@ -2079,3 +2124,498 @@ async def test_job_submissions_agent_active__returns_400_if_token_does_not_carry response = await client.get("/jobbergate/job-submissions/agent/active") assert response.status_code == status.HTTP_400_BAD_REQUEST assert "Access token does not contain\\n 1: client_id" in response.text + + +@pytest.mark.parametrize("permission", (Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ)) +async def test_job_submissions_agent_metrics__returns_successful_request__empty_list( + permission, + fill_job_script_data, + fill_job_submission_data, + client, + inject_security_header, + synth_services, +): + """ + Test GET /job-submissions/agent/metrics/{job_submission_id} returns 200 with no data. + """ + base_job_script = await synth_services.crud.job_script.create(**fill_job_script_data()) + + inserted_job_script_id = base_job_script.id + + inserted_submission = await synth_services.crud.job_submission.create( + job_script_id=inserted_job_script_id, + **fill_job_submission_data( + client_id="dummy-client", + status=JobSubmissionStatus.SUBMITTED, + slurm_job_id=111, + slurm_job_state=SlurmJobState.PENDING, + slurm_job_info="Fake slurm job info", + ), + ) + inserted_job_submission_id = inserted_submission.id + + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.get(f"/jobbergate/job-submissions/agent/metrics/{inserted_job_submission_id}") + assert response.status_code == status.HTTP_200_OK + assert response.json() == {"job_submission_id": inserted_job_submission_id, "max_times": []} + + +@pytest.mark.parametrize("permission", (Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ)) +async def test_job_submissions_agent_metrics__returns_successful_request( + permission, + fill_job_script_data, + fill_job_submission_data, + client, + inject_security_header, + synth_services, + synth_session, +): + """ + Test GET /job-submissions/agent/metrics/{job_submission_id} returns 200 with data. 
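+
+    Metric rows are inserted directly into the database; the endpoint must report, under `max_times`,
+    the greatest stored timestamp for the (node_host, step, task) tuple of those rows.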
+ """ + base_job_script = await synth_services.crud.job_script.create(**fill_job_script_data()) + + inserted_job_script_id = base_job_script.id + + inserted_submission = await synth_services.crud.job_submission.create( + job_script_id=inserted_job_script_id, + **fill_job_submission_data( + client_id="dummy-client", + status=JobSubmissionStatus.SUBMITTED, + slurm_job_id=111, + slurm_job_state=SlurmJobState.PENDING, + slurm_job_info="Fake slurm job info", + ), + ) + inserted_job_submission_id = inserted_submission.id + + base_time = int(datetime.now().timestamp()) + + job_metrics = generate_job_submission_metric_columns(base_time) + query = insert(JobSubmissionMetric).values( + [ + { + "time": item[0], + "job_submission_id": inserted_job_submission_id, + "slurm_job_id": inserted_submission.slurm_job_id, + "node_host": item[1], + "step": item[2], + "task": item[3], + "cpu_frequency": item[4], + "cpu_time": item[5], + "cpu_utilization": item[6], + "gpu_memory": item[7], + "gpu_utilization": item[8], + "page_faults": item[9], + "memory_rss": item[10], + "memory_virtual": item[11], + "disk_read": item[12], + "disk_write": item[13], + } + for item in job_metrics + ] + ) + await synth_session.execute(query) + + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.get(f"/jobbergate/job-submissions/agent/metrics/{inserted_job_submission_id}") + assert response.status_code == status.HTTP_200_OK + max_time_element = max(job_metrics, key=lambda item: item[0]) + assert response.json() == { + "job_submission_id": inserted_job_submission_id, + "max_times": [ + JobSubmissionAgentMaxTimes( + max_time=max_time_element[0], + node_host=max_time_element[1], + step=max_time_element[2], + task=max_time_element[3], + ).model_dump() + ], + } + + +@pytest.mark.parametrize( + "permission, data", + [ + (Permissions.ADMIN, "dummy-string-data"), + (Permissions.JOB_SUBMISSIONS_UPDATE, {"dummy": "data"}), + ], +) +@mock.patch("jobbergate_api.apps.job_submissions.routers.validate_job_metric_upload_input") +async def test_job_submissions_agent_metrics_upload__400_uploading_invalid_data( + mocked_validate_job_metric_upload_input, + permission, + data, + fill_job_script_data, + fill_job_submission_data, + client, + inject_security_header, + synth_services, +): + """ + Test PUT /job-submissions/agent/metrics/{job_submission_id} returns 400 when the input data + is invalid. 
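+
+    `validate_job_metric_upload_input` is patched to raise a ValueError, so the test only verifies
+    that the router converts the validation failure into a 400 response carrying the error message.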
+ """ + mocked_validate_job_metric_upload_input.side_effect = ValueError("Invalid data") + + base_job_script = await synth_services.crud.job_script.create(**fill_job_script_data()) + + inserted_job_script_id = base_job_script.id + + inserted_submission = await synth_services.crud.job_submission.create( + job_script_id=inserted_job_script_id, + **fill_job_submission_data( + client_id="dummy-client", + status=JobSubmissionStatus.SUBMITTED, + slurm_job_id=111, + slurm_job_state=SlurmJobState.PENDING, + slurm_job_info="Fake slurm job info", + ), + ) + inserted_job_submission_id = inserted_submission.id + + encoded_data = msgpack.packb(data) + + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.put( + f"/jobbergate/job-submissions/agent/metrics/{inserted_job_submission_id}", + content=encoded_data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.json() == {"detail": "Invalid data"} + mocked_validate_job_metric_upload_input.assert_called_once_with( + data, (int, str, int, int, float, float, float, int, float, int, int, int, int, int) + ) + + +@pytest.mark.parametrize( + "permission, num_rows", + [ + (Permissions.ADMIN, 3), + (Permissions.JOB_SUBMISSIONS_UPDATE, 8), + ], +) +async def test_job_submissions_agent_metrics_upload__successful_request( + permission, + num_rows, + fill_job_script_data, + fill_job_submission_data, + client, + inject_security_header, + synth_services, + synth_session, +): + """ + Test PUT /job-submissions/agent/metrics/{job_submission_id} returns 204 upon a successful request. + """ + base_job_script = await synth_services.crud.job_script.create(**fill_job_script_data()) + + inserted_job_script_id = base_job_script.id + + inserted_submission = await synth_services.crud.job_submission.create( + job_script_id=inserted_job_script_id, + **fill_job_submission_data( + client_id="dummy-client", + status=JobSubmissionStatus.SUBMITTED, + slurm_job_id=111, + slurm_job_state=SlurmJobState.PENDING, + slurm_job_info="Fake slurm job info", + ), + ) + inserted_job_submission_id = inserted_submission.id + + base_time = int(datetime.now().timestamp()) + raw_data = generate_job_submission_metric_columns(base_time, num_rows) + encoded_data = msgpack.packb(raw_data) + + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.put( + f"/jobbergate/job-submissions/agent/metrics/{inserted_job_submission_id}", + content=encoded_data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == status.HTTP_204_NO_CONTENT + + query = select(JobSubmissionMetric).where( + JobSubmissionMetric.job_submission_id == inserted_job_submission_id + ) + result = await synth_session.execute(query) + scalars = result.scalars() + assert all( + ( + scalar.time, + scalar.node_host, + scalar.step, + scalar.task, + scalar.cpu_frequency, + scalar.cpu_time, + scalar.cpu_utilization, + scalar.gpu_memory, + scalar.gpu_utilization, + scalar.page_faults, + scalar.memory_rss, + scalar.memory_virtual, + scalar.disk_read, + scalar.disk_write, + ) + in raw_data + for scalar in scalars + ) + + +@pytest.mark.parametrize( + "permission, num_rows", + [ + (Permissions.ADMIN, 3), + (Permissions.JOB_SUBMISSIONS_UPDATE, 8), + ], +) +async def test_job_submissions_agent_metrics_upload__400_duplicated_data( + permission, + num_rows, + fill_job_script_data, + fill_job_submission_data, + client, + 
inject_security_header, + synth_services, + synth_session, +): + """ + Test PUT /job-submissions/agent/metrics/{job_submission_id} returns 400 when uploading + duplicated data. + """ + base_job_script = await synth_services.crud.job_script.create(**fill_job_script_data()) + + inserted_job_script_id = base_job_script.id + + inserted_submission = await synth_services.crud.job_submission.create( + job_script_id=inserted_job_script_id, + **fill_job_submission_data( + client_id="dummy-client", + status=JobSubmissionStatus.SUBMITTED, + slurm_job_id=111, + slurm_job_state=SlurmJobState.PENDING, + slurm_job_info="Fake slurm job info", + ), + ) + inserted_job_submission_id = inserted_submission.id + + base_time = int(datetime.now().timestamp()) + raw_data = generate_job_submission_metric_columns(base_time, num_rows) + encoded_data = msgpack.packb(raw_data) + + query = insert(JobSubmissionMetric).values( + [ + { + "time": data_point[0], + "node_host": data_point[1], + "step": data_point[2], + "task": data_point[3], + "cpu_frequency": data_point[4], + "cpu_time": data_point[5], + "cpu_utilization": data_point[6], + "gpu_memory": data_point[7], + "gpu_utilization": data_point[8], + "page_faults": data_point[9], + "memory_rss": data_point[10], + "memory_virtual": data_point[11], + "disk_read": data_point[12], + "disk_write": data_point[13], + "job_submission_id": inserted_job_submission_id, + "slurm_job_id": inserted_submission.slurm_job_id, + } + for data_point in raw_data + ] + ) + await synth_session.execute(query) + + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.put( + f"/jobbergate/job-submissions/agent/metrics/{inserted_job_submission_id}", + content=encoded_data, + headers={"Content-Type": "application/octet-stream"}, + ) + assert response.status_code == status.HTTP_400_BAD_REQUEST + assert response.json() == {"detail": "Failed to insert metrics"} + + +@pytest.mark.parametrize( + "permission, sample_rate, node_host", + list( + itertools.product( + (Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ), + map(lambda enum: enum.value, JobSubmissionMetricSampleRate), + ("node_1", "dummy-node", None), + ) + ), +) +@mock.patch("jobbergate_api.apps.job_submissions.routers.sa_text") +async def test_job_submissions_metrics__aggregation_by_all_nodes( + mocked_sa_text, + permission, + sample_rate, + node_host, + client, + inject_security_header, + synth_session, +): + """ + Test GET /job-submissions/{job_submission_id}/metrics returns 200. 
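+
+    `sa_text` and the session are mocked, so the test only verifies that the expected aggregate view
+    and WHERE clause are chosen for each combination of sample rate and node filter, and that the
+    returned rows are serialized without `step` and `task`.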
+ """ + num_rows = random.randint(1, 10) + job_submission_id = random.randint(1, 100) + random_hour_interval = random.randint(1, 23) + + mocked_session_execute = mock.AsyncMock() + mocked_session_execute.return_value.fetchall = mock.Mock() + + base_time = int(datetime.now().timestamp()) + raw_data = generate_job_submission_metric_columns(base_time, num_rows) + + mocked_session_execute.return_value.fetchall.return_value = [ + JobSubmissionMetricSchema.from_iterable(data_point).model_dump(exclude=["step", "task"]).values() + for data_point in raw_data + ] + + if node_host is not None: + where_statement = "WHERE job_submission_id = :job_submission_id AND node_host = :node_host" + match sample_rate: + case JobSubmissionMetricSampleRate.ten_seconds: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_seconds_by_node + case JobSubmissionMetricSampleRate.one_minute: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_minute_by_node + case JobSubmissionMetricSampleRate.ten_minutes: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_minutes_by_node + case JobSubmissionMetricSampleRate.one_hour: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_hour_by_node + case JobSubmissionMetricSampleRate.one_week: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_week_by_node + else: + where_statement = "WHERE job_submission_id = :job_submission_id" + match sample_rate: + case JobSubmissionMetricSampleRate.ten_seconds: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_seconds_all_nodes + case JobSubmissionMetricSampleRate.one_minute: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_minute_all_nodes + case JobSubmissionMetricSampleRate.ten_minutes: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_10_minutes_all_nodes + case JobSubmissionMetricSampleRate.one_hour: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_hour_all_nodes + case JobSubmissionMetricSampleRate.one_week: + view_name = JobSubmissionMetricAggregateNames.metrics_nodes_mv_1_week_all_nodes + + expected_sql_query = dedent( + f""" + SELECT bucket, + node_host, + cpu_frequency, + cpu_time, + cpu_utilization, + gpu_memory, + gpu_utilization, + page_faults, + memory_rss, + memory_virtual, + disk_read, + disk_write + FROM {view_name} + {where_statement} + AND bucket >= :start_time + AND bucket <= :end_time + ORDER BY bucket + """ + ) + + start_time = datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + end_time = start_time + timedelta(hours=random_hour_interval) + + http_query_params = { + "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + "sample_rate": sample_rate, + } + + sql_query_params = { + "job_submission_id": job_submission_id, + "start_time": start_time, + "end_time": end_time, + } + + if node_host is not None: + http_query_params["node"] = node_host + sql_query_params["node_host"] = node_host + + with mock.patch.object(synth_session, "execute", mocked_session_execute): + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.get( + f"/jobbergate/job-submissions/{job_submission_id}/metrics", params=http_query_params + ) + + assert response.status_code == status.HTTP_200_OK + mocked_session_execute.assert_awaited_once_with(mocked_sa_text.return_value, sql_query_params) + mocked_session_execute.return_value.fetchall.assert_called_once_with() + 
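+    # the exact SQL string handed to sa_text must match the aggregate view and filters built above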
mocked_sa_text.assert_called_once_with(expected_sql_query) + assert response.json() == [ + JobSubmissionMetricSchema.from_iterable( + ( + data_point[0], + data_point[1], + # skip both task and step because the API aggregates in the node level + # data_point[2], + # data_point[3], + data_point[4], + data_point[5], + data_point[6], + data_point[7], + data_point[8], + data_point[9], + data_point[10], + data_point[11], + data_point[12], + data_point[13], + ), + skip_optional=True, + ).model_dump() + for data_point in raw_data + ] + + +@pytest.mark.parametrize("permission", [Permissions.ADMIN, Permissions.JOB_SUBMISSIONS_READ]) +@mock.patch("jobbergate_api.apps.job_submissions.routers.sa_text") +async def test_job_submissions_metrics__start_time_less_greater_than_end_time( + mocked_sa_text, + permission, + client, + inject_security_header, + synth_session, +): + """ + Test GET /job-submissions/{job_submission_id}/metrics returns 400 when the start_time + query param is greater than the end_time. + """ + job_submission_id = random.randint(1, 100) + random_hour_interval = random.randint(1, 23) + + mocked_session_execute = mock.AsyncMock() + mocked_session_execute.return_value.fetchall = mock.Mock() + + end_time = datetime(2021, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + start_time = end_time + timedelta(hours=random_hour_interval) + + http_query_params = { + "start_time": start_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + "end_time": end_time.strftime("%Y-%m-%dT%H:%M:%SZ"), + } + + with mock.patch.object(synth_session, "execute", mocked_session_execute): + inject_security_header("who@cares.com", permission, client_id="dummy-client") + response = await client.get( + f"/jobbergate/job-submissions/{job_submission_id}/metrics", params=http_query_params + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + mocked_session_execute.assert_not_awaited() + mocked_session_execute.return_value.fetchall.assert_not_called() + mocked_sa_text.assert_not_called() + assert response.json() == {"detail": "End time must be greater than the start time."} diff --git a/jobbergate-api/tests/apps/job_submissions/test_schemas.py b/jobbergate-api/tests/apps/job_submissions/test_schemas.py index a5315625b..ce4633077 100644 --- a/jobbergate-api/tests/apps/job_submissions/test_schemas.py +++ b/jobbergate-api/tests/apps/job_submissions/test_schemas.py @@ -1,6 +1,11 @@ import pytest -from jobbergate_api.apps.job_submissions.schemas import JobSubmissionCreateRequest, JobSubmissionUpdateRequest +from jobbergate_api.apps.job_submissions.schemas import ( + JobSubmissionCreateRequest, + JobSubmissionUpdateRequest, + JobSubmissionMetricSchema, +) +from datetime import datetime @pytest.mark.parametrize( @@ -22,3 +27,116 @@ def test_empty_string_to_none(schema): With that, the default value was not applied on the Agent side. """ assert schema.execution_directory is None + + +class TestJobSubmissionMetricSchema: + def test_job_submission_metric_schema_validate_time(self): + """ + Test that the validate_time method correctly converts datetime to int. 
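+
+        The schema stores `time` as an integer Unix timestamp, so constructing it with a datetime
+        must yield the equivalent seconds value.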
+ """ + timestamp = 1638316800 + dt = datetime.fromtimestamp(timestamp) + schema = JobSubmissionMetricSchema( + time=dt, + node_host="node1", + cpu_frequency=2.5, + cpu_time=100.0, + cpu_utilization=50.0, + gpu_memory=1024, + gpu_utilization=75.0, + page_faults=10, + memory_rss=2048, + memory_virtual=4096, + disk_read=500, + disk_write=300, + ) + assert schema.time == timestamp + + def test_job_submission_metric_schema_from_iterable(self): + """ + Test that the from_iterable method correctly creates an instance from an iterable. + """ + iterable = [ + 1638316800, + "node1", + None, + None, + 2.5, + 100.0, + 50.0, + 1024, + 75.0, + 10, + 2048, + 4096, + 500, + 300, + ] + schema = JobSubmissionMetricSchema.from_iterable(iterable) + assert schema.time == 1638316800 + assert schema.node_host == "node1" + assert schema.cpu_frequency == 2.5 + assert schema.cpu_time == 100.0 + assert schema.cpu_utilization == 50.0 + assert schema.gpu_memory == 1024 + assert schema.gpu_utilization == 75.0 + assert schema.page_faults == 10 + assert schema.memory_rss == 2048 + assert schema.memory_virtual == 4096 + assert schema.disk_read == 500 + assert schema.disk_write == 300 + + def test_job_submission_metric_schema_from_iterable_invalid_length(self): + """ + Test that the from_iterable method raises a ValueError if the iterable length is incorrect. + """ + iterable = [ + 1638316800, + "node1", + 2.5, + 100.0, + 50.0, + 1024, + 75.0, + 10, + 2048, + 4096, + 500, + 300, + ] + with pytest.raises(ValueError, match="The iterable must have the same length as the model fields."): + JobSubmissionMetricSchema.from_iterable(iterable) + + def test_job_submission_metric_schema_from_iterable__skip_optional_fields(self): + """ + Test that the from_iterable method correctly creates an instance from an iterable with optional fields skipped. 
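+
+        With `skip_optional=True` the iterable omits the optional `step` and `task` entries, which
+        then default to None on the resulting instance.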
+ """ + iterable = [ + 1638316800, + "node1", + 2.5, + 100.0, + 50.0, + 1024, + 75.0, + 10, + 2048, + 4096, + 500, + 300, + ] + schema = JobSubmissionMetricSchema.from_iterable(iterable, skip_optional=True) + assert schema.time == 1638316800 + assert schema.step is None + assert schema.task is None + assert schema.node_host == "node1" + assert schema.cpu_frequency == 2.5 + assert schema.cpu_time == 100.0 + assert schema.cpu_utilization == 50.0 + assert schema.gpu_memory == 1024 + assert schema.gpu_utilization == 75.0 + assert schema.page_faults == 10 + assert schema.memory_rss == 2048 + assert schema.memory_virtual == 4096 + assert schema.disk_read == 500 + assert schema.disk_write == 300 diff --git a/jobbergate-api/tests/apps/job_submissions/test_services.py b/jobbergate-api/tests/apps/job_submissions/test_services.py index ca25f53bb..c7a36b8c3 100644 --- a/jobbergate-api/tests/apps/job_submissions/test_services.py +++ b/jobbergate-api/tests/apps/job_submissions/test_services.py @@ -88,6 +88,6 @@ async def test_update_includes_no_files( result = await synth_services.crud.job_submission.update(submission_instance.id, name="new-name") actual_unloaded = inspect(result).unloaded - expected_unloaded = {"job_script"} + expected_unloaded = {"job_script", "metrics"} assert actual_unloaded == expected_unloaded diff --git a/jobbergate-composed/docker-compose.yml b/jobbergate-composed/docker-compose.yml index 09f6d6206..b760157cb 100644 --- a/jobbergate-composed/docker-compose.yml +++ b/jobbergate-composed/docker-compose.yml @@ -100,7 +100,7 @@ services: - DEFAULT_CLUSTER_NAME=local-slurm db: - image: postgres + image: timescale/timescaledb:latest-pg17 restart: always networks: - jobbergate-net @@ -119,7 +119,7 @@ services: retries: 5 test-db: - image: postgres + image: timescale/timescaledb:latest-pg17 restart: always networks: - jobbergate-net @@ -363,6 +363,7 @@ volumes: var_log_slurm: jobbergate-agent-cache: jobbergate-cli-cache: + timeseries_data: networks: jobbergate-net: