From 1b0a1f636b9aed611e7b00fd0699ae50f6e55951 Mon Sep 17 00:00:00 2001 From: volokluev <3169433+volokluev@users.noreply.github.com> Date: Thu, 31 Oct 2024 15:18:56 -0700 Subject: [PATCH] feat(eap): Timeseries V1 RPC (#6475) This PR implements the basics of the timeseries API. Supported: * Aggregations on all attributes * filters * groupby * zerofilling nonexistent data To come in future PRs: * Sample count * extrapolation --- snuba/web/rpc/common/common.py | 10 +- snuba/web/rpc/v1/endpoint_time_series.py | 314 +++++++++ snuba/web/rpc/v1/endpoint_trace_item_table.py | 2 +- tests/web/rpc/v1/test_endpoint_time_series.py | 612 ++++++++++++++++++ 4 files changed, 932 insertions(+), 6 deletions(-) create mode 100644 snuba/web/rpc/v1/endpoint_time_series.py create mode 100644 tests/web/rpc/v1/test_endpoint_time_series.py diff --git a/snuba/web/rpc/common/common.py b/snuba/web/rpc/common/common.py index b1bc1a0a99..cf6cc9199c 100644 --- a/snuba/web/rpc/common/common.py +++ b/snuba/web/rpc/common/common.py @@ -271,7 +271,7 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression :param item_filter: :return: """ - if item_filter.and_filter: + if item_filter.HasField("and_filter"): filters = item_filter.and_filter.filters if len(filters) == 0: return literal(True) @@ -279,7 +279,7 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression return trace_item_filters_to_expression(filters[0]) return and_cond(*(trace_item_filters_to_expression(x) for x in filters)) - if item_filter.or_filter: + if item_filter.HasField("or_filter"): filters = item_filter.or_filter.filters if len(filters) == 0: raise BadSnubaRPCRequestException( @@ -289,7 +289,7 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression return trace_item_filters_to_expression(filters[0]) return or_cond(*(trace_item_filters_to_expression(x) for x in filters)) - if item_filter.comparison_filter: + if item_filter.HasField("comparison_filter"): k = item_filter.comparison_filter.key k_expression = attribute_key_to_expression(k) op = item_filter.comparison_filter.op @@ -337,7 +337,7 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression f"Invalid string comparison, unknown op: {item_filter.comparison_filter}" ) - if item_filter.exists_filter: + if item_filter.HasField("exists_filter"): k = item_filter.exists_filter.key if k.name in NORMALIZED_COLUMNS.keys(): return f.isNotNull(column(k.name)) @@ -346,7 +346,7 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression else: return f.mapContains(column("attr_num"), literal(k.name)) - raise Exception("Unknown filter: ", item_filter) + return literal(True) def project_id_and_org_conditions(meta: RequestMeta) -> Expression: diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py new file mode 100644 index 0000000000..8ffd8dbfb4 --- /dev/null +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -0,0 +1,314 @@ +import math +import uuid +from collections import defaultdict +from datetime import datetime +from typing import Any, Dict, Iterable, Type + +from google.protobuf.json_format import MessageToDict +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + DataPoint, + TimeSeries, + TimeSeriesRequest, + TimeSeriesResponse, +) + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key 
import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import column +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.web.query import run_query +from snuba.web.rpc import RPCEndpoint +from snuba.web.rpc.common.common import ( + aggregation_to_expression, + attribute_key_to_expression, + base_conditions_and, + trace_item_filters_to_expression, + treeify_or_and_conditions, +) +from snuba.web.rpc.common.debug_info import ( + extract_response_meta, + setup_trace_query_settings, +) +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException + +_VALID_GRANULARITY_SECS = set( + [ + 15, + 30, + 60, # seconds + 2 * 60, + 5 * 60, + 10 * 60, + 30 * 60, # minutes + 1 * 3600, + 3 * 3600, + 12 * 3600, + 24 * 3600, # hours + ] +) + +_MAX_BUCKETS_IN_REQUEST = 1000 + + +def _convert_result_timeseries( + request: TimeSeriesRequest, data: list[Dict[str, Any]] +) -> Iterable[TimeSeries]: + """This function takes the results of the clickhouse query and converts it to a list of TimeSeries objects. It also handles + zerofilling data points where data was not present for a specific bucket. + + Example: + data is a list of dictionaries that look like this: + + >>> [ + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} + >>> # next time bucket starts below + + >>> {"time": "2024-4-20 16:21:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} + >>> # here you can see that not every timeseries had data in every time bucket + >>> {"time": "2024-4-20 16:22:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} + >>> {"time": "2024-4-20 16:23:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:24:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + >>> ... 
+ >>> ] + + In this example we have 8 different timeseries and they are all sparse: + + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + Returns: + an Iterable of TimeSeries objects where each possible bucket has a DataPoint with `data_present` set correctly + + """ + + # to convert the results, need to know which were the groupby columns and which ones + # were aggregations + aggregation_labels = set([agg.label for agg in request.aggregations]) + group_by_labels = set([attr.name for attr in request.group_by]) + + # create a mapping with (all the group by attribute key,val pairs as strs, label name) + # In the example in the docstring it would look like: + # { ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): TimeSeries()} + result_timeseries: dict[tuple[str, str], TimeSeries] = {} + + # create a mapping for each timeseries of timestamp: row to fill data points not returned in the query + # { + # ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): { + # time_converted_to_integer_timestamp: row_data_for_that_time_bucket + # } + # } + result_timeseries_timestamp_to_row: defaultdict[ + tuple[str, str], dict[int, Dict[str, Any]] + ] = defaultdict(dict) + + query_duration = ( + request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds + ) + time_buckets = [ + Timestamp(seconds=(request.meta.start_timestamp.seconds) + secs) + for secs in range(0, query_duration, request.granularity_secs) + ] + + # this loop fill in our pre-computed dictionaries so that we can zerofill later + for row in data: + group_by_map = {} + + for col_name, col_value in row.items(): + if col_name in group_by_labels: + group_by_map[col_name] = col_value + + group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()]) + for col_name in aggregation_labels: + if not result_timeseries.get((group_by_key, col_name), None): + result_timeseries[(group_by_key, col_name)] = TimeSeries( + group_by_attributes=group_by_map, + label=col_name, + buckets=time_buckets, + ) + result_timeseries_timestamp_to_row[(group_by_key, col_name)][ + int(datetime.fromisoformat(row["time"]).timestamp()) + ] = row + + # Go through every possible time bucket in the query, if there's row data for it, fill in its data + # otherwise put a dummy datapoint in + + for bucket in time_buckets: + for timeseries_key, timeseries in result_timeseries.items(): + row_data = result_timeseries_timestamp_to_row.get(timeseries_key, {}).get( + bucket.seconds + ) + if not row_data: + timeseries.data_points.append(DataPoint(data=0, data_present=False)) + else: + timeseries.data_points.append( + DataPoint(data=row_data[timeseries.label], data_present=True) + ) + return result_timeseries.values() + + +def _build_query(request: TimeSeriesRequest) -> Query: + # TODO: This is hardcoded still + entity = 
Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + + aggregation_columns = [ + SelectedExpression( + name=aggregation.label, expression=aggregation_to_expression(aggregation) + ) + for aggregation in request.aggregations + ] + + groupby_columns = [ + SelectedExpression( + name=attr_key.name, expression=attribute_key_to_expression(attr_key) + ) + for attr_key in request.group_by + ] + + res = Query( + from_clause=entity, + selected_columns=[ + SelectedExpression(name="time", expression=column("time", alias="time")), + *aggregation_columns, + *groupby_columns, + ], + granularity=request.granularity_secs, + condition=base_conditions_and( + request.meta, trace_item_filters_to_expression(request.filter) + ), + groupby=[ + column("time"), + *[attribute_key_to_expression(attr_key) for attr_key in request.group_by], + ], + order_by=[OrderBy(expression=column("time"), direction=OrderByDirection.ASC)], + ) + treeify_or_and_conditions(res) + return res + + +def _build_snuba_request(request: TimeSeriesRequest) -> SnubaRequest: + query_settings = ( + setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() + ) + + return SnubaRequest( + id=uuid.UUID(request.meta.request_id), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=query_settings, + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="eap_span_samples", + ), + ) + + +def _enforce_no_duplicate_labels(request: TimeSeriesRequest) -> None: + labels = set() + + for agg in request.aggregations: + if agg.label in labels: + raise BadSnubaRPCRequestException(f"duplicate label {agg.label} in request") + labels.add(agg.label) + + +def _validate_time_buckets(request: TimeSeriesRequest) -> None: + if request.meta.start_timestamp.seconds > request.meta.end_timestamp.seconds: + raise BadSnubaRPCRequestException("start timestamp is after end timestamp") + if request.granularity_secs == 0: + raise BadSnubaRPCRequestException("granularity of 0 is invalid") + + if request.granularity_secs not in _VALID_GRANULARITY_SECS: + raise BadSnubaRPCRequestException( + f"Granularity of {request.granularity_secs} is not valid, valid granularity_secs: {sorted(_VALID_GRANULARITY_SECS)}" + ) + request_duration = ( + request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds + ) + num_buckets = request_duration / request.granularity_secs + if num_buckets > _MAX_BUCKETS_IN_REQUEST: + raise BadSnubaRPCRequestException( + f"This request is asking for too many datapoints ({num_buckets}, please raise your granularity_secs or shorten your time window" + ) + if num_buckets < 1: + raise BadSnubaRPCRequestException( + "This request will return no datapoints lower your granularity or lengthen your time window" + ) + + ceil_num_buckets = math.ceil(num_buckets) + # if the granularity and time windoes don't match up evenly, adjust the window to include another data point + if num_buckets != ceil_num_buckets: + request.meta.end_timestamp.seconds = request.meta.start_timestamp.seconds + ( + ceil_num_buckets * request.granularity_secs + ) + + +class EndpointTimeSeries(RPCEndpoint[TimeSeriesRequest, TimeSeriesResponse]): + @classmethod + def version(cls) -> str: + return "v1" + + @classmethod + def request_class(cls) -> Type[TimeSeriesRequest]: + return 
TimeSeriesRequest + + @classmethod + def response_class(cls) -> Type[TimeSeriesResponse]: + return TimeSeriesResponse + + def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: + # TODO: Move this to base + in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( + uuid.uuid4() + ) + _enforce_no_duplicate_labels(in_msg) + _validate_time_buckets(in_msg) + snuba_request = _build_snuba_request(in_msg) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=self._timer, + ) + response_meta = extract_response_meta( + in_msg.meta.request_id, + in_msg.meta.debug, + [res], + [self._timer], + ) + + return TimeSeriesResponse( + result_timeseries=list( + _convert_result_timeseries(in_msg, res.result.get("data", [])) + ), + meta=response_meta, + ) diff --git a/snuba/web/rpc/v1/endpoint_trace_item_table.py b/snuba/web/rpc/v1/endpoint_trace_item_table.py index e179c19dca..9b3399aeb4 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_table.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_table.py @@ -209,7 +209,7 @@ def _validate_select_and_groupby(in_msg: TraceItemTableRequest) -> None: aggregation_present = any([c for c in in_msg.columns if c.HasField("aggregation")]) if non_aggregted_columns != grouped_by_columns and aggregation_present: raise BadSnubaRPCRequestException( - f"Non aggregated columns should be in group_by. non_aggregted_columns: {non_aggregted_columns}, grouped_by_columns: {grouped_by_columns}" + f"Non aggregated columns should be in group_by. non_aggregated_columns: {non_aggregted_columns}, grouped_by_columns: {grouped_by_columns}" ) diff --git a/tests/web/rpc/v1/test_endpoint_time_series.py b/tests/web/rpc/v1/test_endpoint_time_series.py new file mode 100644 index 0000000000..82d15efbaa --- /dev/null +++ b/tests/web/rpc/v1/test_endpoint_time_series.py @@ -0,0 +1,612 @@ +import uuid +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from typing import Any, Callable, MutableMapping + +import pytest +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + DataPoint, + TimeSeries, + TimeSeriesRequest, +) +from sentry_protos.snuba.v1.error_pb2 import Error +from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeAggregation, + AttributeKey, + AttributeValue, + ExtrapolationMode, + Function, +) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ( + ComparisonFilter, + TraceItemFilter, +) + +from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.storage_key import StorageKey +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.v1.endpoint_time_series import ( + EndpointTimeSeries, + _validate_time_buckets, +) +from tests.base import BaseApiTest +from tests.helpers import write_raw_unprocessed_events + + +def gen_message( + dt: datetime, tags: dict[str, str], numerical_attributes: dict[str, float] +) -> MutableMapping[str, Any]: + return { + "description": "/api/0/relays/projectconfigs/", + "duration_ms": 152, + "event_id": "d826225de75d42d6b2f01b957d51f18f", + "exclusive_time_ms": 0.228, + "is_segment": True, + "data": { + "sentry.environment": "development", + "sentry.release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "thread.name": "uWSGIWorker1Core0", + "thread.id": "8522009600", + "sentry.segment.name": 
"/api/0/relays/projectconfigs/", + "sentry.sdk.name": "sentry.python.django", + "sentry.sdk.version": "2.7.0", + **numerical_attributes, + }, + "measurements": { + "num_of_spans": {"value": 50.0}, + "client_sample_rate": {"value": 1}, + }, + "organization_id": 1, + "origin": "auto.http.django", + "project_id": 1, + "received": 1721319572.877828, + "retention_days": 90, + "segment_id": "8873a98879faf06d", + "sentry_tags": { + "category": "http", + "environment": "development", + "op": "http.server", + "platform": "python", + "release": "backend@24.7.0.dev0+c45b49caed1e5fcbf70097ab3f434b487c359b6b", + "sdk.name": "sentry.python.django", + "sdk.version": "2.7.0", + "status": "ok", + "status_code": "200", + "thread.id": "8522009600", + "thread.name": "uWSGIWorker1Core0", + "trace.status": "ok", + "transaction": "/api/0/relays/projectconfigs/", + "transaction.method": "POST", + "transaction.op": "http.server", + "user": "ip:127.0.0.1", + }, + "span_id": uuid.uuid4().hex, + "tags": tags, + "trace_id": uuid.uuid4().hex, + "start_timestamp_ms": int(dt.timestamp()) * 1000, + "start_timestamp_precise": dt.timestamp(), + "end_timestamp_precise": dt.timestamp() + 1, + } + + +BASE_TIME = datetime.utcnow().replace( + hour=8, minute=0, second=0, microsecond=0, tzinfo=UTC +) - timedelta(hours=24) + + +SecsFromSeriesStart = int + + +@dataclass +class DummyMetric: + name: str + get_value: Callable[[SecsFromSeriesStart], float] + + +def store_timeseries( + start_datetime: datetime, + period_secs: int, + len_secs: int, + metrics: list[DummyMetric], + tags: dict[str, str] | None = None, +) -> None: + tags = tags or {} + messages = [] + for secs in range(0, len_secs, period_secs): + dt = start_datetime + timedelta(seconds=secs) + numerical_attributes = {m.name: m.get_value(secs) for m in metrics} + messages.append(gen_message(dt, tags, numerical_attributes)) + spans_storage = get_storage(StorageKey("eap_spans")) + write_raw_unprocessed_events(spans_storage, messages) # type: ignore + + +@pytest.mark.clickhouse_db +@pytest.mark.redis_db +class TestTimeSeriesApi(BaseApiTest): + def test_basic(self) -> None: + ts = Timestamp() + ts.GetCurrentTime() + tstart = Timestamp(seconds=ts.seconds - 3600) + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=tstart, + end_timestamp=ts, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="sentry.duration" + ), + label="p50", + ), + AttributeAggregation( + aggregate=Function.FUNCTION_P95, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="sentry.duration" + ), + label="p90", + ), + ], + granularity_secs=60, + ) + response = self.app.post( + "/rpc/EndpointTimeSeries/v1", data=message.SerializeToString() + ) + if response.status_code != 200: + error = Error() + error.ParseFromString(response.data) + assert response.status_code == 200, (error.message, error.details) + + def test_sum(self) -> None: + # store a a test metric with a value of 1, every second of one hour + granularity_secs = 300 + query_duration = 60 * 30 + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 1)], + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + 
seconds=int(BASE_TIME.timestamp() + query_duration) + ), + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="avg", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=granularity_secs, + ) + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, query_duration, granularity_secs) + ] + assert sorted(response.result_timeseries, key=lambda x: x.label) == [ + TimeSeries( + label="avg", + buckets=expected_buckets, + data_points=[ + DataPoint(data=1, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data=300, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + ] + + def test_with_group_by(self) -> None: + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 1)], + tags={"consumer_group": "a", "environment": "prod"}, + ) + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 10)], + tags={"consumer_group": "z", "environment": "prod"}, + ) + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 100)], + tags={"consumer_group": "z", "environment": "dev"}, + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + group_by=[ + AttributeKey(type=AttributeKey.TYPE_STRING, name="consumer_group"), + AttributeKey(type=AttributeKey.TYPE_STRING, name="environment"), + ], + granularity_secs=300, + ) + + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, 60 * 30, 300) + ] + + def sort_key(t: TimeSeries) -> tuple[str, str]: + return ( + t.group_by_attributes["consumer_group"], + t.group_by_attributes["environment"], + ) + + assert sorted(response.result_timeseries, key=sort_key) == sorted( + [ + TimeSeries( + label="sum", + buckets=expected_buckets, + group_by_attributes={"consumer_group": "a", "environment": "prod"}, + data_points=[ + DataPoint(data=300, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + TimeSeries( + label="sum", + buckets=expected_buckets, + group_by_attributes={"consumer_group": "z", "environment": "prod"}, + data_points=[ + DataPoint(data=3000, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + TimeSeries( + label="sum", + buckets=expected_buckets, + group_by_attributes={"consumer_group": "z", "environment": "dev"}, + data_points=[ + DataPoint(data=30000, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + ], + key=sort_key, + ) + + def 
test_with_no_data_present(self) -> None: + granularity_secs = 300 + query_duration = 60 * 30 + store_timeseries( + BASE_TIME, + 1800, + 3600, + metrics=[DummyMetric("sparse_metric", get_value=lambda x: 1)], + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int(BASE_TIME.timestamp() + query_duration) + ), + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="sparse_metric" + ), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="sparse_metric" + ), + label="avg", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=granularity_secs, + ) + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, query_duration, granularity_secs) + ] + assert sorted(response.result_timeseries, key=lambda x: x.label) == [ + TimeSeries( + label="avg", + buckets=expected_buckets, + data_points=[ + DataPoint(data=1, data_present=True), + *[DataPoint() for _ in range(len(expected_buckets) - 1)], + ], + ), + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data=1, data_present=True), + *[DataPoint() for _ in range(len(expected_buckets) - 1)], + ], + ), + ] + pass + + def test_with_filters(self) -> None: + # store a a test metric with a value of 1, every second of one hour + granularity_secs = 300 + query_duration = 60 * 30 + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 1)], + tags={"customer": "bob"}, + ) + + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 999)], + tags={"customer": "alice"}, + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int(BASE_TIME.timestamp() + query_duration) + ), + debug=True, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="avg", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + filter=TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(type=AttributeKey.TYPE_STRING, name="customer"), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_str="bob"), + ) + ), + granularity_secs=granularity_secs, + ) + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, query_duration, granularity_secs) + ] + assert sorted(response.result_timeseries, key=lambda x: x.label) == [ + TimeSeries( + label="avg", + buckets=expected_buckets, + data_points=[ + DataPoint(data=1, data_present=True) + for _ in range(len(expected_buckets)) + 
], + ), + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data=300, data_present=True) + for _ in range(len(expected_buckets)) + ], + ), + ] + + def test_with_unaligned_granularities(self) -> None: + query_offset = 5 + query_duration = 1800 + query_offset + granularity_secs = 300 + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[DummyMetric("test_metric", get_value=lambda x: 1)], + tags={"customer": "bob"}, + ) + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int(BASE_TIME.timestamp()) + query_duration + ), + debug=True, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=granularity_secs, + ) + + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range( + 0, query_duration - query_offset + granularity_secs, granularity_secs + ) + ] + assert response.result_timeseries == [ + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data=300, data_present=True) + for _ in range(len(expected_buckets)) + ], + ) + ] + + +class TestUtils: + def test_no_duplicate_labels(self) -> None: + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + debug=True, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + AttributeAggregation( + aggregate=Function.FUNCTION_AVG, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=1, + ) + + with pytest.raises(BadSnubaRPCRequestException): + EndpointTimeSeries().execute(message) + + @pytest.mark.parametrize( + ("start_ts", "end_ts", "granularity"), + [ + (BASE_TIME, BASE_TIME + timedelta(hours=1), 1), + (BASE_TIME, BASE_TIME + timedelta(hours=24), 15), + (BASE_TIME, BASE_TIME + timedelta(hours=1), 0), + (BASE_TIME + timedelta(hours=1), BASE_TIME, 0), + (BASE_TIME, BASE_TIME + timedelta(hours=1), 3 * 3600), + ], + ) + def test_bad_granularity( + self, start_ts: datetime, end_ts: datetime, granularity: int + ) -> None: + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(start_ts.timestamp())), + end_timestamp=Timestamp(seconds=int(end_ts.timestamp())), + debug=True, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=granularity, + ) + + with pytest.raises(BadSnubaRPCRequestException): + _validate_time_buckets(message) + + def 
test_adjust_buckets(self) -> None: + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp()) + 65), + debug=True, + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="test_metric"), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=15, + ) + + _validate_time_buckets(message) + # add another bucket to fit into granularity_secs + assert message.meta.end_timestamp.seconds == int(BASE_TIME.timestamp()) + 75
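
The end-timestamp adjustment asserted above follows a simple rounding rule in _validate_time_buckets: when the requested window does not divide evenly into granularity_secs buckets, the window is extended so the trailing partial bucket becomes a full one. A minimal standalone sketch of that arithmetic (the helper name adjusted_end is local to this example, not part of the patch):

import math

def adjusted_end(start_secs: int, end_secs: int, granularity_secs: int) -> int:
    # If the window does not divide evenly into granularity_secs buckets,
    # extend the end so the trailing partial bucket becomes a full one.
    num_buckets = (end_secs - start_secs) / granularity_secs
    return start_secs + math.ceil(num_buckets) * granularity_secs

# A 65 s window at 15 s granularity yields ceil(65 / 15) = 5 buckets,
# so the end moves from +65 to +75, matching test_adjust_buckets above.
assert adjusted_end(0, 65, 15) == 75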
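
The zerofilling mentioned in the commit message can be illustrated without the Snuba query machinery. This is a deliberately simplified sketch, not the code path in _convert_result_timeseries: the row shapes mimic its docstring example, and the keying scheme (group-by values joined into a string, paired with the aggregation label) follows the same idea.

from collections import defaultdict

# Rows as they might come back from the query: one bucket is missing for env=prod.
rows = [
    {"time_secs": 0, "sum(duration)": 10, "env": "prod"},
    {"time_secs": 30, "sum(duration)": 7, "env": "prod"},
]
buckets = [0, 15, 30]

# Index rows by (group-by key string, aggregation label), then by bucket timestamp.
by_key: dict[tuple[str, str], dict[int, float]] = defaultdict(dict)
for row in rows:
    key = (f"env,{row['env']}", "sum(duration)")
    by_key[key][row["time_secs"]] = row["sum(duration)"]

# Every expected bucket either takes the row's value or a "data not present" point.
for key, values in by_key.items():
    series = [(values[b], True) if b in values else (0, False) for b in buckets]
    print(key, series)  # ('env,prod', 'sum(duration)') [(10, True), (0, False), (7, True)]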
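
Finally, a minimal sketch of how a caller might exercise the new endpoint, mirroring test_basic and test_sum in this patch. It assumes a running Snuba deployment with the eap_spans storage populated; the one-hour window, project/organization ids, attribute name, and label are illustrative, not part of the API contract.

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import TimeSeriesRequest
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
    AttributeAggregation,
    AttributeKey,
    Function,
)

from snuba.web.rpc.v1.endpoint_time_series import EndpointTimeSeries

end = Timestamp()
end.GetCurrentTime()
start = Timestamp(seconds=end.seconds - 3600)  # one-hour window (illustrative)

request = TimeSeriesRequest(
    meta=RequestMeta(
        project_ids=[1],
        organization_id=1,
        cogs_category="something",
        referrer="something",
        start_timestamp=start,
        end_timestamp=end,
    ),
    aggregations=[
        AttributeAggregation(
            aggregate=Function.FUNCTION_P95,
            key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="sentry.duration"),
            label="p95(sentry.duration)",
        ),
    ],
    granularity_secs=300,  # must be one of _VALID_GRANULARITY_SECS
)

# In-process call, as the tests do; over HTTP the serialized request is POSTed
# to /rpc/EndpointTimeSeries/v1 instead.
response = EndpointTimeSeries().execute(request)
for ts in response.result_timeseries:
    print(ts.label, [(p.data, p.data_present) for p in ts.data_points])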