Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Adjust with offset in last, first and quantile over time queries. #15915

Merged
merged 3 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -304,6 +305,7 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
offset time.Duration
}

func (e MergeFirstOverTimeExpr) String() string {
Expand Down Expand Up @@ -332,6 +334,7 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
type MergeLastOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
offset time.Duration
}

func (e MergeLastOverTimeExpr) String() string {
Expand Down Expand Up @@ -590,7 +593,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
}

return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
return NewMergeFirstOverTimeStepEvaluator(params, xs, e.offset), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

Expand Down Expand Up @@ -625,7 +628,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}
return NewMergeLastOverTimeStepEvaluator(params, xs), nil
return NewMergeLastOverTimeStepEvaluator(params, xs, e.offset), nil
case *CountMinSketchEvalExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

Expand Down
14 changes: 10 additions & 4 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestMappingEquivalence(t *testing.T) {
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s)`, true, []string{ShardQuantileOverTime}},
{
`
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
Expand All @@ -75,8 +76,12 @@ func TestMappingEquivalence(t *testing.T) {
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardFirstOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardLastOverTime}},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down Expand Up @@ -190,6 +195,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, 0.02},
} {
q := NewMockQuerier(
shards,
Expand Down Expand Up @@ -241,8 +247,8 @@ func TestMappingEquivalenceSketches(t *testing.T) {
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(1, 0),
time.Unix(1, 0),
time.Unix(10, 0),
time.Unix(10, 0),
0,
0,
logproto.FORWARD,
Expand Down Expand Up @@ -294,7 +300,7 @@ func TestApproxTopkSketches(t *testing.T) {
shardedQuery string
regularQuery string
realtiveError float64
//cardinalityEstimate int
// cardinalityEstimate int
}{
// Note:our data generation results in less spread between topk things for 10k streams than for 100k streams
// if we have 1k streams, we can get much more accurate results for topk 10 than topk 100
Expand All @@ -304,7 +310,7 @@ func TestApproxTopkSketches(t *testing.T) {
shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0012,
//cardinalityEstimate: 3,
// cardinalityEstimate: 3,
},
{
labelShards: 10,
Expand Down
29 changes: 27 additions & 2 deletions pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ func newFirstWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}

inner := &batchRangeVectorIterator{
iter: it,
step: step,
Expand Down Expand Up @@ -70,6 +79,15 @@ func newLastWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}

inner := &batchRangeVectorIterator{
iter: it,
step: step,
Expand Down Expand Up @@ -127,6 +145,7 @@ type mergeOverTimeStepEvaluator struct {
step time.Duration
matrices []promql.Matrix
merge func(promql.Vector, int, int, promql.Series) promql.Vector
offset time.Duration
}

// Next returns the first or last element within one step of each matrix.
Expand Down Expand Up @@ -170,6 +189,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

// inRange returns true if t is in step range of ts.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
// The time stamp needs to be adjusted because the original datapoint at t is
// from a shifted query.
ts -= e.offset.Milliseconds()

// special case instant queries
if e.step.Milliseconds() == 0 {
return true
Expand All @@ -181,7 +204,7 @@ func (*mergeOverTimeStepEvaluator) Close() error { return nil }

func (*mergeOverTimeStepEvaluator) Error() error { return nil }

func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
if len(m) == 0 {
return EmptyEvaluator[SampleVector]{}
}
Expand All @@ -199,6 +222,7 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv
step: step,
matrices: m,
merge: mergeFirstOverTime,
offset: offset,
}
}

Expand All @@ -218,7 +242,7 @@ func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.S
return vec
}

func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
if len(m) == 0 {
return EmptyEvaluator[SampleVector]{}
}
Expand All @@ -236,6 +260,7 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva
step: step,
matrices: m,
merge: mergeLastOverTime,
offset: offset,
}
}

Expand Down
28 changes: 17 additions & 11 deletions pkg/logql/quantile_over_time_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ func newQuantileSketchIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}

inner := &batchRangeVectorIterator{
iter: it,
step: step,
Expand Down Expand Up @@ -302,23 +311,20 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat
// QuantileSketchMatrixStepEvaluator steps through a matrix of quantile sketch
// vectors, ie t-digest or DDSketch structures per time step.
type QuantileSketchMatrixStepEvaluator struct {
start, end, ts time.Time
step time.Duration
m ProbabilisticQuantileMatrix
end, ts time.Time
step time.Duration
m ProbabilisticQuantileMatrix
}

func NewQuantileSketchMatrixStepEvaluator(m ProbabilisticQuantileMatrix, params Params) *QuantileSketchMatrixStepEvaluator {
var (
start = params.Start()
end = params.End()
step = params.Step()
step = params.Step()
)
return &QuantileSketchMatrixStepEvaluator{
start: start,
end: end,
ts: start.Add(-step), // will be corrected on first Next() call
step: step,
m: m,
end: params.End(),
ts: params.Start().Add(-step), // will be corrected on first Next() call
step: step,
m: m,
}
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/logql/range_vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func newPoint(t time.Time, v float64) promql.FPoint {
}

func Benchmark_RangeVectorIteratorCompare(b *testing.B) {

// no overlap test case.
buildStreamingIt := func() (RangeVectorIterator, error) {
tt := struct {
Expand Down Expand Up @@ -183,7 +182,6 @@ func Benchmark_RangeVectorIteratorCompare(b *testing.B) {
}
}
})

}

func Benchmark_RangeVectorIterator(b *testing.B) {
Expand Down Expand Up @@ -214,7 +212,6 @@ func Benchmark_RangeVectorIterator(b *testing.B) {
i++
}
}

}

func Test_RangeVectorIterator_InstantQuery(t *testing.T) {
Expand Down Expand Up @@ -445,6 +442,7 @@ func Test_RangeVectorIterator(t *testing.T) {
time.Unix(110, 0), time.Unix(120, 0),
},
{
// TODO: use this test case
(5 * time.Second).Nanoseconds(), // no overlap
(30 * time.Second).Nanoseconds(),
(10 * time.Second).Nanoseconds(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

return &MergeFirstOverTimeExpr{
downstreams: downstreams,
offset: expr.Left.Offset,
}, bytesPerShard, nil
case syntax.OpRangeTypeLast:
if !m.lastOverTimeSharding {
Expand Down Expand Up @@ -623,6 +624,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

return &MergeLastOverTimeExpr{
downstreams: downstreams,
offset: expr.Left.Offset,
}, bytesPerShard, nil
default:
// don't shard if there's not an appropriate optimization
Expand Down
Loading