diff --git a/go/test/endtoend/vtgateproxytest/failure_test.go b/go/test/endtoend/vtgateproxytest/failure_test.go new file mode 100644 index 00000000000..27c50baef6a --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/failure_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyVtgateFailureRoundRobin(t *testing.T) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const vtgateCount = 4 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + } + + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + "round_robin", + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Stopping 1 vtgate") + err = vtgates[0].TearDown() + if err != nil { + t.Fatal(err) + } + + log.Info("Reading test value with one stopped vtgate") + for i := 0; i < vtgateproxyConnections; i++ { + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } +} + +func TestVtgateProxyVtgateFailureFirstReady(t *testing.T) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const vtgateCount = 4 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + } + + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + "first_ready", + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + // First send some queries to the active vtgate + for i := 0; i < 10; i++ { + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + // Now kill the active vtgate + for i := range vtgates { + queryCount, err := getVtgateQueryCount(vtgates[i]) + if err != nil { + t.Fatal(err) + } + + if queryCount.Sum() > 0 { + err = vtgates[i].TearDown() + if err != nil { + t.Fatal(err) + } + } + } + + log.Info("Reading test value after killing the active vtgate") + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) +} diff --git a/go/test/endtoend/vtgateproxytest/main_test.go b/go/test/endtoend/vtgateproxytest/main_test.go new file mode 100644 index 00000000000..bc470085dd0 --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/main_test.go @@ -0,0 +1,461 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgateproxytest + +import ( + "context" + "database/sql" + "encoding/json" + "flag" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path" + "reflect" + "strconv" + "strings" + "syscall" + "testing" + "time" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "commerce" + cell = "zone1" + sqlSchema = `create table product( + sku varbinary(128), + description varbinary(128), + price bigint, + primary key(sku) + ) ENGINE=InnoDB; + create table customer( + id bigint not null auto_increment, + email varchar(128), + primary key(id) + ) ENGINE=InnoDB; + create table corder( + order_id bigint not null auto_increment, + customer_id bigint, + sku varbinary(128), + price bigint, + primary key(order_id) + ) ENGINE=InnoDB;` + + vSchema = `{ + "tables": { + "product": {}, + "customer": {}, + "corder": {} + } + }` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, true) + if err != nil { + return 1 + } + + // Start vtgate + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func selectHelper[T any](ctx context.Context, conn *sql.DB, query string) ([]T, error) { + var result []T + + rows, err := conn.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var row T + v := reflect.ValueOf(&row).Elem() + + var fields []any + for i := 0; i < v.NumField(); i++ { + if v.Field(i).CanAddr() { + fields = append(fields, v.Field(i).Addr().Interface()) + } + } + + err = rows.Scan(fields...) + if err != nil { + return nil, err + } + + result = append(result, row) + } + + return result, nil +} + +type customerEntry struct { + ID int + Email string +} + +func NewVtgateProxyProcess(logDir, vtgateHostsFile, affinity, balancerType string, numConnections, httpPort, grpcPort, mySQLPort int) *VtgateProxyProcess { + return &VtgateProxyProcess{ + Name: "vtgateproxy", + Binary: "vtgateproxy", + LogDir: logDir, + VtgateHostsFile: vtgateHostsFile, + BalancerType: balancerType, + AddressField: "address", + PortField: "grpc", + PoolTypeField: "type", + AffinityField: "az_id", + AffinityValue: affinity, + NumConnections: numConnections, + HTTPPort: httpPort, + GrpcPort: grpcPort, + MySQLPort: mySQLPort, + VerifyURL: "http://" + net.JoinHostPort("localhost", strconv.Itoa(httpPort)) + "/debug/vars", + } +} + +type VtgateProxyProcess struct { + Name string + Binary string + VtgateHostsFile string + BalancerType string + AddressField string + PortField string + PoolTypeField string + AffinityField string + AffinityValue string + LogDir string + NumConnections int + HTTPPort int + GrpcPort int + MySQLPort int + ExtraArgs []string + VerifyURL string + + proc *exec.Cmd + exit chan error +} + +func (vt *VtgateProxyProcess) Setup() error { + args := []string{ + "--port", strconv.Itoa(vt.HTTPPort), + "--grpc_port", strconv.Itoa(vt.GrpcPort), + "--mysql_server_port", strconv.Itoa(vt.MySQLPort), + "--vtgate_hosts_file", vt.VtgateHostsFile, + "--balancer", vt.BalancerType, + "--address_field", vt.AddressField, + "--port_field", vt.PortField, + "--pool_type_field", vt.PoolTypeField, + "--affinity_field", vt.AffinityField, + "--affinity_value", vt.AffinityValue, + "--num_connections", strconv.Itoa(vt.NumConnections), + "--log_dir", vt.LogDir, + "--v", "999", + "--mysql_auth_server_impl", "none", + "--alsologtostderr", + "--grpc_prometheus", + "--vtgate_grpc_fail_fast", + } + args = append(args, vt.ExtraArgs...) + + vt.proc = exec.Command( + vt.Binary, + args..., + ) + vt.proc.Env = append(vt.proc.Env, os.Environ()...) + //errFile, _ := os.Create(path.Join(vt.LogDir, "vtgateproxy-stderr.txt")) + //vt.proc.Stderr = errFile + vt.proc.Stderr = os.Stderr + vt.proc.Stdout = os.Stdout + + log.Infof("Running vtgateproxy with command: %v", strings.Join(vt.proc.Args, " ")) + + err := vt.proc.Start() + if err != nil { + return err + } + vt.exit = make(chan error) + go func() { + if vt.proc != nil { + vt.exit <- vt.proc.Wait() + } + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if vt.WaitForStatus() { + return nil + } + select { + case err := <-vt.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vt.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vt.Name, <-vt.exit) +} + +// WaitForStatus function checks if vtgateproxy process is up and running +func (vt *VtgateProxyProcess) WaitForStatus() bool { + resp, err := http.Get(vt.VerifyURL) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == 200 +} + +func (vt *VtgateProxyProcess) Teardown() error { + if err := vt.proc.Process.Kill(); err != nil { + log.Errorf("Failed to kill %v: %v", vt.Name, err) + } + if vt.proc == nil || vt.exit == nil { + return nil + } + vt.proc.Process.Signal(syscall.SIGTERM) + + select { + case <-vt.exit: + vt.proc = nil + return nil + + case <-time.After(30 * time.Second): + vt.proc.Process.Kill() + vt.proc = nil + return <-vt.exit + } +} + +func (vt *VtgateProxyProcess) GetMySQLConn(poolType, affinity string) (*sql.DB, error) { + // Use the go mysql driver since the vitess mysql client does not support + // connectionAttributes. + dsn := fmt.Sprintf("tcp(%v)/ks?connectionAttributes=type:%v,az_id:%v", net.JoinHostPort(clusterInstance.Hostname, strconv.Itoa(vt.MySQLPort)), poolType, affinity) + log.Infof("Using DSN %v", dsn) + + return sql.Open("mysql", dsn) +} + +// WaitForConfig waits until the proxy targets match the config sent to it. +func (vt *VtgateProxyProcess) WaitForConfig(config []map[string]string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + timer := time.NewTicker(100 * time.Millisecond) + defer timer.Stop() + + expect := map[string]int{} + result := map[string]int{} + for _, target := range config { + expect[target["type"]]++ + if expect[target["type"]] > vt.NumConnections { + expect[target["type"]] = vt.NumConnections + } + } + +OUTER: + for { + select { + case <-ctx.Done(): + return fmt.Errorf("targets never updated to match config: %v != %v", result, expect) + case <-timer.C: + } + + result = map[string]int{} + + vars, err := vt.GetVars() + if err != nil { + return err + } + + targets, ok := vars["JsonDiscoveryTargetCount"] + if !ok { + continue OUTER + } + + for k, v := range targets.(map[string]any) { + result[k] = int(v.(float64)) + } + + if len(result) != len(expect) { + continue OUTER + } + + for k, v := range expect { + if result[k] != v { + continue OUTER + } + } + + break OUTER + } + + return nil +} + +// GetVars returns map of vars +func (vt *VtgateProxyProcess) GetVars() (map[string]any, error) { + resultMap := make(map[string]any) + resp, err := http.Get(vt.VerifyURL) + if err != nil { + return nil, fmt.Errorf("error getting response from %s", vt.VerifyURL) + } + defer resp.Body.Close() + + if resp.StatusCode == 200 { + respByte, _ := io.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + return nil, fmt.Errorf("not able to parse response body") + } + return resultMap, nil + } + return nil, fmt.Errorf("unsuccessful response") +} + +func startAdditionalVtgates(count int) ([]*cluster.VtgateProcess, error) { + var vtgates []*cluster.VtgateProcess + var err error + defer func() { + if err != nil { + teardownVtgates(vtgates) + } + }() + + for i := 0; i < count; i++ { + vtgateInstance := newVtgateInstance(i) + log.Infof("Starting additional vtgate on port %d", vtgateInstance.Port) + if err = vtgateInstance.Setup(); err != nil { + return nil, err + } + + vtgates = append(vtgates, vtgateInstance) + } + + return vtgates, nil +} + +func newVtgateInstance(i int) *cluster.VtgateProcess { + vtgateProcInstance := cluster.VtgateProcessInstance( + clusterInstance.GetAndReservePort(), + clusterInstance.GetAndReservePort(), + clusterInstance.GetAndReservePort(), + clusterInstance.Cell, + clusterInstance.Cell, + clusterInstance.Hostname, + "PRIMARY,REPLICA", + clusterInstance.TopoProcess.Port, + clusterInstance.TmpDirectory, + clusterInstance.VtGateExtraArgs, + clusterInstance.VtGatePlannerVersion, + ) + vtgateProcInstance.MySQLServerSocketPath = path.Join(clusterInstance.TmpDirectory, fmt.Sprintf("mysql%v.sock", i)) + + return vtgateProcInstance +} + +func teardownVtgates(vtgates []*cluster.VtgateProcess) error { + var err error + for _, vtgate := range vtgates { + if vErr := vtgate.TearDown(); vErr != nil { + err = vErr + } + } + + return err +} + +func getVtgateQueryCount(vtgate *cluster.VtgateProcess) (queryCount, error) { + var result queryCount + + vars, err := vtgate.GetVars() + if err != nil { + return result, err + } + + queriesProcessed, ok := vars["QueriesProcessed"] + if !ok { + return result, nil + } + + v := reflect.ValueOf(&result).Elem() + + for k, val := range queriesProcessed.(map[string]any) { + v.FieldByName(k).SetInt(int64(val.(float64))) + } + + return result, err +} + +type queryCount struct { + Begin int + Commit int + Unsharded int + InsertUnsharded int +} + +func (q queryCount) Sum() int { + var result int + v := reflect.ValueOf(q) + for i := 0; i < v.NumField(); i++ { + result += int(v.Field(i).Int()) + } + + return result +} diff --git a/go/test/endtoend/vtgateproxytest/rebalance_test.go b/go/test/endtoend/vtgateproxytest/rebalance_test.go new file mode 100644 index 00000000000..6f3e838bb9b --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/rebalance_test.go @@ -0,0 +1,215 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyRebalanceRoundRobin(t *testing.T) { + testVtgateProxyRebalance(t, "round_robin") +} + +func TestVtgateProxyRebalanceFirstReady(t *testing.T) { + testVtgateProxyRebalance(t, "first_ready") +} + +func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + const targetAffinity = "use1-az1" + const targetPool = "pool1" + const vtgateCount = 10 + const vtgatesInAffinity = 8 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + affinity := targetAffinity + if i >= vtgatesInAffinity { + affinity = "use1-az2" + } + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": affinity, + "type": targetPool, + }) + } + + vtgateIdx := vtgateproxyConnections + b, err := json.Marshal(config[:vtgateIdx]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + loadBalancer, + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn(targetPool, targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Inserting test value") + tx, err := conn.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec("insert into customer(id, email) values(1, 'email1')") + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value while adding vtgates") + + const totalQueries = 1000 + addVtgateEveryN := totalQueries / len(vtgates) + + for i := 0; i < totalQueries; i++ { + if i%(addVtgateEveryN) == 0 && vtgateIdx <= len(vtgates) { + log.Infof("Adding vtgate %v", vtgateIdx-1) + b, err = json.Marshal(config[:vtgateIdx]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + if err := vtgateproxyProcInstance.WaitForConfig(config[:vtgateIdx], 5*time.Second); err != nil { + t.Fatal(err) + } + + vtgateIdx++ + } + + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + // No queries should be sent to vtgates outside target affinity + const expectMaxQueryCountNonAffinity = 0 + + switch loadBalancer { + case "round_robin": + // At least 1 query should be sent to every vtgate matching target + // affinity + const expectMinQueryCountAffinity = 1 + + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } + + affinity := config[i]["az_id"] + + log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) + + if affinity == targetAffinity { + assert.GreaterOrEqual(t, queryCount.Sum(), expectMinQueryCountAffinity, "vtgate %v did not recieve the expected number of queries", i) + } else { + assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) + } + } + case "first_ready": + // A single vtgate should become the target, and it should recieve all + // queries + targetVtgate := -1 + + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } + + affinity := config[i]["az_id"] + + log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) + + sum := queryCount.Sum() + if sum == 0 { + continue + } + + if targetVtgate != -1 { + t.Logf("only vtgate %v should have received queries; vtgate %v got %v", targetVtgate, i, sum) + t.Fail() + } else if affinity == targetAffinity { + targetVtgate = i + } else { + assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) + } + } + } +} diff --git a/go/test/endtoend/vtgateproxytest/scale_test.go b/go/test/endtoend/vtgateproxytest/scale_test.go new file mode 100644 index 00000000000..f81ad52cbcd --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/scale_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyScaleRoundRobin(t *testing.T) { + testVtgateProxyScale(t, "round_robin") +} + +func TestVtgateProxyScaleFirstReady(t *testing.T) { + testVtgateProxyScale(t, "first_ready") +} + +func testVtgateProxyScale(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const targetPool = "pool1" + const vtgateCount = 5 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + pool := targetPool + if i == 0 { + // First vtgate is in a different pool and should not have any + // queries routed to it. + pool = "pool2" + } + + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": pool, + }) + } + b, err := json.Marshal(config[:1]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + loadBalancer, + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn(targetPool, targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value while scaling vtgates") + + // Start with an empty list of vtgates, then scale up, then scale back to + // 0. We should expect to see immediate failure when there are no vtgates, + // then success at each scale, until we hit 0 vtgates again, at which point + // we should fail fast again. + i := 0 + scaleUp := true + for { + t.Logf("writing config file with %v vtgates", i) + b, err = json.Marshal(config[:i]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + if err := vtgateproxyProcInstance.WaitForConfig(config[:i], 5*time.Second); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := selectHelper[customerEntry](ctx, conn, "select id, email from customer") + // 0 vtgates should fail + // First vtgate is in the wrong pool, so it should also fail + if i <= 1 { + if err == nil { + t.Fatal("query should have failed with no vtgates") + } + + // In first_ready mode, we expect to fail fast and not time out. + if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) { + t.Fatal("query timed out but it should have failed fast") + } + } else if err != nil { + t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err) + } else { + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + if scaleUp { + i++ + if i >= len(config) { + scaleUp = false + i -= 2 + } + + continue + } + + i-- + if i < 0 { + break + } + } +} diff --git a/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go b/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go new file mode 100644 index 00000000000..2f96e74ebf5 --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strconv" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyProcessRoundRobin(t *testing.T) { + testVtgateProxyProcess(t, "round_robin") +} + +func TestVtgateProxyProcessFirstReady(t *testing.T) { + testVtgateProxyProcess(t, "first_ready") +} + +func testVtgateProxyProcess(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + config := []map[string]string{ + { + "host": "vtgate1", + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(clusterInstance.VtgateProcess.GrpcPort), + "az_id": "use1-az1", + "type": "pool1", + }, + } + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + "use1-az1", + loadBalancer, + 1, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", "use1-az1") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Inserting test value") + tx, err := conn.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec("insert into customer(id, email) values(1, 'email1')") + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value") + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + log.Infof("Read value %v", result) + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) +}