feat(eap): Timeseries V1 RPC #6475
Conversation
)
time_buckets = [
    Timestamp(seconds=(request.meta.start_timestamp.seconds) + secs)
    for secs in range(0, query_duration, request.granularity_secs)
What happens if the granularity doesn't line up with the duration? E.g. a granularity of 61 seconds with a 10 minute window?
Do we need extra validation to ensure that the result timestamps will line up with these generated buckets?
Good point, I added some logic to handle this and tests for it as well
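For reference, a minimal sketch (assumed, not necessarily the PR's exact fix) of rejecting a window that the granularity does not evenly divide, so the generated buckets always line up with the result timestamps. The end_timestamp field is an assumption; only start_timestamp and granularity_secs appear in the snippet above.

from google.protobuf.timestamp_pb2 import Timestamp

# Sketch only: assumes the request fields shown above plus an assumed
# meta.end_timestamp used to derive query_duration.
query_duration = (
    request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds
)
if query_duration % request.granularity_secs != 0:
    # e.g. a 61s granularity over a 600s window would otherwise leave a
    # partial trailing bucket that no result timestamp can line up with
    raise ValueError("granularity_secs must evenly divide the requested window")
time_buckets = [
    Timestamp(seconds=request.meta.start_timestamp.seconds + secs)
    for secs in range(0, query_duration, request.granularity_secs)
]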
if col_name in group_by_labels:
    group_by_map[col_name] = col_value

group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()])
This is a performance nit, but I might make this a tuple instead of a string.
I don't know how much faster that will actually make it as the tuple will still need to be hashed eventually
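For illustration, the two key shapes being compared (sketch only; both are valid dict keys, and the tuple mainly saves the string formatting rather than the hashing):

# String key, as in the PR: pays for the f-strings and the join on every row.
group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()])

# Tuple key, the suggested alternative: skips the formatting, but the tuple
# is still hashed when it is used as a dict key, so the win is modest.
group_by_key = tuple(group_by_map.items())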
res = Query(
    from_clause=entity,
    selected_columns=[
        SelectedExpression(name="time", expression=column("time", alias="time")),
I would rather not have things depend on the TimeSeriesProcessor, but that is a personal opinion. I've been trying to deprecate that processor for a while.
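For what it's worth, a hypothetical sketch of selecting the time bucket directly in the query, without going through TimeSeriesProcessor. The FunctionCall/Literal signatures are assumed from Snuba's expression AST and may not match exactly; this is not what the PR does.

from snuba.query.expressions import FunctionCall, Literal

# Compute the bucket via ClickHouse's toStartOfInterval instead of relying on
# TimeSeriesProcessor to rewrite a magic "time" column.
time_expression = FunctionCall(
    "time",
    "toStartOfInterval",
    (
        column("timestamp"),
        FunctionCall(None, "toIntervalSecond", (Literal(None, request.granularity_secs),)),
    ),
)
selected_time = SelectedExpression(name="time", expression=time_expression)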
) -> Iterable[TimeSeries]:
    # to convert the results, need to know which were the groupby columns and which ones
    # were aggregations
    aggregation_labels = set([agg.label for agg in request.aggregations])
Is it valid to have a request with duplicate labels?
Good point, let me enforce that this does not happen.
This is done now
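For reference, the kind of check being discussed might look roughly like this (a sketch assuming a plain exception is acceptable; the actual PR may surface a dedicated RPC error instead):

# Reject requests whose aggregations reuse a label, since each label becomes
# a result column and later keys the output TimeSeries.
labels = [agg.label for agg in request.aggregations]
if len(labels) != len(set(labels)):
    raise ValueError("aggregation labels must be unique within a request")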
I got a bit confused trying to understand _convert_result_timeseries, but other than that it looks good.
def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
    # TODO: Move this to base
    in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str(
It seems that all fields of the protobuf can be empty / unset.
Is the only way we enforce required fields right now the implicit internal error when they aren't provided?
yeah pretty much
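For context, proto3 messages never report scalar fields as "missing": unset numbers come back as 0, unset strings as "", and unset sub-messages as empty messages, so required fields have to be checked explicitly. A rough sketch of what that could look like (field names are taken from the snippets above; the rest is assumed, not the PR's code):

def _validate(in_msg: TimeSeriesRequest) -> None:
    # Sub-message presence can be checked with HasField; scalar fields cannot.
    if not in_msg.meta.HasField("start_timestamp"):
        raise ValueError("meta.start_timestamp is required")
    if in_msg.granularity_secs <= 0:
        raise ValueError("granularity_secs must be a positive number of seconds")
    if not in_msg.aggregations:
        raise ValueError("at least one aggregation is required")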
for row in data:
    group_by_map = {}

    for col_name, col_value in row.items():
        if col_name in group_by_labels:
            group_by_map[col_name] = col_value

    group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()])
    for col_name in aggregation_labels:
        if not result_timeseries.get((group_by_key, col_name), None):
            result_timeseries[(group_by_key, col_name)] = TimeSeries(
                group_by_attributes=group_by_map,
                label=col_name,
                buckets=time_buckets,
            )
        result_timeseries_timestamp_to_row[(group_by_key, col_name)][
            int(datetime.fromisoformat(row["time"]).timestamp())
        ] = row
What are group_by_map and group_by_key? I'm not sure what you're doing here.
I added an explanation, lemme know if it makes sense now
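To make that concrete, here is a small worked example (attribute names and values are invented) of what group_by_map and group_by_key hold for one result row:

# One result row from the query, plus the label sets derived from the request.
row = {"time": "2024-01-01T00:00:00", "platform": "ios", "release": "1.0", "p50": 42.0}
group_by_labels = {"platform", "release"}
aggregation_labels = {"p50"}

group_by_map = {k: v for k, v in row.items() if k in group_by_labels}
group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()])
# group_by_map == {"platform": "ios", "release": "1.0"}
# group_by_key == "platform,ios|release,1.0"
# Each (group_by_key, aggregation_label) pair identifies one output TimeSeries,
# and the matching rows are indexed by timestamp so they can be slotted into
# the pre-generated time_buckets.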
LGTM.
This PR implements the basics of the timeseries API.
Supported:
To come in future PRs: