diff --git a/genai-perf/genai_perf/main.py b/genai-perf/genai_perf/main.py
index caf3e804..e163e78d 100755
--- a/genai-perf/genai_perf/main.py
+++ b/genai-perf/genai_perf/main.py
@@ -111,7 +111,8 @@ def report_output(data_parser: ProfileDataParser, args: Namespace) -> None:
         load_level = f"{args.request_rate}"
     else:
         raise GenAIPerfException("No valid infer mode specified")
-
+
+    # TPA-274 - Integrate telemetry metrics with other metrics for export
     stats = data_parser.get_statistics(infer_mode, load_level)
     reporter = OutputReporter(stats, args)
     reporter.report_output()
diff --git a/genai-perf/genai_perf/metrics/telemetry_metrics.py b/genai-perf/genai_perf/metrics/telemetry_metrics.py
index 59eae210..9a4ae32a 100755
--- a/genai-perf/genai_perf/metrics/telemetry_metrics.py
+++ b/genai-perf/genai_perf/metrics/telemetry_metrics.py
@@ -33,7 +33,7 @@ class TelemetryMetrics:
     """
-    A class that contains common telemetry level metrics.
+    A class that contains common telemetry metrics.
     Metrics are stored as lists where each inner list corresponds to multiple measurements per GPU.
-    Each measurement is recorded every 1000 ms.
+    Each measurement is recorded every second.
     """
 
     TELEMETRY_METRICS = [
@@ -53,8 +53,8 @@ def __init__(
         gpu_power_limit: List[List[float]] = [],
         energy_consumption: List[List[float]] = [],
         gpu_utilization: List[List[float]] = [],
-        total_gpu_memory: List[List[int]] = [],
-        gpu_memory_used: List[List[int]] = [],
+        total_gpu_memory: List[List[float]] = [],
+        gpu_memory_used: List[List[float]] = [],
     ) -> None:
         self.gpu_power_usage = gpu_power_usage
         self.gpu_power_limit = gpu_power_limit
@@ -63,6 +63,13 @@
         self.total_gpu_memory = total_gpu_memory
         self.gpu_memory_used = gpu_memory_used
 
+    def update_metrics(self, measurement_data: dict) -> None:
+        """Append one measurement (one value list per metric) to the stored metrics."""
+        for metric in self.TELEMETRY_METRICS:
+            metric_key = metric.name
+            if metric_key in measurement_data:
+                getattr(self, metric_key).append(measurement_data[metric_key])
+
     def __repr__(self):
         attr_strs = []
         for k, v in self.__dict__.items():
@@ -70,6 +77,11 @@
             attr_strs.append(f"{k}={v}")
         return f"TelemetryMetrics({','.join(attr_strs)})"
 
+
+    def __iter__(self):
+        for attr, value in self.__dict__.items():
+            yield attr, value
+
     @property
     def telemetry_metrics(self) -> List[MetricMetadata]:
         return self.TELEMETRY_METRICS
diff --git a/genai-perf/genai_perf/parser.py b/genai-perf/genai_perf/parser.py
index 60994f29..dc2c1cc0 100644
--- a/genai-perf/genai_perf/parser.py
+++ b/genai-perf/genai_perf/parser.py
@@ -39,6 +39,7 @@
     DEFAULT_ARTIFACT_DIR,
     DEFAULT_COMPARE_DIR,
     OPEN_ORCA,
+    DEFAULT_TRITON_METRICS_URL,
 )
 from genai_perf.llm_inputs.llm_inputs import (
     LlmInputs,
@@ -761,8 +762,17 @@ def compare_handler(args: argparse.Namespace):
 
 def profile_handler(args, extra_args):
     from genai_perf.wrapper import Profiler
+    from genai_perf.telemetry_data.triton_telemetry_data_collector import (
+        TritonTelemetryDataCollector,
+    )
 
+    telemetry_data_collector = None
+    if args.service_kind == "triton":
+        # TPA-275: pass server url as a CLI option in non-default case
+        telemetry_data_collector = TritonTelemetryDataCollector(
+            server_metrics_url=DEFAULT_TRITON_METRICS_URL
+        )
-    Profiler.run(args=args, extra_args=extra_args)
+    Profiler.run(telemetry_data_collector, args=args, extra_args=extra_args)
 
 
 ### Parser Initialization ###
diff --git a/genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py b/genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py
index bd606d09..6865bf97 100755
--- a/genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py
+++ b/genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py
@@ -45,39 +45,6 @@
         self._stop_event = threading.Event()
         self._thread = None
 
-    def _fetch_metrics(self) -> None:
-        """Fetch the metrics from the metrics endpoint"""
-        response = requests.get(self._server_metrics_url)
-        response.raise_for_status()
-        return response.text
-
-    @abstractmethod
-    def _parse_metrics(self) -> None:
-        """Parse metrics data. This method should be implemented by subclasses."""
-        pass
-
-    def _update_metrics(self, parsed_data) -> None:
-        for metric_name, metric_values in parsed_data.items():
-            if len(metric_values) > len(getattr(self.metrics, metric_name, [])):
-                current_values = getattr(self.metrics, metric_name, [])
-                current_values.append(metric_values)
-                setattr(self.metrics, metric_name, current_values)
-
-    def _collect_metrics(self) -> None:
-        while not self._stop_event.is_set():
-            metrics_data = self._fetch_metrics()
-            parsed_data = self._parse_metrics(metrics_data)
-            self._update_metrics(parsed_data)
-
-            self.metrics.gpu_power_usage.append(parsed_data["gpu_power_usage"])
-            self.metrics.gpu_power_limit.append(parsed_data["gpu_power_limit"])
-            self.metrics.energy_consumption.append(parsed_data["energy_consumption"])
-            self.metrics.gpu_utilization.append(parsed_data["gpu_utilization"])
-            self.metrics.total_gpu_memory.append(parsed_data["total_gpu_memory"])
-            self.metrics.gpu_memory_used.append(parsed_data["gpu_memory_used"])
-
-            time.sleep(self._collection_interval)
-
     def start(self) -> None:
         """Start the telemetry data collection thread."""
         if self._thread is None or not self._thread.is_alive():
@@ -91,6 +58,24 @@ def stop(self) -> None:
         self._stop_event.set()
         self._thread.join()
 
+    def _fetch_metrics(self) -> str:
+        """Fetch raw metrics text from the metrics endpoint."""
+        response = requests.get(self._server_metrics_url)
+        response.raise_for_status()
+        return response.text
+
+    @abstractmethod
+    def _process_and_update_metrics(self, metrics_data: str) -> None:
+        """Parse raw metrics data and update self.metrics. Implemented by subclasses."""
+        pass
+
+    def _collect_metrics(self) -> None:
+        """Poll the metrics endpoint once per collection interval until stopped."""
+        while not self._stop_event.is_set():
+            metrics_data = self._fetch_metrics()
+            self._process_and_update_metrics(metrics_data)
+            time.sleep(self._collection_interval)
+
     @property
     def metrics(self) -> TelemetryMetrics:
         """Return the collected metrics."""
diff --git a/genai-perf/genai_perf/telemetry_data/triton_telemetry_data_collector.py b/genai-perf/genai_perf/telemetry_data/triton_telemetry_data_collector.py
index 9fa0d633..c29f217e 100755
--- a/genai-perf/genai_perf/telemetry_data/triton_telemetry_data_collector.py
+++ b/genai-perf/genai_perf/telemetry_data/triton_telemetry_data_collector.py
@@ -26,43 +26,48 @@
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
-import re
 from typing import Dict, List
-
 from genai_perf.telemetry_data.telemetry_data_collector import TelemetryDataCollector
 
 
 class TritonTelemetryDataCollector(TelemetryDataCollector):
     """Class to collect telemetry metrics from Triton server"""
 
-    def _parse_metrics(self, data: str) -> None:
-        # Parsing logic for Prometheus metrics
-        metrics = {
-            "gpu_power_usage": [],
-            "gpu_power_limit": [],
-            "energy_consumption": [],
-            "gpu_utilization": [],
-            "total_gpu_memory": [],
-            "gpu_memory_used": [],
-        }
-        for line in data.splitlines():
-            if line.startswith("nv_gpu_power_usage"):
-                self._extract_metric(line, metrics["gpu_power_usage"])
-            elif line.startswith("nv_gpu_power_limit"):
-                self._extract_metric(line, metrics["gpu_power_limit"])
-            elif line.startswith("nv_energy_consumption"):
-                self._extract_metric(line, metrics["energy_consumption"])
-            elif line.startswith("nv_gpu_utilization"):
-                self._extract_metric(line, metrics["gpu_utilization"])
-            elif line.startswith("nv_gpu_memory_total_bytes"):
-                self._extract_metric(line, metrics["total_gpu_memory"])
-            elif line.startswith("nv_gpu_memory_used_bytes"):
-                self._extract_metric(line, metrics["gpu_memory_used"])
-        return metrics
-
-    def _extract_metric(
-        self, metric_line: str, metric_list: List[List[float]]
-    ) -> Dict[str, List[List[float]]]:
-        metric_components = metric_line.split()
-        metric_value = float(metric_components[1])
-        metric_list.append(metric_value)
+    # Mapping from Triton metric names to GenAI-Perf telemetry metric names.
+    METRIC_NAME_MAPPING = {
+        "nv_gpu_power_usage": "gpu_power_usage",
+        "nv_gpu_power_limit": "gpu_power_limit",
+        "nv_energy_consumption": "energy_consumption",
+        "nv_gpu_utilization": "gpu_utilization",
+        "nv_gpu_memory_total_bytes": "total_gpu_memory",
+        "nv_gpu_memory_used_bytes": "gpu_memory_used",
+    }
+
+    def _process_and_update_metrics(self, metrics_data: str) -> None:
+        """Process the response from the Triton metrics endpoint and update metrics.
+
+        Metric names are extracted from the start of each exposition line up to
+        the '{' character, as the GPU metrics follow the format
+        'metric_name{labels} value'. Only metrics defined in METRIC_NAME_MAPPING
+        are processed; blank and comment lines (# HELP / # TYPE) are skipped.
+
+        Args:
+            metrics_data (str): Raw metrics data from the Triton endpoint, e.g.
+                nv_gpu_power_usage{gpu_uuid="GPU-abc"} 27.01
+                nv_gpu_utilization{gpu_uuid="GPU-abc"} 0.75
+        """
+        current_measurement_data: Dict[str, List[float]] = {
+            metric.name: [] for metric in self.metrics.TELEMETRY_METRICS
+        }
+
+        for metric_line in metrics_data.splitlines():
+            parts = metric_line.split()
+            # Guard against blank, comment, or malformed lines (would otherwise
+            # raise IndexError when indexing parts[1]).
+            if len(parts) < 2 or metric_line.startswith("#"):
+                continue
+            metric_key = self.METRIC_NAME_MAPPING.get(parts[0].split("{")[0])
+            if metric_key is not None:
+                current_measurement_data[metric_key].append(float(parts[1]))
+
+        self.metrics.update_metrics(current_measurement_data)
diff --git a/genai-perf/genai_perf/wrapper.py b/genai-perf/genai_perf/wrapper.py
index 73c146a9..23b148ff 100644
--- a/genai-perf/genai_perf/wrapper.py
+++ b/genai-perf/genai_perf/wrapper.py
@@ -33,12 +33,11 @@
 from genai_perf.constants import (
     DEFAULT_GRPC_URL,
     DEFAULT_INPUT_DATA_JSON,
-    DEFAULT_TRITON_METRICS_URL,
 )
 from genai_perf.llm_inputs.llm_inputs import OutputFormat
 from genai_perf.telemetry_data.triton_telemetry_data_collector import (
     TritonTelemetryDataCollector,
 )
 
 logger = logging.getLogger(__name__)
@@ -145,13 +144,9 @@
         return cmd
 
     @staticmethod
-    def run(args: Namespace, extra_args: Optional[List[str]]) -> None:
-        telemetry_data_collector = None
+    def run(telemetry_data_collector: Optional[TritonTelemetryDataCollector], args: Namespace, extra_args: Optional[List[str]]) -> None:
         try:
-            if args.service_kind == "triton":
-                telemetry_data_collector = TritonTelemetryDataCollector(
-                    server_metrics_url=DEFAULT_TRITON_METRICS_URL
-                )
+            if telemetry_data_collector is not None:
                 telemetry_data_collector.start()
             cmd = Profiler.build_cmd(args, extra_args)
             logger.info(f"Running Perf Analyzer : '{' '.join(cmd)}'")
@@ -162,7 +157,4 @@
         finally:
             if telemetry_data_collector is not None:
                 telemetry_data_collector.stop()
-            metrics = telemetry_data_collector.metrics
-            #This print statement will be removed once the test is in place.
-            print("Collected Metrics:")
-            print(metrics)
+