Extend permitted processes during device isolation to all child processes #113

Merged: 2 commits, Jan 17, 2024
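In short: instead of receiving a fixed `permitted_pids` set (plus an optional torchrun `world_size` used for FileStore registration), the isolation watchdog now receives only the benchmark PID and permits any process whose parent is the benchmark process or the watchdog itself. A minimal sketch of that parent-based check, using psutil as the new code does (the helper name is illustrative, not part of the PR):

```python
import psutil


def is_permitted(pid: int, benchmark_pid: int, isolation_pid: int) -> bool:
    """Illustrative helper: a PID found on the device is permitted if it is the
    benchmark process, the isolation process, or a direct child of either."""
    if pid in (benchmark_pid, isolation_pid):
        return True
    try:
        parent_pid = psutil.Process(pid).ppid()
    except psutil.Error:
        return False  # process disappeared or is inaccessible
    return parent_pid in (benchmark_pid, isolation_pid)
```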
5 changes: 1 addition & 4 deletions optimum_benchmark/launchers/inline/launcher.py
@@ -19,9 +19,6 @@ def configure(self, config: InlineConfig) -> None:
super().configure(config)

def launch(self, worker: Callable, *worker_args):
with device_isolation(
enabled=self.config.device_isolation,
permitted_pids={os.getpid()},
):
with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
LOGGER.info("\t+ Launching inline experiment (no process isolation)")
worker(*worker_args)
125 changes: 66 additions & 59 deletions optimum_benchmark/launchers/isolation_utils.py
@@ -6,61 +6,59 @@
from contextlib import contextmanager
from logging import getLogger
from multiprocessing import Process
from typing import Dict, Optional, Set
from typing import Dict, Set

import psutil
from omegaconf import OmegaConf

# from omegaconf import OmegaConf
from ..env_utils import is_nvidia_system, is_rocm_system
from ..import_utils import (
is_amdsmi_available,
is_py3nvml_available,
is_torch_distributed_available,
torch_version,
)

if is_torch_distributed_available():
from torch.distributed import FileStore

if is_py3nvml_available():
import py3nvml.py3nvml as nvml
import py3nvml.py3nvml as nvml # type: ignore

if is_amdsmi_available():
import amdsmi # type: ignore

LOGGER = getLogger("isolation")


def get_nvidia_device_pids() -> Dict[int, set]:
devices_pids: Dict[int, set] = {}
def get_nvidia_devices_pids() -> Dict[int, list]:
devices_pids: Dict[int, list] = {}
devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]

if not is_py3nvml_available():
raise ValueError("get_nvidia_device_pids requires py3nvml. " "Please install it with `pip install py3nvml`.")
raise ValueError("get_nvidia_device_pids requires py3nvml. Please install it with `pip install py3nvml`.")

nvml.nvmlInit()

for device_id in devices_ids:
device_handle = nvml.nvmlDeviceGetHandleByIndex(device_id)
device_processes = nvml.nvmlDeviceGetComputeRunningProcesses(device_handle)
for device_process in device_processes:
if device_id not in devices_pids:
devices_pids[device_id] = []
else:
devices_pids[device_id].append(device_process.pid)

devices_pids[device_id].append(device_process.pid)

nvml.nvmlShutdown()

return devices_pids
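One small fix worth calling out in the hunk above: the old loop only appended the PID in the `else` branch, so the first process seen on each device was silently dropped; the new code initializes the list and then always appends. The same accumulation can be written with `dict.setdefault`; a minimal sketch (illustrative only, not what the PR uses):

```python
from typing import Dict, List

devices_pids: Dict[int, List[int]] = {}


def record_pid(device_id: int, pid: int) -> None:
    # setdefault creates the per-device list on first use and always appends,
    # so the first PID seen on a device is never lost.
    devices_pids.setdefault(device_id, []).append(pid)
```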


def get_amd_device_pids() -> None:
def get_amd_devices_pids() -> Dict[int, list]:
devices_pids: Dict[int, list] = {}
rocm_version = torch_version().split("rocm")[-1]
devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]

if not is_amdsmi_available():
raise ValueError(
"check_no_process_is_running_on_cuda_device requires amdsmi. "
"get_amd_devices_pids requires amdsmi. "
"Please follow the instructions at https://github.com/RadeonOpenCompute/amdsmi/tree/master"
)

@@ -89,21 +87,21 @@ def get_amd_device_pids() -> None:

if device_id not in devices_pids:
devices_pids[device_id] = []
else:
devices_pids[device_id].append(info["pid"])

devices_pids[device_id].append(info["pid"])
else:
devices_handles = amdsmi.amdsmi_get_device_handles()
for device_id in devices_ids:
device_handle = devices_handles[device_id]
try:
# these functions fail a lot for no apparent reason
# these functions might fail for no apparent reason
processes_handles = amdsmi.amdsmi_get_process_list(device_handle)
except Exception:
continue

for process_handle in processes_handles:
try:
# these functions fail a lot for no apparent reason
# these functions might fail for no apparent reason
info = amdsmi.amdsmi_get_process_info(device_handle, process_handle)
except Exception:
continue
@@ -113,8 +111,8 @@ def get_amd_device_pids() -> None:

if device_id not in devices_pids:
devices_pids[device_id] = []
else:
devices_pids[device_id].append(info["pid"])

devices_pids[device_id].append(info["pid"])

amdsmi.amdsmi_shut_down()
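The ROCm branch above picks its amdsmi code path based on the ROCm version embedded in the installed torch build. A small sketch of what `torch_version().split("rocm")[-1]` yields, assuming a typical ROCm build string:

```python
# Assumed example: ROCm builds of torch report versions like "2.1.0+rocm5.6".
torch_version_str = "2.1.0+rocm5.6"
rocm_version = torch_version_str.split("rocm")[-1]
print(rocm_version)  # -> "5.6"
```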

@@ -125,9 +123,9 @@ def get_pids_running_on_system_device() -> Set[int]:
"""Returns the set of pids running on the system device(s)."""

if is_nvidia_system():
devices_pids = get_nvidia_device_pids()
devices_pids = get_nvidia_devices_pids()
elif is_rocm_system():
devices_pids = get_amd_device_pids()
devices_pids = get_amd_devices_pids()
else:
raise ValueError("get_pids_running_on_system_device is only supported on NVIDIA and AMD GPUs")

@@ -136,58 +134,67 @@ def get_pids_running_on_system_device() -> Set[int]:
return all_devices_pids


def assert_system_device_isolation(permitted_pids: Set[int], world_size: Optional[int] = None) -> None:
def assert_system_devices_isolation(benchmark_pid: int) -> None:
isolation_pid = os.getpid()

hydra_conf = OmegaConf.load(".hydra/hydra.yaml")
logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True))

if os.getpid() not in permitted_pids:
permitted_pids.add(os.getpid())

if world_size is not None:
# add all pids in tcp store to the permitted pids
STORE = FileStore("torchrun_filestore")
perimitted_workers_names = [f"rank_{rank}" for rank in range(world_size)]
STORE.wait(perimitted_workers_names)
perimitted_workers_pids = {int(STORE.get(name)) for name in perimitted_workers_names}
permitted_pids.update(perimitted_workers_pids)
while psutil.pid_exists(benchmark_pid):
child_processes = set()
non_permitted_pids = set()

while True:
all_devices_pids = get_pids_running_on_system_device()
non_permitted_pids = all_devices_pids - permitted_pids

for pid in list(all_devices_pids):
if pid == benchmark_pid or pid == isolation_pid:
continue

try:
info = psutil.Process(pid)
parent_pid = info.ppid()
except Exception as e:
LOGGER.error(f"Failed to get info for process {pid} with error {e}")
parent_pid = None

if parent_pid == benchmark_pid or parent_pid == isolation_pid:
child_processes.add(pid)
else:
non_permitted_pids.add(pid)

if len(non_permitted_pids) > 0:
LOGGER.error(f"Found non-permitted process(es) running on system device(s): {non_permitted_pids}")
for pid in permitted_pids:
if pid == os.getpid():
continue
for pid in child_processes:
try:
LOGGER.error(f"Killing isolated process {pid}")
LOGGER.error(f"Terminating child process {pid}")
os.kill(pid, signal.SIGTERM)
except Exception as e:
LOGGER.error(f"Failed to kill isolated process {pid} with error {e}")
LOGGER.error(f"Failed to terminate child process {pid} with error {e}")

LOGGER.error(f"Terminating benchmark process {benchmark_pid}")
os.kill(benchmark_pid, signal.SIGTERM)
break

LOGGER.error("Exiting isolation process")
exit()
else:
time.sleep(1)
time.sleep(1)
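A design note on the rewritten watchdog loop: it polls once per second only while `psutil.pid_exists(benchmark_pid)` is true, so it exits on its own when the benchmark finishes; on a violation it SIGTERMs the detected child processes first and then the benchmark process itself. If the benchmark wants to clean up when that happens, it can install a handler; a minimal sketch (the handler and the `cleanup` hook are illustrative, not part of the PR):

```python
import signal
import sys


def _on_sigterm(signum, frame):
    # Illustrative: release device memory, flush logs, etc., then exit
    # non-zero so the launcher can tell the run was interrupted.
    cleanup()  # hypothetical cleanup hook defined by the benchmark
    sys.exit(1)


signal.signal(signal.SIGTERM, _on_sigterm)
```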


@contextmanager
def device_isolation(enabled: bool, permitted_pids: Set[int], world_size: Optional[int] = None) -> None:
def device_isolation(enabled: bool, benchmark_pid: int) -> None:
if not enabled:
yield
else:
isolation_process = Process(
target=assert_system_device_isolation,
kwargs={"permitted_pids": permitted_pids, "world_size": world_size},
daemon=True,
)
isolation_process.start()
LOGGER.info(f"\t+ Launched device(s) isolation process {isolation_process.pid}.")

yield

LOGGER.info("\t+ Closing device(s) isolation process...")
isolation_process.kill()
isolation_process.join()
isolation_process.close()
return

isolation_process = Process(
target=assert_system_devices_isolation,
kwargs={"benchmark_pid": benchmark_pid},
daemon=True,
)
isolation_process.start()
LOGGER.info(f"\t+ Launched device(s) isolation process {isolation_process.pid}.")

yield

LOGGER.info("\t+ Closing device(s) isolation process...")
isolation_process.kill()
isolation_process.join()
isolation_process.close()
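For reference, the rewritten context manager can also be used on its own, assuming an NVIDIA or ROCm system with `CUDA_VISIBLE_DEVICES` set (a requirement of `get_pids_running_on_system_device`); the `run_benchmark` call below is a placeholder:

```python
import os

from optimum_benchmark.launchers.isolation_utils import device_isolation

# Pass the current PID as the benchmark PID; any process spawned inside the
# block is recognized as permitted through its parent PID.
with device_isolation(enabled=True, benchmark_pid=os.getpid()):
    run_benchmark()  # placeholder for the actual benchmark work
```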
7 changes: 2 additions & 5 deletions optimum_benchmark/launchers/process/launcher.py
@@ -28,15 +28,12 @@ def configure(self, config: ProcessConfig) -> None:
mp.set_start_method(self.config.start_method, force=True)

def launch(self, worker: Callable, *worker_args):
# worker process can't be daemon since it spawns its own processes
# worker process can't be daemon since it might spawn its own processes
worker_process = Process(target=target, args=(worker, *worker_args), daemon=False)
worker_process.start()
LOGGER.info(f"\t+ Launched worker process with PID {worker_process.pid}.")

with device_isolation(
enabled=self.config.device_isolation,
permitted_pids={os.getpid(), worker_process.pid},
):
with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
worker_process.join()

if worker_process.exitcode != 0:
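The process launcher now passes only its own PID even though the actual work runs in a separately spawned worker; the worker is still covered because its parent PID is the launcher's PID. A quick sketch of that relationship (the `target` function is a stand-in):

```python
import os
import time
from multiprocessing import Process

import psutil


def target():
    time.sleep(5)  # stand-in for the real worker


if __name__ == "__main__":
    worker_process = Process(target=target, daemon=False)
    worker_process.start()

    # The spawned worker's parent is this process, so the watchdog's
    # parent-PID check recognizes it as permitted.
    assert psutil.Process(worker_process.pid).ppid() == os.getpid()

    worker_process.join()
```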
7 changes: 2 additions & 5 deletions optimum_benchmark/launchers/torchrun/launcher.py
@@ -50,11 +50,8 @@ def launch(self, worker: Callable, *worker_args):
log_dir=self.config.log_dir,
)

with device_isolation(
enabled=self.config.device_isolation,
permitted_pids={os.getpid()},
):
LOGGER.info(f"\t+ Launching torchrun/torchelastic agent with {self.config.nproc_per_node} processes")
with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
LOGGER.info(f"\t+ Launching torchrun agent with {self.config.nproc_per_node} workers processes")
launch_agent(
entrypoint=entrypoint,
args=(worker, *worker_args),