Merge branch 'main' into mtocker-add-progress-api
morgo authored Apr 3, 2024
2 parents cd19792 + 22bfae7 commit 3ecc7d1
Showing 10 changed files with 213 additions and 94 deletions.
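The change repeated across these files swaps the repl.ClientConfig field BatchSize for TargetBatchTime (a time.Duration), and adds "time" to the test imports where needed: the tests pass time.Second, while the runner passes r.migration.TargetChunkTime. A minimal sketch of the updated call site, lifted from the test hunks below (db, cfg, t1, t2, and logger are the fixtures those tests already set up):

	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
		Logger:          logger,
		Concurrency:     4,
		TargetBatchTime: time.Second, // replaces BatchSize: 10000 in the old config
	})
	assert.NoError(t, feed.Run())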
37 changes: 19 additions & 18 deletions pkg/checksum/checker_test.go
@@ -3,6 +3,7 @@ package checksum
 import (
 	"context"
 	"testing"
+	"time"

 	"github.com/cashapp/spirit/pkg/testutils"

@@ -36,9 +37,9 @@ func TestBasicChecksum(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

@@ -71,9 +72,9 @@ func TestBasicValidation(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

@@ -108,9 +109,9 @@ func TestFixCorrupt(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

@@ -151,9 +152,9 @@ func TestCorruptChecksum(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

@@ -183,9 +184,9 @@ func TestBoundaryCases(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

@@ -247,9 +248,9 @@ func TestChangeDataTypeDatetime(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	assert.NoError(t, feed.Run())

19 changes: 10 additions & 9 deletions pkg/migration/cutover_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"database/sql"
 	"testing"
+	"time"

 	"github.com/cashapp/spirit/pkg/dbconn"
 	"github.com/cashapp/spirit/pkg/repl"
@@ -44,9 +45,9 @@ func TestCutOver(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	// the feed must be started.
 	assert.NoError(t, feed.Run())
@@ -103,9 +104,9 @@ func TestMDLLockFails(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	// the feed must be started.
 	assert.NoError(t, feed.Run())
@@ -141,9 +142,9 @@ func TestInvalidOptions(t *testing.T) {
 	cfg, err := mysql.ParseDSN(testutils.DSN())
 	assert.NoError(t, err)
 	feed := repl.NewClient(db, cfg.Addr, t1, t1new, cfg.User, cfg.Passwd, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
 	})
 	_, err = NewCutOver(db, nil, t1new, feed, dbconn.NewDBConfig(), logger)
 	assert.Error(t, err)
12 changes: 6 additions & 6 deletions pkg/migration/runner.go
@@ -465,9 +465,9 @@ func (r *Runner) setup(ctx context.Context) error {
 		return err
 	}
 	r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{
-		Logger:      r.logger,
-		Concurrency: r.migration.Threads,
-		BatchSize:   repl.DefaultBatchSize,
+		Logger:          r.logger,
+		Concurrency:     r.migration.Threads,
+		TargetBatchTime: r.migration.TargetChunkTime,
 	})
 	// Start the binary log feed now
 	if err := r.replClient.Run(); err != nil {
@@ -747,9 +747,9 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
 	// Set the binlog position.
 	// Create a binlog subscriber
 	r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{
-		Logger:      r.logger,
-		Concurrency: r.migration.Threads,
-		BatchSize:   repl.DefaultBatchSize,
+		Logger:          r.logger,
+		Concurrency:     r.migration.Threads,
+		TargetBatchTime: r.migration.TargetChunkTime,
 	})
 	r.replClient.SetPos(mysql.Position{
 		Name: binlogName,
42 changes: 21 additions & 21 deletions pkg/migration/runner_test.go
@@ -761,9 +761,9 @@ func TestCheckpoint(t *testing.T) {
 	assert.NoError(t, r.alterNewTable(context.TODO()))
 	assert.NoError(t, r.createCheckpointTable(context.TODO()))
 	r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{
-		Logger:      logrus.New(), // don't use the logger for migration since we feed status to it.
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logrus.New(), // don't use the logger for migration since we feed status to it.
+		Concurrency:     4,
+		TargetBatchTime: r.migration.TargetChunkTime,
 	})
 	r.copier, err = row.NewCopier(r.db, r.table, r.newTable, row.NewCopierDefaultConfig())
 	assert.NoError(t, err)
@@ -909,9 +909,9 @@ func TestCheckpointRestore(t *testing.T) {
 	assert.NoError(t, r.createCheckpointTable(context.TODO()))

 	r.replClient = repl.NewClient(r.db, r.migration.Host, r.table, r.newTable, r.migration.Username, r.migration.Password, &repl.ClientConfig{
-		Logger:      logrus.New(),
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logrus.New(),
+		Concurrency:     4,
+		TargetBatchTime: r.migration.TargetChunkTime,
 	})
 	r.copier, err = row.NewCopier(r.db, r.table, r.newTable, row.NewCopierDefaultConfig())
 	assert.NoError(t, err)
@@ -995,9 +995,9 @@ func TestCheckpointDifferentRestoreOptions(t *testing.T) {
 	assert.NoError(t, m.createCheckpointTable(context.TODO()))
 	logger := logrus.New()
 	m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: m.migration.TargetChunkTime,
 	})
 	m.copier, err = row.NewCopier(m.db, m.table, m.newTable, row.NewCopierDefaultConfig())
 	assert.NoError(t, err)
@@ -1191,9 +1191,9 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
 	assert.NoError(t, m.createCheckpointTable(context.TODO()))
 	logger := logrus.New()
 	m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: m.migration.TargetChunkTime,
 	})
 	m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{
 		Concurrency: m.migration.Threads,
@@ -1319,9 +1319,9 @@ func TestE2EBinlogSubscribingNonCompositeKey(t *testing.T) {
 	assert.NoError(t, m.createCheckpointTable(context.TODO()))
 	logger := logrus.New()
 	m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   10000,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: m.migration.TargetChunkTime,
 	})
 	m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{
 		Concurrency: m.migration.Threads,
@@ -1920,9 +1920,9 @@ func TestE2ERogueValues(t *testing.T) {
 	assert.NoError(t, m.createCheckpointTable(context.TODO()))
 	logger := logrus.New()
 	m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   repl.DefaultBatchSize,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: m.migration.TargetChunkTime,
 	})
 	m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{
 		Concurrency: m.migration.Threads,
@@ -2082,9 +2082,9 @@ func TestResumeFromCheckpointPhantom(t *testing.T) {
 	assert.NoError(t, m.createCheckpointTable(ctx))
 	logger := logrus.New()
 	m.replClient = repl.NewClient(m.db, m.migration.Host, m.table, m.newTable, m.migration.Username, m.migration.Password, &repl.ClientConfig{
-		Logger:      logger,
-		Concurrency: 4,
-		BatchSize:   repl.DefaultBatchSize,
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: m.migration.TargetChunkTime,
 	})
 	m.copier, err = row.NewCopier(m.db, m.table, m.newTable, &row.CopierConfig{
 		Concurrency: m.migration.Threads,

0 comments on commit 3ecc7d1
