Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lkomali committed Aug 7, 2024
1 parent a16c268 commit e4da216
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 84 deletions.
3 changes: 2 additions & 1 deletion genai-perf/genai_perf/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def report_output(data_parser: ProfileDataParser, args: Namespace) -> None:
load_level = f"{args.request_rate}"
else:
raise GenAIPerfException("No valid infer mode specified")


#TPA-274 - Integrate telemetry metrics with other metrics for export
stats = data_parser.get_statistics(infer_mode, load_level)
reporter = OutputReporter(stats, args)
reporter.report_output()
Expand Down
20 changes: 16 additions & 4 deletions genai-perf/genai_perf/metrics/telemetry_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@

class TelemetryMetrics:
"""
A class that contains common telemetry metrics.
Metrics are stored as lists where each inner list corresponds to multiple measurements per GPU.
Each measurement is recorded every second.
"""

TELEMETRY_METRICS = [
Expand All @@ -53,8 +53,8 @@ def __init__(
gpu_power_limit: List[List[float]] = [],
energy_consumption: List[List[float]] = [],
gpu_utilization: List[List[float]] = [],
total_gpu_memory: List[List[int]] = [],
gpu_memory_used: List[List[int]] = [],
total_gpu_memory: List[List[float]] = [],
gpu_memory_used: List[List[float]] = [],
) -> None:
self.gpu_power_usage = gpu_power_usage
self.gpu_power_limit = gpu_power_limit
Expand All @@ -63,13 +63,25 @@ def __init__(
self.total_gpu_memory = total_gpu_memory
self.gpu_memory_used = gpu_memory_used

def update_metrics(self, measurement_data: dict) -> None:
"""Update the metrics with new measurement data"""
for metric in self.TELEMETRY_METRICS:
metric_key = metric.name
if metric_key in measurement_data:
getattr(self, metric_key).append(measurement_data[metric_key])

def __repr__(self):
attr_strs = []
for k, v in self.__dict__.items():
if not k.startswith("_"):
attr_strs.append(f"{k}={v}")
return f"TelemetryMetrics({','.join(attr_strs)})"


def __iter__(self):
for attr, value in self.__dict__.items():
yield attr, value

@property
def telemetry_metrics(self) -> List[MetricMetadata]:
    """Return the class-level list of supported telemetry metric definitions."""
    return self.TELEMETRY_METRICS
Expand Down
12 changes: 11 additions & 1 deletion genai-perf/genai_perf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
DEFAULT_ARTIFACT_DIR,
DEFAULT_COMPARE_DIR,
OPEN_ORCA,
DEFAULT_TRITON_METRICS_URL,
)
from genai_perf.llm_inputs.llm_inputs import (
LlmInputs,
Expand Down Expand Up @@ -761,8 +762,17 @@ def compare_handler(args: argparse.Namespace):

def profile_handler(args, extra_args):
    """Run the profiler, collecting Triton GPU telemetry when applicable.

    For the 'triton' service kind a TritonTelemetryDataCollector is created
    against the default metrics URL; for other service kinds no telemetry
    is collected and ``None`` is passed to ``Profiler.run``.

    Args:
        args: Parsed CLI namespace (must expose ``service_kind``).
        extra_args: Extra arguments forwarded verbatim to Perf Analyzer.
    """
    from genai_perf.wrapper import Profiler
    from genai_perf.telemetry_data.triton_telemetry_data_collector import (
        TritonTelemetryDataCollector,
    )

    telemetry_data_collector = None
    if args.service_kind == "triton":
        # TPA-275: pass server url as a CLI option in non-default case
        telemetry_data_collector = TritonTelemetryDataCollector(
            server_metrics_url=DEFAULT_TRITON_METRICS_URL
        )

    # Bug fix: the stale no-collector call to Profiler.run was left in place
    # alongside the new one, so the profiler ran twice. Run exactly once,
    # passing the (possibly None) collector.
    Profiler.run(telemetry_data_collector, args=args, extra_args=extra_args)


### Parser Initialization ###
Expand Down
51 changes: 18 additions & 33 deletions genai-perf/genai_perf/telemetry_data/telemetry_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,39 +45,6 @@ def __init__(
self._stop_event = threading.Event()
self._thread = None

def _fetch_metrics(self) -> str:
    """Fetch raw metrics text from the server's metrics endpoint.

    Returns the response body as text; raises ``requests.HTTPError`` on a
    non-2xx status (via ``raise_for_status``).
    """
    # Annotation fixed from '-> None': the method returns response.text.
    response = requests.get(self._server_metrics_url)
    response.raise_for_status()
    return response.text

@abstractmethod
def _parse_metrics(self, data: str) -> None:
    """Parse raw metrics text. This method should be implemented by subclasses.

    Signature fixed to accept the fetched metrics text: the collector loop
    calls ``self._parse_metrics(metrics_data)`` with one argument, and the
    Triton subclass implements it with a ``data`` parameter.
    """
    pass

def _update_metrics(self, parsed_data) -> None:
    """Merge one parsed measurement batch into the accumulated metrics."""
    for metric_name, metric_values in parsed_data.items():
        # NOTE(review): this length guard only records the batch when the
        # incoming list is longer than what is already stored for the metric
        # — presumably to skip empty or duplicate scrapes; confirm this is
        # the intended dedup rule.
        if len(metric_values) > len(getattr(self.metrics, metric_name, [])):
            current_values = getattr(self.metrics, metric_name, [])
            current_values.append(metric_values)
            setattr(self.metrics, metric_name, current_values)

def _collect_metrics(self) -> None:
    """Poll the metrics endpoint until stopped, once per collection interval.

    Each iteration fetches the raw metrics text, parses it, and merges the
    parsed values into ``self.metrics`` via ``_update_metrics``.
    """
    while not self._stop_event.is_set():
        metrics_data = self._fetch_metrics()
        parsed_data = self._parse_metrics(metrics_data)
        # Bug fix: _update_metrics already records every metric contained in
        # parsed_data; the six explicit per-metric append calls that followed
        # double-counted every measurement, so they are removed.
        self._update_metrics(parsed_data)
        time.sleep(self._collection_interval)

def start(self) -> None:
"""Start the telemetry data collection thread."""
if self._thread is None or not self._thread.is_alive():
Expand All @@ -91,6 +58,24 @@ def stop(self) -> None:
self._stop_event.set()
self._thread.join()

def _fetch_metrics(self) -> str:
    """Return the raw metrics payload from the server's metrics endpoint.

    Raises ``requests.HTTPError`` for a non-2xx response.
    """
    reply = requests.get(self._server_metrics_url)
    reply.raise_for_status()
    return reply.text

@abstractmethod
def _process_and_update_metrics(self, metrics_data: str) -> None:
    """Parse raw metrics text and merge it into self.metrics.

    This method should be implemented by subclasses. Signature fixed to
    accept the fetched text: the collection loop calls it with one argument
    and the Triton subclass implements it with a ``metrics_data`` parameter
    (the mismatch was flagged by static analysis).
    """
    pass

def _collect_metrics(self) -> None:
    """Repeatedly scrape and record telemetry until the stop event is set,
    sleeping one collection interval between scrapes."""
    while not self._stop_event.is_set():
        raw = self._fetch_metrics()
        self._process_and_update_metrics(raw)
        time.sleep(self._collection_interval)

@property
def metrics(self) -> TelemetryMetrics:
"""Return the collected metrics."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,44 +26,57 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import re
from typing import Dict, List

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'Dict' is not used.
Import of 'List' is not used.

from genai_perf.telemetry_data.telemetry_data_collector import TelemetryDataCollector


class TritonTelemetryDataCollector(TelemetryDataCollector):
    """Collect GPU telemetry metrics from a Triton server's Prometheus endpoint."""

    # Mapping from Triton (Prometheus) metric names to GenAI-Perf telemetry
    # metric names; exposition lines whose metric name is not listed here
    # are ignored.
    METRIC_NAME_MAPPING = {
        "nv_gpu_power_usage": "gpu_power_usage",
        "nv_gpu_power_limit": "gpu_power_limit",
        "nv_energy_consumption": "energy_consumption",
        "nv_gpu_utilization": "gpu_utilization",
        "nv_gpu_memory_total_bytes": "total_gpu_memory",
        "nv_gpu_memory_used_bytes": "gpu_memory_used",
    }

    def _process_and_update_metrics(self, metrics_data: str) -> None:
        """Parse one Prometheus scrape and record it on ``self.metrics``.

        Each metric line has the form ``metric_name{labels} value``; the
        metric name is everything before the first ``{``. One value per GPU
        is collected for each mapped metric, then the batch is handed to
        ``TelemetryMetrics.update_metrics`` as a single measurement.

        Example input lines:
            nv_gpu_power_usage{gpu_uuid="GPU-abc"} 27.01
            nv_gpu_utilization{gpu_uuid="GPU-abc"} 75.5

        Args:
            metrics_data (str): Raw text from the Triton metrics endpoint.
        """
        current_measurement_data: Dict[str, List[float]] = {
            metric.name: [] for metric in self.metrics.TELEMETRY_METRICS
        }

        for line in metrics_data.splitlines():
            line = line.strip()
            # Bug fix: skip blank lines and '# HELP' / '# TYPE' comment
            # lines — unconditional `line.split()[1]` crashed on blanks.
            if not line or line.startswith("#"):
                continue
            parts = line.split()
            if len(parts) < 2:
                continue  # malformed line with no value field
            triton_metric_key = parts[0].split("{")[0]  # name before '{'
            metric_key = self.METRIC_NAME_MAPPING.get(triton_metric_key)
            if metric_key in current_measurement_data:
                # Value is the last whitespace-separated field on the line.
                current_measurement_data[metric_key].append(float(parts[-1]))

        self.metrics.update_metrics(current_measurement_data)
18 changes: 5 additions & 13 deletions genai-perf/genai_perf/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
from genai_perf.constants import (
DEFAULT_GRPC_URL,
DEFAULT_INPUT_DATA_JSON,
DEFAULT_TRITON_METRICS_URL,
)
from genai_perf.llm_inputs.llm_inputs import OutputFormat
from genai_perf.telemetry_data.triton_telemetry_data_collector import (
TritonTelemetryDataCollector,
)
TritonTelemetryDataCollector,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -145,13 +144,9 @@ def build_cmd(args: Namespace, extra_args: Optional[List[str]] = None) -> List[s
return cmd

@staticmethod
def run(args: Namespace, extra_args: Optional[List[str]]) -> None:
telemetry_data_collector = None
def run(telemetry_data_collector: TritonTelemetryDataCollector, args: Namespace, extra_args: Optional[List[str]]) -> None:
try:
if args.service_kind == "triton":
telemetry_data_collector = TritonTelemetryDataCollector(
server_metrics_url=DEFAULT_TRITON_METRICS_URL
)
if telemetry_data_collector is not None:
telemetry_data_collector.start()
cmd = Profiler.build_cmd(args, extra_args)
logger.info(f"Running Perf Analyzer : '{' '.join(cmd)}'")
Expand All @@ -162,7 +157,4 @@ def run(args: Namespace, extra_args: Optional[List[str]]) -> None:
finally:
if telemetry_data_collector is not None:
telemetry_data_collector.stop()
metrics = telemetry_data_collector.metrics
#This print statement will be removed once the test is in place.
print("Collected Metrics:")
print(metrics)

0 comments on commit e4da216

Please sign in to comment.