diff --git a/sharding/config.go b/sharding/config.go index 2e803118..e244583b 100644 --- a/sharding/config.go +++ b/sharding/config.go @@ -1,6 +1,8 @@ package sharding import ( + "fmt" + "github.com/Shopify/ghostferry" ) @@ -18,14 +20,22 @@ type Config struct { StatsDAddress string - JoinedTables map[string][]JoinTable - IgnoredTables []string + JoinedTables map[string][]JoinTable + + // IgnoredTables and IncludedTables are mutually exclusive. Specifying both is an error. + IgnoredTables []string + IncludedTables []string + PrimaryKeyTables []string Throttle *ghostferry.LagThrottlerConfig } func (c *Config) ValidateConfig() error { + if len(c.IgnoredTables) != 0 && len(c.IncludedTables) != 0 { + return fmt.Errorf("IgnoredTables and IncludedTables cannot be defined at the same time.") + } + if c.RunFerryFromReplica && c.SourceReplicationMaster != nil { if err := c.SourceReplicationMaster.Validate(); err != nil { return err diff --git a/sharding/filter.go b/sharding/filter.go index 71cfd20d..e94a7622 100644 --- a/sharding/filter.go +++ b/sharding/filter.go @@ -185,21 +185,37 @@ func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, er return false, nil } +type ShardedTableFilterType int64 + +const ( + IgnoredTablesFilter ShardedTableFilterType = iota + IncludedTablesFilter +) + type ShardedTableFilter struct { SourceShard string ShardingKey string JoinedTables map[string][]JoinTable - IgnoredTables []*regexp.Regexp + Type ShardedTableFilterType + Tables []*regexp.Regexp PrimaryKeyTables map[string]struct{} } +func (s *ShardedTableFilter) isIgnoreFilter() bool { + return s.Type == IgnoredTablesFilter +} + +func (s *ShardedTableFilter) isIncludeFilter() bool { + return s.Type == IncludedTablesFilter +} + func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error) { return []string{s.SourceShard}, nil } func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error) { for _, table := range tables { - if s.isIgnored(table) { + if ((s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table))) { continue } @@ -225,8 +241,8 @@ func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) return } -func (s *ShardedTableFilter) isIgnored(table *ghostferry.TableSchema) bool { - for _, re := range s.IgnoredTables { +func (s *ShardedTableFilter) isPresent(table *ghostferry.TableSchema) bool { + for _, re := range s.Tables { if re.Match([]byte(table.Name)) { return true } diff --git a/sharding/sharding.go b/sharding/sharding.go index e6b831b0..d2d174e0 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -28,18 +28,38 @@ func NewFerry(config *Config) (*ShardingFerry, error) { config.VerifierType = ghostferry.VerifierTypeInline - ignored, err := compileRegexps(config.IgnoredTables) - if err != nil { - return nil, fmt.Errorf("failed to compile ignored tables: %v", err) - } + var tableFilter ghostferry.TableFilter - config.TableFilter = &ShardedTableFilter{ - ShardingKey: config.ShardingKey, - SourceShard: config.SourceDB, - JoinedTables: config.JoinedTables, - IgnoredTables: ignored, + if len(config.IncludedTables) != 0 { + included, err := compileRegexps(config.IncludedTables) + if err != nil { + return nil, fmt.Errorf("failed to compile included tables: %v", err) + } + + tableFilter = &ShardedTableFilter{ + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IncludedTablesFilter, + Tables: included, + } + } else { + ignored, err := compileRegexps(config.IgnoredTables) + if err != nil { + return nil, fmt.Errorf("failed to compile ignored tables: %v", err) + } + + tableFilter = &ShardedTableFilter{ + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IgnoredTablesFilter, + Tables: ignored, + } } + config.TableFilter = tableFilter + if err := config.ValidateConfig(); err != nil { return nil, fmt.Errorf("failed to validate config: %v", err) } diff --git a/sharding/test/config_test.go b/sharding/test/config_test.go new file mode 100644 index 00000000..1734083b --- /dev/null +++ b/sharding/test/config_test.go @@ -0,0 +1,30 @@ +package test + +import ( + "testing" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/sharding" + "github.com/stretchr/testify/assert" +) + +func TestConfigWithBothIncludedAndIgnoredTablesIsInvalid(t *testing.T) { + config := sharding.Config{ + IncludedTables: []string{"table_one", "table_two"}, + IgnoredTables: []string{"table_one", "table_two"}, + } + + err := config.ValidateConfig() + assert.EqualError(t, err, "IgnoredTables and IncludedTables cannot be defined at the same time.") +} + + +func TestConfigWithRunFerryFromReplicaWithoutValidSourceReplicationMasterIsInvalid(t *testing.T) { + config := sharding.Config{ + RunFerryFromReplica: true, + SourceReplicationMaster: &ghostferry.DatabaseConfig{}, + } + + err := config.ValidateConfig() + assert.EqualError(t, err, "host is empty") +} diff --git a/sharding/test/table_filter_test.go b/sharding/test/table_filter_test.go index 17a14b6d..6d3d2243 100644 --- a/sharding/test/table_filter_test.go +++ b/sharding/test/table_filter_test.go @@ -25,7 +25,8 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) { filter := &sharding.ShardedTableFilter{ SourceShard: "shard_42", ShardingKey: "tenant_id", - IgnoredTables: []*regexp.Regexp{ + Type: sharding.IgnoredTablesFilter, + Tables: []*regexp.Regexp{ regexp.MustCompile("^_(.*)_new$"), regexp.MustCompile("^_(.*)_old$"), regexp.MustCompile("^lhm._(.*)"), @@ -53,6 +54,32 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) { assert.Equal(t, tables[5:], applicable) } +func TestShardedTableFilterSelectsIncludedTables(t *testing.T) { + filter := &sharding.ShardedTableFilter{ + SourceShard: "shard_42", + ShardingKey: "tenant_id", + Type: sharding.IncludedTablesFilter, + Tables: []*regexp.Regexp{ + regexp.MustCompile("^table_.*"), + regexp.MustCompile("ghost"), + }, + } + + tables := []*ghostferry.TableSchema{ + {Table: &schema.Table{Schema: "shard_42", Name: "_table_name_new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "lhma_1234_table_name", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "x_lhmn_table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "table_new", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "ghost", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}}, + } + + applicable, err := filter.ApplicableTables(tables) + assert.Nil(t, err) + assert.Equal(t, tables[4:], applicable) +} + func TestShardedTableFilterSelectsTablesWithShardingKey(t *testing.T) { filter := &sharding.ShardedTableFilter{SourceShard: "shard_42", ShardingKey: "tenant_id"}