Skip to content

Commit

Permalink
Showing 3 changed files with 36 additions and 2 deletions.
5 changes: 5 additions & 0 deletions copydb/config.go
Original file line number Diff line number Diff line change
@@ -70,6 +70,11 @@ type Config struct {

// The duration to wait for the replication to catchup before aborting. Only use if RunFerryFromReplica is true.
WaitForReplicationTimeout string

// Ghostferry will by default create tables on your target and fail if these already exist.
// This allows pre-existing tables on your target, schemas and data compatibility won't be validated.
// Use at your own risk.
AllowExistingTargetTable bool
}

func (c *Config) InitializeAndValidateConfig() error {
10 changes: 8 additions & 2 deletions copydb/copydb.go
Original file line number Diff line number Diff line change
@@ -159,7 +159,7 @@ func (this *CopydbFerry) createDatabaseIfExistsOnTarget(database string) error {
}

func (this *CopydbFerry) createTableOnTarget(database, table string) error {
var tableNameAgain, createTableQuery string
var tableNameAgain, createTableQuery, createStatement string

r := this.Ferry.SourceDB.QueryRow(fmt.Sprintf("SHOW CREATE TABLE `%s`.`%s`", database, table))
err := r.Scan(&tableNameAgain, &createTableQuery)
@@ -180,10 +180,16 @@ func (this *CopydbFerry) createTableOnTarget(database, table string) error {
tableNameAgain = targetTableName
}

if this.config.AllowExistingTargetTable {
createStatement = "CREATE TABLE IF NOT EXISTS `%s`.`%s`"
} else {
createStatement = "CREATE TABLE `%s`.`%s`"
}

createTableQueryReplaced := strings.Replace(
createTableQuery,
fmt.Sprintf("CREATE TABLE `%s`", table),
fmt.Sprintf("CREATE TABLE `%s`.`%s`", database, tableNameAgain),
fmt.Sprintf(createStatement, database, tableNameAgain),
1,
)

23 changes: 23 additions & 0 deletions copydb/test/copydb_test.go
Original file line number Diff line number Diff line change
@@ -85,6 +85,29 @@ func (t *CopydbTestSuite) TestCreateDatabaseAndTableWithRewrites() {
t.Require().Equal(renamedTableName, value)
}

func (t *CopydbTestSuite) TestCreateDatabasesAndTablesAlreadyExists() {
var err error
t.copydbFerry.Ferry.Tables, err = ghostferry.LoadTables(t.ferry.SourceDB, t.copydbFerry.Ferry.TableFilter, nil, nil, nil)
t.Require().Nil(err)

testhelpers.SeedInitialData(t.ferry.TargetDB, renamedSchemaName, renamedTableName, 1)

err = t.copydbFerry.CreateDatabasesAndTables()
t.Require().EqualError(err, "Error 1050: Table 'test_table_1_renamed' already exists")
}

func (t *CopydbTestSuite) TestCreateDatabasesAndTablesAlreadyExistsAllowed() {
var err error
t.copydbFerry.Ferry.Tables, err = ghostferry.LoadTables(t.ferry.SourceDB, t.copydbFerry.Ferry.TableFilter, nil, nil, nil)
t.Require().Nil(err)

testhelpers.SeedInitialData(t.ferry.TargetDB, renamedSchemaName, renamedTableName, 1)
t.copydbConfig.AllowExistingTargetTable = true

err = t.copydbFerry.CreateDatabasesAndTables()
t.Require().Nil(err)
}

func (t *CopydbTestSuite) TestCreateDatabaseAndTableWithOrdering() {
// NOTE: Here we just ensure passing a table does not cause issues in the
// invocation. A more thorough test is done in the table-schema tests

0 comments on commit ee5d539

Please sign in to comment.