Skip to content

Commit

Permalink
[autoscaler] Optimize resource constraints proto to dedup requests wi…
Browse files Browse the repository at this point in the history
…th the same shape (#37863)

Signed-off-by: rickyyx <[email protected]>
  • Loading branch information
rickyyx authored Jul 27, 2023
1 parent d7dfdd8 commit 4978631
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 32 deletions.
3 changes: 2 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2332,12 +2332,13 @@ cdef class GcsClient:
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1
with nogil:
check_status(self.inner.get().RequestClusterResourceConstraint(
timeout_ms, bundles))
timeout_ms, bundles, count_array))

@_auto_reconnect
def get_cluster_status(
Expand Down
18 changes: 17 additions & 1 deletion python/ray/autoscaler/v2/sdk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from collections import defaultdict
from typing import List, Optional

import ray
Expand Down Expand Up @@ -43,7 +44,22 @@ def request_cluster_resources(
timeout: Timeout in seconds after which the request times out.
"""
get_gcs_client().request_cluster_resource_constraint(to_request, timeout_s=timeout)

# Aggregate bundle by shape.
resource_requests_by_count = defaultdict(int)
for request in to_request:
bundle = frozenset(request.items())
resource_requests_by_count[bundle] += 1

bundles = []
counts = []
for bundle, count in resource_requests_by_count.items():
bundles.append(dict(bundle))
counts.append(count)

get_gcs_client().request_cluster_resource_constraint(
bundles, counts, timeout_s=timeout
)


def get_cluster_status(
Expand Down
36 changes: 28 additions & 8 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_node_ids() -> Tuple[str, List[str]]:


def assert_cluster_resource_constraints(
state: ClusterResourceState, expected: List[dict]
state: ClusterResourceState, expected_bundles: List[dict], expected_count: List[int]
):
"""
Assert a GetClusterResourceStateReply has cluster_resource_constraints that
Expand All @@ -62,16 +62,26 @@ def assert_cluster_resource_constraints(
assert len(state.cluster_resource_constraints) == 1

min_bundles = state.cluster_resource_constraints[0].min_bundles
assert len(min_bundles) == len(expected)
assert len(min_bundles) == len(expected_bundles) == len(expected_count)

# Sort all the bundles by bundle's resource names
min_bundles = sorted(
min_bundles, key=lambda bundle: "".join(bundle.resources_bundle.keys())
min_bundles,
key=lambda bundle_by_count: "".join(
bundle_by_count.request.resources_bundle.keys()
),
)
expected = zip(expected_bundles, expected_count)
expected = sorted(
expected, key=lambda bundle_count: "".join(bundle_count[0].keys())
)
expected = sorted(expected, key=lambda bundle: "".join(bundle.keys()))

for actual_bundle, expected_bundle in zip(min_bundles, expected):
assert dict(actual_bundle.resources_bundle) == expected_bundle
for actual_bundle_count, expected_bundle_count in zip(min_bundles, expected):
assert (
dict(actual_bundle_count.request.resources_bundle)
== expected_bundle_count[0]
)
assert actual_bundle_count.count == expected_bundle_count[1]


@dataclass
Expand Down Expand Up @@ -240,7 +250,7 @@ def test_request_cluster_resources_basic(shutdown_only):

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 1}])
assert_cluster_resource_constraints(state, [{"CPU": 1}], [1])
return True

wait_for_condition(verify)
Expand All @@ -250,7 +260,17 @@ def verify():

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
assert_cluster_resource_constraints(
state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}], [1, 1]
)
return True

# Multiple requests with the same shape are aggregated into one bundle with a count.
request_cluster_resources([{"CPU": 1}] * 100)

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 1}], [100])
return True

