From 98e4a6a73e56a1b1c54e21c3f657166f6da8c307 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 10:32:27 +0200 Subject: [PATCH 1/8] add cross cell support --- cmd/internal/planetscale_edge_database.go | 31 +++++++++++++++++++++-- go.mod | 3 ++- go.sum | 4 +++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index e031d84..40549c0 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "golang.org/x/exp/slices" "io" "net/http" "strings" @@ -170,6 +171,25 @@ func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScale return p.Mysql.GetVitessShards(ctx, psc) } +func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleSource, tabletType psdbconnect.TabletType) ([]string, error) { + var cells []string + tablets, err := p.Mysql.GetVitessTablets(ctx, psc) + + if err != nil { + return cells, err + } + + for _, vttablet := range tablets { + if strings.EqualFold(vttablet.TabletType, TabletTypeToString(tabletType)) && strings.EqualFold(vttablet.Keyspace, psc.Database) { + if !slices.Contains(cells, vttablet.Cell) { + cells = append(cells, vttablet.Cell) + } + } + } + + return cells, nil +} + // Read streams rows from a table given a starting cursor. // 1. We will get the latest vgtid for a given table in a shard when a sync session starts. // 2. This latest vgtid is now the stopping point for this sync session. @@ -188,6 +208,11 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane tabletType = psdbconnect.TabletType_replica } + cells, err := p.ListCells(ctx, ps, tabletType) + if err != nil { + return currentSerializedCursor, err + } + currentPosition := lastKnownPosition table := s.Stream readDuration := 1 * time.Minute @@ -207,7 +232,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration)) p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition)) - currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration) + currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, cells, readDuration) if currentPosition.Position != "" { currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition) if sErr != nil { @@ -235,7 +260,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane } } -func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) { +func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string, readDuration time.Duration) (*psdbconnect.TableCursor, error) { defer p.Logger.Flush() ctx, cancel := context.WithTimeout(ctx, readDuration) defer cancel() @@ -276,7 +301,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table TableName: s.Name, Cursor: tc, TabletType: tabletType, + Cells: cells, } + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells())) c, err := client.Sync(ctx, sReq) if err != nil { diff --git a/go.mod b/go.mod index 960d6cc..339187f 100644 --- a/go.mod +++ b/go.mod @@ -60,8 +60,9 @@ require ( github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect go.uber.org/atomic v1.9.0 // indirect + golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.7.0 // indirect - golang.org/x/sys v0.5.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.7.0 // indirect golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect diff --git a/go.sum b/go.sum index 9dfd593..06144a5 100644 --- a/go.sum +++ b/go.sum @@ -889,6 +889,8 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200901203048-c4f52b2c50aa/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925/go.mod h1:1phAWC201xIgDyaFpmDeZkgf70Q4Pd/CNqfRtVPtxNw= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1080,6 +1082,8 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= From 4be4738068b47622357b8663ec5d9f5920407562 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 13:39:39 +0200 Subject: [PATCH 2/8] add cells to ALL syncrequests --- cmd/internal/planetscale_edge_database.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 40549c0..6e61e0c 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -171,7 +171,7 @@ func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScale return p.Mysql.GetVitessShards(ctx, psc) } -func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleSource, tabletType psdbconnect.TabletType) ([]string, error) { +func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleSource) ([]string, error) { var cells []string tablets, err := p.Mysql.GetVitessTablets(ctx, psc) @@ -180,7 +180,7 @@ func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleS } for _, vttablet := range tablets { - if strings.EqualFold(vttablet.TabletType, TabletTypeToString(tabletType)) && strings.EqualFold(vttablet.Keyspace, psc.Database) { + if strings.EqualFold(vttablet.Keyspace, psc.Database) { if !slices.Contains(cells, vttablet.Cell) { cells = append(cells, vttablet.Cell) } @@ -208,7 +208,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane tabletType = psdbconnect.TabletType_replica } - cells, err := p.ListCells(ctx, ps, tabletType) + cells, err := p.ListCells(ctx, ps) if err != nil { return currentSerializedCursor, err } @@ -219,7 +219,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard) for { p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows") - latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType) + latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType, cells) if lcErr != nil { return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position") } @@ -356,7 +356,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table } } -func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) { +func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string) (string, error) { defer p.Logger.Flush() timeout := 45 * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) @@ -395,6 +395,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh Position: "current", }, TabletType: tabletType, + Cells: cells, } c, err := client.Sync(ctx, sReq) From 094d8925221dfe82811da739e722a791809936aa Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 14:03:54 +0200 Subject: [PATCH 3/8] move debug message --- cmd/internal/planetscale_edge_database.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 6e61e0c..4e2d4a8 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -213,6 +213,8 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane return currentSerializedCursor, err } + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\", from cells: %v", TabletTypeToString(tabletType), cells)) + currentPosition := lastKnownPosition table := s.Stream readDuration := 1 * time.Minute @@ -303,7 +305,6 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table TabletType: tabletType, Cells: cells, } - p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells())) c, err := client.Sync(ctx, sReq) if err != nil { From f33019292b528fab421551e74cfb85fe7ef066d9 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 14:29:09 +0200 Subject: [PATCH 4/8] add tests --- .../planetscale_edge_database_test.go | 68 +++++++++++++++++-- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 7502023..8dacdea 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -59,7 +59,7 @@ func TestRead_CanPeekBeforeRead(t *testing.T) { assert.Equal(t, esc, sc) assert.Equal(t, 1, cc.syncFnInvokedCount) assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) + assert.True(t, tma.GetVitessTabletsFnInvoked) } func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) { @@ -126,6 +126,8 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) + assert.Contains(t, in.Cells, "test_cell_a") + assert.Contains(t, in.Cells, "test_cell_b") return syncClient, nil }, } @@ -148,7 +150,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) { assert.Equal(t, esc, sc) assert.Equal(t, 1, cc.syncFnInvokedCount) assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) + assert.True(t, tma.GetVitessTabletsFnInvoked) } func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { @@ -173,6 +175,8 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) + assert.Contains(t, in.Cells, "test_cell_a") + assert.Contains(t, in.Cells, "test_cell_b") return syncClient, nil }, } @@ -196,7 +200,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { assert.Equal(t, esc, sc) assert.Equal(t, 1, cc.syncFnInvokedCount) assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) + assert.True(t, tma.GetVitessTabletsFnInvoked) } func TestDiscover_CanPickRightAirbyteType(t *testing.T) { @@ -303,6 +307,8 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) + assert.Contains(t, in.Cells, "test_cell_a") + assert.Contains(t, in.Cells, "test_cell_b") return syncClient, nil }, } @@ -325,7 +331,59 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) { assert.Equal(t, esc, sc) assert.Equal(t, 1, cc.syncFnInvokedCount) assert.False(t, tma.PingContextFnInvoked) - assert.False(t, tma.GetVitessTabletsFnInvoked) + assert.True(t, tma.GetVitessTabletsFnInvoked) +} + +func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) { + tma := getTestMysqlAccess() + b := bytes.NewBufferString("") + ped := PlanetScaleEdgeDatabase{ + Logger: NewLogger(b), + Mysql: tma, + } + tc := &psdbconnect.TableCursor{ + Shard: "-", + Position: "THIS_IS_A_SHARD_GTID", + Keyspace: "connect-test", + } + + syncClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{ + { + Cursor: tc, + }, + }, + } + + cc := clientConnectionMock{ + syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { + assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) + assert.Contains(t, in.Cells, "test_cell_a") + assert.Contains(t, in.Cells, "test_cell_b") + return syncClient, nil + }, + } + ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { + return &cc, nil + } + ps := PlanetScaleSource{ + Database: "connect-test", + UseReplica: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: "customers", + Namespace: "connect-test", + }, + } + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) + assert.NoError(t, err) + esc, err := TableCursorToSerializedCursor(tc) + assert.NoError(t, err) + assert.Equal(t, esc, sc) + assert.Equal(t, 1, cc.syncFnInvokedCount) + assert.False(t, tma.PingContextFnInvoked) + assert.True(t, tma.GetVitessTabletsFnInvoked) } func TestRead_CanReturnOriginalCursorIfNoNewFound(t *testing.T) { @@ -600,11 +658,13 @@ func getTestMysqlAccess() *mysqlAccessMock { GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) { return []VitessTablet{ { + Cell: "test_cell_a", Keyspace: "connect-test", TabletType: TabletTypeToString(psdbconnect.TabletType_primary), State: "SERVING", }, { + Cell: "test_cell_b", Keyspace: "connect-test", TabletType: TabletTypeToString(psdbconnect.TabletType_replica), State: "SERVING", From a3d04adeda8d14bbed058d998911b4b8b18efd16 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 10:32:27 +0200 Subject: [PATCH 5/8] add cross cell support --- cmd/internal/planetscale_edge_database.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 4e2d4a8..55a047d 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -305,6 +305,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table TabletType: tabletType, Cells: cells, } + p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells())) c, err := client.Sync(ctx, sReq) if err != nil { From 1c906308f0344d0211992e7274b727d9198912d1 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 18:42:49 +0200 Subject: [PATCH 6/8] switch to using built-in slices after golang upgrade --- cmd/internal/planetscale_edge_database.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 55a047d..504cc25 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -3,9 +3,9 @@ package internal import ( "context" "fmt" - "golang.org/x/exp/slices" "io" "net/http" + "slices" "strings" "time" From dff4e220ab4345febe0894816345d91e38688030 Mon Sep 17 00:00:00 2001 From: Matthias Crauwels Date: Wed, 11 Oct 2023 18:47:22 +0200 Subject: [PATCH 7/8] remove unused dependency --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index e620236..82a4b49 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( require ( github.com/pkg/errors v0.9.1 github.com/planetscale/psdb v0.0.0-20220429000526-e2a0e798aaf3 - golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 google.golang.org/grpc v1.58.3 google.golang.org/protobuf v1.31.0 ) From b9f927f2ffd78fea15cfe8b126d5540378f983ae Mon Sep 17 00:00:00 2001 From: Phani Raj Date: Wed, 11 Oct 2023 12:18:05 -0500 Subject: [PATCH 8/8] filter by tabletType and fix tests --- cmd/internal/planetscale_edge_database_test.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index 8dacdea..8d2ee27 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -126,8 +126,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - assert.Contains(t, in.Cells, "test_cell_a") - assert.Contains(t, in.Cells, "test_cell_b") + assert.Contains(t, in.Cells, "test_cell_primary") return syncClient, nil }, } @@ -175,8 +174,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) - assert.Contains(t, in.Cells, "test_cell_a") - assert.Contains(t, in.Cells, "test_cell_b") + assert.Contains(t, in.Cells, "test_cell_replica") return syncClient, nil }, } @@ -307,8 +305,7 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType) - assert.Contains(t, in.Cells, "test_cell_a") - assert.Contains(t, in.Cells, "test_cell_b") + assert.Contains(t, in.Cells, "test_cell_primary") return syncClient, nil }, } @@ -358,8 +355,7 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) { cc := clientConnectionMock{ syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType) - assert.Contains(t, in.Cells, "test_cell_a") - assert.Contains(t, in.Cells, "test_cell_b") + assert.Contains(t, in.Cells, "test_cell_replica") return syncClient, nil }, } @@ -658,13 +654,13 @@ func getTestMysqlAccess() *mysqlAccessMock { GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) { return []VitessTablet{ { - Cell: "test_cell_a", + Cell: "test_cell_primary", Keyspace: "connect-test", TabletType: TabletTypeToString(psdbconnect.TabletType_primary), State: "SERVING", }, { - Cell: "test_cell_b", + Cell: "test_cell_replica", Keyspace: "connect-test", TabletType: TabletTypeToString(psdbconnect.TabletType_replica), State: "SERVING",