diff --git a/pyproject.toml b/pyproject.toml
index f3d9799..d2d5169 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
     "google-cloud-storage",
     "tomli",
     "docker>=7.1.0",
+    "pynvml>=12.0.0"
 ]
 
 [project.optional-dependencies]
diff --git a/src/genesys/data.py b/src/genesys/data.py
index b2b8c51..91bf5ed 100644
--- a/src/genesys/data.py
+++ b/src/genesys/data.py
@@ -4,7 +4,7 @@ import rich.progress
 from transformers import AutoTokenizer
 import random
 
-from genesys.utils import log_prime
+from genesys.prime_metrics import PrimeMetric
 
 
 class DataConfig(BaseConfig):
@@ -18,6 +18,8 @@ class DataConfig(BaseConfig):
 
     prime_log: bool = False
+    prime_log_freq: int = 5
+
 
 
 def repeat_elements(lst, n):
     return [item for item in lst for _ in range(n)]
@@ -85,6 +87,8 @@ def _add_column(dataset, path):
             for i, length in enumerate(self.dataset_lengths)
         ]
 
+        self.prime_metric = PrimeMetric(disable=not (config.prime_log), period=config.prime_log_freq)
+
     def _prepare_batch(self, batch: dict, dataset: str) -> tuple:
         batch = repeat_elements(
             [b for b in batch], self.config.num_responses_per_question
@@ -129,7 +133,6 @@ def __iter__(self) -> Generator[tuple, None, None]:
                 break
 
     def log_progress_prime(self, paths: list[str], dataset_counters: list[int]):
-        if self.config.prime_log:
-            metric = {path: counter for path, counter in zip(paths, dataset_counters)}
-            metric.update({"total": sum(dataset_counters)})
-            log_prime(metric)
+        metric = {path: counter for path, counter in zip(paths, dataset_counters)}
+        metric.update({"total": sum(dataset_counters)})
+        self.prime_metric.log_prime(metric)
diff --git a/src/genesys/prime_metrics.py b/src/genesys/prime_metrics.py
new file mode 100644
index 0000000..fa029e4
--- /dev/null
+++ b/src/genesys/prime_metrics.py
@@ -0,0 +1,131 @@
+import socket
+import platform
+import json
+import os
+from typing import Any
+import psutil
+import threading
+import time
+import pynvml
+
+
+class PrimeMetric:
+    """
+    A class to log metrics to Prime Miner via Unix socket.
+
+    Periodically collects and logs system metrics including CPU, memory and GPU usage.
+
+    Args:
+        disable (bool): If True, disables metric logging. Defaults to False.
+        period (int): Collection interval in seconds. Defaults to 5.
+
+    Usage:
+        metrics = PrimeMetric()
+        metrics.log_prime({"custom_metric": value})
+    """
+
+    def __init__(self, disable: bool = False, period: int = 5):
+        self.disable = disable
+        self.period = period
+        self._thread = None
+        self._stop_event = threading.Event()
+        self.has_gpu = False
+        try:
+            pynvml.nvmlInit()
+            pynvml.nvmlDeviceGetHandleByIndex(0)  # Check if at least one GPU exists
+            self.has_gpu = True
+        except pynvml.NVMLError:
+            pass
+
+        self._start_metrics_thread()  # start only after has_gpu is set, so the collector thread never reads it too early
+
+    ## public
+
+    def log_prime(self, metric: dict[str, Any]):
+        if self.disable:
+            return
+        if not (self._send_message_prime(metric)):
+            print(f"Prime logging failed: {metric}")
+
+    ## private
+
+    @classmethod
+    def _get_default_socket_path(cls) -> str:
+        """Returns the default socket path based on the operating system."""
+        default = (
+            "/tmp/com.prime.miner/metrics.sock"
+            if platform.system() == "Darwin"
+            else "/var/run/com.prime.miner/metrics.sock"
+        )
+        return os.getenv("PRIME_TASK_BRIDGE_SOCKET", default=default)
+
+    def _send_message_prime(self, metric: dict, socket_path: str = None) -> bool:
+        """Sends a message to the specified socket path or uses the default if none is provided."""
+        socket_path = socket_path or os.getenv("PRIME_TASK_BRIDGE_SOCKET", self._get_default_socket_path())
+        # print("Sending message to socket: ", socket_path)
+
+        task_id = os.getenv("PRIME_TASK_ID", None)
+        if task_id is None:
+            print("No task ID found, skipping logging to Prime")
+            return False
+        try:
+            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
+                sock.connect(socket_path)
+
+                for key, value in metric.items():
+                    message = {"label": key, "value": value, "task_id": task_id}
+                    sock.sendall(json.dumps(message).encode())
+            return True
+        except Exception:
+            return False
+
+    ### background system metrics
+
+    def _start_metrics_thread(self):
+        """Starts the metrics collection thread"""
+        if self._thread is not None:
+            return
+        self._stop_event.clear()
+        self._thread = threading.Thread(target=self._collect_metrics)
+        self._thread.daemon = True
+        self._thread.start()
+
+    def _stop_metrics_thread(self):
+        """Stops the metrics collection thread"""
+        if self._thread is None:
+            return
+        self._stop_event.set()
+        self._thread.join()
+        self._thread = None
+
+    def _collect_metrics(self):
+        while not self._stop_event.is_set():
+            metrics = {
+                "cpu_percent": psutil.cpu_percent(),
+                "memory_percent": psutil.virtual_memory().percent,
+                "memory_usage": psutil.virtual_memory().used,
+                "memory_total": psutil.virtual_memory().total,
+            }
+
+            if self.has_gpu:
+                gpu_count = pynvml.nvmlDeviceGetCount()
+                for i in range(gpu_count):
+                    handle = pynvml.nvmlDeviceGetHandleByIndex(i)
+                    info = pynvml.nvmlDeviceGetMemoryInfo(handle)
+                    gpu_util = pynvml.nvmlDeviceGetUtilizationRates(handle)
+
+                    metrics.update(
+                        {
+                            f"gpu_{i}_memory_used": info.used,
+                            f"gpu_{i}_memory_total": info.total,
+                            f"gpu_{i}_utilization": gpu_util.gpu,
+                        }
+                    )
+
+            self.log_prime(metrics)
+            time.sleep(self.period)
+
+    def __del__(self):
+        if hasattr(self, "_thread") and self._thread is not None:
+            # hasattr check needed: during interpreter shutdown __del__ can run after attributes have already been removed
+            self._stop_metrics_thread()
diff --git a/src/genesys/utils.py b/src/genesys/utils.py
index 37a1cb4..737bc54 100644
--- a/src/genesys/utils.py
+++ b/src/genesys/utils.py
@@ -5,8 +5,7 @@ import random
 
 import base64
 import threading
-import socket
-import platform
+
 from google.cloud import storage
 from google.oauth2 import service_account
 from queue import Queue
@@ -134,39 +133,3 @@ def extract_json(text):
         return json.loads(json_str)
     except json.JSONDecodeError:
         raise ValueError("Failed to parse JSON from the extracted content")
-
-
-def get_default_socket_path() -> str:
-    """Returns the default socket path based on the operating system."""
-    default = (
-        "/tmp/com.prime.miner/metrics.sock"
-        if platform.system() == "Darwin"
-        else "/var/run/com.prime.miner/metrics.sock"
-    )
-    return os.getenv("PRIME_TASK_BRIDGE_SOCKET", default=default)
-
-
-def send_message_prime(metric: dict, socket_path: str = None) -> bool:
-    """Sends a message to the specified socket path or uses the default if none is provided."""
-    socket_path = socket_path or os.getenv("PRIME_TASK_BRIDGE_SOCKET", get_default_socket_path())
-    # print("Sending message to socket: ", socket_path)
-
-    task_id = os.getenv("PRIME_TASK_ID", None)
-    if task_id is None:
-        print("No task ID found, skipping logging to Prime")
-        return False
-    try:
-        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
-            sock.connect(socket_path)
-
-            for key, value in metric.items():
-                message = {"label": key, "value": value, "task_id": task_id}
-                sock.sendall(json.dumps(message).encode())
-        return True
-    except Exception:
-        return False
-
-
-def log_prime(metric: dict):
-    if not (send_message_prime(metric)):
-        print(f"Prime logging failed: {metric}")
diff --git a/uv.lock b/uv.lock
index ef27a00..f579b96 100644
--- a/uv.lock
+++ b/uv.lock
@@ -644,6 +644,7 @@ dependencies = [
     { name = "docker" },
     { name = "google-cloud-storage" },
     { name = "pydantic-config" },
+    { name = "pynvml" },
     { name = "rich" },
     { name = "tomli" },
     { name = "torch" },
@@ -682,6 +683,7 @@ requires-dist = [
     { name = "google-cloud-storage" },
     { name = "psutil", marker = "extra == 'sglang'" },
     { name = "pydantic-config", git = "https://github.com/samsja/pydantic_config.git?rev=74c94ee" },
+    { name = "pynvml", specifier = ">=12.0.0" },
     { name = "rich" },
     { name = "setuptools", marker = "extra == 'sglang'" },
     { name = "sglang", extras = ["srt"], marker = "extra == 'sglang'", specifier = ">=0.4.1" },
@@ -1736,6 +1738,7 @@ name = "nvidia-cublas-cu12"
 version = "12.4.5.8"
 source = { registry = "https://pypi.org/simple" }
 wheels = [
+    { url = "https://files.pythonhosted.org/packages/7f/7f/7fbae15a3982dc9595e49ce0f19332423b260045d0a6afe93cdbe2f1f624/nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0f8aa1706812e00b9f19dfe0cdb3999b092ccb8ca168c0db5b8ea712456fd9b3", size = 363333771 },
     { url = "https://files.pythonhosted.org/packages/ae/71/1c91302526c45ab494c23f61c7a84aa568b8c1f9d196efa5993957faf906/nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl", hash = "sha256:2fc8da60df463fdefa81e323eef2e36489e1c94335b5358bcb38360adf75ac9b", size = 363438805 },
 ]
 
@@ -1744,6 +1747,7 @@ name = "nvidia-cuda-cupti-cu12"
 version = "12.4.127"
 source = { registry = "https://pypi.org/simple" }
 wheels = [
+    { url = "https://files.pythonhosted.org/packages/93/b5/9fb3d00386d3361b03874246190dfec7b206fd74e6e287b26a8fcb359d95/nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:79279b35cf6f91da114182a5ce1864997fd52294a87a16179ce275773799458a", size = 12354556 },
     { url = "https://files.pythonhosted.org/packages/67/42/f4f60238e8194a3106d06a058d494b18e006c10bb2b915655bd9f6ea4cb1/nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:9dec60f5ac126f7bb551c055072b69d85392b13311fcc1bcda2202d172df30fb", size = 13813957 },
 ]
 
@@ -1752,6 +1756,7 @@ name = "nvidia-cuda-nvrtc-cu12"
 version = "12.4.127"
"https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/77/aa/083b01c427e963ad0b314040565ea396f914349914c298556484f799e61b/nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0eedf14185e04b76aa05b1fea04133e59f465b6f960c0cbf4e37c3cb6b0ea198", size = 24133372 }, { url = "https://files.pythonhosted.org/packages/2c/14/91ae57cd4db3f9ef7aa99f4019cfa8d54cb4caa7e00975df6467e9725a9f/nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:a178759ebb095827bd30ef56598ec182b85547f1508941a3d560eb7ea1fbf338", size = 24640306 }, ] @@ -1760,6 +1765,7 @@ name = "nvidia-cuda-runtime-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/a1/aa/b656d755f474e2084971e9a297def515938d56b466ab39624012070cb773/nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:961fe0e2e716a2a1d967aab7caee97512f71767f852f67432d572e36cb3a11f3", size = 894177 }, { url = "https://files.pythonhosted.org/packages/ea/27/1795d86fe88ef397885f2e580ac37628ed058a92ed2c39dc8eac3adf0619/nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:64403288fa2136ee8e467cdc9c9427e0434110899d07c779f25b5c068934faa5", size = 883737 }, ] @@ -1782,6 +1788,7 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ + { url = "https://files.pythonhosted.org/packages/7a/8a/0e728f749baca3fbeffad762738276e5df60851958be7783af121a7221e7/nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_aarch64.whl", hash = "sha256:5dad8008fc7f92f5ddfa2101430917ce2ffacd86824914c82e28990ad7f00399", size = 211422548 }, { url = "https://files.pythonhosted.org/packages/27/94/3266821f65b92b3138631e9c8e7fe1fb513804ac934485a8d05776e1dd43/nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl", hash = "sha256:f083fc24912aa410be21fa16d157fed2055dab1cc4b6934a0e03cba69eb242b9", size = 211459117 }, ] @@ -1790,6 +1797,7 @@ name = "nvidia-curand-cu12" version = "10.3.5.147" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/80/9c/a79180e4d70995fdf030c6946991d0171555c6edf95c265c6b2bf7011112/nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_aarch64.whl", hash = "sha256:1f173f09e3e3c76ab084aba0de819c49e56614feae5c12f69883f4ae9bb5fad9", size = 56314811 }, { url = "https://files.pythonhosted.org/packages/8a/6d/44ad094874c6f1b9c654f8ed939590bdc408349f137f9b98a3a23ccec411/nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_x86_64.whl", hash = "sha256:a88f583d4e0bb643c49743469964103aa59f7f708d862c3ddb0fc07f851e3b8b", size = 56305206 }, ] @@ -1803,6 +1811,7 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ + { url = "https://files.pythonhosted.org/packages/46/6b/a5c33cf16af09166845345275c34ad2190944bcc6026797a39f8e0a282e0/nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_aarch64.whl", hash = "sha256:d338f155f174f90724bbde3758b7ac375a70ce8e706d70b018dd3375545fc84e", size = 127634111 }, { url = "https://files.pythonhosted.org/packages/3a/e1/5b9089a4b2a4790dfdea8b3a006052cfecff58139d5a4e34cb1a51df8d6f/nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl", hash = 
"sha256:19e33fa442bcfd085b3086c4ebf7e8debc07cfe01e11513cc6d332fd918ac260", size = 127936057 }, ] @@ -1814,6 +1823,7 @@ dependencies = [ { name = "nvidia-nvjitlink-cu12", marker = "(platform_machine != 'aarch64' and sys_platform == 'linux') or (sys_platform != 'darwin' and sys_platform != 'linux')" }, ] wheels = [ + { url = "https://files.pythonhosted.org/packages/96/a9/c0d2f83a53d40a4a41be14cea6a0bf9e668ffcf8b004bd65633f433050c0/nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux2014_aarch64.whl", hash = "sha256:9d32f62896231ebe0480efd8a7f702e143c98cfaa0e8a76df3386c1ba2b54df3", size = 207381987 }, { url = "https://files.pythonhosted.org/packages/db/f7/97a9ea26ed4bbbfc2d470994b8b4f338ef663be97b8f677519ac195e113d/nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux2014_x86_64.whl", hash = "sha256:ea4f11a2904e2a8dc4b1833cc1b5181cde564edd0d5cd33e3c168eff2d1863f1", size = 207454763 }, ] @@ -1839,6 +1849,7 @@ name = "nvidia-nvjitlink-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/02/45/239d52c05074898a80a900f49b1615d81c07fceadd5ad6c4f86a987c0bc4/nvidia_nvjitlink_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:4abe7fef64914ccfa909bc2ba39739670ecc9e820c83ccc7a6ed414122599b83", size = 20552510 }, { url = "https://files.pythonhosted.org/packages/ff/ff/847841bacfbefc97a00036e0fce5a0f086b640756dc38caea5e1bb002655/nvidia_nvjitlink_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:06b3b9b25bf3f8af351d664978ca26a16d2c5127dbd53c0497e28d1fb9611d57", size = 21066810 }, ] @@ -1847,6 +1858,7 @@ name = "nvidia-nvtx-cu12" version = "12.4.127" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/06/39/471f581edbb7804b39e8063d92fc8305bdc7a80ae5c07dbe6ea5c50d14a5/nvidia_nvtx_cu12-12.4.127-py3-none-manylinux2014_aarch64.whl", hash = "sha256:7959ad635db13edf4fc65c06a6e9f9e55fc2f92596db928d169c0bb031e88ef3", size = 100417 }, { url = "https://files.pythonhosted.org/packages/87/20/199b8713428322a2f22b722c62b8cc278cc53dffa9705d744484b5035ee9/nvidia_nvtx_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl", hash = "sha256:781e950d9b9f60d8241ccea575b32f5105a5baf4c2351cab5256a24869f12a1a", size = 99144 }, ] @@ -2536,6 +2548,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8a/0b/9fcc47d19c48b59121088dd6da2488a49d5f72dacf8262e2790a1d2c7d15/pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c", size = 1225293 }, ] +[[package]] +name = "pynvml" +version = "12.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "nvidia-ml-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/26/6f/6b5880ed0239e85b9a39aed103b65b2ef81425beef9f45e5c035bf008330/pynvml-12.0.0.tar.gz", hash = "sha256:299ce2451a6a17e6822d6faee750103e25b415f06f59abb8db65d30f794166f5", size = 33636 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/df/f7cf07a65a96dd11d71f346f9c2863accdd4784da83af7181b067d556cbc/pynvml-12.0.0-py3-none-any.whl", hash = "sha256:fdff84b62a27dbe98e08e1a647eb77342bef1aebe0878bcd15e99a83fcbecb9e", size = 26560 }, +] + [[package]] name = "pytest" version = "8.3.4"