Skip to content

Commit

Permalink
prometheus metrics: receipt transfer stats
Browse files Browse the repository at this point in the history
  • Loading branch information
kkowalski-reef committed Dec 12, 2024
1 parent 95c54f2 commit bfe0c72
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
4 changes: 3 additions & 1 deletion compute_horde/compute_horde/receipts/transfer_checkpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ async def get(self, key: str) -> int:
try:
return int(await self.cache.aget(key, 0))
except TypeError:
logger.error(f"Django cache contained non-integer checkpoint value for {key}. Defaulting to 0.")
logger.error(
f"Django cache contained non-integer checkpoint value for {key}. Defaulting to 0."
)
return 0

async def set(self, key: str, checkpoint: int) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import time
from collections import defaultdict
from collections.abc import Awaitable, Callable, Sequence
from datetime import datetime, timedelta
from typing import cast
Expand All @@ -9,17 +10,48 @@
import bittensor
from asgiref.sync import async_to_sync
from compute_horde.receipts.store.local import N_ACTIVE_PAGES, LocalFilesystemPagedReceiptStore
from compute_horde.receipts.transfer import MinerInfo, ReceiptsTransfer
from compute_horde.receipts.transfer import MinerInfo, ReceiptsTransfer, TransferResult
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from prometheus_client import Counter, Gauge, Histogram

logger = logging.getLogger(__name__)


class Command(BaseCommand):
help = "Fetch receipts from miners"

def __init__(self, *args, **kwargs):
    """
    Initialize the command and create the Prometheus metrics it reports.

    All positional and keyword arguments are forwarded unchanged to the parent initializer.
    """
    super().__init__(*args, **kwargs)

    # Both error counters are broken down by the name of the exception class.
    error_labels = ["exc_type"]

    self.m_receipts = Counter("receipts", documentation="Number of transferred receipts")
    self.m_miners = Gauge("miners", documentation="Number of miners to transfer from")
    self.m_line_errors = Counter(
        "line_errors",
        documentation="Number of invalid lines in received pages",
        labelnames=error_labels,
    )
    self.m_transfer_errors = Counter(
        "transfer_errors",
        documentation="Number of completely failed page transfers",
        labelnames=error_labels,
    )
    self.m_transfer_duration = Histogram(
        "transfer_duration",
        documentation="Total time to transfer all required pages",
    )
    self.m_catchup_pages_left = Gauge(
        "catchup_pages_left",
        documentation="Pages waiting for catch-up",
    )


def add_arguments(self, parser):
parser.add_argument(
"--debug-miner-hotkey",
Expand Down Expand Up @@ -168,28 +200,31 @@ async def catch_up(
Fetches new receipts on given pages one by one.
"""
for idx, page in enumerate(pages):
self.m_catchup_pages_left.set(len(pages) - idx)
start_time = time.monotonic()
current_loop_miners = await miners()
result = await ReceiptsTransfer.transfer(
miners=await miners(),
miners=current_loop_miners,
pages=[page],
session=session,
semaphore=semaphore,
# We may need to download a lot of full pages, so the timeout is higher.
request_timeout=3.0,
)
elapsed = time.monotonic() - start_time
rps = result.n_receipts / elapsed

logger.info(
f"Catching up: "
f"{page=} ({idx + 1}/{len(pages)}) "
f"receipts={result.n_receipts} "
f"{elapsed=:.3f} "
f"{rps=:.0f} "
f"transfer_errors={len(result.transfer_errors)} "
f"line_errors={len(result.line_errors)} "
)

self._push_common_metrics(result)
self.m_catchup_pages_left.set(0)

async def keep_up(
self,
interval: float,
Expand All @@ -204,26 +239,44 @@ async def keep_up(
start_time = time.monotonic()
current_page = LocalFilesystemPagedReceiptStore.current_page()
pages = list(reversed(range(current_page - N_ACTIVE_PAGES + 1, current_page + 1)))
current_loop_miners = await miners()
result = await ReceiptsTransfer.transfer(
miners=await miners(),
miners=current_loop_miners,
pages=pages,
session=session,
semaphore=semaphore,
request_timeout=1.0,
)
elapsed = time.monotonic() - start_time
rps = result.n_receipts / elapsed

logger.info(
f"Keeping up: "
f"{pages=} "
f"receipts={result.n_receipts} "
f"{elapsed=:.3f} "
f"{rps=:.0f} "
f"transfer_errors={len(result.transfer_errors)} "
f"line_errors={len(result.line_errors)} "
)

self._push_common_metrics(result)
self.m_miners.set(len(current_loop_miners))
self.m_transfer_duration.observe(elapsed)

# Sleep for the remainder of the time if any
if elapsed < interval:
time.sleep(interval - elapsed)

def _push_common_metrics(self, result: TransferResult):
n_line_errors: defaultdict[type[Exception], int] = defaultdict(int)
for line_error in result.line_errors:
n_line_errors[type(line_error)] += 1
for exc_type, exc_count in n_line_errors.items():
self.m_line_errors.labels(exc_type=exc_type.__name__).inc(exc_count)

n_transfer_errors: defaultdict[type[Exception], int] = defaultdict(int)
for transfer_error in result.transfer_errors:
n_transfer_errors[type(transfer_error)] += 1
for exc_type, exc_count in n_transfer_errors.items():
self.m_transfer_errors.labels(exc_type=exc_type.__name__).inc(exc_count)

self.m_receipts.inc(result.n_receipts)

0 comments on commit bfe0c72

Please sign in to comment.