Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(agent): PENG-2457 modify the fetch_influx_data and update_job_metrics functions #677

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 47 additions & 18 deletions jobbergate-agent/jobbergate_agent/jobbergate/update.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
from itertools import chain
from textwrap import dedent
from typing import List

import msgpack
@@ -17,7 +18,7 @@
InfluxDBPointDict,
)
from jobbergate_agent.settings import SETTINGS
from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError
from jobbergate_agent.utils.exception import JobbergateApiError, SbatchError, JobbergateAgentError
from jobbergate_agent.utils.logging import log_error
from jobbergate_agent.jobbergate.constants import INFLUXDB_MEASUREMENT
from jobbergate_agent.utils.compute import aggregate_influx_measures
@@ -83,20 +84,39 @@ async def update_job_data(


async def fetch_influx_data(
time: int, host: str, step: int, task: int, job: int, measurement: INFLUXDB_MEASUREMENT
job: int,
measurement: INFLUXDB_MEASUREMENT,
*,
time: int | None = None,
host: str | None = None,
step: int | None = None,
task: int | None = None,
) -> list[InfluxDBPointDict]:
"""
Fetch data from InfluxDB for a given host, step and task.
"""
query = f"""
SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
"""
with JobbergateApiError.handle_errors("Failed to fetch data from InfluxDB", do_except=log_error):
with JobbergateAgentError.handle_errors("Failed to fetch measures from InfluxDB", do_except=log_error):
all_none = all(arg is None for arg in [time, host, step, task])
all_set = all(arg is not None for arg in [time, host, step, task])

if not (all_none or all_set):
raise ValueError("Invalid argument combination: all optional arguments must be either set or None.")

if all_set:
query = dedent(f"""
SELECT * FROM {measurement} WHERE time > $time AND host = $host AND step = $step AND task = $task AND job = $job
""")
params = {"time": time, "host": host, "step": str(step), "task": str(task), "job": str(job)}
else:
query = f"SELECT * FROM {measurement} WHERE job = $job"
params = {"job": str(job)}

assert influxdb_client is not None # mypy assertion
params = dict(time=time, host=host, step=str(step), task=str(task), job=str(job))

logger.debug(f"Querying InfluxDB with: {query=}, {params=}")
result = influxdb_client.query(query, bind_params=params, epoch="us")
logger.debug("Successfully fetched data from InfluxDB")

return [
InfluxDBPointDict(
time=point["time"],
@@ -140,18 +160,27 @@ async def update_job_metrics(active_job_submittion: ActiveJobSubmission) -> None

influx_measurements = fetch_influx_measurements()

tasks = (
fetch_influx_data(
job_max_time.max_time,
job_max_time.node_host,
job_max_time.step,
job_max_time.task,
active_job_submittion.slurm_job_id,
measurement["name"],
if not job_max_times.max_times:
tasks = (
fetch_influx_data(
active_job_submittion.slurm_job_id,
measurement["name"],
)
for measurement in influx_measurements
)
else:
tasks = (
fetch_influx_data(
active_job_submittion.slurm_job_id,
measurement["name"],
time=job_max_time.max_time,
host=job_max_time.node_host,
step=job_max_time.step,
task=job_max_time.task,
)
for job_max_time in job_max_times.max_times
for measurement in influx_measurements
)
for job_max_time in job_max_times.max_times
for measurement in influx_measurements
)
results = await asyncio.gather(*list(tasks))
data_points = chain.from_iterable(results)
aggregated_data_points = aggregate_influx_measures(data_points)
14 changes: 7 additions & 7 deletions jobbergate-agent/jobbergate_agent/utils/exception.py
Original file line number Diff line number Diff line change
@@ -8,31 +8,31 @@
from buzz.tools import DoExceptParams, noop


class ClusterAgentError(Buzz):
class JobbergateAgentError(Buzz):
"""Raise exception when execution command returns an error"""


class ProcessExecutionError(ClusterAgentError):
class ProcessExecutionError(JobbergateAgentError):
"""Raise exception when execution command returns an error"""


class AuthTokenError(ClusterAgentError):
class AuthTokenError(JobbergateAgentError):
"""Raise exception when there are connection issues with the backend"""


class SbatchError(ClusterAgentError):
class SbatchError(JobbergateAgentError):
"""Raise exception when sbatch raises any error"""


class JobbergateApiError(ClusterAgentError):
class JobbergateApiError(JobbergateAgentError):
"""Raise exception when communication with Jobbergate API fails"""


class JobSubmissionError(ClusterAgentError):
class JobSubmissionError(JobbergateAgentError):
"""Raise exception when a job cannot be submitted raises any error"""


class SlurmParameterParserError(ClusterAgentError):
class SlurmParameterParserError(JobbergateAgentError):
"""Raise exception when Slurm mapper or SBATCH parser face any error"""


Loading
Loading