diff --git a/optimum_benchmark/benchmarks/inference/benchmark.py b/optimum_benchmark/benchmarks/inference/benchmark.py index 9cc96ee1..9df1ebfd 100644 --- a/optimum_benchmark/benchmarks/inference/benchmark.py +++ b/optimum_benchmark/benchmarks/inference/benchmark.py @@ -1,13 +1,12 @@ from logging import getLogger -from typing import List, Tuple, Dict from ..base import Benchmark from .config import InferenceConfig -from ...trackers.energy import EnergyTracker from ...trackers.memory import MemoryTracker -from ...trackers.latency import LatencyTracker from ...backends.base import Backend, BackendConfigT from ...generators.input_generator import InputGenerator +from ...trackers.energy import EnergyTracker, Efficiency +from ...trackers.latency import LatencyTracker, Throughput from ...import_utils import is_torch_distributed_available from ...task_utils import TEXT_GENERATION_TASKS, IMAGE_DIFFUSION_TASKS from .report import InferenceReport, TextGenerationReport, ImageDiffusionReport @@ -33,6 +32,17 @@ "num_beams": 1, } +EFFICIENCY_UNIT = "samples/kWh" +THROUGHPUT_UNIT = "samples/s" + +PREFILL_THROUGHPUT_UNIT = "tokens/s" +DECODE_THROUGHPUT_UNIT = "tokens/s" +CALL_THROUGHPUT_UNIT = "images/s" + +PREFILL_EFFICIENCY_UNIT = "tokens/kWh" +DECODE_EFFICIENCY_UNIT = "tokens/kWh" +CALL_EFFICIENCY_UNIT = "images/kWh" + class InferenceBenchmark(Benchmark[InferenceConfig]): NAME = "inference" @@ -42,6 +52,7 @@ def __init__(self, config: InferenceConfig) -> None: def run(self, backend: Backend[BackendConfigT]) -> None: if is_torch_distributed_available() and torch.distributed.is_initialized(): + LOGGER.info("\t+ Distributing batch size across processes") if self.config.input_shapes["batch_size"] % torch.distributed.get_world_size() != 0: raise ValueError( "The batch size must be divisible by the number of processes in a distributed environment" @@ -64,12 +75,7 @@ def run(self, backend: Backend[BackendConfigT]) -> None: LOGGER.info("\t+ Updating Text Generation kwargs with default values") self.config.generate_kwargs = {**TEXT_GENERATION_KWARGS, **self.config.generate_kwargs} LOGGER.info("\t+ Initializing Text Generation report") - self.report = TextGenerationReport( - batch_size=self.config.input_shapes["batch_size"], - sequence_length=self.config.input_shapes["sequence_length"], - num_new_tokens=self.config.generate_kwargs["max_new_tokens"], - num_return_sequences=self.config.generate_kwargs["num_return_sequences"], - ) + self.report = TextGenerationReport() elif backend.config.task in IMAGE_DIFFUSION_TASKS: LOGGER.info("\t+ Generating and preparing Image Diffusion input") @@ -78,19 +84,14 @@ def run(self, backend: Backend[BackendConfigT]) -> None: LOGGER.info("\t+ Updating Image Diffusion kwargs with default values") self.config.forward_kwargs = {**IMAGE_DIFFUSION_KWARGS, **self.config.forward_kwargs} LOGGER.info("\t+ Initializing Image Diffusion report") - self.report = ImageDiffusionReport( - batch_size=self.config.input_shapes["batch_size"], - num_images_per_prompts=self.config.forward_kwargs["num_images_per_prompt"], - ) + self.report = ImageDiffusionReport() else: LOGGER.info("\t+ Generating and preparing Inference input") self.forward_inputs = self.input_generator(mode="forward") self.forward_inputs = backend.prepare_inputs(self.forward_inputs) LOGGER.info("\t+ Initializing Inference report") - self.report = InferenceReport( - batch_size=self.config.input_shapes["batch_size"], - ) + self.report = InferenceReport() LOGGER.info("\t+ Preparing backend for Inference") backend.prepare_for_inference( @@ 
-103,11 +104,9 @@ def run(self, backend: Backend[BackendConfigT]) -> None: LOGGER.info("\t+ Warming up backend for Inference") for _ in range(self.config.warmup_runs): if backend.config.task in TEXT_GENERATION_TASKS: - generate_warmup_kwargs = {"max_new_tokens": 2, "min_new_tokens": 2} - _ = backend.generate(self.generate_input, generate_warmup_kwargs) + _ = backend.generate(self.generate_input, {"max_new_tokens": 2, "min_new_tokens": 2}) elif backend.config.task in IMAGE_DIFFUSION_TASKS: - diffuse_warmup_kwargs = {"num_inference_steps": 2} - _ = backend.call(self.diffuse_input, diffuse_warmup_kwargs) + _ = backend.call(self.diffuse_input, {"num_inference_steps": 2}) else: _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) @@ -117,161 +116,180 @@ def run(self, backend: Backend[BackendConfigT]) -> None: backend=backend.config.name, device=backend.config.device, device_ids=backend.config.device_ids ) if backend.config.task in TEXT_GENERATION_TASKS: - forward_memories_dict, generate_memories_dict = self.run_text_generation_memory_tracking(backend) - self.report.populate_memory(forward_memories_dict, generate_memories_dict) + self.run_text_generation_memory_tracking(backend) elif backend.config.task in IMAGE_DIFFUSION_TASKS: - call_memories_dict = self.run_image_diffusion_memory_tracking(backend) - self.report.populate_memory(call_memories_dict) + self.run_image_diffusion_memory_tracking(backend) else: - forward_memories_dict = self.run_inference_memory_tracking(backend) - self.report.populate_memory(forward_memories_dict) + self.run_inference_memory_tracking(backend) - self.report.log_memory() + self.report.log_max_memory() if self.config.latency: LOGGER.info("\t+ Creating inference latency tracker") self.latency_tracker = LatencyTracker(backend=backend.config.name, device=backend.config.device) if backend.config.task in TEXT_GENERATION_TASKS: - forward_latencies_dict, generate_latencies_dict = self.run_text_generation_latency_tracking(backend) - self.report.populate_latency(forward_latencies_dict, generate_latencies_dict) + self.run_text_generation_latency_tracking(backend) elif backend.config.task in IMAGE_DIFFUSION_TASKS: - call_latencies_dict = self.run_image_diffusion_latency_tracking(backend) - self.report.populate_latency(call_latencies_dict) + self.run_image_diffusion_latency_tracking(backend) else: - forward_latencies_dict = self.run_latency_inference_tracking(backend) - self.report.populate_latency(forward_latencies_dict) + self.run_latency_inference_tracking(backend) self.report.log_latency() + self.report.log_throughput() if self.config.energy: LOGGER.info("\t+ Creating inference energy tracker") self.energy_tracker = EnergyTracker(device=backend.config.device, device_ids=backend.config.device_ids) if backend.config.task in TEXT_GENERATION_TASKS: - forward_energies_dict, generate_energies_dict = self.run_text_generation_energy_tracking(backend) - self.report.populate_energy(forward_energies_dict, generate_energies_dict) + self.run_text_generation_energy_tracking(backend) elif backend.config.task in IMAGE_DIFFUSION_TASKS: - call_energies_dict = self.run_image_diffusion_energy_tracking(backend) - self.report.populate_energy(call_energies_dict) + self.run_image_diffusion_energy_tracking(backend) else: - forward_energies_dict = self.run_inference_energy_tracking(backend) - self.report.populate_energy(forward_energies_dict) + self.run_inference_energy_tracking(backend) self.report.log_energy() + self.report.log_efficiency() ## Memory tracking - def 
run_text_generation_memory_tracking(self, backend: Backend) -> Tuple[Dict[str, float], Dict[str, float]]: + def run_text_generation_memory_tracking(self, backend: Backend): LOGGER.info("\t+ Running memory tracking") self.memory_tracker.reset() with self.memory_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_memories_dict = self.memory_tracker.get_memories_dict() + self.report.prefill.max_memory = self.memory_tracker.get_max_memory() self.memory_tracker.reset() with self.memory_tracker.track(): _ = backend.generate(self.generate_input, self.config.generate_kwargs) - generate_memories_dict = self.memory_tracker.get_memories_dict() - - return forward_memories_dict, generate_memories_dict + self.report.decode.max_memory = self.memory_tracker.get_max_memory() - def run_image_diffusion_memory_tracking(self, backend: Backend) -> Dict[str, float]: + def run_image_diffusion_memory_tracking(self, backend: Backend): LOGGER.info("\t+ Running memory tracking") self.memory_tracker.reset() with self.memory_tracker.track(): _ = backend.call(self.diffuse_input, self.config.forward_kwargs) - call_memories_dict = self.memory_tracker.get_memories_dict() + self.report.call.max_memory = self.memory_tracker.get_max_memory() - return call_memories_dict - - def run_inference_memory_tracking(self, backend: Backend) -> Dict[str, float]: + def run_inference_memory_tracking(self, backend: Backend): LOGGER.info("\t+ Running memory tracking") self.memory_tracker.reset() with self.memory_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_memories_dict = self.memory_tracker.get_memories_dict() - - return forward_memories_dict + self.report.forward.max_memory = self.memory_tracker.get_max_memory() ## Latency tracking - def run_text_generation_latency_tracking(self, backend: Backend) -> Tuple[List[float], List[float]]: + def run_text_generation_latency_tracking(self, backend: Backend): LOGGER.info("\t+ Running latency tracking") self.latency_tracker.reset() - while self.latency_tracker.get_total_latency() < self.config.duration: + while self.latency_tracker.get_elapsed_time() < self.config.duration: with self.latency_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_latencies_list = self.latency_tracker.get_latencies_list() + self.report.prefill.latency = self.latency_tracker.get_latency() + self.report.prefill.throughput = Throughput.from_latency( + self.report.prefill.latency, self.prefill_volume, unit=PREFILL_THROUGHPUT_UNIT + ) self.latency_tracker.reset() - while self.latency_tracker.get_total_latency() < self.config.duration: + while self.latency_tracker.get_elapsed_time() < self.config.duration: with self.latency_tracker.track(): _ = backend.generate(self.generate_input, self.config.generate_kwargs) - generate_latencies_list = self.latency_tracker.get_latencies_list() - - return forward_latencies_list, generate_latencies_list + self.report.decode.latency = self.latency_tracker.get_latency() - self.report.prefill.latency.mean + self.report.decode.throughput = Throughput.from_latency( + self.report.decode.latency, self.decode_volume, unit=DECODE_THROUGHPUT_UNIT + ) - def run_image_diffusion_latency_tracking(self, backend: Backend) -> List[float]: + def run_image_diffusion_latency_tracking(self, backend: Backend): LOGGER.info("\t+ Running latency tracking") self.latency_tracker.reset() - while self.latency_tracker.get_total_latency() < self.config.duration: + while 
self.latency_tracker.get_elapsed_time() < self.config.duration: with self.latency_tracker.track(): _ = backend.call(self.diffuse_input, self.config.forward_kwargs) - call_latencies_list = self.latency_tracker.get_latencies_list() - - return call_latencies_list + self.report.call.latency = self.latency_tracker.get_latency() + self.report.call.throughput = Throughput.from_latency( + self.report.call.latency, self.call_volume, unit=CALL_THROUGHPUT_UNIT + ) - def run_latency_inference_tracking(self, backend: Backend) -> List[float]: + def run_latency_inference_tracking(self, backend: Backend): LOGGER.info("\t+ Running latency tracking") self.latency_tracker.reset() - while self.latency_tracker.get_total_latency() < self.config.duration: + while self.latency_tracker.get_elapsed_time() < self.config.duration: with self.latency_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_latencies_list = self.latency_tracker.get_latencies_list() - - return forward_latencies_list + self.report.forward.latency = self.latency_tracker.get_latency() + self.report.forward.throughput = Throughput.from_latency( + self.report.forward.latency, self.forward_volume, unit=THROUGHPUT_UNIT + ) ## Energy tracking - def run_text_generation_energy_tracking(self, backend: Backend) -> Tuple[Dict[str, float], Dict[str, float]]: + def run_text_generation_energy_tracking(self, backend: Backend): LOGGER.info("\t+ Running energy tracking") self.energy_tracker.reset() with self.energy_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_energies_dict = self.energy_tracker.get_energies_dict() + self.report.prefill.energy = self.energy_tracker.get_energy() + self.report.prefill.efficiency = Efficiency.from_energy( + self.report.prefill.energy, self.prefill_volume, unit=PREFILL_EFFICIENCY_UNIT + ) self.energy_tracker.reset() with self.energy_tracker.track(): _ = backend.generate(self.generate_input, self.config.generate_kwargs) - generate_energies_dict = self.energy_tracker.get_energies_dict() - - return forward_energies_dict, generate_energies_dict + self.report.decode.energy = self.energy_tracker.get_energy() - self.report.prefill.energy + self.report.decode.efficiency = Efficiency.from_energy( + self.report.decode.energy, self.decode_volume, unit=DECODE_EFFICIENCY_UNIT + ) - def run_image_diffusion_energy_tracking(self, backend: Backend) -> Dict[str, float]: + def run_image_diffusion_energy_tracking(self, backend: Backend): LOGGER.info("\t+ Running energy tracking") self.energy_tracker.reset() with self.energy_tracker.track(): _ = backend.call(self.diffuse_input, self.config.forward_kwargs) - call_energies_dict = self.energy_tracker.get_energies_dict() - - return call_energies_dict + self.report.call.energy = self.energy_tracker.get_energy() + self.report.call.efficiency = Efficiency.from_energy( + self.report.call.energy, self.call_volume, unit=CALL_EFFICIENCY_UNIT + ) - def run_inference_energy_tracking(self, backend: Backend) -> Dict[str, float]: + def run_inference_energy_tracking(self, backend: Backend): LOGGER.info("\t+ Running energy tracking") self.energy_tracker.reset() with self.energy_tracker.track(): _ = backend.forward(self.forward_inputs, self.config.forward_kwargs) - forward_energies_dict = self.energy_tracker.get_energies_dict() + self.report.forward.energy = self.energy_tracker.get_energy() + self.report.forward.efficiency = Efficiency.from_energy( + self.report.forward.energy, self.forward_volume, unit=EFFICIENCY_UNIT + ) + + 
@property + def forward_volume(self) -> int: # in samples + return self.config.input_shapes["batch_size"] - return forward_energies_dict + @property + def prefill_volume(self) -> int: # in tokens + return self.config.input_shapes["batch_size"] * self.config.input_shapes["sequence_length"] + + @property + def call_volume(self) -> int: # in images + return self.config.input_shapes["batch_size"] * self.config.forward_kwargs["num_images_per_prompt"] + + @property + def decode_volume(self) -> int: # in tokens + return ( + self.config.input_shapes["batch_size"] + * self.config.generate_kwargs["num_return_sequences"] + * self.config.generate_kwargs["max_new_tokens"] + ) def get_report(self) -> InferenceReport: return self.report diff --git a/optimum_benchmark/benchmarks/inference/callback.py b/optimum_benchmark/benchmarks/inference/callback.py deleted file mode 100644 index 4871691d..00000000 --- a/optimum_benchmark/benchmarks/inference/callback.py +++ /dev/null @@ -1,25 +0,0 @@ -import time - -from ...import_utils import is_torch_available - -from transformers import LogitsProcessor - -if is_torch_available(): - import torch - - -# TODO: uses this class for more fine-grained latency measurements in text generation -class MeasurementProcessor(LogitsProcessor): - def __init__(self, device: str, backend: str): - self.device = device - self.backend = backend - - self.latencies = [] - - def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor): - """ - Callback to track the time it takes to generate one batch of tokens. - """ - self.latencies.append(time.perf_counter_ns()) - - return scores diff --git a/optimum_benchmark/benchmarks/inference/config.py b/optimum_benchmark/benchmarks/inference/config.py index d5c4a0bb..f2c1d4ab 100644 --- a/optimum_benchmark/benchmarks/inference/config.py +++ b/optimum_benchmark/benchmarks/inference/config.py @@ -9,8 +9,8 @@ INPUT_SHAPES = { "batch_size": 2, - "sequence_length": 16, "num_choices": 2, + "sequence_length": 16, } diff --git a/optimum_benchmark/benchmarks/inference/report.py b/optimum_benchmark/benchmarks/inference/report.py index 9cd43cfc..1f2edd52 100644 --- a/optimum_benchmark/benchmarks/inference/report.py +++ b/optimum_benchmark/benchmarks/inference/report.py @@ -1,353 +1,22 @@ -from dataclasses import dataclass, field -from statistics import mean, stdev -from typing import Any, Dict, List +from dataclasses import dataclass from logging import getLogger -from ..report import BenchmarkReport +from ..report import BenchmarkReport, BenchmarkMeasurements LOGGER = getLogger("report") @dataclass class InferenceReport(BenchmarkReport): - # Config - batch_size: int - # Metrics - forward: Dict[str, Any] = field(default_factory=dict) - - # POPULATING - def populate_latency(self, forward_latencies_list: List[float]): - ## Latency - self.forward["latency"] = { - "list[s]": forward_latencies_list, - "mean(s)": compute_mean(forward_latencies_list), - "stdev(s)": compute_stdev(forward_latencies_list), - } - ## Throughput - forward_throughputs_list = [self.batch_size / latency for latency in forward_latencies_list] - self.forward["throughput"] = { - "list[samples/s]": forward_throughputs_list, - "mean(samples/s)": compute_mean(forward_throughputs_list), - "stdev(samples/s)": compute_stdev(forward_throughputs_list), - } - - def populate_memory(self, forward_memories_dict: Dict[str, Any]): - self.forward["memory"] = forward_memories_dict - - def populate_energy(self, forward_energies_dict: Dict[str, Any]): - self.forward["energy"] = 
forward_energies_dict - - # LOGGING - def log_latency(self): - for key, value in self.forward["latency"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ forward.latency.{key}: {value:f} (s)") - for key, value in self.forward["throughput"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ forward.throughput.{key}: {value:f} (samples/s)") - - def log_memory(self): - for key, value in self.forward["memory"].items(): - LOGGER.info(f"\t+ forward.memory.{key}: {value:f} (MB)") - - def log_energy(self): - for key, value in self.forward["energy"].items(): - LOGGER.info(f"\t+ forward.energy.{key}: {value:f} (kWh)") - - def log_all(self) -> None: - if "latency" in self.forward: - self.log_latency() - if "memory" in self.forward: - self.log_memory() - if "energy" in self.forward: - self.log_energy() - - # add operator to aggregate multiple reports - def __add__(self, other: "InferenceReport") -> "InferenceReport": - agg_report = InferenceReport(batch_size=self.batch_size + other.batch_size) - if "latency" in self.forward and "latency" in other.forward: - agg_forward_latencies_list = [ - (lat_1 + lat_2) / 2 - for lat_1, lat_2 in zip(self.forward["latency"]["list[s]"], other.forward["latency"]["list[s]"]) - ] - agg_report.populate_latency(agg_forward_latencies_list) - - if "memory" in self.forward and "memory" in other.forward: - agg_forward_memories_dict = {} - for key in self.forward["memory"]: - if "vram" in key: - # our vram measures are not process-specific - agg_forward_memories_dict[key] = max(self.forward["memory"][key], other.forward["memory"][key]) - else: - # ram and pytorch measures are process-specific - agg_forward_memories_dict[key] = self.forward["memory"][key] + other.forward["memory"][key] - - agg_report.populate_memory(agg_forward_memories_dict) - - if "energy" in self.forward and "energy" in other.forward: - agg_forward_energies_dict = {} - for key in self.forward["energy"]: - # theoretically, the energies measured by codecarbon are process-specific (it's not clear from the code) - agg_forward_energies_dict[key] = self.forward["energy"][key] + other.forward["energy"][key] - - agg_report.populate_energy(agg_forward_energies_dict) - - return agg_report + forward: BenchmarkMeasurements = BenchmarkMeasurements() @dataclass class ImageDiffusionReport(BenchmarkReport): - # Config - batch_size: int - num_images_per_prompts: int - # Metrics - call: Dict[str, Any] = field(default_factory=dict) - - # POPULATING - def populate_latency(self, call_latencies_list: List[float]): - ## Latency - self.call["latency"] = { - "list[s]": call_latencies_list, - "mean(s)": compute_mean(call_latencies_list), - "stdev(s)": compute_stdev(call_latencies_list), - } - ## Throughput - call_throughputs_list = [ - self.batch_size * self.num_images_per_prompts / latency for latency in call_latencies_list - ] - self.call["throughput"] = { - "list[images/s]": call_throughputs_list, - "mean[images/s]": compute_mean(call_throughputs_list), - "stdev[images/s]": compute_stdev(call_throughputs_list), - } - - def populate_memory(self, call_memories_dict: Dict[str, Any]): - self.call["memory"] = call_memories_dict - - def populate_energy(self, call_energies_dict: Dict[str, Any]): - self.call["energy"] = call_energies_dict - - # LOGGING - def log_latency(self): - for key, value in self.call["latency"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ call.latency.{key}: {value:f} (s)") - for key, value in self.call["throughput"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ 
call.throughput.{key}: {value:f} (images/s)") - - def log_memory(self): - for key, value in self.call["memory"].items(): - LOGGER.info(f"\t+ call.memory.{key}: {value:f} (MB)") - - def log_energy(self): - for key, value in self.call["energy"].items(): - LOGGER.info(f"\t+ call.energy.{key}: {value:f} (kWh)") - - def log_all(self) -> None: - if "latency" in self.call: - self.log_latency() - if "memory" in self.call: - self.log_memory() - if "energy" in self.call: - self.log_energy() - - # add operator to aggregate multiple reports - def __add__(self, other: "ImageDiffusionReport") -> "ImageDiffusionReport": - assert self.num_images_per_prompts == other.num_images_per_prompts, "num_images_per_prompts must be the same" - - agg_report = ImageDiffusionReport( - batch_size=self.batch_size + other.batch_size, - num_images_per_prompts=self.num_images_per_prompts, - ) - if "latency" in self.call and "latency" in other.call: - agg_call_latencies_list = [ - (lat_1 + lat_2) / 2 - for lat_1, lat_2 in zip(self.call["latency"]["list[s]"], other.call["latency"]["list[s]"]) - ] - agg_report.populate_latency(agg_call_latencies_list) - - if "memory" in self.call and "memory" in other.call: - agg_call_memories_dict = {} - for key in self.call["memory"]: - if "vram" in key: - # our vram measures are not process-specific - agg_call_memories_dict[key] = max(self.call["memory"][key], other.call["memory"][key]) - else: - # ram and pytorch measures are process-specific - agg_call_memories_dict[key] = self.call["memory"][key] + other.call["memory"][key] - - agg_report.populate_memory(agg_call_memories_dict) - - if "energy" in self.call and "energy" in other.call: - agg_call_energies_dict = {} - for key in self.call["energy"]: - # theoretically, the energies measured by codecarbon are process-specific (it's not clear from the code) - agg_call_energies_dict[key] = self.call["energy"][key] + other.call["energy"][key] - - agg_report.populate_energy(agg_call_energies_dict) - - return agg_report + call: BenchmarkMeasurements = BenchmarkMeasurements() @dataclass class TextGenerationReport(BenchmarkReport): - # Config - batch_size: int - sequence_length: int - num_new_tokens: int - num_return_sequences: int - # Prefill Metrics - prefill: Dict[str, Any] = field(default_factory=dict) - # Decode Metrics - decode: Dict[str, Any] = field(default_factory=dict) - - def populate_latency(self, forward_latencies_list: List[float], generate_latencies_list: List[float]): - ## Latency - self.prefill["latency"] = { - "list[s]": forward_latencies_list, - "mean(s)": compute_mean(forward_latencies_list), - "stdev(s)": compute_stdev(forward_latencies_list), - } - ## Throughput - prefill_throughputs_list = [ - self.batch_size * self.sequence_length / latency for latency in forward_latencies_list - ] - self.prefill["throughput"] = { - "list[tokens/s]": prefill_throughputs_list, - "mean[tokens/s]": compute_mean(prefill_throughputs_list), - "stdev[tokens/s]": compute_stdev(prefill_throughputs_list), - } - ## Latency - decode_latencies_list = [ - generate_latency - self.prefill["latency"]["mean(s)"] for generate_latency in generate_latencies_list - ] - self.decode["latency"] = { - "list[s]": decode_latencies_list, - "mean(s)": compute_mean(decode_latencies_list), - "stdev(s)": compute_stdev(decode_latencies_list), - } - ## Throughput - decode_throughputs_list = [ - self.batch_size * self.num_new_tokens * self.num_return_sequences / latency - for latency in decode_latencies_list - ] - self.decode["throughput"] = { - "list[tokens/s]": 
decode_throughputs_list, - "mean[tokens/s]": compute_mean(decode_throughputs_list), - "stdev[tokens/s]": compute_stdev(decode_throughputs_list), - } - - def populate_memory(self, forward_memories_dict: Dict[str, Any], generate_memories_dict: Dict[str, Any]): - self.prefill["memory"] = forward_memories_dict - self.decode["memory"] = generate_memories_dict - - def populate_energy(self, forward_energies_dict: Dict[str, Any], generate_energies_dict: Dict[str, Any]): - self.prefill["energy"] = forward_energies_dict - self.decode["energy"] = generate_energies_dict - - # LOGGING - def log_latency(self): - for key, value in self.prefill["latency"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ prefill.latency.{key}: {value:f} (s)") - for key, value in self.prefill["throughput"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ prefill.throughput.{key}: {value:f} (tokens/s)") - for key, value in self.decode["latency"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ decode.latency.{key}: {value:f} (s)") - for key, value in self.decode["throughput"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ decode.throughput.{key}: {value:f} (tokens/s)") - - def log_memory(self): - for key, value in self.prefill["memory"].items(): - LOGGER.info(f"\t+ prefill.memory.{key}: {value:f} (MB)") - for key, value in self.decode["memory"].items(): - LOGGER.info(f"\t+ decode.memory.{key}: {value:f} (MB)") - - def log_energy(self): - for key, value in self.prefill["energy"].items(): - LOGGER.info(f"\t+ prefill.energy.{key}: {value:f} (kWh)") - for key, value in self.decode["energy"].items(): - LOGGER.info(f"\t+ decode.energy.{key}: {value:f} (kWh)") - - def log_all(self) -> None: - if "latency" in self.prefill: - self.log_latency() - if "memory" in self.prefill: - self.log_memory() - if "energy" in self.prefill: - self.log_energy() - - # add operator to aggregate multiple reports - def __add__(self, other: "TextGenerationReport") -> "TextGenerationReport": - agg_report = TextGenerationReport( - batch_size=self.batch_size + other.batch_size, - sequence_length=self.sequence_length, - num_new_tokens=self.num_new_tokens, - num_return_sequences=self.num_return_sequences, - ) - if "latency" in self.prefill and "latency" in other.prefill: - agg_forward_latencies_list = [ - (lat_1 + lat_2) / 2 - for lat_1, lat_2 in zip(self.prefill["latency"]["list[s]"], other.prefill["latency"]["list[s]"]) - ] - agg_generate_latencies_list = [ - (lat_1 + lat_2) / 2 - for lat_1, lat_2 in zip(self.decode["latency"]["list[s]"], other.decode["latency"]["list[s]"]) - ] - agg_report.populate_latency(agg_forward_latencies_list, agg_generate_latencies_list) - - if "memory" in self.prefill and "memory" in other.prefill: - agg_forward_memories_dict = {} - for key in self.prefill["memory"]: - if "vram" in key: - # our vram measures are not process-specific - agg_forward_memories_dict[key] = max(self.prefill["memory"][key], other.prefill["memory"][key]) - else: - # ram and pytorch measures are process-specific - agg_forward_memories_dict[key] = self.prefill["memory"][key] + other.prefill["memory"][key] - - agg_generate_memories_dict = {} - for key in self.decode["memory"]: - if "vram" in key: - # our vram measures are not process-specific - agg_generate_memories_dict[key] = max(self.decode["memory"][key], other.decode["memory"][key]) - else: - # ram and pytorch measures are process-specific - agg_generate_memories_dict[key] = self.decode["memory"][key] + other.decode["memory"][key] - - 
agg_report.populate_memory(agg_forward_memories_dict, agg_generate_memories_dict) - - if "energy" in self.prefill and "energy" in other.prefill: - agg_forward_energies_dict = {} - for key in self.prefill["energy"]: - # theoretically, the energies measured by codecarbon are process-specific (it's not clear from the code) - agg_forward_energies_dict[key] = self.prefill["energy"][key] + other.prefill["energy"][key] - - agg_generate_energies_dict = {} - for key in self.decode["energy"]: - # theoretically, the energies measured by codecarbon are process-specific (it's not clear from the code) - agg_generate_energies_dict[key] = self.decode["energy"][key] + other.decode["energy"][key] - - agg_report.populate_energy(agg_forward_energies_dict, agg_generate_energies_dict) - - return agg_report - - -def compute_mean(values: List[float]) -> float: - return mean(values) if len(values) > 0 else 0.0 - - -def compute_stdev(values: List[float]) -> float: - return stdev(values) if len(values) > 1 else 0.0 + prefill: BenchmarkMeasurements = BenchmarkMeasurements() + decode: BenchmarkMeasurements = BenchmarkMeasurements() diff --git a/optimum_benchmark/benchmarks/report.py b/optimum_benchmark/benchmarks/report.py index 69491d65..f4600425 100644 --- a/optimum_benchmark/benchmarks/report.py +++ b/optimum_benchmark/benchmarks/report.py @@ -1,12 +1,44 @@ from dataclasses import dataclass, asdict -from typing import Union, Optional +from typing import Optional, Union, List +from logging import getLogger from json import dump import os +from ..trackers.latency import Latency, Throughput +from ..trackers.energy import Energy, Efficiency +from ..trackers.memory import MaxMemory + from transformers.configuration_utils import PushToHubMixin from flatten_dict import flatten import pandas as pd +LOGGER = getLogger("report") + + +@dataclass +class BenchmarkMeasurements: + max_memory: Optional[MaxMemory] = None + latency: Optional[Latency] = None + throughput: Optional[Throughput] = None + energy: Optional[Energy] = None + efficiency: Optional[Efficiency] = None + + @staticmethod + def aggregate(measurements: List["BenchmarkMeasurements"]) -> "BenchmarkMeasurements": + max_memory = MaxMemory.aggregate([m.max_memory for m in measurements if m.max_memory is not None]) + latency = Latency.aggregate([m.latency for m in measurements if m.latency is not None]) + throughput = Throughput.aggregate([m.throughput for m in measurements if m.throughput is not None]) + energy = Energy.aggregate([m.energy for m in measurements if m.energy is not None]) + efficiency = Efficiency.aggregate([m.efficiency for m in measurements if m.efficiency is not None]) + + return BenchmarkMeasurements( + max_memory=max_memory, + latency=latency, + throughput=throughput, + energy=energy, + efficiency=efficiency, + ) + @dataclass class BenchmarkReport(PushToHubMixin): @@ -69,5 +101,49 @@ def to_dataframe(self) -> pd.DataFrame: def to_csv(self, path: str) -> None: self.to_dataframe().to_csv(path, index=False) - def log_all(self) -> None: - raise NotImplementedError("`log_all` method must be implemented in the child class") + def log_max_memory(self): + for target in self.to_dict().keys(): + benchmark_measurements: BenchmarkMeasurements = getattr(self, target) + if benchmark_measurements.max_memory is not None: + benchmark_measurements.max_memory.log(prefix=target) + + def log_latency(self): + for target in self.to_dict().keys(): + benchmark_measurements: BenchmarkMeasurements = getattr(self, target) + if benchmark_measurements.latency is not None: + 
benchmark_measurements.latency.log(prefix=target) + + def log_throughput(self): + for target in self.to_dict().keys(): + benchmark_measurements: BenchmarkMeasurements = getattr(self, target) + if benchmark_measurements.throughput is not None: + benchmark_measurements.throughput.log(prefix=target) + + def log_energy(self): + for target in self.to_dict().keys(): + benchmark_measurements: BenchmarkMeasurements = getattr(self, target) + if benchmark_measurements.energy is not None: + benchmark_measurements.energy.log(prefix=target) + + def log_efficiency(self): + for target in self.to_dict().keys(): + benchmark_measurements: BenchmarkMeasurements = getattr(self, target) + if benchmark_measurements.efficiency is not None: + benchmark_measurements.efficiency.log(prefix=target) + + def log_all(self): + self.log_max_memory() + self.log_latency() + self.log_throughput() + self.log_energy() + self.log_efficiency() + + @classmethod + def aggregate(cls, reports: List["BenchmarkReport"]) -> "BenchmarkReport": + aggregated_report = cls() + for target in aggregated_report.to_dict().keys(): + measurements = [getattr(report, target) for report in reports] + aggregated_measurements = BenchmarkMeasurements.aggregate(measurements) + setattr(aggregated_report, target, aggregated_measurements) + + return aggregated_report diff --git a/optimum_benchmark/benchmarks/training/benchmark.py b/optimum_benchmark/benchmarks/training/benchmark.py index 90c231d0..994e1206 100644 --- a/optimum_benchmark/benchmarks/training/benchmark.py +++ b/optimum_benchmark/benchmarks/training/benchmark.py @@ -5,15 +5,18 @@ from .config import TrainingConfig from .report import TrainingReport from ...trackers.memory import MemoryTracker -from ...trackers.energy import EnergyTracker -from .callback import LatencyTrainerCallback from ...backends.base import Backend, BackendConfigT +from ...trackers.energy import EnergyTracker, Efficiency from ...generators.dataset_generator import DatasetGenerator +from ...trackers.latency import LatencyTrainerCallback, Throughput from transformers import default_data_collator LOGGER = getLogger("training") +TRAIN_THROUGHPUT_UNIT = "samples/s" +TRAIN_EFFICIENCY_UNIT = "samples/kWh" + class TrainingBenchmark(Benchmark[TrainingConfig]): NAME = "training" @@ -33,12 +36,7 @@ def run(self, backend: Backend[BackendConfigT]) -> None: training_dataset = dataset_generator() LOGGER.info("\t+ Initializing training report") - self.report = TrainingReport( - max_steps=self.config.max_steps, - warmup_steps=self.config.warmup_steps, - per_process_batch_size=self.config.training_arguments["per_device_train_batch_size"], - gradient_accumulation_steps=self.config.training_arguments["gradient_accumulation_steps"], - ) + self.report = TrainingReport() training_callbackes = [] if self.config.latency: @@ -71,16 +69,67 @@ def run(self, backend: Backend[BackendConfigT]) -> None: ) if self.config.latency: - self.report.populate_latency(overall_latencies_list=latency_callback.get_latencies_list()) + self.report.overall.latency = latency_callback.get_latency() + self.report.overall.throughput = Throughput.from_latency( + self.report.overall.latency, + volume=self.overall_volume, + unit=TRAIN_THROUGHPUT_UNIT, + ) + self.report.warmup.latency = self.report.overall.latency[: self.config.warmup_steps] + self.report.warmup.throughput = Throughput.from_latency( + self.report.warmup.latency, + volume=self.warmup_volume, + unit=TRAIN_THROUGHPUT_UNIT, + ) + self.report.train.latency = self.report.overall.latency[self.config.warmup_steps :] + 
self.report.train.throughput = Throughput.from_latency( + self.report.train.latency, + volume=self.train_volume, + unit=TRAIN_THROUGHPUT_UNIT, + ) + self.report.log_latency() + self.report.log_throughput() if self.config.memory: - self.report.populate_memory(overall_memories_dict=memory_tracker.get_memories_dict()) - self.report.log_memory() + # it's the same + self.report.overall.max_memory = memory_tracker.get_max_memory() + self.report.warmup.max_memory = memory_tracker.get_max_memory() + self.report.train.max_memory = memory_tracker.get_max_memory() + + self.report.log_max_memory() if self.config.energy: - self.report.populate_energy(overall_energies_dict=energy_tracker.get_energies_dict()) + # can only get overall energy consumption + self.report.overall.energy = energy_tracker.get_energy() + self.report.overall.efficiency = Efficiency.from_energy( + self.report.overall.energy, + volume=self.overall_volume, + unit=TRAIN_EFFICIENCY_UNIT, + ) + self.report.log_energy() + self.report.log_efficiency() + + @property + def overall_volume(self) -> int: + return ( + self.config.max_steps + * self.config.training_arguments["per_device_train_batch_size"] + * self.config.training_arguments["gradient_accumulation_steps"] + ) + + @property + def warmup_volume(self) -> int: + return ( + self.config.warmup_steps + * self.config.training_arguments["per_device_train_batch_size"] + * self.config.training_arguments["gradient_accumulation_steps"] + ) + + @property + def train_volume(self) -> int: + return self.overall_volume - self.warmup_volume def get_report(self) -> TrainingReport: return self.report diff --git a/optimum_benchmark/benchmarks/training/callback.py b/optimum_benchmark/benchmarks/training/callback.py deleted file mode 100644 index 88026d79..00000000 --- a/optimum_benchmark/benchmarks/training/callback.py +++ /dev/null @@ -1,43 +0,0 @@ -import time -from typing import List - -import torch -from transformers import TrainerCallback - - -class LatencyTrainerCallback(TrainerCallback): - def __init__(self, device: str, backend: str) -> None: - self.device = device - self.backend = backend - self.all_latencies_list = [] - - def on_step_begin(self, *args, **kwargs): - # one record per step - if self.device == "cuda" and self.backend == "pytorch": - self.all_latencies_list.append(torch.cuda.Event(enable_timing=True)) - self.all_latencies_list[-1].record() - else: - self.all_latencies_list.append(time.perf_counter_ns()) - - def on_train_end(self, *args, **kwargs): - # one last record to measure the time of the last step - if self.device == "cuda" and self.backend == "pytorch": - self.all_latencies_list.append(torch.cuda.Event(enable_timing=True)) - self.all_latencies_list[-1].record() - else: - self.all_latencies_list.append(time.perf_counter_ns()) - - def get_latencies_list(self) -> List[float]: - if self.device == "cuda" and self.backend == "pytorch": - torch.cuda.synchronize() # synchronize the device to make sure all events have been recorded - latencies_list = [ - self.all_latencies_list[i - 1].elapsed_time(self.all_latencies_list[i]) * 1e-3 - for i in range(1, len(self.all_latencies_list)) - ] - else: - latencies_list = [ - (self.all_latencies_list[i] - self.all_latencies_list[i - 1]) * 1e-9 - for i in range(1, len(self.all_latencies_list)) - ] - - return latencies_list diff --git a/optimum_benchmark/benchmarks/training/report.py b/optimum_benchmark/benchmarks/training/report.py index 9eeba211..90cd91f2 100644 --- a/optimum_benchmark/benchmarks/training/report.py +++ 
b/optimum_benchmark/benchmarks/training/report.py @@ -1,169 +1,13 @@ -from dataclasses import dataclass, field -from statistics import mean, stdev -from typing import Any, Dict, List +from dataclasses import dataclass from logging import getLogger -from ..report import BenchmarkReport +from ..report import BenchmarkReport, BenchmarkMeasurements LOGGER = getLogger("report") @dataclass class TrainingReport(BenchmarkReport): - max_steps: int - warmup_steps: int - per_process_batch_size: int - gradient_accumulation_steps: int - - overall: Dict[str, Any] = field(default_factory=dict) - training: Dict[str, Any] = field(default_factory=dict) - warmup: Dict[str, Any] = field(default_factory=dict) - - world_size: int = 1 - - # POPULATING - def populate_latency(self, overall_latencies_list: List[float]) -> None: - assert ( - len(overall_latencies_list) == self.max_steps - ), f"Expected {self.max_steps} latencies, but got {len(overall_latencies_list)} latencies" - # Overall - ## Latency - self.overall["latency"] = { - "list[s/step]": overall_latencies_list, - "mean(s/step)": compute_mean(overall_latencies_list), - "stdev(s/step)": compute_stdev(overall_latencies_list), - } - ## Throughput - overall_throughputs_list = [ - self.world_size * self.per_process_batch_size * self.gradient_accumulation_steps / latency - for latency in overall_latencies_list - ] - self.overall["throughput"] = { - "list[samples/s]": overall_throughputs_list, - "mean(samples/s)": compute_mean(overall_throughputs_list), - "stdev(samples/s)": compute_stdev(overall_throughputs_list), - } - # Training - ## Latency - training_latencies_list = overall_latencies_list[self.warmup_steps :] - self.training["latency"] = { - "list[s/step]": training_latencies_list, - "mean(s/step)": compute_mean(training_latencies_list), - "stdev(s/step)": compute_stdev(training_latencies_list), - } - ## Throughput - training_throughputs_list = overall_throughputs_list[self.warmup_steps :] - self.training["throughput"] = { - "list[samples/s]": training_throughputs_list, - "mean(samples/s)": compute_mean(training_throughputs_list), - "stdev(samples/s)": compute_stdev(training_throughputs_list), - } - # Warmup - ## Latency - warmup_latencies_list = overall_latencies_list[: self.warmup_steps] - self.warmup["latency"] = { - "list[s/step]": warmup_latencies_list, - "mean(s/step)": compute_mean(warmup_latencies_list), - "stdev(s/step)": compute_stdev(warmup_latencies_list), - } - ## Throughput - warmup_throughputs_list = overall_throughputs_list[: self.warmup_steps] - self.warmup["throughput"] = { - "list[samples/s]": warmup_throughputs_list, - "mean(samples/s)": compute_mean(warmup_throughputs_list), - "stdev(samples/s)": compute_stdev(warmup_throughputs_list), - } - - def populate_memory(self, overall_memories_dict: Dict[str, float]) -> None: - self.warmup["memory"] = overall_memories_dict - self.overall["memory"] = overall_memories_dict - self.training["memory"] = overall_memories_dict - - def populate_energy(self, overall_energies_dict: Dict[str, float]) -> None: - self.overall["energy"] = overall_energies_dict - # can't get training only or warmup only energies - # self.warmup["energy"] = overall_energies_dict - # self.training["energy"] = overall_energies_dict - # TODO: use a callback for energy instead of a tracker - - # LOGGING - def log_latency(self): - for key, value in self.training["latency"].items(): - if "list" in key: - continue - LOGGER.info(f"\t+ training.latency.{key}: {value:f} (s)") - for key, value in self.training["throughput"].items(): - 
if "list" in key: - continue - LOGGER.info(f"\t+ training.throughput.{key}: {value:f} (samples/s)") - - def log_memory(self): - for key, value in self.training["memory"].items(): - LOGGER.info(f"\t+ training.memory.{key}: {value:f} (MB)") - - def log_energy(self): - for key, value in self.overall["energy"].items(): - LOGGER.info(f"\t+ overall.energy.{key}: {value:f} (kWh)") - - def log_all(self): - if "latency" in self.training: - self.log_latency() - if "memory" in self.training: - self.log_memory() - if "energy" in self.training: - self.log_energy() - - # LOGIC - def __add__(self, other: "TrainingReport") -> "TrainingReport": - assert self.max_steps == other.max_steps, "Both reports must have the same max_steps" - assert self.warmup_steps == other.warmup_steps, "Both reports must have the same warmup_steps" - assert ( - self.gradient_accumulation_steps == other.gradient_accumulation_steps - ), "Both reports must have the same gradient_accumulation_steps" - - agg_report = TrainingReport( - max_steps=self.max_steps, - warmup_steps=self.warmup_steps, - world_size=self.world_size + other.world_size, - per_process_batch_size=self.per_process_batch_size, - gradient_accumulation_steps=self.gradient_accumulation_steps, - ) - - if "latency" in self.overall: - agg_overall_latencies_list = [ - max(lat_1, lat_2) - for lat_1, lat_2 in zip( - self.overall["latency"]["list[s/step]"], other.overall["latency"]["list[s/step]"] - ) - ] - agg_report.populate_latency(agg_overall_latencies_list) - - if "memory" in self.overall: - agg_overall_memories_dict = {} - for key in self.overall["memory"]: - if "vram" in key: - # our vram measures are not process-specific - agg_overall_memories_dict[key] = max(self.overall["memory"][key], other.overall["memory"][key]) - else: - # ram and pytorch measures are process-specific (can be accumulated) - agg_overall_memories_dict[key] = self.overall["memory"][key] + other.overall["memory"][key] - - agg_report.populate_memory(agg_overall_memories_dict) - - if "energy" in self.overall: - agg_overall_energies_dict = {} - for key in self.overall["energy"]: - # theoretically, the energies measured by codecarbon are process-specific (it's not clear from the code) - agg_overall_energies_dict[key] = self.overall["energy"][key] + other.overall["energy"][key] - - agg_report.populate_energy(agg_overall_energies_dict) - - return agg_report - - -def compute_mean(values: List[float]) -> float: - return mean(values) if len(values) > 0 else 0.0 - - -def compute_stdev(values: List[float]) -> float: - return stdev(values) if len(values) > 1 else 0.0 + overall: BenchmarkMeasurements = BenchmarkMeasurements() + warmup: BenchmarkMeasurements = BenchmarkMeasurements() + train: BenchmarkMeasurements = BenchmarkMeasurements() diff --git a/optimum_benchmark/launchers/torchrun/launcher.py b/optimum_benchmark/launchers/torchrun/launcher.py index f327e85c..b3003bd7 100644 --- a/optimum_benchmark/launchers/torchrun/launcher.py +++ b/optimum_benchmark/launchers/torchrun/launcher.py @@ -2,21 +2,19 @@ import multiprocessing as mp from logging import getLogger from multiprocessing import Queue -from typing import Callable, Dict, Any +from typing import Callable, Dict, Any, List from ..base import Launcher from .config import TorchrunConfig from ...logging_utils import setup_logging from ..isolation_utils import device_isolation from ...benchmarks.report import BenchmarkReport -from ...import_utils import is_torch_distributed_available -if is_torch_distributed_available(): - import torch.distributed - from 
torch.distributed import FileStore - from torch.distributed.elastic.multiprocessing import Std - from torch.distributed.elastic.multiprocessing.errors import record - from torch.distributed.launcher.api import LaunchConfig, launch_agent +import torch.distributed +from torch.distributed import FileStore +from torch.distributed.elastic.multiprocessing import Std +from torch.distributed.elastic.multiprocessing.errors import record +from torch.distributed.launcher.api import LaunchConfig, launch_agent LOGGER = getLogger("torchrun") @@ -62,7 +60,7 @@ def launch(self, worker: Callable, *worker_args) -> Dict[str, Any]: args=(worker, queue, current_log_level, *worker_args), ) - outputs = [] + outputs: List[BenchmarkReport] = [] while not queue.empty(): outputs.append(queue.get()) @@ -71,7 +69,7 @@ def launch(self, worker: Callable, *worker_args) -> Dict[str, Any]: report: BenchmarkReport = outputs[0] else: LOGGER.info(f"\t+ Merging benchmark reports from {len(outputs)} workers") - report: BenchmarkReport = sum(outputs[1:], outputs[0]) + report: BenchmarkReport = outputs[0].aggregate(outputs) report.log_all() return report diff --git a/optimum_benchmark/trackers/energy.py b/optimum_benchmark/trackers/energy.py index 7d3bb7ad..c75a0cc6 100644 --- a/optimum_benchmark/trackers/energy.py +++ b/optimum_benchmark/trackers/energy.py @@ -1,7 +1,9 @@ import os +from functools import reduce from logging import getLogger +from dataclasses import dataclass from contextlib import contextmanager -from typing import Optional, Dict +from typing import Optional, Literal, List from ..env_utils import get_cuda_device_ids from ..import_utils import is_codecarbon_available @@ -9,13 +11,82 @@ if is_codecarbon_available(): from codecarbon import EmissionsTracker, OfflineEmissionsTracker - LOGGER = getLogger("energy") +Energy_Unit_Literal = Literal["kWh"] +Efficiency_Unit_Literal = Literal["samples/kWh", "tokens/kWh", "images/kWh"] + + +@dataclass +class Energy: + unit: Energy_Unit_Literal + + cpu: float + ram: float + gpu: float + total: float + + def __add__(self, other: "Energy") -> "Energy": + if self.unit != other.unit: + raise ValueError(f"Cannot add energies with different units: {self.unit} and {other.unit}") + + return Energy( + unit=self.unit, + cpu=self.cpu + other.cpu, + gpu=self.gpu + other.gpu, + ram=self.ram + other.ram, + total=self.total + other.total, + ) + + @staticmethod + def aggregate(energies: List["Energy"]) -> "Energy": + if len(energies) == 0 or all(energy is None for energy in energies): + return None + elif any(energy is None for energy in energies): + raise ValueError("Some energy measurements are missing") + + return reduce(lambda x, y: x + y, energies) + + def log(self, prefix: str = "forward"): + LOGGER.info(f"\t\t+ {prefix} CPU energy: {self.cpu:f} ({self.unit})") + LOGGER.info(f"\t\t+ {prefix} GPU energy: {self.gpu:f} ({self.unit})") + LOGGER.info(f"\t\t+ {prefix} RAM energy: {self.ram:f} ({self.unit})") + LOGGER.info(f"\t\t+ {prefix} total energy: {self.total:f} ({self.unit})") + + +@dataclass +class Efficiency: + unit: Efficiency_Unit_Literal + + value: float + + def __add__(self, other: "Efficiency") -> "Efficiency": + if self.unit != other.unit: + raise ValueError(f"Cannot add efficiencies with different units: {self.unit} and {other.unit}") + + return Efficiency(value=(self.value + other.value) / 2, unit=self.unit) + + @staticmethod + def aggregate(efficiencies: List["Efficiency"]) -> "Efficiency": + if len(efficiencies) == 0 or all(efficiency is None for efficiency in efficiencies): 
+ return None + elif any(efficiency is None for efficiency in efficiencies): + raise ValueError("Some efficiency measurements are missing") + + return reduce(lambda x, y: x + y, efficiencies) + + @staticmethod + def from_energy(energy: "Energy", volume: int, unit: str) -> "Efficiency": + return Efficiency(value=volume / energy.total if energy.total > 0 else 0, unit=unit) + + def log(self, prefix: str = "forward"): + LOGGER.info(f"\t\t+ {prefix} efficiency: {self.value:f} ({self.unit})") + class EnergyTracker: def __init__(self, device: str, device_ids: Optional[str] = None): self.device = device + self.device_ids = device_ids self.cpu_energy: float = 0 self.gpu_energy: float = 0 @@ -23,13 +94,12 @@ def __init__(self, device: str, device_ids: Optional[str] = None): self.total_energy: float = 0 if self.device == "cuda": - if device_ids is None: + if self.device_ids is None: LOGGER.warning("\t+ `device=cuda` but `device_ids` not provided. Using all available CUDA devices.") - self.device_ids = list(map(int, get_cuda_device_ids().split(","))) - else: - self.device_ids = list(map(int, device_ids.split(","))) - else: - self.device_ids = [] + self.device_ids = get_cuda_device_ids() + + self.device_ids = list(map(int, self.device_ids.split(","))) + LOGGER.info(f"\t+ Tracking GPU energy on devices {self.device_ids}") def reset(self): self.cpu_energy = 0 @@ -84,10 +154,11 @@ def track(self, interval=1, file_prefix="method"): def get_elapsed_time(self) -> float: return self.emission_tracker._last_measured_time - self.emission_tracker._start_time - def get_energies_dict(self) -> Dict[str, float]: - return { - "cpu_energy(kHh)": self.cpu_energy, - "gpu_energy(kHh)": self.gpu_energy, - "ram_energy(kHh)": self.ram_energy, - "total(kHh)": self.total_energy, - } + def get_energy(self) -> Energy: + return Energy( + unit="kWh", + cpu=self.cpu_energy, + gpu=self.gpu_energy, + ram=self.ram_energy, + total=self.total_energy, + ) diff --git a/optimum_benchmark/trackers/latency.py b/optimum_benchmark/trackers/latency.py index 369c2b70..21bb8f4d 100644 --- a/optimum_benchmark/trackers/latency.py +++ b/optimum_benchmark/trackers/latency.py @@ -1,91 +1,236 @@ +from typing import List, Literal, Union from contextlib import contextmanager +from dataclasses import dataclass from logging import getLogger -from typing import List +from functools import reduce import time -from ..import_utils import is_torch_distributed_available, is_torch_available +from .utils import compute_mean, compute_stdev -if is_torch_available(): - import torch - -if is_torch_distributed_available(): - import torch.distributed +from transformers import TrainerCallback, LogitsProcessor +import torch LOGGER = getLogger("latency") +Throughput_Unit_Literal = Literal["samples/s", "tokens/s", "images/s", "steps/s"] +Latency_Unit_Literal = Literal["s"] + + +@dataclass +class Latency: + unit: Latency_Unit_Literal + + mean: float + stdev: float + values: List[float] + + def __getitem__(self, index: int) -> float: + if isinstance(index, slice): + return Latency.from_values(values=self.values[index], unit=self.unit) + else: + return self.values[index] + + def __sub__(self, scalar: float) -> "Latency": + if not isinstance(scalar, (int, float)): + raise ValueError(f"Cannot subtract non-scalar value from latency: {scalar}") + + latencies = [lat - scalar for lat in self.values] + + return Latency.from_values(values=latencies, unit=self.unit) + + def __add__(self, other: "Latency") -> "Latency": + if self.unit != other.unit: + raise ValueError(f"Cannot add 
latencies with different units: {self.unit} and {other.unit}") + + return Latency.from_values(values=self.values + other.values, unit=self.unit) + + @staticmethod + def aggregate(latencies: List["Latency"]) -> "Latency": + if len(latencies) == 0 or all(latency is None for latency in latencies): + return None + elif any(latency is None for latency in latencies): + raise ValueError("Some latency measurements are missing") + + return reduce(lambda x, y: x + y, latencies) + + @staticmethod + def from_values(values: List[float], unit: str) -> "Latency": + return Latency(mean=compute_mean(values), stdev=compute_stdev(values), values=values, unit=unit) + + def log(self, prefix: str = "forward"): + LOGGER.info(f"\t\t+ {prefix} latency: {self.mean:f} ± 2 x {self.stdev:f} ({self.unit})") + + +@dataclass +class Throughput: + unit: Throughput_Unit_Literal + + mean: float + stdev: float + + def __add__(self, other: "Throughput") -> "Throughput": + if self.unit != other.unit: + raise ValueError(f"Cannot add throughputs with different units: {self.unit} and {other.unit}") + + return Throughput(mean=self.mean + other.mean, stdev=(self.stdev**2 + other.stdev**2) ** 0.5, unit=self.unit) + + @staticmethod + def aggregate(throughputs: List["Throughput"]) -> "Throughput": + if len(throughputs) == 0 or all(throughput is None for throughput in throughputs): + return None + elif any(throughput is None for throughput in throughputs): + raise ValueError("Some throughput measurements are missing") + + return reduce(lambda x, y: x + y, throughputs) + + @staticmethod + def from_values(values: List[float], unit: str) -> "Throughput": + return Throughput(mean=compute_mean(values), stdev=compute_stdev(values), unit=unit) + + @staticmethod + def from_latency(latency: "Latency", volume: int, unit: str) -> "Throughput": + throughputs = [volume / lat if lat > 0 else 0 for lat in latency.values] + return Throughput.from_values(values=throughputs, unit=unit) + + def log(self, prefix: str = "forward"): + LOGGER.info(f"\t\t+ {prefix} throughput: {self.mean:f} ± 2 x {self.stdev:f} ({self.unit})") + class LatencyTracker: def __init__(self, device: str, backend: str): self.device = device self.backend = backend - self.latencies: List[float] = [] + self.start_events: List[Union[float, torch.cuda.Event]] = [] + self.end_events: List[Union[float, torch.cuda.Event]] = [] + self.start_time: float = time.perf_counter() - # this is not in track, because this tracker is used repeatedly - if is_torch_distributed_available() and torch.distributed.is_initialized(): - LOGGER.info("\t+ Tracking Pytorch Distributed latency") - elif self.device == "cuda" and self.backend == "pytorch": + def reset(self): + self.start_time = time.perf_counter() + self.start_events = [] + self.end_events = [] + + if self.backend == "pytorch" and self.device == "cuda": LOGGER.info("\t+ Tracking Pytorch CUDA latency") else: LOGGER.info("\t+ Tracking CPU latency") - def reset(self): - self.latencies = [] - @contextmanager def track(self): - if is_torch_distributed_available() and torch.distributed.is_initialized(): - yield from self._pytorch_distributed_latency() - elif self.backend == "pytorch" and self.device == "cuda": + if self.backend == "pytorch" and self.device == "cuda": yield from self._pytorch_cuda_latency() else: yield from self._cpu_latency() - def _pytorch_distributed_latency(self): - torch.distributed.barrier() # synchronize before workload - start = time.perf_counter_ns() + def _pytorch_cuda_latency(self): + start = torch.cuda.Event(enable_timing=True) + 
start.record() + self.start_events.append(start) + yield - torch.distributed.barrier() # synchronize after workload - end = time.perf_counter_ns() - latency = (end - start) / 1e9 - self.latencies.append(latency) + end = torch.cuda.Event(enable_timing=True) + end.record() + self.end_events.append(end) - LOGGER.debug(f"\t+ Tracked Pytorch distributed latency: {latency:.2e}s") + def _cpu_latency(self): + start = time.perf_counter() + self.start_events.append(start) - def _pytorch_cuda_latency(self): - # Note: torch.cuda.Event is not used here, - # there's actually no specific need to use cuda events if you're synchronizing - # it's rather a feature that can be used to measure kernel latency without synchronizing, - # allowing us to measure the time it takes to perform an operation without necessarily stalling the GPU. - # An interesting use case is with cuda graphs where synchronization makes us shoot the optimization in the foot. - # details: https://developer.nvidia.com/blog/how-implement-performance-metrics-cuda-cc/ - torch.cuda.synchronize() # synchronize before workload - start = time.perf_counter_ns() yield - torch.cuda.synchronize() # synchronize after workload - end = time.perf_counter_ns() - latency = (end - start) / 1e9 - self.latencies.append(latency) + end = time.perf_counter() + self.end_events.append(end) - LOGGER.debug(f"\t+ Tracked Pytorch CUDA latency: {latency:.2e}s") + def get_elapsed_time(self) -> float: + # we measured in cpu to not synchronize all events + return time.perf_counter() - self.start_time - def _cpu_latency(self): - start = time.perf_counter_ns() - yield - end = time.perf_counter_ns() + def get_latency(self) -> Latency: + if self.backend == "pytorch" and self.device == "cuda": + # synchronize the device to make sure all events have been recorded + torch.cuda.synchronize() + latencies_list = [ + self.start_events[i].elapsed_time(self.end_events[i]) / 1e3 for i in range(len(self.start_events)) + ] + else: + latencies_list = [(self.end_events[i] - self.start_events[i]) for i in range(len(self.start_events))] - latency = (end - start) / 1e9 - self.latencies.append(latency) + return Latency.from_values(latencies_list, unit="s") - LOGGER.debug(f"\t+ Tracked CPU latency: {latency:.2e}s") - def get_total_count(self): - return len(self.latencies) +class LatencyTrainerCallback(TrainerCallback): + def __init__(self, device: str, backend: str) -> None: + self.device = device + self.backend = backend + + self.events: List[Union[float, torch.cuda.Event]] = [] + + def reset(self): + self.events = [] - def get_total_latency(self): - return sum(self.latencies) + def on_step_begin(self, *args, **kwargs): + if self.device == "cuda" and self.backend == "pytorch": + event = torch.cuda.Event(enable_timing=True) + event.record() + self.events.append(event) + else: + self.events.append(time.perf_counter()) + + def on_train_end(self, *args, **kwargs): + # one last record to measure the time of the last step + if self.device == "cuda" and self.backend == "pytorch": + event = torch.cuda.Event(enable_timing=True) + event.record() + self.events.append(event) + else: + self.events.append(time.perf_counter()) + + def get_latency(self) -> Latency: + if self.device == "cuda" and self.backend == "pytorch": + # synchronize the device to make sure all events have been recorded + torch.cuda.synchronize() + latencies_list = [ + self.events[i - 1].elapsed_time(self.events[i]) / 1e3 for i in range(1, len(self.events)) + ] + else: + latencies_list = [(self.events[i] - self.events[i - 1]) for i in 
+
+        return Latency.from_values(latencies_list, unit="s")
+
+
+class LatencyLogitsProcessor(LogitsProcessor):
+    def __init__(self, device: str, backend: str):
+        self.device = device
+        self.backend = backend
+        self.reset()
+
+    def reset(self):
+        if self.device == "cuda" and self.backend == "pytorch":
+            event = torch.cuda.Event(enable_timing=True)
+            event.record()
+            self.events = [event]
+        else:
+            self.events = [time.perf_counter()]
+
+    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor):
+        if self.device == "cuda" and self.backend == "pytorch":
+            event = torch.cuda.Event(enable_timing=True)
+            event.record()
+            self.events.append(event)
+        else:
+            self.events.append(time.perf_counter())
+
+        return scores
+
+    def get_latency(self) -> Latency:
+        if self.device == "cuda" and self.backend == "pytorch":
+            # synchronize the device to make sure all events have been recorded
+            torch.cuda.synchronize()
+            latencies_list = [
+                self.events[i - 1].elapsed_time(self.events[i]) / 1e3 for i in range(1, len(self.events))
+            ]
+        else:
+            latencies_list = [(self.events[i] - self.events[i - 1]) for i in range(1, len(self.events))]
 
-    def get_latencies_list(self) -> List[float]:
-        return self.latencies
+        return Latency.from_values(latencies_list, unit="s")
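For reviewers, here is a minimal usage sketch of the reworked latency API above. It is not part of the patch: the sleep-based workload, the batch size of 8, and the "samples/s" unit string are illustrative assumptions.

```python
# Hypothetical sketch, assuming a CPU benchmark loop with the new LatencyTracker / Throughput classes.
import time

from optimum_benchmark.trackers.latency import LatencyTracker, Throughput

tracker = LatencyTracker(device="cpu", backend="pytorch")
tracker.reset()

for _ in range(10):
    with tracker.track():
        time.sleep(0.01)  # stand-in for a forward pass

latency = tracker.get_latency()  # Latency dataclass: mean, stdev and raw values in seconds
latency.log(prefix="forward")

# derive a per-call throughput from the same measurements (volume = batch size, assumed to be 8 here)
throughput = Throughput.from_latency(latency, volume=8, unit="samples/s")
throughput.log(prefix="forward")
```

The same `get_latency()` pattern applies to `LatencyTrainerCallback` (per training step) and `LatencyLogitsProcessor` (per decoded token), which only differ in how the timing events are recorded.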
diff --git a/optimum_benchmark/trackers/memory.py b/optimum_benchmark/trackers/memory.py
index 816f1d5a..46eb778d 100644
--- a/optimum_benchmark/trackers/memory.py
+++ b/optimum_benchmark/trackers/memory.py
@@ -1,83 +1,110 @@
 import os
+from functools import reduce
 from logging import getLogger
+from dataclasses import dataclass
 from contextlib import contextmanager
-from typing import List, Optional, Dict
 from multiprocessing import Pipe, Process
+from typing import List, Optional, Dict, Literal
 from multiprocessing.connection import Connection
 
 from ..env_utils import bytes_to_mega_bytes, get_cuda_device_ids, is_nvidia_system, is_rocm_system
-from ..import_utils import is_py3nvml_available, is_pyrsmi_available, is_torch_available
+from ..import_utils import is_pynvml_available, is_amdsmi_available, is_torch_available
+from .utils import compute_max
 
-if is_nvidia_system():
-    if is_py3nvml_available():
-        import py3nvml.py3nvml as nvml
-    else:
-        raise ValueError(
-            "The library py3nvml is required to run memory benchmark on NVIDIA GPUs, but is not installed. "
-            "Please install it through `pip install py3nvml`."
-        )
+if is_nvidia_system() and is_pynvml_available():
+    import pynvml
 
-if is_rocm_system():
-    if is_pyrsmi_available():
-        from pyrsmi import rocml
-    else:
-        raise ValueError(
-            "The library pyrsmi is required to run memory benchmark on AMD GPUs, but is not installed. "
-            "Please install it through `pip install pyrsmi@git+https://github.com/RadeonOpenCompute/pyrsmi.git."
-        )
+if is_rocm_system() and is_amdsmi_available():
+    import amdsmi  # type: ignore
 
 if is_torch_available():
     import torch
 
 import psutil
 
+Memory_Unit_Literal = Literal["MB"]
 
 LOGGER = getLogger("memory")
 
 
-class MemoryTracker:
-    """
-    Memory tracker to measure max memory usage of CPU or GPU devices.
+@dataclass
+class MaxMemory:
+    unit: Memory_Unit_Literal
+
+    ram: float
+    vram: Optional[float] = None
+    reserved: Optional[float] = None
+    allocated: Optional[float] = None
+
+    def __add__(self, other: "MaxMemory") -> "MaxMemory":
+        if self.unit != other.unit:
+            raise ValueError(f"Cannot add memory with different units: {self.unit} and {other.unit}")
 
-    Args:
-        device (str): Device to track memory usage. Can be either "cuda" or any other device.
-        backend (str): Backend to track memory usage. Can be either "pytorch" or any other backend.
-        device_ids (List[int], optional): List of device IDs to track memory usage. Defaults to None.
-    """
+        ram = self.ram + other.ram
+        vram = self.vram + other.vram if self.vram is not None and other.vram is not None else None
+        reserved = self.reserved + other.reserved if self.reserved is not None and other.reserved is not None else None
+        allocated = (
+            self.allocated + other.allocated if self.allocated is not None and other.allocated is not None else None
+        )
+
+        return MaxMemory(unit=self.unit, ram=ram, vram=vram, reserved=reserved, allocated=allocated)
+
+    @staticmethod
+    def aggregate(max_memories: List["MaxMemory"]) -> "MaxMemory":
+        if len(max_memories) == 0 or all(memory is None for memory in max_memories):
+            return None
+        elif any(memory is None for memory in max_memories):
+            raise ValueError("Some memory measurements are missing")
+
+        return reduce(lambda x, y: x + y, max_memories)
+
+    def log(self, prefix: str = "forward"):
+        LOGGER.info(f"\t\t+ {prefix} max RAM memory: {self.ram:f} ({self.unit})")
+        if self.vram is not None:
+            LOGGER.info(f"\t\t+ {prefix} max VRAM memory: {self.vram:f} ({self.unit})")
+        if self.reserved is not None:
+            LOGGER.info(f"\t\t+ {prefix} max reserved memory: {self.reserved:f} ({self.unit})")
+        if self.allocated is not None:
+            LOGGER.info(f"\t\t+ {prefix} max allocated memory: {self.allocated:f} ({self.unit})")
+
+
+class MemoryTracker:
     def __init__(self, device: str, backend: str, device_ids: Optional[str] = None):
         self.device = device
         self.backend = backend
+        self.device_ids = device_ids
 
-        self.max_memory_used = 0
-        self.max_memory_reserved = 0
-        self.max_memory_allocated = 0
+        self.ram_memory = []
+        self.vram_memory = []
+        self.reserved_memory = {}
+        self.allocated_memory = {}
+
+        LOGGER.info("\t+ Tracking RAM memory")
 
         if self.device == "cuda":
-            if device_ids is None:
+            if self.device_ids is None:
                 LOGGER.warning("\t+ `device=cuda` but `device_ids` not provided. Using all available CUDA devices.")
-                self.device_ids = list(map(int, get_cuda_device_ids().split(",")))
-            else:
-                self.device_ids = list(map(int, device_ids.split(",")))
+                self.device_ids = get_cuda_device_ids()
+            self.device_ids = list(map(int, self.device_ids.split(",")))
 
             LOGGER.info(f"\t+ Tracking VRAM memory of CUDA devices: {self.device_ids}")
 
             if self.backend == "pytorch":
-                self.pytorch_device_ids = list(range(torch.cuda.device_count()))
-                LOGGER.info(f"\t+ Tracking Pytorch memory of Pytorch CUDA devices: {self.pytorch_device_ids}")
-
-                if len(self.device_ids) != len(self.pytorch_device_ids):
+                num_pytorch_cuda_devices = torch.cuda.device_count()
+                if len(self.device_ids) != num_pytorch_cuda_devices:
                     raise ValueError(
                         "The number of CUDA devices and Pytorch CUDA devices must be the same. "
-                        f"Got {len(self.device_ids)} and {len(self.pytorch_device_ids)} respectively."
+                        f"Got {len(self.device_ids)} and {num_pytorch_cuda_devices} respectively."
                     )
-        else:
-            LOGGER.info("\t+ Tracking RAM memory")
+                LOGGER.info(
+                    f"\t+ Tracking Allocated/Reserved memory of {num_pytorch_cuda_devices} Pytorch CUDA devices"
+                )
 
     def reset(self):
-        self.max_memory_used = 0
-        self.max_memory_reserved = 0
-        self.max_memory_allocated = 0
+        self.ram_memory = []
+        self.vram_memory = []
+        self.reserved_memory = {}
+        self.allocated_memory = {}
 
     @contextmanager
     def track(self):
@@ -88,43 +115,65 @@ def track(self):
         else:
             yield from self._cpu_memory()
 
-    def _cuda_pytorch_memory(self):
+    def _cuda_pytorch_memory(self, interval: float = 0.001):
         torch.cuda.empty_cache()
-        for pytorch_device_index in self.pytorch_device_ids:
+
+        for device in range(torch.cuda.device_count()):
+            self.reserved_memory[device] = []
+            self.allocated_memory[device] = []
             try:
-                torch.cuda.reset_peak_memory_stats(device=pytorch_device_index)
+                torch.cuda.reset_peak_memory_stats(device=device)
             except Exception as e:
-                LOGGER.warning(f"\t+ Could not reset max memory stats for device {pytorch_device_index}: {e}")
+                LOGGER.warning(f"\t+ Could not reset max memory stats for device {device}: {e}")
+
+            # initial memory usage
+            self.reserved_memory[device].append(torch.cuda.memory_reserved(device=device))
+            self.allocated_memory[device].append(torch.cuda.memory_allocated(device=device))
+
+        # start recording memory allocations
+        torch.cuda.memory._record_memory_history()
+
+        yield from self._cuda_memory(interval)
 
-        yield from self._cuda_memory()
+        # get snapshots and stop recording memory allocations
+        memory_device_traces = torch.cuda.memory._snapshot()["device_traces"]
+        torch.cuda.memory._record_memory_history(enabled=None)
 
-        for pytorch_device_index in self.pytorch_device_ids:
-            self.max_memory_reserved += torch.cuda.max_memory_reserved(device=pytorch_device_index)
-            self.max_memory_allocated += torch.cuda.max_memory_allocated(device=pytorch_device_index)
+        for device in range(torch.cuda.device_count()):
+            device_trace = memory_device_traces[device]
+            for entry in device_trace:
+                if entry["action"] == "alloc":
+                    self.allocated_memory[device].append(self.allocated_memory[device][-1] + entry["size"])
+                elif entry["action"] == "free_completed":
+                    self.allocated_memory[device].append(self.allocated_memory[device][-1] - entry["size"])
+                elif entry["action"] == "segment_alloc":
+                    self.reserved_memory[device].append(self.reserved_memory[device][-1] + entry["size"])
+                elif entry["action"] == "segment_free":
+                    self.reserved_memory[device].append(self.reserved_memory[device][-1] - entry["size"])
 
-        LOGGER.debug(f"\t+ Pytorch max memory reserved: {self.get_max_memory_reserved_mb()} MB")
-        LOGGER.debug(f"\t+ Pytorch max memory allocated: {self.get_max_memory_allocated_mb()} MB")
+        LOGGER.debug(f"\t+ Max allocated memory: {self.get_max_allocated_memory_mb()} MB")
+        LOGGER.debug(f"\t+ Max reserved memory: {self.get_max_reserved_memory_mb()} MB")
 
-    def _cuda_memory(self, interval: float = 0.001):
+    def _cuda_memory(self, interval: float = 0.0001):
         child_connection, parent_connection = Pipe()
         memory_process = Process(
-            target=monitor_gpu_max_vram_memory,
-            args=(self.device_ids, child_connection, interval),
+            target=monitor_gpu_vram_memory,
+            args=(os.getpid(), self.device_ids, child_connection, interval),
             daemon=True,
         )
         memory_process.start()
         parent_connection.recv()  # wait for memory process to be ready
 
-        yield
+        yield from self._cpu_memory(interval)
 
         parent_connection.send(True)
-        self.max_memory_used = parent_connection.recv()
-        LOGGER.debug(f"\t+ Max memory (VRAM) used: {self.get_max_memory_used_mb()} MB")
+        self.vram_memory = parent_connection.recv()
+        LOGGER.debug(f"\t+ Max memory (VRAM) used: {self.get_max_vram_memory_mb()} MB")
 
-    def _cpu_memory(self, interval: float = 0.001):
+    def _cpu_memory(self, interval: float = 0.0001):
         child_connection, parent_connection = Pipe()
         memory_process = Process(
-            target=monitor_cpu_max_ram_memory,
+            target=monitor_cpu_ram_memory,
             args=(os.getpid(), child_connection, interval),
             daemon=True,
         )
@@ -134,76 +183,142 @@ def _cpu_memory(self, interval: float = 0.001):
         memory_process.start()
 
         yield
 
         parent_connection.send(True)
-        self.max_memory_used = parent_connection.recv()
-        LOGGER.debug(f"\t+ Max memory (RAM) used: {self.get_max_memory_used_mb()} MB")
+        self.ram_memory = parent_connection.recv()
+        LOGGER.debug(f"\t+ Max memory (RAM) used: {self.get_max_ram_memory_mb()} MB")
+
+    def get_ram_memory_mb(self) -> List[int]:
+        return [bytes_to_mega_bytes(memory) for memory in self.ram_memory]
+
+    def get_vram_memory_mb(self) -> List[int]:
+        return [bytes_to_mega_bytes(memory) for memory in self.vram_memory]
+
+    def get_reserved_memory_mb(self) -> Dict[int, List[int]]:
+        return {
+            device: [bytes_to_mega_bytes(memory) for memory in self.reserved_memory[device]]
+            for device in self.reserved_memory
+        }
+
+    def get_allocated_memory_mb(self) -> Dict[int, List[int]]:
+        return {
+            device: [bytes_to_mega_bytes(memory) for memory in self.allocated_memory[device]]
+            for device in self.allocated_memory
+        }
+
+    def get_max_ram_memory_mb(self) -> int:
+        return compute_max(self.get_ram_memory_mb())
 
-    def get_max_memory_used_mb(self) -> int:
-        return bytes_to_mega_bytes(self.max_memory_used)
+    def get_max_vram_memory_mb(self) -> int:
+        return compute_max(self.get_vram_memory_mb())
 
-    def get_max_memory_allocated_mb(self) -> int:
-        return bytes_to_mega_bytes(self.max_memory_allocated)
+    def get_max_reserved_memory_mb(self) -> int:
+        reserved_memory_mb = self.get_reserved_memory_mb()
+        return sum(compute_max(reserved_memory_mb[device]) for device in reserved_memory_mb)
 
-    def get_max_memory_reserved_mb(self) -> int:
-        return bytes_to_mega_bytes(self.max_memory_reserved)
+    def get_max_allocated_memory_mb(self) -> int:
+        allocated_memory_mb = self.get_allocated_memory_mb()
+        return sum(compute_max(allocated_memory_mb[device]) for device in allocated_memory_mb)
 
-    def get_memories_dict(self) -> Dict[str, int]:
+    def get_max_memory(self):
         if self.device == "cuda" and self.backend == "pytorch":
-            return {
-                "max_vram_used(MB)": self.get_max_memory_used_mb(),
-                "max_memory_reserved(MB)": self.get_max_memory_reserved_mb(),
-                "max_memory_allocated(MB)": self.get_max_memory_allocated_mb(),
-            }
+            return MaxMemory(
+                unit="MB",
+                ram=self.get_max_ram_memory_mb(),
+                vram=self.get_max_vram_memory_mb(),
+                reserved=self.get_max_reserved_memory_mb(),
+                allocated=self.get_max_allocated_memory_mb(),
+            )
         elif self.device == "cuda":
-            return {"max_vram_used(MB)": self.get_max_memory_used_mb()}
+            return MaxMemory(
+                unit="MB",
+                ram=self.get_max_ram_memory_mb(),
+                vram=self.get_max_vram_memory_mb(),
+            )
         else:
-            return {"max_ram_used(MB)": self.get_max_memory_used_mb()}
+            return MaxMemory(
+                unit="MB",
+                ram=self.get_max_ram_memory_mb(),
+            )
 
 
-def monitor_cpu_max_ram_memory(process_id: int, connection: Connection, interval: float):
+def monitor_cpu_ram_memory(process_id: int, connection: Connection, interval: float):
     process = psutil.Process(process_id)
-    max_memory_usage = 0
     connection.send(0)
+    used_memory = []
     stop = False
     while not stop:
         meminfo_attr = "memory_info" if hasattr(process, "memory_info") else "get_memory_info"
else "get_memory_info" - current_memory_usage = getattr(process, meminfo_attr)()[0] - max_memory_usage = max(max_memory_usage, current_memory_usage) + current_used_memory = getattr(process, meminfo_attr)()[0] + used_memory.append(current_used_memory) stop = connection.poll(interval) - connection.send(max_memory_usage) + connection.send(used_memory) connection.close() -def monitor_gpu_max_vram_memory(device_ids: List[int], connection: Connection, interval: float): - if is_nvidia_system() and is_py3nvml_available(): - nvml.nvmlInit() - handles = [nvml.nvmlDeviceGetHandleByIndex(device_id) for device_id in device_ids] - max_memory_usage = 0 +def monitor_gpu_vram_memory(process_id: int, device_ids: List[int], connection: Connection, interval: float): + if is_nvidia_system(): + if not is_pynvml_available(): + raise ValueError( + "The library pynvml is required to run memory benchmark on NVIDIA GPUs, but is not installed. " + "Please install the official and NVIDIA maintained PyNVML library through `pip install nvidia-ml-py`." + ) + pynvml.nvmlInit() + devices_handles = [pynvml.nvmlDeviceGetHandleByIndex(device_id) for device_id in device_ids] connection.send(0) + used_memory = [] stop = False while not stop: - current_memory_usage = sum(nvml.nvmlDeviceGetMemoryInfo(handle).used for handle in handles) - max_memory_usage = max(max_memory_usage, current_memory_usage) + current_used_memory = 0 + for device_id, device_handle in zip(device_ids, devices_handles): + device_processes = pynvml.nvmlDeviceGetComputeRunningProcesses(device_handle) + for device_process in device_processes: + if device_process.pid == process_id or ( + psutil.pid_exists(device_process.pid) + and psutil.Process(device_process.pid).parent().pid == process_id + ): + # only memory usage of the process and its children is tracked + current_used_memory += device_process.usedGpuMemory + + used_memory.append(current_used_memory) stop = connection.poll(interval) - connection.send(max_memory_usage) - nvml.nvmlShutdown() + connection.send(used_memory) + pynvml.nvmlShutdown() connection.close() - elif is_rocm_system() and is_pyrsmi_available(): - rocml.smi_initialize() - max_memory_usage = 0 + elif is_rocm_system(): + if not is_amdsmi_available(): + raise ValueError( + "The library amdsmi is required to run memory benchmark on AMD GPUs, but is not installed. " + "Please install the official and AMD maintained amdsmi library from https://github.com/ROCm/amdsmi." 
diff --git a/optimum_benchmark/trackers/utils.py b/optimum_benchmark/trackers/utils.py
new file mode 100644
index 00000000..55811d3b
--- /dev/null
+++ b/optimum_benchmark/trackers/utils.py
@@ -0,0 +1,14 @@
+from typing import List
+from statistics import mean, stdev
+
+
+def compute_max(values: List[int]) -> int:
+    return max(values) if len(values) > 0 else 0
+
+
+def compute_mean(values: List[int]) -> float:
+    return mean(values) if len(values) > 0 else 0.0
+
+
+def compute_stdev(values: List[int]) -> float:
+    return stdev(values) if len(values) > 1 else 0.0
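A quick, illustrative check of the edge-case behaviour these helpers give the trackers (the printed values are examples, not benchmark output). This is what lets `Latency.from_values` and the max-memory getters build a report from zero or one sample without raising.

```python
# Illustrative only: the new helpers degrade gracefully on empty or single-element inputs.
from optimum_benchmark.trackers.utils import compute_max, compute_mean, compute_stdev

print(compute_max([]))            # 0    -> no measurements recorded yet
print(compute_mean([0.5]))        # 0.5  -> a single latency sample
print(compute_stdev([0.5]))       # 0.0  -> stdev needs at least two samples, so it falls back to 0.0
print(compute_stdev([0.4, 0.6]))  # ~0.1414 (sample standard deviation)
```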