Skip to content

Commit

Permalink
Merge pull request #112 from planetscale/anders-add-rdonly
Browse files Browse the repository at this point in the history
Add UseRdonly option to the airbyte connector
  • Loading branch information
notfelineit authored Sep 20, 2024
2 parents 989fc17 + 0e7201c commit c1ab01c
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
11 changes: 9 additions & 2 deletions cmd/airbyte-source/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"description": "Comma separated list of shards you'd like to sync, by default all shards are synced.",
"title": "Shards",
"type": "string",
"order": 5
"order": 6
},
"database": {
"description": "The PlanetScale database name.",
Expand Down Expand Up @@ -52,12 +52,19 @@
"default": false,
"order": 4
},
"use_rdonly": {
"description": "Use a rdonly replica to pull data from",
"title": "Use rdonly?",
"type": "boolean",
"default": false,
"order": 5
},
"starting_gtids": {
"type": "string",
"title": "Starting GTIDs",
"default": "",
"description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }",
"order": 6
"order": 7
},
"options": {
"type": "object",
Expand Down
5 changes: 4 additions & 1 deletion cmd/internal/planetscale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type PlanetScaleSource struct {
Password string `json:"password"`
Shards string `json:"shards"`
UseReplica bool `json:"use_replica"`
UseRdonly bool `json:"use_rdonly"`
StartingGtids string `json:"starting_gtids"`
Options CustomSourceOptions `json:"options"`
}
Expand All @@ -36,7 +37,9 @@ func (psc PlanetScaleSource) DSN() string {
config.Passwd = psc.Password

tt := psdbconnect.TabletType_primary
if psc.UseReplica {
if psc.UseRdonly {
tt = psdbconnect.TabletType_batch
} else if psc.UseReplica {
tt = psdbconnect.TabletType_replica
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,9 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
)

tabletType := psdbconnect.TabletType_primary
if ps.UseReplica {
if ps.UseRdonly {
tabletType = psdbconnect.TabletType_batch
} else if ps.UseReplica {
tabletType = psdbconnect.TabletType_replica
}

Expand Down
49 changes: 49 additions & 0 deletions cmd/internal/planetscale_edge_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,55 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) {
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestRead_CanPickRdonlyForShardedKeyspaces(t *testing.T) {
tma := getTestMysqlAccess()
b := bytes.NewBufferString("")
ped := PlanetScaleEdgeDatabase{
Logger: NewLogger(b),
Mysql: tma,
}
tc := &psdbconnect.TableCursor{
Shard: "40-80",
Position: "THIS_IS_A_SHARD_GTID",
Keyspace: "connect-test",
}

syncClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{
{Cursor: tc},
},
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
assert.Equal(t, psdbconnect.TabletType_batch, in.TabletType)
assert.Contains(t, in.Cells, "planetscale_operator_default")
return syncClient, nil
},
}
ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) {
return &cc, nil
}
ps := PlanetScaleSource{
Database: "connect-test",
UseRdonly: true,
}
cs := ConfiguredStream{
Stream: Stream{
Name: "customers",
Namespace: "connect-test",
},
}
sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc)
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
assert.Equal(t, 1, cc.syncFnInvokedCount)
assert.False(t, tma.PingContextFnInvoked)
assert.False(t, tma.GetVitessTabletsFnInvoked)
}

func TestDiscover_CanPickRightAirbyteType(t *testing.T) {
var tests = []struct {
MysqlType string
Expand Down

0 comments on commit c1ab01c

Please sign in to comment.