From cf0955b169b1af76a0525107f37bc4cef3d478d6 Mon Sep 17 00:00:00 2001
From: matheushent
Date: Thu, 12 Dec 2024 18:53:18 +0000
Subject: [PATCH] PENG-2457 improve typing by adding a TypeAlias for the job
 metric data structure expected by the API

---
 .../jobbergate_agent/jobbergate/schemas.py         | 12 +++++++++++-
 jobbergate-agent/jobbergate_agent/utils/compute.py |  7 +++----
 jobbergate-agent/tests/jobbergate/test_compute.py  | 10 +++++-----
 3 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
index c265b0bb..3d241d2e 100644
--- a/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
+++ b/jobbergate-agent/jobbergate_agent/jobbergate/schemas.py
@@ -1,5 +1,5 @@
 from pathlib import Path
-from typing import List, Optional, TypedDict
+from typing import List, Optional, TypedDict, TypeAlias
 
 import pydantic
 from pydantic import ConfigDict
@@ -120,3 +120,13 @@ class JobSubmissionMetricsMaxResponse(pydantic.BaseModel):
 
     job_submission_id: int
     max_times: list[JobSubmissionMetricsMaxTime]
+
+
+"""
+Type alias for job metric structure. It matches the following sequence of data
+(time, host, step, task, CPUFrequency, CPUTime, CPUUtilization, GPUMemMB,
+GPUUtilization, Pages, RSS, VMSize, ReadMB, WriteMB)
+"""
+JobMetricData: TypeAlias = list[
+    tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]
+]
diff --git a/jobbergate-agent/jobbergate_agent/utils/compute.py b/jobbergate-agent/jobbergate_agent/utils/compute.py
index 882029bc..202a919d 100644
--- a/jobbergate-agent/jobbergate_agent/utils/compute.py
+++ b/jobbergate-agent/jobbergate_agent/utils/compute.py
@@ -10,9 +10,8 @@
 from loguru import logger
 from numba import njit
 
-from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure
-
 from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
+from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData
 
 
 def measure_memory_usage(func: Callable) -> Callable:
@@ -64,7 +63,7 @@ def _aggregate_with_numba(
 @measure_memory_usage
 def aggregate_influx_measures(
     data_points: Iterator[InfluxDBMeasure],
-) -> list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]]:
+) -> JobMetricData:
     """Aggregate the list of data points by time, host, step and task.
 
     The output data is a list of tuples with the following format:
@@ -112,7 +111,7 @@ def aggregate_influx_measures(
     reverse_task_mapping = {v: k for k, v in task_mapping.items()}
 
     return cast(
-        list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+        JobMetricData,
         [
             (
                 int(unique_key[0]),  # time
diff --git a/jobbergate-agent/tests/jobbergate/test_compute.py b/jobbergate-agent/tests/jobbergate/test_compute.py
index 69984b3c..4799ae63 100644
--- a/jobbergate-agent/tests/jobbergate/test_compute.py
+++ b/jobbergate-agent/tests/jobbergate/test_compute.py
@@ -11,7 +11,7 @@
 import numpy as np
 
 from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
-from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure
+from jobbergate_agent.jobbergate.schemas import InfluxDBMeasure, JobMetricData
 from jobbergate_agent.utils.compute import (
     aggregate_influx_measures,
     measure_memory_usage,
@@ -26,7 +26,7 @@ def generate_and_aggregate_job_metrics_data() -> (
         [int, int, int, int, int],
         tuple[
             list[InfluxDBMeasure],
-            list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+            JobMetricData,
         ],
     ]
 ):
@@ -40,7 +40,7 @@ def _generate_and_aggregate(
         num_points_per_measurement: int, num_hosts: int, num_jobs: int, num_steps: int, num_tasks: int
     ) -> tuple[
         list[InfluxDBMeasure],
-        list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+        JobMetricData,
     ]:
         # Initialize data structures
         current_time = int(datetime.now().timestamp())
@@ -82,7 +82,7 @@ def _generate_and_aggregate(
 
         # Create aggregated list
         aggregated_list = cast(
-            list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+            JobMetricData,
             [
                 (
                     time,
@@ -119,7 +119,7 @@ async def test_aggregate_influx_measures__success(
         [int, int, int, int, int],
         tuple[
             list[InfluxDBMeasure],
-            list[tuple[int, str, str, str, float, float, float, float, float, float, float, float, float, float]],
+            JobMetricData,
         ],
     ],
 ):