diff --git a/.github/workflows/test_cuda_pytorch.yaml b/.github/workflows/test_cuda_pytorch.yaml index 92a38b82..dd845801 100644 --- a/.github/workflows/test_cuda_pytorch.yaml +++ b/.github/workflows/test_cuda_pytorch.yaml @@ -50,4 +50,4 @@ jobs: --gpus '"device=0,1"' --entrypoint /bin/bash opt-bench-cuda:${{ matrix.image.cuda_version }} - -c "pip install -e .[test,peft,diffusers] && pytest -k 'cuda and pytorch' -x" + -c "pip install -e .[test,peft,diffusers,deepspeed] && pytest -k 'cuda and pytorch' -x" diff --git a/.github/workflows/test_rocm_pytorch.yaml b/.github/workflows/test_rocm_pytorch.yaml index 952131e3..336af359 100644 --- a/.github/workflows/test_rocm_pytorch.yaml +++ b/.github/workflows/test_rocm_pytorch.yaml @@ -53,4 +53,4 @@ jobs: --device /dev/dri/renderD129 --entrypoint /bin/bash opt-bench-rocm:${{ matrix.image.rocm_version }} - -c "pip install -e .[test,peft,diffusers] && pytest -k 'cuda and pytorch' -x" + -c "pip install -e .[test,peft,diffusers,deepspeed] && pytest -k 'cuda and pytorch' -x" diff --git a/.gitignore b/.gitignore index d1b3725c..a589733f 100644 --- a/.gitignore +++ b/.gitignore @@ -169,4 +169,5 @@ version.txt actions-runner/ experiments/ -examples/ \ No newline at end of file +examples/ +results/ \ No newline at end of file diff --git a/optimum_benchmark/backends/base.py b/optimum_benchmark/backends/base.py index 7ffcbf97..4abc9474 100644 --- a/optimum_benchmark/backends/base.py +++ b/optimum_benchmark/backends/base.py @@ -12,23 +12,19 @@ ClassVar, Dict, Generic, - List, Optional, Union, ) import numpy as np -import torch from optimum.exporters import TasksManager from transformers import AutoConfig, AutoProcessor if TYPE_CHECKING: - from datasets import Dataset from transformers import ( Pipeline, PretrainedConfig, PreTrainedModel, - TrainerCallback, TrainerState, ) from transformers.utils import ModelOutput @@ -37,10 +33,7 @@ from ..task_utils import DIFFUSION_TASKS, TEXT_GENERATION_TASKS from .config import BackendConfigT -from .isolation_utils import ( - only_this_process_is_running_on_cuda_devices, - only_this_process_will_run_on_cuda_devices, -) +from .isolation_utils import check_cuda_continuous_isolation from .utils import ( extract_shapes_from_diffusion_pipeline, extract_shapes_from_model_artifacts, @@ -48,38 +41,31 @@ LOGGER = getLogger("backend") -CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) -if CUDA_VISIBLE_DEVICES is not None: - CUDA_DEVICES = list(map(int, CUDA_VISIBLE_DEVICES.split(","))) -elif torch.cuda.is_available(): - CUDA_DEVICES = list(range(torch.cuda.device_count())) -else: - CUDA_DEVICES = [] - class Backend(Generic[BackendConfigT], ABC): NAME: ClassVar[str] - # instance variables without default values https://stackoverflow.com/a/44962662 + library: str + model_type: str config: BackendConfigT + isolation_thread: Optional[Process] pretrained_model: Union["PreTrainedModel", "Pipeline"] pretrained_processor: Optional["PreTrainedProcessor"] pretrained_config: Optional["PretrainedConfig"] + automodel_class: Callable[..., "PreTrainedModel"] def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any]): self.task = task self.model = model + self.device = device self.hub_kwargs = hub_kwargs - self.device = torch.device(device) if self.is_diffusion_pipeline(): - # for pipelines self.library = "diffusers" self.model_type = self.task self.pretrained_config = None self.pretrained_processor = None else: - # for models self.library = "transformers" self.pretrained_config = 
AutoConfig.from_pretrained( pretrained_model_name_or_path=self.model, **self.hub_kwargs @@ -87,8 +73,8 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.model_type = self.pretrained_config.model_type try: - # the processor sometimes contains information about the model's - # input shapes that's not available in the config + # sometimes contains information about the model's + # input shapes that're not available in the config self.pretrained_processor = AutoProcessor.from_pretrained( pretrained_model_name_or_path=self.model, **self.hub_kwargs ) @@ -98,7 +84,10 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.pretrained_processor = None self.automodel_class = TasksManager.get_model_class_for_task( - task=self.task, library=self.library, model_type=self.model_type + framework="pt", # TODO: make this configurable to add support for other frameworks + task=self.task, + library=self.library, + model_type=self.model_type, ) def is_text_generation_model(self) -> bool: @@ -112,71 +101,50 @@ def configure(self, config: BackendConfigT) -> None: self.config = config # isolation options - if self.config.initial_isolation_check: - self.check_initial_isolation() - if self.config.continous_isolation_check: + if self.config.continuous_isolation: + LOGGER.info("\t+ Running continuous isolation check") self.check_continuous_isolation() - # seeding backend - LOGGER.info(f"\t+ Seeding backend with seed {self.config.seed}") - self.seed() - # clean up options if self.config.delete_cache: LOGGER.info("\t+ Model cache will be deleted after benchmark") - def check_initial_isolation(self) -> None: - if self.device.type == "cuda": - LOGGER.info(f"\t+ Checking initial device(s) isolation of CUDA device(s): {CUDA_DEVICES}") - only_this_process_is_running_on_cuda_devices(cuda_devices=CUDA_DEVICES, benchmark_pid=os.getpid()) - def check_continuous_isolation(self) -> None: - if self.device.type == "cuda": - LOGGER.info(f"\t+ Checking continuous device(s) isolation of CUDA device(s): {CUDA_DEVICES}") - self.isolation_thread = Process( - target=only_this_process_will_run_on_cuda_devices, - kwargs={"cuda_devices": CUDA_DEVICES, "benchmark_pid": os.getpid()}, + if self.device == "cuda": + self.isolation_process = Process( + target=check_cuda_continuous_isolation, + kwargs={ + "isolated_pid": os.getpid(), + "isolation_check_interval": self.config.isolation_check_interval, + }, daemon=True, ) - self.isolation_thread.start() + self.isolation_process.start() + LOGGER.info(f"\t+ Started isolation process with PID {self.isolation_process.pid}") def seed(self) -> None: - # https://pytorch.org/docs/stable/notes/randomness.html random.seed(self.config.seed) np.random.seed(self.config.seed) - torch.manual_seed(self.config.seed) def prepare_input(self, input: Dict[str, Any]) -> Dict[str, Any]: if self.is_diffusion_pipeline(): - # diffusion pipelines takes a list of strings - return input + return input # diffusion pipelines takes a list of strings else: - # models expect tensors on the target device for key, value in input.items(): - input[key] = value.to(self.device) + input[key] = value.to(self.device) # models expect tensors on the target device return input def prepare_for_inference(self, **kwargs) -> None: pass - # # symbolic tracing in transformers requires input names - # def prepare_for_profiling(self, input_names: List[str]) -> Dict[str, Any]: - # pass - def forward(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": return 
self.pretrained_model(**input, **kwargs) def generate(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": return self.pretrained_model.generate(**input, **kwargs) - def train( - self, - training_dataset: "Dataset", - training_arguments: Dict[str, Any], - training_callbacks: List["TrainerCallback"], - training_data_collator: Callable, - ) -> "TrainerState": + def train(self, **kwargs) -> "TrainerState": raise NotImplementedError("Backend must implement train method") @property @@ -194,9 +162,8 @@ def model_shapes(self) -> Dict[str, int]: return model_shapes def delete_pretrained_model(self) -> None: - if hasattr(self, "pretrained_model"): - LOGGER.info("\t+ Deleting pretrained model") - del self.pretrained_model + LOGGER.info("\t+ Deleting pretrained model") + del self.pretrained_model gc.collect() def delete_model_cache(self) -> None: @@ -205,9 +172,22 @@ def delete_model_cache(self) -> None: model_cache_path = os.path.join(os.path.expanduser("~/.cache/huggingface/hub"), model_cache_folder) shutil.rmtree(model_cache_path, ignore_errors=True) + def terminate_isolation_process(self) -> None: + LOGGER.info("\t+ Terminating isolation process") + self.isolation_process.kill() + self.isolation_process.join() + self.isolation_process.close() + def clean(self) -> None: LOGGER.info(f"Cleaning {self.NAME} backend") - self.delete_pretrained_model() + + if self.config.continuous_isolation: + self.terminate_isolation_process() + + if hasattr(self, "pretrained_model"): + self.delete_pretrained_model() if self.config.delete_cache: self.delete_model_cache() + + gc.collect() diff --git a/optimum_benchmark/backends/config.py b/optimum_benchmark/backends/config.py index 4943f395..68bfab68 100644 --- a/optimum_benchmark/backends/config.py +++ b/optimum_benchmark/backends/config.py @@ -16,9 +16,9 @@ class BackendConfig(ABC): inter_op_num_threads: Optional[int] = None intra_op_num_threads: Optional[int] = None - # isolation options - initial_isolation_check: bool = True - continous_isolation_check: bool = True + # device isolation options + continuous_isolation: bool = True + isolation_check_interval: Optional[int] = None # clean up options delete_cache: bool = False @@ -32,5 +32,8 @@ def __post_init__(self): if self.intra_op_num_threads == -1: self.intra_op_num_threads = cpu_count() + if self.isolation_check_interval is None: + self.isolation_check_interval = 1 # 1 second + BackendConfigT = TypeVar("BackendConfigT", bound=BackendConfig) diff --git a/optimum_benchmark/backends/ddp_utils.py b/optimum_benchmark/backends/ddp_utils.py deleted file mode 100644 index 4fa5cc74..00000000 --- a/optimum_benchmark/backends/ddp_utils.py +++ /dev/null @@ -1,97 +0,0 @@ -# TODO: this can be reformulated as a subclass of backend, from which pytorch and onnxruntime and any other backend - -import logging.config -import os -from logging import getLogger -from typing import TYPE_CHECKING, Optional - -from omegaconf import OmegaConf - -if TYPE_CHECKING: - from transformers import TrainerState - -from ..import_utils import is_torch_distributed_available - -# from launchConfig in https://github.com/pytorch/pytorch/blob/v2.0.0/torch/distributed/launcher/api.py#L29 adjusted -# to defaults of torch.distributed.run in https://github.com/pytorch/pytorch/blob/v2.0.0/torch/distributed/run.py#L770 -DDP_CONFIG = { - "min_nodes": 1, - "max_nodes": 1, - "run_id": "none", - "nproc_per_node": "${device_count:}", - "role": "default", - "rdzv_endpoint": "127.0.0.1:29500", - "rdzv_backend": "static", - "rdzv_configs": { - 
"timeout": 900, - "rank": 0, - }, - "max_restarts": 0, - "monitor_interval": 5, - "start_method": "spawn", - "log_dir": None, - "metrics_cfg": {}, - "local_addr": None, -} - - -def get_worker_logger(name: Optional[str] = None, log_all: bool = False) -> logging.Logger: - """ - PyTorch DDP subprocesses do not inherit from Hydra logger. - Thus, we need to reconfigure the logger for the workers. - """ - if os.environ["RANK"] == "0" or log_all: - # TODO: also configure logging for other ranks - hydra_conf = OmegaConf.load(".hydra/hydra.yaml") - logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) - - return getLogger(name) - - -def training_worker(args) -> "TrainerState": - dataset_format = args[0] - backend_logger = args[1] - trainer_class = args[2] - training_arguments_class = args[3] - use_ddp = args[4] - training_dataset = args[5] - training_arguments = args[6] - training_data_collator = args[7] - training_callbacks = args[8] - pretrained_model = args[9] - - if use_ddp: - LOGGER_WORKER = get_worker_logger("pytorch-ddp-worker", log_all=False) - env_variables = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "TORCHELASTIC_MAX_RESTARTS"] - LOGGER_WORKER.info("Initializing DDP worker") - for env_var in env_variables: - LOGGER_WORKER.info(f"{env_var}: {os.environ.get(env_var)}") - else: - LOGGER_WORKER = backend_logger - - LOGGER_WORKER.info(f"\t+ Setting dataset format to `{dataset_format}`.") - training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) - LOGGER_WORKER.info("\t+ Wrapping training arguments with transformers.TrainingArguments") - training_arguments = training_arguments_class(**training_arguments) - LOGGER_WORKER.info("\t+ Wrapping model with transformers.Trainer") - trainer = trainer_class( - model=pretrained_model, - args=training_arguments, - callbacks=training_callbacks, - train_dataset=training_dataset, - data_collator=training_data_collator, - ) - LOGGER_WORKER.info("\t+ Starting training") - trainer.train() - LOGGER_WORKER.info("\t+ Training finished successfully") - return trainer.state - - -# a conditional decorator that is only applied if torch.distributed.elastic.multiprocessing.errors.record is available -def record_if_available(func): - if is_torch_distributed_available(): - from torch.distributed.elastic.multiprocessing.errors import record - - return record(func) - else: - return func diff --git a/optimum_benchmark/backends/isolation_utils.py b/optimum_benchmark/backends/isolation_utils.py index 80236e3e..447fc787 100644 --- a/optimum_benchmark/backends/isolation_utils.py +++ b/optimum_benchmark/backends/isolation_utils.py @@ -1,18 +1,24 @@ +import logging.config import os import signal import time +from logging import getLogger from typing import Dict, List +from omegaconf import OmegaConf + from ..env_utils import is_nvidia_system, is_rocm_system from ..import_utils import is_amdsmi_available, is_py3nvml_available, torch_version +LOGGER = getLogger("isolation") + -def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchmark_pid: int) -> None: +def check_cuda_isolation(isolated_devices: List[int], permitted_pids: List[int]) -> None: """ - Raises a RuntimeError if any process other than the benchmark process is running on the specified CUDA devices. + Raises a RuntimeError if any process other than the permitted ones is running on the specified CUDA devices. 
""" pids: Dict[int, set] = {} - for device_id in cuda_devices: + for device_id in isolated_devices: pids[device_id] = set() if is_nvidia_system(): @@ -24,10 +30,14 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm import py3nvml.py3nvml as nvml nvml.nvmlInit() - for device_id in cuda_devices: + for device_id in isolated_devices: device_handle = nvml.nvmlDeviceGetHandleByIndex(device_id) device_processes = nvml.nvmlDeviceGetComputeRunningProcesses(device_handle) for device_process in device_processes: + if device_process.pid not in permitted_pids: + LOGGER.warning(f"Found unexpected process {device_process.pid} on device {device_id}.") + LOGGER.warning(f"Process info: {device_process}") + pids[device_id].add(device_process.pid) nvml.nvmlShutdown() @@ -39,74 +49,121 @@ def only_this_process_is_running_on_cuda_devices(cuda_devices: List[int], benchm "check_no_process_is_running_on_cuda_device requires amdsmi. " "Please follow the instructions at https://github.com/RadeonOpenCompute/amdsmi/tree/master" ) - import amdsmi as smi + import amdsmi - smi.amdsmi_init() + amdsmi.amdsmi_init() if rocm_version >= "5.7": # starting from rocm 5.7, the api seems to have changed names - devices_handles = smi.amdsmi_get_processor_handles() - for device_id in cuda_devices: + devices_handles = amdsmi.amdsmi_get_processor_handles() + for device_id in isolated_devices: device_handle = devices_handles[device_id] - processes_handles = smi.amdsmi_get_gpu_process_list(device_handle) + try: + # these functions fail a lot for no apparent reason + processes_handles = amdsmi.amdsmi_get_gpu_process_list(device_handle) + except Exception: + continue + for process_handle in processes_handles: - info = smi.amdsmi_get_gpu_process_info(device_handle, process_handle) + try: + # these functions fail a lot for no apparent reason + info = amdsmi.amdsmi_get_gpu_process_info(device_handle, process_handle) + except Exception: + continue + if info["memory_usage"]["vram_mem"] == 4096: continue + + if info["pid"] not in permitted_pids: + LOGGER.warning(f"Found unexpected process {info['pid']} on device {device_id}.") + LOGGER.warning(f"Process info: {info}") + pids[device_id].add(info["pid"]) else: - devices_handles = smi.amdsmi_get_device_handles() - for device_id in cuda_devices: + devices_handles = amdsmi.amdsmi_get_device_handles() + for device_id in isolated_devices: device_handle = devices_handles[device_id] - processes_handles = smi.amdsmi_get_process_list(device_handle) + try: + # these functions fail a lot for no apparent reason + processes_handles = amdsmi.amdsmi_get_process_list(device_handle) + except Exception: + continue + for process_handle in processes_handles: - info = smi.amdsmi_get_process_info(device_handle, process_handle) + try: + # these functions fail a lot for no apparent reason + info = amdsmi.amdsmi_get_process_info(device_handle, process_handle) + except Exception: + continue + if info["memory_usage"]["vram_mem"] == 4096: continue + + if info["pid"] not in permitted_pids: + LOGGER.warning(f"Found unexpected process {info['pid']} on device {device_id}.") + LOGGER.warning(f"Process info: {info}") + pids[device_id].add(info["pid"]) - smi.amdsmi_shut_down() + amdsmi.amdsmi_shut_down() else: raise ValueError("check_no_process_is_running_on_cuda_device is only supported on NVIDIA and AMD GPUs.") all_pids = set() - for device_id in cuda_devices: + for device_id in isolated_devices: all_pids |= pids[device_id] - other_pids = all_pids - {benchmark_pid} + other_pids = all_pids - 
set(permitted_pids)

     if len(other_pids) > 0:
-        error_message = f"Expected only process {benchmark_pid} on device(s) {cuda_devices}, but found {other_pids}."
-        # for pid in other_pids:
-        #     error_message += f"\nProcess {pid} info: {get_pid_info(pid)}"
+        error_message = (
+            f"Expected only process(es) {permitted_pids} on device(s) {isolated_devices}, but found {other_pids}."
+        )
         raise RuntimeError(error_message)


-def only_this_process_will_run_on_cuda_devices(cuda_devices: List[int], benchmark_pid: int) -> None:
+def check_cuda_continuous_isolation(isolated_pid: int, isolation_check_interval: int = 1) -> None:
     """
