diff --git a/br/pkg/checkpoint/backup.go b/br/pkg/checkpoint/backup.go
index 2725cdc65666a..4d517009937d3 100644
--- a/br/pkg/checkpoint/backup.go
+++ b/br/pkg/checkpoint/backup.go
@@ -64,7 +64,7 @@ func StartCheckpointBackupRunnerForTest(
 	runner := newCheckpointRunner[BackupKeyType, BackupValueType](
 		checkpointStorage, cipher, valueMarshalerForBackup)
 
-	runner.startCheckpointMainLoop(ctx, tick, tick, tick)
+	runner.startCheckpointMainLoop(ctx, tick, tick, tick, tick)
 	return runner, nil
 }
 
@@ -84,8 +84,9 @@ func StartCheckpointRunnerForBackup(
 	runner.startCheckpointMainLoop(
 		ctx,
 		defaultTickDurationForFlush,
-		defaultTckDurationForChecksum,
+		defaultTickDurationForChecksum,
 		defaultTickDurationForLock,
+		defaultRetryDuration,
 	)
 	return runner, nil
 }
diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go
index d5352380e867b..13ff1c60fca9d 100644
--- a/br/pkg/checkpoint/checkpoint.go
+++ b/br/pkg/checkpoint/checkpoint.go
@@ -51,10 +51,12 @@ const MaxChecksumTotalCost float64 = 60.0
 
 const defaultTickDurationForFlush = 30 * time.Second
 
-const defaultTckDurationForChecksum = 5 * time.Second
+const defaultTickDurationForChecksum = 5 * time.Second
 
 const defaultTickDurationForLock = 4 * time.Minute
 
+const defaultRetryDuration = 3 * time.Second
+
 const lockTimeToLive = 5 * time.Minute
 
 type KeyType interface {
@@ -305,12 +307,73 @@ func (r *CheckpointRunner[K, V]) setLock(ctx context.Context, errCh chan error)
 	return nil
 }
 
+type flusher[K KeyType, V ValueType] struct {
+	incompleteMetas     []map[K]*RangeGroup[K, V]
+	incompleteChecksums []ChecksumItems
+}
+
+func newFlusher[K KeyType, V ValueType]() *flusher[K, V] {
+	return &flusher[K, V]{
+		incompleteMetas:     make([]map[K]*RangeGroup[K, V], 0),
+		incompleteChecksums: make([]ChecksumItems, 0),
+	}
+}
+
+func (f *flusher[K, V]) doFlush(ctx context.Context, r *CheckpointRunner[K, V], meta map[K]*RangeGroup[K, V]) {
+	if err := r.doFlush(ctx, meta); err != nil {
+		log.Warn("failed to flush checkpoint data", zap.Error(err))
+		f.incompleteMetas = append(f.incompleteMetas, meta)
+	}
+}
+
+func (f *flusher[K, V]) doChecksumFlush(ctx context.Context, r *CheckpointRunner[K, V], checksums ChecksumItems) {
+	if err := r.doChecksumFlush(ctx, checksums); err != nil {
+		log.Warn("failed to flush checkpoint checksum", zap.Error(err))
+		f.incompleteChecksums = append(f.incompleteChecksums, checksums)
+	}
+}
+
+func (f *flusher[K, V]) flushOneIncomplete(ctx context.Context, r *CheckpointRunner[K, V]) {
+	if len(f.incompleteMetas) > 0 {
+		if err := r.doFlush(ctx, f.incompleteMetas[0]); err != nil {
+			log.Warn("failed to flush checkpoint data", zap.Error(err))
+			return
+		}
+		f.incompleteMetas = f.incompleteMetas[1:]
+	} else if len(f.incompleteChecksums) > 0 {
+		if err := r.doChecksumFlush(ctx, f.incompleteChecksums[0]); err != nil {
+			log.Warn("failed to flush checkpoint data", zap.Error(err))
+			return
+		}
+		f.incompleteChecksums = f.incompleteChecksums[1:]
+	}
+}
+
+func (f *flusher[K, V]) flushAllIncompleteMeta(ctx context.Context, r *CheckpointRunner[K, V]) {
+	for _, meta := range f.incompleteMetas {
+		if err := r.doFlush(ctx, meta); err != nil {
+			log.Warn("failed to flush checkpoint data", zap.Error(err))
+		}
+	}
+}
+
+func (f *flusher[K, V]) flushAllIncompleteChecksum(ctx context.Context, r *CheckpointRunner[K, V]) {
+	for _, checksums := range f.incompleteChecksums {
+		if err := r.doChecksumFlush(ctx, checksums); err != nil {
+			log.Warn("failed to flush checkpoint checksum", zap.Error(err))
+		}
+	}
+}
+
 // start a goroutine to flush the meta, which is sent from `checkpoint looper`, to the external storage
