From 49786318a03fce4cefeea277b412b38c8254da77 Mon Sep 17 00:00:00 2001 From: Ricky Xu Date: Thu, 27 Jul 2023 14:13:39 -0700 Subject: [PATCH] [autoscaler] Optimize resource constraints proto to dedup requests with the same shape (#37863) Signed-off-by: rickyyx --- python/ray/_raylet.pyx | 3 +- python/ray/autoscaler/v2/sdk.py | 18 +++++++++- python/ray/autoscaler/v2/tests/test_sdk.py | 36 ++++++++++++++----- python/ray/autoscaler/v2/tests/test_utils.py | 7 ++-- python/ray/autoscaler/v2/utils.py | 9 +++-- python/ray/includes/common.pxd | 3 +- src/ray/gcs/gcs_client/gcs_client.cc | 18 ++++++---- src/ray/gcs/gcs_client/gcs_client.h | 3 +- .../test/gcs_autoscaler_state_manager_test.cc | 10 +++--- src/ray/gcs/test/gcs_test_util.h | 12 +++++-- .../protobuf/experimental/autoscaler.proto | 2 +- 11 files changed, 89 insertions(+), 32 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index dd29378f4eab..57b5f6292005 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -2332,12 +2332,13 @@ cdef class GcsClient: def request_cluster_resource_constraint( self, bundles: c_vector[unordered_map[c_string, double]], + count_array: c_vector[int64_t], timeout_s=None): cdef: int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1 with nogil: check_status(self.inner.get().RequestClusterResourceConstraint( - timeout_ms, bundles)) + timeout_ms, bundles, count_array)) @_auto_reconnect def get_cluster_status( diff --git a/python/ray/autoscaler/v2/sdk.py b/python/ray/autoscaler/v2/sdk.py index 22304bef8aac..891f8a1ab303 100644 --- a/python/ray/autoscaler/v2/sdk.py +++ b/python/ray/autoscaler/v2/sdk.py @@ -1,4 +1,5 @@ import time +from collections import defaultdict from typing import List, Optional import ray @@ -43,7 +44,22 @@ def request_cluster_resources( timeout: Timeout in seconds for the request to be timeout """ - get_gcs_client().request_cluster_resource_constraint(to_request, timeout_s=timeout) + + # Aggregate bundle by shape. + resource_requests_by_count = defaultdict(int) + for request in to_request: + bundle = frozenset(request.items()) + resource_requests_by_count[bundle] += 1 + + bundles = [] + counts = [] + for bundle, count in resource_requests_by_count.items(): + bundles.append(dict(bundle)) + counts.append(count) + + get_gcs_client().request_cluster_resource_constraint( + bundles, counts, timeout_s=timeout + ) def get_cluster_status( diff --git a/python/ray/autoscaler/v2/tests/test_sdk.py b/python/ray/autoscaler/v2/tests/test_sdk.py index 1f6ec1e13e7b..a133c08e6577 100644 --- a/python/ray/autoscaler/v2/tests/test_sdk.py +++ b/python/ray/autoscaler/v2/tests/test_sdk.py @@ -52,7 +52,7 @@ def get_node_ids() -> Tuple[str, List[str]]: def assert_cluster_resource_constraints( - state: ClusterResourceState, expected: List[dict] + state: ClusterResourceState, expected_bundles: List[dict], expected_count: List[int] ): """ Assert a GetClusterResourceStateReply has cluster_resource_constraints that @@ -62,16 +62,26 @@ def assert_cluster_resource_constraints( assert len(state.cluster_resource_constraints) == 1 min_bundles = state.cluster_resource_constraints[0].min_bundles - assert len(min_bundles) == len(expected) + assert len(min_bundles) == len(expected_bundles) == len(expected_count) # Sort all the bundles by bundle's resource names min_bundles = sorted( - min_bundles, key=lambda bundle: "".join(bundle.resources_bundle.keys()) + min_bundles, + key=lambda bundle_by_count: "".join( + bundle_by_count.request.resources_bundle.keys() + ), + ) + expected = zip(expected_bundles, expected_count) + expected = sorted( + expected, key=lambda bundle_count: "".join(bundle_count[0].keys()) ) - expected = sorted(expected, key=lambda bundle: "".join(bundle.keys())) - for actual_bundle, expected_bundle in zip(min_bundles, expected): - assert dict(actual_bundle.resources_bundle) == expected_bundle + for actual_bundle_count, expected_bundle_count in zip(min_bundles, expected): + assert ( + dict(actual_bundle_count.request.resources_bundle) + == expected_bundle_count[0] + ) + assert actual_bundle_count.count == expected_bundle_count[1] @dataclass @@ -240,7 +250,7 @@ def test_request_cluster_resources_basic(shutdown_only): def verify(): state = get_cluster_resource_state(stub) - assert_cluster_resource_constraints(state, [{"CPU": 1}]) + assert_cluster_resource_constraints(state, [{"CPU": 1}], [1]) return True wait_for_condition(verify) @@ -250,7 +260,17 @@ def verify(): def verify(): state = get_cluster_resource_state(stub) - assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}]) + assert_cluster_resource_constraints( + state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}], [1, 1] + ) + return True + + # Request multiple is aggregated by shape. + request_cluster_resources([{"CPU": 1}] * 100) + + def verify(): + state = get_cluster_resource_state(stub) + assert_cluster_resource_constraints(state, [{"CPU": 1}], [100]) return True wait_for_condition(verify) diff --git a/python/ray/autoscaler/v2/tests/test_utils.py b/python/ray/autoscaler/v2/tests/test_utils.py index 51a1861a034d..823dfc39329c 100644 --- a/python/ray/autoscaler/v2/tests/test_utils.py +++ b/python/ray/autoscaler/v2/tests/test_utils.py @@ -148,8 +148,11 @@ def test_cluster_status_parser_cluster_resource_state(): { "min_bundles": [ { - "resources_bundle": {"GPU": 2, "CPU": 100}, - "placement_constraints": [], + "request": { + "resources_bundle": {"GPU": 2, "CPU": 100}, + "placement_constraints": [], + }, + "count": 1, }, ] } diff --git a/python/ray/autoscaler/v2/utils.py b/python/ray/autoscaler/v2/utils.py index 5a0003c1590a..abef66d554d8 100644 --- a/python/ray/autoscaler/v2/utils.py +++ b/python/ray/autoscaler/v2/utils.py @@ -291,9 +291,12 @@ def _parse_resource_demands( for constraint_request in state.cluster_resource_constraints: demand = ClusterConstraintDemand( - bundles_by_count=cls._aggregate_resource_requests_by_shape( - constraint_request.min_bundles - ), + bundles_by_count=[ + ResourceRequestByCount( + bundle=dict(r.request.resources_bundle.items()), count=r.count + ) + for r in constraint_request.min_bundles + ] ) constraint_demand.append(demand) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index fc2081d9bf6c..9ee17a220731 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -395,7 +395,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil: int64_t timeout_ms, c_vector[CJobTableData]& result) CRayStatus RequestClusterResourceConstraint( int64_t timeout_ms, - const c_vector[unordered_map[c_string, double]] &bundles) + const c_vector[unordered_map[c_string, double]] &bundles, + const c_vector[int64_t] &count_array) CRayStatus GetClusterStatus( int64_t timeout_ms, c_string &serialized_reply) diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 889dee2efa9f..250034c028d5 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -399,18 +399,24 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms, Status PythonGcsClient::RequestClusterResourceConstraint( int64_t timeout_ms, - const std::vector> &bundles) { + const std::vector> &bundles, + const std::vector &count_array) { grpc::ClientContext context; GrpcClientContextWithTimeoutMs(context, timeout_ms); rpc::autoscaler::RequestClusterResourceConstraintRequest request; rpc::autoscaler::RequestClusterResourceConstraintReply reply; + RAY_CHECK(bundles.size() == count_array.size()); + for (size_t i = 0; i < bundles.size(); ++i) { + const auto &bundle = bundles[i]; + auto count = count_array[i]; - for (auto bundle : bundles) { - request.mutable_cluster_resource_constraint() - ->add_min_bundles() - ->mutable_resources_bundle() - ->insert(bundle.begin(), bundle.end()); + auto new_resource_requests_by_count = + request.mutable_cluster_resource_constraint()->add_min_bundles(); + + new_resource_requests_by_count->mutable_request()->mutable_resources_bundle()->insert( + bundle.begin(), bundle.end()); + new_resource_requests_by_count->set_count(count); } grpc::Status status = diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index b0d4153fe8da..561472faeee1 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -228,7 +228,8 @@ class RAY_EXPORT PythonGcsClient { // For rpc::autoscaler::AutoscalerStateService Status RequestClusterResourceConstraint( int64_t timeout_ms, - const std::vector> &bundles); + const std::vector> &bundles, + const std::vector &count_array); Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply); private: diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc index 74c66adb5664..07257c4da7b3 100644 --- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc @@ -593,22 +593,22 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) { // Generate one constraint. { RequestClusterResourceConstraint( - Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}})); + Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}, {1})); const auto &state = GetClusterResourceStateSync(); ASSERT_EQ(state.cluster_resource_constraints_size(), 1); ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1); - CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0), + CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(), {{"CPU", 2}, {"GPU", 1}}); } // Override it { - RequestClusterResourceConstraint( - Mocker::GenClusterResourcesConstraint({{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}})); + RequestClusterResourceConstraint(Mocker::GenClusterResourcesConstraint( + {{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}, {1})); const auto &state = GetClusterResourceStateSync(); ASSERT_EQ(state.cluster_resource_constraints_size(), 1); ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1); - CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0), + CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(), {{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}); } } diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index a1eb30b039f3..2945d66b9bbf 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -378,11 +378,17 @@ struct Mocker { return placement_group_table_data; } static rpc::autoscaler::ClusterResourceConstraint GenClusterResourcesConstraint( - const std::vector> &request_resources) { + const std::vector> &request_resources, + const std::vector &count_array) { rpc::autoscaler::ClusterResourceConstraint constraint; - for (const auto &resource : request_resources) { + RAY_CHECK(request_resources.size() == count_array.size()); + for (size_t i = 0; i < request_resources.size(); i++) { + auto &resource = request_resources[i]; + auto count = count_array[i]; auto bundle = constraint.add_min_bundles(); - bundle->mutable_resources_bundle()->insert(resource.begin(), resource.end()); + bundle->set_count(count); + bundle->mutable_request()->mutable_resources_bundle()->insert(resource.begin(), + resource.end()); } return constraint; } diff --git a/src/ray/protobuf/experimental/autoscaler.proto b/src/ray/protobuf/experimental/autoscaler.proto index b7ecd9b72034..9f8271628a76 100644 --- a/src/ray/protobuf/experimental/autoscaler.proto +++ b/src/ray/protobuf/experimental/autoscaler.proto @@ -80,7 +80,7 @@ message GangResourceRequest { message ClusterResourceConstraint { // If not emtpy, the cluster should have the capacity (total resource) to fit // the min_bundles. - repeated ResourceRequest min_bundles = 1; + repeated ResourceRequestByCount min_bundles = 1; } // Node status for a ray node.