Skip to content

Commit

Permalink
Support tables with foreign key constraints Shopify#161
Browse files Browse the repository at this point in the history
Ghostferry currently documents that it cannot work with foreign-key
constraints (FKCs). This is because data- and binlog-writers operate
at the same time and copying happens on tables in parallel, which may
insert data that (temporarily) conflicts constraints.

MySQL allows disabling foreign-key constraint checks on a per-session
basis, and it does not re-validate constraints when this is disabled.
As a result, we may may temporarily disable constraint enforcement until
the database is back in a consistent state. The only issue that does
arise is that tables must be created in an order that satisfies their
inter-dependencies.

To disable the FKCs, one must add the following config to the target DB
configuration:

  "Params": {
    "foreign_key_checks": "0"
  }

Change-Id: I61ffcca56d6bc5517e961bfd5f5df4e02878b557
  • Loading branch information
Clemens Kolbitsch committed Mar 23, 2020
1 parent ddf70c7 commit a001c61
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 4 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ Features/fixes added in this fork include
the copy phase.
Note that an *incomplete* execution of `Ghostferry` will leave the database in
an inconsistent state until the copy is resumed and completed.

- improved handling of [foreign key constraints](https://github.com/Shopify/ghostferry/issues/161):
support infer the table creation order automatically if the database contains
*foreign key constraints* and no manual order of tables is specified in the
configuration.
Note that this merely automates part of the setup needed for supporting
foreign key constraints. There are still several limitations in place for
migrating such databases and the feature must be used with great care.
Especially the use of database- or table-rewrites may introduce invalid target
database states that are not recoverable.

Overview of How it Works
------------------------
Expand Down
24 changes: 21 additions & 3 deletions copydb/copydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,33 @@ func (this *CopydbFerry) Start() error {
}

func (this *CopydbFerry) CreateDatabasesAndTables() error {
logger := logrus.WithField("tag", "create_databases_and_tables")

// We need to create the same table/schemas on the target database
// as the ones we are copying.
logrus.Info("creating databases and tables on target")
for _, tableName := range this.Ferry.Tables.GetTableListWithPriority(this.config.TablesToBeCreatedFirst) {
logger.Info("creating databases and tables on target")
var prioritzedTableNames []string
if len(this.config.TablesToBeCreatedFirst) > 0 {
// if specified, use what the config tells us
logger.Debug("config contains table creation order")
prioritzedTableNames = this.Ferry.Tables.GetTableListWithPriority(this.config.TablesToBeCreatedFirst)
} else {
// otherwise infer the right order ourselves
logger.Debug("inferring table creation order from source database")
var err error
prioritzedTableNames, err = this.Ferry.Tables.GetTableCreationOrder(this.Ferry.SourceDB)
if err != nil {
return err
}
}

for _, tableName := range prioritzedTableNames {
logger.Debugf("creating database table %s", tableName)
t := strings.Split(tableName, ".")

err := this.createDatabaseIfExistsOnTarget(t[0])
if err != nil {
logrus.WithError(err).WithField("database", t[0]).Error("cannot create database, this may leave the target database in an insane state")
logger.WithField("database", t[0]).Error("cannot create database, this may leave the target database in an insane state")
return err
}

Expand Down
58 changes: 58 additions & 0 deletions table_schema_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,64 @@ func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (prior
return
}

// Helper to sort the given map of tables based on the dependencies between
// tables in terms of foreign key constraints
func (c TableSchemaCache) GetTableCreationOrder(db *sql.DB) (prioritzedTableNames []string, err error) {
logger := logrus.WithField("tag", "table_schema_cache")

tableReferences := make(map[QualifiedTableName]TableForeignKeys)
for tableName, _ := range c {
t := strings.Split(tableName, ".")
table := NewQualifiedTableName(t[0], t[1])

referencedTables, dbErr := GetForeignKeyTablesOfTable(db, table)
if dbErr != nil {
logger.WithField("table", table).Error("cannot analyze database table foreign keys")
err = dbErr
return
}

logger.Debugf("found %d reference tables for %s", len(referencedTables), table)
tableReferences[table] = referencedTables
}

// simple fix-point loop: make sure we create at least one table per
// iteration and mark tables as able to create as soon as they no-longer
// refer to other tables
for len(tableReferences) > 0 {
createdTable := false
for table, referencedTables := range tableReferences {
if len(referencedTables) > 0 {
continue
}
logger.Debugf("queuing %s", table)
prioritzedTableNames = append(prioritzedTableNames, table.String())

// mark any table referring to the table as potential candidates
// for being created now
for otherTable, otherReferencedTables := range tableReferences {
if _, found := otherReferencedTables[table]; found {
delete(otherReferencedTables, table)
if len(otherReferencedTables) == 0 {
logger.Debugf("creation of %s unblocked creation of %s", table, otherTable)
}
}

}

delete(tableReferences, table)
createdTable = true
}

if !createdTable {
err = fmt.Errorf("failed creating tables: all %d remaining tables have foreign references", len(tableReferences))
return
}
}

return
}

func showDatabases(c *sql.DB) ([]string, error) {
rows, err := c.Query("show databases")
if err != nil {
Expand Down
51 changes: 51 additions & 0 deletions test/go/table_schema_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package test

import (
"fmt"
"strings"
"testing"

"github.com/Shopify/ghostferry"
Expand Down Expand Up @@ -399,6 +400,56 @@ func (this *TableSchemaCacheTestSuite) TestGetTableListWithPriorityIgnoreUnknown
this.Require().Equal(creationOrder[0], "schema.table2")
}

func (this *TableSchemaCacheTestSuite) TestGetTableCreationOrderWithoutForeignKeyConstraints() {
tables, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.Ferry.TableFilter, nil, nil, nil)
this.Require().Nil(err)

creationOrder, err := tables.GetTableCreationOrder(this.Ferry.SourceDB)
this.Require().Nil(err)

this.Require().Equal(len(creationOrder), 3)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())
}

func (this *TableSchemaCacheTestSuite) TestGetTableCreationOrderWithForeignKeyConstraints() {
_, err := this.Ferry.SourceDB.Exec(fmt.Sprintf("CREATE TABLE `%s`.`table1` (`id1` BIGINT, PRIMARY KEY (`id1`))", testhelpers.TestSchemaName))
this.Require().Nil(err)
_, err = this.Ferry.SourceDB.Exec(fmt.Sprintf("CREATE TABLE `%s`.`table2` (`id2` BIGINT, PRIMARY KEY (`id2`), CONSTRAINT `fkc2` FOREIGN KEY (`id2`) REFERENCES `table1` (`id1`))", testhelpers.TestSchemaName))
this.Require().Nil(err)
_, err = this.Ferry.SourceDB.Exec(fmt.Sprintf("CREATE TABLE `%s`.`table3` (`id3` BIGINT, PRIMARY KEY (`id3`), CONSTRAINT `fkc3_1` FOREIGN KEY (`id3`) REFERENCES `table1` (`id1`), CONSTRAINT `fkc3_2` FOREIGN KEY (`id3`) REFERENCES `table2` (`id2`))", testhelpers.TestSchemaName))
this.Require().Nil(err)

tables, err := ghostferry.LoadTables(this.Ferry.SourceDB, this.Ferry.TableFilter, nil, nil, nil)
this.Require().Nil(err)

creationOrder, err := tables.GetTableCreationOrder(this.Ferry.SourceDB)
this.Require().Nil(err)

// 3 tests from the base test setup plus 3 added above
this.Require().Equal(len(creationOrder), 6)
this.Require().ElementsMatch(creationOrder, tables.AllTableNames())

// verify the order: all we care for is that table1 is created before
// table2, which is created before table3
table1Index := -1
table2Index := -1
table3Index := -1
for i, tableName := range creationOrder {
if strings.HasSuffix(tableName, ".table1") {
table1Index = i
} else if strings.HasSuffix(tableName, ".table2") {
table2Index = i
} else if strings.HasSuffix(tableName, ".table3") {
table3Index = i
}
}
this.Require().NotEqual(table1Index, -1)
this.Require().NotEqual(table2Index, -1)
this.Require().NotEqual(table3Index, -1)
this.Require().True(table1Index < table2Index)
this.Require().True(table2Index < table3Index)
}

func TestTableSchemaCache(t *testing.T) {
testhelpers.SetupTest()
suite.Run(t, &TableSchemaCacheTestSuite{GhostferryUnitTestSuite: &testhelpers.GhostferryUnitTestSuite{}})
Expand Down
52 changes: 52 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"github.com/Masterminds/squirrel"
"github.com/siddontang/go-mysql/mysql"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -235,3 +236,54 @@ func CheckDbIsAReplica(db *sql.DB) (bool, error) {
err := row.Scan(&isReadOnly)
return isReadOnly, err
}

type QualifiedTableName struct {
SchemaName string
TableName string
}

func NewQualifiedTableName(schemaName, tableName string) QualifiedTableName {
return QualifiedTableName{
SchemaName: schemaName,
TableName: tableName,
}
}

func (n QualifiedTableName) String() string {
return fmt.Sprintf("%s.%s", n.SchemaName, n.TableName)
}

// define a simple set of table names
type TableForeignKeys map[QualifiedTableName]bool

func GetForeignKeyTablesOfTable(db *sql.DB, table QualifiedTableName) (TableForeignKeys, error) {
rows, err := squirrel.
Select("UNIQUE_CONSTRAINT_SCHEMA", "REFERENCED_TABLE_NAME").
Distinct().
From("information_schema.REFERENTIAL_CONSTRAINTS").
Where(
squirrel.And{
squirrel.Eq{"CONSTRAINT_SCHEMA": table.SchemaName},
squirrel.Eq{"TABLE_NAME": table.TableName},
},
).
RunWith(db.DB).
Query()
if err != nil {
return nil, err
}
defer rows.Close()

foreignKeys := make(TableForeignKeys)
for rows.Next() {
var targetSchema, targetTable string
err = rows.Scan(&targetSchema, &targetTable)
if err != nil {
return nil, err
}

foreignKeys[NewQualifiedTableName(targetSchema, targetTable)] = true
}

return foreignKeys, nil
}

0 comments on commit a001c61

Please sign in to comment.