diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index 6a577026a49b2..78af117c80518 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -304,6 +305,7 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) { type MergeFirstOverTimeExpr struct { syntax.SampleExpr downstreams []DownstreamSampleExpr + offset time.Duration } func (e MergeFirstOverTimeExpr) String() string { @@ -332,6 +334,7 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) { type MergeLastOverTimeExpr struct { syntax.SampleExpr downstreams []DownstreamSampleExpr + offset time.Duration } func (e MergeLastOverTimeExpr) String() string { @@ -590,7 +593,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( } } - return NewMergeFirstOverTimeStepEvaluator(params, xs), nil + return NewMergeFirstOverTimeStepEvaluator(params, xs, e.offset), nil case *MergeLastOverTimeExpr: queries := make([]DownstreamQuery, len(e.downstreams)) @@ -625,7 +628,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator( return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type()) } } - return NewMergeLastOverTimeStepEvaluator(params, xs), nil + return NewMergeLastOverTimeStepEvaluator(params, xs, e.offset), nil case *CountMinSketchEvalExpr: queries := make([]DownstreamQuery, len(e.downstreams)) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 615bf80a9c276..b3c0e1ede7f15 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -64,6 +64,7 @@ func TestMappingEquivalence(t *testing.T) { {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil}, {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}}, + {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s)`, true, []string{ShardQuantileOverTime}}, { ` (quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1) @@ -75,8 +76,12 @@ func TestMappingEquivalence(t *testing.T) { }, {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}}, {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardFirstOverTime}}, + {`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardFirstOverTime}}, {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}}, {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardLastOverTime}}, + {`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardLastOverTime}}, // topk prefers already-seen values in tiebreakers. Since the test data generates // the same log lines for each series & the resulting promql.Vectors aren't deterministically // sorted by labels, we don't expect this to pass. @@ -190,6 +195,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { }{ {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05}, {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02}, + {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, 0.02}, } { q := NewMockQuerier( shards, @@ -241,8 +247,8 @@ func TestMappingEquivalenceSketches(t *testing.T) { // plus set step and interval to 0 params, err := NewLiteralParams( tc.query, - time.Unix(1, 0), - time.Unix(1, 0), + time.Unix(10, 0), + time.Unix(10, 0), 0, 0, logproto.FORWARD, @@ -294,7 +300,7 @@ func TestApproxTopkSketches(t *testing.T) { shardedQuery string regularQuery string realtiveError float64 - //cardinalityEstimate int + // cardinalityEstimate int }{ // Note:our data generation results in less spread between topk things for 10k streams than for 100k streams // if we have 1k streams, we can get much more accurate results for topk 10 than topk 100 @@ -304,7 +310,7 @@ func TestApproxTopkSketches(t *testing.T) { shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`, realtiveError: 0.0012, - //cardinalityEstimate: 3, + // cardinalityEstimate: 3, }, { labelShards: 10, diff --git a/pkg/logql/first_last_over_time.go b/pkg/logql/first_last_over_time.go index 4b6bbde55173c..f379401d47ac6 100644 --- a/pkg/logql/first_last_over_time.go +++ b/pkg/logql/first_last_over_time.go @@ -16,6 +16,15 @@ func newFirstWithTimestampIterator( it iter.PeekingSampleIterator, selRange, step, start, end, offset int64, ) RangeVectorIterator { + // forces at least one step. + if step == 0 { + step = 1 + } + if offset != 0 { + start = start - offset + end = end - offset + } + inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -70,6 +79,15 @@ func newLastWithTimestampIterator( it iter.PeekingSampleIterator, selRange, step, start, end, offset int64, ) RangeVectorIterator { + // forces at least one step. + if step == 0 { + step = 1 + } + if offset != 0 { + start = start - offset + end = end - offset + } + inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -127,6 +145,7 @@ type mergeOverTimeStepEvaluator struct { step time.Duration matrices []promql.Matrix merge func(promql.Vector, int, int, promql.Series) promql.Vector + offset time.Duration } // Next returns the first or last element within one step of each matrix. @@ -170,6 +189,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) { // inRange returns true if t is in step range of ts. func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool { + // The time stamp needs to be adjusted because the original datapoint at t is + // from a shifted query. + ts -= e.offset.Milliseconds() + // special case instant queries if e.step.Milliseconds() == 0 { return true @@ -181,7 +204,7 @@ func (*mergeOverTimeStepEvaluator) Close() error { return nil } func (*mergeOverTimeStepEvaluator) Error() error { return nil } -func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { +func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator { if len(m) == 0 { return EmptyEvaluator[SampleVector]{} } @@ -199,6 +222,7 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv step: step, matrices: m, merge: mergeFirstOverTime, + offset: offset, } } @@ -218,7 +242,7 @@ func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.S return vec } -func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { +func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator { if len(m) == 0 { return EmptyEvaluator[SampleVector]{} } @@ -236,6 +260,7 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva step: step, matrices: m, merge: mergeLastOverTime, + offset: offset, } } diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 49eca3453e081..538013e7c6e66 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -185,6 +185,15 @@ func newQuantileSketchIterator( it iter.PeekingSampleIterator, selRange, step, start, end, offset int64, ) RangeVectorIterator { + // forces at least one step. + if step == 0 { + step = 1 + } + if offset != 0 { + start = start - offset + end = end - offset + } + inner := &batchRangeVectorIterator{ iter: it, step: step, @@ -302,23 +311,20 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat // QuantileSketchMatrixStepEvaluator steps through a matrix of quantile sketch // vectors, ie t-digest or DDSketch structures per time step. type QuantileSketchMatrixStepEvaluator struct { - start, end, ts time.Time - step time.Duration - m ProbabilisticQuantileMatrix + end, ts time.Time + step time.Duration + m ProbabilisticQuantileMatrix } func NewQuantileSketchMatrixStepEvaluator(m ProbabilisticQuantileMatrix, params Params) *QuantileSketchMatrixStepEvaluator { var ( - start = params.Start() - end = params.End() - step = params.Step() + step = params.Step() ) return &QuantileSketchMatrixStepEvaluator{ - start: start, - end: end, - ts: start.Add(-step), // will be corrected on first Next() call - step: step, - m: m, + end: params.End(), + ts: params.Start().Add(-step), // will be corrected on first Next() call + step: step, + m: m, } } diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index 0fe81e7e2a193..f0ee5454b6334 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -67,7 +67,6 @@ func newPoint(t time.Time, v float64) promql.FPoint { } func Benchmark_RangeVectorIteratorCompare(b *testing.B) { - // no overlap test case. buildStreamingIt := func() (RangeVectorIterator, error) { tt := struct { @@ -183,7 +182,6 @@ func Benchmark_RangeVectorIteratorCompare(b *testing.B) { } } }) - } func Benchmark_RangeVectorIterator(b *testing.B) { @@ -214,7 +212,6 @@ func Benchmark_RangeVectorIterator(b *testing.B) { i++ } } - } func Test_RangeVectorIterator_InstantQuery(t *testing.T) { @@ -445,6 +442,7 @@ func Test_RangeVectorIterator(t *testing.T) { time.Unix(110, 0), time.Unix(120, 0), }, { + // TODO: use this test case (5 * time.Second).Nanoseconds(), // no overlap (30 * time.Second).Nanoseconds(), (10 * time.Second).Nanoseconds(), diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index fd07ec0a73ba3..3ffa3cc030c1c 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -593,6 +593,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return &MergeFirstOverTimeExpr{ downstreams: downstreams, + offset: expr.Left.Offset, }, bytesPerShard, nil case syntax.OpRangeTypeLast: if !m.lastOverTimeSharding { @@ -623,6 +624,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, return &MergeLastOverTimeExpr{ downstreams: downstreams, + offset: expr.Left.Offset, }, bytesPerShard, nil default: // don't shard if there's not an appropriate optimization