Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dashboard] Remove DataSource.agents with internal kv get. #50025

Merged
merged 10 commits into from
Feb 1, 2025
40 changes: 1 addition & 39 deletions python/ray/dashboard/datacenter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, List, Optional
from typing import List, Optional

import ray.dashboard.consts as dashboard_consts
from ray._private.utils import (
Expand All @@ -26,9 +26,6 @@ class DataSource:
# {actor id hex(str): actor table data(dict of ActorTableData
# in gcs.proto)}
actors = MutableNotificationDict()
# {job id hex(str): job table data(dict of JobTableData in gcs.proto)}
# {node id hex(str): dashboard agent [http port(int), grpc port(int)]}
agents = Dict()
# {node id hex(str): gcs node info(dict of GcsNodeInfo in gcs.proto)}
nodes = Dict()
# {node id hex(str): worker list}
Expand Down Expand Up @@ -188,41 +185,6 @@ async def get_all_node_summary(cls):
for node_id in DataSource.nodes.keys()
]

@classmethod
async def get_agent_infos(
cls, target_node_ids: Optional[List[str]] = None
) -> Dict[str, Dict[str, Any]]:
"""Fetches running Agent (like HTTP/gRPC ports, IP, etc) running on every node

:param target_node_ids: Target node ids to fetch agent info for. If omitted will
fetch the info for all agents
"""

# Return all available agent infos in case no target node-ids were provided
target_node_ids = target_node_ids or DataSource.agents.keys()

missing_node_ids = [
node_id for node_id in target_node_ids if node_id not in DataSource.agents
]
if missing_node_ids:
logger.warning(
f"Agent info was not found for {missing_node_ids}"
f" (having agent infos for {list(DataSource.agents.keys())})"
)
return {}

def _create_agent_info(node_id: str):
(node_ip, http_port, grpc_port) = DataSource.agents[node_id]

return dict(
ipAddress=node_ip,
httpPort=int(http_port or -1),
grpcPort=int(grpc_port or -1),
httpAddress=f"{node_ip}:{http_port}",
)

return {node_id: _create_agent_info(node_id) for node_id in target_node_ids}

@classmethod
async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None):
target_actor_table_entries: dict[str, Optional[dict]]
Expand Down
148 changes: 97 additions & 51 deletions python/ray/dashboard/modules/job/job_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,31 @@
import json
import logging
import traceback
from random import sample
from typing import AsyncIterator, List, Optional
from random import choice
from typing import AsyncIterator, Dict, List, Optional, Tuple

import aiohttp.web
from aiohttp.client import ClientResponse
from aiohttp.web import Request, Response

