From b579f8d5f75f4b72154c82cfd17afc7a22123aa8 Mon Sep 17 00:00:00 2001 From: Pawan Dubey Date: Wed, 1 Dec 2021 16:35:27 -0500 Subject: [PATCH 1/3] Add support for specifying included tables to sharding config The sharding config takes in a list of ignored tables currently. This works well when the use case calls for including most of the tables. However, if the use case calls for including a small subset of the tables, then the ignored list starts becoming very long and unwieldy. In this commit, we introduce an additional `IncludedTables` config option, identitcal to `IgnoredTables` in strucuture, but also mutually excusive with it. The sharding ferry initializes the correct filter based on which one of these two options are populated, and fails if it finds both to be populated at the same time. We also add a couple of tests for verifying this behaviour. I considered a few ways to do this: - Passing in a flag to ShardedTableFilter alongside the list the tables - Passing in the type of the filter to ShardedTableFilter alongside the list of tables - Breaking down ShardedTableFilter into two separate types and using the right one based on the config The type safety of the solutions increases down the list, but so does the complexity. I initially started with option 3, thinking it to be the most robust, but the complexity introduced by the extra layer of abstraction quickly became hard to justify. As we only have one possible "seam" to break ShardedTableFilter by, I abandoned this approach till we found a second seam (YAGNI). Passing in a boolean flag was the simplest, but also mandated that one of the two filters become the "default" for naming the flag itself (i.g. `isIgnore` vs `isInclude`). The second approach was a good mix of simplicity and robustness as we have named enum values to work with rather than mysterious booleans, so I went with that in the end. Co-authored-by: Shiv Nagarajan --- sharding/config.go | 14 ++++++++++-- sharding/filter.go | 24 +++++++++++++++++---- sharding/sharding.go | 34 ++++++++++++++++++++++-------- sharding/test/config_test.go | 30 ++++++++++++++++++++++++++ sharding/test/table_filter_test.go | 29 ++++++++++++++++++++++++- 5 files changed, 115 insertions(+), 16 deletions(-) create mode 100644 sharding/test/config_test.go diff --git a/sharding/config.go b/sharding/config.go index 2e803118..e244583b 100644 --- a/sharding/config.go +++ b/sharding/config.go @@ -1,6 +1,8 @@ package sharding import ( + "fmt" + "github.com/Shopify/ghostferry" ) @@ -18,14 +20,22 @@ type Config struct { StatsDAddress string - JoinedTables map[string][]JoinTable - IgnoredTables []string + JoinedTables map[string][]JoinTable + + // IgnoredTables and IncludedTables are mutually exclusive. Specifying both is an error. + IgnoredTables []string + IncludedTables []string + PrimaryKeyTables []string Throttle *ghostferry.LagThrottlerConfig } func (c *Config) ValidateConfig() error { + if len(c.IgnoredTables) != 0 && len(c.IncludedTables) != 0 { + return fmt.Errorf("IgnoredTables and IncludedTables cannot be defined at the same time.") + } + if c.RunFerryFromReplica && c.SourceReplicationMaster != nil { if err := c.SourceReplicationMaster.Validate(); err != nil { return err diff --git a/sharding/filter.go b/sharding/filter.go index 71cfd20d..e94a7622 100644 --- a/sharding/filter.go +++ b/sharding/filter.go @@ -185,21 +185,37 @@ func (f *ShardedCopyFilter) ApplicableEvent(event ghostferry.DMLEvent) (bool, er return false, nil } +type ShardedTableFilterType int64 + +const ( + IgnoredTablesFilter ShardedTableFilterType = iota + IncludedTablesFilter +) + type ShardedTableFilter struct { SourceShard string ShardingKey string JoinedTables map[string][]JoinTable - IgnoredTables []*regexp.Regexp + Type ShardedTableFilterType + Tables []*regexp.Regexp PrimaryKeyTables map[string]struct{} } +func (s *ShardedTableFilter) isIgnoreFilter() bool { + return s.Type == IgnoredTablesFilter +} + +func (s *ShardedTableFilter) isIncludeFilter() bool { + return s.Type == IncludedTablesFilter +} + func (s *ShardedTableFilter) ApplicableDatabases(dbs []string) ([]string, error) { return []string{s.SourceShard}, nil } func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) (applicable []*ghostferry.TableSchema, err error) { for _, table := range tables { - if s.isIgnored(table) { + if ((s.isIgnoreFilter() && s.isPresent(table)) || (s.isIncludeFilter() && !s.isPresent(table))) { continue } @@ -225,8 +241,8 @@ func (s *ShardedTableFilter) ApplicableTables(tables []*ghostferry.TableSchema) return } -func (s *ShardedTableFilter) isIgnored(table *ghostferry.TableSchema) bool { - for _, re := range s.IgnoredTables { +func (s *ShardedTableFilter) isPresent(table *ghostferry.TableSchema) bool { + for _, re := range s.Tables { if re.Match([]byte(table.Name)) { return true } diff --git a/sharding/sharding.go b/sharding/sharding.go index e6b831b0..2ddc23f2 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -28,16 +28,32 @@ func NewFerry(config *Config) (*ShardingFerry, error) { config.VerifierType = ghostferry.VerifierTypeInline - ignored, err := compileRegexps(config.IgnoredTables) - if err != nil { - return nil, fmt.Errorf("failed to compile ignored tables: %v", err) - } + if len(config.IgnoredTables) != 0 { + ignored, err := compileRegexps(config.IgnoredTables) + if err != nil { + return nil, fmt.Errorf("failed to compile ignored tables: %v", err) + } - config.TableFilter = &ShardedTableFilter{ - ShardingKey: config.ShardingKey, - SourceShard: config.SourceDB, - JoinedTables: config.JoinedTables, - IgnoredTables: ignored, + config.TableFilter = &ShardedTableFilter{ + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IgnoredTablesFilter, + Tables: ignored, + } + } else if len(config.IncludedTables) != 0 { + included, err := compileRegexps(config.IncludedTables) + if err != nil { + return nil, fmt.Errorf("failed to compile included tables: %v", err) + } + + config.TableFilter = &ShardedTableFilter{ + ShardingKey: config.ShardingKey, + SourceShard: config.SourceDB, + JoinedTables: config.JoinedTables, + Type: IncludedTablesFilter, + Tables: included, + } } if err := config.ValidateConfig(); err != nil { diff --git a/sharding/test/config_test.go b/sharding/test/config_test.go new file mode 100644 index 00000000..d35090b3 --- /dev/null +++ b/sharding/test/config_test.go @@ -0,0 +1,30 @@ +package test + +import ( + "testing" + + "github.com/Shopify/ghostferry" + "github.com/Shopify/ghostferry/sharding" + "github.com/stretchr/testify/assert" +) + +func TestConfigWithBothIncludedAndIgnoredTablesIsInvalid(t *testing.T) { + config := sharding.Config{ + IncludedTables: []string{"table_one", "table_two"}, + IgnoredTables: []string{"table_one", "table_two"}, + } + + err := config.ValidateConfig() + assert.EqualError(t, err, "IgnoredTables and IncludedTables cannot be defined at the same time.") +} + + +func TestConfigWithRunFerryFromReplicaWithoutValidSourceReplicationMasterIsInvlaid(t *testing.T) { + config := sharding.Config{ + RunFerryFromReplica: true, + SourceReplicationMaster: &ghostferry.DatabaseConfig{}, + } + + err := config.ValidateConfig() + assert.EqualError(t, err, "host is empty") +} diff --git a/sharding/test/table_filter_test.go b/sharding/test/table_filter_test.go index 17a14b6d..6d3d2243 100644 --- a/sharding/test/table_filter_test.go +++ b/sharding/test/table_filter_test.go @@ -25,7 +25,8 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) { filter := &sharding.ShardedTableFilter{ SourceShard: "shard_42", ShardingKey: "tenant_id", - IgnoredTables: []*regexp.Regexp{ + Type: sharding.IgnoredTablesFilter, + Tables: []*regexp.Regexp{ regexp.MustCompile("^_(.*)_new$"), regexp.MustCompile("^_(.*)_old$"), regexp.MustCompile("^lhm._(.*)"), @@ -53,6 +54,32 @@ func TestShardedTableFilterRejectsIgnoredTables(t *testing.T) { assert.Equal(t, tables[5:], applicable) } +func TestShardedTableFilterSelectsIncludedTables(t *testing.T) { + filter := &sharding.ShardedTableFilter{ + SourceShard: "shard_42", + ShardingKey: "tenant_id", + Type: sharding.IncludedTablesFilter, + Tables: []*regexp.Regexp{ + regexp.MustCompile("^table_.*"), + regexp.MustCompile("ghost"), + }, + } + + tables := []*ghostferry.TableSchema{ + {Table: &schema.Table{Schema: "shard_42", Name: "_table_name_new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "lhma_1234_table_name", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "new", Columns: []schema.TableColumn{{Name: "id"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "x_lhmn_table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "table_new", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "ghost", Columns: []schema.TableColumn{{Name: "foo"}, {Name: "tenant_id"}}}}, + {Table: &schema.Table{Schema: "shard_42", Name: "table_name", Columns: []schema.TableColumn{{Name: "bar"}, {Name: "tenant_id"}}}}, + } + + applicable, err := filter.ApplicableTables(tables) + assert.Nil(t, err) + assert.Equal(t, tables[4:], applicable) +} + func TestShardedTableFilterSelectsTablesWithShardingKey(t *testing.T) { filter := &sharding.ShardedTableFilter{SourceShard: "shard_42", ShardingKey: "tenant_id"} From 18071de28d79f49ad19bc55d12bb41139af280d9 Mon Sep 17 00:00:00 2001 From: Pawan Dubey Date: Fri, 10 Dec 2021 17:22:29 -0500 Subject: [PATCH 2/3] Make ignored filter the fallback if neither filter is specified This is prevent breaking the current behavior (and hence, tests). Co-authored-by: Shiv Nagarajan --- sharding/sharding.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/sharding/sharding.go b/sharding/sharding.go index 2ddc23f2..d2d174e0 100644 --- a/sharding/sharding.go +++ b/sharding/sharding.go @@ -28,34 +28,38 @@ func NewFerry(config *Config) (*ShardingFerry, error) { config.VerifierType = ghostferry.VerifierTypeInline - if len(config.IgnoredTables) != 0 { - ignored, err := compileRegexps(config.IgnoredTables) + var tableFilter ghostferry.TableFilter + + if len(config.IncludedTables) != 0 { + included, err := compileRegexps(config.IncludedTables) if err != nil { - return nil, fmt.Errorf("failed to compile ignored tables: %v", err) + return nil, fmt.Errorf("failed to compile included tables: %v", err) } - config.TableFilter = &ShardedTableFilter{ + tableFilter = &ShardedTableFilter{ ShardingKey: config.ShardingKey, SourceShard: config.SourceDB, JoinedTables: config.JoinedTables, - Type: IgnoredTablesFilter, - Tables: ignored, + Type: IncludedTablesFilter, + Tables: included, } - } else if len(config.IncludedTables) != 0 { - included, err := compileRegexps(config.IncludedTables) + } else { + ignored, err := compileRegexps(config.IgnoredTables) if err != nil { - return nil, fmt.Errorf("failed to compile included tables: %v", err) + return nil, fmt.Errorf("failed to compile ignored tables: %v", err) } - config.TableFilter = &ShardedTableFilter{ + tableFilter = &ShardedTableFilter{ ShardingKey: config.ShardingKey, SourceShard: config.SourceDB, JoinedTables: config.JoinedTables, - Type: IncludedTablesFilter, - Tables: included, + Type: IgnoredTablesFilter, + Tables: ignored, } } + config.TableFilter = tableFilter + if err := config.ValidateConfig(); err != nil { return nil, fmt.Errorf("failed to validate config: %v", err) } From 83948965760c6960c2a507a5338decbb0a4d3367 Mon Sep 17 00:00:00 2001 From: Pawan Dubey Date: Fri, 10 Dec 2021 17:26:11 -0500 Subject: [PATCH 3/3] Address PR feedback: - Fix typo Co-authored-by: Shiv Nagarajan --- sharding/test/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sharding/test/config_test.go b/sharding/test/config_test.go index d35090b3..1734083b 100644 --- a/sharding/test/config_test.go +++ b/sharding/test/config_test.go @@ -19,7 +19,7 @@ func TestConfigWithBothIncludedAndIgnoredTablesIsInvalid(t *testing.T) { } -func TestConfigWithRunFerryFromReplicaWithoutValidSourceReplicationMasterIsInvlaid(t *testing.T) { +func TestConfigWithRunFerryFromReplicaWithoutValidSourceReplicationMasterIsInvalid(t *testing.T) { config := sharding.Config{ RunFerryFromReplica: true, SourceReplicationMaster: &ghostferry.DatabaseConfig{},