Skip to content

Commit

Permalink
implement cgroup utils
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Nov 18, 2024
1 parent 1576af1 commit 99bf4a6
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
22 changes: 22 additions & 0 deletions python/ray/_private/runtime_env/physical_mode_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Context for physial mode execution.
"""

from dataclasses import dataclass

from typing import Optional


@dataclass
class PhysicalModeExecutionContext:
    """Specs for physical mode execution.

    All fields are optional; a field left as `None` means the corresponding
    setting was not specified.
    """

    # Memory related spec.
    # UUID used to derive the cgroup path; it must be set to enable cgroup-based
    # memory control.
    memory_cgroup_uuid: Optional[str] = None
    # Unit: bytes. Corresponds to cgroup V2 `memory.max`, which enforces a hard cap
    # on max memory consumption.
    max_memory: Optional[int] = None
    # Unit: bytes. Corresponds to cgroup V2 `memory.min`, i.e. the amount of memory
    # reserved for the cgroup (a guaranteed minimum rather than a cap).
    min_memory: Optional[int] = None
177 changes: 177 additions & 0 deletions python/ray/_private/runtime_env/physical_mode_execution_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
"""A few util functions for physical mode execution.
NOTE:
1. It should be only used under linux environment; for other platforms, all functions
are no-op.
2. It assumes cgroup v2 has been mounted.
"""
# TODO(hjiang):
# 1. Consider whether we need to support V1.
# 2. Check the existence for cgroup V2.
# 3. Add error handling; for now just use boolean return to differentiate success or
# failure. Better to log the error.

from ray._private.runtime_env import physical_mode_context

import sys
import os
import shutil

# Execution contexts are placed into memory cgroups in one of two ways:
# 1. Contexts with a max memory limit get a dedicated cgroup of their own;
# 2. Contexts without a max memory limit are added to the shared default memory
# cgroup, whose directory name is the UUID below.
DEFAULT_MEM_CGROUP_UUID = "default_cgroup_memory_uuid"


def _setup_new_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Create a dedicated memory cgroup for the context and move this process in.

    Args:
        physical_exec_ctx: Execution context carrying the cgroup UUID and the
            optional `min_memory` / `max_memory` values (in bytes).

    Returns:
        False if the context has no cgroup UUID or uses the default cgroup UUID
        (nothing is created in that case); True once the cgroup directory exists,
        the current pid has been written to `cgroup.procs` and the specified
        memory settings have been written.
    """
    cgroup_uuid = physical_exec_ctx.memory_cgroup_uuid
    # A dedicated cgroup requires a UUID which is set and is not the shared default.
    if not cgroup_uuid or cgroup_uuid == DEFAULT_MEM_CGROUP_UUID:
        return False

    cgroup_dir = f"/sys/fs/cgroup/{cgroup_uuid}"
    os.makedirs(cgroup_dir, exist_ok=True)

    # Attach the current process to the cgroup.
    with open(f"{cgroup_dir}/cgroup.procs", "w") as procs_file:
        procs_file.write(str(os.getpid()))

    # Only write the settings which are actually specified (non-zero, non-None).
    memory_settings = (
        ("memory.min", physical_exec_ctx.min_memory),
        ("memory.max", physical_exec_ctx.max_memory),
    )
    for interface_file, value in memory_settings:
        if value:
            with open(f"{cgroup_dir}/{interface_file}", "w") as setting_file:
                setting_file.write(str(value))

    return True