import ray
from ray import NodeID
import ray.dashboard.consts as dashboard_consts
from ray.dashboard.consts import (
GCS_RPC_TIMEOUT_SECONDS,
DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS,
WAIT_AVAILABLE_AGENT_TIMEOUT,
)
import ray.dashboard.optional_utils as optional_utils
import ray.dashboard.utils as dashboard_utils
from ray._private.ray_constants import env_bool
from ray._private.ray_constants import env_bool, KV_NAMESPACE_DASHBOARD
from ray._private.runtime_env.packaging import (
package_exists,
pin_runtime_env_uri,
upload_package_to_gcs,
)
from ray._private.utils import get_or_create_event_loop
from ray.dashboard.datacenter import DataOrganizer
from ray.dashboard.modules.job.common import (
JobDeleteResponse,
JobInfoStorageClient,
Expand Down Expand Up @@ -166,9 +172,10 @@ def __init__(self, config: dashboard_utils.DashboardHeadModuleConfig):
# `JobHead` has ever used, and will not be deleted
# from it unless `JobAgentSubmissionClient` is no
# longer available (the corresponding agent process is dead)
self._agents = dict()
# {node_id: JobAgentSubmissionClient}
self._agents: Dict[NodeID, JobAgentSubmissionClient] = dict()

async def get_target_agent(self) -> Optional[JobAgentSubmissionClient]:
async def get_target_agent(self) -> JobAgentSubmissionClient:
if RAY_JOB_AGENT_USE_HEAD_NODE_ONLY:
return await self._get_head_node_agent()

Expand All @@ -188,79 +195,118 @@ async def _pick_random_agent(self) -> Optional[JobAgentSubmissionClient]:
2. if not, randomly select one agent from all available agents,
it is possible that the selected one already exists in
`self._agents`.

If there's no agent available at all, or there's exception, it will retry every
`TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
"""
while True:
try:
return await self._pick_random_agent_once()
except Exception:
logger.exception(
f"Failed to pick a random agent, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
)
await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)

async def _pick_random_agent_once(self) -> JobAgentSubmissionClient:
"""
Query the internal kv for all agent infos, and pick agents randomly. May raise
exception if there's no agent available at all or there's network error.
"""
# NOTE: Following call will block until there's at least 1 agent info
# being populated from GCS
Comment on lines 216 to 217
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is no longer accurate?

agent_infos = await self._fetch_agent_infos()
agent_node_ids = await self._fetch_all_agent_node_ids()

# delete dead agents.
for dead_node in set(self._agents) - set(agent_infos):
for dead_node in set(self._agents) - set(agent_node_ids):
client = self._agents.pop(dead_node)
await client.close()

if len(self._agents) >= dashboard_consts.CANDIDATE_AGENT_NUMBER:
node_id = sample(list(set(self._agents)), 1)[0]
node_id = choice(list(self._agents))
return self._agents[node_id]
else:
# Randomly select one from among all agents, it is possible that
# the selected one already exists in `self._agents`
node_id = sample(sorted(agent_infos), 1)[0]
agent_info = agent_infos[node_id]
node_id = choice(list(agent_node_ids))

if node_id not in self._agents:
node_ip = agent_info["ipAddress"]
http_port = agent_info["httpPort"]
agent_http_address = f"http://{node_ip}:{http_port}"
# Fetch agent info from InternalKV, and create a new
# JobAgentSubmissionClient. May raise if the node_id is removed in
# InternalKV after the _fetch_all_agent_node_ids, though unlikely.
ip, http_port, grpc_port = await self._fetch_agent_info(node_id)
agent_http_address = f"http://{ip}:{http_port}"
self._agents[node_id] = JobAgentSubmissionClient(agent_http_address)

return self._agents[node_id]

async def _get_head_node_agent(self) -> Optional[JobAgentSubmissionClient]:
"""Retrieves HTTP client for `JobAgent` running on the Head node"""
async def _get_head_node_agent_once(self) -> JobAgentSubmissionClient:
head_node_id_hex = await get_head_node_id(self.gcs_aio_client)

head_node_id = await get_head_node_id(self.gcs_aio_client)
if not head_node_id_hex:
raise Exception("Head node id has not yet been persisted in GCS")

if not head_node_id:
logger.warning("Head node id has not yet been persisted in GCS")
return None
head_node_id = NodeID.from_hex(head_node_id_hex)

if head_node_id not in self._agents:
agent_infos = await self._fetch_agent_infos(target_node_ids=[head_node_id])
if head_node_id not in agent_infos:
logger.error("Head node agent's information was not found")
return None

agent_info = agent_infos[head_node_id]

node_ip = agent_info["ipAddress"]
http_port = agent_info["httpPort"]
agent_http_address = f"http://{node_ip}:{http_port}"

ip, http_port, grpc_port = await self._fetch_agent_info(head_node_id)
agent_http_address = f"http://{ip}:{http_port}"
self._agents[head_node_id] = JobAgentSubmissionClient(agent_http_address)

return self._agents[head_node_id]

@staticmethod
async def _fetch_agent_infos(target_node_ids: Optional[List[str]] = None):
"""Fetches agent infos for nodes identified by provided node-ids (for all
nodes if not provided)

NOTE: This call will block until there's at least 1 valid agent info populated
async def _get_head_node_agent(self) -> JobAgentSubmissionClient:
"""Retrieves HTTP client for `JobAgent` running on the Head node. If the head
node does not have an agent, it will retry every
`TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
"""

while True:
raw_agent_infos = await DataOrganizer.get_agent_infos(target_node_ids)
# Filter out invalid agent infos with unset HTTP port
agent_infos = {
key: value
for key, value in raw_agent_infos.items()
if value.get("httpPort", -1) > 0
}
try:
return await self._get_head_node_agent_once()
except Exception:
logger.exception(
f"Failed to get head node agent, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
)
await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)

async def _fetch_all_agent_node_ids(self) -> List[NodeID]:
"""
Fetches all NodeIDs with agent infos in the cluster.

May raise exception if there's no agent available at all or there's network error.
Returns: List[NodeID]
"""
keys = await self.gcs_aio_client.internal_kv_keys(
f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}".encode(),
namespace=KV_NAMESPACE_DASHBOARD,
timeout=GCS_RPC_TIMEOUT_SECONDS,
)
if not keys:
# No agent keys found, retry
raise Exception("No agents found in InternalKV.")
return [
NodeID.from_hex(key[len(DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX) :].decode())
for key in keys
]

if len(agent_infos) > 0:
return agent_infos
async def _fetch_agent_info(self, target_node_id: NodeID) -> Tuple[str, int, int]:
"""
Fetches agent info by the Node ID. May raise exception if there's network error or the
agent info is not found.

await asyncio.sleep(dashboard_consts.TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
Returns: (ip, http_port, grpc_port)
"""
key = f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{target_node_id.hex()}"
value = await self.gcs_aio_client.internal_kv_get(
key,
namespace=KV_NAMESPACE_DASHBOARD,
timeout=GCS_RPC_TIMEOUT_SECONDS,
)
if not value:
raise KeyError(
f"Agent info not found in internal kv for node {target_node_id}"
)
return json.loads(value.decode())

@routes.get("/api/version")
async def get_version(self, req: Request) -> Response:
Expand Down Expand Up @@ -337,7 +383,7 @@ async def submit_job(self, req: Request) -> Response:
try:
job_agent_client = await asyncio.wait_for(
self.get_target_agent(),
timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
)
resp = await job_agent_client.submit_job_internal(submit_request)
except asyncio.TimeoutError:
Expand Down Expand Up @@ -384,7 +430,7 @@ async def stop_job(self, req: Request) -> Response:
try:
job_agent_client = await asyncio.wait_for(
self.get_target_agent(),
timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
)
resp = await job_agent_client.stop_job_internal(job.submission_id)
except Exception:
Expand Down Expand Up @@ -419,7 +465,7 @@ async def delete_job(self, req: Request) -> Response:
try:
job_agent_client = await asyncio.wait_for(
self.get_target_agent(),
timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
)
resp = await job_agent_client.delete_job_internal(job.submission_id)
except Exception:
Expand Down
Loading