From c9ccc5fbd351c4ab5d5edb2c0cc252b79ea4c908 Mon Sep 17 00:00:00 2001
From: IlyasMoutawwakil
Date: Wed, 17 Jan 2024 07:44:37 +0000
Subject: [PATCH 1/2] extended permitted processes during device isolation to all child processes

---
 .../launchers/inline/launcher.py   |   5 +-
 .../launchers/isolation_utils.py   | 104 +++++++++---------
 .../launchers/process/launcher.py  |   7 +-
 .../launchers/torchrun/launcher.py |   7 +-
 4 files changed, 60 insertions(+), 63 deletions(-)

diff --git a/optimum_benchmark/launchers/inline/launcher.py b/optimum_benchmark/launchers/inline/launcher.py
index 7558645a..8aaba239 100644
--- a/optimum_benchmark/launchers/inline/launcher.py
+++ b/optimum_benchmark/launchers/inline/launcher.py
@@ -19,9 +19,6 @@ def configure(self, config: InlineConfig) -> None:
         super().configure(config)
 
     def launch(self, worker: Callable, *worker_args):
-        with device_isolation(
-            enabled=self.config.device_isolation,
-            permitted_pids={os.getpid()},
-        ):
+        with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
             LOGGER.info("\t+ Launching inline experiment (no process isolation)")
             worker(*worker_args)
diff --git a/optimum_benchmark/launchers/isolation_utils.py b/optimum_benchmark/launchers/isolation_utils.py
index e6755297..65302fde 100644
--- a/optimum_benchmark/launchers/isolation_utils.py
+++ b/optimum_benchmark/launchers/isolation_utils.py
@@ -6,8 +6,9 @@
 from contextlib import contextmanager
 from logging import getLogger
 from multiprocessing import Process
-from typing import Dict, Optional, Set
+from typing import Dict, Set
 
+import psutil
 from omegaconf import OmegaConf
 
 # from omegaconf import OmegaConf
@@ -15,15 +16,11 @@
 from ..import_utils import (
     is_amdsmi_available,
     is_py3nvml_available,
-    is_torch_distributed_available,
     torch_version,
 )
 
-if is_torch_distributed_available():
-    from torch.distributed import FileStore
-
 if is_py3nvml_available():
-    import py3nvml.py3nvml as nvml
+    import py3nvml.py3nvml as nvml  # type: ignore
 
 if is_amdsmi_available():
     import amdsmi  # type: ignore
 
@@ -31,7 +28,7 @@
 LOGGER = getLogger("isolation")
 
 
-def get_nvidia_device_pids() -> Dict[int, set]:
+def get_nvidia_devices_pids() -> Dict[int, set]:
     devices_pids: Dict[int, set] = {}
     devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
 
@@ -53,7 +50,7 @@ def get_nvidia_device_pids() -> Dict[int, set]:
     return devices_pids
 
 
-def get_amd_device_pids() -> None:
+def get_amd_devices_pids() -> None:
     devices_pids: Dict[int, list] = {}
     rocm_version = torch_version().split("rocm")[-1]
     devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
@@ -96,14 +93,14 @@ def get_amd_device_pids() -> None:
         for device_id in devices_ids:
             device_handle = devices_handles[device_id]
             try:
-                # these functions fail a lot for no apparent reason
+                # these functions might fail for no apparent reason
                 processes_handles = amdsmi.amdsmi_get_process_list(device_handle)
             except Exception:
                 continue
 
             for process_handle in processes_handles:
                 try:
-                    # these functions fail a lot for no apparent reason
+                    # these functions might fail for no apparent reason
                     info = amdsmi.amdsmi_get_process_info(device_handle, process_handle)
                 except Exception:
                     continue
@@ -125,9 +122,9 @@ def get_pids_running_on_system_device() -> Set[int]:
     """Returns the set of pids running on the system device(s)."""
 
     if is_nvidia_system():
-        devices_pids = get_nvidia_device_pids()
+        devices_pids = get_nvidia_devices_pids()
     elif is_rocm_system():
-        devices_pids = get_amd_device_pids()
+        devices_pids = get_amd_devices_pids()
     else:
         raise ValueError("get_pids_running_on_system_device is only supported on NVIDIA and AMD GPUs")
@@ -136,58 +133,67 @@ def get_pids_running_on_system_device() -> Set[int]:
     return all_devices_pids
 
 
-def assert_system_device_isolation(permitted_pids: Set[int], world_size: Optional[int] = None) -> None:
+def assert_system_devices_isolation(benchmark_pid: int) -> None:
+    isolation_pid = os.getpid()
+
     hydra_conf = OmegaConf.load(".hydra/hydra.yaml")
     logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True))
 
-    if os.getpid() not in permitted_pids:
-        permitted_pids.add(os.getpid())
-
-    if world_size is not None:
-        # add all pids in tcp store to the permitted pids
-        STORE = FileStore("torchrun_filestore")
-        perimitted_workers_names = [f"rank_{rank}" for rank in range(world_size)]
-        STORE.wait(perimitted_workers_names)
-        perimitted_workers_pids = {int(STORE.get(name)) for name in perimitted_workers_names}
-        permitted_pids.update(perimitted_workers_pids)
+    while psutil.pid_exists(benchmark_pid):
+        permitted_pids = set()
+        non_permitted_pids = set()
 
-    while True:
         all_devices_pids = get_pids_running_on_system_device()
-        non_permitted_pids = all_devices_pids - permitted_pids
+
+        for pid in list(all_devices_pids):
+            if pid == benchmark_pid or pid == isolation_pid:
+                continue
+
+            try:
+                info = psutil.Process(pid)
+                prent_pid = info.ppid()
+            except Exception as e:
+                LOGGER.error(f"Failed to get info for process {pid} with error {e}")
+                continue
+
+            if prent_pid == benchmark_pid or prent_pid == isolation_pid:
+                permitted_pids.add(pid)
+            else:
+                non_permitted_pids.add(pid)
 
         if len(non_permitted_pids) > 0:
             LOGGER.error(f"Found non-permitted process(es) running on system device(s): {non_permitted_pids}")
             for pid in permitted_pids:
-                if pid == os.getpid():
-                    continue
                 try:
-                    LOGGER.error(f"Killing isolated process {pid}")
+                    LOGGER.error(f"Terminating child process {pid}")
                     os.kill(pid, signal.SIGTERM)
                 except Exception as e:
-                    LOGGER.error(f"Failed to kill isolated process {pid} with error {e}")
+                    LOGGER.error(f"Failed to terminate child process {pid} with error {e}")
 
-            LOGGER.error("Exiting isolation process")
-            exit()
-        else:
-            time.sleep(1)
+            LOGGER.error(f"Terminating benchmark process {benchmark_pid}")
+            os.kill(benchmark_pid, signal.SIGTERM)
+            break
+
+        time.sleep(1)
 
 
 @contextmanager
-def device_isolation(enabled: bool, permitted_pids: Set[int], world_size: Optional[int] = None) -> None:
+def device_isolation(enabled: bool, benchmark_pid: int) -> None:
     if not enabled:
         yield
-    else:
-        isolation_process = Process(
-            target=assert_system_device_isolation,
-            kwargs={"permitted_pids": permitted_pids, "world_size": world_size},
-            daemon=True,
-        )
-        isolation_process.start()
-        LOGGER.info(f"\t+ Launched device(s) isolation process {isolation_process.pid}.")
-
-        yield
-
-        LOGGER.info("\t+ Closing device(s) isolation process...")
-        isolation_process.kill()
-        isolation_process.join()
-        isolation_process.close()
+        return
+
+    isolation_process = Process(
+        target=assert_system_devices_isolation,
+        kwargs={"benchmark_pid": benchmark_pid},
+        daemon=True,
+    )
+    isolation_process.start()
+    LOGGER.info(f"\t+ Launched device(s) isolation process {isolation_process.pid}.")
+
+    yield
+
+    LOGGER.info("\t+ Closing device(s) isolation process...")
+    isolation_process.kill()
+    isolation_process.join()
+    isolation_process.close()
diff --git a/optimum_benchmark/launchers/process/launcher.py b/optimum_benchmark/launchers/process/launcher.py
index 3e523e78..77fd5129 100644
--- a/optimum_benchmark/launchers/process/launcher.py
+++ b/optimum_benchmark/launchers/process/launcher.py
@@ -28,15 +28,12 @@ def configure(self, config: ProcessConfig) -> None:
         mp.set_start_method(self.config.start_method, force=True)
 
     def launch(self, worker: Callable, *worker_args):
-        # worker process can't be daemon since it spawns its own processes
+        # worker process can't be daemon since it might spawn its own processes
         worker_process = Process(target=target, args=(worker, *worker_args), daemon=False)
         worker_process.start()
         LOGGER.info(f"\t+ Launched worker process with PID {worker_process.pid}.")
 
-        with device_isolation(
-            enabled=self.config.device_isolation,
-            permitted_pids={os.getpid(), worker_process.pid},
-        ):
+        with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
             worker_process.join()
 
         if worker_process.exitcode != 0:
diff --git a/optimum_benchmark/launchers/torchrun/launcher.py b/optimum_benchmark/launchers/torchrun/launcher.py
index bd75e3d5..ffd94a1e 100644
--- a/optimum_benchmark/launchers/torchrun/launcher.py
+++ b/optimum_benchmark/launchers/torchrun/launcher.py
@@ -50,11 +50,8 @@ def launch(self, worker: Callable, *worker_args):
             log_dir=self.config.log_dir,
         )
 
-        with device_isolation(
-            enabled=self.config.device_isolation,
-            permitted_pids={os.getpid()},
-        ):
-            LOGGER.info(f"\t+ Launching torchrun/torchelastic agent with {self.config.nproc_per_node} processes")
+        with device_isolation(enabled=self.config.device_isolation, benchmark_pid=os.getpid()):
+            LOGGER.info(f"\t+ Launching torchrun agent with {self.config.nproc_per_node} worker processes")
             launch_agent(
                 entrypoint=entrypoint,
                 args=(worker, *worker_args),

From f35303dd55d9515f63920544c9d44a9142ae8d1c Mon Sep 17 00:00:00 2001
From: IlyasMoutawwakil
Date: Wed, 17 Jan 2024 08:25:34 +0000
Subject: [PATCH 2/2] fix

---
 .../launchers/isolation_utils.py | 35 ++++++++++---------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/optimum_benchmark/launchers/isolation_utils.py b/optimum_benchmark/launchers/isolation_utils.py
index 65302fde..365e0d9d 100644
--- a/optimum_benchmark/launchers/isolation_utils.py
+++ b/optimum_benchmark/launchers/isolation_utils.py
@@ -28,36 +28,37 @@
 LOGGER = getLogger("isolation")
 
 
-def get_nvidia_devices_pids() -> Dict[int, set]:
-    devices_pids: Dict[int, set] = {}
+def get_nvidia_devices_pids() -> Dict[int, list]:
+    devices_pids: Dict[int, list] = {}
     devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
 
     if not is_py3nvml_available():
-        raise ValueError("get_nvidia_device_pids requires py3nvml. " "Please install it with `pip install py3nvml`.")
+        raise ValueError("get_nvidia_device_pids requires py3nvml. Please install it with `pip install py3nvml`.")
     nvml.nvmlInit()
+
     for device_id in devices_ids:
         device_handle = nvml.nvmlDeviceGetHandleByIndex(device_id)
         device_processes = nvml.nvmlDeviceGetComputeRunningProcesses(device_handle)
         for device_process in device_processes:
             if device_id not in devices_pids:
                 devices_pids[device_id] = []
-            else:
-                devices_pids[device_id].append(device_process.pid)
+
+            devices_pids[device_id].append(device_process.pid)
 
     nvml.nvmlShutdown()
 
     return devices_pids
 
 
-def get_amd_devices_pids() -> None:
+def get_amd_devices_pids() -> Dict[int, list]:
     devices_pids: Dict[int, list] = {}
     rocm_version = torch_version().split("rocm")[-1]
     devices_ids = [int(device_id) for device_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
 
     if not is_amdsmi_available():
         raise ValueError(
-            "check_no_process_is_running_on_cuda_device requires amdsmi. "
+            "get_amd_devices_pids requires amdsmi. "
             "Please follow the instructions at https://github.com/RadeonOpenCompute/amdsmi/tree/master"
         )
 
@@ -86,8 +87,8 @@ def get_amd_devices_pids() -> None:
 
                 if device_id not in devices_pids:
                     devices_pids[device_id] = []
-                else:
-                    devices_pids[device_id].append(info["pid"])
+
+                devices_pids[device_id].append(info["pid"])
     else:
         devices_handles = amdsmi.amdsmi_get_device_handles()
         for device_id in devices_ids:
@@ -110,8 +111,8 @@ def get_amd_devices_pids() -> None:
 
                 if device_id not in devices_pids:
                     devices_pids[device_id] = []
-                else:
-                    devices_pids[device_id].append(info["pid"])
+
+                devices_pids[device_id].append(info["pid"])
 
     amdsmi.amdsmi_shut_down()
 
@@ -140,7 +141,7 @@ def assert_system_devices_isolation(benchmark_pid: int) -> None:
     logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True))
 
     while psutil.pid_exists(benchmark_pid):
-        permitted_pids = set()
+        child_processes = set()
         non_permitted_pids = set()
 
         all_devices_pids = get_pids_running_on_system_device()
@@ -151,19 +152,19 @@ def assert_system_devices_isolation(benchmark_pid: int) -> None:
 
             try:
                 info = psutil.Process(pid)
-                prent_pid = info.ppid()
+                parent_pid = info.ppid()
             except Exception as e:
                 LOGGER.error(f"Failed to get info for process {pid} with error {e}")
-                continue
+                parent_pid = None
 
-            if prent_pid == benchmark_pid or prent_pid == isolation_pid:
-                permitted_pids.add(pid)
+            if parent_pid == benchmark_pid or parent_pid == isolation_pid:
+                child_processes.add(pid)
             else:
                 non_permitted_pids.add(pid)
 
         if len(non_permitted_pids) > 0:
             LOGGER.error(f"Found non-permitted process(es) running on system device(s): {non_permitted_pids}")
-            for pid in permitted_pids:
+            for pid in child_processes:
                 try:
                     LOGGER.error(f"Terminating child process {pid}")
                     os.kill(pid, signal.SIGTERM)
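
Note: the following usage sketch is illustrative only and is not part of either commit. It shows how a launcher is expected to drive the revised device_isolation API after these patches; the run_benchmark wrapper and the enabled=True value are hypothetical, and the import path simply follows the file paths in the diff.

import os

from optimum_benchmark.launchers.isolation_utils import device_isolation


def run_benchmark(worker, *worker_args):
    # The launcher passes its own PID as benchmark_pid. The isolation process
    # spawned by device_isolation() treats that PID and its child processes as
    # permitted; if any other process shows up on the isolated device(s), it
    # terminates the children and then the benchmark process itself.
    with device_isolation(enabled=True, benchmark_pid=os.getpid()):
        worker(*worker_args)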