Skip to content

Commit

Permalink
Merge pull request #80 from planetscale/use-cell-alias
Browse files Browse the repository at this point in the history
Use cell alias for all cells in a vitess cluster instead of querying vitess
  • Loading branch information
mcrauwel authored Oct 12, 2023
2 parents ff11302 + abd29e0 commit 7ab217c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 41 deletions.
39 changes: 7 additions & 32 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -171,25 +170,6 @@ func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScale
return p.Mysql.GetVitessShards(ctx, psc)
}

func (p PlanetScaleEdgeDatabase) ListCells(ctx context.Context, psc PlanetScaleSource) ([]string, error) {
var cells []string
tablets, err := p.Mysql.GetVitessTablets(ctx, psc)

if err != nil {
return cells, err
}

for _, vttablet := range tablets {
if strings.EqualFold(vttablet.Keyspace, psc.Database) {
if !slices.Contains(cells, vttablet.Cell) {
cells = append(cells, vttablet.Cell)
}
}
}

return cells, nil
}

// Read streams rows from a table given a starting cursor.
// 1. We will get the latest vgtid for a given table in a shard when a sync session starts.
// 2. This latest vgtid is now the stopping point for this sync session.
Expand All @@ -208,20 +188,15 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
tabletType = psdbconnect.TabletType_replica
}

cells, err := p.ListCells(ctx, ps)
if err != nil {
return currentSerializedCursor, err
}

p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\", from cells: %v", TabletTypeToString(tabletType), cells))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("Syncing from tabletType \"%v\"", TabletTypeToString(tabletType)))

currentPosition := lastKnownPosition
table := s.Stream
readDuration := 1 * time.Minute
preamble := fmt.Sprintf("[%v:%v:%v shard : %v] ", table.Namespace, TabletTypeToString(tabletType), table.Name, currentPosition.Shard)
for {
p.Logger.Log(LOGLEVEL_INFO, preamble+"peeking to see if there's any new rows")
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType, cells)
latestCursorPosition, lcErr := p.getLatestCursorPosition(ctx, currentPosition.Shard, currentPosition.Keyspace, table, ps, tabletType)
if lcErr != nil {
return currentSerializedCursor, errors.Wrap(err, "Unable to get latest cursor position")
}
Expand All @@ -234,7 +209,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))

currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, cells, readDuration)
currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
if currentPosition.Position != "" {
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
Expand Down Expand Up @@ -262,7 +237,7 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
}
}

func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.TableCursor, stopPosition string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, readDuration time.Duration) (*psdbconnect.TableCursor, error) {
defer p.Logger.Flush()
ctx, cancel := context.WithTimeout(ctx, readDuration)
defer cancel()
Expand Down Expand Up @@ -303,7 +278,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
TableName: s.Name,
Cursor: tc,
TabletType: tabletType,
Cells: cells,
Cells: []string{"planetscale_operator_default"},
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("DEBUG: SyncRequest.Cells = %v", sReq.GetCells()))

Expand Down Expand Up @@ -358,7 +333,7 @@ func (p PlanetScaleEdgeDatabase) sync(ctx context.Context, tc *psdbconnect.Table
}
}

func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType, cells []string) (string, error) {
func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, shard, keyspace string, s Stream, ps PlanetScaleSource, tabletType psdbconnect.TabletType) (string, error) {
defer p.Logger.Flush()
timeout := 45 * time.Second
ctx, cancel := context.WithTimeout(ctx, timeout)
Expand Down Expand Up @@ -397,7 +372,7 @@ func (p PlanetScaleEdgeDatabase) getLatestCursorPosition(ctx context.Context, sh
Position: "current",
},
TabletType: tabletType,
Cells: cells,
Cells: []string{"planetscale_operator_default"},
}

c, err := client.Sync(ctx, sReq)
Expand Down
18 changes: 9 additions & 9 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestRead_CanPeekBeforeRead(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanEarlyExitIfNoNewVGtidInPeek(t *testing.T) {
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_primary")
assert.Contains(t, in.Cells, "planetscale_operator_default")
return syncClient, nil
},
}
Expand All @@ -149,7 +149,7 @@ func TestRead_CanPickPrimaryForShardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
Expand All @@ -174,7 +174,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_replica")
assert.Contains(t, in.Cells, "planetscale_operator_default")
return syncClient, nil
},
}
Expand All @@ -198,7 +198,7 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_CanPickRightAirbyteType(t *testing.T) {
Expand Down Expand Up @@ -305,7 +305,7 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_primary, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_primary")
assert.Contains(t, in.Cells, "planetscale_operator_default")
return syncClient, nil
},
}
Expand All @@ -328,7 +328,7 @@ func TestRead_CanPickPrimaryForUnshardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) {
Expand All @@ -355,7 +355,7 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) {
cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_replica, in.TabletType)
assert.Contains(t, in.Cells, "test_cell_replica")
assert.Contains(t, in.Cells, "planetscale_operator_default")
return syncClient, nil
},
}
Expand All @@ -379,7 +379,7 @@ func TestRead_CanPickReplicaForUnshardedKeyspaces(t *testing.T) {
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.True(t, tma.GetVitessTabletsFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanReturnOriginalCursorIfNoNewFound(t *testing.T) {
Expand Down

0 comments on commit 7ab217c

Please sign in to comment.