diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 23412990b0a..e4ca72897be 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1420,6 +1420,41 @@ func (cached *VStream) CachedSize(alloc bool) int64 { size += hack.RuntimeAllocSize(int64(len(cached.Position))) return size } +func (cached *ValuesJoin) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(128) + } + // field Left vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Left.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Right vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Right.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Vars []int + { + size += hack.RuntimeAllocSize(int64(cap(cached.Vars)) * int64(8)) + } + // field RowConstructorArg string + size += hack.RuntimeAllocSize(int64(len(cached.RowConstructorArg))) + // field Cols []int + { + size += hack.RuntimeAllocSize(int64(cap(cached.Cols)) * int64(8)) + } + // field ColNames []string + { + size += hack.RuntimeAllocSize(int64(cap(cached.ColNames)) * int64(16)) + for _, elem := range cached.ColNames { + size += hack.RuntimeAllocSize(int64(len(elem))) + } + } + return size +} func (cached *Verify) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) diff --git a/go/vt/vtgate/engine/delete_test.go b/go/vt/vtgate/engine/delete_test.go index 18dcef5cbe4..56d3467aac3 100644 --- a/go/vt/vtgate/engine/delete_test.go +++ b/go/vt/vtgate/engine/delete_test.go @@ -45,7 +45,7 @@ func TestDeleteUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -80,7 +80,7 @@ func TestDeleteEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -112,7 +112,7 @@ func TestDeleteEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -148,7 +148,7 @@ func TestDeleteEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -181,7 +181,7 @@ func TestDeleteEqualNoScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)") } @@ -213,7 +213,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -231,7 +231,7 @@ func TestDeleteOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -252,7 +252,7 @@ func TestDeleteOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -300,7 +300,7 @@ func TestDeleteOwnedVindexMultiCol(t *testing.T) { "1|2|4", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -371,7 +371,7 @@ func TestDeleteSharded(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -399,7 +399,7 @@ func TestDeleteShardedStreaming(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := del.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -435,7 +435,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -453,7 +453,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -475,7 +475,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) { "1|4|5|6", "1|7|8|9", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -528,7 +528,7 @@ func TestDeleteInChangedVindexMultiCol(t *testing.T) { "1|3|6", "2|3|7", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -565,7 +565,7 @@ func TestDeleteEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -602,7 +602,7 @@ func TestDeleteMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -635,7 +635,7 @@ func TestDeleteInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/dml_with_input_test.go b/go/vt/vtgate/engine/dml_with_input_test.go index 6fcf2040dfc..38d9068b433 100644 --- a/go/vt/vtgate/engine/dml_with_input_test.go +++ b/go/vt/vtgate/engine/dml_with_input_test.go @@ -51,7 +51,7 @@ func TestDeleteWithInputSingleOffset(t *testing.T) { OutputCols: [][]int{{0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -95,7 +95,7 @@ func TestDeleteWithInputMultiOffset(t *testing.T) { OutputCols: [][]int{{1, 0}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -160,7 +160,7 @@ func TestDeleteWithMultiTarget(t *testing.T) { OutputCols: [][]int{{0}, {1, 2}}, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := del.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -210,7 +210,7 @@ func TestUpdateWithInputNonLiteral(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = []*sqltypes.Result{ {RowsAffected: 1}, {RowsAffected: 1}, {RowsAffected: 1}, } diff --git a/go/vt/vtgate/engine/fake_primitive_test.go b/go/vt/vtgate/engine/fake_primitive_test.go index f3ab5ad5336..bddbca87664 100644 --- a/go/vt/vtgate/engine/fake_primitive_test.go +++ b/go/vt/vtgate/engine/fake_primitive_test.go @@ -46,6 +46,8 @@ type fakePrimitive struct { allResultsInOneCall bool async bool + + useNewPrintBindVars bool } func (f *fakePrimitive) Inputs() ([]Primitive, []map[string]any) { @@ -72,7 +74,12 @@ func (f *fakePrimitive) GetTableName() string { } func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields)) + if f.useNewPrintBindVars { + f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields)) + } else { + f.log = append(f.log, fmt.Sprintf("Execute %v %v", deprecatedPrintBindVars(bindVars), wantfields)) + } + if f.results == nil { return nil, f.sendErr } @@ -87,7 +94,7 @@ func (f *fakePrimitive) TryExecute(ctx context.Context, vcursor VCursor, bindVar func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { if !f.noLog { - f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields)) + f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", deprecatedPrintBindVars(bindVars), wantfields)) } if f.results == nil { return f.sendErr @@ -171,7 +178,7 @@ func (f *fakePrimitive) asyncCall(callback func(*sqltypes.Result) error) error { } func (f *fakePrimitive) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars))) + f.log = append(f.log, fmt.Sprintf("GetFields %v", deprecatedPrintBindVars(bindVars))) return f.TryExecute(ctx, vcursor, bindVars, true /* wantfields */) } diff --git a/go/vt/vtgate/engine/fake_vcursor_test.go b/go/vt/vtgate/engine/fake_vcursor_test.go index b277b018faa..3b04ce3aa63 100644 --- a/go/vt/vtgate/engine/fake_vcursor_test.go +++ b/go/vt/vtgate/engine/fake_vcursor_test.go @@ -597,7 +597,7 @@ func (f *loggingVCursor) Execute(ctx context.Context, method string, query strin case vtgatepb.CommitOrder_AUTOCOMMIT: name = "ExecuteAutocommit" } - f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, printBindVars(bindvars), rollbackOnError)) + f.log = append(f.log, fmt.Sprintf("%s %s %v %v", name, query, deprecatedPrintBindVars(bindvars), rollbackOnError)) return f.nextResult() } @@ -621,7 +621,7 @@ func (f *loggingVCursor) AutocommitApproval() bool { } func (f *loggingVCursor) ExecuteStandalone(ctx context.Context, _ Primitive, query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard, fetchLastInsertID bool) (*sqltypes.Result, error) { - f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, printBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard)) + f.log = append(f.log, fmt.Sprintf("ExecuteStandalone %s %v %s %s", query, deprecatedPrintBindVars(bindvars), rs.Target.Keyspace, rs.Target.Shard)) return f.nextResult() } @@ -943,6 +943,24 @@ func expectResultAnyOrder(t *testing.T, result, want *sqltypes.Result) { } } +// deprecatedPrintBindVars does not print bind variables, specifically tuples, correctly. +// We should use printBindVars instead. +func deprecatedPrintBindVars(bindvars map[string]*querypb.BindVariable) string { + var keys []string + for k := range bindvars { + keys = append(keys, k) + } + sort.Strings(keys) + buf := &bytes.Buffer{} + for i, k := range keys { + if i > 0 { + fmt.Fprintf(buf, " ") + } + fmt.Fprintf(buf, "%s: %v", k, bindvars[k]) + } + return buf.String() +} + func printBindVars(bindvars map[string]*querypb.BindVariable) string { var keys []string for k := range bindvars { @@ -954,6 +972,27 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string { if i > 0 { fmt.Fprintf(buf, " ") } + + if bindvars[k].Type == querypb.Type_TUPLE { + fmt.Fprintf(buf, "%s: [", k) + for _, val := range bindvars[k].Values { + if val.Type != querypb.Type_TUPLE { + fmt.Fprintf(buf, "[%s]", val.String()) + continue + } + var s []string + v := sqltypes.ProtoToValue(val) + err := v.ForEachValue(func(bv sqltypes.Value) { + s = append(s, bv.String()) + }) + if err != nil { + panic(err) + } + fmt.Fprintf(buf, "[%s]", strings.Join(s, " ")) + } + fmt.Fprintf(buf, "]") + continue + } fmt.Fprintf(buf, "%s: %v", k, bindvars[k]) } return buf.String() @@ -962,7 +1001,7 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string { func printResolvedShardQueries(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery) string { buf := &bytes.Buffer{} for i, rs := range rss { - fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, printBindVars(queries[i].BindVariables)) + fmt.Fprintf(buf, "%s.%s: %s {%s} ", rs.Target.Keyspace, rs.Target.Shard, queries[i].Sql, deprecatedPrintBindVars(queries[i].BindVariables)) } return buf.String() } @@ -970,7 +1009,7 @@ func printResolvedShardQueries(rss []*srvtopo.ResolvedShard, queries []*querypb. func printResolvedShardsBindVars(rss []*srvtopo.ResolvedShard, bvs []map[string]*querypb.BindVariable) string { buf := &bytes.Buffer{} for i, rs := range rss { - fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, printBindVars(bvs[i])) + fmt.Fprintf(buf, "%s.%s: {%v} ", rs.Target.Keyspace, rs.Target.Shard, deprecatedPrintBindVars(bvs[i])) } return buf.String() } diff --git a/go/vt/vtgate/engine/fk_cascade_test.go b/go/vt/vtgate/engine/fk_cascade_test.go index 942fe44a709..c93e487067b 100644 --- a/go/vt/vtgate/engine/fk_cascade_test.go +++ b/go/vt/vtgate/engine/fk_cascade_test.go @@ -62,7 +62,7 @@ func TestDeleteCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -123,7 +123,7 @@ func TestUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -195,7 +195,7 @@ func TestNonLiteralUpdateCascade(t *testing.T) { Parent: parentP, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) diff --git a/go/vt/vtgate/engine/fk_verify_test.go b/go/vt/vtgate/engine/fk_verify_test.go index 5c9ff83c2ec..465dd81d3b2 100644 --- a/go/vt/vtgate/engine/fk_verify_test.go +++ b/go/vt/vtgate/engine/fk_verify_test.go @@ -58,7 +58,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("foreign key verification success", func(t *testing.T) { fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64")) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.NoError(t, err) @@ -83,7 +83,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("parent foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot add or update a child row: a foreign key constraint fails") @@ -105,7 +105,7 @@ func TestFKVerifyUpdate(t *testing.T) { t.Run("child foreign key verification failure", func(t *testing.T) { // No results from select, should cause the foreign key verification to fail. fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("1", "int64"), "1", "1", "1") - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{fakeRes} _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) require.ErrorContains(t, err, "Cannot delete or update a parent row: a foreign key constraint fails") diff --git a/go/vt/vtgate/engine/insert_test.go b/go/vt/vtgate/engine/insert_test.go index 118a2ab0458..46cf8100709 100644 --- a/go/vt/vtgate/engine/insert_test.go +++ b/go/vt/vtgate/engine/insert_test.go @@ -42,7 +42,7 @@ func TestInsertUnsharded(t *testing.T) { "dummy_insert", ) - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{{ InsertID: 4, }} @@ -91,7 +91,7 @@ func TestInsertUnshardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -144,7 +144,7 @@ func TestInsertUnshardedGenerate_Zeros(t *testing.T) { ), } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -215,7 +215,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -254,7 +254,7 @@ func TestInsertShardedSimple(t *testing.T) { }, nil, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -297,7 +297,7 @@ func TestInsertShardedSimple(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -362,7 +362,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix1"), Expr: sqlparser.NewTypedArgument("_id_0", sqltypes.Int64)}, &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix2"), Expr: funcExpr}}, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{ @@ -408,7 +408,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { &sqlparser.UpdateExpr{Name: sqlparser.NewColName("suffix"), Expr: &sqlparser.Argument{Name: "_id_0", Type: sqltypes.Int64}}, }, ) - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -453,7 +453,7 @@ func TestInsertShardWithONDuplicateKey(t *testing.T) { ) ins.MultiShardAutocommit = true - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err = ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -586,7 +586,7 @@ func TestInsertShardedGenerate(t *testing.T) { ), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -711,7 +711,7 @@ func TestInsertShardedOwned(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -803,7 +803,7 @@ func TestInsertShardedOwnedWithNull(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -889,7 +889,7 @@ func TestInsertShardedGeo(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -1025,7 +1025,7 @@ func TestInsertShardedIgnoreOwned(t *testing.T) { "\x00", ) noresult := &sqltypes.Result{} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ // primary vindex lookups: fail row 2. @@ -1143,7 +1143,7 @@ func TestInsertShardedIgnoreOwnedWithNull(t *testing.T) { ), "\x00", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} vc.results = []*sqltypes.Result{ ksid0, @@ -1263,7 +1263,7 @@ func TestInsertShardedUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1377,7 +1377,7 @@ func TestInsertShardedIgnoreUnownedVerify(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1468,7 +1468,7 @@ func TestInsertShardedIgnoreUnownedVerifyFail(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.EqualError(t, err, `values [[INT64(2)]] for column [c3] does not map to keyspace ids`) @@ -1574,7 +1574,7 @@ func TestInsertShardedUnownedReverseMap(t *testing.T) { "1", ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ nonemptyResult, @@ -1659,7 +1659,7 @@ func TestInsertShardedUnownedReverseMapSuccess(t *testing.T) { nil, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := ins.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -1690,7 +1690,7 @@ func TestInsertSelectSimple(t *testing.T) { Keyspace: ks.Keyspace}} ins := newInsertSelect(false, ks.Keyspace, ks.Tables["t1"], "prefix ", nil, [][]int{{1}}, rb) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1783,7 +1783,7 @@ func TestInsertSelectOwned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult( @@ -1890,7 +1890,7 @@ func TestInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -1983,7 +1983,7 @@ func TestStreamingInsertSelectGenerate(t *testing.T) { Offset: 1, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2078,7 +2078,7 @@ func TestInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2165,7 +2165,7 @@ func TestStreamingInsertSelectGenerateNotProvided(t *testing.T) { Offset: 2, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ // This is the result from the input SELECT @@ -2254,7 +2254,7 @@ func TestInsertSelectUnowned(t *testing.T) { rb, ) - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"20-", "-20", "20-"} vc.results = []*sqltypes.Result{ sqltypes.MakeTestResult(sqltypes.MakeTestFields("id", "int64"), "1", "3", "2"), diff --git a/go/vt/vtgate/engine/join.go b/go/vt/vtgate/engine/join.go index 51976396cba..8134d78ff4a 100644 --- a/go/vt/vtgate/engine/join.go +++ b/go/vt/vtgate/engine/join.go @@ -220,10 +220,10 @@ func joinFields(lfields, rfields []*querypb.Field, cols []int) []*querypb.Field fields := make([]*querypb.Field, len(cols)) for i, index := range cols { if index < 0 { - fields[i] = lfields[-index-1] + fields[i] = lfields[-index-1].CloneVT() continue } - fields[i] = rfields[index-1] + fields[i] = rfields[index-1].CloneVT() } return fields } diff --git a/go/vt/vtgate/engine/join_values.go b/go/vt/vtgate/engine/join_values.go new file mode 100644 index 00000000000..7b4fc19e908 --- /dev/null +++ b/go/vt/vtgate/engine/join_values.go @@ -0,0 +1,150 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vterrors" +) + +var _ Primitive = (*ValuesJoin)(nil) + +// ValuesJoin is a primitive that joins two primitives by constructing a table from the rows of the LHS primitive. +// The table is passed in as a bind variable to the RHS primitive. +// It's called ValuesJoin because the LHS of the join is sent to the RHS as a VALUES clause. +type ValuesJoin struct { + // Left and Right are the LHS and RHS primitives + // of the Join. They can be any primitive. + Left, Right Primitive + + Vars []int + RowConstructorArg string + Cols []int + ColNames []string +} + +// TryExecute performs a non-streaming exec. +func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + lresult, err := vcursor.ExecutePrimitive(ctx, jv.Left, bindVars, wantfields) + if err != nil { + return nil, err + } + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + if len(lresult.Rows) == 0 && wantfields { + // If there are no rows, we still need to construct a single row + // to send down to RHS for Values Table to execute correctly. + // It will be used to execute the field query to provide the output fields. + var vals []sqltypes.Value + for _, field := range lresult.Fields { + val, _ := sqltypes.NewValue(field.Type, nil) + vals = append(vals, val) + } + bv.Values = append(bv.Values, sqltypes.TupleToProto(vals)) + + bindVars[jv.RowConstructorArg] = bv + return jv.Right.GetFields(ctx, vcursor, bindVars) + } + + for i, row := range lresult.Rows { + newRow := make(sqltypes.Row, 0, len(jv.Vars)+1) // +1 since we always add the row ID + newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID + + for _, loffset := range jv.Vars { + newRow = append(newRow, row[loffset]) + } + + bv.Values = append(bv.Values, sqltypes.TupleToProto(newRow)) + } + + bindVars[jv.RowConstructorArg] = bv + rresult, err := vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields) + if err != nil { + return nil, err + } + + result := &sqltypes.Result{} + + result.Fields = joinFields(lresult.Fields, rresult.Fields, jv.Cols) + for i := range result.Fields { + result.Fields[i].Name = jv.ColNames[i] + } + + for _, rrow := range rresult.Rows { + lhsRowID, err := rrow[len(rrow)-1].ToCastInt64() + if err != nil { + return nil, vterrors.VT13001("values joins cannot fetch lhs row ID: " + err.Error()) + } + + result.Rows = append(result.Rows, joinRows(lresult.Rows[lhsRowID], rrow, jv.Cols)) + } + + return result, nil +} + +// TryStreamExecute performs a streaming exec. +func (jv *ValuesJoin) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + panic("implement me") +} + +// GetFields fetches the field info. +func (jv *ValuesJoin) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return jv.Right.GetFields(ctx, vcursor, bindVars) +} + +// Inputs returns the input primitives for this join +func (jv *ValuesJoin) Inputs() ([]Primitive, []map[string]any) { + return []Primitive{jv.Left, jv.Right}, nil +} + +// RouteType returns a description of the query routing type used by the primitive +func (jv *ValuesJoin) RouteType() string { + return "ValuesJoin" +} + +// GetKeyspaceName specifies the Keyspace that this primitive routes to. +func (jv *ValuesJoin) GetKeyspaceName() string { + if jv.Left.GetKeyspaceName() == jv.Right.GetKeyspaceName() { + return jv.Left.GetKeyspaceName() + } + return jv.Left.GetKeyspaceName() + "_" + jv.Right.GetKeyspaceName() +} + +// GetTableName specifies the table that this primitive routes to. +func (jv *ValuesJoin) GetTableName() string { + return jv.Left.GetTableName() + "_" + jv.Right.GetTableName() +} + +// NeedsTransaction implements the Primitive interface +func (jv *ValuesJoin) NeedsTransaction() bool { + return jv.Right.NeedsTransaction() || jv.Left.NeedsTransaction() +} + +func (jv *ValuesJoin) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: "Join", + Variant: "Values", + Other: map[string]any{ + "ValuesArg": jv.RowConstructorArg, + "Vars": jv.Vars, + }, + } +} diff --git a/go/vt/vtgate/engine/join_values_test.go b/go/vt/vtgate/engine/join_values_test.go new file mode 100644 index 00000000000..068259a4e3e --- /dev/null +++ b/go/vt/vtgate/engine/join_values_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" +) + +func TestJoinValuesExecute(t *testing.T) { + + /* + select col1, col2, col3, col4, col5, col6 from left join right on left.col1 = right.col4 + LHS: select col1, col2, col3 from left + RHS: select col5, col6, id from (values row(1,2), ...) left(id,col1) join right on left.col1 = right.col4 + */ + + leftPrim := &fakePrimitive{ + useNewPrintBindVars: true, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col1|col2|col3", + "int64|varchar|varchar", + ), + "1|a|aa", + "2|b|bb", + "3|c|cc", + "4|d|dd", + ), + }, + } + rightPrim := &fakePrimitive{ + useNewPrintBindVars: true, + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col5|col6|id", + "varchar|varchar|int64", + ), + "d|dd|0", + "e|ee|1", + "f|ff|2", + "g|gg|3", + ), + }, + } + + bv := map[string]*querypb.BindVariable{ + "a": sqltypes.Int64BindVariable(10), + } + + vjn := &ValuesJoin{ + Left: leftPrim, + Right: rightPrim, + Vars: []int{0}, + RowConstructorArg: "v", + Cols: []int{-1, -2, -3, -1, 1, 2}, + ColNames: []string{"col1", "col2", "col3", "col4", "col5", "col6"}, + } + + r, err := vjn.TryExecute(context.Background(), &noopVCursor{}, bv, true) + require.NoError(t, err) + leftPrim.ExpectLog(t, []string{ + `Execute a: type:INT64 value:"10" true`, + }) + rightPrim.ExpectLog(t, []string{ + `Execute a: type:INT64 value:"10" v: [[INT64(0) INT64(1)][INT64(1) INT64(2)][INT64(2) INT64(3)][INT64(3) INT64(4)]] true`, + }) + + result := sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "col1|col2|col3|col4|col5|col6", + "int64|varchar|varchar|int64|varchar|varchar", + ), + "1|a|aa|1|d|dd", + "2|b|bb|2|e|ee", + "3|c|cc|3|f|ff", + "4|d|dd|4|g|gg", + ) + expectResult(t, r, result) +} diff --git a/go/vt/vtgate/engine/routing.go b/go/vt/vtgate/engine/routing.go index 067278c1a93..dd6143f6aa4 100644 --- a/go/vt/vtgate/engine/routing.go +++ b/go/vt/vtgate/engine/routing.go @@ -431,6 +431,7 @@ func (rp *RoutingParameters) multiEqual(ctx context.Context, vcursor VCursor, bi if err != nil { return nil, nil, err } + multiBindVars := make([]map[string]*querypb.BindVariable, len(rss)) for i := range multiBindVars { multiBindVars[i] = bindVars @@ -480,7 +481,13 @@ func setReplaceSchemaName(bindVars map[string]*querypb.BindVariable) { bindVars[sqltypes.BvReplaceSchemaName] = sqltypes.Int64BindVariable(1) } -func resolveShards(ctx context.Context, vcursor VCursor, vindex vindexes.SingleColumn, keyspace *vindexes.Keyspace, vindexKeys []sqltypes.Value) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { +func resolveShards( + ctx context.Context, + vcursor VCursor, + vindex vindexes.SingleColumn, + keyspace *vindexes.Keyspace, + vindexKeys []sqltypes.Value, +) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error) { // Convert vindexKeys to []*querypb.Value ids := make([]*querypb.Value, len(vindexKeys)) for i, vik := range vindexKeys { diff --git a/go/vt/vtgate/engine/routing_parameter_test.go b/go/vt/vtgate/engine/routing_parameter_test.go new file mode 100644 index 00000000000..596a2f7f424 --- /dev/null +++ b/go/vt/vtgate/engine/routing_parameter_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +func TestFindRouteValuesJoin(t *testing.T) { + vindex, err := vindexes.CreateVindex("hash", "", nil) + require.NoError(t, err) + + const valueBvName = "v" + rp := &RoutingParameters{ + Opcode: MultiEqual, + + Keyspace: &vindexes.Keyspace{ + Name: "ks", + Sharded: true, + }, + + Vindex: vindex, + + Values: []evalengine.Expr{ + &evalengine.TupleBindVariable{Key: valueBvName, Index: 0, Collation: collations.Unknown}, + }, + } + + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + Values: []*querypb.Value{ + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewVarBinary("hello")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(2), sqltypes.NewVarBinary("good morning")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(3), sqltypes.NewVarBinary("bonjour")}), + sqltypes.TupleToProto([]sqltypes.Value{sqltypes.NewInt64(4), sqltypes.NewVarBinary("bonjour")}), + }, + } + + vc := newTestVCursor("-20", "20-") + vc.shardForKsid = []string{"-20", "-20", "20-", "20-"} + rss, bvs, err := rp.findRoute(context.Background(), vc, map[string]*querypb.BindVariable{ + valueBvName: bv, + }) + + require.NoError(t, err) + require.Len(t, rss, 2) + require.Len(t, bvs, 2) +} diff --git a/go/vt/vtgate/engine/update_test.go b/go/vt/vtgate/engine/update_test.go index eb6af5a5299..e29ffeccd6f 100644 --- a/go/vt/vtgate/engine/update_test.go +++ b/go/vt/vtgate/engine/update_test.go @@ -50,7 +50,7 @@ func TestUpdateUnsharded(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -85,7 +85,7 @@ func TestUpdateEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -116,7 +116,7 @@ func TestUpdateEqualMultiCol(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -142,7 +142,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestUpdateScatter(t *testing.T) { }, } - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -199,7 +199,7 @@ func TestUpdateEqualNoRoute(t *testing.T) { }, } - vc := newDMLTestVCursor("0") + vc := newTestVCursor("0") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -250,7 +250,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -272,7 +272,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -294,7 +294,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -330,7 +330,7 @@ func TestUpdateEqualChangedVindex(t *testing.T) { "1|4|5|6|0|1", // twocol changes "1|7|8|9|1|0", // onecol changes )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -387,7 +387,7 @@ func TestUpdateEqualMultiColChangedVindex(t *testing.T) { ), "1|2|4|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -514,7 +514,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { ), "1|4|5|6|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -534,7 +534,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) if err != nil { @@ -558,7 +558,7 @@ func TestUpdateScatterChangedVindex(t *testing.T) { "1|4|5|6|0|0", "1|7|8|9|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -604,7 +604,7 @@ func TestUpdateIn(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -628,7 +628,7 @@ func TestUpdateInStreamExecute(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") err := upd.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false, func(result *sqltypes.Result) error { return nil }) @@ -655,7 +655,7 @@ func TestUpdateInMultiCol(t *testing.T) { Query: "dummy_update", }} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) vc.ExpectLog(t, []string{ @@ -710,7 +710,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|4|5|6|0|0", "2|21|22|23|0|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -738,7 +738,7 @@ func TestUpdateInChangedVindex(t *testing.T) { }) // No rows changing - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -761,7 +761,7 @@ func TestUpdateInChangedVindex(t *testing.T) { "1|7|8|9|0|0", "2|21|22|23|0|0", )} - vc = newDMLTestVCursor("-20", "20-") + vc = newTestVCursor("-20", "20-") vc.results = results _, err = upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -835,7 +835,7 @@ func TestUpdateInChangedVindexMultiCol(t *testing.T) { "1|3|6|0", "2|3|7|0", )} - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.results = results _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) @@ -874,7 +874,7 @@ func TestUpdateEqualSubshard(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -911,7 +911,7 @@ func TestUpdateMultiEqual(t *testing.T) { }, } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) require.NoError(t, err) @@ -944,7 +944,7 @@ func TestUpdateInUnique(t *testing.T) { Type: querypb.Type_TUPLE, Values: append([]*querypb.Value{sqltypes.ValueToProto(sqltypes.NewInt64(1))}, sqltypes.ValueToProto(sqltypes.NewInt64(2)), sqltypes.ValueToProto(sqltypes.NewInt64(4))), } - vc := newDMLTestVCursor("-20", "20-") + vc := newTestVCursor("-20", "20-") vc.shardForKsid = []string{"-20", "20-"} _, err := upd.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{"__vals": tupleBV}, false) require.NoError(t, err) @@ -1033,6 +1033,6 @@ func buildTestVSchema() *vindexes.VSchema { return vs } -func newDMLTestVCursor(shards ...string) *loggingVCursor { +func newTestVCursor(shards ...string) *loggingVCursor { return &loggingVCursor{shards: shards, resolvedTargetTabletType: topodatapb.TabletType_PRIMARY} } diff --git a/go/vt/vtgate/evalengine/eval.go b/go/vt/vtgate/evalengine/eval.go index 916c5e200f4..f75ac0f8202 100644 --- a/go/vt/vtgate/evalengine/eval.go +++ b/go/vt/vtgate/evalengine/eval.go @@ -378,6 +378,16 @@ func valueToEval(value sqltypes.Value, collation collations.TypedCollation, valu } switch tt := value.Type(); { + case tt == sqltypes.Tuple: + t := &evalTuple{} + err := value.ForEachValue(func(bv sqltypes.Value) { + e, err := valueToEval(bv, collation, values) + if err != nil { + return + } + t.t = append(t.t, e) + }) + return t, wrap(err) case sqltypes.IsSigned(tt): ival, err := value.ToInt64() return newEvalInt64(ival), wrap(err) diff --git a/go/vt/vtgate/evalengine/eval_tuple.go b/go/vt/vtgate/evalengine/eval_tuple.go index 81fa3317977..1faff68e155 100644 --- a/go/vt/vtgate/evalengine/eval_tuple.go +++ b/go/vt/vtgate/evalengine/eval_tuple.go @@ -27,7 +27,15 @@ type evalTuple struct { var _ eval = (*evalTuple)(nil) func (e *evalTuple) ToRawBytes() []byte { - return nil + var vals []sqltypes.Value + for _, e2 := range e.t { + v, err := sqltypes.NewValue(e2.SQLType(), e2.ToRawBytes()) + if err != nil { + panic(err) + } + vals = append(vals, v) + } + return sqltypes.TupleToProto(vals).Value } func (e *evalTuple) SQLType() sqltypes.Type { diff --git a/go/vt/vtgate/evalengine/expr_tuple_bvar.go b/go/vt/vtgate/evalengine/expr_tuple_bvar.go index 14cfbd95a8b..754ed8cf4f8 100644 --- a/go/vt/vtgate/evalengine/expr_tuple_bvar.go +++ b/go/vt/vtgate/evalengine/expr_tuple_bvar.go @@ -30,7 +30,6 @@ type ( Key string Index int - Type sqltypes.Type Collation collations.ID } ) diff --git a/go/vt/vtgate/planbuilder/operators/sharded_routing.go b/go/vt/vtgate/planbuilder/operators/sharded_routing.go index 0cc828a7ae2..891e3cf5862 100644 --- a/go/vt/vtgate/planbuilder/operators/sharded_routing.go +++ b/go/vt/vtgate/planbuilder/operators/sharded_routing.go @@ -613,7 +613,6 @@ func (tr *ShardedRouting) planCompositeInOpArg( Index: idx, } if typ, found := ctx.TypeForExpr(col); found { - value.Type = typ.Type() value.Collation = typ.Collation() }