Skip to content

Commit

Permalink
Engine primitive for join-values (#17518)
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui authored Feb 25, 2025
1 parent e6b1b5d commit 965253a
Show file tree
Hide file tree
Showing 18 changed files with 519 additions and 93 deletions.
35 changes: 35 additions & 0 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 18 additions & 18 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDeleteUnsharded(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestDeleteEqual(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestDeleteEqualMultiCol(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
},
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)")
}
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
"1|4|5|6",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand All @@ -231,7 +231,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
})

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand All @@ -252,7 +252,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
"1|4|5|6",
"1|7|8|9",
)}
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
vc.results = results

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestDeleteOwnedVindexMultiCol(t *testing.T) {
"1|2|4",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestDeleteSharded(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestDeleteShardedStreaming(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
err := del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error {
return nil
})
Expand Down Expand Up @@ -435,7 +435,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
"1|4|5|6",
)}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand All @@ -453,7 +453,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
})

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand All @@ -475,7 +475,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
"1|4|5|6",
"1|7|8|9",
)}
vc = newDMLTestVCursor("-20", "20-")
vc = newTestVCursor("-20", "20-")
vc.results = results

_, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -528,7 +528,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) {
"1|3|6",
"2|3|7",
)}
vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = results

_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -565,7 +565,7 @@ func TestDeleteEqualSubshard(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestDeleteMultiEqual(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
Expand Down Expand Up @@ -635,7 +635,7 @@ func TestDeleteInUnique(t *testing.T) {
Type: querypb.Type_TUPLE,
Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))),
}
vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.shardForKsid = []string{"-20", "20-"}
_, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false)
require.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/dml_with_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) {
OutputCols: [][]int{{0}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) {
OutputCols: [][]int{{1, 0}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestDeleteWithMultiTarget(t *testing.T) {
OutputCols: [][]int{{0}, {1, 2}},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
_, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestUpdateWithInputNonLiteral(t *testing.T) {
},
}

vc := newDMLTestVCursor("-20", "20-")
vc := newTestVCursor("-20", "20-")
vc.results = []*sqltypes.Result{
{RowsAffected: 1}, {RowsAffected: 1}, {RowsAffected: 1},
}
Expand Down
13 changes: 10 additions & 3 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type fakePrimitive struct {
allResultsInOneCall bool

async bool

useNewPrintBindVars bool
}

func (f *fakePrimitive) Inputs() ([]Primitive, []map[string]any) {
Expand All @@ -72,7 +74,12 @@ func (f *fakePrimitive) GetTableName() string {
}

func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
if f.useNewPrintBindVars {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
} else {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", deprecatedPrintBindVars(bindVars), wantfields))
}

if f.results == nil {
return nil, f.sendErr
}
Expand All @@ -87,7 +94,7 @@ func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVar

func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
if !f.noLog {
f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields))
f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", deprecatedPrintBindVars(bindVars), wantfields))
}
if f.results == nil {
return f.sendErr
Expand Down Expand Up @@ -171,7 +178,7 @@ func (f *fakePrimitive) asyncCall(callback func(*sqltypes.Result) error) error {
}

func (f *fakePrimitive) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars)))
f.log = append(f.log, fmt.Sprintf("GetFields %v", deprecatedPrintBindVars(bindVars)))
return f.TryExecute(ctx, vcursor, bindVars, true /* wantfields */)
}

Expand Down
47 changes: 43 additions & 4 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (f *loggingVCursor) Execute(ctx context.Context, method string, query strin
case vtgatepb.CommitOrder_AUTOCOMMIT:
name = "ExecuteAutocommit"
}
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError))
f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, deprecatedPrintBindVars(bindvars), rollbackOnError))
return f.nextResult()
}

Expand All @@ -621,7 +621,7 @@ func (f *loggingVCursor) AutocommitApproval() bool {
}

func (f *loggingVCursor) ExecuteStandalone(ctx context.Context, _ Primitive, query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, fetchLastInsertID bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, printBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard))
f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, deprecatedPrintBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard))
return f.nextResult()
}

Expand Down Expand Up @@ -943,6 +943,24 @@ func expectResultAnyOrder(t *testing.T, result, want *sqltypes.Result) {
}
}

// deprecatedPrintBindVars does not print bind variables, specifically tuples, correctly.
// We should use printBindVars instead.
func deprecatedPrintBindVars(bindvars map[string]*querypb.BindVariable) string {
var keys []string
for k := range bindvars {
keys = append(keys, k)
}
sort.Strings(keys)
buf := &bytes.Buffer{}
for i, k := range keys {
if i > 0 {
fmt.Fprintf(buf, " ")
}
fmt.Fprintf(buf, "%s: %v", k, bindvars[k])
}
return buf.String()
}

func printBindVars(bindvars map[string]*querypb.BindVariable) string {
var keys []string
for k := range bindvars {
Expand All @@ -954,6 +972,27 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string {
if i > 0 {
fmt.Fprintf(buf, " ")
}

if bindvars[k].Type == querypb.Type_TUPLE {
fmt.Fprintf(buf, "%s: [", k)
for _, val := range bindvars[k].Values {
if val.Type != querypb.Type_TUPLE {
fmt.Fprintf(buf, "[%s]", val.String())
continue
}
var s []string
v := sqltypes.ProtoToValue(val)
err := v.ForEachValue(func(bv sqltypes.Value) {
s = append(s, bv.String())
})
if err != nil {
panic(err)
}
fmt.Fprintf(buf, "[%s]", strings.Join(s, " "))
}
fmt.Fprintf(buf, "]")
continue
}
fmt.Fprintf(buf, "%s: %v", k, bindvars[k])
}
return buf.String()
Expand All @@ -962,15 +1001,15 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string {
func printResolvedShardQueries(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) string {
buf := &bytes.Buffer{}
for i, rs := range rss {
fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, printBindVars(queries[i].BindVariables))
fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, deprecatedPrintBindVars(queries[i].BindVariables))
}
return buf.String()
}

func printResolvedShardsBindVars(rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable) string {
buf := &bytes.Buffer{}
for i, rs := range rss {
fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, printBindVars(bvs[i]))
fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, deprecatedPrintBindVars(bvs[i]))
}
return buf.String()
}
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/fk_cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDeleteCascade(t *testing.T) {
Parent: parentP,
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
vc.results = []*sqltypes.Result{fakeRes}
_, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestUpdateCascade(t *testing.T) {
Parent: parentP,
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
vc.results = []*sqltypes.Result{fakeRes}
_, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestNonLiteralUpdateCascade(t *testing.T) {
Parent: parentP,
}

vc := newDMLTestVCursor("0")
vc := newTestVCursor("0")
vc.results = []*sqltypes.Result{fakeRes}
_, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 965253a

Please sign in to comment.