-func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, wg *sync.WaitGroup) chan error {
+func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, wg *sync.WaitGroup, retryDuration time.Duration) chan error {
 	errCh := make(chan error, 1)
 	wg.Add(1)
 	flushWorker := func(ctx context.Context, errCh chan error) {
 		defer wg.Done()
+		flusher := newFlusher[K, V]()
+		retryTicker := time.NewTicker(retryDuration)
+		defer retryTicker.Stop()
 		for {
 			select {
 			case <-ctx.Done():
@@ -320,22 +383,18 @@ func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, w
 				return
 			case meta, ok := <-r.metaCh:
 				if !ok {
+					flusher.flushAllIncompleteMeta(ctx, r)
 					log.Info("stop checkpoint flush worker")
 					return
 				}
-				if err := r.doFlush(ctx, meta); err != nil {
-					errCh <- errors.Annotate(err, "failed to flush checkpoint data.")
-					return
-				}
+				flusher.doFlush(ctx, r, meta)
 			case checksums, ok := <-r.checksumMetaCh:
 				if !ok {
+					flusher.flushAllIncompleteChecksum(ctx, r)
 					log.Info("stop checkpoint flush worker")
 					return
 				}
-				if err := r.doChecksumFlush(ctx, checksums); err != nil {
-					errCh <- errors.Annotate(err, "failed to flush checkpoint checksum.")
-					return
-				}
+				flusher.doChecksumFlush(ctx, r, checksums)
 			case _, ok := <-r.lockCh:
 				if !ok {
 					log.Info("stop checkpoint flush worker")
@@ -345,6 +404,8 @@ func (r *CheckpointRunner[K, V]) startCheckpointFlushLoop(ctx context.Context, w
 					errCh <- errors.Annotate(err, "failed to update checkpoint lock.")
 					return
 				}
+			case <-retryTicker.C:
+				flusher.flushOneIncomplete(ctx, r)
 			}
 		}
 	}
@@ -370,7 +431,8 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
 	ctx context.Context,
 	tickDurationForFlush,
 	tickDurationForChecksum,
-	tickDurationForLock time.Duration,
+	tickDurationForLock,
+	retryDuration time.Duration,
 ) {
 	failpoint.Inject("checkpoint-more-quickly-flush", func(_ failpoint.Value) {
 		tickDurationForChecksum = 1 * time.Second
@@ -390,7 +452,7 @@ func (r *CheckpointRunner[K, V]) startCheckpointMainLoop(
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	var wg sync.WaitGroup
-	errCh := r.startCheckpointFlushLoop(cctx, &wg)
+	errCh := r.startCheckpointFlushLoop(cctx, &wg, retryDuration)
 	flushTicker := time.NewTicker(tickDurationForFlush)
 	defer flushTicker.Stop()
 	checksumTicker := time.NewTicker(tickDurationForChecksum)
diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go
index 1904017feb47e..457463baa8f44 100644
--- a/br/pkg/checkpoint/checkpoint_test.go
+++ b/br/pkg/checkpoint/checkpoint_test.go
@@ -17,10 +17,12 @@ package checkpoint_test
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"os"
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/kvproto/pkg/encryptionpb"
 	"github.com/pingcap/tidb/br/pkg/checkpoint"
@@ -278,7 +280,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
 	require.NoError(t, err)
 
-	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
 	require.NoError(t, err)
 
 	data := map[string]struct {
@@ -360,6 +362,85 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 	require.False(t, exists)
 }
 
+func TestCheckpointRunnerRetry(t *testing.T) {
+	ctx := context.Background()
+	s := utiltest.CreateRestoreSchemaSuite(t)
+	g := gluetidb.New()
+	se, err := g.CreateSession(s.Mock.Storage)
+	require.NoError(t, err)
+
+	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
+	require.NoError(t, err)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
+	require.NoError(t, err)
+
+	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
+	require.NoError(t, err)
+	defer func() {
+		err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
+		require.NoError(t, err)
+	}()
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
+	require.NoError(t, err)
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
+	require.NoError(t, err)
+	err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
+	require.NoError(t, err)
+	err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
+	time.Sleep(time.Second)
+	err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
+	require.NoError(t, err)
+	checkpointRunner.WaitForFinish(ctx, true)
+	recordSet := make(map[string]int)
+	_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
+		func(tableID int64, rangeKey checkpoint.RestoreValueType) {
+			recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
+		})
+	require.NoError(t, err)
+	require.LessOrEqual(t, 1, recordSet["1_{123}"])
+	require.LessOrEqual(t, 1, recordSet["2_{456}"])
+	items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
+	require.NoError(t, err)
+	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
+	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2")
+}
+
+func TestCheckpointRunnerNoRetry(t *testing.T) {
+	ctx := context.Background()
+	s := utiltest.CreateRestoreSchemaSuite(t)
+	g := gluetidb.New()
+	se, err := g.CreateSession(s.Mock.Storage)
+	require.NoError(t, err)
+
+	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
+	require.NoError(t, err)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
+	require.NoError(t, err)
+
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
+	require.NoError(t, err)
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
+	require.NoError(t, err)
+	err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
+	require.NoError(t, err)
+	err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
+	require.NoError(t, err)
+	time.Sleep(time.Second)
+	checkpointRunner.WaitForFinish(ctx, true)
+	recordSet := make(map[string]int)
+	_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
+		func(tableID int64, rangeKey checkpoint.RestoreValueType) {
+			recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
+		})
+	require.NoError(t, err)
+	require.Equal(t, 1, recordSet["1_{123}"])
+	require.Equal(t, 1, recordSet["2_{456}"])
+	items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
+	require.NoError(t, err)
+	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
+	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[2].Crc64xor, items[2].TotalBytes, items[2].TotalKvs), "2_2_2")
+}
+
 func TestCheckpointLogRestoreRunner(t *testing.T) {
 	ctx := context.Background()
 	s := utiltest.CreateRestoreSchemaSuite(t)
diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go
index 138eb86965eac..9a4640b2fdb3a 100644
--- a/br/pkg/checkpoint/log_restore.go
+++ b/br/pkg/checkpoint/log_restore.go
@@ -116,7 +116,7 @@ func StartCheckpointLogRestoreRunnerForTest(
 		newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
 		nil, valueMarshalerForLogRestore)
 
-	runner.startCheckpointMainLoop(ctx, tick, tick, 0)
+	runner.startCheckpointMainLoop(ctx, tick, tick, 0, defaultRetryDuration)
 	return runner, nil
 }
 
@@ -129,7 +129,7 @@ func StartCheckpointRunnerForLogRestore(
 		nil, valueMarshalerForLogRestore)
 
 	// for restore, no need to set lock
-	runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTckDurationForChecksum, 0)
+	runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
 	return runner, nil
 }
 
diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go
index 971938ba45e5b..3091a17e19144 100644
--- a/br/pkg/checkpoint/restore.go
+++ b/br/pkg/checkpoint/restore.go
@@ -46,12 +46,13 @@ func StartCheckpointRestoreRunnerForTest(
 	ctx context.Context,
 	se glue.Session,
 	tick time.Duration,
+	retryDuration time.Duration,
 ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
 	runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
 		newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
 		nil, valueMarshalerForRestore)
 
-	runner.startCheckpointMainLoop(ctx, tick, tick, 0)
+	runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration)
 	return runner, nil
 }
 
@@ -64,7 +65,7 @@ func StartCheckpointRunnerForRestore(
 		nil, valueMarshalerForRestore)
 
 	// for restore, no need to set lock
-	runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTckDurationForChecksum, 0)
+	runner.startCheckpointMainLoop(ctx, defaultTickDurationForFlush, defaultTickDurationForChecksum, 0, defaultRetryDuration)
 	return runner, nil
 }