wait_for_condition(verify)
Expand Down
7 changes: 5 additions & 2 deletions python/ray/autoscaler/v2/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ def test_cluster_status_parser_cluster_resource_state():
{
"min_bundles": [
{
"resources_bundle": {"GPU": 2, "CPU": 100},
"placement_constraints": [],
"request": {
"resources_bundle": {"GPU": 2, "CPU": 100},
"placement_constraints": [],
},
"count": 1,
},
]
}
Expand Down
9 changes: 6 additions & 3 deletions python/ray/autoscaler/v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,12 @@ def _parse_resource_demands(

for constraint_request in state.cluster_resource_constraints:
demand = ClusterConstraintDemand(
bundles_by_count=cls._aggregate_resource_requests_by_shape(
constraint_request.min_bundles
),
bundles_by_count=[
ResourceRequestByCount(
bundle=dict(r.request.resources_bundle.items()), count=r.count
)
for r in constraint_request.min_bundles
]
)
constraint_demand.append(demand)

Expand Down
3 changes: 2 additions & 1 deletion python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
int64_t timeout_ms, c_vector[CJobTableData]& result)
CRayStatus RequestClusterResourceConstraint(
int64_t timeout_ms,
const c_vector[unordered_map[c_string, double]] &bundles)
const c_vector[unordered_map[c_string, double]] &bundles,
const c_vector[int64_t] &count_array)
CRayStatus GetClusterStatus(
int64_t timeout_ms,
c_string &serialized_reply)
Expand Down
18 changes: 12 additions & 6 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,18 +399,24 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,

Status PythonGcsClient::RequestClusterResourceConstraint(
int64_t timeout_ms,
const std::vector<std::unordered_map<std::string, double>> &bundles) {
const std::vector<std::unordered_map<std::string, double>> &bundles,
const std::vector<int64_t> &count_array) {
grpc::ClientContext context;
GrpcClientContextWithTimeoutMs(context, timeout_ms);

rpc::autoscaler::RequestClusterResourceConstraintRequest request;
rpc::autoscaler::RequestClusterResourceConstraintReply reply;
RAY_CHECK(bundles.size() == count_array.size());
for (size_t i = 0; i < bundles.size(); ++i) {
const auto &bundle = bundles[i];
auto count = count_array[i];

for (auto bundle : bundles) {
request.mutable_cluster_resource_constraint()
->add_min_bundles()
->mutable_resources_bundle()
->insert(bundle.begin(), bundle.end());
auto new_resource_requests_by_count =
request.mutable_cluster_resource_constraint()->add_min_bundles();

new_resource_requests_by_count->mutable_request()->mutable_resources_bundle()->insert(
bundle.begin(), bundle.end());
new_resource_requests_by_count->set_count(count);
}

grpc::Status status =
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ class RAY_EXPORT PythonGcsClient {
// For rpc::autoscaler::AutoscalerStateService
Status RequestClusterResourceConstraint(
int64_t timeout_ms,
const std::vector<std::unordered_map<std::string, double>> &bundles);
const std::vector<std::unordered_map<std::string, double>> &bundles,
const std::vector<int64_t> &count_array);
Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply);

private:
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,22 +593,22 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
// Generate one constraint.
{
RequestClusterResourceConstraint(
Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}));
Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 2}, {"GPU", 1}});
}

// Override it
{
RequestClusterResourceConstraint(
Mocker::GenClusterResourcesConstraint({{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}));
RequestClusterResourceConstraint(Mocker::GenClusterResourcesConstraint(
{{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}});
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,17 @@ struct Mocker {
return placement_group_table_data;
}
static rpc::autoscaler::ClusterResourceConstraint GenClusterResourcesConstraint(
const std::vector<std::unordered_map<std::string, double>> &request_resources) {
const std::vector<std::unordered_map<std::string, double>> &request_resources,
const std::vector<int64_t> &count_array) {
rpc::autoscaler::ClusterResourceConstraint constraint;
for (const auto &resource : request_resources) {
RAY_CHECK(request_resources.size() == count_array.size());
for (size_t i = 0; i < request_resources.size(); i++) {
auto &resource = request_resources[i];
auto count = count_array[i];
auto bundle = constraint.add_min_bundles();
bundle->mutable_resources_bundle()->insert(resource.begin(), resource.end());
bundle->set_count(count);
bundle->mutable_request()->mutable_resources_bundle()->insert(resource.begin(),
resource.end());
}
return constraint;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/experimental/autoscaler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ message GangResourceRequest {
message ClusterResourceConstraint {
// If not empty, the cluster should have the capacity (total resource) to fit
// the min_bundles.
repeated ResourceRequest min_bundles = 1;
repeated ResourceRequestByCount min_bundles = 1;
}

// Node status for a ray node.
Expand Down

0 comments on commit 4978631

Please sign in to comment.