-    Kills the benchmark process if any other process is running on the specified CUDA devices.
+    Kills the isolated process if any process other than the permitted ones is running on the specified CUDA devices.
     """
-    while True:
-        try:
-            only_this_process_is_running_on_cuda_devices(cuda_devices, benchmark_pid)
-            time.sleep(0.1)
-        except Exception as exception:
-            os.kill(benchmark_pid, signal.SIGTERM)
-            raise exception
+    hydra_conf = OmegaConf.load(".hydra/hydra.yaml")
+    logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True))
+
+    # distributed setting is tricky
+    if os.environ.get("LOCAL_WORLD_SIZE", None) is not None:
+        from torch.distributed import TCPStore

-## we can report more information about the process to explain the source of the error
-## but that might be dangerous in a CI context
+        local_rank = os.environ["LOCAL_RANK"]
+        all_isolated_keys = [f"isolated_{other_rank}" for other_rank in range(int(os.environ["LOCAL_WORLD_SIZE"]))]
+        all_isolators_keys = [f"isolator_{other_rank}" for other_rank in range(int(os.environ["LOCAL_WORLD_SIZE"]))]

-# import psutil
+        store = TCPStore(host_name=os.environ["MASTER_ADDR"], port=int(os.environ["MASTER_PORT"]))

-# def get_pid_info(pid: int) -> Dict[str, str]:
-#     """Returns a dictionary containing the process' information."""
+        store.add(f"isolator_{local_rank}", os.getpid())
+        store.add(f"isolated_{local_rank}", isolated_pid)
+        store.wait(all_isolated_keys + all_isolators_keys)

-#     process = psutil.Process(pid)
+        all_isolated_pids = [int(store.get(name)) for name in all_isolated_keys]
+        all_isolators_pids = [int(store.get(name)) for name in all_isolators_keys]
+        permitted_pids = all_isolated_pids + all_isolators_pids
+        assert len(permitted_pids) == len(set(permitted_pids)), "Found duplicated pids in the distributed setting"
+    else:
+        isolator_pid = os.getpid()
+        permitted_pids = [isolator_pid, isolated_pid]
+
+    isolated_devices = [int(device) for device in os.environ["CUDA_VISIBLE_DEVICES"].split(",")]
+
+    LOGGER.info(
+        f"Continuously checking only process(es) {permitted_pids} is/are running on device(s) {isolated_devices}"
+    )

