Skip to content

Commit

Permalink
Merge pull request #77 from planetscale/mcrauwel/support-cross-cell-r…
Browse files Browse the repository at this point in the history
…eads

support cross cell reads
  • Loading branch information
Phani Raj authored Oct 11, 2023
2 parents 454a400 + b9f927f commit ff11302
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 8 deletions.
38 changes: 34 additions & 4 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -170,6 +171,25 @@ func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScale
return p.Mysql.GetVitessShards(ctx, psc)
}

func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
var cells []string
tablets, err := p.Mysql.GetVitessTablets(ctx, psc)

if err != nil {
return cells, err
}

for _, vttablet := range tablets {
if strings.EqualFold(vttablet.Keyspace, psc.Database) {
if !slices.Contains(cells, vttablet.Cell) {
cells = append(cells, vttablet.Cell)
}
}
}

return cells, nil
}

// Read streams rows from a table given a starting cursor.
// 1. We will get the latest vgtid for a given table in a shard when a sync session starts.
// 2. This latest vgtid is now the stopping point for this sync session.
Expand All @@ -188,13 +208,20 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
tabletType = psdbconnect.TabletType_replica
}

cells, err := p.ListCells(ctx, ps)
if err != nil {
return currentSerializedCursor, err
}

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\", from cells: %v", TabletTypeToString(tabletType), cells))

currentPosition := lastKnownPosition
table := s.Stream
readDuration := 1 * time.Minute
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard)
for {
p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows")
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType, cells)
if lcErr != nil {
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
Expand All @@ -207,7 +234,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))

currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, cells, readDuration)
if currentPosition.Position != "" {
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
Expand Down Expand Up @@ -235,7 +262,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
defer p.Logger.Flush()
ctx, cancel := context.WithTimeout(ctx, readDuration)
defer cancel()
Expand Down Expand Up @@ -276,7 +303,9 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
TableName: s.Name,
Cursor: tc,
TabletType: tabletType,
Cells: cells,
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells()))

c, err := client.Sync(ctx, sReq)
if err != nil {
Expand Down Expand Up @@ -329,7 +358,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}

func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string) (string, error) {
defer p.Logger.Flush()
timeout := 45 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand Down Expand Up @@ -368,6 +397,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
Position: "current",
},
TabletType: tabletType,
Cells: cells,
}

c, err := client.Sync(ctx, sReq)
Expand Down
64 changes: 60 additions & 4 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestRead_CanPeekBeforeRead(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) {
Expand Down Expand Up @@ -126,6 +126,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_primary")
return syncClient, nil
},
}
Expand All @@ -148,7 +149,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
Expand All @@ -173,6 +174,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_replica")
return syncClient, nil
},
}
Expand All @@ -196,7 +198,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_CanPickRightAirbyteType(t *testing.T) {
Expand Down Expand Up @@ -303,6 +305,7 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_primary")
return syncClient, nil
},
}
Expand All @@ -325,7 +328,58 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) {
tma := getTestMysqlAccess()
b := bytes.NewBufferString("")
ped := PlanetScaleEdgeDatabase{
Logger: NewLogger(b),
Mysql: tma,
}
tc := &psdbconnect.TableCursor{
Shard: "-",
Position: "THIS_IS_A_SHARD_GTID",
Keyspace: "connect-test",
}

syncClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{
{
Cursor: tc,
},
},
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_replica")
return syncClient, nil
},
}
ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) {
return &cc, nil
}
ps := PlanetScaleSource{
Database: "connect-test",
UseReplica: true,
}
cs := ConfiguredStream{
Stream: Stream{
Name: "customers",
Namespace: "connect-test",
},
}
sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanReturnOriginalCursorIfNoNewFound(t *testing.T) {
Expand Down Expand Up @@ -600,11 +654,13 @@ func getTestMysqlAccess() *mysqlAccessMock {
GetVitessTabletsFn: func(ctx context.Context, psc PlanetScaleSource) ([]VitessTablet, error) {
return []VitessTablet{
{
Cell: "test_cell_primary",
Keyspace: "connect-test",
TabletType: TabletTypeToString(psdbconnect.TabletType_primary),
State: "SERVING",
},
{
Cell: "test_cell_replica",
Keyspace: "connect-test",
TabletType: TabletTypeToString(psdbconnect.TabletType_replica),
State: "SERVING",
Expand Down

0 comments on commit ff11302

Please sign in to comment.