-
Notifications
You must be signed in to change notification settings - Fork 5.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: dentiny <[email protected]>
- Loading branch information
Showing
3 changed files
with
203 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
"""Context for physical mode execution. | ||
""" | ||
|
||
from dataclasses import dataclass | ||
|
||
from typing import Optional | ||
|
||
|
||
@dataclass
class PhysicalModeExecutionContext:
    """Specs for physical mode execution.

    Every field is optional and defaults to ``None``, meaning the
    corresponding setting was not specified.
    """

    # Memory related spec.
    # UUID here indicates the cgroup path; it is required to enable cgroup-based
    # memory control.
    memory_cgroup_uuid: Optional[str] = None
    # Unit: bytes. Corresponds to cgroup V2 `memory.max`, the hard limit on
    # memory consumption.
    max_memory: Optional[int] = None
    # Unit: bytes. Corresponds to cgroup V2 `memory.min`, the hard memory
    # protection, i.e. the minimum amount of memory reserved for the cgroup.
    min_memory: Optional[int] = None
173 changes: 173 additions & 0 deletions
173
python/ray/_private/runtime_env/physical_mode_execution_utils.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
"""A few util functions for physical mode execution. | ||
NOTE: | ||
1. It should be only used under linux environment; for other platforms, all functions | ||
are no-op. | ||
2. It assumes cgroup v2 has been mounted. | ||
""" | ||
# TODO(hjiang): | ||
# 1. Consider whether we need to support V1. | ||
# 2. Check the existence for cgroup V2. | ||
# 3. Add error handling; for now just use boolean return to differentiate success or | ||
# failure. Better to log the error. | ||
|
||
from ray._private.runtime_env import physical_mode_context | ||
|
||
import sys | ||
import os | ||
import shutil | ||
|
||
# Name of the shared default memory cgroup. Contexts that carry this UUID (i.e. | ||
# no max memory limit is specified) are added to this cgroup instead of a dedicated one. | ||
DEFAULT_MEM_CGROUP_UUID = "default_cgroup_memory_uuid" | ||
|
||
|
||
def _setup_new_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Create a dedicated memory cgroup and move the current process into it.

    Args:
        physical_exec_ctx: Execution context carrying the cgroup UUID and the
            optional `memory.min` / `memory.max` values (in bytes).

    Returns:
        False if the context has no cgroup UUID or uses the default cgroup UUID
        (nothing is created in that case); True once the cgroup directory has
        been created, the current pid written to it, and the specified limits
        applied.
    """
    uuid = physical_exec_ctx.memory_cgroup_uuid
    # A dedicated cgroup needs its own, non-default identifier.
    if not uuid or uuid == DEFAULT_MEM_CGROUP_UUID:
        return False

    cgroup_dir = f"/sys/fs/cgroup/{uuid}"
    os.makedirs(cgroup_dir, exist_ok=True)
    current_pid = os.getpid()

    # Place the current process under the new cgroup first.
    with open(f"{cgroup_dir}/cgroup.procs", "w") as procs_file:
        procs_file.write(str(current_pid))

    # Then apply whichever memory knobs were specified, `memory.min` before
    # `memory.max`.
    memory_knobs = (
        ("memory.min", physical_exec_ctx.min_memory),
        ("memory.max", physical_exec_ctx.max_memory),
    )
    for knob_name, knob_value in memory_knobs:
        if knob_value:
            with open(f"{cgroup_dir}/{knob_name}", "w") as knob_file:
                knob_file.write(str(knob_value))

    return True
|
||
|
||
def _update_default_cgroup_for_memory_at_setup(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Join the default memory cgroup and grow its reservation if requested.

    Args:
        physical_exec_ctx: Execution context; it must carry the default cgroup
            UUID and no max memory.

    Returns:
        False if the context does not target the default cgroup; True after the
        current pid has been appended to the default cgroup and, when
        `min_memory` is set, `memory.min` has been increased by that amount.
    """
    if physical_exec_ctx.memory_cgroup_uuid != DEFAULT_MEM_CGROUP_UUID:
        return False

    # Sanity check: a context with a max memory limit belongs in a dedicated
    # cgroup, not the default one.
    assert physical_exec_ctx.max_memory is None

    default_dir = f"/sys/fs/cgroup/{DEFAULT_MEM_CGROUP_UUID}"
    os.makedirs(default_dir, exist_ok=True)
    current_pid = os.getpid()

    with open(f"{default_dir}/cgroup.procs", "a") as procs_file:
        procs_file.write(f"{current_pid}\n")

    requested_min = physical_exec_ctx.min_memory
    if requested_min:
        memory_min_path = f"{default_dir}/memory.min"

        # The default cgroup's reservation is the running sum of all requests,
        # so start from whatever is currently recorded.
        reserved_so_far = 0
        if os.path.exists(memory_min_path):
            with open(memory_min_path, "r") as min_file:
                reserved_so_far = int(min_file.read().strip())

        with open(memory_min_path, "w") as min_file:
            min_file.write(str(reserved_so_far + requested_min))

    return True
