Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#43726
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3pointer authored and ti-chi-bot committed Jun 28, 2023
1 parent 38ef1f9 commit d13b769
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 105 deletions.
2 changes: 2 additions & 0 deletions br/pkg/restore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ go_library(
"//ddl",
"//ddl/util",
"//domain",
"//domain/infosync",
"//kv",
"//meta",
"//parser/model",
"//parser/mysql",
"//sessionctx/variable",
"//statistics/handle",
"//store/helper",
"//store/pdtypes",
"//tablecodec",
"//util",
Expand Down
13 changes: 7 additions & 6 deletions br/pkg/restore/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Batcher struct {
// sendCh is for communicating with sendWorker.
sendCh chan<- SendType
// outCh is for outputting the restored tables, so they can be sent on to later steps such as checksum.
outCh chan<- CreatedTable
outCh chan<- *CreatedTable

updateCh glue.Progress

Expand Down Expand Up @@ -92,7 +92,8 @@ func (b *Batcher) contextCleaner(ctx context.Context, tables <-chan []CreatedTab
return
}
for _, tbl := range tbls {
b.outCh <- tbl
cloneTable := tbl
b.outCh <- &cloneTable
}
}
}
Expand All @@ -109,13 +110,13 @@ func NewBatcher(
manager ContextManager,
errCh chan<- error,
updateCh glue.Progress,
) (*Batcher, <-chan CreatedTable) {
output := make(chan CreatedTable, defaultChannelSize)
) (*Batcher, chan *CreatedTable) {
outCh := DefaultOutputTableChan()
sendChan := make(chan SendType, 2)
b := &Batcher{
rewriteRules: EmptyRewriteRule(),
sendErr: errCh,
outCh: output,
outCh: outCh,
sender: sender,
manager: manager,
sendCh: sendChan,
Expand All @@ -130,7 +131,7 @@ func NewBatcher(
go b.contextCleaner(ctx, restoredTables)
sink := chanTableSink{restoredTables, errCh}
sender.PutSink(sink)
return b, output
return b, outCh
}

// EnableAutoCommit enables the batcher to commit batches periodically even when the batcher size isn't big enough.
Expand Down
240 changes: 158 additions & 82 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ import (
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/pdtypes"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -1605,77 +1607,83 @@ func (rc *Client) switchTiKVMode(ctx context.Context, mode import_sstpb.SwitchMo
return nil
}

// concurrentHandleTablesCh drives one stage of the table pipeline.
//
// It reads tables from inCh, runs processFun on each of them using a worker
// taken from workers, and forwards every successfully processed table to
// outCh. It blocks until inCh is closed or ctx is cancelled, so callers
// normally start it in its own goroutine.
//
// On exit it waits for all in-flight workers, reports the first worker error
// (if any) on errCh, closes outCh and finally calls deferFun.
//
// processFun receives the errgroup context, which is cancelled as soon as one
// worker fails or ctx is cancelled.
func concurrentHandleTablesCh(
	ctx context.Context,
	inCh <-chan *CreatedTable,
	outCh chan<- *CreatedTable,
	errCh chan<- error,
	workers *utils.WorkerPool,
	processFun func(context.Context, *CreatedTable) error,
	deferFun func()) {
	eg, ectx := errgroup.WithContext(ctx)
	defer func() {
		if err := eg.Wait(); err != nil {
			errCh <- err
		}
		close(outCh)
		deferFun()
	}()

	for {
		select {
		// if we use ectx here, maybe canceled will mask real error.
		case <-ctx.Done():
			errCh <- ctx.Err()
			// Stop dispatching once cancelled. Without this return the loop
			// would keep selecting the (always ready) Done case, re-sending
			// the same error forever and never closing outCh.
			return
		case tbl, ok := <-inCh:
			if !ok {
				return
			}
			worker := workers.ApplyWorker()
			eg.Go(func() error {
				defer workers.RecycleWorker(worker)
				if err := processFun(ectx, tbl); err != nil {
					return err
				}
				// Do not block forever on a full outCh when the pipeline has
				// already been cancelled and nobody is reading downstream;
				// otherwise eg.Wait in the deferred cleanup could never return.
				select {
				case outCh <- tbl:
					return nil
				case <-ectx.Done():
					return ectx.Err()
				}
			})
		}
	}
}

// GoValidateChecksum forks a goroutine to validate checksum after restore.
// it returns a channel of the tables whose checksum has been validated; the channel is closed when all things get done.
func (rc *Client) GoValidateChecksum(
ctx context.Context,
tableStream <-chan CreatedTable,
inCh <-chan *CreatedTable,
kvClient kv.Client,
errCh chan<- error,
updateCh glue.Progress,
concurrency uint,
) <-chan struct{} {
) <-chan *CreatedTable {
log.Info("Start to validate checksum")
outCh := make(chan struct{}, 1)
wg := new(sync.WaitGroup)
wg.Add(2)
loadStatCh := make(chan *CreatedTable, 1024)
// run the stat loader
go func() {
defer wg.Done()
rc.updateMetaAndLoadStats(ctx, loadStatCh)
}()
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go func() {
eg, ectx := errgroup.WithContext(ctx)
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
start := time.Now()
defer func() {
if err := eg.Wait(); err != nil {
errCh <- err
}
close(loadStatCh)
wg.Done()
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()

for {
select {
// if we use ectx here, maybe canceled will mask real error.
case <-ctx.Done():
errCh <- ctx.Err()
case tbl, ok := <-tableStream:
if !ok {
return
}

workers.ApplyOnErrorGroup(eg, func() error {
start := time.Now()
defer func() {
elapsed := time.Since(start)
summary.CollectSuccessUnit("table checksum", 1, elapsed)
}()
err := rc.execChecksum(ectx, tbl, kvClient, concurrency, loadStatCh)
if err != nil {
return errors.Trace(err)
}
updateCh.Inc()
return nil
})
}
err := rc.execChecksum(c, tbl, kvClient, concurrency)
if err != nil {
return errors.Trace(err)
}
}()
go func() {
wg.Wait()
updateCh.Inc()
return nil
}, func() {
log.Info("all checksum ended")
close(outCh)
}()
})
return outCh
}

func (rc *Client) execChecksum(
ctx context.Context,
tbl CreatedTable,
tbl *CreatedTable,
kvClient kv.Client,
concurrency uint,
loadStatCh chan<- *CreatedTable,
) error {
logger := log.L().With(
zap.String("db", tbl.OldTable.DB.Name.O),
Expand Down Expand Up @@ -1742,48 +1750,108 @@ func (rc *Client) execChecksum(
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
logger.Info("success in validate checksum")
loadStatCh <- &tbl
return nil
}

func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
return
case tbl, ok := <-input:
if !ok {
return
func (rc *Client) GoUpdateMetaAndLoadStats(ctx context.Context, inCh <-chan *CreatedTable, errCh chan<- error) chan *CreatedTable {
log.Info("Start to update meta then load stats")
outCh := DefaultOutputTableChan()
workers := utils.NewWorkerPool(1, "UpdateStats")
go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
oldTable := tbl.OldTable
// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
log.Info("start update metas",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name))
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, oldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
}

// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
if oldTable.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", oldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), oldTable.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", oldTable.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.Stringer("table", oldTable.Info.Name),
zap.Stringer("db", oldTable.DB.Name),
zap.Duration("cost", time.Since(start)))
}
return nil
}, func() {
log.Info("all stats updated")
})
return outCh
}

