Skip to content

Commit

Permalink
ddl: record get owner TS and compare it before runReorgJob quit (#55049
Browse files Browse the repository at this point in the history
…) (#55111)

close #54897
  • Loading branch information
ti-chi-bot authored Jul 31, 2024
1 parent 6990c1f commit 70bfd90
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 9 deletions.
6 changes: 1 addition & 5 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,12 +674,8 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
totalAddedCount := job.GetRowCount()

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey

if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
if errors.ErrorEqual(err, dbterror.ErrNotOwner) {
// This instance is not DDL owner, we remove reorgctx proactively
// to avoid being used later.
dc.removeReorgCtx(reorgInfo.ID)
}
return errors.Trace(err)
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,19 @@ type reorgContexts struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
reorgCtxMap map[int64]*reorgCtx
beOwnerTS int64
}

// getOwnerTS returns the currently recorded beOwnerTS value.
// The read is performed while holding the read lock so it does not race
// with a concurrent setOwnerTS.
func (r *reorgContexts) getOwnerTS() int64 {
	r.RLock()
	ownerTS := r.beOwnerTS
	r.RUnlock()
	return ownerTS
}

// setOwnerTS stores ts as the new beOwnerTS value.
// The write is performed while holding the write lock so that readers
// going through getOwnerTS never observe a partially updated value.
func (r *reorgContexts) setOwnerTS(ts int64) {
	r.Lock()
	defer r.Unlock()
	r.beOwnerTS = ts
}

func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
Expand All @@ -510,7 +523,7 @@ func (dc *ddlCtx) newReorgCtx(jobID int64, rowCount int64) *reorgCtx {
return existedRC
}
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
rc.doneCh = make(chan reorgFnResult, 1)
// initial reorgCtx
rc.setRowCount(rowCount)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
Expand Down Expand Up @@ -746,6 +759,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.reorgCtx.setOwnerTS(time.Now().Unix())
d.runningJobs.clear()
})

Expand Down
24 changes: 21 additions & 3 deletions pkg/ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type reorgCtx struct {
// If the reorganization job is done, we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
doneCh chan error
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
jobState model.JobState
Expand All @@ -74,6 +74,13 @@ type reorgCtx struct {
references atomicutil.Int32
}

// reorgFnResult is the value sent on reorgCtx.doneCh when a reorg function finishes.
// It carries the DDL owner TS that was read before the reorg function was started, so
// that the receiver can determine whether the result comes from a reorg function that
// was launched under a previous DDL owner term of this instance.
type reorgFnResult struct {
// ownerTS is the owner TS read via getOwnerTS before the reorg goroutine was spawned.
// runReorgJob compares it with the current owner TS when it receives the result.
ownerTS int64
// err is the error returned by the reorg function; nil when it succeeded.
err error
}

// newContext gets a context. It is only used for adding column in reorganization state.
func newContext(store kv.Storage) sessionctx.Context {
c := mock.NewContext()
Expand Down Expand Up @@ -200,11 +207,13 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,
return dbterror.ErrCancelledDDLJob
}

beOwnerTS := w.ddlCtx.reorgCtx.getOwnerTS()
rc = w.newReorgCtx(reorgInfo.Job.ID, reorgInfo.Job.GetRowCount())
w.wg.Add(1)
go func() {
defer w.wg.Done()
rc.doneCh <- f()
err := f()
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
}()
}

Expand All @@ -220,7 +229,16 @@ func (w *worker) runReorgJob(reorgInfo *reorgInfo, tblInfo *model.TableInfo,

// wait reorganization job done or timeout
select {
case err := <-rc.doneCh:
case res := <-rc.doneCh:
err := res.err
curTS := w.ddlCtx.reorgCtx.getOwnerTS()
if res.ownerTS != curTS {
d.removeReorgCtx(job.ID)
logutil.BgLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
return dbterror.ErrWaitReorgTimeout
}
// Since the job is cancelled, we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job.ID)
Expand Down

0 comments on commit 70bfd90

Please sign in to comment.