Skip to content

Commit

Permalink
VReplication: Support for BETWEEN/NOT BETWEEN filter in VStream (#…
Browse files Browse the repository at this point in the history
…17721)

Signed-off-by: Noble Mittal <[email protected]>
Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
beingnoble03 and rohit-nayak-ps authored Feb 19, 2025
1 parent 0c6ad63 commit 83e2a4f
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 31 deletions.
38 changes: 21 additions & 17 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func TestVStreamPushdownFilters(t *testing.T) {
require.NoError(t, err)

// Coordinate go-routines.
streamCtx, streamCancel := context.WithTimeout(context.Background(), 1*time.Minute)
streamCtx, streamCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer streamCancel()
done := make(chan struct{})

Expand Down Expand Up @@ -1176,19 +1176,31 @@ func TestVStreamPushdownFilters(t *testing.T) {
Filter: "select * from customer where name = 'påul'",
}},
}
flags := &vtgatepb.VStreamFlags{}
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
require.NoError(t, err)
defer vstreamConn.Close()

// So we should have at least one paul row event in the copy phase.
copyPhaseRowEvents := 0
// And we should have many paul row events in the running phase.
runningPhaseRowEvents := 0
copyPhase := true
// So we should have at least one paul row event in the copy phase, and
// we should have many paul row events in the running phase.
copyPhaseRowEvents, runningPhaseRowEvents := runVStreamAndGetNumOfRowEvents(t, ctx, vstreamConn, vgtid, filter, done)

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
}

// runVStreamAndGetNumOfRowEvents runs a VStream with the specified filter and
// returns the number of copy phase and running phase row events.
func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamConn *vtgateconn.VTGateConn,
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -1217,13 +1229,5 @@ func TestVStreamPushdownFilters(t *testing.T) {
}
}
}()

require.NotZero(t, createdPauls)
require.NotZero(t, createdNonPauls)
require.Greater(t, createdNonPauls, createdPauls)
require.NotZero(t, copyPhaseRowEvents)
require.NotZero(t, runningPhaseRowEvents)

t.Logf("Created pauls: %d, pauls copied: %d, pauls replicated: %d", createdPauls, copyPhaseRowEvents, runningPhaseRowEvents)
require.Equal(t, createdPauls, copyPhaseRowEvents+runningPhaseRowEvents)
return
}
107 changes: 94 additions & 13 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ const (
IsNotNull
// In is used to filter a comparable column if equals any of the values from a specific tuple
In
// Note that there is no separate opcode for BETWEEN because
// in the plan we rewrite `x BETWEEN a AND b` to `x >= a AND x <= b`.
// NotBetween is used to filter a comparable column if it doesn't lie within a specific range.
NotBetween
)