def _update_default_cgroup_for_memory_at_setup(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Join the default memory cgroup and grow its reserved memory if requested.

    Args:
        physical_exec_ctx: Execution context; its cgroup UUID must be the default
            one and its `max_memory` must be unset.

    Returns:
        False if the context does not use the default cgroup UUID; True after the
        current pid has been appended to `cgroup.procs` and, when `min_memory` is
        specified, `memory.min` has been increased by that amount.
    """
    if physical_exec_ctx.memory_cgroup_uuid != DEFAULT_MEM_CGROUP_UUID:
        return False

    # Sanity check: contexts with a max memory limit never use the default cgroup.
    assert physical_exec_ctx.max_memory is None

    default_dir = f"/sys/fs/cgroup/{DEFAULT_MEM_CGROUP_UUID}"
    os.makedirs(default_dir, exist_ok=True)

    # Attach the current process to the default cgroup.
    with open(f"{default_dir}/cgroup.procs", "a") as procs_file:
        procs_file.write(f"{os.getpid()}\n")

    requested_min = physical_exec_ctx.min_memory
    if not requested_min:
        # No reservation requested, joining the cgroup is all there is to do.
        return True

    # Add this context's reservation on top of whatever is already reserved.
    memory_min_path = f"{default_dir}/memory.min"
    already_reserved = 0
    if os.path.exists(memory_min_path):
        with open(memory_min_path, "r") as min_file:
            already_reserved = int(min_file.read().strip())

    with open(memory_min_path, "w") as min_file:
        min_file.write(str(already_reserved + requested_min))

    return True


def _setup_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Apply the memory cgroup setup matching the context.

    A context with max memory specified gets a new dedicated cgroup; any other
    context is placed into the default cgroup.

    Returns:
        Whether the selected setup routine reported success.
    """
    setup_fn = (
        _setup_new_cgroup_for_memory
        if physical_exec_ctx.max_memory
        else _update_default_cgroup_for_memory_at_setup
    )
    return setup_fn(physical_exec_ctx)


def setup_cgroup_for_context(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Setup cgroup for the particular execution context.

    Args:
        physical_exec_ctx: Execution context describing the cgroup to set up.

    Returns:
        True on non-Linux platforms (no-op), otherwise whether the memory cgroup
        setup succeeded.
    """
    # cgroup only exists on Linux; per the module contract every other platform
    # (not just Windows) is a no-op.
    if not sys.platform.startswith("linux"):
        return True

    return _setup_cgroup_for_memory(physical_exec_ctx)


def _cleanup_dedicated_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Cleanup the dedicated cgroup.

    Args:
        physical_exec_ctx: Execution context whose dedicated cgroup is removed.

    Returns:
        False if the context has no cgroup UUID or uses the default cgroup UUID
        (nothing is removed in that case); True once the cgroup directory has
        been removed.

    Raises:
        OSError: If the cgroup directory cannot be removed.
    """
    cgroup_uuid = physical_exec_ctx.memory_cgroup_uuid
    # Mirror the setup-side checks: without a UUID no dedicated cgroup was
    # created, and the formatted path would otherwise point at the cgroup root.
    if not cgroup_uuid or cgroup_uuid == DEFAULT_MEM_CGROUP_UUID:
        return False

    cgroup_folder = f"/sys/fs/cgroup/{cgroup_uuid}"
    # A cgroup is destroyed by removing its directory; the interface files inside
    # cannot be unlinked, so a recursive delete does not work here.
    # NOTE(review): the kernel only allows this once the cgroup has no live
    # processes -- confirm the caller invokes cleanup after the process exits.
    os.rmdir(cgroup_folder)

    return True


def _cleanup_default_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Cleanup the default cgroup for the given context.

    Gives back the memory reservation which the context added to the default
    cgroup's `memory.min` at setup.

    Args:
        physical_exec_ctx: Execution context which was placed into the default
            cgroup.

    Returns:
        False if the context does not use the default cgroup UUID, if
        `memory.min` does not exist, or if the current reservation is smaller
        than the context's `min_memory`; True otherwise, including when the
        context specified no `min_memory` and hence has nothing to give back.
    """
    if physical_exec_ctx.memory_cgroup_uuid != DEFAULT_MEM_CGROUP_UUID:
        return False

    # Setup adds no reservation when `min_memory` is unspecified, so there is
    # nothing to undo; this also avoids comparing an int against None below.
    reserved_by_ctx = physical_exec_ctx.min_memory
    if not reserved_by_ctx:
        return True

    memory_min_path = f"/sys/fs/cgroup/{DEFAULT_MEM_CGROUP_UUID}/memory.min"
    if not os.path.exists(memory_min_path):
        return False

    with open(memory_min_path, "r") as f:
        cur_memory_min = int(f.read().strip())

    # Never drive the reservation negative.
    if cur_memory_min < reserved_by_ctx:
        return False

    with open(memory_min_path, "w") as f:
        f.write(str(cur_memory_min - reserved_by_ctx))

    return True


def _cleanup_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Run the memory cgroup cleanup matching the context.

    A context with max memory specified has its dedicated cgroup cleaned up; any
    other context has its share of the default memory cgroup cleaned up.

    Returns:
        Whether the selected cleanup operation reported success.
    """
    cleanup_fn = (
        _cleanup_dedicated_cgroup_for_memory
        if physical_exec_ctx.max_memory
        else _cleanup_default_cgroup_for_memory
    )
    return cleanup_fn(physical_exec_ctx)


def cleanup_cgroup_for_context(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Cleanup cgroup for the particular execution context.

    Args:
        physical_exec_ctx: Execution context describing the cgroup to clean up.

    Returns:
        True on non-Linux platforms (no-op), otherwise whether the memory cgroup
        cleanup succeeded.
    """
    # cgroup only exists on Linux; per the module contract every other platform
    # (not just Windows) is a no-op.
    if not sys.platform.startswith("linux"):
        return True

    return _cleanup_cgroup_for_memory(physical_exec_ctx)
8 changes: 8 additions & 0 deletions src/ray/protobuf/runtime_env_agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ message GetOrCreateRuntimeEnvRequest {
/// For details, checkout "//src/ray/common/id.h".
bytes job_id = 3;
string source_process = 4;

/// A UUID used to indicate a unique context for physical mode execution in control of memory.
/// The UUID should be the same as [DeleteRuntimeEnvIfPossibleRequest].
string physical_mode_execution_context_memory_uuid = 5;
}

message GetOrCreateRuntimeEnvReply {
Expand All @@ -45,6 +49,10 @@ message DeleteRuntimeEnvIfPossibleRequest {
/// Json serialized [RuntimeEnv].
string serialized_runtime_env = 1;
string source_process = 2;

/// A UUID used to indicate a unique context for physical mode execution in control of memory.
/// The UUID should be the same as [GetOrCreateRuntimeEnvRequest].
string physical_mode_execution_context_memory_uuid = 3;
}

message DeleteRuntimeEnvIfPossibleReply {
Expand Down

0 comments on commit 99bf4a6

Please sign in to comment.