Skip to content

Commit

Permalink
Reimplement Walkable and use RootVisitor where possible
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jan 31, 2025
1 parent 66a8595 commit 0cc6d8f
Show file tree
Hide file tree
Showing 18 changed files with 522 additions and 466 deletions.
52 changes: 29 additions & 23 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (d DownstreamLogSelectorExpr) Pretty(level int) string {
return s
}

func (d DownstreamSampleExpr) Walk(f syntax.WalkFn) { f(d) }
func (d DownstreamSampleExpr) Visit(f syntax.WalkFn) error {
return nil
}

var defaultMaxDepth = 4

Expand Down Expand Up @@ -171,9 +173,11 @@ func (c *ConcatSampleExpr) string(maxDepth int) string {
return fmt.Sprintf("%s ++ %s", c.DownstreamSampleExpr.String(), c.next.string(maxDepth-1))
}

func (c *ConcatSampleExpr) Walk(f syntax.WalkFn) {
f(c)
f(c.next)
func (c *ConcatSampleExpr) Visit(f syntax.WalkFn) error {
if c == nil {
return nil
}
return syntax.Walk(f, c.DownstreamSampleExpr, c.next)
}

// ConcatSampleExpr has no LogQL repretenstation. It is expressed in in the
Expand Down Expand Up @@ -269,9 +273,11 @@ func (e QuantileSketchEvalExpr) String() string {
return fmt.Sprintf("quantileSketchEval<%s>", e.quantileMergeExpr.String())
}

func (e *QuantileSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
e.quantileMergeExpr.Walk(f)
func (e *QuantileSketchEvalExpr) Visit(f syntax.WalkFn) error {
if e == nil {
return nil
}
return syntax.Walk(f, e.quantileMergeExpr)
}

type QuantileSketchMergeExpr struct {
Expand All @@ -295,11 +301,11 @@ func (e QuantileSketchMergeExpr) String() string {
return fmt.Sprintf("quantileSketchMerge<%s>", sb.String())
}

func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
func (e *QuantileSketchMergeExpr) Visit(f syntax.WalkFn) error {
if e == nil {
return nil
}
return syntax.Walk(f, syntax.ConvertToWalkables(e.downstreams)...)
}

type MergeFirstOverTimeExpr struct {
Expand All @@ -324,11 +330,11 @@ func (e MergeFirstOverTimeExpr) String() string {
return fmt.Sprintf("MergeFirstOverTime<%s>", sb.String())
}

func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
func (e *MergeFirstOverTimeExpr) Visit(f syntax.WalkFn) error {
if e == nil {
return nil
}
return syntax.Walk(f, syntax.ConvertToWalkables(e.downstreams)...)
}

type MergeLastOverTimeExpr struct {
Expand All @@ -353,11 +359,11 @@ func (e MergeLastOverTimeExpr) String() string {
return fmt.Sprintf("MergeLastOverTime<%s>", sb.String())
}

func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
func (e *MergeLastOverTimeExpr) Visit(f syntax.WalkFn) error {
if e == nil {
return nil
}
return syntax.Walk(f, syntax.ConvertToWalkables(e.downstreams)...)
}

type CountMinSketchEvalExpr struct {
Expand All @@ -381,11 +387,11 @@ func (e CountMinSketchEvalExpr) String() string {
return fmt.Sprintf("CountMinSketchEval<%s>", sb.String())
}

func (e *CountMinSketchEvalExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
func (e *CountMinSketchEvalExpr) Visit(f syntax.WalkFn) error {
if e == nil {
return nil
}
return syntax.Walk(f, syntax.ConvertToWalkables(e.downstreams)...)
}

type Downstreamable interface {
Expand Down
15 changes: 7 additions & 8 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,15 +487,14 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval

func (q *query) checkIntervalLimit(expr syntax.SampleExpr, limit time.Duration) error {
var err error
expr.Walk(func(e syntax.Expr) {
switch e := e.(type) {
case *syntax.RangeAggregationExpr:
if e.Left == nil || e.Left.Interval <= limit {
return
v := &syntax.DepthFirstTraversal{
VisitLogRangeFn: func(v syntax.RootVisitor, e *syntax.LogRangeExpr) {

Check warning on line 491 in pkg/logql/engine.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'v' seems to be unused, consider removing or renaming it as _ (revive)
if e.Interval > limit {
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Interval), model.Duration(limit))
}
err = fmt.Errorf("%w: [%s] > [%s]", logqlmodel.ErrIntervalLimit, model.Duration(e.Left.Interval), model.Duration(limit))
}
})
},
}
expr.Accept(v)
return err
}

Expand Down
27 changes: 14 additions & 13 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,23 +234,24 @@ func ParamOverridesFromShard(base Params, shard *ShardWithChunkRefs) (result Par
}

// Sortable logql contain sort or sort_desc.
func Sortable(q Params) (bool, error) {
var sortable bool
func Sortable(q Params) (sortable bool, err error) {
expr, ok := q.GetExpression().(syntax.SampleExpr)
if !ok {
return false, errors.New("only sample expression supported")
return sortable, errors.New("only sample expression supported")
}
expr.Walk(func(e syntax.Expr) {
rangeExpr, ok := e.(*syntax.VectorAggregationExpr)
if !ok {
return
}
if rangeExpr.Operation == syntax.OpTypeSort || rangeExpr.Operation == syntax.OpTypeSortDesc {
sortable = true
return

err = syntax.Walk(func(node syntax.Walkable) (bool, error) {
switch e := node.(type) {
case *syntax.VectorAggregationExpr:
if e.Operation == syntax.OpTypeSort || e.Operation == syntax.OpTypeSortDesc {
sortable = true
return false, nil
}
}
})
return sortable, nil
return true, nil
}, expr)

return sortable, err
}

// EvaluatorFactory is an interface for iterating over data at different nodes in the AST
Expand Down
108 changes: 54 additions & 54 deletions pkg/logql/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ import "github.com/grafana/loki/v3/pkg/logql/syntax"
func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) {
var skip bool
// we skip sharding AST for now, it's not easy to clone them since they are not part of the language.
expr.Walk(func(e syntax.Expr) {
syntax.Walk(func(e syntax.Walkable) (bool, error) {

Check failure on line 9 in pkg/logql/optimize.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

Error return value of `syntax.Walk` is not checked (errcheck)
switch e.(type) {
case *ConcatSampleExpr, DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr, *MergeLastOverTimeExpr:
skip = true
return
return false, nil
}
})
return true, nil
}, expr)
if skip {
return expr, nil
}
expr, err := syntax.Clone[syntax.SampleExpr](expr)
expr, err := syntax.Clone(expr)
if err != nil {
return nil, err
}
Expand All @@ -26,61 +27,60 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) {

// removeLineformat removes unnecessary line_format within a SampleExpr.
func removeLineformat(expr syntax.SampleExpr) {
expr.Walk(func(e syntax.Expr) {
rangeExpr, ok := e.(*syntax.RangeAggregationExpr)
if !ok {
return
}
// bytes operation count bytes of the log line so line_format changes the result.
if rangeExpr.Operation == syntax.OpRangeTypeBytes ||
rangeExpr.Operation == syntax.OpRangeTypeBytesRate {
return
}
pipelineExpr, ok := rangeExpr.Left.Left.(*syntax.PipelineExpr)
if !ok {
return
}
temp := pipelineExpr.MultiStages[:0]
for i, s := range pipelineExpr.MultiStages {
_, ok := s.(*syntax.LineFmtExpr)
v := &syntax.DepthFirstTraversal{
VisitRangeAggregationFn: func(v syntax.RootVisitor, e *syntax.RangeAggregationExpr) {

Check warning on line 31 in pkg/logql/optimize.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'v' seems to be unused, consider removing or renaming it as _ (revive)
// bytes operation count bytes of the log line so line_format changes the result.
if e.Operation == syntax.OpRangeTypeBytes ||
e.Operation == syntax.OpRangeTypeBytesRate {
return
}
pipelineExpr, ok := e.Left.Left.(*syntax.PipelineExpr)
if !ok {
temp = append(temp, s)
continue
return
}
// we found a lineFmtExpr, we need to check if it's followed by a labelParser or lineFilter
// in which case it could be useful for further processing.
var found bool
for j := i; j < len(pipelineExpr.MultiStages); j++ {
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LogfmtParserExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.ParserExpr); ok {
found = true
break
temp := pipelineExpr.MultiStages[:0]
for i, s := range pipelineExpr.MultiStages {
_, ok := s.(*syntax.LineFmtExpr)
if !ok {
temp = append(temp, s)
continue
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LineFilterExpr); ok {
found = true
break
// we found a lineFmtExpr, we need to check if it's followed by a labelParser or lineFilter
// in which case it could be useful for further processing.
var found bool
for j := i; j < len(pipelineExpr.MultiStages); j++ {
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LogfmtParserExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.ParserExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LineFilterExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.JSONExpressionParserExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LogfmtExpressionParserExpr); ok {
found = true
break
}
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.JSONExpressionParserExpr); ok {
found = true
break
}
if _, ok := pipelineExpr.MultiStages[j].(*syntax.LogfmtExpressionParserExpr); ok {
found = true
break
if found {
// we cannot remove safely the linefmtExpr.
temp = append(temp, s)
}
}
if found {
// we cannot remove safely the linefmtExpr.
temp = append(temp, s)
pipelineExpr.MultiStages = temp
// transform into a matcherExpr if there's no more pipeline.
if len(pipelineExpr.MultiStages) == 0 {
e.Left.Left = &syntax.MatchersExpr{Mts: e.Left.Left.Matchers()}
}
}
pipelineExpr.MultiStages = temp
// transform into a matcherExpr if there's no more pipeline.
if len(pipelineExpr.MultiStages) == 0 {
rangeExpr.Left.Left = &syntax.MatchersExpr{Mts: rangeExpr.Left.Left.Matchers()}
}
})
},
}
expr.Accept(v)
}
57 changes: 29 additions & 28 deletions pkg/logql/rangemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,31 +191,32 @@ func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.Vect
// Example: expression `count_over_time({app="foo"}[10m])` returns 10m
func getRangeInterval(expr syntax.SampleExpr) time.Duration {
var rangeInterval time.Duration
expr.Walk(func(e syntax.Expr) {
switch concrete := e.(type) {
case *syntax.RangeAggregationExpr:
rangeInterval = concrete.Left.Interval
}
})
v := &syntax.DepthFirstTraversal{
VisitLogRangeFn: func(_ syntax.RootVisitor, e *syntax.LogRangeExpr) {
rangeInterval = e.Interval
},
}
expr.Accept(v)
return rangeInterval
}

// hasLabelExtractionStage returns true if an expression contains a stage for label extraction,
// such as `| json` or `| logfmt`, that would result in an exploding amount of series in downstream queries.
func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
found := false
expr.Walk(func(e syntax.Expr) {
switch concrete := e.(type) {
case *syntax.LogfmtParserExpr:
v := &syntax.DepthFirstTraversal{
VisitLogfmtParserFn: func(_ syntax.RootVisitor, _ *syntax.LogfmtParserExpr) {
found = true
case *syntax.ParserExpr:
},
VisitParserExprFn: func(_ syntax.RootVisitor, e *syntax.ParserExpr) {
// It will **not** return true for `regexp`, `unpack` and `pattern`, since these label extraction
// stages can control how many labels, and therefore the resulting amount of series, are extracted.
if concrete.Op == syntax.OpParserTypeJSON {
if e.Op == syntax.OpParserTypeJSON {
found = true
}
}
})
},
}
expr.Accept(v)
return found
}

Expand Down Expand Up @@ -291,16 +292,17 @@ func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregatio
// appendDownstream adds expression expr with a range interval 'interval' and offset 'offset' to the downstreams list.
// Returns the updated downstream ConcatSampleExpr.
func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, interval time.Duration, offset time.Duration) *ConcatSampleExpr {
sampleExpr := syntax.MustClone(expr)
sampleExpr.Walk(func(e syntax.Expr) {
switch concrete := e.(type) {
case *syntax.RangeAggregationExpr:
concrete.Left.Interval = interval
v := &syntax.DepthFirstTraversal{
VisitRangeAggregationFn: func(_ syntax.RootVisitor, e *syntax.RangeAggregationExpr) {
e.Left.Interval = interval
if offset != 0 {
concrete.Left.Offset = offset
e.Left.Offset = offset
}
}
})
},
}
sampleExpr := syntax.MustClone(expr)
sampleExpr.Accept(v)

downstreams = &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
SampleExpr: sampleExpr,
Expand All @@ -313,13 +315,12 @@ func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, int
func getOffsets(expr syntax.SampleExpr) []time.Duration {
// Expect to always find at most 1 offset, so preallocate it accordingly
offsets := make([]time.Duration, 0, 1)

expr.Walk(func(e syntax.Expr) {
switch concrete := e.(type) {
case *syntax.RangeAggregationExpr:
offsets = append(offsets, concrete.Left.Offset)
}
})
v := &syntax.DepthFirstTraversal{
VisitLogRangeFn: func(_ syntax.RootVisitor, e *syntax.LogRangeExpr) {
offsets = append(offsets, e.Offset)
},
}
expr.Accept(v)
return offsets
}

Expand Down
Loading

0 comments on commit 0cc6d8f

Please sign in to comment.