Skip to content

Commit

Permalink
return failure for failed runtime env deref
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Jan 22, 2025
1 parent 89990f6 commit 9215e16
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions python/ray/_private/runtime_env/agent/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def _increase_reference_for_runtime_env(self, serialized_env: str):
self._runtime_env_reference[serialized_env] += 1

def _decrease_reference_for_runtime_env(self, serialized_env: str):
"""Decrease reference count for the given [serialized_env]. Return whether decrement succeeds or not."""
default_logger.debug(f"Decrease reference for runtime env {serialized_env}.")
unused = False
if self._runtime_env_reference[serialized_env] > 0:
Expand All @@ -126,10 +127,11 @@ def _decrease_reference_for_runtime_env(self, serialized_env: str):
del self._runtime_env_reference[serialized_env]
else:
default_logger.warning(f"Runtime env {serialized_env} does not exist.")
return False
if unused:
default_logger.info(f"Unused runtime env {serialized_env}.")
self._unused_runtime_env_callback(serialized_env)
return unused
return True

def increase_reference(
self, runtime_env: RuntimeEnv, serialized_env: str, source_process: str
Expand All @@ -142,12 +144,15 @@ def increase_reference(

def decrease_reference(
self, runtime_env: RuntimeEnv, serialized_env: str, source_process: str
) -> None:
) -> bool:
"""Decrease reference count for runtime env and uri. Return whether decrement succeeds or not."""
if source_process in self._reference_exclude_sources:
return list()
self._decrease_reference_for_runtime_env(serialized_env)
return True
if not self._decrease_reference_for_runtime_env(serialized_env):
return False
uris = self._uris_parser(runtime_env)
self._decrease_reference_for_uris(uris)
return True

@property
def runtime_env_refs(self) -> Dict[str, int]:
Expand Down Expand Up @@ -543,9 +548,14 @@ async def DeleteRuntimeEnvIfPossible(self, request):
),
)

self._reference_table.decrease_reference(
deref_succ = self._reference_table.decrease_reference(
runtime_env, request.serialized_runtime_env, request.source_process
)
if not deref_succ:
return runtime_env_agent_pb2.DeleteRuntimeEnvIfPossibleReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_FAILED,
error_message=f"Fails to dereference for runtime env {request.serialized_runtime_env}",
)

return runtime_env_agent_pb2.DeleteRuntimeEnvIfPossibleReply(
status=agent_manager_pb2.AGENT_RPC_STATUS_OK
Expand Down

0 comments on commit 9215e16

Please sign in to comment.