-#     return {
-#         "pid": pid,
-#         "name": process.name(),
-#         "username": process.username(),
-#         "cmdline": " ".join(process.cmdline()),
-#     }
+    while True:
+        try:
+            check_cuda_isolation(isolated_devices, permitted_pids)
+            time.sleep(isolation_check_interval)
+        except RuntimeError as e:
+            LOGGER.error("Error while checking CUDA isolation:")
+            LOGGER.error(e)
+            LOGGER.error("Killing isolated process...")
+            os.kill(isolated_pid, signal.SIGTERM)  # graceful kill, will trigger the backend cleanup
+            raise e
diff --git a/optimum_benchmark/backends/neural_compressor/backend.py b/optimum_benchmark/backends/neural_compressor/backend.py
index 7d76f138..ca8ee1ed 100644
--- a/optimum_benchmark/backends/neural_compressor/backend.py
+++
b/optimum_benchmark/backends/neural_compressor/backend.py @@ -31,8 +31,8 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any ) def validate_device(self) -> None: - if self.device.type != "cpu": - raise ValueError(f"INCBackend only supports CPU devices, got {self.device.type}") + if self.device != "cpu": + raise ValueError(f"INCBackend only supports CPU devices, got {self.device}") def validate_task(self) -> None: if self.task not in TASKS_TO_INCMODELS: diff --git a/optimum_benchmark/backends/onnxruntime/backend.py b/optimum_benchmark/backends/onnxruntime/backend.py index 84b1c759..cab6d980 100644 --- a/optimum_benchmark/backends/onnxruntime/backend.py +++ b/optimum_benchmark/backends/onnxruntime/backend.py @@ -27,9 +27,7 @@ from datasets import Dataset from transformers import TrainerCallback, TrainerState -from ...profilers.ort_profiler import ORTProfilingWrapper from ..base import Backend -from ..ddp_utils import record_if_available, training_worker from ..optimum_utils import main_export from ..pytorch.utils import randomize_weights from .config import ORTConfig @@ -46,17 +44,9 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.validate_device() self.validate_task() - if self.is_diffusion_pipeline(): - self.ortmodel_class = get_class(TASKS_TO_ORTSD[self.task]) - elif self.task in TASKS_TO_ORTMODELS: - self.ortmodel_class = TASKS_TO_ORTMODELS[self.task] - - ortmodel_name = self.ortmodel_class.__name__ - LOGGER.info(f"Inferred ORTModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") - def validate_device(self) -> None: - if self.device.type not in ["cpu", "cuda"]: - raise ValueError(f"ORTBackend only supports CPU and CUDA devices, got {self.device.type}") + if self.device not in ["cpu", "cuda"]: + raise ValueError(f"ORTBackend only supports CPU and CUDA devices, got {self.device}") def validate_task(self) -> None: if self.task not in TASKS_TO_ORTMODELS and self.task not in TASKS_TO_ORTSD: @@ -65,6 +55,14 @@ def validate_task(self) -> None: def configure(self, config: ORTConfig) -> None: super().configure(config) + if self.is_diffusion_pipeline(): + self.ortmodel_class = get_class(TASKS_TO_ORTSD[self.task]) + elif self.task in TASKS_TO_ORTMODELS: + self.ortmodel_class = TASKS_TO_ORTMODELS[self.task] + + ortmodel_name = self.ortmodel_class.__name__ + LOGGER.info(f"Inferred ORTModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") + # Process torch dtype self.torch_dtype = getattr(torch, self.config.torch_dtype) if self.config.torch_dtype is not None else None @@ -149,12 +147,11 @@ def load_automodel_from_config(self) -> None: def load_automodel_from_pretrained(self) -> None: LOGGER.info("\t+ Loading AutoModel from pretrained") - with self.device: - self.pretrained_model = self.automodel_class.from_pretrained( - self.model, - torch_dtype=self.torch_dtype, - **self.hub_kwargs, - ) + self.pretrained_model = self.automodel_class.from_pretrained( + self.model, + torch_dtype=self.torch_dtype, + **self.hub_kwargs, + ).to(self.device) def load_ortmodel(self) -> None: LOGGER.info("\t+ Loading ORTModel") @@ -189,7 +186,7 @@ def export_automodel(self) -> None: self.model, output=exported_model_dir, task=self.export_task, - device=self.device.type, + device=self.device, fp16=self.torch_dtype == torch.float16, **self.hub_kwargs, # we hijack the model instantiation and use our random weights model @@ -213,12 +210,12 @@ def optimize_onnx_files(self) -> None: if 
self.config.auto_optimization is not None: optimization_config = AutoOptimizationConfig.with_optimization_level( optimization_level=self.config.auto_optimization, - for_gpu=self.device.type == "cuda", + for_gpu=self.device == "cuda", **self.config.auto_optimization_config, ) elif self.config.optimization: optimization_config = OptimizationConfig( - optimize_for_gpu=self.device.type == "cuda", **self.config.optimization_config + optimize_for_gpu=self.device == "cuda", **self.config.optimization_config ) LOGGER.info("\t+ Creating optimizer") optimizer = ORTOptimizer.from_pretrained(self.model, file_names=self.onnx_files_names) @@ -277,7 +274,7 @@ def quantize_onnx_files(self) -> None: dataset=calibration_dataset, calibration_config=calibration_config, operators_to_quantize=quantization_config.operators_to_quantize, - use_gpu=self.device.type == "cuda", + use_gpu=self.device == "cuda", # TODO: add support for these batch_size=1, use_external_data_format=False, @@ -327,44 +324,31 @@ def prepare_for_inference(self, **kwargs) -> None: self.load_ortmodel() self.tmpdir.cleanup() - def prepare_for_profiling(self, input_names: List[str]) -> None: - LOGGER.info("Preparing model for profiling") - LOGGER.info("\t+ Wrapping model inside profiler") - self.pretrained_model = ORTProfilingWrapper(self.pretrained_model) - - @record_if_available def train( self, training_dataset: "Dataset", training_arguments: Dict[str, Any], training_callbacks: List["TrainerCallback"], training_data_collator: Callable, + dataset_format: str = "torch", ) -> "TrainerState": - worker_args = ( - "torch", - LOGGER, - ORTTrainer, - ORTTrainingArguments, - self.config.use_ddp, - training_dataset, - training_arguments, - training_data_collator, - training_callbacks, - self.pretrained_model, + LOGGER.info(f"\t+ Setting dataset format to `{dataset_format}`.") + training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) + LOGGER.info("\t+ Wrapping training arguments with optimum.onnxruntime.ORTTrainingArguments") + training_arguments = ORTTrainingArguments(**training_arguments) + LOGGER.info("\t+ Wrapping model with optimum.onnxruntime.ORTTrainer") + trainer = ORTTrainer( + model=self.pretrained_model, + args=training_arguments, + callbacks=training_callbacks, + train_dataset=training_dataset, + data_collator=training_data_collator, ) + LOGGER.info("\t+ Starting training") + trainer.train() + LOGGER.info("\t+ Training finished successfully") - if self.config.use_ddp: - from torch.distributed.launcher.api import LaunchConfig, elastic_launch - - # For DDP, we log only the state of the first rank as transformers does. - # since the batch size used in measuring the throughput is the one of world size. - ddp_config = LaunchConfig(**self.config.ddp_config) - results = elastic_launch(config=ddp_config, entrypoint=training_worker)(worker_args)[0] - else: - # For DP, we can still use training_worker, simply not wrapped by the elastic_launch class. 
- results = training_worker(worker_args) - - return results + return trainer.state def clean(self) -> None: super().clean() @@ -372,6 +356,7 @@ def clean(self) -> None: if hasattr(self, "tmpdir"): self.tmpdir.cleanup() - if self.device.type == "cuda": + if self.device == "cuda": torch.cuda.empty_cache() - gc.collect() + + gc.collect() diff --git a/optimum_benchmark/backends/onnxruntime/config.py b/optimum_benchmark/backends/onnxruntime/config.py index 7ea41d3e..10c7b79f 100644 --- a/optimum_benchmark/backends/onnxruntime/config.py +++ b/optimum_benchmark/backends/onnxruntime/config.py @@ -6,41 +6,10 @@ from ...import_utils import onnxruntime_version from ..config import BackendConfig -from ..ddp_utils import DDP_CONFIG from ..peft_utils import PEFT_CONFIGS, PEFT_TASKS_TYPES - -def infer_device_id(device: str) -> int: - """Infer the device id from the given device string.""" - if "cuda" in device: - if ":" in device: - # either CUDA_VISIBLE_DEVICES is set or device is set to cuda:0 - return int(device.split(":")[1]) - else: - # device is set to cuda - return 0 - elif device == "cpu": - return -1 - else: - raise ValueError(f"Unknown device: {device}") - - -DEVICE_PROVIDER_MAP = { - "cpu": "CPUExecutionProvider", - "cuda": "CUDAExecutionProvider", -} - -OmegaConf.register_new_resolver("onnxruntime_version", onnxruntime_version) -OmegaConf.register_new_resolver("infer_device_id", lambda device: infer_device_id(device)) -OmegaConf.register_new_resolver("infer_provider", lambda device: DEVICE_PROVIDER_MAP[device]) -OmegaConf.register_new_resolver("is_profiling", lambda benchmark_name: benchmark_name == "profiling") -OmegaConf.register_new_resolver( - "io_bind", lambda provider: provider in ["CPUExecutionProvider", "CUDAExecutionProvider"] -) - - OPTIMIZATION_CONFIG = { - "optimization_level": 1, # 0, 1, 2, 99 + "optimization_level": 1, "fp16": False, "enable_transformers_specific_optimizations": True, "enable_gelu_approximation": False, @@ -100,6 +69,14 @@ def infer_device_id(device: str) -> int: "trt_engine_cache_path": "tmp/trt_cache", } +DEVICE_PROVIDER_MAP = {"cpu": "CPUExecutionProvider", "cuda": "CUDAExecutionProvider"} +IO_BINDING_PROVIDERS = ["CPUExecutionProvider", "CUDAExecutionProvider"] + +OmegaConf.register_new_resolver("onnxruntime_version", onnxruntime_version) +OmegaConf.register_new_resolver("infer_provider", lambda device: DEVICE_PROVIDER_MAP[device]) +OmegaConf.register_new_resolver("is_profiling", lambda benchmark_name: benchmark_name == "profiling") +OmegaConf.register_new_resolver("ort_io_binding", lambda provider: provider in IO_BINDING_PROVIDERS) + @dataclass class ORTConfig(BackendConfig): @@ -117,10 +94,10 @@ class ORTConfig(BackendConfig): # provider options provider: str = "${infer_provider:${device}}" - provider_options: Dict[str, Any] = field(default_factory=lambda: {"device_id": "${infer_device_id:${device}}"}) + provider_options: Dict[str, Any] = field(default_factory=lambda: {}) # inference options - use_io_binding: bool = "${io_bind:${device}}" + use_io_binding: bool = "${ort_io_binding:${device}}" session_options: Dict[str, Any] = field( default_factory=lambda: {"enable_profiling": "${is_profiling:${benchmark.name}}"} ) @@ -148,17 +125,11 @@ class ORTConfig(BackendConfig): # ort-training is basically a different package so we might need to separate these two backends in the future use_inference_session: bool = "${is_inference:${benchmark.name}}" - # training options - use_ddp: bool = False - ddp_config: Dict[str, Any] = field(default_factory=dict) - # peft 
options peft_strategy: Optional[str] = None peft_config: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): - CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) - if not self.no_weights and not self.export and self.torch_dtype is not None: raise NotImplementedError("Can't convert an exported model's weights to a different dtype.") @@ -196,15 +167,6 @@ def __post_init__(self): if self.calibration: self.calibration_config = OmegaConf.to_object(OmegaConf.merge(CALIBRATION_CONFIG, self.calibration_config)) - if self.use_ddp: - if CUDA_VISIBLE_DEVICES is None: - raise ValueError("`use_ddp` can only be used when CUDA_VISIBLE_DEVICES is set.") - - self.ddp_config = OmegaConf.to_object(OmegaConf.merge(DDP_CONFIG, self.ddp_config)) - # TODO: check if it's not possible to use DDP with multiple nodes - if self.ddp_config["max_nodes"] > 1 or self.ddp_config["min_nodes"] > 1: - raise NotImplementedError("Currently, PyTorch DDP benchmark only supports training on a single node.") - if self.peft_strategy is not None: if self.peft_strategy not in PEFT_CONFIGS: raise ValueError( diff --git a/optimum_benchmark/backends/openvino/backend.py b/optimum_benchmark/backends/openvino/backend.py index f118544c..fc88559a 100644 --- a/optimum_benchmark/backends/openvino/backend.py +++ b/optimum_benchmark/backends/openvino/backend.py @@ -23,21 +23,23 @@ def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any self.validate_device() self.validate_task() - self.ovmodel_class = get_class(TASKS_TO_OVMODEL[self.task]) - ortmodel_name = self.ovmodel_class.__name__ - LOGGER.info(f"Inferred OVModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}") - def validate_task(self) -> None: if self.task not in TASKS_TO_OVMODEL: raise NotImplementedError(f"OVBackend does not support task {self.task}") def validate_device(self) -> None: - if self.device.type != "cpu": - raise ValueError(f"OVBackend only supports CPU devices, got {self.device.type}") + if self.device != "cpu": + raise ValueError(f"OVBackend only supports CPU devices, got {self.device}") def configure(self, config: OVConfig) -> None: super().configure(config) + self.ovmodel_class = get_class(TASKS_TO_OVMODEL[self.task]) + ortmodel_name = self.ovmodel_class.__name__ + LOGGER.info( + f"\t+ Inferred OVModel class {ortmodel_name} for task {self.task} and model_type {self.model_type}" + ) + self.openvino_config = self.config.openvino_config.copy() if self.config.inter_op_num_threads is not None: LOGGER.info(f"\t+ Setting inter_op_num_threads to {self.config.inter_op_num_threads}") diff --git a/optimum_benchmark/backends/pytorch/backend.py b/optimum_benchmark/backends/pytorch/backend.py index 20aaec10..340a7a87 100644 --- a/optimum_benchmark/backends/pytorch/backend.py +++ b/optimum_benchmark/backends/pytorch/backend.py @@ -1,28 +1,25 @@ import gc -import logging +import os from logging import getLogger from typing import TYPE_CHECKING, Any, Callable, Dict, List import torch -from transformers.utils.fx import symbolic_trace + +if torch.distributed.is_available(): + import torch.distributed if TYPE_CHECKING: from datasets import Dataset from transformers import TrainerCallback, TrainerState from transformers.utils import ModelOutput -from ...profilers.fx_profiler import FXProfilingWrapper from ..base import Backend -from ..ddp_utils import record_if_available, training_worker from .config import PyTorchConfig from .utils import DTYPES_MAPPING, randomize_weights, to_pow2 # bachend logger LOGGER 
= getLogger("pytorch") -# disable numexpr.utils logger -getLogger("numexpr.utils").setLevel(logging.CRITICAL) - class PyTorchBackend(Backend[PyTorchConfig]): NAME: str = "pytorch" @@ -30,12 +27,23 @@ class PyTorchBackend(Backend[PyTorchConfig]): def __init__(self, model: str, task: str, device: str, hub_kwargs: Dict[str, Any]): super().__init__(model, task, device, hub_kwargs) - automodel = self.automodel_class.__name__ - LOGGER.info(f"Inferred AutoModel class {automodel} for task {self.task} and model_type {self.model_type}") - def configure(self, config: PyTorchConfig) -> None: super().configure(config) + automodel = self.automodel_class.__name__ + LOGGER.info(f"\t+ Inferred AutoModel class {automodel} for task {self.task} and model_type {self.model_type}") + + # for now we rely on this env variable to know if we're in a distributed setting + if os.environ.get("LOCAL_WORLD_SIZE", None) is not None: + LOGGER.info(f"\t+ Detected local world size: {os.environ['LOCAL_WORLD_SIZE']}") + local_rank = int(os.environ["LOCAL_RANK"]) + LOGGER.info(f"\t+ Detected local rank: {local_rank}") + available_devices = list(map(int, os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(","))) + LOGGER.info(f"\t+ Detected available devices: {available_devices}") + default_device = available_devices[local_rank] + LOGGER.info(f"\t+ Setting default device to: {default_device}") + torch.cuda.set_device(default_device) + # Gradients options if self.config.disable_grad: LOGGER.info("\t+ Disabling gradients") @@ -98,6 +106,14 @@ def configure(self, config: PyTorchConfig) -> None: peft_config = peft_config_class(**self.config.peft_config) self.pretrained_model = get_peft_model(self.pretrained_model, peft_config=peft_config) + if self.config.deepspeed_inference: + LOGGER.info("\t+ Using DeepSpeed Inference") + from deepspeed import init_inference + + self.pretrained_model = init_inference( + self.pretrained_model, config=self.config.deepspeed_inference_config + ) + def load_model_from_pretrained(self) -> None: # iniline quantization or quantization config modification if self.config.quantization_scheme == "gptq": @@ -145,17 +161,17 @@ def load_model_from_pretrained(self) -> None: **self.hub_kwargs, ) elif hasattr(self.pretrained_config, "quantization_config") or self.quantization_config is not None: - LOGGER.info("\t+ Loading model with low cpu memory usage") + LOGGER.info(f"\t+ Loading quantized model and moving it to device: {self.device}") self.pretrained_model = self.automodel_class.from_pretrained( self.model, - low_cpu_memory_usage=True, torch_dtype=self.torch_dtype, **self.automodel_kwargs, **self.hub_kwargs, ).to(self.device) else: LOGGER.info(f"\t+ Loading model directly on device: {self.device}") - with self.device: + with torch.device(self.device): + # this is extremely faster than the above method self.pretrained_model = self.automodel_class.from_pretrained( self.model, torch_dtype=self.torch_dtype, @@ -167,6 +183,9 @@ def load_model_from_pretrained(self) -> None: def automodel_kwargs(self) -> Dict[str, Any]: kwargs = {} + if hasattr(self.pretrained_config, "quantization_config") or self.quantization_config is not None: + kwargs["low_cpu_memory_usage"] = True + if self.quantization_config is not None: kwargs["quantization_config"] = self.quantization_config @@ -247,18 +266,11 @@ def prepare_for_inference(self, input_shapes: Dict[str, int], **kwargs) -> None: and self.pretrained_config.quantization_config.desc_act ): LOGGER.info("\t+ Setting GPTQ's max_input_length") - from auto_gptq import 
exllama_set_max_input_length + from auto_gptq import exllama_set_max_input_length # type: ignore max_input_length = to_pow2(input_shapes["batch_size"] * input_shapes["sequence_length"]) self.pretrained_model = exllama_set_max_input_length(self.pretrained_model, max_input_length) - def prepare_for_profiling(self, input_names: List[str]) -> None: - LOGGER.info("Preparing model for profiling") - LOGGER.info("\t+ Symbolicly tracing model") - self.pretrained_model = symbolic_trace(self.pretrained_model, input_names=input_names) - LOGGER.info("\t+ Wrapping model with FXProfilingWrapper") - self.pretrained_model = FXProfilingWrapper(self.pretrained_model) - def forward(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutput": if self.is_diffusion_pipeline(): return super().forward(input, kwargs) @@ -277,50 +289,45 @@ def generate(self, input: Dict[str, Any], kwargs: Dict[str, Any]) -> "ModelOutpu else: return super().generate(input, kwargs) - @record_if_available def train( self, training_dataset: "Dataset", training_arguments: Dict[str, Any], training_callbacks: List["TrainerCallback"], training_data_collator: Callable, + dataset_format: str = "torch", ) -> "TrainerState": from transformers import Trainer, TrainingArguments - worker_args = ( - "torch", - LOGGER, - Trainer, - TrainingArguments, - self.config.use_ddp, - training_dataset, - training_arguments, - training_data_collator, - training_callbacks, - self.pretrained_model, + LOGGER.info(f"\t+ Setting dataset format to `{dataset_format}`.") + training_dataset.set_format(type=dataset_format, columns=list(training_dataset.features.keys())) + LOGGER.info("\t+ Wrapping training arguments with transformers.TrainingArguments") + training_arguments = TrainingArguments(**training_arguments) + LOGGER.info("\t+ Wrapping model with transformers.Trainer") + trainer = Trainer( + model=self.pretrained_model, + args=training_arguments, + callbacks=training_callbacks, + train_dataset=training_dataset, + data_collator=training_data_collator, ) - if self.config.use_ddp: - from torch.distributed.launcher.api import LaunchConfig, elastic_launch + LOGGER.info("\t+ Starting training") + trainer.train() + LOGGER.info("\t+ Training finished successfully") - # For DDP, we log only the state of the first rank as transformers does. - # since the batch size used in measuring the throughput is the one of world size. - ddp_config = LaunchConfig(**self.config.ddp_config) - results = elastic_launch(config=ddp_config, entrypoint=training_worker)(worker_args)[0] - else: - # For DP, we can still use training_worker, simply not wrapped by the elastic_launch class. 
- results = training_worker(worker_args) - - return results + return trainer.state def seed(self): super().seed() + torch.manual_seed(self.config.seed) - if self.device.type == "cuda": + if self.device == "cuda": torch.cuda.manual_seed_all(self.config.seed) def clean(self) -> None: super().clean() - if self.device.type == "cuda": + if self.device == "cuda": torch.cuda.empty_cache() - gc.collect() + + gc.collect() diff --git a/optimum_benchmark/backends/pytorch/config.py b/optimum_benchmark/backends/pytorch/config.py index ec7e89fb..071fbc28 100644 --- a/optimum_benchmark/backends/pytorch/config.py +++ b/optimum_benchmark/backends/pytorch/config.py @@ -7,7 +7,6 @@ from ...env_utils import is_rocm_system from ...import_utils import torch_version from ..config import BackendConfig -from ..ddp_utils import DDP_CONFIG from ..peft_utils import PEFT_CONFIGS, PEFT_TASKS_TYPES OmegaConf.register_new_resolver("device_count", lambda: len(os.environ.get("CUDA_VISIBLE_DEVICES", "").split(","))) @@ -64,33 +63,26 @@ class PyTorchConfig(BackendConfig): quantization_scheme: Optional[str] = None quantization_config: Dict[str, Any] = field(default_factory=dict) - # training options - use_ddp: bool = False - ddp_config: Dict[str, Any] = field(default_factory=dict) + # distributed options + deepspeed_inference: bool = False + deepspeed_inference_config: Dict[str, Any] = field(default_factory=dict) # peft options peft_strategy: Optional[str] = None peft_config: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): - CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) - if self.torch_compile: self.torch_compile_config = OmegaConf.to_object(OmegaConf.merge(COMPILE_CONFIG, self.torch_compile_config)) - if self.device_map is not None: - assert CUDA_VISIBLE_DEVICES is not None, "`device_map` can only be used when CUDA_VISIBLE_DEVICES is set." - - if self.device_map not in DEVICE_MAPS: - raise ValueError(f"`device_map` must be one of {DEVICE_MAPS}. Got {self.device_map} instead.") + if self.device_map is not None and self.device_map not in DEVICE_MAPS: + raise ValueError(f"`device_map` must be one of {DEVICE_MAPS}. Got {self.device_map} instead.") - if self.torch_dtype is not None: - if self.torch_dtype not in TORCH_DTYPES: - raise ValueError(f"`torch_dtype` must be one of {TORCH_DTYPES}. Got {self.torch_dtype} instead.") + if self.torch_dtype is not None and self.torch_dtype not in TORCH_DTYPES: + raise ValueError(f"`torch_dtype` must be one of {TORCH_DTYPES}. Got {self.torch_dtype} instead.") - if self.amp_dtype is not None: - if self.amp_dtype not in AMP_DTYPES: - raise ValueError(f"`amp_dtype` must be one of {AMP_DTYPES}. Got {self.amp_dtype} instead.") + if self.amp_dtype is not None and self.amp_dtype not in AMP_DTYPES: + raise ValueError(f"`amp_dtype` must be one of {AMP_DTYPES}. 
Got {self.amp_dtype} instead.") if self.quantization_scheme is not None: if self.quantization_scheme not in QUANTIZATION_CONFIGS: @@ -103,15 +95,6 @@ def __post_init__(self): OmegaConf.merge(QUANTIZATION_CONFIG, self.quantization_config) ) - if self.use_ddp: - if CUDA_VISIBLE_DEVICES is None: - raise ValueError("`use_ddp` can only be used when CUDA_VISIBLE_DEVICES is set.") - - self.ddp_config = OmegaConf.to_object(OmegaConf.merge(DDP_CONFIG, self.ddp_config)) - # TODO: check if it's not possible to use DDP with multiple nodes - if self.ddp_config["max_nodes"] > 1 or self.ddp_config["min_nodes"] > 1: - raise NotImplementedError("Currently, PyTorch DDP benchmark only supports training on a single node.") - if self.peft_strategy is not None: if self.peft_strategy not in PEFT_CONFIGS: raise ValueError( diff --git a/optimum_benchmark/backends/utils.py b/optimum_benchmark/backends/utils.py index 8897cd15..997f317c 100644 --- a/optimum_benchmark/backends/utils.py +++ b/optimum_benchmark/backends/utils.py @@ -38,7 +38,8 @@ def extract_shapes_from_diffusion_pipeline(pipeline: "Pipeline") -> Dict[str, An def extract_shapes_from_model_artifacts( - config: "PretrainedConfig", processor: Optional["PreTrainedProcessor"] = None + config: "PretrainedConfig", + processor: Optional["PreTrainedProcessor"] = None, ) -> Dict[str, Any]: shapes = {} artifacts_dict = {} diff --git a/optimum_benchmark/benchmarks/base.py b/optimum_benchmark/benchmarks/base.py index 4c33ed78..32cc9f9b 100644 --- a/optimum_benchmark/benchmarks/base.py +++ b/optimum_benchmark/benchmarks/base.py @@ -33,5 +33,8 @@ def configure(self, config: BenchmarkConfigT) -> None: def run(self, backend: "Backend") -> None: raise NotImplementedError("Benchmark must implement run method") + def get_results_df(self) -> None: + raise NotImplementedError("Benchmark must implement get_results_df method") + def save(self) -> None: raise NotImplementedError("Benchmark must implement save method") diff --git a/optimum_benchmark/benchmarks/inference/benchmark.py b/optimum_benchmark/benchmarks/inference/benchmark.py index 7b82efc2..c5d50d72 100644 --- a/optimum_benchmark/benchmarks/inference/benchmark.py +++ b/optimum_benchmark/benchmarks/inference/benchmark.py @@ -286,6 +286,6 @@ def get_results_df(self) -> DataFrame: return DataFrame(results_dict, index=[0]) def save(self) -> None: - LOGGER.info("Saving inference results") + LOGGER.info("Saving results") results_df = self.get_results_df() results_df.to_csv("inference_results.csv", index=False) diff --git a/optimum_benchmark/benchmarks/inference/config.py b/optimum_benchmark/benchmarks/inference/config.py index 3e4048e6..0da1614e 100644 --- a/optimum_benchmark/benchmarks/inference/config.py +++ b/optimum_benchmark/benchmarks/inference/config.py @@ -11,10 +11,8 @@ LOGGER = getLogger("inference") -OmegaConf.register_new_resolver("can_generate", lambda task: task in TEXT_GENERATION_TASKS) -OmegaConf.register_new_resolver("can_diffuse", lambda task: task in DIFFUSION_TASKS) - GENERATE_CONFIG = { + "num_return_sequences": 1, "max_new_tokens": 100, "min_new_tokens": 100, "do_sample": False, @@ -27,6 +25,9 @@ "num_images_per_prompt": 1, } +OmegaConf.register_new_resolver("can_generate", lambda task: task in TEXT_GENERATION_TASKS) +OmegaConf.register_new_resolver("can_diffuse", lambda task: task in DIFFUSION_TASKS) + @dataclass class InferenceConfig(BenchmarkConfig): @@ -81,6 +82,8 @@ def __post_init__(self): if self.new_tokens is not None: self.generate_kwargs["max_new_tokens"] = self.new_tokens 
self.generate_kwargs["min_new_tokens"] = self.new_tokens + else: + self.new_tokens = self.generate_kwargs["min_new_tokens"] if self.energy and os.environ.get("CUDA_VISIBLE_DEVICES", None) and is_rocm_system(): raise ValueError("Energy measurement through codecarbon is not available on RoCm-powered devices.") diff --git a/optimum_benchmark/benchmarks/utils.py b/optimum_benchmark/benchmarks/utils.py index 8dffaa60..2b4984bd 100644 --- a/optimum_benchmark/benchmarks/utils.py +++ b/optimum_benchmark/benchmarks/utils.py @@ -48,6 +48,7 @@ def on_train_end(self, args: "TrainingArguments", state: "TrainerState", control state.training_end = time.time_ns() * 1e-9 state.overall_training_end = time.time_ns() * 1e-9 + print(args.world_size) state.total_training_batch_size = args.train_batch_size * args.gradient_accumulation_steps * args.world_size # warmup metrics diff --git a/optimum_benchmark/cli.py b/optimum_benchmark/cli.py new file mode 100644 index 00000000..ded199e2 --- /dev/null +++ b/optimum_benchmark/cli.py @@ -0,0 +1,63 @@ +import glob +import os +import sys +from logging import getLogger + +import hydra +from omegaconf import DictConfig + +from .experiment import run_with_launcher + +LOGGER = getLogger("main") + + +@hydra.main(version_base=None) +# hydra takes care of the cli and returns the config object +def benchmark_cli(experiment: DictConfig) -> None: + if glob.glob("*.csv") and os.environ.get("OVERRIDE_BENCHMARKS", "0") != "1": + LOGGER.warning( + "Skipping benchmark because results already exist. " + "Set OVERRIDE_BENCHMARKS=1 to override benchmark results." + ) + return + + run_with_launcher(experiment) + + +def report_cli() -> None: + action = sys.argv[1] + sys.argv = sys.argv[1:] + + if action == "gather": + from .aggregators.gather import gather_cli + + gather_cli() + elif action == "display": + from .aggregators.display import display_cli + + display_cli() + elif action == "summarize": + from .aggregators.summarize import summarize_cli + + summarize_cli() + elif action == "plot": + from .aggregators.plot import plot_cli + + plot_cli() + elif action in ["-h", "--help"]: + print( + """ + Usage: optimum-report + Actions: + gather + display + summarize + plot + -h, --help + + For more information on each action, run: + optimum-report -h + """ + ) + else: + raise ValueError(f"Unknown action {action}") diff --git a/optimum_benchmark/env_utils.py b/optimum_benchmark/env_utils.py index a5bc424b..f0b0ea7c 100644 --- a/optimum_benchmark/env_utils.py +++ b/optimum_benchmark/env_utils.py @@ -28,6 +28,7 @@ def is_rocm_system(): def bytes_to_mega_bytes(bytes: int) -> int: + # MB, not MiB # Reference: https://en.wikipedia.org/wiki/Byte#Multiple-byte_units return int(bytes * 1e-6) diff --git a/optimum_benchmark/experiment.py b/optimum_benchmark/experiment.py index 6b99dca7..bdb3179e 100644 --- a/optimum_benchmark/experiment.py +++ b/optimum_benchmark/experiment.py @@ -1,13 +1,9 @@ -import glob -import logging.config -import multiprocessing import os import platform from dataclasses import dataclass, field from logging import getLogger from typing import TYPE_CHECKING, Any, Dict, Type -import hydra from hydra.core.config_store import ConfigStore from hydra.utils import get_class from omegaconf import DictConfig, OmegaConf @@ -26,11 +22,15 @@ optimum_version, transformers_version, ) +from .launchers.process.config import ProcessConfig +from .launchers.torchrun.config import TorchrunConfig from .task_utils import infer_task_from_model_name_or_path if TYPE_CHECKING: from .backends.base import 
Backend from .benchmarks.base import Benchmark + from .launchers.base import Launcher, LauncherConfig + LOGGER = getLogger("experiment") @@ -39,6 +39,9 @@ @dataclass class ExperimentConfig: + # LAUNCHER CONFIGURATION + launcher: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 + # BACKEND CONFIGURATION backend: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 @@ -46,17 +49,18 @@ class ExperimentConfig: benchmark: Any # https://github.com/facebookresearch/hydra/issues/1722#issuecomment-883568386 # EXPERIMENT CONFIGURATION - experiment_name: str + experiment_name: str = "experiment" # Model name or path (bert-base-uncased, google/vit-base-patch16-224, ...) - model: str - # Device name or path (cpu, cuda, cuda:0, ...) - device: str + model: str = "bert-base-uncased" # Task name (text-classification, image-classification, ...) task: str = "${infer_task:${model}}" + # Device name or path (cpu, cuda, cuda:0, ...) + device: str = "cuda" # ADDITIONAL MODEL CONFIGURATION: Model revision, use_auth_token, trust_remote_code hub_kwargs: Dict = field( default_factory=lambda: { + # "token": None, "revision": "main", "cache_dir": None, "force_download": False, @@ -65,7 +69,6 @@ class ExperimentConfig: ) # ENVIRONMENT CONFIGURATION - # TODO: add gpu info when available environment: Dict = field( default_factory=lambda: { "optimum_version": optimum_version(), @@ -86,22 +89,31 @@ class ExperimentConfig: ) def __post_init__(self) -> None: - # if the number of available GPUs is 1, then we have no problem - # torch and nvidia-smi will both index it as 0, otherwise: + if self.device.startswith("cuda:"): + raise ValueError( + f"Device was specified as {self.device} with a target index." + "We recommend using the main cuda device (`cuda`) and specifying the target index in `CUDA_VISIBLE_DEVICES`." + ) + + if self.device not in ["cuda", "cpu", "mps", "xla"]: + raise ValueError("`device` must be either `cuda`, `cpu`, `mps` or `xla`.") + if "cuda" in self.device and len(self.environment["gpus"]) > 1: - CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None) - if CUDA_VISIBLE_DEVICES is None: - raise ValueError( + if os.environ.get("CUDA_VISIBLE_DEVICES", None) is None: + LOGGER.warning( "Multiple GPUs detected but CUDA_VISIBLE_DEVICES is not set. " - "This means that code might allocate resources from GPUs that are not intended to be used. " - "Please set `CUDA_VISIBLE_DEVICES` to the desired GPU ids." + "This means that code might allocate resources from the wrong GPUs. " + "We recommend setting CUDA_VISIBLE_DEVICES to isolate the GPUs that will be used for this experiment. " + "`CUDA_VISIBLE_DEVICES` will be set to `0` to ensure that only the first GPU is used." + "If you want to use multiple GPUs, please set `CUDA_VISIBLE_DEVICES` to the desired GPU indices." ) - CUDA_DEVICE_ORDER = os.environ.get("CUDA_DEVICE_ORDER", None) - if CUDA_DEVICE_ORDER is None or CUDA_DEVICE_ORDER != "PCI_BUS_ID": + os.environ["CUDA_VISIBLE_DEVICES"] = "0" + + if os.environ.get("CUDA_DEVICE_ORDER", None) != "PCI_BUS_ID": LOGGER.warning( - "Multiple GPUs detected but CUDA_DEVICE_ORDER is not set. " - "This means that code might allocate resources from the wrong GPUs even if CUDA_VISIBLE_DEVICES is set. " - "Pytorch uses the `FASTEST_FIRST` order by default, which is not guaranteed to be the same as nvidia-smi. " + "Multiple GPUs detected but CUDA_DEVICE_ORDER is not set to `PCI_BUS_ID`. 
" + "This means that code might allocate resources from the wrong GPUs even if `CUDA_VISIBLE_DEVICES` is set. " + "For example pytorch uses the `FASTEST_FIRST` order by default, which is not guaranteed to be the same as nvidia-smi. " "`CUDA_DEVICE_ORDER` will be set to `PCI_BUS_ID` to ensure that the GPUs are allocated in the same order as nvidia-smi. " ) os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" @@ -117,16 +129,13 @@ def __post_init__(self) -> None: cs.store(group="backend", name="text-generation-inference", node=TGIConfig) cs.store(group="benchmark", name="inference", node=InferenceConfig) cs.store(group="benchmark", name="training", node=TrainingConfig) +cs.store(group="launcher", name="process", node=ProcessConfig) +cs.store(group="launcher", name="torchrun", node=TorchrunConfig) -def run(experiment: DictConfig) -> None: - # Configure logging - hydra_conf = OmegaConf.load(".hydra/hydra.yaml") - logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) - - # This is required to trigger __post_init__. Reference: https://github.com/omry/omegaconf/issues/377 +def run(experiment: "ExperimentConfig") -> "Benchmark": + # Instantiate the experiment config to trigger __post_init__ experiment: ExperimentConfig = OmegaConf.to_object(experiment) - # Save the config OmegaConf.save(experiment, "hydra_config.yaml", resolve=True) # Allocate requested backend @@ -158,12 +167,10 @@ def run(experiment: DictConfig) -> None: backend.clean() raise e + # Run the benchmark try: - # Run the benchmark benchmark.run(backend) - # Save the benchmark results benchmark.save() - # Clean up the backend backend.clean() except Exception as e: LOGGER.error("Error during benchmark execution: %s", e) @@ -171,24 +178,25 @@ def run(experiment: DictConfig) -> None: raise e -def run_isolated(experiment: DictConfig, start_method: str = "spawn") -> None: - # Set the multiprocessing start method if not already set - if multiprocessing.get_start_method(allow_none=True) != start_method: - multiprocessing.set_start_method(start_method) - - # Execute the experiment in a child process - p = multiprocessing.Process(target=run, args=(experiment,)) - p.start() - p.join() +def run_with_launcher(experiment: DictConfig): + # instead of emplimenting hydra/launcher plugins, we handle the launcher ourselves + # thsi allows us to use spawn with torchrun, to gather outputs from parallel processes, + # and to handle errors gracefully - # Exit with the same exit code as the child process - exit(p.exitcode) + # Instantiate the experiment config to trigger __post_init__ + experiment.launcher: LauncherConfig = OmegaConf.to_object(experiment.launcher) + launcher_factory: Type["Launcher"] = get_class(experiment.launcher._target_) + launcher: "Launcher" = launcher_factory() -@hydra.main(version_base=None) -def main(experiment: DictConfig) -> None: - if glob.glob("*.csv"): - LOGGER.warning("Skipping because results already exist in experiment directory.") - return + try: + launcher.configure(experiment.launcher) + except Exception as e: + LOGGER.error("Error during launcher configuration: %s", e) + raise e - run_isolated(experiment, start_method="spawn") + try: + launcher.launch(run, experiment) + except Exception as e: + LOGGER.error("Error during experiment execution: %s", e) + raise e diff --git a/optimum_benchmark/import_utils.py b/optimum_benchmark/import_utils.py index 969bc3f0..9c5e869f 100644 --- a/optimum_benchmark/import_utils.py +++ b/optimum_benchmark/import_utils.py @@ -68,7 +68,7 @@ def 
onnxruntime_version(): try: return "ort-training:" + importlib.metadata.version("onnxruntime-training") except importlib.metadata.PackageNotFoundError: - return None + return "ort:unknown" def openvino_version(): diff --git a/optimum_benchmark/launchers/__init__.py b/optimum_benchmark/launchers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/optimum_benchmark/launchers/base.py b/optimum_benchmark/launchers/base.py new file mode 100644 index 00000000..e53f4e8c --- /dev/null +++ b/optimum_benchmark/launchers/base.py @@ -0,0 +1,31 @@ +from abc import ABC +from dataclasses import dataclass +from logging import getLogger +from typing import Callable, ClassVar, Generic, TypeVar + +LOGGER = getLogger("launcher") + + +@dataclass +class LauncherConfig(ABC): + name: str + _target_: str + + +LauncherConfigT = TypeVar("LauncherConfigT", bound=LauncherConfig) + + +class Launcher(Generic[LauncherConfigT], ABC): + NAME: ClassVar[str] + + config: LauncherConfigT + + def __init__(self) -> None: + pass + + def configure(self, config: LauncherConfigT) -> None: + LOGGER.info(f"Configuring {self.NAME} launcher") + self.config = config + + def launch(self, worker: Callable, *worker_args): + raise NotImplementedError("Launcher must implement launch method") diff --git a/optimum_benchmark/launchers/process/__init__.py b/optimum_benchmark/launchers/process/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/optimum_benchmark/launchers/process/config.py b/optimum_benchmark/launchers/process/config.py new file mode 100644 index 00000000..332b5dd6 --- /dev/null +++ b/optimum_benchmark/launchers/process/config.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from logging import getLogger + +from ..base import LauncherConfig + +LOGGER = getLogger("process") + + +@dataclass +class ProcessConfig(LauncherConfig): + name: str = "process" + _target_: str = "optimum_benchmark.launchers.process.launcher.ProcessLauncher" + + start_method: str = "spawn" + + def __post_init__(self) -> None: + if self.start_method not in ["spawn", "fork"]: + raise ValueError(f"start_method must be one of ['spawn', 'fork'], got {self.start_method}") diff --git a/optimum_benchmark/launchers/process/launcher.py b/optimum_benchmark/launchers/process/launcher.py new file mode 100644 index 00000000..5c8f17bc --- /dev/null +++ b/optimum_benchmark/launchers/process/launcher.py @@ -0,0 +1,63 @@ +import logging.config +import multiprocessing as mp +from logging import getLogger +from multiprocessing import Process +from typing import Callable + +from omegaconf import OmegaConf + +from ..base import Launcher +from .config import ProcessConfig + +LOGGER = getLogger("process") + + +class ProcessLauncher(Launcher[ProcessConfig]): + NAME = "process" + + def __init__(self) -> None: + super().__init__() + + def configure(self, config: ProcessConfig) -> None: + super().configure(config) + + def launch(self, worker: Callable, *worker_args): + # Set the multiprocessing start method if not already set + if mp.get_start_method(allow_none=True) is None: + mp.set_start_method(self.config.start_method) + + # Create the process + process = Process( + target=target, + args=(worker, *worker_args), + ) + + process.start() + LOGGER.info(f"\t+ Launched experiment in process with PID {process.pid}.") + process.join() + + if process.exitcode is None: + LOGGER.warning("\t+ Process did not exit even after getting benchmark result, terminating it.") + process.terminate() + process.join() + + if process.exitcode is None: + 
LOGGER.error("\t+ Process did not exit even after being terminated, killing it.") + process.kill() + process.join() + + if process.exitcode != 0: + raise RuntimeError(f"Process exited with code {process.exitcode}") + + LOGGER.info("\t+ Process exited successfully, closing it.") + process.close() + + +def target(fn, *args): + """ + This a pickalable function that correctly sets up the logging configuration + """ + hydra_conf = OmegaConf.load(".hydra/hydra.yaml") + logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True)) + + fn(*args) diff --git a/optimum_benchmark/launchers/torchrun/__init__.py b/optimum_benchmark/launchers/torchrun/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/optimum_benchmark/launchers/torchrun/config.py b/optimum_benchmark/launchers/torchrun/config.py new file mode 100644 index 00000000..766c1aab --- /dev/null +++ b/optimum_benchmark/launchers/torchrun/config.py @@ -0,0 +1,71 @@ +import os +from dataclasses import dataclass, field +from logging import getLogger +from typing import Any, Dict, Optional + +from omegaconf import OmegaConf + +from ..base import LauncherConfig + +LOGGER = getLogger("torchrun") + +OmegaConf.register_new_resolver("available_gpus", lambda: len(os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(","))) + + +@dataclass +class TorchrunConfig(LauncherConfig): + name: str = "torchrun" + _target_: str = "optimum_benchmark.launchers.torchrun.launcher.TorchrunLauncher" + + # Minimum amount of nodes that the user function will be launched on. + # Elastic agent ensures that the user function start only when the min_nodes amount enters the rendezvous. + min_nodes: int = 1 + # Maximum amount of nodes that the user function will be launched on. + max_nodes: int = 1 + # On each node the elastic agent will launch this amount of workers that will execute user defined function. + nproc_per_node: int = "${available_gpus:}" + # The unique run id of the job (if not passed a unique one will be deduced from run environment - flow workflow id in flow - or auto generated). + run_id: str = "${experiment_name}" + # User defined role of the worker (defaults to "trainer"). + role: str = "benchmark_worker" + # The interval in seconds that is used by the elastic_agent as a period of monitoring workers. + monitor_interval: int = 30 + # The endpoint of the rdzv sync. storage. + rdzv_endpoint: str = "localhost:29599" + # rdzv_backend to use in the rendezvous (zeus-adapter, etcd). + rdzv_backend: str = "static" + # Key, value pair that specifies rendezvous specific configuration. + rdzv_configs: Dict[str, Any] = field( + default_factory=lambda: { + "rank": 0, + "timeout": 900, + } + ) + # The maximum amount of restarts that elastic agent will conduct on workers before failure. + max_restarts: int = 0 + # The method is used by the elastic agent to start the workers (spawn, fork, forkserver). + start_method: str = "spawn" + # base log directory where log files are written. If not set, one is created in a tmp dir but NOT removed on exit. + log_dir: Optional[str] = None + # configuration to redirect stdout/stderr to log files. + # Pass a single Std enum to redirect all workers, or a mapping keyed by local_rank to selectively redirect. + redirects: str = "0" # Std.NONE + # configuration to "tee" stdout/stderr to console + log file. + tee: str = "0" # Std.NONE + # configuration to initialize metrics. + metrics_cfg: Dict[str, str] = field(default_factory=lambda: {}) + # address of the local node if any. 
+    local_addr: Optional[str] = None
+
+    def __post_init__(self) -> None:
+        if self.start_method not in ["spawn", "fork"]:
+            raise ValueError(f"start_method must be one of ['spawn', 'fork'], got {self.start_method}")
+
+        if self.min_nodes != self.max_nodes:
+            raise ValueError(
+                f"min_nodes and max_nodes must be equal for a reproducible benchmark, got {self.min_nodes} and {self.max_nodes}"
+            )
+
+        if self.min_nodes != 1:
+            LOGGER.info("For multi-node benchmarks, run the benchmark on each node separately.")
+            LOGGER.info(f"Waiting for the other nodes to be available at {self.rdzv_endpoint}...")
diff --git a/optimum_benchmark/launchers/torchrun/launcher.py b/optimum_benchmark/launchers/torchrun/launcher.py
new file mode 100644
index 00000000..591444dc
--- /dev/null
+++ b/optimum_benchmark/launchers/torchrun/launcher.py
@@ -0,0 +1,69 @@
+import logging.config
+import os
+from logging import getLogger
+from typing import Callable
+
+from omegaconf import OmegaConf
+from torch.distributed.elastic.multiprocessing import Std
+from torch.distributed.elastic.multiprocessing.errors import record
+from torch.distributed.launcher.api import LaunchConfig, launch_agent
+
+from ..base import Launcher
+from .config import TorchrunConfig
+
+LOGGER = getLogger("torchrun")
+
+
+class TorchrunLauncher(Launcher[TorchrunConfig]):
+    NAME = "torchrun"
+
+    def __init__(self) -> None:
+        super().__init__()
+
+    def configure(self, config: "TorchrunConfig") -> None:
+        super().configure(config)
+
+    def launch(self, worker: Callable, *worker_args):
+        launch_config = LaunchConfig(
+            min_nodes=self.config.min_nodes,
+            max_nodes=self.config.max_nodes,
+            nproc_per_node=self.config.nproc_per_node,
+            run_id=self.config.run_id,
+            role=self.config.role,
+            monitor_interval=self.config.monitor_interval,
+            rdzv_endpoint=self.config.rdzv_endpoint,
+            rdzv_backend=self.config.rdzv_backend,
+            rdzv_configs=self.config.rdzv_configs,
+            max_restarts=self.config.max_restarts,
+            start_method=self.config.start_method,
+            metrics_cfg=self.config.metrics_cfg,
+            redirects=Std.from_str(self.config.redirects),
+            tee=Std.from_str(self.config.tee),
+            local_addr=self.config.local_addr,
+            log_dir=self.config.log_dir,
+        )
+
+        LOGGER.info(f"\t+ Launching {self.config.nproc_per_node} processes with torchrun")
+
+        launch_agent(
+            entrypoint=entrypoint,
+            args=(worker, *worker_args),
+            config=launch_config,
+        )
+
+        LOGGER.info("\t+ Torchrun exited successfully")
+
+
+@record
+def entrypoint(fn, *args):
+    """
+    This is a picklable function that correctly sets up the logging configuration
+    """
+
+    if os.environ["LOCAL_RANK"] == "0":
+        hydra_conf = OmegaConf.load(".hydra/hydra.yaml")
+        logging.config.dictConfig(OmegaConf.to_container(hydra_conf.hydra.job_logging, resolve=True))
+    else:
+        logging.disable(logging.CRITICAL)
+
+    fn(*args)
diff --git a/optimum_benchmark/report.py b/optimum_benchmark/report.py
deleted file mode 100644
index 26c2b9f8..00000000
--- a/optimum_benchmark/report.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import sys
-
-HELP = """
-Usage: optimum-report
-Actions:
-    gather
-    display
-    summarize
-    plot
-    -h, --help
-
-For more information on each action, run:
-    optimum-report -h
-"""
-
-
-def main():
-    action = sys.argv[1]
-    sys.argv = sys.argv[1:]
-
-    if action == "gather":
-        from .aggregators.gather import gather_cli
-
-        gather_cli()
-    elif action == "display":
-        from .aggregators.display import display_cli
-
-        display_cli()
-    elif action == "summarize":
-        from
.aggregators.summarize import summarize_cli - - summarize_cli() - elif action == "plot": - from .aggregators.plot import plot_cli - - plot_cli() - elif action in ["-h", "--help"]: - print(HELP) - else: - raise ValueError(f"Unknown action {action}") diff --git a/optimum_benchmark/trackers/energy.py b/optimum_benchmark/trackers/energy.py index 052809a7..cca1a8f2 100644 --- a/optimum_benchmark/trackers/energy.py +++ b/optimum_benchmark/trackers/energy.py @@ -7,7 +7,7 @@ if is_codecarbon_available(): from codecarbon import EmissionsTracker, OfflineEmissionsTracker -LOGGER = getLogger("latency_tracker") +LOGGER = getLogger("energy") class EnergyTracker: @@ -30,9 +30,12 @@ def track(self, interval=1, file_prefix=""): LOGGER.info("Falling back to Offline Emissions Tracker") country_iso_code = os.environ.get("COUNTRY_ISO_CODE", None) if country_iso_code is None: - raise ValueError( - "COUNTRY_ISO_CODE environment variable must be set when using Offline Emissions Tracker" + LOGGER.warning( + "Offline Emissions Tracker requires COUNTRY_ISO_CODE to be set. " + "We will set it to FRA but the carbon footprint will be inaccurate." ) + country_iso_code = "FRA" + self.emission_tracker = OfflineEmissionsTracker( log_level="error", tracking_mode="process", diff --git a/optimum_benchmark/trackers/latency.py b/optimum_benchmark/trackers/latency.py index d34be65f..e63d5d0d 100644 --- a/optimum_benchmark/trackers/latency.py +++ b/optimum_benchmark/trackers/latency.py @@ -3,25 +3,18 @@ from logging import getLogger from typing import List -import torch - -LOGGER = getLogger("latency_tracker") +LOGGER = getLogger("latency") class LatencyTracker: - def __init__(self, device: torch.device, backend: str): + def __init__(self, device: str, backend: str): self.device = device self.backend = backend self.latencies: List[float] = [] - if self.device.type == "cuda" and self.backend == "pytorch": - # because pytorch will always see devices as 0, 1, 2, ... 
CUDA_VISIBLE_DEVICES doesn't matter
-            self.device_ids = list(range(torch.cuda.device_count()))
-            LOGGER.info(f"Tracking Pytorch CUDA devices: {self.device_ids}")
-
     @contextmanager
     def track(self):
-        if self.device.type == "cuda" and self.backend == "pytorch":
+        if self.device == "cuda" and self.backend == "pytorch":
             yield from self._cuda_latency()
         else:
             yield from self._cpu_latency()
@@ -29,32 +22,32 @@ def track(self):
     def get_latencies(self):
         return self.latencies
+    def _cpu_latency(self):
+        start = time.perf_counter_ns()
+        yield
+        end = time.perf_counter_ns()
+        latency_ns = end - start
+        latency = latency_ns / 1e9
+
+        LOGGER.debug(f"Tracked CPU latency: {latency:.2e}s")
+        self.latencies.append(latency)
+
     def _cuda_latency(self):
+        import torch.cuda
+
         start_event = torch.cuda.Event(enable_timing=True)
         end_event = torch.cuda.Event(enable_timing=True)
-        for device_index in self.device_ids:
-            torch.cuda.synchronize(device=device_index)
-        # here must record the start event after the synchronization of all devices
-        start_event.record(stream=torch.cuda.current_stream(device=self.device_ids[-1]))
+
+        torch.cuda.synchronize()
+        start_event.record()
+        torch.cuda.synchronize()
         yield
-        for device_index in self.device_ids:
-            if device_index == self.device_ids[-1]:
-                # here we must record the end event before the synchronization of the last device
-                end_event.record(stream=torch.cuda.current_stream(device=self.device_ids[-1]))
-            torch.cuda.synchronize(device=device_index)
+        torch.cuda.synchronize()
+        end_event.record()
+        torch.cuda.synchronize()
         latency_ms = start_event.elapsed_time(end_event)
         latency = latency_ms / 1e3
         LOGGER.debug(f"Tracked CUDA latency: {latency:.2e}s")
         self.latencies.append(latency)
-
-    def _cpu_latency(self):
-        start = time.perf_counter_ns()
-        yield
-        end = time.perf_counter_ns()
-        latency_ns = end - start
-        latency = latency_ns / 1e9
-
-        LOGGER.debug(f"Tracked CPU latency: {latency:.2e}s")
-        self.latencies.append(latency)
diff --git a/optimum_benchmark/trackers/memory.py b/optimum_benchmark/trackers/memory.py
index 0ad7fac2..af18b8a7 100644
--- a/optimum_benchmark/trackers/memory.py
+++ b/optimum_benchmark/trackers/memory.py
@@ -10,7 +10,7 @@
 from ..env_utils import bytes_to_mega_bytes, is_nvidia_system, is_rocm_system
 from ..import_utils import is_py3nvml_available, is_pyrsmi_available
-LOGGER = getLogger("memory_tracker")
+LOGGER = getLogger("memory")
 class MemoryTracker:
@@ -21,25 +21,16 @@ def __init__(self, device: torch.device):
         self.max_memory_reserved: int = 0
         self.max_memory_allocated: int = 0
-        if self.device.type == "cuda":
-            CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", None)
-            if CUDA_VISIBLE_DEVICES is not None:
-                # if CUDA_VISIBLE_DEVICES is set, only the visible devices' memory is tracked
-                self.cuda_device_ids = list(map(int, CUDA_VISIBLE_DEVICES.split(",")))
-            else:
-                # if CUDA_VISIBLE_DEVICES is not set, only the main device's memory is tracked
-                # which is 0 because otherwise, the experiment would've raised an error asking for
-                # CUDA_VISIBLE_DEVICES to be set
-                self.cuda_device_ids = [self.device.index if self.device.index is not None else 0]
-
-            self.pytorch_device_ids = list(range(len(self.cuda_device_ids)))
+        if self.device == "cuda":
+            self.pytorch_device_ids = list(range(torch.cuda.device_count()))
+            self.cuda_device_ids = list(map(int, os.environ.get("CUDA_VISIBLE_DEVICES", "0").split(",")))  # default "0" when unset (single-GPU case)
             LOGGER.info(f"Tracking CUDA devices: {self.cuda_device_ids}")
             LOGGER.info(f"Tracking Pytorch CUDA devices: {self.pytorch_device_ids}")
     @contextmanager
     def track(self):
-        if self.device.type == "cuda":
+        if self.device == "cuda":
             yield from self._cuda_memory()
         else:
             yield from self._cpu_memory()
diff --git a/pyproject.toml b/pyproject.toml
index 37477057..824b767d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,3 +19,8 @@ target-version = ['py37']
 [tool.ruff]
 ignore = ["E501", "C901"]
 select = ["C", "E", "F", "I", "W"]
+
+[tool.pytest.ini_options]
+log_cli = true
+log_cli_level = "INFO"
+log_cli_format = "%(message)s"
diff --git a/setup.py b/setup.py
index 9ad63753..21a5998c 100644
--- a/setup.py
+++ b/setup.py
@@ -10,9 +10,9 @@
     f"optimum>={OPTIMUM_VERSION}",  # backends, tasks and input generation
     "accelerate",  # distributed inference and no weights init
     # Hydra
-    "omegaconf",
-    "hydra-core",
     "hydra_colorlog",
+    "hydra-core",
+    "omegaconf",
     # Other
     "psutil",
     "pandas",
@@ -53,9 +53,10 @@
     # gpu backends
     "onnxruntime-gpu": [f"optimum[onnxruntime-gpu]>={OPTIMUM_VERSION}"],
     "onnxruntime-training": ["torch-ort", "onnxruntime-training"],
-    # server-like backends
-    "text-generation-inference": ["docker>=6.0.0"],
+    # docker-based backends
+    "text-generation-inference": ["docker"],
     # specific settings
+    "deepspeed": ["deepspeed"],
     "diffusers": ["diffusers"],
     "peft": ["peft"],
 }
@@ -69,8 +70,8 @@
     extras_require=EXTRAS_REQUIRE,
     entry_points={
         "console_scripts": [
-            "optimum-benchmark=optimum_benchmark.experiment:main",
-            "optimum-report=optimum_benchmark.report:main",
+            "optimum-benchmark=optimum_benchmark.cli:benchmark_cli",
+            "optimum-report=optimum_benchmark.cli:report_cli",
         ]
     },
 )