table := tbl.OldTable
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
// GoWaitTiFlashReady starts the pipeline stage that holds each restored table
// until its TiFlash replicas are reported as fully synced.
//
// Tables read from inCh are handled by up to 4 concurrent workers through
// concurrentHandleTablesCh. A table that carries no TiFlash replica info is
// passed through immediately; otherwise the worker polls
// infosync.CalculateTiFlashProgress until it reports 1 or the context is
// cancelled. updateCh is incremented once for every table that finishes
// successfully.
//
// Errors are reported on errCh. If the TiFlash store list cannot be fetched,
// the error is sent on errCh and the stage is not started: the returned
// channel then never yields a table and is never closed, so callers must
// watch errCh as well.
func (rc *Client) GoWaitTiFlashReady(ctx context.Context, inCh <-chan *CreatedTable, updateCh glue.Progress, errCh chan<- error) chan *CreatedTable {
	log.Info("Start to wait tiflash replica sync")
	outCh := DefaultOutputTableChan()
	workers := utils.NewWorkerPool(4, "WaitForTiflashReady")
	// TODO support tiflash store changes
	storesStat, err := infosync.GetTiFlashStoresStat(context.Background())
	if err != nil {
		// The stats result cannot be trusted when an error is returned, so
		// report the failure and bail out instead of dereferencing it below.
		errCh <- err
		return outCh
	}
	tiFlashStores := make(map[int64]helper.StoreStat)
	for _, store := range storesStat.Stores {
		for _, l := range store.Store.Labels {
			if l.Key == "engine" && l.Value == "tiflash" {
				tiFlashStores[store.Store.ID] = store
			}
		}
	}
	go concurrentHandleTablesCh(ctx, inCh, outCh, errCh, workers, func(c context.Context, tbl *CreatedTable) error {
		// Nothing to wait for when the table has no TiFlash replica info.
		// A nil Table is handled here too, so it is never dereferenced below.
		if tbl.Table == nil || tbl.Table.TiFlashReplica == nil {
			log.Info("table has no tiflash replica",
				zap.Stringer("table", tbl.OldTable.Info.Name),
				zap.Stringer("db", tbl.OldTable.DB.Name))
			updateCh.Inc()
			return nil
		}
		if rc.dom == nil {
			// unreachable, current we have initial domain in mgr.
			log.Fatal("unreachable, domain is nil")
		}
		log.Info("table has tiflash replica, start sync..",
			zap.Stringer("table", tbl.OldTable.Info.Name),
			zap.Stringer("db", tbl.OldTable.DB.Name))
		for {
			// just wait for next check
			// tiflash check the progress every 2s
			// we can wait 2.5x times
			wait := 5 * time.Second
			progress, err := infosync.CalculateTiFlashProgress(tbl.Table.ID, tbl.Table.TiFlashReplica.Count, tiFlashStores)
			switch {
			case err != nil:
				log.Warn("failed to get tiflash replica progress, wait for next retry", zap.Error(err))
				wait = time.Second
			case progress == 1:
				// check until progress is 1
				log.Info("tiflash replica synced",
					zap.Stringer("table", tbl.OldTable.Info.Name),
					zap.Stringer("db", tbl.OldTable.DB.Name))
				updateCh.Inc()
				return nil
			}
			// Sleep until the next poll, but give up promptly when the
			// pipeline is cancelled so the worker does not spin forever.
			select {
			case <-c.Done():
				return c.Err()
			case <-time.After(wait):
			}
		}
	}, func() {
		log.Info("all tiflash replica synced")
	})
	return outCh
}

