From 3e54e42156ac411ffaef838b827c173e438b1b31 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Mon, 6 Nov 2023 18:20:48 +0000 Subject: [PATCH 1/7] add enable_in_tree_autoscaling Signed-off-by: Yicheng-Lu-llll --- .../flytekitplugins/ray/models.py | 22 +++++++++++++++---- .../flytekit-ray/flytekitplugins/ray/task.py | 2 ++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 080f1239b4..9de2fceff0 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -10,14 +10,14 @@ def __init__( self, group_name: str, replicas: int, - min_replicas: typing.Optional[int] = 0, + min_replicas: typing.Optional[int] = None, max_replicas: typing.Optional[int] = None, ray_start_params: typing.Optional[typing.Dict[str, str]] = None, ): self._group_name = group_name self._replicas = replicas - self._min_replicas = min_replicas - self._max_replicas = max_replicas if max_replicas else replicas + self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas + self._min_replicas = min(replicas, self._max_replicas) if min_replicas is not None else replicas self._ray_start_params = ray_start_params @property @@ -127,10 +127,14 @@ class RayCluster(_common.FlyteIdlEntity): """ def __init__( - self, worker_group_spec: typing.List[WorkerGroupSpec], head_group_spec: typing.Optional[HeadGroupSpec] = None + self, + worker_group_spec: typing.List[WorkerGroupSpec], + head_group_spec: typing.Optional[HeadGroupSpec] = None, + enable_in_tree_autoscaling: bool = False, ): self._head_group_spec = head_group_spec self._worker_group_spec = worker_group_spec + self._enable_in_tree_autoscaling = enable_in_tree_autoscaling @property def head_group_spec(self) -> HeadGroupSpec: @@ -148,6 +152,14 @@ def worker_group_spec(self) -> typing.List[WorkerGroupSpec]: """ return self._worker_group_spec + @property + def enable_in_tree_autoscaling(self) -> bool: + """ + Whether to enable in-tree autoscaling. + :rtype: bool + """ + return self._enable_in_tree_autoscaling + def to_flyte_idl(self) -> _ray_pb2.RayCluster: """ :rtype: flyteidl.plugins._ray_pb2.RayCluster @@ -155,6 +167,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: return _ray_pb2.RayCluster( head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], + enable_in_tree_autoscaling=self.enable_in_tree_autoscaling, ) @classmethod @@ -166,6 +179,7 @@ def from_flyte_idl(cls, proto): return cls( head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None, worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], + enable_in_tree_autoscaling=proto.enable_in_tree_autoscaling, ) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 09e21966d7..139e5ef26c 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -32,6 +32,7 @@ class WorkerNodeConfig: class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] head_node_config: typing.Optional[HeadNodeConfig] = None + enable_in_tree_autoscaling: bool = False runtime_env: typing.Optional[dict] = None address: typing.Optional[str] = None @@ -65,6 +66,7 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] WorkerGroupSpec(c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params) for c in cfg.worker_node_config ], + enable_in_tree_autoscaling=cfg.enable_in_tree_autoscaling if cfg.enable_in_tree_autoscaling else False, ), # Use base64 to encode runtime_env dict and convert it to byte string runtime_env=base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode(), From bcd6bdbced569eb7d0b1acd9ea18b7cb6a676263 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Mon, 6 Nov 2023 19:06:40 +0000 Subject: [PATCH 2/7] nit Signed-off-by: Yicheng-Lu-llll --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 9de2fceff0..6b3d2605f4 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -17,7 +17,7 @@ def __init__( self._group_name = group_name self._replicas = replicas self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas - self._min_replicas = min(replicas, self._max_replicas) if min_replicas is not None else replicas + self._min_replicas = min(replicas, self._min_replicas) if min_replicas is not None else replicas self._ray_start_params = ray_start_params @property From 9d9450152aa864d64265000643e61ed51768eda8 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Mon, 6 Nov 2023 19:19:05 +0000 Subject: [PATCH 3/7] nit Signed-off-by: Yicheng-Lu-llll --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index 6b3d2605f4..e8f3fa7612 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -17,7 +17,7 @@ def __init__( self._group_name = group_name self._replicas = replicas self._max_replicas = max(replicas, max_replicas) if max_replicas is not None else replicas - self._min_replicas = min(replicas, self._min_replicas) if min_replicas is not None else replicas + self._min_replicas = min(replicas, min_replicas) if min_replicas is not None else replicas self._ray_start_params = ray_start_params @property From 74c62874f41516c5a06bb2a6ca18fa04652670f6 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Fri, 22 Dec 2023 01:20:55 +0000 Subject: [PATCH 4/7] add ttl Signed-off-by: Yicheng-Lu-llll --- .../flytekitplugins/ray/models.py | 31 +++++++++++++++---- .../flytekit-ray/flytekitplugins/ray/task.py | 8 +++-- plugins/flytekit-ray/tests/test_ray.py | 6 ++-- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index e8f3fa7612..e8b0cb24c4 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -130,11 +130,11 @@ def __init__( self, worker_group_spec: typing.List[WorkerGroupSpec], head_group_spec: typing.Optional[HeadGroupSpec] = None, - enable_in_tree_autoscaling: bool = False, + enable_autoscaling: bool = False, ): self._head_group_spec = head_group_spec self._worker_group_spec = worker_group_spec - self._enable_in_tree_autoscaling = enable_in_tree_autoscaling + self._enable_autoscaling = enable_autoscaling @property def head_group_spec(self) -> HeadGroupSpec: @@ -153,12 +153,12 @@ def worker_group_spec(self) -> typing.List[WorkerGroupSpec]: return self._worker_group_spec @property - def enable_in_tree_autoscaling(self) -> bool: + def enable_autoscaling(self) -> bool: """ Whether to enable in-tree autoscaling. :rtype: bool """ - return self._enable_in_tree_autoscaling + return self._enable_autoscaling def to_flyte_idl(self) -> _ray_pb2.RayCluster: """ @@ -167,7 +167,7 @@ def to_flyte_idl(self) -> _ray_pb2.RayCluster: return _ray_pb2.RayCluster( head_group_spec=self.head_group_spec.to_flyte_idl() if self.head_group_spec else None, worker_group_spec=[wg.to_flyte_idl() for wg in self.worker_group_spec], - enable_in_tree_autoscaling=self.enable_in_tree_autoscaling, + enable_autoscaling=self.enable_autoscaling, ) @classmethod @@ -179,7 +179,7 @@ def from_flyte_idl(cls, proto): return cls( head_group_spec=HeadGroupSpec.from_flyte_idl(proto.head_group_spec) if proto.head_group_spec else None, worker_group_spec=[WorkerGroupSpec.from_flyte_idl(wg) for wg in proto.worker_group_spec], - enable_in_tree_autoscaling=proto.enable_in_tree_autoscaling, + enable_autoscaling=proto.enable_autoscaling, ) @@ -192,9 +192,14 @@ def __init__( self, ray_cluster: RayCluster, runtime_env: typing.Optional[str], + ttl_seconds_after_finished: typing.Optional[int] = None, + shutdown_after_job_finishes: bool = False, ): self._ray_cluster = ray_cluster self._runtime_env = runtime_env + self._ttl_seconds_after_finished = ttl_seconds_after_finished + self._shutdown_after_job_finishes = shutdown_after_job_finishes + @property def ray_cluster(self) -> RayCluster: @@ -203,11 +208,23 @@ def ray_cluster(self) -> RayCluster: @property def runtime_env(self) -> typing.Optional[str]: return self._runtime_env + + @property + def ttl_seconds_after_finished(self) -> typing.Optional[int]: + # ttl_seconds_after_finished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. + return self._ttl_seconds_after_finished + + @property + def shutdown_after_job_finishes(self) -> bool: + # shutdown_after_job_finishes specifies whether the RayCluster should be deleted after the RayJob finishes. + return self._shutdown_after_job_finishes def to_flyte_idl(self) -> _ray_pb2.RayJob: return _ray_pb2.RayJob( ray_cluster=self.ray_cluster.to_flyte_idl(), runtime_env=self.runtime_env, + ttl_seconds_after_finished=self.ttl_seconds_after_finished, + shutdown_after_job_finishes=self.shutdown_after_job_finishes, ) @classmethod @@ -215,4 +232,6 @@ def from_flyte_idl(cls, proto: _ray_pb2.RayJob): return cls( ray_cluster=RayCluster.from_flyte_idl(proto.ray_cluster) if proto.ray_cluster else None, runtime_env=proto.runtime_env, + ttl_seconds_after_finished=proto.ttl_seconds_after_finished, + shutdown_after_job_finishes=proto.shutdown_after_job_finishes, ) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/task.py b/plugins/flytekit-ray/flytekitplugins/ray/task.py index 139e5ef26c..f77604befc 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/task.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/task.py @@ -32,9 +32,11 @@ class WorkerNodeConfig: class RayJobConfig: worker_node_config: typing.List[WorkerNodeConfig] head_node_config: typing.Optional[HeadNodeConfig] = None - enable_in_tree_autoscaling: bool = False + enable_autoscaling: bool = False runtime_env: typing.Optional[dict] = None address: typing.Optional[str] = None + shutdown_after_job_finishes: bool = False + ttl_seconds_after_finished: typing.Optional[int] = None class RayFunctionTask(PythonFunctionTask): @@ -66,10 +68,12 @@ def get_custom(self, settings: SerializationSettings) -> Optional[Dict[str, Any] WorkerGroupSpec(c.group_name, c.replicas, c.min_replicas, c.max_replicas, c.ray_start_params) for c in cfg.worker_node_config ], - enable_in_tree_autoscaling=cfg.enable_in_tree_autoscaling if cfg.enable_in_tree_autoscaling else False, + enable_autoscaling=cfg.enable_autoscaling if cfg.enable_autoscaling else False, ), # Use base64 to encode runtime_env dict and convert it to byte string runtime_env=base64.b64encode(json.dumps(cfg.runtime_env).encode()).decode(), + ttl_seconds_after_finished=cfg.ttl_seconds_after_finished, + shutdown_after_job_finishes=cfg.shutdown_after_job_finishes, ) return MessageToDict(ray_job.to_flyte_idl()) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 8bcebf7937..0b25f867bb 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -10,8 +10,9 @@ from flytekit.configuration import Image, ImageConfig, SerializationSettings config = RayJobConfig( - worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3)], + worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10)], runtime_env={"pip": ["numpy"]}, + enable_autoscaling=True, ) @@ -37,8 +38,9 @@ def t1(a: int) -> str: ) ray_job_pb = RayJob( - ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec("test_group", 3)]), + ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec("test_group", 3, 0, 10)]), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), + enable_autoscaling=True, ).to_flyte_idl() assert t1.get_custom(settings) == MessageToDict(ray_job_pb) From bcad2b082bd75934f44bd7fc4ad0e024566752e1 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Fri, 22 Dec 2023 02:04:06 +0000 Subject: [PATCH 5/7] fix lint error Signed-off-by: Yicheng-Lu-llll --- plugins/flytekit-ray/flytekitplugins/ray/models.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/flytekit-ray/flytekitplugins/ray/models.py b/plugins/flytekit-ray/flytekitplugins/ray/models.py index e8b0cb24c4..06e36af186 100644 --- a/plugins/flytekit-ray/flytekitplugins/ray/models.py +++ b/plugins/flytekit-ray/flytekitplugins/ray/models.py @@ -155,7 +155,7 @@ def worker_group_spec(self) -> typing.List[WorkerGroupSpec]: @property def enable_autoscaling(self) -> bool: """ - Whether to enable in-tree autoscaling. + Whether to enable autoscaling. :rtype: bool """ return self._enable_autoscaling @@ -200,7 +200,6 @@ def __init__( self._ttl_seconds_after_finished = ttl_seconds_after_finished self._shutdown_after_job_finishes = shutdown_after_job_finishes - @property def ray_cluster(self) -> RayCluster: return self._ray_cluster @@ -208,12 +207,12 @@ def ray_cluster(self) -> RayCluster: @property def runtime_env(self) -> typing.Optional[str]: return self._runtime_env - + @property def ttl_seconds_after_finished(self) -> typing.Optional[int]: # ttl_seconds_after_finished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. return self._ttl_seconds_after_finished - + @property def shutdown_after_job_finishes(self) -> bool: # shutdown_after_job_finishes specifies whether the RayCluster should be deleted after the RayJob finishes. From 4b9cd08ff23f349159aa09cc29b472c9fe53f3b4 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Fri, 22 Dec 2023 02:53:41 +0000 Subject: [PATCH 6/7] add ttl test Signed-off-by: Yicheng-Lu-llll --- plugins/flytekit-ray/tests/test_ray.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index 0b25f867bb..ce5f819893 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -13,6 +13,8 @@ worker_node_config=[WorkerNodeConfig(group_name="test_group", replicas=3, min_replicas=0, max_replicas=10)], runtime_env={"pip": ["numpy"]}, enable_autoscaling=True, + shutdown_after_job_finishes=True, + ttl_seconds_after_finished=20, ) @@ -41,6 +43,8 @@ def t1(a: int) -> str: ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec("test_group", 3, 0, 10)]), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), enable_autoscaling=True, + shutdown_after_job_finishes=True, + ttl_seconds_after_finished=20, ).to_flyte_idl() assert t1.get_custom(settings) == MessageToDict(ray_job_pb) From 150ab4dd3673c4512f5646953dadd94831b6d09c Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Sun, 18 Feb 2024 05:11:31 +0000 Subject: [PATCH 7/7] fix ray test Signed-off-by: Yicheng-Lu-llll --- plugins/flytekit-ray/tests/test_ray.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/flytekit-ray/tests/test_ray.py b/plugins/flytekit-ray/tests/test_ray.py index ce5f819893..0c0ada1944 100644 --- a/plugins/flytekit-ray/tests/test_ray.py +++ b/plugins/flytekit-ray/tests/test_ray.py @@ -40,9 +40,8 @@ def t1(a: int) -> str: ) ray_job_pb = RayJob( - ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec("test_group", 3, 0, 10)]), + ray_cluster=RayCluster(worker_group_spec=[WorkerGroupSpec("test_group", 3, 0, 10)], enable_autoscaling=True), runtime_env=base64.b64encode(json.dumps({"pip": ["numpy"]}).encode()).decode(), - enable_autoscaling=True, shutdown_after_job_finishes=True, ttl_seconds_after_finished=20, ).to_flyte_idl()