diff --git a/tests/configs/_base_.yaml b/tests/configs/_base_.yaml
index af2811c6..b60c5e43 100644
--- a/tests/configs/_base_.yaml
+++ b/tests/configs/_base_.yaml
@@ -1,5 +1,6 @@
 # This is a base config file that can potentially be used for all tests
 defaults:
+  - launcher: process # we use process launcher for tests
  - experiment # inheriting experiment schema
  - _self_ # for hydra 1.1 compatibility
  - override hydra/job_logging: colorlog # colorful logging
@@ -16,13 +17,12 @@ hydra:
     # this is useful for saving outputs in a separate directory
     chdir: true
     env_set:
-      # by default, we only use one GPU
-      CUDA_VISIBLE_DEVICES: 0
-      CUDA_DEVICE_ORDER: PCI_BUS_ID
+      CUDA_VISIBLE_DEVICES: 0 # by default we only use one GPU
+      CUDA_DEVICE_ORDER: PCI_BUS_ID # making sure we use the right GPU
+      OVERRIDE_BENCHMARKS: 1 # so that benchmarks are not skipped
 backend:
   # we turn off isolation checks because tests run on shared resources
-  initial_isolation_check: false
-  continous_isolation_check: false
+  continuous_isolation: false
 experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}
diff --git a/tests/configs/_ddp_.yaml b/tests/configs/_ddp_.yaml
index ddfbf8b7..3512d61c 100644
--- a/tests/configs/_ddp_.yaml
+++ b/tests/configs/_ddp_.yaml
@@ -1,5 +1,14 @@
 # Distributed Data Parallel (DDP) training
-experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_ddp
+defaults:
+  - override launcher: torchrun # for DDP training, use torchrun launcher
+
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
+
+launcher:
+  nproc_per_node: 2
 benchmark:
   dataset_shapes:
@@ -8,12 +17,4 @@ benchmark:
   training_arguments:
     per_device_train_batch_size: 8
-backend:
-  use_ddp: true
-  ddp_config:
-    rdzv_endpoint: 127.0.0.1:29509
-
-hydra:
-  job:
-    env_set:
-      CUDA_VISIBLE_DEVICES: 0,1
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_ddp
diff --git a/tests/configs/_dp_.yaml b/tests/configs/_dp_.yaml
index 9cb39074..74b47f3a 100644
--- a/tests/configs/_dp_.yaml
+++ b/tests/configs/_dp_.yaml
@@ -1,5 +1,8 @@
 # Data Parallel (DP) training
-experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_dp
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
 benchmark:
   dataset_shapes:
@@ -8,7 +11,4 @@ benchmark:
   training_arguments:
     per_device_train_batch_size: 8
-hydra:
-  job:
-    env_set:
-      CUDA_VISIBLE_DEVICES: 0,1
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_dp
diff --git a/tests/configs/_peft_.yaml b/tests/configs/_peft_.yaml
index eca5157f..01365aa4 100644
--- a/tests/configs/_peft_.yaml
+++ b/tests/configs/_peft_.yaml
@@ -1,7 +1,8 @@
 # Parameter-Efficient Fine-Tuning
-experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_peft
 backend:
   peft_strategy: lora
   peft_config:
-    task_type: CAUSAL_LM
\ No newline at end of file
+    task_type: CAUSAL_LM
+
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_peft
diff --git a/tests/configs/_pp_.yaml b/tests/configs/_pp_.yaml
index 4bb900c3..d85942f4 100644
--- a/tests/configs/_pp_.yaml
+++ b/tests/configs/_pp_.yaml
@@ -1,10 +1,10 @@
 # Pipeline Parallelism (PP)
-experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_mp
-
-backend:
-  device_map: auto
-
 hydra:
   job:
     env_set:
       CUDA_VISIBLE_DEVICES: 0,1
+
+backend:
+  device_map: auto
+
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_mp
diff --git a/tests/configs/_tp_.yaml b/tests/configs/_tp_.yaml
new file mode 100644
index 00000000..28e5360e
--- /dev/null
+++ b/tests/configs/_tp_.yaml
@@ -0,0 +1,20 @@
+# Tensor Parallelism (TP)
+defaults:
+  - override launcher: torchrun # for TP training, use torchrun launcher
+
+hydra:
+  job:
+    env_set:
+      CUDA_VISIBLE_DEVICES: 0,1
+
+launcher:
+  nproc_per_node: 2
+  rdzv_endpoint: "localhost:29599"
+
+backend:
+  deepspeed_inference: true
+  deepspeed_inference_config:
+    tensor_parallel:
+      tp_size: 2
+
+experiment_name: ${device}_${backend.name}_${benchmark.name}_${task}_tp
diff --git a/tests/configs/cuda_pytorch_inference_bert_tp.yaml b/tests/configs/cuda_pytorch_inference_bert_tp.yaml
new file mode 100644
index 00000000..249fc85a
--- /dev/null
+++ b/tests/configs/cuda_pytorch_inference_bert_tp.yaml
@@ -0,0 +1,10 @@
+defaults:
+  - benchmark: inference
+  - backend: pytorch
+  - _base_ # inherits from base config
+  - _tp_ # inherits from tensor parallelism config
+  - _self_ # for hydra 1.1 compatibility
+
+model: bert-base-uncased
+task: text-classification
+device: cuda
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 4bce083e..24030b59 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,8 +1,13 @@
 import os
 import subprocess
+from logging import getLogger
+from subprocess import PIPE, STDOUT, Popen
 import pytest
+LOGGER = getLogger("test")
+
+
 SINGLERUNS = [
     config
     for config in os.listdir("tests/configs")
@@ -21,10 +26,10 @@
 @pytest.mark.parametrize("config_file", SINGLERUNS)
-def test_configs(config_file):
+def test_single_run(config_file):
     config_name = config_file.split(".")[0]
-    result = subprocess.run(
+    process = Popen(
         [
             "optimum-benchmark",
             "--config-dir",
@@ -32,10 +37,17 @@
             "--config-name",
             config_name,
         ],
-        capture_output=True,
+        stdout=PIPE,
+        stderr=STDOUT,
     )
-    assert result.returncode == 0, result.stderr.decode("utf-8")
+    for line in iter(process.stdout.readline, b""):
+        if line is not None:
+            LOGGER.info(line.decode("utf-8").rstrip())
+
+    process.wait()
+
+    assert process.returncode == 0, process.stderr
 def test_exit_code():
@@ -48,7 +60,6 @@ def test_exit_code():
"cpu_pytorch_inference_bert", "model=inexistent_model", ], - capture_output=True, ) assert result.returncode == 1, result.stderr.decode("utf-8")