// called by failpoint, only used for test
Expand Down Expand Up @@ -2120,9 +2188,17 @@ func (rc *Client) PreCheckTableTiFlashReplica(
return err
}
for _, table := range tables {
<<<<<<< HEAD
if recorder != nil ||
(table.Info.TiFlashReplica != nil && table.Info.TiFlashReplica.Count > tiFlashStoreCount) {
if recorder != nil && table.Info.TiFlashReplica != nil {
=======
if table.Info.TiFlashReplica != nil {
// we should not set available to true. because we cannot guarantee the raft log lag of tiflash when restore finished.
// just let tiflash ticker set it by checking lag of all related regions.
table.Info.TiFlashReplica.Available = false
if recorder != nil {
>>>>>>> 0f20315681d (br: pipeline wait tiflash synced (#43726))
recorder.AddTable(table.Info.ID, *table.Info.TiFlashReplica)
}
// we cannot satisfy TiFlash replica in restore cluster. so we should
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/restore/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ type CreatedTable struct {
OldTable *metautil.Table
}

// DefaultOutputTableChan creates a buffered channel of *CreatedTable whose
// capacity is defaultChannelSize.
func DefaultOutputTableChan() chan *CreatedTable {
	tableCh := make(chan *CreatedTable, defaultChannelSize)
	return tableCh
}

// TableWithRange is a CreatedTable that has been bind to some of key ranges.
type TableWithRange struct {
CreatedTable
Expand Down
Loading

0 comments on commit d13b769

Please sign in to comment.