diff --git a/go/base/context.go b/go/base/context.go index 2518ecf4e..f90171698 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -194,6 +194,7 @@ type MigrationContext struct { CurrentLag int64 currentProgress uint64 etaNanoseonds int64 + EtaRowsPerSecond int64 ThrottleHTTPIntervalMillis int64 ThrottleHTTPStatusCode int64 ThrottleHTTPTimeoutMillis int64 diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b6bf3fc5e..751b54a08 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -819,11 +819,18 @@ func (this *Migrator) initiateStatus() { this.printStatus(ForcePrintStatusAndHintRule) ticker := time.NewTicker(time.Second) defer ticker.Stop() + var previousCount int64 for range ticker.C { if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } go this.printStatus(HeuristicPrintStatusRule) + totalCopied := atomic.LoadInt64(&this.migrationContext.TotalRowsCopied) + if previousCount > 0 { + copiedThisLoop := totalCopied - previousCount + atomic.StoreInt64(&this.migrationContext.EtaRowsPerSecond, copiedThisLoop) + } + previousCount = totalCopied } } @@ -925,9 +932,20 @@ func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration duration = 0 } else if progressPct >= 0.1 { totalRowsCopied := this.migrationContext.GetTotalRowsCopied() - elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() - totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) - etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds + etaRowsPerSecond := atomic.LoadInt64(&this.migrationContext.EtaRowsPerSecond) + var etaSeconds float64 + // If there is data available on our current row-copies-per-second rate, use it. + // Otherwise we can fallback to the total elapsed time and extrapolate. + // This is going to be less accurate on a longer copy as the insert rate + // will tend to slow down. + if etaRowsPerSecond > 0 { + remainingRows := float64(rowsEstimate) - float64(totalRowsCopied) + etaSeconds = remainingRows / float64(etaRowsPerSecond) + } else { + elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() + totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) + etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds + } if etaSeconds >= 0 { duration = time.Duration(etaSeconds) * time.Second } else { diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index a2a096e69..dfdfe5390 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -215,6 +215,15 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) { require.Equal(t, "4h29m44s", eta) require.Equal(t, "4h29m44s", etaDuration.String()) } + { + // Test using rows-per-second added data. + migrationContext.TotalRowsCopied = 456 + migrationContext.EtaRowsPerSecond = 100 + state, eta, etaDuration := migrator.getMigrationStateAndETA(123456) + require.Equal(t, "migrating", state) + require.Equal(t, "20m30s", eta) + require.Equal(t, "20m30s", etaDuration.String()) + } { migrationContext.TotalRowsCopied = 456 state, eta, etaDuration := migrator.getMigrationStateAndETA(456)