Skip to content

Commit

Permalink
fix(streaming): group by order for raw event connector (#1987)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Dec 19, 2024
1 parent 68cbb6b commit 16c52ea
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
9 changes: 8 additions & 1 deletion openmeter/streaming/clickhouse/raw_events/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log/slog"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -337,6 +338,11 @@ func (c *Connector) queryCountEvents(ctx context.Context, namespace string, para
}

func (c *Connector) queryMeter(ctx context.Context, namespace string, meter models.Meter, params streaming.QueryParams) ([]models.MeterQueryRow, error) {
// We sort the group by keys to ensure the order of the group by columns is deterministic
// It helps testing the SQL queries.
groupBy := params.GroupBy
sort.Strings(groupBy)

queryMeter := queryMeter{
Database: c.config.Database,
EventsTableName: c.config.EventsTableName,
Expand All @@ -346,7 +352,7 @@ func (c *Connector) queryMeter(ctx context.Context, namespace string, meter mode
To: params.To,
Subject: params.FilterSubject,
FilterGroupBy: params.FilterGroupBy,
GroupBy: params.GroupBy,
GroupBy: groupBy,
WindowSize: params.WindowSize,
WindowTimeZone: params.WindowTimeZone,
}
Expand Down Expand Up @@ -393,6 +399,7 @@ func (c *Connector) queryMeter(ctx context.Context, namespace string, meter mode

for i, key := range queryMeter.GroupBy {
if s, ok := args[i+argCount].(*string); ok {
// Subject is a top level field
if key == "subject" {
row.Subject = s
continue
Expand Down
16 changes: 4 additions & 12 deletions openmeter/streaming/clickhouse/raw_events/meter_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package raw_events
import (
_ "embed"
"fmt"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -105,22 +104,15 @@ func (d queryMeter) toSQL() (string, []interface{}, error) {
selectColumns = append(selectColumns, fmt.Sprintf("%s(cast(JSON_VALUE(%s, '%s'), 'Float64')) AS value", sqlAggregation, getColumn("data"), sqlbuilder.Escape(d.Meter.ValueProperty)))
}

groupBys := make([]string, 0, len(d.GroupBy))

for _, groupBy := range d.GroupBy {
if groupBy == "subject" {
for _, groupByKey := range d.GroupBy {
// Subject is a special case as it's a top level column
if groupByKey == "subject" {
selectColumns = append(selectColumns, getColumn("subject"))
groupByColumns = append(groupByColumns, "subject")
continue
}

groupBys = append(groupBys, groupBy)
}

// Select Group By
slices.Sort(groupBys)

for _, groupByKey := range groupBys {
// Group by columns need to be parsed from the JSON data
groupByColumn := sqlbuilder.Escape(groupByKey)
groupByJSONPath := sqlbuilder.Escape(d.Meter.GroupBy[groupByKey])
selectColumn := fmt.Sprintf("JSON_VALUE(%s, '%s') as %s", getColumn("data"), groupByJSONPath, groupByColumn)
Expand Down

0 comments on commit 16c52ea

Please sign in to comment.