[dashboard] Remove DataSource.agents with internal kv get. #50025

Open · wants to merge 3 commits into base: master
37 changes: 1 addition & 36 deletions python/ray/dashboard/datacenter.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Any, List, Optional
+from typing import List, Optional
 
 import ray.dashboard.consts as dashboard_consts
 from ray._private.utils import (
@@ -188,41 +188,6 @@ async def get_all_node_summary(cls):
             for node_id in DataSource.nodes.keys()
         ]
 
-    @classmethod
-    async def get_agent_infos(
-        cls, target_node_ids: Optional[List[str]] = None
-    ) -> Dict[str, Dict[str, Any]]:
-        """Fetches running Agent (like HTTP/gRPC ports, IP, etc) running on every node
-
-        :param target_node_ids: Target node ids to fetch agent info for. If omitted will
-            fetch the info for all agents
-        """
-
-        # Return all available agent infos in case no target node-ids were provided
-        target_node_ids = target_node_ids or DataSource.agents.keys()
-
-        missing_node_ids = [
-            node_id for node_id in target_node_ids if node_id not in DataSource.agents
-        ]
-        if missing_node_ids:
-            logger.warning(
-                f"Agent info was not found for {missing_node_ids}"
-                f" (having agent infos for {list(DataSource.agents.keys())})"
-            )
-            return {}
-
-        def _create_agent_info(node_id: str):
-            (node_ip, http_port, grpc_port) = DataSource.agents[node_id]
-
-            return dict(
-                ipAddress=node_ip,
-                httpPort=int(http_port or -1),
-                grpcPort=int(grpc_port or -1),
-                httpAddress=f"{node_ip}:{http_port}",
-            )
-
-        return {node_id: _create_agent_info(node_id) for node_id in target_node_ids}
-
     @classmethod
     async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None):
         target_actor_table_entries: dict[str, Optional[dict]]
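For context on the data-shape change: the removed DataOrganizer.get_agent_infos returned a nested dict per node, while the KV-backed helpers added to job_head.py below return a plain (ip, http_port, grpc_port) tuple per NodeID. A minimal sketch of the two shapes; the concrete values are illustrative only, not taken from the PR:

# Shape per node from the removed DataOrganizer.get_agent_infos (datacenter.py):
old_agent_info = {
    "ipAddress": "10.0.0.5",  # illustrative values
    "httpPort": 52365,
    "grpcPort": 52366,
    "httpAddress": "10.0.0.5:52365",
}

# Shape per node from the new KV-backed helpers (job_head.py, below):
new_agent_info = ("10.0.0.5", 52365, 52366)  # (ip, http_port, grpc_port)

# Callers now unpack the tuple directly instead of indexing dict fields:
ip, http_port, grpc_port = new_agent_info
agent_http_address = f"http://{ip}:{http_port}"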
144 changes: 105 additions & 39 deletions python/ray/dashboard/modules/job/job_head.py
@@ -3,25 +3,31 @@
 import json
 import logging
 import traceback
-from random import sample
-from typing import AsyncIterator, List, Optional
+from random import choice
+from typing import AsyncIterator, Dict, List, Optional, Tuple
 
 import aiohttp.web
 from aiohttp.client import ClientResponse
 from aiohttp.web import Request, Response
 
 import ray
+from ray import NodeID
 import ray.dashboard.consts as dashboard_consts
+from ray.dashboard.consts import (
+    GCS_RPC_TIMEOUT_SECONDS,
+    DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX,
+    TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS,
+    WAIT_AVAILABLE_AGENT_TIMEOUT,
+)
 import ray.dashboard.optional_utils as optional_utils
 import ray.dashboard.utils as dashboard_utils
-from ray._private.ray_constants import env_bool
+from ray._private.ray_constants import env_bool, KV_NAMESPACE_DASHBOARD
 from ray._private.runtime_env.packaging import (
     package_exists,
     pin_runtime_env_uri,
     upload_package_to_gcs,
 )
 from ray._private.utils import get_or_create_event_loop
-from ray.dashboard.datacenter import DataOrganizer
 from ray.dashboard.modules.job.common import (
     JobDeleteResponse,
     JobInfoStorageClient,
@@ -166,7 +172,8 @@ def __init__(self, config: dashboard_utils.DashboardHeadModuleConfig):
         # `JobHead` has ever used, and will not be deleted
         # from it unless `JobAgentSubmissionClient` is no
         # longer available (the corresponding agent process is dead)
-        self._agents = dict()
+        # {node_id: JobAgentSubmissionClient}
+        self._agents: Dict[NodeID, JobAgentSubmissionClient] = dict()
 
     async def get_target_agent(self) -> Optional[JobAgentSubmissionClient]:
         if RAY_JOB_AGENT_USE_HEAD_NODE_ONLY:
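A note on the new typing: `self._agents` is now keyed by NodeID objects rather than hex strings, and the helpers below convert at the KV boundary with NodeID.from_hex(...) and node_id.hex(). A minimal sketch of the round-trip this relies on; the hex string is illustrative (NodeID is 28 bytes, i.e. 56 hex characters):

from ray import NodeID

node_id_hex = "ab" * 28  # illustrative 56-char hex string
node_id = NodeID.from_hex(node_id_hex)
assert node_id.hex() == node_id_hex  # hex round-trip
agents = {node_id: "client"}  # NodeID is hashable, usable as a dict key
assert NodeID.from_hex(node_id_hex) in agents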
@@ -191,76 +198,135 @@ async def _pick_random_agent(self) -> Optional[JobAgentSubmissionClient]:
         """
         # NOTE: Following call will block until there's at least 1 agent info
         # being populated from GCS
-        agent_infos = await self._fetch_agent_infos()
+        agent_infos = await self._fetch_all_agent_infos()
Review comment (Collaborator), on the line above: This might be expensive for a large cluster. I feel we can let the dashboard GCS client subscribe to the nodes table so we have a local cache of nodes.
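A rough sketch of the reviewer's caching idea, assuming a push-style subscription; the NodeTableSubscriber interface below is hypothetical (not an existing Ray API) and only illustrates the shape of the suggestion:

from typing import Dict, Tuple

from ray import NodeID

class AgentAddrCache:
    """Hypothetical local cache fed by a GCS node-table subscription."""

    def __init__(self, subscriber) -> None:
        # subscriber: hypothetical NodeTableSubscriber yielding node events
        self._subscriber = subscriber
        self._agents: Dict[NodeID, Tuple[str, int, int]] = {}

    async def run(self) -> None:
        # Apply node add/remove events incrementally, instead of issuing a
        # full internal_kv scan on every _pick_random_agent call.
        async for event in self._subscriber.poll():
            if event.is_alive:
                self._agents[event.node_id] = (event.ip, event.http_port, event.grpc_port)
            else:
                self._agents.pop(event.node_id, None)

    def get_all(self) -> Dict[NodeID, Tuple[str, int, int]]:
        # Cheap local read; no GCS round-trip per lookup.
        return dict(self._agents)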


         # delete dead agents.
         for dead_node in set(self._agents) - set(agent_infos):
             client = self._agents.pop(dead_node)
             await client.close()
 
         if len(self._agents) >= dashboard_consts.CANDIDATE_AGENT_NUMBER:
-            node_id = sample(list(set(self._agents)), 1)[0]
+            node_id = choice(list(self._agents))
             return self._agents[node_id]
         else:
             # Randomly select one from among all agents, it is possible that
             # the selected one already exists in `self._agents`
-            node_id = sample(sorted(agent_infos), 1)[0]
+            node_id = choice(list(agent_infos))
             agent_info = agent_infos[node_id]
 
             if node_id not in self._agents:
-                node_ip = agent_info["ipAddress"]
-                http_port = agent_info["httpPort"]
-                agent_http_address = f"http://{node_ip}:{http_port}"
+                ip, http_port, grpc_port = agent_info
+                agent_http_address = f"http://{ip}:{http_port}"
                 self._agents[node_id] = JobAgentSubmissionClient(agent_http_address)
 
             return self._agents[node_id]
 
     async def _get_head_node_agent(self) -> Optional[JobAgentSubmissionClient]:
         """Retrieves HTTP client for `JobAgent` running on the Head node"""
 
-        head_node_id = await get_head_node_id(self.gcs_aio_client)
+        head_node_id_hex = await get_head_node_id(self.gcs_aio_client)
 
-        if not head_node_id:
+        if not head_node_id_hex:
             logger.warning("Head node id has not yet been persisted in GCS")
             return None
 
+        head_node_id = NodeID.from_hex(head_node_id_hex)
+
         if head_node_id not in self._agents:
-            agent_infos = await self._fetch_agent_infos(target_node_ids=[head_node_id])
+            agent_infos = await self._fetch_agent_infos([head_node_id])
             if head_node_id not in agent_infos:
-                logger.error("Head node agent's information was not found")
+                logger.error(
+                    f"Head node agent's information was not found: {head_node_id} not in {agent_infos}"
+                )
                 return None
 
-            agent_info = agent_infos[head_node_id]
-
-            node_ip = agent_info["ipAddress"]
-            http_port = agent_info["httpPort"]
-            agent_http_address = f"http://{node_ip}:{http_port}"
+            ip, http_port, grpc_port = agent_infos[head_node_id]
+            agent_http_address = f"http://{ip}:{http_port}"
 
             self._agents[head_node_id] = JobAgentSubmissionClient(agent_http_address)
 
         return self._agents[head_node_id]
 
-    @staticmethod
-    async def _fetch_agent_infos(target_node_ids: Optional[List[str]] = None):
-        """Fetches agent infos for nodes identified by provided node-ids (for all
-        nodes if not provided)
-
-        NOTE: This call will block until there's at least 1 valid agent info populated
-        """
-        while True:
-            raw_agent_infos = await DataOrganizer.get_agent_infos(target_node_ids)
-            # Filter out invalid agent infos with unset HTTP port
-            agent_infos = {
-                key: value
-                for key, value in raw_agent_infos.items()
-                if value.get("httpPort", -1) > 0
-            }
-
-            if len(agent_infos) > 0:
-                return agent_infos
-
-            await asyncio.sleep(dashboard_consts.TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
+    async def _fetch_all_agent_infos(self) -> Dict[NodeID, Tuple[str, int, int]]:
+        """
+        Fetches agent infos for all nodes in the cluster.
+
+        If there is no agent available at all, or an exception occurs, it retries
+        every `TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
+
+        Returns: {node_id: (ip, http_port, grpc_port)}, keyed by NodeID.
+        """
+        while True:
+            try:
+                keys = await self.gcs_aio_client.internal_kv_keys(
+                    f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}".encode(),
+                    namespace=KV_NAMESPACE_DASHBOARD,
+                    timeout=GCS_RPC_TIMEOUT_SECONDS,
+                )
+                if not keys:
+                    # No agent keys found, retry
+                    raise Exception()
+                values: Dict[
+                    bytes, bytes
+                ] = await self.gcs_aio_client.internal_kv_multi_get(
+                    keys,
+                    namespace=KV_NAMESPACE_DASHBOARD,
+                    timeout=GCS_RPC_TIMEOUT_SECONDS,
+                )
+                prefix_len = len(DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX)
+                return {
+                    NodeID.from_hex(key[prefix_len:].decode()): json.loads(
+                        value.decode()
+                    )
+                    for key, value in values.items()
+                }
+
+            except Exception:
+                logger.info(
+                    f"Failed to fetch all agent infos, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
+                )
+                await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
+
+    async def _fetch_agent_infos(
+        self, target_node_ids: List[NodeID]
+    ) -> Dict[NodeID, Tuple[str, int, int]]:
+        """
+        Fetches agent infos for the nodes identified by the provided node-ids.
+
+        If any of the node-ids is not found, it retries every
+        `TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS` seconds indefinitely.
+
+        Returns: {node_id: (ip, http_port, grpc_port)}, keyed by NodeID.
+        """
+        while True:
+            try:
+                keys = [
+                    f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{node_id.hex()}".encode()
+                    for node_id in target_node_ids
+                ]
+                values: Dict[
+                    bytes, bytes
+                ] = await self.gcs_aio_client.internal_kv_multi_get(
+                    keys,
+                    namespace=KV_NAMESPACE_DASHBOARD,
+                    timeout=GCS_RPC_TIMEOUT_SECONDS,
+                )
+                if not values or len(values) != len(target_node_ids):
+                    # Not all agent infos found, retry
+                    raise Exception()
+                prefix_len = len(DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX)
+                return {
+                    NodeID.from_hex(key[prefix_len:].decode()): json.loads(
+                        value.decode()
+                    )
+                    for key, value in values.items()
+                }
+            except Exception:
+                logger.info(
+                    f"Failed to fetch agent infos for nodes {target_node_ids}, retrying in {TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS} seconds..."
+                )
+                await asyncio.sleep(TRY_TO_GET_AGENT_INFO_INTERVAL_SECONDS)
 
     @routes.get("/api/version")
     async def get_version(self, req: Request) -> Response:
@@ -337,7 +403,7 @@ async def submit_job(self, req: Request) -> Response:
         try:
             job_agent_client = await asyncio.wait_for(
                 self.get_target_agent(),
-                timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
+                timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
             )
             resp = await job_agent_client.submit_job_internal(submit_request)
         except asyncio.TimeoutError:
@@ -384,7 +450,7 @@ async def stop_job(self, req: Request) -> Response:
         try:
             job_agent_client = await asyncio.wait_for(
                 self.get_target_agent(),
-                timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
+                timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
             )
             resp = await job_agent_client.stop_job_internal(job.submission_id)
         except Exception:
@@ -419,7 +485,7 @@ async def delete_job(self, req: Request) -> Response:
         try:
             job_agent_client = await asyncio.wait_for(
                 self.get_target_agent(),
-                timeout=dashboard_consts.WAIT_AVAILABLE_AGENT_TIMEOUT,
+                timeout=WAIT_AVAILABLE_AGENT_TIMEOUT,
            )
             resp = await job_agent_client.delete_job_internal(job.submission_id)
         except Exception:
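The reader side above fixes the KV layout: key = DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX plus the node-id hex, value = a JSON array [ip, http_port, grpc_port] (which json.loads turns back into the 3-item value). A hedged sketch of the matching writer on the agent side; the registration code is not part of this diff, so treat the exact call site and wiring as an assumption:

import json

from ray._private.ray_constants import KV_NAMESPACE_DASHBOARD
from ray.dashboard.consts import DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX

async def register_agent_addr(
    gcs_aio_client, node_id_hex: str, ip: str, http_port: int, grpc_port: int
) -> None:
    # Key layout mirrored from the readers in job_head.py:
    #   DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX + <node id hex> -> JSON [ip, http, grpc]
    key = f"{DASHBOARD_AGENT_ADDR_NODE_ID_PREFIX}{node_id_hex}".encode()
    value = json.dumps([ip, http_port, grpc_port]).encode()
    await gcs_aio_client.internal_kv_put(
        key, value, overwrite=True, namespace=KV_NAMESPACE_DASHBOARD
    )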