// Filter contains opcodes for filtering.
Expand Down Expand Up @@ -273,6 +277,26 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if !found {
return false, nil
}
case NotBetween:
// Note that we do not implement separate filtering for BETWEEN because
// in the plan we rewrite `x BETWEEN a AND b` to `x >= a AND x <= b`.
// NOT BETWEEN means `x < a OR x > b`; since we don't have support
// for OR yet, it is filtered here as a single operation.
if filter.Values == nil || len(filter.Values) != 2 {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected 2 filter values when performing NOT BETWEEN")
}
leftFilterValue, rightFilterValue := filter.Values[0], filter.Values[1]
isValueLessThanLeftFilter, err := compare(LessThan, values[filter.ColNum], leftFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
}
if isValueLessThanLeftFilter {
continue
}
isValueGreaterThanRightFilter, err := compare(GreaterThan, values[filter.ColNum], rightFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil || !isValueGreaterThanRightFilter {
return false, err
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -570,6 +594,23 @@ func (plan *Plan) appendTupleFilter(values sqlparser.ValTuple, opcode Opcode, co
return nil
}

// getEvalResultForLiteral evaluates expr, which must be a *sqlparser.Literal,
// and returns the evaluation result.
//
// The literal is translated with the default connection charset of the plan's
// collation environment and then evaluated using an expression environment
// obtained from evalengine.EmptyExpressionEnv.
//
// An error is returned when expr is not a literal, or when translation or
// evaluation fails. The returned result is nil whenever the error is non-nil.
func (plan *Plan) getEvalResultForLiteral(expr sqlparser.Expr) (*evalengine.EvalResult, error) {
	literalExpr, ok := expr.(*sqlparser.Literal)
	if !ok {
		return nil, fmt.Errorf("unexpected: %v", sqlparser.String(expr))
	}
	pv, err := evalengine.Translate(literalExpr, &evalengine.Config{
		Collation:   plan.env.CollationEnv().DefaultConnectionCharset(),
		Environment: plan.env,
	})
	if err != nil {
		return nil, err
	}
	env := evalengine.EmptyExpressionEnv(plan.env)
	resolved, err := env.Evaluate(pv)
	if err != nil {
		// Never hand back a pointer to a result alongside an error: the
		// result carries no meaning when evaluation failed.
		return nil, err
	}
	return &resolved, nil
}

func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
if where == nil {
return nil
Expand Down Expand Up @@ -606,21 +647,11 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err != nil {
return err
}
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
continue
}
val, ok := expr.Right.(*sqlparser.Literal)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
pv, err := evalengine.Translate(val, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Environment: plan.env,
})
if err != nil {
return err
}
env := evalengine.EmptyExpressionEnv(plan.env)
resolved, err := env.Evaluate(pv)
resolved, err := plan.getEvalResultForLiteral(expr.Right)
if err != nil {
return err
}
Expand Down Expand Up @@ -661,6 +692,56 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
case *sqlparser.BetweenExpr:
qualifiedName, ok := expr.Left.(*sqlparser.ColName)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
if !qualifiedName.Qualifier.IsEmpty() {
return fmt.Errorf("unsupported qualifier for column: %v", sqlparser.String(qualifiedName))
}
colnum, err := findColumn(plan.Table, qualifiedName.Name)
if err != nil {
return err
}
fromResolved, err := plan.getEvalResultForLiteral(expr.From)
if err != nil {
return err
}
toResolved, err := plan.getEvalResultForLiteral(expr.To)
if err != nil {
return err
}

if !expr.IsBetween {
// `x NOT BETWEEN a AND b` means: `x < a OR x > b`
// Also, since we do not have OR implemented yet,
// NOT BETWEEN needs to be handled separately.
plan.Filters = append(plan.Filters, Filter{
Opcode: NotBetween,
ColNum: colnum,
Values: []sqltypes.Value{
fromResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
toResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
},
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
continue
}

// `x BETWEEN a AND b` means: `x >= a AND x <= b`
plan.Filters = append(plan.Filters, Filter{
Opcode: GreaterThanEqual,
ColNum: colnum,
Value: fromResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
}, Filter{
Opcode: LessThanEqual,
ColNum: colnum,
Value: toResolved.Value(plan.env.CollationEnv().DefaultConnectionCharset()),
})
// Add it to the expressions that get pushed down to mysqld.
plan.whereExprsToPushDown = append(plan.whereExprsToPushDown, expr)
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
}
Expand Down
100 changes: 99 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,89 @@ func TestPlanBuilder(t *testing.T) {
}},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id between 2 and 5"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarChar,
Charset: unicodeCollationID,
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
Filters: []Filter{{
Opcode: GreaterThanEqual,
ColNum: 0,
Value: sqltypes.NewInt64(2),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}, {
Opcode: LessThanEqual,
ColNum: 0,
Value: sqltypes.NewInt64(5),
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}},
whereExprsToPushDown: []sqlparser.Expr{
&sqlparser.BetweenExpr{
IsBetween: true,
Left: sqlparser.NewColName("id"),
From: sqlparser.NewIntLiteral("2"),
To: sqlparser.NewIntLiteral("5"),
},
},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where id not between 2 and 5"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Field: &querypb.Field{
Name: "val",
Type: sqltypes.VarChar,
Charset: unicodeCollationID,
},
}, {
ColNum: 0,
Field: &querypb.Field{
Name: "id",
Type: sqltypes.Int64,
Charset: collations.CollationBinaryID,
Flags: uint32(querypb.MySqlFlag_NUM_FLAG),
},
}},
Filters: []Filter{{
Opcode: NotBetween,
ColNum: 0,
Values: []sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewInt64(5)},
Vindex: nil,
VindexColumns: nil,
KeyRange: nil,
}},
whereExprsToPushDown: []sqlparser.Expr{
&sqlparser.BetweenExpr{
IsBetween: false,
Left: sqlparser.NewColName("id"),
From: sqlparser.NewIntLiteral("2"),
To: sqlparser.NewIntLiteral("5"),
},
},
env: vtenv.NewTestEnv(),
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/*/"},
Expand Down Expand Up @@ -752,9 +835,22 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
outFilters: []Filter{
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}},
},
}, {
name: "between-operator",
inFilter: "select * from t1 where id between 1 and 5",
outFilters: []Filter{
{Opcode: GreaterThanEqual, ColNum: 0, Value: sqltypes.NewInt64(1)},
{Opcode: LessThanEqual, ColNum: 0, Value: sqltypes.NewInt64(5)},
},
}, {
name: "not-between-operator",
inFilter: "select * from t1 where id not between 1 and 5",
outFilters: []Filter{
{Opcode: NotBetween, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(5)}},
},
}, {
name: "vindex-and-operators",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30)",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30) and id between 20 and 60",
outFilters: []Filter{
{
Opcode: VindexMatch,
Expand All @@ -770,6 +866,8 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
{Opcode: Equal, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}},
{Opcode: GreaterThanEqual, ColNum: 0, Value: sqltypes.NewInt64(20)},
{Opcode: LessThanEqual, ColNum: 0, Value: sqltypes.NewInt64(60)},
},
}}

Expand Down
Loading

0 comments on commit 83e2a4f

Please sign in to comment.