Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ray Autoscaler to the Flyte-Ray plugin #1937

Merged
merged 10 commits into from
Mar 15, 2024

Conversation

Yicheng-Lu-llll
Copy link
Member

@Yicheng-Lu-llll Yicheng-Lu-llll commented Nov 6, 2023

TL;DR

NOTE:The ray CI test failed because we need to first merge flyteorg/flyte#4363 to update flytecli.

Currently, the Flyte-Ray plugin utilizes Rayjob. However, there are cases where Rayjob may require an autoscaler.

  1. After completing a workload with Rayjob, a user might want to retain all the information, logs, past tasks, and actor execution history for a period. As of now, Ray lacks a mechanism to persist these data, necessitating the continuous operation of the Ray cluster even after workload completion. With an autoscaler, the Ray cluster will maintain only the head pod while scaling down all worker pods.
  2. User does not need to pre estimate the need of the resource. Autoscaler will care everything.

So, this PR adds Ray Autoscaler config to the Flyte-Ray plugin. Also see flyteorg/flyte#4363.

btw, This PR adds shutdown_after_job_finishes and ttl_seconds_after_finished.

  • ttl_seconds_after_finished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  • shutdown_after_job_finishes specifies whether the RayCluster should be deleted after the RayJob finishes.

Below is an example:

import typing
import ray
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig
from flytekit import Resources, task, workflow

@ray.remote
def f(x):
    return x * x

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    # The behavior will be:
    # 1. Create a head node and 0 worker node.
    # 2. The worker node will be scaled to 2.
    # 3. The worker node will be scaled to 0 and will be terminated later(120s).
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=0, min_replicas=0, max_replicas=2)],
    enable_autoscaling=True,
    shutdown_after_job_finishes=True,
    # ttl_seconds_after_finished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
    ttl_seconds_after_finished=120,
)

@task(
    task_config=ray_config,
    requests=Resources(mem="1Gi", cpu="2"),
)
def ray_task() -> int:
    import time

    # Import placement group APIs.
    from ray.util.placement_group import (
        placement_group,
        placement_group_table,
        remove_placement_group,
    )
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    # use the `placement_group` to trigger autoscaling.
    # each node have two cpus, so this will create 2 workers when executing.
    # Once the task is done, the two workers will be removed.
    pg = placement_group([{"CPU": 2} for i in range(3)])
    ray.get(pg.ready(), timeout=100)

    return 1

@workflow
def ray_workflow() -> int:
    return ray_task()

init(replicas=0), only head:

(flytekit) ubuntu@ip-172-31-2-249:~/flyte/flytekit$ kubectl get pod -n flytesnacks-development
NAME                                                    READY   STATUS    RESTARTS   AGE
f50cd4d1a5ceb40bc9aa-n0-0-raycluster-pbvx9-head-szdgn   2/2     Running   0          3s

task executing(max_replicas=2):

(flytekit) ubuntu@ip-172-31-2-249:~/flyte/flytekit$ kubectl get pod -n flytesnacks-development
NAME                                                      READY   STATUS    RESTARTS   AGE
f50cd4d1a5ceb40bc9aa-n0-0-raycluster-pbvx9-head-szdgn     2/2     Running   0          54s
ceb40bc9aa-n0-0-raycluster-pbvx9-worker-ray-group-llzp7   1/1     Running   0          14s
ceb40bc9aa-n0-0-raycluster-pbvx9-worker-ray-group-mqrb6   1/1     Running   0          14s

finsh(min_replicas=0):

(flytekit) ubuntu@ip-172-31-2-249:~/flyte/flytekit$ kubectl get pod -n flytesnacks-development
NAME                                                      READY   STATUS        RESTARTS   AGE
f50cd4d1a5ceb40bc9aa-n0-0-raycluster-pbvx9-head-szdgn     2/2     Running       0          2m16s
ceb40bc9aa-n0-0-raycluster-pbvx9-worker-ray-group-llzp7   1/1     Terminating   0          96s
ceb40bc9aa-n0-0-raycluster-pbvx9-worker-ray-group-mqrb6   1/1     Terminating   0          96s

After TTL 120s:

(flytekit) ubuntu@ip-172-31-2-249:~/flyte/flytekit$ kubectl get pod -n flytesnacks-development
No resources found in flytesnacks-development namespace.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

How did you fix the bug, make the feature etc. Link to any design docs etc

Tracking Issue

flyteorg/flyte#4187

Follow-up issue

NA
OR
https://github.com/flyteorg/flyte/issues/

Copy link

codecov bot commented Nov 6, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (7794d3a) 85.53% compared to head (150ab4d) 85.53%.

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #1937   +/-   ##
=======================================
  Coverage   85.53%   85.53%           
=======================================
  Files         309      309           
  Lines       23460    23475   +15     
  Branches     3630     3630           
=======================================
+ Hits        20066    20079   +13     
- Misses       2752     2753    +1     
- Partials      642      643    +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Signed-off-by: Yicheng-Lu-llll <[email protected]>
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the add-ray-autoscaler-config branch from 5de8cf7 to 3e54e42 Compare November 6, 2023 18:21
Signed-off-by: Yicheng-Lu-llll <[email protected]>
Signed-off-by: Yicheng-Lu-llll <[email protected]>
Copy link
Member

@pingsutw pingsutw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, could we add a small test here?

@@ -148,13 +152,22 @@ def worker_group_spec(self) -> typing.List[WorkerGroupSpec]:
"""
return self._worker_group_spec

@property
def enable_in_tree_autoscaling(self) -> bool:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: why in_tree? Does Ray have other autoscaling strategies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only have one autoscaling strategy. I have changed name to enable_autoscaling. Thank you!

Yicheng-Lu-llll and others added 4 commits December 22, 2023 01:20
@@ -178,9 +192,13 @@ def __init__(
self,
ray_cluster: RayCluster,
runtime_env: typing.Optional[str],
ttl_seconds_after_finished: typing.Optional[int] = None,
shutdown_after_job_finishes: bool = False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this mean that by default the rayjob will not be reclaimed by kuberay once the job finishes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -178,9 +192,13 @@ def __init__(
self,
ray_cluster: RayCluster,
runtime_env: typing.Optional[str],
ttl_seconds_after_finished: typing.Optional[int] = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a no-op if shutdown_after_job_finishes is set to False, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Comment on lines +15 to +17
enable_autoscaling=True,
shutdown_after_job_finishes=True,
ttl_seconds_after_finished=20,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a new release of flyteidl.

@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Feb 16, 2024
@Yicheng-Lu-llll Yicheng-Lu-llll force-pushed the add-ray-autoscaler-config branch from c03ad40 to 150ab4d Compare February 18, 2024 05:11
@dosubot dosubot bot added the lgtm This PR has been approved by maintainer label Mar 15, 2024
@pingsutw pingsutw merged commit 4208da2 into flyteorg:master Mar 15, 2024
80 of 81 checks passed
austin362667 pushed a commit to austin362667/flytekit that referenced this pull request Mar 16, 2024
fiedlerNr9 pushed a commit that referenced this pull request Jul 25, 2024
Signed-off-by: Yicheng-Lu-llll <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lgtm This PR has been approved by maintainer size:M This PR changes 30-99 lines, ignoring generated files.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants