From a265d38c60f96e6e7720061e8ac4d62c33ffe753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= Date: Thu, 1 Sep 2022 17:39:49 +0800 Subject: [PATCH 1/6] add actor-affinity-feature proposal --- reps/2022-08-31-actor-affinity-apis.md | 430 +++++++++++++++++++++++++ 1 file changed, 430 insertions(+) create mode 100644 reps/2022-08-31-actor-affinity-apis.md diff --git a/reps/2022-08-31-actor-affinity-apis.md b/reps/2022-08-31-actor-affinity-apis.md new file mode 100644 index 0000000..57fa574 --- /dev/null +++ b/reps/2022-08-31-actor-affinity-apis.md @@ -0,0 +1,430 @@ + +## Summary +### General Motivation + +Provides a set of lightweight actor affinity and anti-affinity scheduling interfaces. +Replacing the heavy PG interface to implement collocate or spread actors. + +* Affinity + * Co-locate the actors in the same node. + * Co-locate the actors in the same batch of nodes, like nodes in the same zones +* Anti-affinity + * Spread the actors of a service across nodes and/or availability zones, e.g. to reduce correlated failures. + * Give a actor "exclusive" access to a node to guarantee resource isolation + * Spread the actors of different services that will affect each other on different nodes. + + +### Should this change be within `ray` or outside? + +Yes, this will be a complement to ray core's ability to flexibly schedule actors/tasks. + +## Stewardship +### Required Reviewers + +@wumuzi520 SenlinZhu @WangTaoTheTonic @scv119 (Chen Shen) @jjyao (Jiajun Yao) +### Shepherd of the Proposal (should be a senior committer) + + +## Design and Architecture + +### Brief idea + +1. Tag the Actor with key-value labels first +2. Then affinity or anti-affinity scheduling to the actors of the specified labels. + +![ActorAffinityScheduling](https://user-images.githubusercontent.com/11072802/188054945-48d980ed-2973-46e7-bf46-d908ecf93b31.jpg) + +Actor affinity/anti-affinity schedule API Design +1. Scheduling Strategy adds an ActorAffinitySchedulingStrategy. +2. This strategy consists of several ActorAffinityMatchExpressions. +3. ActorAffinityMatchExpression has 4 match types which are IN/NOT_IN/EXISTS/DOES_NOT_EXIST + +Use Case | ActorAffinityOperator +-- | -- +Affinity | IN/EXISTS +Anti-Affinity | NOT_IN/DOES_NOT_EXIST + +This is to learn the Affinity & Anti-Affinity features of K8s. + +Advantage +1. Adding the Label mechanism makes affinity scheduling and anti-affinity scheduling particularly flexible. Actor scheduling of various topology types can be fully realized. +2. There can be many extensions to the actor's label mechanism, such as obtaining the actor based on the label. + + + +### API Design + +#### Python API +Python API Design: +Set key-value labels for actors +```python +class ActorClass: + def options(self, + ..... + labels: Dist[str, str] = None) +``` + +Actor affinity scheduling strategy API +```python +SchedulingStrategyT = Union[None, str, + PlacementGroupSchedulingStrategy, + NodeAffinitySchedulingStrategy, + ActorAffinitySchedulingStrategy] + +class ActorAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[ActorAffinityMatchExpression]): + self.match_expressions = match_expressions + +class ActorAffinityMatchExpression: + """An expression used to represent an actor's affinity. + Attributes: + key: the key of label + operator: IN、NOT_IN、EXISTS、DOES_NOT_EXIST, + if EXISTS、DOES_NOT_EXIST, values set [] + values: a list of label value + soft: ... + """ + def __init__(self, key: str, operator: ActorAffinityOperator, + values: List[str], soft: bool): + self.key = key + self.operator = operator + self.values = values + self.soft = soft + +``` + +Python API example: +Step 1: Set the labels for this actor. +Each label consists of a key and value of type string. +```python +actor_1 = Actor.options(labels={ + "location": "dc_1" +}).remote() +``` + +Step 2: Set actor affinity strategy. +1. The target actor is expected to be scheduled with the actors whose label key is "location" and value in ["dc-1"]. +```python +match_expressions = [ + ActorAffinityMatchExpression("location", ActorAffinityOperator.IN, ["dc_1"], False) +] +actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) +actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() +``` + +2. The target actor is not expected to be scheduled +with the actors whose label key is "location" and value in ["dc-1"]. +```python +match_expressions = [ + ActorAffinityMatchExpression("location", ActorAffinityOperator.NOT_IN, ["dc_1"], False) +] +actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) +actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() +``` + +3. The target actor is expected to be scheduled with the actors whose label key exists "location". +```python +match_expressions = [ + ActorAffinityMatchExpression("location", ActorAffinityOperator.EXISTS, [], False) +] +actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) +actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() +``` + +4. The target actor is not expected to be scheduled with the actors whose label key exists "location". +```python +match_expressions = [ + ActorAffinityMatchExpression("location", ActorAffinityOperator.DOES_NOT_EXIST, [], False) +] +actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) +actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() +``` + +5. You can also set multiple expressions at the same time, and multiple expressions need to be satisfied when scheduling. +```python +match_expressions = [ + ActorAffinityMatchExpression("location", ActorAffinityOperator.DOES_NOT_EXIST, [], False), + ActorAffinityMatchExpression("version", ActorAffinityOperator.EXISTS, [], False) +] +actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) +actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() +``` + +### Java API + +Set the labels for this actor API +```java + /** + * Set a key-value label. + * + * @param key the key of label. + * @param value the value of label. + * @return self + */ + public T setLabel(String key, String value) { + builder.setLabel(key, value); + return self(); + } + + /** + * Set batch key-value labels. + * + * @param labels A map that collects multiple labels. + * @return self + */ + public T setLabels(Map labels) { + builder.setLabels(labels); + return self(); + } +``` + +Actor affinity scheduling strategy API +```java +public class ActorAffinitySchedulingStrategy implements SchedulingStrategy { + private ActorAffinitySchedulingStrategy(List expressions) { +} + +public class ActorAffinityMatchExpression { + private String key; + private ActorAffinityOperator operator; + private List values; + private boolean isSoft; + + /** + * Returns an affinity expression to indicate that the target actor is expected to be scheduled + * with the actors whose label meets one of the composed key and values. eg: + * ActorAffinityMatchExpression.in("location", new ArrayList<>() {{ add("dc-1");}}, false). + * + * @param key The key of label. + * @param values A list of label values. + * @param isSoft If true, the actor will be scheduled even there's no matched actor. + * @return ActorAffinityMatchExpression. + */ + public static ActorAffinityMatchExpression in(String key, List values, boolean isSoft) { + return new ActorAffinityMatchExpression(key, ActorAffinityOperator.IN, values, isSoft); + } + + /** + * Returns an affinity expression to indicate that the target actor is not expected to be + * scheduled with the actors whose label meets one of the composed key and values. eg: + * ActorAffinityMatchExpression.notIn( "location", new ArrayList<>() {{ add("dc-1");}}, false). + * + * @param key The key of label. + * @param values A list of label values. + * @param isSoft If true, the actor will be scheduled even there's no matched actor. + * @return ActorAffinityMatchExpression. + */ + public static ActorAffinityMatchExpression notIn( + String key, List values, boolean isSoft) { + return new ActorAffinityMatchExpression(key, ActorAffinityOperator.NOT_IN, values, isSoft); + } + + /** + * Returns an affinity expression to indicate that the target actor is expected to be scheduled + * with the actors whose labels exists the specified key. eg: + * ActorAffinityMatchExpression.exists("location", false). + * + * @param key The key of label. + * @param isSoft If true, the actor will be scheduled even there's no matched actor. + * @return ActorAffinityMatchExpression. + */ + public static ActorAffinityMatchExpression exists(String key, boolean isSoft) { + return new ActorAffinityMatchExpression( + key, ActorAffinityOperator.EXISTS, new ArrayList(), isSoft); + } + + /** + * Returns an affinity expression to indicate that the target actor is not expected to be + * scheduled with the actors whose labels exists the specified key. eg: + * ActorAffinityMatchExpression.doesNotExist("location", false). + * + * @param key The key of label. + * @param isSoft If true, the actor will be scheduled even there's no matched actor. + * @return ActorAffinityMatchExpression. + */ + public static ActorAffinityMatchExpression doesNotExist(String key, boolean isSoft) { + return new ActorAffinityMatchExpression( + key, ActorAffinityOperator.DOES_NOT_EXIST, new ArrayList(), isSoft); + } + +} + +``` + +JAVA API example: +Step 1: Set labels for actors +```java +// Set a key-value label. +// This interface can be called multiple times. The value of the same key will be overwritten by the latest one. +ActorHandle actor = + Ray.actor(Counter::new, 1) + .setLabel("location", "dc_1") + .remote(); + +// Set batch key-value labels. +Map labels = + new HashMap() { + { + put("location", "dc-1"); + put("version", "1.0.0"); + } + }; +ActorHandle actor = + Ray.actor(Counter::new, 1) + .setLabels(labels) + .remote(); +``` +Step 2: Set actor affinity/anti-affinity strategy +1. Scheduling into node of actor which the value of label key "location" is in {"dc-1", "dc-2"} +```java +List locationValues = new ArrayList<>(); +locationValues.add("dc_1"); +locationValues.add("dc_2"); +ActorAffinitySchedulingStrategy schedulingStrategy = + new ActorAffinitySchedulingStrategy.Builder() + .addExpression(ActorAffinityMatchExpression.in("location", locationValues, false)) + .build(); +ActorHandle actor2 = + Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategy).remote(); +``` + +2. Don't scheduling into node of actor which the value of label key "location" is in {"dc-1"} +```java +List values = new ArrayList<>(); +values.add("dc-1"); +ActorAffinitySchedulingStrategy schedulingStrategyNotIn = + new ActorAffinitySchedulingStrategy.Builder() + .addExpression(ActorAffinityMatchExpression.notIn("location", values, false)) + .build(); +ActorHandle actor3 = + Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyNotIn).remote(); +``` + +3. Scheduling into node of actor which exists the label key "version" +```java +ActorAffinitySchedulingStrategy schedulingStrategyExists = + new ActorAffinitySchedulingStrategy.Builder() + .addExpression(ActorAffinityMatchExpression.exists("version", false)) + .build(); +ActorHandle actor4 = + Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyExists).remote(); +Assert.assertEquals(actor4.task(Counter::getValue).remote().get(10000), Integer.valueOf(1)); +``` + +4. Don't scheduling into node of actor which exist the label key "version" +```java +ActorAffinitySchedulingStrategy schedulingStrategyDoesNotExist = + new ActorAffinitySchedulingStrategy.Builder() + .addExpression(ActorAffinityMatchExpression.doesNotExist("version", false)) + .build(); +ActorHandle actor5 = + Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyDoesNotExist).remote(); +``` + +5. You can also set multiple expressions at the same time, and multiple expressions need to be satisfied when scheduling. +```java +ActorAffinitySchedulingStrategy schedulingStrategy = + new ActorAffinitySchedulingStrategy.Builder() + .addExpression(ActorAffinityMatchExpression.doesNotExist("version", false)) + .addExpression(ActorAffinityMatchExpression.Exists("location", false)) + .build(); +ActorHandle actor6 = + Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategy).remote(); +``` + +### Implementation plan +Now there are two modes of scheduling: GCS mode scheduling and raylet scheduling. +It will be simpler to implement in GCS mode. +#### GCS Scheduling Mode Implementation plan + +1. Actor adds the Labels property. Stored in the GcsActor structure +2. Gcs Server add GcsLabelManager. Add labels->node information to GcsLabelManager after per actor completes scheduling. +3. Added ActorAffinityPolicy, whose scheduling logic is to select the main scheduling node based on each matching expression and the labels->node information of GcsLabelManager + +The pseudo-code logic of the 4 matching operations is as follows: +Affinity Operator: IN +``` +feasible_nodes = FindFeasibleNodes() +for (expression : match_expressions) { + feasible_nodes = feasible_nodes ∩ GcsLabelManager->GetNodesByKeyAndValue(expression.key, expression.values) +} +``` + +Affinity Operator: EXISTS +``` +feasible_nodes = FindFeasibleNodes() +for (expression : match_expressions) { + feasible_nodes = feasible_nodes ∩ GcsLabelManager->GetNodesByKey(expression.key) +} +``` + +Affinity Operator: NOT_IN +``` +feasible_nodes = FindFeasibleNodes() +for (expression : match_expressions) { + feasible_nodes = feasible_nodes - GcsLabelManager->GetNodesByKeyAndValue(expression.key, expression. values) +} +``` + +Affinity Operator: DOES_NOT_EXIST +``` +feasible_nodes = FindFeasibleNodes() +for (expression : match_expressions) { + feasible_nodes = feasible_nodes - GcsLabelManager->GetNodesByKey(expression.key) +} +``` + +GcsLabelManger Desion + +Add 4 public interfaces for GcsLabelManger : +```c++ +// Add Actor's labels and node index relationship information. +void AddActorLabels(const std::shared_ptr &actor); + +// Remove Actor's labels and node index relationship information. +void RemoveActorLabels(const std::shared_ptr &actor); + +// Get the node where the actors those keys and values matched. +absl::flat_hash_set GetNodesByKeyAndValue(const std::string &ray_namespace, const std::string &key, const absl::flat_hash_set &values) const; + +// Get the node where the actors which exist this key. +absl::flat_hash_set GetNodesByKey(const std::string &ray_namespace, + const std::string &key) const; +``` + +Main data structure : +``` +Map>> label_to_nodes_ +Map> node_to_actors_ +``` +#### Raylet Scheduling Mode Implementation plan +The implementation of Raylet scheduling mode is same as GCS scheduling above. +Mainly, one more Labels information needs to be synchronized to all Raylet nodes + +### Failures and Special Scenarios +#### 1、If the Match Expression Cannot be satisfied +If the matching expression cannot be satisfied, The actor will be add to the pending actor queue. Util the matching expression all be statisfied。 + +Example: +If the next actor want to co-locate with the previous actor but the previous actor don't complete schedule. The next actor will be add to pending actor queue. It will be schedule complete After the previous actor complete schedule. + +#### 2、Failures Scenarios +If actor B collocates with actor A and The actor A & B both complete schedule to Node 1. +1、If actor A failed and restarted to Node 2, the actor B will not be affected, it still working in Node 1. +2、If actor A failed and restarted to Node 2, Then the actor B also failed, It will be scheduled to Node 2 and collocates with actor A. + +### how other system achieve the same goal? +1、K8s +This solution is to learn the PodAffinity/NodeAffinity features of K8s。 +https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity + +### what's the alternative to achieve the same goal? +1、Ray +Now ray placement group can achieve the same goal. But PG is too heavy and complicated to be user friendly +## Compatibility, Deprecation, and Migration Plan + +## Test Plan and Acceptance Criteria +All APIs will be fully unit tested. All specifications in this documentation will be thoroughly tested at the unit-test level. The end-to-end flow will be tested within CI tests. Before the beta release, we will add large-scale testing to precisely understand scalability limitations and performance degradation in large clusters. + +## (Optional) Follow-on Work + From 09312a67238cf57c8283a99f894466b0c1137e33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= Date: Wed, 16 Nov 2022 17:36:45 +0800 Subject: [PATCH 2/6] --amend --- reps/2022-08-31-actor-affinity-apis.md | 118 ++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 3 deletions(-) diff --git a/reps/2022-08-31-actor-affinity-apis.md b/reps/2022-08-31-actor-affinity-apis.md index 57fa574..396f981 100644 --- a/reps/2022-08-31-actor-affinity-apis.md +++ b/reps/2022-08-31-actor-affinity-apis.md @@ -21,7 +21,7 @@ Yes, this will be a complement to ray core's ability to flexibly schedule actors ## Stewardship ### Required Reviewers -@wumuzi520 SenlinZhu @WangTaoTheTonic @scv119 (Chen Shen) @jjyao (Jiajun Yao) +@wumuzi520 SenlinZhu @Chong Li @scv119 (Chen Shen) @jjyao (Jiajun Yao) ### Shepherd of the Proposal (should be a senior committer) @@ -335,7 +335,7 @@ ActorHandle actor6 = ### Implementation plan Now there are two modes of scheduling: GCS mode scheduling and raylet scheduling. It will be simpler to implement in GCS mode. -#### GCS Scheduling Mode Implementation plan +#### 1. GCS Scheduling Mode Implementation plan 1. Actor adds the Labels property. Stored in the GcsActor structure 2. Gcs Server add GcsLabelManager. Add labels->node information to GcsLabelManager after per actor completes scheduling. @@ -397,10 +397,114 @@ Main data structure : Map>> label_to_nodes_ Map> node_to_actors_ ``` -#### Raylet Scheduling Mode Implementation plan +#### 2. Raylet Scheduling Mode Implementation plan The implementation of Raylet scheduling mode is same as GCS scheduling above. Mainly, one more Labels information needs to be synchronized to all Raylet nodes +1. Add the actor_labels data structure to the resource synchronization data structure(ResourcesData and NodeResources). +``` +message ResourcesData { + // Node id. + bytes node_id = 1; + // Resource capacity currently available on this node manager. + map resources_available = 2; + // Indicates whether available resources is changed. Only used when light + // heartbeat enabled. + bool resources_available_changed = 3; + + // Map> Actors scheduled to this node and actor labels information + repeat Map> actor_labels = 15 + // Whether the actors of this node is changed. + bool actor_labels_changed = 16, +} + + +NodeResources { + ResourceRequest total; + ResourceRequest available; + /// Only used by light resource report. + ResourceRequest load; + /// Resources owned by normal tasks. + ResourceRequest normal_task_resources + /// Actors scheduled to this node and actor labels information + absl::flat_hash_map> actor_labels; +} +``` + +2. Adapts where ResourcesData is constructed and used in the resource synchronization mechanism. +a. NodeManager::HandleRequestResourceReport +b. NodeManager::HandleUpdateResourceUsage + + +3. Add ActorLabels information to NodeResources during Actor scheduling + +a. When the Raylet is successfully scheduled, the ActorLabels information is added to the remote node scheduled in the ClusterResoucesManager. +``` +void ClusterTaskManager::ScheduleAndDispatchTasks() { + auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( + ScheduleOnNode(node_id, work); + cluster_resource_scheduler_->AllocateRemoteTaskResources(node_id, resources) + cluster_resource_scheduler_->GetClusterResourceManager().AddActorLabels(node_id, actor); +``` +b. Add ActorLabels information to LocalResourcesManager when Actor is dispatched to Worker. +``` +LocalTaskManager::DispatchScheduledTasksToWorkers() + cluster_resource_scheduler_->GetLocalResourceManager().AllocateLocalTaskResources + cluster_resource_scheduler_->GetLocalResourceManager().AddActorLabels(actor) + worker_pool_.PopWorker() +``` + +c. When the Actor is destroyed, the ActorLabels information of the LocalResourcesManager is also deleted. +``` +NodeManager::HandleReturnWorker + local_task_manager_->ReleaseWorkerResources(worker); + local_resource_manager_->RemoveActorLabels(actor_id); +``` + +Actor scheduling flowchart: +![Actor scheduling flowchart](https://user-images.githubusercontent.com/11072802/202128385-f72609c5-308d-4210-84ff-bf3ba6df381c.png) + +Node Resources synchronization mechanism: +![Node Resources synchronization mechanism](https://user-images.githubusercontent.com/11072802/202128406-b4745e6e-3565-41a2-bfe3-78843379bf09.png) + +4. Scheduling optimization through ActorLabels +Now any node raylet has ActorLabels information for all nodes. +However, when ActorAffinity schedules, if it traverses the Labels of all Actors of each node, the algorithm complexity is very large, and the performance will be poor. + Therefore, it is necessary to generate a full-cluster ActorLabels index table to improve scheduling performance. + +``` +class GcsLabelManager { + public: + absl::flat_hash_set GetNodesByKeyAndValue(const std::string &ray_namespace, + const std::string &key, const absl::flat_hash_set &values) const; + + absl::flat_hash_set GetNodesByKey(const std::string &ray_namespace, + const std::string &key) const; + + void AddActorLabels(const std::shared_ptr &actor); + + void RemoveActorLabels(const std::shared_ptr &actor); + + private: + >> labels_to_nodes_; + > nodes_to_actors_; +} +``` + + VS. putting Labels into the custom resource solution +Advantages: +1. Compared with the scheme of putting Labels in the custom resource. This scheme can also reuse the resource synchronization mechanism. Then it won't destroy the concept of coustrom resouce. +2. The Label index table of all nodes can be constructed from the ActorLabels information of each node. If you use Custom Resource, you can't build. +3. If the Custom Resouces scheme is used, the accuracy of custom resouces scheduling will be affected. The current scheme is completely independent of existing scheduling policies and resources and will not affect them. The code is also more concise. + + +DisAdvantages: +1. The interface for resource reporting and updating needs to be adapted to ActorLabels in ResouceData. + +Issue +1. Because there must be a delay in resource synchronization under raylet scheduling. So if actor affinity is Soft semantics, there will be inaccurate scheduling. + + ### Failures and Special Scenarios #### 1、If the Match Expression Cannot be satisfied If the matching expression cannot be satisfied, The actor will be add to the pending actor queue. Util the matching expression all be statisfied。 @@ -428,3 +532,11 @@ All APIs will be fully unit tested. All specifications in this documentation wil ## (Optional) Follow-on Work +### Expression of "OR" semantics. +Later, if necessary, you can extend the semantics of "OR" by adding "is_or_semantics" to ActorAffinitySchedulingStrategy. +``` +class ActorAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[ActorAffinityMatchExpression], is_or_semantics = false): + self.match_expressions = match_expressions + self.is_or_semantics = +``` \ No newline at end of file From acc5520e9fd2f9c064b440d96912ffde5b5f3193 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= Date: Thu, 24 Nov 2022 20:39:53 +0800 Subject: [PATCH 3/6] add labels affinity feature --- reps/2022-08-31-actor-affinity-apis.md | 109 ++++--- ...mechanish-and-affinity-schedule-feature.md | 279 ++++++++++++++++++ 2 files changed, 330 insertions(+), 58 deletions(-) create mode 100644 reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md diff --git a/reps/2022-08-31-actor-affinity-apis.md b/reps/2022-08-31-actor-affinity-apis.md index 396f981..0ac057a 100644 --- a/reps/2022-08-31-actor-affinity-apis.md +++ b/reps/2022-08-31-actor-affinity-apis.md @@ -36,8 +36,8 @@ Yes, this will be a complement to ray core's ability to flexibly schedule actors Actor affinity/anti-affinity schedule API Design 1. Scheduling Strategy adds an ActorAffinitySchedulingStrategy. -2. This strategy consists of several ActorAffinityMatchExpressions. -3. ActorAffinityMatchExpression has 4 match types which are IN/NOT_IN/EXISTS/DOES_NOT_EXIST +2. This strategy consists of several LabelMatchExpressions. +3. LabelMatchExpression has 4 match types which are IN/NOT_IN/EXISTS/DOES_NOT_EXIST Use Case | ActorAffinityOperator -- | -- @@ -72,10 +72,10 @@ SchedulingStrategyT = Union[None, str, ActorAffinitySchedulingStrategy] class ActorAffinitySchedulingStrategy: - def __init__(self, match_expressions: List[ActorAffinityMatchExpression]): + def __init__(self, match_expressions: List[LabelMatchExpression]): self.match_expressions = match_expressions -class ActorAffinityMatchExpression: +class LabelMatchExpression: """An expression used to represent an actor's affinity. Attributes: key: the key of label @@ -84,7 +84,7 @@ class ActorAffinityMatchExpression: values: a list of label value soft: ... """ - def __init__(self, key: str, operator: ActorAffinityOperator, + def __init__(self, key: str, operator: LabelMatchOperator, values: List[str], soft: bool): self.key = key self.operator = operator @@ -106,7 +106,7 @@ Step 2: Set actor affinity strategy. 1. The target actor is expected to be scheduled with the actors whose label key is "location" and value in ["dc-1"]. ```python match_expressions = [ - ActorAffinityMatchExpression("location", ActorAffinityOperator.IN, ["dc_1"], False) + LabelMatchExpression("location", LabelMatchOperator.IN, ["dc_1"], False) ] actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() @@ -116,7 +116,7 @@ actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() with the actors whose label key is "location" and value in ["dc-1"]. ```python match_expressions = [ - ActorAffinityMatchExpression("location", ActorAffinityOperator.NOT_IN, ["dc_1"], False) + LabelMatchExpression("location", LabelMatchOperator.NOT_IN, ["dc_1"], False) ] actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() @@ -125,7 +125,7 @@ actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() 3. The target actor is expected to be scheduled with the actors whose label key exists "location". ```python match_expressions = [ - ActorAffinityMatchExpression("location", ActorAffinityOperator.EXISTS, [], False) + LabelMatchExpression("location", LabelMatchOperator.EXISTS, [], False) ] actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() @@ -134,7 +134,7 @@ actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() 4. The target actor is not expected to be scheduled with the actors whose label key exists "location". ```python match_expressions = [ - ActorAffinityMatchExpression("location", ActorAffinityOperator.DOES_NOT_EXIST, [], False) + LabelMatchExpression("location", ActorAffinityOperator.DOES_NOT_EXIST, [], False) ] actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() @@ -143,8 +143,8 @@ actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() 5. You can also set multiple expressions at the same time, and multiple expressions need to be satisfied when scheduling. ```python match_expressions = [ - ActorAffinityMatchExpression("location", ActorAffinityOperator.DOES_NOT_EXIST, [], False), - ActorAffinityMatchExpression("version", ActorAffinityOperator.EXISTS, [], False) + LabelMatchExpression("location", LabelMatchOperator.DOES_NOT_EXIST, [], False), + LabelMatchExpression("version", LabelMatchOperator.EXISTS, [], False) ] actor_affinity_strategy = ActorAffinitySchedulingStrategy(match_expressions) actor = Actor.options(scheduling_strategy = actor_affinity_strategy).remote() @@ -181,70 +181,70 @@ Set the labels for this actor API Actor affinity scheduling strategy API ```java public class ActorAffinitySchedulingStrategy implements SchedulingStrategy { - private ActorAffinitySchedulingStrategy(List expressions) { + private ActorAffinitySchedulingStrategy(List expressions) { } -public class ActorAffinityMatchExpression { +public class LabelMatchExpression { private String key; - private ActorAffinityOperator operator; + private LabelMatchOperator operator; private List values; private boolean isSoft; /** * Returns an affinity expression to indicate that the target actor is expected to be scheduled * with the actors whose label meets one of the composed key and values. eg: - * ActorAffinityMatchExpression.in("location", new ArrayList<>() {{ add("dc-1");}}, false). + * LabelMatchExpression.in("location", new ArrayList<>() {{ add("dc-1");}}, false). * * @param key The key of label. * @param values A list of label values. * @param isSoft If true, the actor will be scheduled even there's no matched actor. - * @return ActorAffinityMatchExpression. + * @return LabelMatchExpression. */ - public static ActorAffinityMatchExpression in(String key, List values, boolean isSoft) { - return new ActorAffinityMatchExpression(key, ActorAffinityOperator.IN, values, isSoft); + public static LabelMatchExpression in(String key, List values, boolean isSoft) { + return new LabelMatchExpression(key, LabelMatchOperator.IN, values, isSoft); } /** * Returns an affinity expression to indicate that the target actor is not expected to be * scheduled with the actors whose label meets one of the composed key and values. eg: - * ActorAffinityMatchExpression.notIn( "location", new ArrayList<>() {{ add("dc-1");}}, false). + * LabelMatchExpression.notIn( "location", new ArrayList<>() {{ add("dc-1");}}, false). * * @param key The key of label. * @param values A list of label values. * @param isSoft If true, the actor will be scheduled even there's no matched actor. - * @return ActorAffinityMatchExpression. + * @return LabelMatchExpression. */ - public static ActorAffinityMatchExpression notIn( + public static LabelMatchExpression notIn( String key, List values, boolean isSoft) { - return new ActorAffinityMatchExpression(key, ActorAffinityOperator.NOT_IN, values, isSoft); + return new LabelMatchExpression(key, LabelMatchOperator.NOT_IN, values, isSoft); } /** * Returns an affinity expression to indicate that the target actor is expected to be scheduled * with the actors whose labels exists the specified key. eg: - * ActorAffinityMatchExpression.exists("location", false). + * LabelMatchExpression.exists("location", false). * * @param key The key of label. * @param isSoft If true, the actor will be scheduled even there's no matched actor. - * @return ActorAffinityMatchExpression. + * @return LabelMatchExpression. */ - public static ActorAffinityMatchExpression exists(String key, boolean isSoft) { - return new ActorAffinityMatchExpression( - key, ActorAffinityOperator.EXISTS, new ArrayList(), isSoft); + public static LabelMatchExpression exists(String key, boolean isSoft) { + return new LabelMatchExpression( + key, LabelMatchOperator.EXISTS, new ArrayList(), isSoft); } /** * Returns an affinity expression to indicate that the target actor is not expected to be * scheduled with the actors whose labels exists the specified key. eg: - * ActorAffinityMatchExpression.doesNotExist("location", false). + * LabelMatchExpression.doesNotExist("location", false). * * @param key The key of label. * @param isSoft If true, the actor will be scheduled even there's no matched actor. - * @return ActorAffinityMatchExpression. + * @return LabelMatchExpression. */ - public static ActorAffinityMatchExpression doesNotExist(String key, boolean isSoft) { - return new ActorAffinityMatchExpression( - key, ActorAffinityOperator.DOES_NOT_EXIST, new ArrayList(), isSoft); + public static LabelMatchExpression doesNotExist(String key, boolean isSoft) { + return new LabelMatchExpression( + key, LabelMatchOperator.DOES_NOT_EXIST, new ArrayList(), isSoft); } } @@ -282,7 +282,7 @@ locationValues.add("dc_1"); locationValues.add("dc_2"); ActorAffinitySchedulingStrategy schedulingStrategy = new ActorAffinitySchedulingStrategy.Builder() - .addExpression(ActorAffinityMatchExpression.in("location", locationValues, false)) + .addExpression(LabelMatchExpression.in("location", locationValues, false)) .build(); ActorHandle actor2 = Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategy).remote(); @@ -294,7 +294,7 @@ List values = new ArrayList<>(); values.add("dc-1"); ActorAffinitySchedulingStrategy schedulingStrategyNotIn = new ActorAffinitySchedulingStrategy.Builder() - .addExpression(ActorAffinityMatchExpression.notIn("location", values, false)) + .addExpression(LabelMatchExpression.notIn("location", values, false)) .build(); ActorHandle actor3 = Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyNotIn).remote(); @@ -304,7 +304,7 @@ ActorHandle actor3 = ```java ActorAffinitySchedulingStrategy schedulingStrategyExists = new ActorAffinitySchedulingStrategy.Builder() - .addExpression(ActorAffinityMatchExpression.exists("version", false)) + .addExpression(LabelMatchExpression.exists("version", false)) .build(); ActorHandle actor4 = Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyExists).remote(); @@ -315,7 +315,7 @@ Assert.assertEquals(actor4.task(Counter::getValue).remote().get(10000), Integer. ```java ActorAffinitySchedulingStrategy schedulingStrategyDoesNotExist = new ActorAffinitySchedulingStrategy.Builder() - .addExpression(ActorAffinityMatchExpression.doesNotExist("version", false)) + .addExpression(LabelMatchExpression.doesNotExist("version", false)) .build(); ActorHandle actor5 = Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategyDoesNotExist).remote(); @@ -325,8 +325,8 @@ ActorHandle actor5 = ```java ActorAffinitySchedulingStrategy schedulingStrategy = new ActorAffinitySchedulingStrategy.Builder() - .addExpression(ActorAffinityMatchExpression.doesNotExist("version", false)) - .addExpression(ActorAffinityMatchExpression.Exists("location", false)) + .addExpression(LabelMatchExpression.doesNotExist("version", false)) + .addExpression(LabelMatchExpression.Exists("location", false)) .build(); ActorHandle actor6 = Ray.actor(Counter::new, 1).setSchedulingStrategy(schedulingStrategy).remote(); @@ -412,10 +412,10 @@ message ResourcesData { // heartbeat enabled. bool resources_available_changed = 3; - // Map> Actors scheduled to this node and actor labels information - repeat Map> actor_labels = 15 + // Map>> Actors/Tasks/Nodes labels information + repeat Map>> labels = 15 // Whether the actors of this node is changed. - bool actor_labels_changed = 16, + bool labels_changed = 16, } @@ -426,8 +426,8 @@ NodeResources { ResourceRequest load; /// Resources owned by normal tasks. ResourceRequest normal_task_resources - /// Actors scheduled to this node and actor labels information - absl::flat_hash_map> actor_labels; + /// Map>> Actors/Tasks/Nodes labels information + absl::flat_hash_map>> labels; } ``` @@ -465,12 +465,12 @@ Actor scheduling flowchart: ![Actor scheduling flowchart](https://user-images.githubusercontent.com/11072802/202128385-f72609c5-308d-4210-84ff-bf3ba6df381c.png) Node Resources synchronization mechanism: -![Node Resources synchronization mechanism](https://user-images.githubusercontent.com/11072802/202128406-b4745e6e-3565-41a2-bfe3-78843379bf09.png) +![Node Resources synchronization mechanism](https://user-images.githubusercontent.com/11072802/203783157-fad67f25-b046-49ac-b201-b54942073823.png) 4. Scheduling optimization through ActorLabels Now any node raylet has ActorLabels information for all nodes. However, when ActorAffinity schedules, if it traverses the Labels of all Actors of each node, the algorithm complexity is very large, and the performance will be poor. - Therefore, it is necessary to generate a full-cluster ActorLabels index table to improve scheduling performance. + Therefore, it is necessary to generate a full-cluster ActorLabels index table to improve scheduling performance. ``` class GcsLabelManager { @@ -491,18 +491,12 @@ class GcsLabelManager { } ``` - VS. putting Labels into the custom resource solution -Advantages: -1. Compared with the scheme of putting Labels in the custom resource. This scheme can also reuse the resource synchronization mechanism. Then it won't destroy the concept of coustrom resouce. -2. The Label index table of all nodes can be constructed from the ActorLabels information of each node. If you use Custom Resource, you can't build. -3. If the Custom Resouces scheme is used, the accuracy of custom resouces scheduling will be affected. The current scheme is completely independent of existing scheduling policies and resources and will not affect them. The code is also more concise. - +**Issue** +1. Because there must be a delay in resource synchronization under raylet scheduling. So if actor affinity is Soft semantics, there will be inaccurate scheduling. -DisAdvantages: -1. The interface for resource reporting and updating needs to be adapted to ActorLabels in ResouceData. + a. If the user selects the Soft strategy. That means that the user can accept the fact that the scheduling has a certain probability of being inaccurate. -Issue -1. Because there must be a delay in resource synchronization under raylet scheduling. So if actor affinity is Soft semantics, there will be inaccurate scheduling. + b. Most users schedule a batch of actors on the same node. In this case we can do exactly the right thing. ### Failures and Special Scenarios @@ -523,8 +517,7 @@ This solution is to learn the PodAffinity/NodeAffinity features of K8s。 https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity ### what's the alternative to achieve the same goal? -1、Ray -Now ray placement group can achieve the same goal. But PG is too heavy and complicated to be user friendly + ## Compatibility, Deprecation, and Migration Plan ## Test Plan and Acceptance Criteria @@ -536,7 +529,7 @@ All APIs will be fully unit tested. All specifications in this documentation wil Later, if necessary, you can extend the semantics of "OR" by adding "is_or_semantics" to ActorAffinitySchedulingStrategy. ``` class ActorAffinitySchedulingStrategy: - def __init__(self, match_expressions: List[ActorAffinityMatchExpression], is_or_semantics = false): + def __init__(self, match_expressions: List[LabelMatchExpression], is_or_semantics = false): self.match_expressions = match_expressions self.is_or_semantics = ``` \ No newline at end of file diff --git a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md new file mode 100644 index 0000000..3b0e185 --- /dev/null +++ b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md @@ -0,0 +1,279 @@ +## Summary +### General Motivation + +Introduce the Labels mechanism. Give Labels to Actors/Tasks/Nodes/Objects. +Affinity features such as ActorAffinity/TaskAffinity/NodeAffinity can be realized through Labels. + + +### Should this change be within `ray` or outside? + +Yes, this will be a complement to ray core's ability to flexibly schedule actors/tasks/node. + +## Stewardship +### Required Reviewers + +@wumuzi520 SenlinZhu @Chong Li @scv119 (Chen Shen) @jjyao (Jiajun Yao) @Yi Cheng +### Shepherd of the Proposal (should be a senior committer) + + +## Design and Architecture + +### Brief idea +1. Introduce the concept of Label. Add the Labels attribute to Actor/Task/Node. Labels = Map. +2. After Actor/Task are scheduled to a certain node, the Labels of Actor/Task will be attached to the node resource(Named: LabelsResource). Node's Labels are naturally in the node resource. +3. Actor/Task scheduling can choose Actor/Task/NodeAffinitySchedulingStratgy. +4. The actual principle of Actor/Task/NodeAffinitySchedulingStratgy is. According to the label matching expression in the strategy, traverse and search for LabelsResource of all nodes. Pick out nodes that meet the expression requirements. + +![LabelsAffinity](https://user-images.githubusercontent.com/11072802/203686866-385235b5-e08b-4aac-9c31-512621129bd4.png) + +### API Design +The apis of actors/tasks/nodes add labels. +```python +# Actor add labels. +actor_1 = Actor.options(labels={ + "location": "dc_1" +}).remote() + +# Task add labels. +task_1 = Task.options(labels={ + "location": "dc_1" +}).remote() + +# Node add static labels. +ray start ... --labels={"location": "dc_1"} +# The api of the dynamic update node labels is similar to the current dynamic set_resource. It can be determined later. +``` + +The apis of the actor-affinity/task-affinity/node-affinity scheduling. +```python +SchedulingStrategyT = Union[None, str, + PlacementGroupSchedulingStrategy, + ActorAffinitySchedulingStrategy, + TaskAffinitySchedulingStrategy, + NodeAffinitySchedulingStrategy] + +class ActorAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[LabelMatchExpression]): + self.match_expressions = match_expressions + +class TaskAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[LabelMatchExpression]): + self.match_expressions = match_expressions + +class NodeAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[LabelMatchExpression]): + self.match_expressions = match_expressions + +class LabelMatchExpression: + """An expression used to select instance by instance's labels + Attributes: + key: the key of label + operator: IN、NOT_IN、EXISTS、DOES_NOT_EXIST, + if EXISTS、DOES_NOT_EXIST, values set [] + values: a list of label value + soft: ... + """ + def __init__(self, key: str, operator: LabelMatchOperator, + values: List[str], soft: bool): + self.key = key + self.operator = operator + self.values = values + self.soft = soft + +# ActorAffinity use case +actor_1 = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "location", LabelMatchOperator.IN, ["dc_1"], False) + ])).remote() +``` + + +### Implementation plan + +![LabelsAffinity](https://user-images.githubusercontent.com/11072802/203686851-894536d0-86fa-4fab-a5fe-2e656ac198e5.png) + +1. Add the actor_labels data structure to the resource synchronization data structure(ResourcesData and NodeResources). +``` +message ResourcesData { + // Node id. + bytes node_id = 1; + // Resource capacity currently available on this node manager. + map resources_available = 2; + // Indicates whether available resources is changed. Only used when light + // heartbeat enabled. + bool resources_available_changed = 3; + + // Map>> Actors/Tasks/Nodes labels information + repeat Map>> labels = 15 + // Whether the actors of this node is changed. + bool labels_changed = 16, +} + + +NodeResources { + ResourceRequest total; + ResourceRequest available; + /// Only used by light resource report. + ResourceRequest load; + /// Resources owned by normal tasks. + ResourceRequest normal_task_resources + /// Map>> Actors/Tasks/Nodes labels information + absl::flat_hash_map>> labels; +} +``` + +2. Adapts where ResourcesData is constructed and used in the resource synchronization mechanism. +a. NodeManager::HandleRequestResourceReport +b. NodeManager::HandleUpdateResourceUsage + + +3. Add Labels of actors/tasks to NodeResources during Actors/tasks scheduling + + +4. Scheduling optimization through Labels +Take ActorAffinity's scheduling optimization scheme as an example: +Now any node raylet has ActorLabels information for all nodes. +when ActorAffinity schedules, if it traverses the Labels of all Actors of each node, the algorithm complexity is very large, and the performance will be poor. +** Therefore, it is necessary to generate a full-cluster ActorLabels index table to improve scheduling performance. + +``` +class GcsLabelManager { + public: + absl::flat_hash_set GetNodesByKeyAndValue(const std::string &ray_namespace, + const std::string &key, const absl::flat_hash_set &values) const; + + absl::flat_hash_set GetNodesByKey(const std::string &ray_namespace, + const std::string &key) const; + + void AddActorLabels(const std::shared_ptr &actor); + + void RemoveActorLabels(const std::shared_ptr &actor); + + private: + >> labels_to_nodes_; + > nodes_to_actors_; +} +``` + +Actor scheduling flowchart: +![Actor scheduling flowchart](https://user-images.githubusercontent.com/11072802/202128385-f72609c5-308d-4210-84ff-bf3ba6df381c.png) + +Node Resources synchronization mechanism: +![Node Resources synchronization mechanism](https://user-images.githubusercontent.com/11072802/203783157-fad67f25-b046-49ac-b201-b54942073823.png) + +### how other system achieve the same goal? +1、K8s +This solution is to learn the PodAffinity/NodeAffinity features of K8s。 +https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity + +Scheduling Policy | Label Owner | Operator +-- | -- | -- +nodeAffinity | NODE | In, NotIn, Exists, DoesNotExist, Gt, Lt +podAffinity | POD | In, NotIn, Exists, DoesNotExist +podAntiAffinity | POD | In, NotIn, Exists, DoesNotExist + + +### what's the alternative to achieve the same goal? +**Option 2: Use LabelAffinitySchedulingStrategy instead of Actor/Task/NodeAffinitySchedulingStrategy** +Some people think that ActorAffinity/TaskAffinity is dispatched to the Node corresponding to the actor/Task with these labels. +Why not assign both ActorLabels and TaskLabels to Node? +Then the scheduling API only needs to use the LabelAffinitySchedulingStrategy set of APIs to instead of Actor/Task/NodeAffinitySchedulingStrategy. + +API Design: +``` +SchedulingStrategyT = Union[None, str, + PlacementGroupSchedulingStrategy, + LabelAffinitySchedulingStrategy] + +class LabelAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[LabelMatchExpression]): + self.match_expressions = match_expressions + +class LabelMatchExpression: + """An expression used to select instance by instance's labels + Attributes: + key: the key of label + operator: IN、NOT_IN、EXISTS、DOES_NOT_EXIST, + if EXISTS、DOES_NOT_EXIST, values set [] + values: a list of label value + soft: ... + """ + def __init__(self, key: str, operator: LabelMatchOperator, + values: List[str], soft: bool): + self.key = key + self.operator = operator + self.values = values + self.soft = soft + + +actor_1 = Actor.options(scheduling_strategy=LabelAffinitySchedulingStrategy([ + LabelMatchExpression( + "location", LabelMatchOperator.IN, ["dc_1"], False) + ])).remote() +``` + +**Why didn't you choose this option?** +1. Actor and Task are isolated by namespace. But Node has no namespace isolation. As a result, Actor/Task labels cannot be directly integrated with Node labels. +2. Separately divided into Actor/Task/NodeAffinity. In this way, the interface semantics will be more simple and concise. Avoid conceptual confusion. +3. LabelAffinity combines the labels of Actor/Task/Node at the same time. The final scheduling result may not be accurate. Users only want to use ActorAffinity. But other Tasks have the same labels with actor labels. This scene will make scheduling result not be accurate. + +Advantages: +1. It is possible to realize the scene of affinity with Actor/Task/Node at the same time. +Example: +The user wants to affinity schedule to some Actors and nodes in a special computer room. +However, according to the results of internal user research, most of the requirements are just to realize Actor/Task "collocate" scheduling or spread scheduling. +So using a single ActorAffinity/TaskAffinity/NodeAffinity can already achieve practical effects. + +And the same effect can be achieved by combining the option 1 with custom resources + + +** Option 3: putting Labels into the custom resource solution ** +``` +class ResourceRequest { + absl::flat_hash_map resources_; +} + +Add Actor/Task/Node labels to resources_ as custom resources. +eg: +{ + "CPU": 16, + "memory": xx, + "custom_resources": xx, + "actor_labels_key@value": 1, + "task_labels_key@value": 1, + "node_labels_key@value": 1, +} +``` +If you put labels into custom_resources, you need to do the following adaptation: +1. Compared with custom_resources, labels need to add a specific prefix to distinguish them from custom_resources. +2. The key and value of Labels need to be concatenated with special characters (@). +3. When using Labels to build a Labels index table, you need to parse the resources key. + +**DisAdvantages:** +1. Labels unlike cpu resource these are numeric types. Compared with the above scheme. This will destroy the concept of coustrom resouce. +2. Actor and Task are isolated by namespace. It is difficult to isolate through namespace if adding custom_resource. +2. The Label index table of all nodes can be constructed from the ActorLabels information of each node. If you use Custom Resource, this requires parsing the resource_key and doing a lot of string splitting which will cost performance. +3. If custom_resource happens to be the same as the spliced string of labels. Then it will affect the correctness of scheduling. + + +**Advantages:** +1. The interface for resource reporting and updating don't modify. + +## Compatibility, Deprecation, and Migration Plan + +## Test Plan and Acceptance Criteria +All APIs will be fully unit tested. All specifications in this documentation will be thoroughly tested at the unit-test level. The end-to-end flow will be tested within CI tests. Before the beta release, we will add large-scale testing to precisely understand scalability limitations and performance degradation in large clusters. + +## (Optional) Follow-on Work + +### 1. Expression of "OR" semantics. +Later, if necessary, you can extend the semantics of "OR" by adding "is_or_semantics" to ActorAffinitySchedulingStrategy. +``` +class ActorAffinitySchedulingStrategy: + def __init__(self, match_expressions: List[LabelMatchExpression], is_or_semantics = false): + self.match_expressions = match_expressions + self.is_or_semantics = +``` + +### 2. ObjectAffinitySchedulingStrategy +If the user has a request, you can consider adding the attributes of labels to objects. Then the strategy of ObjectAffinity can be launched。 \ No newline at end of file From 090f8ac9e91294a8133392384e5edb8259fd6016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= <554538252@qq.com> Date: Mon, 12 Dec 2022 20:11:07 +0800 Subject: [PATCH 4/6] add api user demo --- ...mechanish-and-affinity-schedule-feature.md | 121 ++++++++++++++++-- 1 file changed, 112 insertions(+), 9 deletions(-) diff --git a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md index 3b0e185..1927777 100644 --- a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md +++ b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md @@ -2,7 +2,7 @@ ### General Motivation Introduce the Labels mechanism. Give Labels to Actors/Tasks/Nodes/Objects. -Affinity features such as ActorAffinity/TaskAffinity/NodeAffinity can be realized through Labels. +Affinity features such as ActorAffinity/NodeAffinity can be realized through Labels. ### Should this change be within `ray` or outside? @@ -49,17 +49,12 @@ The apis of the actor-affinity/task-affinity/node-affinity scheduling. SchedulingStrategyT = Union[None, str, PlacementGroupSchedulingStrategy, ActorAffinitySchedulingStrategy, - TaskAffinitySchedulingStrategy, NodeAffinitySchedulingStrategy] class ActorAffinitySchedulingStrategy: def __init__(self, match_expressions: List[LabelMatchExpression]): self.match_expressions = match_expressions -class TaskAffinitySchedulingStrategy: - def __init__(self, match_expressions: List[LabelMatchExpression]): - self.match_expressions = match_expressions - class NodeAffinitySchedulingStrategy: def __init__(self, match_expressions: List[LabelMatchExpression]): self.match_expressions = match_expressions @@ -87,7 +82,112 @@ actor_1 = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ ])).remote() ``` +### Example + +* Affinity + * Co-locate the actors in the same batch of nodes, like nodes in the same zones +* Anti-affinity + * Spread the actors of a service across nodes and/or availability zones, e.g. to reduce correlated failures. + +**1. Spread Demo** + +![spread demo](https://user-images.githubusercontent.com/11072802/207037933-8a9d9f1d-ee6e-472b-a877-669cef996db9.png) + +``` +@ray.remote +Class Cat: + pass + +cats = [] +for i in range(4): + cat = Actor.options( + labels = {"type": "cat"}, + scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "type", LabelMatchOperator.NOT_IN, ["cat"], False) + ])).remote() + cats.apend(cat) +``` + +**2. Co-locate Demo** + +![co-locate demo](https://user-images.githubusercontent.com/11072802/207037951-df5bbf4a-442e-49e0-9561-39cfde45bf49.png) +``` +@ray.remote +Class Dog: + pass + +dogs = [] +# First schedule a dog to a random node. +dog_1 = Dog.options(labels={"type":"dog"}).remote() +dogs.apend(dog_1) + +# Then schedule the remaining dogs to the same node as the first dog. +for i in range(3): + dog = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "type", LabelMatchOperator.IN, ["dog"], False) + ])).remote() + dogs.apend(dog) +``` + +**2. Collocate and spread combination demo** + +![Collocate and spread combination demo](https://user-images.githubusercontent.com/11072802/207037895-125aab9d-d784-4a18-b777-1650c1d59226.png) + +``` +@ray.remote +Class Cat: + pass + +@ray.remote +Class Dog: + pass + +# First schedule cat to each node. +cats = [] +for i in range(4): + cat = Actor.options( + labels = { + "type": "cat", + "id": "cat-" + str(i) + }, + scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "type", LabelMatchOperator.NOT_IN, ["cat"], False) + ])).remote() + cats.apend(cat) + +# Then each node schedules 3 dogs. +dogs = [] +for i in range(4): + node_dogs = [] + for i in range(3): + dog = Actor.options( + labels = { + "type": "dog", + }, + scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "id", LabelMatchOperator.IN, ["cat-" + str(i)], False) + ])).remote() + node_dogs.apend(dog) + dogs.apend(node_dogs) +``` + +### Note +1. Actor/NodeAffinity can be used together with the CustomResource mechanism. +These two mechanisms are completely non-conflicting. +eg: +``` +actor_1 = Actor.options( + resources={"4c8g": 1}, + scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "location", LabelMatchOperator.IN, ["dc_1"], False) + ])).remote() +``` ### Implementation plan ![LabelsAffinity](https://user-images.githubusercontent.com/11072802/203686851-894536d0-86fa-4fab-a5fe-2e656ac198e5.png) @@ -175,7 +275,7 @@ podAntiAffinity | POD | In, NotIn, Exists, DoesNotExist ### what's the alternative to achieve the same goal? **Option 2: Use LabelAffinitySchedulingStrategy instead of Actor/Task/NodeAffinitySchedulingStrategy** -Some people think that ActorAffinity/TaskAffinity is dispatched to the Node corresponding to the actor/Task with these labels. +Some people think that ActorAffinity is dispatched to the Node corresponding to the actor/Task with these labels. Why not assign both ActorLabels and TaskLabels to Node? Then the scheduling API only needs to use the LabelAffinitySchedulingStrategy set of APIs to instead of Actor/Task/NodeAffinitySchedulingStrategy. @@ -222,7 +322,7 @@ Advantages: Example: The user wants to affinity schedule to some Actors and nodes in a special computer room. However, according to the results of internal user research, most of the requirements are just to realize Actor/Task "collocate" scheduling or spread scheduling. -So using a single ActorAffinity/TaskAffinity/NodeAffinity can already achieve practical effects. +So using a single ActorAffinity/NodeAffinity can already achieve practical effects. And the same effect can be achieved by combining the option 1 with custom resources @@ -276,4 +376,7 @@ class ActorAffinitySchedulingStrategy: ``` ### 2. ObjectAffinitySchedulingStrategy -If the user has a request, you can consider adding the attributes of labels to objects. Then the strategy of ObjectAffinity can be launched。 \ No newline at end of file +If the user has a request, you can consider adding the attributes of labels to objects. Then the strategy of ObjectAffinity can be launched。 + +### 3. TaskAffinitySchedulingStrategy +Because the resource synchronization mechanism of Label has been implemented above. Therefore, it is easy to create a TaskAffinity strategy for Task. \ No newline at end of file From a83b9a109a293a12df55871dafa7ca0c28e29596 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= <554538252@qq.com> Date: Wed, 21 Dec 2022 17:06:18 +0800 Subject: [PATCH 5/6] add multi api form --- reps/2022-08-31-actor-affinity-apis.md | 3 + ...mechanish-and-affinity-schedule-feature.md | 133 ++++++++++++++++-- 2 files changed, 122 insertions(+), 14 deletions(-) diff --git a/reps/2022-08-31-actor-affinity-apis.md b/reps/2022-08-31-actor-affinity-apis.md index 0ac057a..ae9c175 100644 --- a/reps/2022-08-31-actor-affinity-apis.md +++ b/reps/2022-08-31-actor-affinity-apis.md @@ -54,6 +54,9 @@ Advantage ### API Design +The API here is temporarily expressed in this form. +If the form of the API is determined in 2022-11-23-labels-mechanish-and-affinity-schedule-feature.md later, I will modify the API here. + #### Python API Python API Design: Set key-value labels for actors diff --git a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md index 1927777..d6325e9 100644 --- a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md +++ b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md @@ -1,8 +1,9 @@ ## Summary ### General Motivation -Introduce the Labels mechanism. Give Labels to Actors/Tasks/Nodes/Objects. -Affinity features such as ActorAffinity/NodeAffinity can be realized through Labels. +Introduce labels mechanism, which associate an enumerated property to Ray nodes. We can static assign labels to Ray node on start up, or dynamically change node's label through Ray scheduling APIs. + +Labels mechanism makes it easy to implement Actor and Node affinity.(Taints and Tolerations style) ### Should this change be within `ray` or outside? @@ -19,32 +20,95 @@ Yes, this will be a complement to ray core's ability to flexibly schedule actors ## Design and Architecture ### Brief idea -1. Introduce the concept of Label. Add the Labels attribute to Actor/Task/Node. Labels = Map. -2. After Actor/Task are scheduled to a certain node, the Labels of Actor/Task will be attached to the node resource(Named: LabelsResource). Node's Labels are naturally in the node resource. -3. Actor/Task scheduling can choose Actor/Task/NodeAffinitySchedulingStratgy. -4. The actual principle of Actor/Task/NodeAffinitySchedulingStratgy is. According to the label matching expression in the strategy, traverse and search for LabelsResource of all nodes. Pick out nodes that meet the expression requirements. +1. Introduce the concept of Label. Add the Labels attribute to Actor/Node. Labels = Map. +2. After Actor are scheduled to a certain node, the Labels of Actor will be attached to the node resource(Named: LabelsResource). Node's Labels are naturally in the node resource. +3. Actor scheduling can choose Actor/NodeAffinitySchedulingStratgy. +4. The actual principle of Actor/NodeAffinitySchedulingStratgy is. According to the label matching expression in the strategy, traverse and search for LabelsResource of all nodes. Pick out nodes that meet the expression requirements. ![LabelsAffinity](https://user-images.githubusercontent.com/11072802/203686866-385235b5-e08b-4aac-9c31-512621129bd4.png) +Scheduling Policy | Label Owner | Label select operator +-- | -- | -- +ActorAffinity | Actor | in, not_in, exists, does_not_exist +NodeAffinity | Node | in, not_in, exists, does_not_exist + ### API Design -The apis of actors/tasks/nodes add labels. + +**The apis of actors/nodes add labels** + +This interface is already very simple, so we will not set up multiple solutions for everyone to discuss. ```python # Actor add labels. actor_1 = Actor.options(labels={ "location": "dc_1" }).remote() -# Task add labels. -task_1 = Task.options(labels={ - "location": "dc_1" -}).remote() # Node add static labels. ray start ... --labels={"location": "dc_1"} # The api of the dynamic update node labels is similar to the current dynamic set_resource. It can be determined later. ``` -The apis of the actor-affinity/task-affinity/node-affinity scheduling. +**The apis of the actor-affinity/node-affinity scheduling.** + +**Option 1: Simplify through syntactic sugar** + +```python +actor_1 = Actor.options( + scheduling_strategy=actor_affinity(label_in("location", ["dc_1"], false)) + ).remote() + +actor_1 = Actor.options( + scheduling_strategy=node_affinity(label_exist("location", false)) + ).remote() + +actor_1 = Actor.options( + scheduling_strategy=actor_affinity([ + label_in("location", ["dc_1"], false), + label_exists("location", false) + ]) + ).remote() + +def actor_affinity(...): + ... + return ActorAffinitySchedulingStrategy(...) + +def node_affinity(...): + ... + return NodeAffinitySchedulingStrategy(...) + +def label_in(key, values, is_soft): + ... + return LabelMatchExpression(...) + +def label_not_in(key, values, is_soft): + ... + return LabelMatchExpression(...) + +def label_exists(key, is_soft): + ... + return LabelMatchExpression(...) + +def label_does_not_exist(key, is_soft): + ... + return LabelMatchExpression(...) +``` + +**Option 2: another syntactic sugar** + +Personally, I think this Option is not as good as the above Option 1. +The label_in(key, values, is_soft) form of option 1 is more understandable and better than the form of ("location", LabelMatchOperator.IN, ["dc_1"], false). +```python +actor_1 = Actor.options( + scheduling_strategy=ActorAffinity([ + ("location", LabelMatchOperator.IN, ["dc_1"], false), + ("location", LabelMatchOperator.Exist)). + ).remote() +``` + +**Option 3: Java-like form** + +This form is similar to Java's syntax. The downside is that it's a bit complicated. ```python SchedulingStrategyT = Union[None, str, PlacementGroupSchedulingStrategy, @@ -75,11 +139,29 @@ class LabelMatchExpression: self.values = values self.soft = soft -# ActorAffinity use case actor_1 = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ LabelMatchExpression( "location", LabelMatchOperator.IN, ["dc_1"], False) ])).remote() + +actor_1 = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ + LabelMatchExpression( + "location", LabelMatchOperator.IN, ["dc_1"], False), + LabelMatchExpression( + "location", LabelMatchOperator.EXISTS, None, False) + ])).remote() +``` + +**Option 4: Like sql** + +This solution is not recommended. +This method needs to parse SQL, and the workload will be much larger. +And users often write wrong sql when using it. +```python +# ActorAffinity use case +actor_1 = Actor.options( + scheduling_strategy=ActorAffinity("location in [dc_1, dc_2]") + ).remote() ``` ### Example @@ -89,6 +171,9 @@ actor_1 = Actor.options(scheduling_strategy=ActorAffinitySchedulingStrategy([ * Anti-affinity * Spread the actors of a service across nodes and/or availability zones, e.g. to reduce correlated failures. +I will update the following API when the API plan is determined. +Now describe the example with an api like java form. + **1. Spread Demo** ![spread demo](https://user-images.githubusercontent.com/11072802/207037933-8a9d9f1d-ee6e-472b-a877-669cef996db9.png) @@ -379,4 +464,24 @@ class ActorAffinitySchedulingStrategy: If the user has a request, you can consider adding the attributes of labels to objects. Then the strategy of ObjectAffinity can be launched。 ### 3. TaskAffinitySchedulingStrategy -Because the resource synchronization mechanism of Label has been implemented above. Therefore, it is easy to create a TaskAffinity strategy for Task. \ No newline at end of file +Because the resource synchronization mechanism of Label has been implemented above. Therefore, it is easy to create a TaskAffinity strategy for Task. + +**Task add labels** +task_1 = Task.options(labels={ + "location": "dc_1" +}).remote() + +**Add TaskAffinitySchedulingStategy** +ref = Task.options( + scheduling_strategy = task_affinity(label_in("location", ["dc_1"], false) +).remote() + +### 4. Use Affinity scheduling as another dimension scheduling strategy +The Actor/NodeAffinity strategy can be independent of the SchedulingStrategy as a second-dimensional scheduling strategy. +Add a property of Affinity=Actor/NodeAffinity. eg: +``` +actor_1 = Actor.options( + scheduling_strategy="DEFAULT", + affinity=actor_affinity(label_in("location", ["dc_1"], false) + ).remote() +``` \ No newline at end of file From 09e0acd7c1c83f8e6703fbe7caaae636823dba14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=9A=E9=B1=BC?= <554538252@qq.com> Date: Thu, 12 Jan 2023 10:04:43 +0800 Subject: [PATCH 6/6] add actor_handle --- ...-mechanish-and-affinity-schedule-feature.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md index d6325e9..8e85eae 100644 --- a/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md +++ b/reps/2022-11-23-labels-mechanish-and-affinity-schedule-feature.md @@ -54,6 +54,16 @@ ray start ... --labels={"location": "dc_1"} **Option 1: Simplify through syntactic sugar** ```python + +actor_0 = Actor.remote() +actor_1 = Actor.options( + scheduling_strategy=actor_affinity(affinity(actor_0, false)) + ).remote() + +actor_2 = Actor.options( + scheduling_strategy=actor_affinity(anti_affinity(actor_0, false)) + ).remote() + actor_1 = Actor.options( scheduling_strategy=actor_affinity(label_in("location", ["dc_1"], false)) ).remote() @@ -92,6 +102,14 @@ def label_exists(key, is_soft): def label_does_not_exist(key, is_soft): ... return LabelMatchExpression(...) + +def affinity(actor_handles, is_soft): + ... + return LabelMatchExpression(...) + +def anti_affinity(actor_handles, is_soft): + ... + return LabelMatchExpression(...) ``` **Option 2: another syntactic sugar**