diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py index 5225508fb..0480cef70 100644 --- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py +++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py @@ -15,13 +15,16 @@ from typing import Any import ray -from ray.experimental.state.api import list_actors -from data_processing.utils import GB, UnrecoverableException +from data_processing.utils import GB, UnrecoverableException, get_logger from ray.actor import ActorHandle from ray.exceptions import RayError +from ray.experimental.state.api import list_actors from ray.util.actor_pool import ActorPool +MAX_LIST = 10000 # Max number of actors returned by list + + class RayUtils: """ Class implementing support methods for Ray execution @@ -109,16 +112,38 @@ def operator() -> ActorHandle: time.sleep(creation_delay) return clazz.options(**actor_options).remote(params) - cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','') + logger = get_logger(__name__) + cls_name = clazz.__class__.__name__.replace("ActorClass(", "").replace(")", "") + current = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=MAX_LIST) + c_len = len(current) actors = [operator() for _ in range(n_actors)] - for i in range(120): - time.sleep(1) - alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")]) - if len(actors) == len(alive): - return actors - # failed - raise an exception - print(f"created {actors}, alive {alive}") - raise UnrecoverableException(f"out of {len(actors)} created actors only {len(alive)} alive") + overall = c_len + n_actors + if overall < MAX_LIST: + n_list = min(overall + 10, MAX_LIST) + alive = [] + for i in range(120): + time.sleep(1) + alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")], limit=n_list) + if len(alive) >= n_actors + c_len: + return actors + # failed + if len(alive) >= n_actors / 2 + c_len: + # At least half of the actors were created + logger.info(f"created {n_actors}, alive {len(alive)} Running with less actors") + created_ids = [item.actor_id for item in alive if item not in current] + return [ + actor + for actor in actors + if (str(actor._ray_actor_id).replace("ActorID(", "").replace(")", "") in created_ids) + ] + else: + # too few actors created + raise UnrecoverableException(f"Created {n_actors}, alive {len(alive)}. Too few actors were created") + else: + raise UnrecoverableException( + f"Overall number of actors of class {cls_name} is {overall}. " + f"Too many actors to create, greater then {MAX_LIST}" + ) @staticmethod def process_files(