diff --git a/cmd/airbyte-source/spec.json b/cmd/airbyte-source/spec.json index ebbc9db..4c6edec 100644 --- a/cmd/airbyte-source/spec.json +++ b/cmd/airbyte-source/spec.json @@ -24,7 +24,7 @@ "description": "Comma separated list of shards you'd like to sync, by default all shards are synced.", "title": "Shards", "type": "string", - "order": 5 + "order": 6 }, "database": { "description": "The PlanetScale database name.", @@ -52,12 +52,19 @@ "default": false, "order": 4 }, + "use_rdonly": { + "description": "Use a rdonly replica to pull data from", + "title": "Use rdonly?", + "type": "boolean", + "default": false, + "order": 5 + }, "starting_gtids": { "type": "string", "title": "Starting GTIDs", "default": "", "description": "A JSON string containing start GTIDs for every { keyspace: { shard: starting_gtid } }", - "order": 6 + "order": 7 }, "options": { "type": "object", diff --git a/cmd/internal/planetscale_connection.go b/cmd/internal/planetscale_connection.go index a8d520f..349c101 100644 --- a/cmd/internal/planetscale_connection.go +++ b/cmd/internal/planetscale_connection.go @@ -18,6 +18,7 @@ type PlanetScaleSource struct { Password string `json:"password"` Shards string `json:"shards"` UseReplica bool `json:"use_replica"` + UseRdonly bool `json:"use_rdonly"` StartingGtids string `json:"starting_gtids"` Options CustomSourceOptions `json:"options"` } @@ -36,7 +37,9 @@ func (psc PlanetScaleSource) DSN() string { config.Passwd = psc.Password tt := psdbconnect.TabletType_primary - if psc.UseReplica { + if psc.UseRdonly { + tt = psdbconnect.TabletType_batch + } else if psc.UseReplica { tt = psdbconnect.TabletType_replica } diff --git a/cmd/internal/planetscale_edge_database.go b/cmd/internal/planetscale_edge_database.go index 3f10b26..0f346b8 100644 --- a/cmd/internal/planetscale_edge_database.go +++ b/cmd/internal/planetscale_edge_database.go @@ -171,7 +171,9 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane ) tabletType := psdbconnect.TabletType_primary - if ps.UseReplica { + if ps.UseRdonly { + tabletType = psdbconnect.TabletType_batch + } else if ps.UseReplica { tabletType = psdbconnect.TabletType_replica } diff --git a/cmd/internal/planetscale_edge_database_test.go b/cmd/internal/planetscale_edge_database_test.go index b492148..c0f42c8 100644 --- a/cmd/internal/planetscale_edge_database_test.go +++ b/cmd/internal/planetscale_edge_database_test.go @@ -202,6 +202,55 @@ func TestRead_CanPickReplicaForShardedKeyspaces(t *testing.T) { assert.False(t, tma.GetVitessTabletsFnInvoked) } +func TestRead_CanPickRdonlyForShardedKeyspaces(t *testing.T) { + tma := getTestMysqlAccess() + b := bytes.NewBufferString("") + ped := PlanetScaleEdgeDatabase{ + Logger: NewLogger(b), + Mysql: tma, + } + tc := &psdbconnect.TableCursor{ + Shard: "40-80", + Position: "THIS_IS_A_SHARD_GTID", + Keyspace: "connect-test", + } + + syncClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{ + {Cursor: tc}, + }, + } + + cc := clientConnectionMock{ + syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { + assert.Equal(t, psdbconnect.TabletType_batch, in.TabletType) + assert.Contains(t, in.Cells, "planetscale_operator_default") + return syncClient, nil + }, + } + ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { + return &cc, nil + } + ps := PlanetScaleSource{ + Database: "connect-test", + UseRdonly: true, + } + cs := ConfiguredStream{ + Stream: Stream{ + Name: "customers", + Namespace: "connect-test", + }, + } + sc, err := ped.Read(context.Background(), os.Stdout, ps, cs, tc) + assert.NoError(t, err) + esc, err := TableCursorToSerializedCursor(tc) + assert.NoError(t, err) + assert.Equal(t, esc, sc) + assert.Equal(t, 1, cc.syncFnInvokedCount) + assert.False(t, tma.PingContextFnInvoked) + assert.False(t, tma.GetVitessTabletsFnInvoked) +} + func TestDiscover_CanPickRightAirbyteType(t *testing.T) { var tests = []struct { MysqlType string