Skip to content

Commit

Permalink
Allow specifying table creation order Shopify#161
Browse files Browse the repository at this point in the history
This commit introduces the ability for ghostferry-copydb to create
tables in a specific order. This can be useful if tables to be created
contain foreign-key constraints (FKCs).

NOTE: This does not mean that ghostferry supports FKCs! However, with
this feature and by disabling FKCs in the target DB, it is
theoretically possible to migrate DBs with FKCs - this is experimental
and not recommended for production usage. Use with care!

To disable the FKCs, one must add the following config to the target DB
configuration:

    "Params": {
        "foreign_key_checks": "0"
    }
  • Loading branch information
Clemens Kolbitsch committed Mar 17, 2020
1 parent 76555c1 commit 4b4900d
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
9 changes: 9 additions & 0 deletions copydb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ type Config struct {
// Filter configuration for tables to copy
Tables FilterAndRewriteConfigs

// Hints in which order to create database tables, as <db>.<table> . If a
// table is to be created on start and appears in this list, it is created
// before any other table, and is created in the order listed here
//
// This is useful when tables have dependencies amongst each other. By
// default, create them in an arbitrary order. Names refer to original
// databases and tables - that is, before renaming occurs.
TableCreationOrder []string

// If you're running Ghostferry from a read only replica, turn this option
// on and specify SourceReplicationMaster and ReplicatedMasterPositionQuery.
RunFerryFromReplica bool
Expand Down
3 changes: 2 additions & 1 deletion copydb/copydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func (this *CopydbFerry) CreateDatabasesAndTables() error {
// We need to create the same table/schemas on the target database
// as the ones we are copying.
logrus.Info("creating databases and tables on target")
for tableName := range this.Ferry.Tables {
tablesToCreate := this.Ferry.Tables.GetTableListWithPriority(this.config.TableCreationOrder)
for _, tableName := range tablesToCreate {
t := strings.Split(tableName, ".")

err := this.createDatabaseIfExistsOnTarget(t[0])
Expand Down
7 changes: 7 additions & 0 deletions copydb/test/copydb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ func (t *CopydbTestSuite) TestCreateDatabaseAndTableWithRewrites() {
t.Require().Equal(renamedTableName, value)
}

func (t *CopydbTestSuite) TestCreateDatabaseAndTableWithOrdering() {
// NOTE: Here we just ensure passing a table does not cause issues in the
// invocation. A more thorough test is done in the table-schema tests
t.copydbConfig.TableCreationOrder = []string{testSchemaName + "." + testTableName}
t.TestCreateDatabaseAndTableWithRewrites()
}

func TestCopydb(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &CopydbTestSuite{})
Expand Down
25 changes: 25 additions & 0 deletions table_schema_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,31 @@ func (c TableSchemaCache) Get(database, table string) *TableSchema {
return c[fullTableName(database, table)]
}

// Helper to sort a given map of tables with a second list giving a priority.
// If an element is present in the input and the priority lists, the item will
// appear first (in the order of the priority list), all other items appear in
// the order given in the input
func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (tableNames []string) {
// just a fast lookup if the list contains items already
contains := map[string]bool{}
if len(priorityList) >= 0 {
for _, tableName := range priorityList {
// ignore tables given in the priority list that we don't know
if c[tableName] != nil {
contains[tableName] = true
tableNames = append(tableNames, tableName)
}
}
}
for tableName, _ := range c {
if contains[tableName] != true {
tableNames = append(tableNames, tableName)
}
}

return
}

func showDatabases(c *sql.DB) ([]string, error) {
rows, err := c.Query("show databases")
if err != nil {
Expand Down
48 changes: 48 additions & 0 deletions test/go/table_schema_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,54 @@ func (this *TableSchemaCacheTestSuite) TestQuotedTableNameFromString() {
this.Require().Equal("``.``", ghostferry.QuotedTableNameFromString("", ""))
}

func getMultiTableMap() *ghostferry.TableSchemaCache {
return &ghostferry.TableSchemaCache{
"schema.table1": &ghostferry.TableSchema{
Table: &sqlSchema.Table{
Schema: "schema",
Name: "table1",
},
},
"schema.table2": &ghostferry.TableSchema{
Table: &sqlSchema.Table{
Schema: "schema",
Name: "table2",
},
},
"schema.table3": &ghostferry.TableSchema{
Table: &sqlSchema.Table{
Schema: "schema",
Name: "table3",
},
},
}
}

func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriorityNil() {
tables := getMultiTableMap()
// make sure we are not losing any elements, even if the priority does not
// mater
creationOrder := tables.GetTableListWithPriority(nil)
this.Require().Equal(len(creationOrder), 3)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())
}

func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriority() {
tables := getMultiTableMap()
creationOrder := tables.GetTableListWithPriority([]string{"schema.table2"})
this.Require().Equal(len(creationOrder), 3)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())
this.Require().Equal(creationOrder[0], "schema.table2")
}

func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriorityIgnoreUnknown() {
tables := getMultiTableMap()
creationOrder := tables.GetTableListWithPriority([]string{"schema.table2", "schema.unknown_table"})
this.Require().Equal(len(creationOrder), 3)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())
this.Require().Equal(creationOrder[0], "schema.table2")
}

func TestTableSchemaCache(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &TableSchemaCacheTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down

0 comments on commit 4b4900d

Please sign in to comment.