Skip to content

Commit

Permalink
feat: wavefront fetcher (#413)
Browse files Browse the repository at this point in the history
Introduces a Wavefront data fetcher.
Supports simple metric-based queries as well as raw queries.

---------

Signed-off-by: Avik Basu <[email protected]>
Co-authored-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 and ab93 authored Sep 18, 2024
1 parent f6ce546 commit ed81ef0
Show file tree
Hide file tree
Showing 6 changed files with 383 additions and 1 deletion.
2 changes: 2 additions & 0 deletions numalogic/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
RDSFetcherConf,
)
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.connectors.wavefront import WavefrontFetcher

__all__ = [
"RedisConf",
Expand All @@ -23,6 +24,7 @@
"RDSFetcher",
"RDSConf",
"RDSFetcherConf",
"WavefrontFetcher",
]

if find_spec("boto3"):
Expand Down
159 changes: 159 additions & 0 deletions numalogic/connectors/wavefront.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import os
from datetime import datetime
from typing import Optional

import pandas as pd
from wavefront_api_client import Configuration, QueryApi, ApiClient

from numalogic.connectors._base import DataFetcher
from numalogic.tools.exceptions import WavefrontFetcherError

import logging

LOGGER = logging.getLogger(__name__)


class WavefrontFetcher(DataFetcher):
    """
    Fetches time series data from Wavefront.

    Args:
        url (str): Wavefront URL.
        api_token (str): Wavefront API token. When omitted, the value of the
            WAVEFRONT_API_TOKEN environment variable is used instead.

    Raises
    ------
        ValueError: If API token is not provided.
        WavefrontFetcherError: If there is an error fetching data from Wavefront.
    """

    def __init__(self, url: str, api_token: Optional[str] = None):
        super().__init__(url)
        # An explicit argument wins; the environment variable is the fallback.
        api_token = api_token or os.getenv("WAVEFRONT_API_TOKEN")
        if not api_token:
            raise ValueError("WAVEFRONT API token is not provided")
        configuration = Configuration()
        configuration.host = url
        configuration.api_key["X-AUTH-TOKEN"] = api_token
        self.api_client = QueryApi(
            ApiClient(
                configuration,
                header_name="Authorization",
                header_value=f"Bearer {api_token}",
            )
        )

    @staticmethod
    def _to_epoch_seconds(moment: Optional[datetime]) -> Optional[int]:
        """Converts a datetime to whole epoch seconds; None is passed through."""
        if moment is None:
            return None
        return int(moment.timestamp())

    def _call_api(self, query: str, start: int, end: Optional[int], granularity: str):
        """
        Calls the Wavefront query API.

        Args:
            query (str): Complete query string.
            start (int): Start time in epoch seconds.
            end (int): End time in epoch seconds, or None.
            granularity (str): Granularity of the data.

        Returns
        -------
            The response object of the API client, unchanged. Callers convert it
            with ``to_dict()`` before formatting.
        """
        # The client takes (query, start, granularity) positionally; end time is
        # passed through the `e` keyword.
        return self.api_client.query_api(
            query, start, granularity, e=end, include_obsolete_metrics=True, use_raw_qk=True
        )

    @staticmethod
    def _format_results(res: dict) -> pd.DataFrame:
        """
        Validates and formats the results from the API.

        Args:
            res (dict): Query response converted to a dictionary.

        Returns
        -------
            Dataframe with the fetched data in the format: timestamp (index), value (column).
            Rows of all returned series are concatenated and sorted by timestamp.

        Raises
        ------
            WavefrontFetcherError: If the response carries an error, or contains
                no timeseries.
        """
        if res.get("error_type") is not None:
            raise WavefrontFetcherError(
                f"Error fetching data from Wavefront: "
                f"{res.get('error_type')}: {res.get('error_message')}"
            )
        timeseries = res.get("timeseries")
        # Treat an empty list like a missing key: pd.concat([]) would otherwise
        # surface as a bare ValueError instead of the documented error type.
        if not timeseries:
            raise WavefrontFetcherError("No timeseries data found for the query")
        df = pd.concat(
            [pd.DataFrame(ts["data"], columns=["timestamp", "value"]) for ts in timeseries]
        )
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
        return df.set_index("timestamp").sort_index()

    def fetch(
        self,
        metric: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront as a single metric.

        Args:
            metric (str): Metric to fetch. Example: 'system.cpu.usage'.
                Do not include the 'ts()' function.
            start (datetime): Start time.
            filters (dict): Filters to apply to the query. Each key/value pair
                becomes a key="value" term; terms are joined with 'and'.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
            Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
            WavefrontFetcherError: If there is an error fetching data from Wavefront
        """
        start_ts = self._to_epoch_seconds(start)
        end_ts = self._to_epoch_seconds(end)
        if filters:
            _filters = " and ".join([f'{key}="{value}"' for key, value in filters.items()])
            query = f"ts({metric}, {_filters})"
        else:
            query = f"ts({metric})"
        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        res = self._call_api(query=query, start=start_ts, end=end_ts, granularity=granularity)
        return self._format_results(res.to_dict())

    def raw_fetch(
        self,
        query: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront using a raw query, allowing for more complex queries.

        Args:
            query (str): Raw query to fetch data. May contain '{name}' placeholders
                that are filled in from `filters`.
            start (datetime): Start time.
            filters (dict): Values for the placeholders in the query. When None,
                the query is sent as given, without any formatting.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
            Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
            WavefrontFetcherError:
                - If there is an error fetching data from Wavefront
                - If there is a key error in the query.

        >>> from datetime import datetime, timedelta
        ...
        >>> fetcher = WavefrontFetcher(url="https://miata.wavefront.com", api_token="6spd-manual")
        >>> df = fetcher.raw_fetch(
        ...     query="rawsum(ts(engine.rpm, gear='{gear}' and track='{track}'))",
        ...     start=datetime.now() - timedelta(minutes=5),
        ...     filters={"gear": "1", "track": "laguna_seca"},
        ...     end=datetime.now(),
        ... )
        """
        start_ts = self._to_epoch_seconds(start)
        end_ts = self._to_epoch_seconds(end)

        # `filters` defaults to None; unpacking None with ** would raise TypeError.
        if filters is not None:
            try:
                query = query.format(**filters)
            except KeyError as key_err:
                raise WavefrontFetcherError(f"Key error in query: {key_err}") from key_err

        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        # Keyword arguments keep end time and granularity from being swapped.
        qres = self._call_api(query=query, start=start_ts, end=end_ts, granularity=granularity)
        return self._format_results(qres.to_dict())
6 changes: 6 additions & 0 deletions numalogic/tools/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,9 @@ class RDSFetcherError(Exception):
"""Base class for all exceptions raised by the RDSFetcher class."""

pass


class WavefrontFetcherError(Exception):
    """Root exception type for every error raised by the WavefrontFetcher class."""
19 changes: 18 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pynumaflow = "~0.8"
prometheus_client = "^0.18.0"
structlog = "^24.1.0"
numalogic-prometheus = { version = "^0.8", allow-prereleases = true }
wavefront-api-client = "^2.202.2"

# extras
mlflow-skinny = { version = "^2.0", optional = true }
Expand Down
Loading

0 comments on commit ed81ef0

Please sign in to comment.