|
||
|
||
def _setup_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Apply the memory cgroup setup that matches the execution context.

    Returns:
        Whether the memory constraint was applied successfully, as reported by
        the selected setup routine.
    """
    # A max memory limit gets a dedicated cgroup; everything else is placed in
    # the shared default cgroup.
    setup_fn = (
        _setup_new_cgroup_for_memory
        if physical_exec_ctx.max_memory
        else _update_default_cgroup_for_memory_at_setup
    )
    return setup_fn(physical_exec_ctx)
|
||
|
||
def setup_cgroup_for_context(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Setup cgroup for the particular execution context.

    Args:
        physical_exec_ctx: Execution context describing the cgroup to apply.

    Returns:
        True on non-Linux platforms (where this is a no-op) or when the memory
        cgroup was set up successfully; False otherwise.
    """
    # cgroup only exists on Linux; every other platform (not just Windows) must
    # be a no-op, as stated in this module's docstring.
    if not sys.platform.startswith("linux"):
        return True

    return _setup_cgroup_for_memory(physical_exec_ctx)
|
||
|
||
def _cleanup_dedicated_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Remove the dedicated cgroup that belongs to the given context.

    Args:
        physical_exec_ctx: Execution context whose `memory_cgroup_uuid` names
            the dedicated cgroup to remove.

    Returns:
        False if the context has no cgroup UUID or refers to the default
        cgroup (nothing is removed); True once the cgroup has been removed.

    Raises:
        OSError: If the cgroup directory cannot be removed, e.g. it does not
            exist or is still in use.
    """
    cgroup_uuid = physical_exec_ctx.memory_cgroup_uuid
    # An empty UUID would resolve to the cgroup root itself, and the default
    # cgroup is shared, so neither may be removed here.
    if not cgroup_uuid or cgroup_uuid == DEFAULT_MEM_CGROUP_UUID:
        return False

    cgroup_folder = f"/sys/fs/cgroup/{cgroup_uuid}"
    # A cgroup is deleted by rmdir on its directory. The interface files inside
    # (cgroup.procs, memory.max, ...) are kernel-owned and cannot be unlinked,
    # so a recursive delete such as shutil.rmtree fails on them.
    os.rmdir(cgroup_folder)

    return True
|
||
|
||
def _cleanup_default_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Release the context's memory reservation from the default cgroup.

    Args:
        physical_exec_ctx: Execution context that was placed in the default
            cgroup at setup.

    Returns:
        False if the context does not target the default cgroup, if the
        default cgroup's `memory.min` file is missing, or if the recorded
        reservation is smaller than the amount to release. True if there was
        nothing to release or `memory.min` was decreased successfully.
    """
    if physical_exec_ctx.memory_cgroup_uuid != DEFAULT_MEM_CGROUP_UUID:
        return False

    # Setup only touches `memory.min` when a reservation was requested, so a
    # context without one has nothing to give back. This also avoids comparing
    # an int against None below.
    min_memory = physical_exec_ctx.min_memory
    if not min_memory:
        return True

    memory_min_path = f"/sys/fs/cgroup/{DEFAULT_MEM_CGROUP_UUID}/memory.min"
    if not os.path.exists(memory_min_path):
        return False

    with open(memory_min_path, "r") as f:
        cur_memory_min = int(f.read().strip())

    # Never drive the shared reservation negative.
    if cur_memory_min < min_memory:
        return False

    with open(memory_min_path, "w") as f:
        f.write(str(cur_memory_min - min_memory))

    return True
|
||
|
||
def _cleanup_cgroup_for_memory(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Run the memory cgroup cleanup that matches the execution context.

    Returns:
        Whether the cleanup operation succeeded, as reported by the selected
        cleanup routine.
    """
    # Contexts with a max memory limit own a dedicated cgroup; the rest share
    # the default cgroup.
    cleanup_fn = (
        _cleanup_dedicated_cgroup_for_memory
        if physical_exec_ctx.max_memory
        else _cleanup_default_cgroup_for_memory
    )
    return cleanup_fn(physical_exec_ctx)
|
||
|
||
def cleanup_cgroup_for_context(
    physical_exec_ctx: physical_mode_context.PhysicalModeExecutionContext,
) -> bool:
    """Cleanup cgroup for the particular execution context.

    Args:
        physical_exec_ctx: Execution context describing the cgroup to clean up.

    Returns:
        True on non-Linux platforms (where this is a no-op) or when the memory
        cgroup was cleaned up successfully; False otherwise.
    """
    # cgroup only exists on Linux; every other platform (not just Windows) must
    # be a no-op, as stated in this module's docstring.
    if not sys.platform.startswith("linux"):
        return True

    return _cleanup_cgroup_for_memory(physical_exec_ctx)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters