Skip to content

Commit

Permalink
feat: dont add predicates if already under the route
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Feb 28, 2025
1 parent ea3d4e9 commit 1c1260d
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 25 deletions.
18 changes: 14 additions & 4 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,21 @@ type (
}
)

func NewApplyJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, predicate sqlparser.Expr, joinType sqlparser.JoinType) *ApplyJoin {
func NewApplyJoin(
ctx *plancontext.PlanningContext,
lhs, rhs Operator,
predicate sqlparser.Expr,
joinType sqlparser.JoinType,
pushDownPredicate bool,
) *ApplyJoin {
aj := &ApplyJoin{
binaryOperator: newBinaryOp(lhs, rhs),
Vars: map[string]int{},
JoinType: joinType,
JoinColumns: &applyJoinColumns{},
JoinPredicates: &applyJoinColumns{},
}
aj.AddJoinPredicate(ctx, predicate)
aj.AddJoinPredicate(ctx, predicate, pushDownPredicate)
return aj
}

Expand Down Expand Up @@ -142,7 +148,7 @@ func (aj *ApplyJoin) IsInner() bool {
return aj.JoinType.IsInner()
}

func (aj *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
func (aj *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, pushDown bool) {
if expr == nil {
return
}
Expand All @@ -151,9 +157,13 @@ func (aj *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sql
for _, pred := range preds {
lhsID := TableID(aj.LHS)
col := breakExpressionInLHSandRHS(ctx, pred, lhsID)
aj.JoinPredicates.add(col)
if !pushDown {
// we are already under a route, so no need to push down predicates and track them
continue
}
newPred := ctx.PredTracker.NewJoinPredicate(col.RHSExpr)
col.JoinPredicateID = newPred.ID
aj.JoinPredicates.add(col)
rhs = rhs.AddPredicate(ctx, newPred)
}
aj.RHS = rhs
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (hj *HashJoin) IsInner() bool {
return !hj.LeftJoin
}

func (hj *HashJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
func (hj *HashJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, pushDown bool) { // TODO: consider whether we should honor the pushDown flag
cmp, ok := expr.(*sqlparser.ComparisonExpr)
if !ok || !canBeSolvedWithHashJoin(cmp.Operator) {
panic(vterrors.VT12001(fmt.Sprintf("can't use [%s] with hash joins", sqlparser.String(expr))))
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/hash_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestJoinPredicates(t *testing.T) {
Left: lcol,
Right: rcol,
}
hj.AddJoinPredicate(ctx, cmp)
hj.AddJoinPredicate(ctx, cmp, false)
require.Len(t, hj.JoinComparisons, 1)
hj.planOffsets(ctx)
require.Len(t, hj.LHSKeys, 1)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (j *Join) IsInner() bool {
return j.JoinType.IsInner()
}

func (j *Join) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) {
func (j *Join) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, _ bool) {
j.Predicate = ctx.SemTable.AndExpressions(j.Predicate, expr)
}

Expand Down
7 changes: 1 addition & 6 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,11 @@ func mergeShardedRouting(r1 *ShardedRouting, r2 *ShardedRouting) *ShardedRouting
return tr
}

func (jm *joinMerger) getApplyJoin(ctx *plancontext.PlanningContext, op1, op2 *Route) *ApplyJoin {
return NewApplyJoin(ctx, op1.Source, op2.Source, ctx.SemTable.AndExpressions(jm.predicates...), jm.joinType)
}

func (jm *joinMerger) merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing) *Route {
aj := jm.getApplyJoin(ctx, op1, op2)
aj := NewApplyJoin(ctx, op1.Source, op2.Source, ctx.SemTable.AndExpressions(jm.predicates...), jm.joinType, false)
for _, column := range aj.JoinPredicates.columns {
ctx.PredTracker.Set(column.JoinPredicateID, column.Original)
}
//UpdateRoutingLogic(ctx, )
return &Route{
unaryOperator: newUnaryOp(aj),
MergedWith: []*Route{op2},
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/joins.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type JoinOp interface {
SetRHS(Operator)
MakeInner()
IsInner() bool
AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr)
AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, pushDown bool)
}

func IsOuter(outer JoinOp) bool {
Expand Down Expand Up @@ -85,7 +85,7 @@ func AddPredicate(
return newFilter(join, expr)
}

join.AddJoinPredicate(ctx, expr)
join.AddJoinPredicate(ctx, expr, true)

return join
}
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtgate/planbuilder/operators/predicates/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type JoinPredicate struct {
}

func (j *JoinPredicate) Clone(inner sqlparser.SQLNode) sqlparser.SQLNode {
expr, ok := inner.(sqlparser.Expr)
if !ok {
panic(vterrors.VT13001("unexpected type in JoinPredicate.Clone"))
}
return j.tracker.NewJoinPredicate(expr)
panic(vterrors.VT13001("unexpected type in JoinPredicate.Clone"))
//expr, ok := inner.(sqlparser.Expr)
//if !ok {
// panic(vterrors.VT13001("unexpected type in JoinPredicate.Clone"))
//}
//return j.tracker.NewJoinPredicate(expr)
}

var _ sqlparser.Expr = (*JoinPredicate)(nil)
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,22 @@ func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredic
// we can't switch sides, so let's see if we can use a HashJoin to solve it
join := NewHashJoin(lhs, rhs, !joinType.IsInner())
for _, pred := range joinPredicates {
join.AddJoinPredicate(ctx, pred)
join.AddJoinPredicate(ctx, pred, true)
}
ctx.SemTable.QuerySignature.HashJoin = true
return join, Rewrote("use a hash join because we have LIMIT on the LHS")
}

join := NewApplyJoin(ctx, Clone(rhs), Clone(lhs), nil, joinType)
join := NewApplyJoin(ctx, Clone(rhs), Clone(lhs), nil, joinType, false)
for _, pred := range joinPredicates {
join.AddJoinPredicate(ctx, pred)
join.AddJoinPredicate(ctx, pred, true)
}
return join, Rewrote("logical join to applyJoin, switching side because LIMIT")
}

join := NewApplyJoin(ctx, Clone(lhs), Clone(rhs), nil, joinType)
join := NewApplyJoin(ctx, Clone(lhs), Clone(rhs), nil, joinType, false)
for _, pred := range joinPredicates {
join.AddJoinPredicate(ctx, pred)
join.AddJoinPredicate(ctx, pred, true)
}

return join, Rewrote("logical join to applyJoin ")
Expand Down

0 comments on commit 1c1260d

Please sign in to comment.