Skip to content

Commit

Permalink
Merge pull request #321 from Shopify/sharding-include-tables
Browse files Browse the repository at this point in the history
Add support for specifying included tables to sharding config
  • Loading branch information
pawandubey authored Dec 17, 2021
2 parents f6da384 + 8394896 commit 3882a39
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 16 deletions.
14 changes: 12 additions & 2 deletions sharding/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sharding

import (
"fmt"

"github.com/Shopify/ghostferry"
)

Expand All @@ -18,14 +20,22 @@ type Config struct {

StatsDAddress string

JoinedTables map[string][]JoinTable
IgnoredTables []string
JoinedTables map[string][]JoinTable

// IgnoredTables and IncludedTables are mutually exclusive. Specifying both is an error.
IgnoredTables []string
IncludedTables []string

PrimaryKeyTables []string

Throttle *ghostferry.LagThrottlerConfig
}

func (c *Config) ValidateConfig() error {
if len(c.IgnoredTables) != 0 && len(c.IncludedTables) != 0 {
return fmt.Errorf("IgnoredTables and IncludedTables cannot be defined at the same time.")
}

if c.RunFerryFromReplica && c.SourceReplicationMaster != nil {
if err := c.SourceReplicationMaster.Validate(); err != nil {
return err
Expand Down
24 changes: 20 additions & 4 deletions sharding/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,37 @@ func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, er
return false, nil
}

type ShardedTableFilterType int64

const (
IgnoredTablesFilter ShardedTableFilterType = iota
IncludedTablesFilter
)

type ShardedTableFilter struct {
SourceShard string
ShardingKey string
JoinedTables map[string][]JoinTable
IgnoredTables []*regexp.Regexp
Type ShardedTableFilterType
Tables []*regexp.Regexp
PrimaryKeyTables map[string]struct{}
}

func (s *ShardedTableFilter) isIgnoreFilter() bool {
return s.Type == IgnoredTablesFilter
}

func (s *ShardedTableFilter) isIncludeFilter() bool {
return s.Type == IncludedTablesFilter
}

func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error) {
return []string{s.SourceShard}, nil
}

func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error) {
for _, table := range tables {
if s.isIgnored(table) {
if ((s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table))) {
continue
}

Expand All @@ -225,8 +241,8 @@ func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema)
return
}

func (s *ShardedTableFilter) isIgnored(table *ghostferry.TableSchema) bool {
for _, re := range s.IgnoredTables {
func (s *ShardedTableFilter) isPresent(table *ghostferry.TableSchema) bool {
for _, re := range s.Tables {
if re.Match([]byte(table.Name)) {
return true
}
Expand Down
38 changes: 29 additions & 9 deletions sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,38 @@ func NewFerry(config *Config) (*ShardingFerry, error) {

config.VerifierType = ghostferry.VerifierTypeInline

ignored, err := compileRegexps(config.IgnoredTables)
if err != nil {
return nil, fmt.Errorf("failed to compile ignored tables: %v", err)
}
var tableFilter ghostferry.TableFilter

config.TableFilter = &ShardedTableFilter{
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
IgnoredTables: ignored,
if len(config.IncludedTables) != 0 {
included, err := compileRegexps(config.IncludedTables)
if err != nil {
return nil, fmt.Errorf("failed to compile included tables: %v", err)
}

tableFilter = &ShardedTableFilter{
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IncludedTablesFilter,
Tables: included,
}
} else {
ignored, err := compileRegexps(config.IgnoredTables)
if err != nil {
return nil, fmt.Errorf("failed to compile ignored tables: %v", err)
}

tableFilter = &ShardedTableFilter{
ShardingKey: config.ShardingKey,
SourceShard: config.SourceDB,
JoinedTables: config.JoinedTables,
Type: IgnoredTablesFilter,
Tables: ignored,
}
}

config.TableFilter = tableFilter

if err := config.ValidateConfig(); err != nil {
return nil, fmt.Errorf("failed to validate config: %v", err)
}
Expand Down
30 changes: 30 additions & 0 deletions sharding/test/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package test

import (
"testing"

"github.com/Shopify/ghostferry"
"github.com/Shopify/ghostferry/sharding"
"github.com/stretchr/testify/assert"
)

func TestConfigWithBothIncludedAndIgnoredTablesIsInvalid(t *testing.T) {
config := sharding.Config{
IncludedTables: []string{"table_one", "table_two"},
IgnoredTables: []string{"table_one", "table_two"},
}

err := config.ValidateConfig()
assert.EqualError(t, err, "IgnoredTables and IncludedTables cannot be defined at the same time.")
}


func TestConfigWithRunFerryFromReplicaWithoutValidSourceReplicationMasterIsInvalid(t *testing.T) {
config := sharding.Config{
RunFerryFromReplica: true,
SourceReplicationMaster: &ghostferry.DatabaseConfig{},
}

err := config.ValidateConfig()
assert.EqualError(t, err, "host is empty")
}
29 changes: 28 additions & 1 deletion sharding/test/table_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) {
filter := &sharding.ShardedTableFilter{
SourceShard: "shard_42",
ShardingKey: "tenant_id",
IgnoredTables: []*regexp.Regexp{
Type: sharding.IgnoredTablesFilter,
Tables: []*regexp.Regexp{
regexp.MustCompile("^_(.*)_new$"),
regexp.MustCompile("^_(.*)_old$"),
regexp.MustCompile("^lhm._(.*)"),
Expand Down Expand Up @@ -53,6 +54,32 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) {
assert.Equal(t, tables[5:], applicable)
}

func TestShardedTableFilterSelectsIncludedTables(t *testing.T) {
filter := &sharding.ShardedTableFilter{
SourceShard: "shard_42",
ShardingKey: "tenant_id",
Type: sharding.IncludedTablesFilter,
Tables: []*regexp.Regexp{
regexp.MustCompile("^table_.*"),
regexp.MustCompile("ghost"),
},
}

tables := []*ghostferry.TableSchema{
{Table: &schema.Table{Schema: "shard_42", Name: "_table_name_new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "lhma_1234_table_name", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "x_lhmn_table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "table_new", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "ghost", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}},
{Table: &schema.Table{Schema: "shard_42", Name: "table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}},
}

applicable, err := filter.ApplicableTables(tables)
assert.Nil(t, err)
assert.Equal(t, tables[4:], applicable)
}

func TestShardedTableFilterSelectsTablesWithShardingKey(t *testing.T) {
filter := &sharding.ShardedTableFilter{SourceShard: "shard_42", ShardingKey: "tenant_id"}

Expand Down

0 comments on commit 3882a39

Please sign in to comment.