From 69f56d4a377d37051093b0cb057d4c310a9e38a0 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 23 Apr 2021 16:21:31 +0800 Subject: [PATCH 1/7] pessimistic: record latest done ddl to handle conflict --- dm/common/common.go | 5 +- dm/master/server_test.go | 2 +- dm/master/shardddl/pessimist.go | 48 +++++++++++----- dm/master/shardddl/pessimist_test.go | 6 +- go.sum | 3 - pkg/shardddl/pessimism/ddls.go | 73 ++++++++++++++++++++++++ pkg/shardddl/pessimism/info_test.go | 2 +- pkg/shardddl/pessimism/keeper.go | 37 ++++++++++-- pkg/shardddl/pessimism/keeper_test.go | 6 +- pkg/shardddl/pessimism/lock.go | 41 ++++++++++++- pkg/shardddl/pessimism/lock_test.go | 10 ++-- pkg/shardddl/pessimism/operation.go | 4 +- pkg/shardddl/pessimism/operation_test.go | 12 ++-- pkg/shardddl/pessimism/ops.go | 8 ++- pkg/shardddl/pessimism/ops_test.go | 2 +- syncer/shardddl/pessimist.go | 6 ++ syncer/shardddl/pessimist_test.go | 2 +- syncer/syncer.go | 7 +++ tests/shardddl2/run.sh | 73 ++++++++++++++++++++++++ 19 files changed, 299 insertions(+), 48 deletions(-) create mode 100644 pkg/shardddl/pessimism/ddls.go diff --git a/dm/common/common.go b/dm/common/common.go index e8dde43435..625c4a4e6e 100644 --- a/dm/common/common.go +++ b/dm/common/common.go @@ -64,6 +64,9 @@ var ( // ShardDDLPessimismOperationKeyAdapter is used to store shard DDL operation in pessimistic model. // k/v: Encode(task-name, source-id) -> shard DDL operation. ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/") + // ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model. + // k/v: Encode(lockID) -> DDLs. + ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/") // ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask. // In other words, if any Info for this subtask exists, we should obey source tables in the Info. @@ -94,7 +97,7 @@ func keyAdapterKeysLen(s KeyAdapter) int { switch s { case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter, WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter, - UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter: + UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, ShardDDLPessimismDDLsKeyAdapter: return 1 case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter, ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter, diff --git a/dm/master/server_test.go b/dm/master/server_test.go index f87ad31000..aeaab30de0 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -589,7 +589,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { schema, table = "foo", "bar" ID = fmt.Sprintf("%s-`%s`.`%s`", taskName, schema, table) i11 = pessimism.NewInfo(taskName, sources[0], schema, table, DDLs) - op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false) + op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false, false) ) _, err = pessimism.PutInfo(etcdTestCli, i11) c.Assert(err, check.IsNil) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 901ad4935f..8fdfdaa917 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -147,8 +147,13 @@ func (p *Pessimist) buildLocks(etcdCli *clientv3.Client) (int64, int64, error) { } p.logger.Info("get history shard DDL lock operation", zap.Reflect("operation", opm), zap.Int64("revision", rev2)) + latestDoneDDLsMap, _, err := pessimism.GetAllLatestDoneDDLs(etcdCli) + if err != nil { + return 0, 0, err + } + // recover the shard DDL lock based on history shard DDL info & lock operation. - err = p.recoverLocks(ifm, opm) + err = p.recoverLocks(ifm, opm, latestDoneDDLsMap) if err != nil { // only log the error, and don't return it to forbid the startup of the DM-master leader. // then these unexpected locks can be handled by the user. @@ -394,24 +399,30 @@ func (p *Pessimist) RemoveMetaData(task string) error { for _, info := range infos { p.lk.RemoveLockByInfo(info) } + lockIDSet := make(map[string]struct{}, len(ops)) for _, op := range ops { p.lk.RemoveLock(op.ID) + lockIDSet[op.ID] = struct{}{} } // clear meta data in etcd - _, err = pessimism.DeleteInfosOperationsByTask(p.cli, task) + _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task, lockIDSet) return err } // recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. -func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error { +func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, + opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string][]string) error { + // add all last done ddls. + p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap) + // construct locks based on the shard DDL info. for task, ifs := range ifm { sources := p.taskSources(task) // if no operation exists for the lock, we let the smallest (lexicographical order) source as the owner of the lock. // if any operation exists for the lock, we let the source with `exec=true` as the owner of the lock (the logic is below). for _, info := range pessimismInfoMapToSlice(ifs) { - _, _, _, err := p.lk.TrySync(info, sources) + _, _, _, err := p.lk.TrySync(p.cli, info, sources) if err != nil { return err } @@ -471,7 +482,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I p.logger.Info("receive a shard DDL info", zap.Stringer("info", info)) p.infoOpMu.Lock() - lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task)) + lockID, synced, remain, err := p.lk.TrySync(p.cli, info, p.taskSources(info.Task)) if err != nil { p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err)) // currently, only DDL mismatch will cause error @@ -536,6 +547,9 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock) } p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op)) + if err := p.addLatestDoneDDLs(lock.ID, lock.DDLs); err != nil { + p.logger.Info("fail to record last done ddls", zap.String("lock", lock.ID), log.ShortError(err)) + } p.infoOpMu.Unlock() continue } @@ -576,11 +590,10 @@ func (p *Pessimist) handleLock(lockID, source string) error { if lock.IsResolved() { // remove all operations for this shard DDL lock. // this is to handle the case where dm-master exit before deleting operations for them. - err := p.removeLock(lock) - if err != nil { + if err := p.removeLock(lock); err != nil { return err } - return nil + return p.addLatestDoneDDLs(lock.ID, lock.DDLs) } // check whether the owner has done. @@ -601,7 +614,7 @@ func (p *Pessimist) handleLock(lockID, source string) error { // putOpForOwner PUTs the shard DDL lock operation for the owner into etcd. func (p *Pessimist) putOpForOwner(lock *pessimism.Lock, owner string, skipDone bool) error { - op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false) + op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false, false) rev, succ, err := pessimism.PutOperations(p.cli, skipDone, op) if err != nil { return err @@ -625,7 +638,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s ops := make([]pessimism.Operation, 0, len(sources)) for _, source := range sources { - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false)) } rev, succ, err := pessimism.PutOperations(p.cli, skipDone, ops...) @@ -676,6 +689,15 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error { return nil } +// addLatestDoneDDLs add last done ddls into etcd and LockKeeper. +func (p *Pessimist) addLatestDoneDDLs(lockID string, ddls []string) error { + if _, err := pessimism.PutLatestDoneDDLs(p.cli, lockID, ddls); err != nil { + return err + } + p.lk.AddLatestDoneDDLs(lockID, ddls) + return nil +} + // deleteOps DELETEs shard DDL lock operations relative to the lock. func (p *Pessimist) deleteOps(lock *pessimism.Lock) error { ready := lock.Ready() @@ -683,7 +705,7 @@ func (p *Pessimist) deleteOps(lock *pessimism.Lock) error { for source := range ready { // When deleting operations, we do not verify the value of the operation now, // so simply set `exec=false` and `done=true`. - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false)) } rev, err := pessimism.DeleteOperations(p.cli, ops...) if err != nil { @@ -705,7 +727,7 @@ func (p *Pessimist) deleteInfosOps(lock *pessimism.Lock) error { for source := range ready { // When deleting operations, we do not verify the value of the operation now, // so simply set `exec=false` and `done=true`. - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false)) } rev, err := pessimism.DeleteInfosOperations(p.cli, infos, ops) @@ -786,7 +808,7 @@ func (p *Pessimist) waitNonOwnerToBeDone(ctx context.Context, lock *pessimism.Lo // we still put `skip` operations for waitSources one more time with `skipDone=true`. ops := make([]pessimism.Operation, 0, len(waitSources)) for _, source := range waitSources { - ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false)) + ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false)) } rev, succ, err := pessimism.PutOperations(p.cli, true, ops...) if err != nil { diff --git a/dm/master/shardddl/pessimist_test.go b/dm/master/shardddl/pessimist_test.go index 8e796c0e3e..de469a3e3c 100644 --- a/dm/master/shardddl/pessimist_test.go +++ b/dm/master/shardddl/pessimist_test.go @@ -305,7 +305,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) { c.Assert(p.Locks()[ID2].IsDone(source2), IsFalse) // mark exec operation for one non-owner as `done` (and delete the info). - op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true) + op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true, false) done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op22c, i22) c.Assert(err, IsNil) c.Assert(done, IsTrue) @@ -325,7 +325,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) { // mark skip operation for the non-owner as `done` (and delete the info). // the lock should become resolved and deleted. - op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true) + op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true, false) done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op23c, i23) c.Assert(err, IsNil) c.Assert(done, IsTrue) @@ -844,7 +844,7 @@ func (t *testPessimist) TestMeetEtcdCompactError(c *C) { ID1 = fmt.Sprintf("%s-`%s`.`%s`", task1, schema, table) i11 = pessimism.NewInfo(task1, source1, schema, table, DDLs) i12 = pessimism.NewInfo(task1, source2, schema, table, DDLs) - op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false) + op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false, false) revCompacted int64 infoCh chan pessimism.Info diff --git a/go.sum b/go.sum index 33240e2f3c..1856d3a406 100644 --- a/go.sum +++ b/go.sum @@ -783,7 +783,6 @@ github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -801,7 +800,6 @@ github.com/pingcap/parser v0.0.0-20200813083329-a4bff035d3e2/go.mod h1:vQdbJqobJ github.com/pingcap/parser v0.0.0-20200821073936-cf85e80665c4/go.mod h1:vQdbJqobJAgFyiRNNtXahpMoGWwPEuWciVEK5A20NS0= github.com/pingcap/parser v0.0.0-20200924053142-5d7e8ebf605e/go.mod h1:RlLfMRJwFBSiXd2lUaWdV5pSXtrpyvZM8k5bbZWsheU= github.com/pingcap/parser v0.0.0-20210125075924-ffe0fda947cb/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE= -github.com/pingcap/parser v0.0.0-20210324190955-ab6d0f2c18ee h1:q5PF2QMZ9iGTCQOuWNnXJ5m1C2VifTOns/n6rvCe5rw= github.com/pingcap/parser v0.0.0-20210324190955-ab6d0f2c18ee/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74 h1:FkVEC3Fck3fD16hMObMl/IWs72jR9FmqPn0Bdf728Sk= github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= @@ -1337,7 +1335,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/pkg/shardddl/pessimism/ddls.go b/pkg/shardddl/pessimism/ddls.go new file mode 100644 index 0000000000..3755ebe313 --- /dev/null +++ b/pkg/shardddl/pessimism/ddls.go @@ -0,0 +1,73 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pessimism + +import ( + "context" + "encoding/json" + + "go.etcd.io/etcd/clientv3" + + "github.com/pingcap/dm/dm/common" + "github.com/pingcap/dm/pkg/etcdutil" +) + +// PutLatestDoneDDLs puts the last done shard DDL ddls into etcd. +func PutLatestDoneDDLs(cli *clientv3.Client, lockID string, ddls []string) (int64, error) { + data, err := json.Marshal(ddls) + if err != nil { + return 0, err + } + value := string(data) + key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID) + + ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) + defer cancel() + + resp, err := cli.Put(ctx, key, value) + if err != nil { + return 0, err + } + return resp.Header.Revision, nil +} + +// GetAllLatestDoneDDLs gets all last done shard DDL ddls in etcd currently. +// k/v: lockID -> DDLs +// This function should often be called by DM-master. +func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string][]string, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) + defer cancel() + + resp, err := cli.Get(ctx, common.ShardDDLPessimismDDLsKeyAdapter.Path(), clientv3.WithPrefix()) + if err != nil { + return nil, 0, err + } + + ddlsMap := make(map[string][]string, len(resp.Kvs)) + for _, kv := range resp.Kvs { + var ddls []string + if err2 := json.Unmarshal(kv.Value, &ddls); err2 != nil { + return nil, 0, err2 + } + keys, err2 := common.ShardDDLPessimismDDLsKeyAdapter.Decode(string(kv.Key)) + if err2 != nil { + return nil, 0, err2 + } + lockID := keys[0] + + ddlsMap[lockID] = ddls + } + + return ddlsMap, resp.Header.Revision, nil +} diff --git a/pkg/shardddl/pessimism/info_test.go b/pkg/shardddl/pessimism/info_test.go index c55f706ee9..c79fb527be 100644 --- a/pkg/shardddl/pessimism/info_test.go +++ b/pkg/shardddl/pessimism/info_test.go @@ -170,7 +170,7 @@ func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) { DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table)) info = NewInfo(task, source, schema, table, DDLs) - op = NewOperation(ID, task, source, DDLs, false, false) + op = NewOperation(ID, task, source, DDLs, false, false, false) ) // put info success because no operation exist. diff --git a/pkg/shardddl/pessimism/keeper.go b/pkg/shardddl/pessimism/keeper.go index 35f672db33..7737311b55 100644 --- a/pkg/shardddl/pessimism/keeper.go +++ b/pkg/shardddl/pessimism/keeper.go @@ -16,25 +16,29 @@ package pessimism import ( "sync" + "go.etcd.io/etcd/clientv3" + "github.com/pingcap/dm/pkg/utils" ) // LockKeeper used to keep and handle DDL lock conveniently. // The lock information do not need to be persistent, and can be re-constructed from the shard DDL info. type LockKeeper struct { - mu sync.RWMutex - locks map[string]*Lock // lockID -> Lock + mu sync.RWMutex + locks map[string]*Lock // lockID -> Lock + latestDoneDDLs map[string][]string } // NewLockKeeper creates a new LockKeeper instance. func NewLockKeeper() *LockKeeper { return &LockKeeper{ - locks: make(map[string]*Lock), + locks: make(map[string]*Lock), + latestDoneDDLs: make(map[string][]string), } } // TrySync tries to sync the lock. -func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, error) { +func (lk *LockKeeper) TrySync(cli *clientv3.Client, info Info, sources []string) (string, bool, int, error) { var ( lockID = genDDLLockID(info) l *Lock @@ -49,10 +53,33 @@ func (lk *LockKeeper) TrySync(info Info, sources []string) (string, bool, int, e l = lk.locks[lockID] } - synced, remain, err := l.TrySync(info.Source, info.DDLs, sources) + synced, remain, err := l.TrySync(cli, info.Source, info.DDLs, sources, lk.GetLatestDoneDDLs(lockID)) return lockID, synced, remain, err } +// AddAllLatestDoneDDLs add all last done ddls. +func (lk *LockKeeper) AddAllLatestDoneDDLs(ddls map[string][]string) { + lk.mu.Lock() + defer lk.mu.Unlock() + lk.latestDoneDDLs = ddls +} + +// AddLatestDoneDDLs add last done ddls by lockID. +func (lk *LockKeeper) AddLatestDoneDDLs(lockID string, ddls []string) { + lk.mu.Lock() + defer lk.mu.Unlock() + lk.latestDoneDDLs[lockID] = ddls +} + +// GetLatestDoneDDLs gets last done ddls by lockID. +func (lk *LockKeeper) GetLatestDoneDDLs(lockID string) []string { + latestDoneDDLs, ok := lk.latestDoneDDLs[lockID] + if !ok { + return nil + } + return latestDoneDDLs +} + // RemoveLock removes a lock. func (lk *LockKeeper) RemoveLock(lockID string) bool { lk.mu.Lock() diff --git a/pkg/shardddl/pessimism/keeper_test.go b/pkg/shardddl/pessimism/keeper_test.go index da532c9438..a9a680f0bf 100644 --- a/pkg/shardddl/pessimism/keeper_test.go +++ b/pkg/shardddl/pessimism/keeper_test.go @@ -37,19 +37,19 @@ func (t *testLockKeeper) TestLockKeeper(c *C) { ) // lock with 2 sources. - lockID1, synced, remain, err := lk.TrySync(info11, []string{source1, source2}) + lockID1, synced, remain, err := lk.TrySync(etcdTestCli, info11, []string{source1, source2}) c.Assert(err, IsNil) c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) - lockID1, synced, remain, err = lk.TrySync(info12, []string{source1, source2}) + lockID1, synced, remain, err = lk.TrySync(etcdTestCli, info12, []string{source1, source2}) c.Assert(err, IsNil) c.Assert(lockID1, Equals, "task1-`foo`.`bar`") c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) // lock with only 1 source. - lockID2, synced, remain, err := lk.TrySync(info21, []string{source1}) + lockID2, synced, remain, err := lk.TrySync(etcdTestCli, info21, []string{source1}) c.Assert(err, IsNil) c.Assert(lockID2, Equals, "task2-`foo`.`bar`") c.Assert(synced, IsTrue) diff --git a/pkg/shardddl/pessimism/lock.go b/pkg/shardddl/pessimism/lock.go index 034dde731f..93539b9833 100644 --- a/pkg/shardddl/pessimism/lock.go +++ b/pkg/shardddl/pessimism/lock.go @@ -16,7 +16,11 @@ package pessimism import ( "sync" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "github.com/pingcap/dm/dm/master/metrics" + "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" ) @@ -64,13 +68,46 @@ func NewLock(id, task, owner string, ddls, sources []string) *Lock { // TrySync tries to sync the lock, does decrease on remain, re-entrant. // new upstream sources may join when the DDL lock is in syncing, // so we need to merge these new sources. -func (l *Lock) TrySync(caller string, ddls, sources []string) (bool, int, error) { +func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources []string, latestDoneDDLs []string) (bool, int, error) { l.mu.Lock() defer l.mu.Unlock() // check DDL statement first. if !utils.CompareShardingDDLs(ddls, l.DDLs) { - return l.remain <= 0, l.remain, terror.ErrMasterShardingDDLDiff.Generate(l.DDLs, ddls) + // handle conflict + if !utils.CompareShardingDDLs(latestDoneDDLs, ddls) && !utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs) { + return l.remain <= 0, l.remain, terror.ErrMasterShardingDDLDiff.Generate(l.DDLs, ddls) + } + + // current ddls idempotent, skip it. + if utils.CompareShardingDDLs(latestDoneDDLs, ddls) { + log.L().Warn("conflict ddls equals last done ddls, skip it", zap.Strings("ddls", ddls), zap.String("source", caller)) + _, _, err := PutOperations(cli, true, NewOperation(l.ID, l.Task, caller, latestDoneDDLs, false, false, true)) + return l.remain <= 0, l.remain, err + } + + // other sources' ddls idempotent, skip them. + readySources := make([]string, 0, len(l.ready)) + ops := make([]Operation, 0, len(l.ready)) + for source, isReady := range l.ready { + if isReady { + readySources = append(readySources, source) + ops = append(ops, NewOperation(l.ID, l.Task, source, latestDoneDDLs, false, false, true)) + } + } + + log.L().Warn("conflict ddls equals last done ddls, skip them", zap.Strings("ddls", ddls), zap.Strings("sources", readySources)) + if _, _, err := PutOperations(cli, true, ops...); err != nil { + return l.remain <= 0, l.remain, err + } + + // revert ready + for _, source := range readySources { + l.ready[source] = false + l.remain++ + } + l.Owner = caller + l.DDLs = ddls } // try to merge any newly joined sources. diff --git a/pkg/shardddl/pessimism/lock_test.go b/pkg/shardddl/pessimism/lock_test.go index 59f1cb7562..5dab158e8f 100644 --- a/pkg/shardddl/pessimism/lock_test.go +++ b/pkg/shardddl/pessimism/lock_test.go @@ -40,7 +40,7 @@ func (t *testLock) TestLock(c *C) { l1 := NewLock(ID, task, source1, DDLs, []string{source1}) // DDLs mismatch. - synced, remain, err := l1.TrySync(source1, DDLs[1:], []string{source1}) + synced, remain, err := l1.TrySync(etcdTestCli, source1, DDLs[1:], []string{source1}, nil) c.Assert(terror.ErrMasterShardingDDLDiff.Equal(err), IsTrue) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) @@ -51,7 +51,7 @@ func (t *testLock) TestLock(c *C) { c.Assert(l1.IsResolved(), IsFalse) // synced. - synced, remain, err = l1.TrySync(source1, DDLs, []string{source1}) + synced, remain, err = l1.TrySync(etcdTestCli, source1, DDLs, []string{source1}, nil) c.Assert(err, IsNil) c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) @@ -70,7 +70,7 @@ func (t *testLock) TestLock(c *C) { l2 := NewLock(ID, task, source1, DDLs, []string{source1, source2}) // join a new source. - synced, remain, err = l2.TrySync(source1, DDLs, []string{source2, source3}) + synced, remain, err = l2.TrySync(etcdTestCli, source1, DDLs, []string{source2, source3}, nil) c.Assert(err, IsNil) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 2) @@ -81,7 +81,7 @@ func (t *testLock) TestLock(c *C) { }) // sync other sources. - synced, remain, err = l2.TrySync(source2, DDLs, []string{}) + synced, remain, err = l2.TrySync(etcdTestCli, source2, DDLs, []string{}, nil) c.Assert(err, IsNil) c.Assert(synced, IsFalse) c.Assert(remain, Equals, 1) @@ -90,7 +90,7 @@ func (t *testLock) TestLock(c *C) { source2: true, source3: false, }) - synced, remain, err = l2.TrySync(source3, DDLs, nil) + synced, remain, err = l2.TrySync(etcdTestCli, source3, DDLs, nil, nil) c.Assert(err, IsNil) c.Assert(synced, IsTrue) c.Assert(remain, Equals, 0) diff --git a/pkg/shardddl/pessimism/operation.go b/pkg/shardddl/pessimism/operation.go index 8520e345ca..4bb4c55ab1 100644 --- a/pkg/shardddl/pessimism/operation.go +++ b/pkg/shardddl/pessimism/operation.go @@ -35,6 +35,7 @@ type Operation struct { DDLs []string `json:"ddls"` // DDL statements Exec bool `json:"exec"` // execute or skip the DDL statements Done bool `json:"done"` // whether the `Exec` operation has done + Skip bool `json:"skip"` // Whether worker skip this operation // only used to report to the caller of the watcher, do not marsh it. // if it's true, it means the Operation has been deleted in etcd. @@ -42,7 +43,7 @@ type Operation struct { } // NewOperation creates a new Operation instance. -func NewOperation(id, task, source string, ddls []string, exec, done bool) Operation { +func NewOperation(id, task, source string, ddls []string, exec, done bool, skip bool) Operation { return Operation{ ID: id, Task: task, @@ -50,6 +51,7 @@ func NewOperation(id, task, source string, ddls []string, exec, done bool) Opera DDLs: ddls, Exec: exec, Done: done, + Skip: skip, } } diff --git a/pkg/shardddl/pessimism/operation_test.go b/pkg/shardddl/pessimism/operation_test.go index 1e37381d92..0511b742a1 100644 --- a/pkg/shardddl/pessimism/operation_test.go +++ b/pkg/shardddl/pessimism/operation_test.go @@ -25,11 +25,11 @@ import ( func (t *testForEtcd) TestOperationJSON(c *C) { o1 := NewOperation("test-ID", "test", "mysql-replica-1", []string{ "ALTER TABLE bar ADD COLUMN c1 INT", - }, true, false) + }, true, false, false) j, err := o1.toJSON() c.Assert(err, IsNil) - c.Assert(j, Equals, `{"id":"test-ID","task":"test","source":"mysql-replica-1","ddls":["ALTER TABLE bar ADD COLUMN c1 INT"],"exec":true,"done":false}`) + c.Assert(j, Equals, `{"id":"test-ID","task":"test","source":"mysql-replica-1","ddls":["ALTER TABLE bar ADD COLUMN c1 INT"],"exec":true,"done":false,"skip":false}`) c.Assert(j, Equals, o1.String()) o2, err := operationFromJSON(j) @@ -49,10 +49,10 @@ func (t *testForEtcd) TestOperationEtcd(c *C) { source2 = "mysql-replica-2" source3 = "mysql-replica-3" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} - op11 = NewOperation(ID1, task1, source1, DDLs, true, false) - op12 = NewOperation(ID1, task1, source2, DDLs, true, false) - op13 = NewOperation(ID1, task1, source3, DDLs, true, false) - op21 = NewOperation(ID2, task2, source1, DDLs, false, true) + op11 = NewOperation(ID1, task1, source1, DDLs, true, false, false) + op12 = NewOperation(ID1, task1, source2, DDLs, true, false, false) + op13 = NewOperation(ID1, task1, source3, DDLs, true, false, false) + op21 = NewOperation(ID2, task2, source1, DDLs, false, true, false) ) // put the same keys twice. diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index eb6b4f5a4e..40f67c8ea7 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -60,12 +60,16 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) return rev, err } -// DeleteInfosOperationsByTask deletes the shard DDL infos and operations of a specified task in etcd. +// DeleteInfosOperationsDDLsByTask deletes the shard DDL infos and operations of a specified task in etcd. // This function should often be called by DM-master when deleting ddl meta data. -func DeleteInfosOperationsByTask(cli *clientv3.Client, task string) (int64, error) { +func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string, lockIDSet map[string]struct{}) (int64, error) { opsDel := make([]clientv3.Op, 0, 2) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())) + opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(task), clientv3.WithPrefix())) + for lockID := range lockIDSet { + opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID), clientv3.WithPrefix())) + } _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...) return rev, err } diff --git a/pkg/shardddl/pessimism/ops_test.go b/pkg/shardddl/pessimism/ops_test.go index 7519175ad2..c9725d6aca 100644 --- a/pkg/shardddl/pessimism/ops_test.go +++ b/pkg/shardddl/pessimism/ops_test.go @@ -25,7 +25,7 @@ func (t *testForEtcd) TestPutOperationDeleteInfo(c *C) { source = "mysql-replica-1" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} info = NewInfo(task, source, "foo", "bar", DDLs) - op = NewOperation("test-ID", task, source, DDLs, true, false) + op = NewOperation("test-ID", task, source, DDLs, true, false, false) ) // put info. diff --git a/syncer/shardddl/pessimist.go b/syncer/shardddl/pessimist.go index 524c8aee24..46adeb974a 100644 --- a/syncer/shardddl/pessimist.go +++ b/syncer/shardddl/pessimist.go @@ -129,6 +129,12 @@ func (p *Pessimist) GetOperation(ctx context.Context, info pessimism.Info, rev i } } +// DeleteInfosOperations deletes the shard DDL infos and operations in etcd. +// This function should often be called by DM-worker when skip operation. +func (p *Pessimist) DeleteInfosOperations(infos []pessimism.Info, ops []pessimism.Operation) (int64, error) { + return pessimism.DeleteInfosOperations(p.cli, infos, ops) +} + // DoneOperationDeleteInfo marks the shard DDL lock operation as done and delete the shard DDL info. func (p *Pessimist) DoneOperationDeleteInfo(op pessimism.Operation, info pessimism.Info) error { op.Done = true // mark the operation as `done`. diff --git a/syncer/shardddl/pessimist_test.go b/syncer/shardddl/pessimist_test.go index 6b01e8da8e..5f33d0806e 100644 --- a/syncer/shardddl/pessimist_test.go +++ b/syncer/shardddl/pessimist_test.go @@ -65,7 +65,7 @@ func (t *testPessimist) TestPessimist(c *C) { schema, table = "foo", "bar" DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} ID = "task-`foo`.`bar`" - op = pessimism.NewOperation(ID, task, source, DDLs, true, false) + op = pessimism.NewOperation(ID, task, source, DDLs, true, false, false) logger = log.L() p = NewPessimist(&logger, etcdTestCli, task, source) diff --git a/syncer/syncer.go b/syncer/syncer.go index 843c2ec1b1..6e0f843b91 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -979,8 +979,15 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn, tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", sqlJob.ddls)) case shardPessimistOp == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) + case shardPessimistOp.Skip: + tctx.L().Warn("skip shard DDL operation in pessimistic shard mode", zap.Strings("ddl", sqlJob.ddls)) + _, err = s.pessimist.DeleteInfosOperations([]pessimism.Info{*shardInfo}, []pessimism.Operation{*shardPessimistOp}) default: err = s.pessimist.DoneOperationDeleteInfo(*shardPessimistOp, *shardInfo) + failpoint.Inject("ErrorAfterOpDone", func() { + tctx.L().Warn("error after operation done", zap.Strings("DDL", sqlJob.ddls), zap.String("failpoint", "ErrorAfterOpDone")) + err = errors.Errorf("error after operation done") + }) } case config.ShardOptimistic: shardInfo := s.optimist.PendingInfo() diff --git a/tests/shardddl2/run.sh b/tests/shardddl2/run.sh index 02b413d441..eec1c9602b 100644 --- a/tests/shardddl2/run.sh +++ b/tests/shardddl2/run.sh @@ -1032,6 +1032,78 @@ function DM_DROP_COLUMN_ALL_DONE() { "clean_table" "optimistic" } +function DM_PESSIMISTIC_LAST_DONE_CASE() { + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # get worker of source1 + w="1" + got=`grep "mysql-replica-01" $WORK_DIR/worker1/log/dm-worker.log | wc -l` + if [[ "$got" = "0" ]]; then + w="2" + fi + + restart_worker $w "github.com/pingcap/dm/syncer/ErrorAfterOpDone=return()" + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" + # make sure source1 put info + check_log_contain_with_retry "putted shard DDL info" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "error after operation done" 1 + + # case1: source2 put info, source1 reput last done ddl. + if [[ "$2" == "1" ]]; then + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* ADD COLUMN" 2 + fi + + restart_master + restart_worker $w "" + + # case2: source1 reput last done ddl, source2 put info. + if [[ "$2" == "2" ]]; then + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* DROP COLUMN" 2 + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + fi + + # conflict happen, skip source1's drop column statement + check_log_contain_with_retry "ddls equals last done ddls, skip" $WORK_DIR/master/log/dm-master.log + check_log_contain_with_retry "skip shard DDL operation in pessimistic shard mode" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + restart_master + + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,'ddd');" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +function DM_PESSIMISTIC_LAST_DONE() { + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ + "clean_table" "pessimistic" 1 + + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ + "clean_table" "pessimistic" 2 +} + function run() { init_cluster init_database @@ -1050,6 +1122,7 @@ function run() { DM_INIT_SCHEMA DM_DROP_COLUMN_EXEC_ERROR DM_DROP_COLUMN_ALL_DONE + DM_PESSIMISTIC_LAST_DONE } cleanup_data $shardddl From 24609437414dbaadb3fd4390329a0b16b4fe17a3 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 23 Apr 2021 16:49:19 +0800 Subject: [PATCH 2/7] add ut --- pkg/shardddl/pessimism/ddls_test.go | 56 +++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 pkg/shardddl/pessimism/ddls_test.go diff --git a/pkg/shardddl/pessimism/ddls_test.go b/pkg/shardddl/pessimism/ddls_test.go new file mode 100644 index 0000000000..4d1fb13d9d --- /dev/null +++ b/pkg/shardddl/pessimism/ddls_test.go @@ -0,0 +1,56 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package pessimism + +import ( + . "github.com/pingcap/check" +) + +func (t *testForEtcd) TestDDLsEtcd(c *C) { + defer clearTestInfoOperation(c) + + var ( + ID1 = "test1-`foo`.`bar`" + ID2 = "test2-`foo`.`bar`" + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} + ) + + // put the same keys twice. + rev1, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs1) + c.Assert(err, IsNil) + rev2, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs1) + c.Assert(err, IsNil) + c.Assert(rev2, Greater, rev1) + + // put another DDLs + rev3, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs2) + c.Assert(err, IsNil) + c.Assert(rev3, Greater, rev2) + + // put for another lock + rev4, err := PutLatestDoneDDLs(etcdTestCli, ID2, DDLs1) + c.Assert(err, IsNil) + c.Assert(rev4, Greater, rev3) + + // get all ddls. + latestDoneDDLsMap, rev5, err := GetAllLatestDoneDDLs(etcdTestCli) + c.Assert(err, IsNil) + c.Assert(rev5, Equals, rev4) + c.Assert(latestDoneDDLsMap, HasLen, 2) + c.Assert(latestDoneDDLsMap, HasKey, ID1) + c.Assert(latestDoneDDLsMap, HasKey, ID2) + c.Assert(latestDoneDDLsMap[ID1], DeepEquals, DDLs2) + c.Assert(latestDoneDDLsMap[ID2], DeepEquals, DDLs1) +} From fc5f8984a3b7b3dbaf0410b3d3b476074d8ad382 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 23 Apr 2021 18:25:07 +0800 Subject: [PATCH 3/7] fix test --- dm/master/shardddl/pessimist.go | 3 ++- pkg/shardddl/pessimism/keeper.go | 14 ++++++++++++++ pkg/shardddl/pessimism/ops.go | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 8fdfdaa917..2b01ea30f0 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -405,8 +405,9 @@ func (p *Pessimist) RemoveMetaData(task string) error { lockIDSet[op.ID] = struct{}{} } + lockIDs := p.lk.RemoveLatestDoneDDLsByTask(task) // clear meta data in etcd - _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task, lockIDSet) + _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task, lockIDs) return err } diff --git a/pkg/shardddl/pessimism/keeper.go b/pkg/shardddl/pessimism/keeper.go index 7737311b55..2ef6643eb0 100644 --- a/pkg/shardddl/pessimism/keeper.go +++ b/pkg/shardddl/pessimism/keeper.go @@ -71,6 +71,20 @@ func (lk *LockKeeper) AddLatestDoneDDLs(lockID string, ddls []string) { lk.latestDoneDDLs[lockID] = ddls } +// RemoveLatestDoneDDLsByTask remove last done ddls by task. +func (lk *LockKeeper) RemoveLatestDoneDDLsByTask(task string) []string { + lk.mu.Lock() + defer lk.mu.Unlock() + lockIDs := make([]string, 0, len(lk.latestDoneDDLs)) + for lockID := range lk.latestDoneDDLs { + if t := utils.ExtractTaskFromLockID(lockID); t == task { + lockIDs = append(lockIDs, lockID) + } + delete(lk.latestDoneDDLs, lockID) + } + return lockIDs +} + // GetLatestDoneDDLs gets last done ddls by lockID. func (lk *LockKeeper) GetLatestDoneDDLs(lockID string) []string { latestDoneDDLs, ok := lk.latestDoneDDLs[lockID] diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index 40f67c8ea7..780de97fa7 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -62,12 +62,12 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) // DeleteInfosOperationsDDLsByTask deletes the shard DDL infos and operations of a specified task in etcd. // This function should often be called by DM-master when deleting ddl meta data. -func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string, lockIDSet map[string]struct{}) (int64, error) { +func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string, lockIDs []string) (int64, error) { opsDel := make([]clientv3.Op, 0, 2) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(task), clientv3.WithPrefix())) - for lockID := range lockIDSet { + for _, lockID := range lockIDs { opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID), clientv3.WithPrefix())) } _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...) From c9c9b49e687e6c4ffb0bffa48f0079c842ff9e16 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Sun, 25 Apr 2021 17:06:24 +0800 Subject: [PATCH 4/7] atomic put latest done ddls --- dm/master/shardddl/pessimist.go | 30 +++++++++--------------------- pkg/shardddl/pessimism/ddls.go | 12 ++++++++++++ pkg/shardddl/pessimism/ops.go | 18 ++++++++++++++++++ 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 2b01ea30f0..2d661a0811 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -296,7 +296,7 @@ func (p *Pessimist) UnlockLock(ctx context.Context, id, replaceOwner string, for // 2. check whether has resolved before (this often should not happen). if lock.IsResolved() { - err := p.removeLock(lock) + err := p.removeLockPutDDLs(lock) if err != nil { return err } @@ -542,15 +542,12 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis if lock.IsResolved() { p.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op)) // remove all operations for this shard DDL lock. - err := p.removeLock(lock) + err := p.removeLockPutDDLs(lock) if err != nil { p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err)) metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock) } p.logger.Info("the lock info for the shard DDL lock operation has been cleared", zap.Stringer("operation", op)) - if err := p.addLatestDoneDDLs(lock.ID, lock.DDLs); err != nil { - p.logger.Info("fail to record last done ddls", zap.String("lock", lock.ID), log.ShortError(err)) - } p.infoOpMu.Unlock() continue } @@ -591,10 +588,9 @@ func (p *Pessimist) handleLock(lockID, source string) error { if lock.IsResolved() { // remove all operations for this shard DDL lock. // this is to handle the case where dm-master exit before deleting operations for them. - if err := p.removeLock(lock); err != nil { + if err := p.removeLockPutDDLs(lock); err != nil { return err } - return p.addLatestDoneDDLs(lock.ID, lock.DDLs) } // check whether the owner has done. @@ -651,9 +647,9 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s } // removeLock removes the lock in memory and its information in etcd. -func (p *Pessimist) removeLock(lock *pessimism.Lock) error { +func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error { // remove all operations for this shard DDL lock. - if err := p.deleteOps(lock); err != nil { + if err := p.deleteOpsPutDDLs(lock); err != nil { return err } @@ -686,21 +682,13 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error { } }) p.lk.RemoveLock(lock.ID) + p.lk.AddLatestDoneDDLs(lock.ID, lock.DDLs) metrics.ReportDDLPending(lock.Task, metrics.DDLPendingSynced, metrics.DDLPendingNone) return nil } -// addLatestDoneDDLs add last done ddls into etcd and LockKeeper. -func (p *Pessimist) addLatestDoneDDLs(lockID string, ddls []string) error { - if _, err := pessimism.PutLatestDoneDDLs(p.cli, lockID, ddls); err != nil { - return err - } - p.lk.AddLatestDoneDDLs(lockID, ddls) - return nil -} - // deleteOps DELETEs shard DDL lock operations relative to the lock. -func (p *Pessimist) deleteOps(lock *pessimism.Lock) error { +func (p *Pessimist) deleteOpsPutDDLs(lock *pessimism.Lock) error { ready := lock.Ready() ops := make([]pessimism.Operation, 0, len(ready)) for source := range ready { @@ -708,11 +696,11 @@ func (p *Pessimist) deleteOps(lock *pessimism.Lock) error { // so simply set `exec=false` and `done=true`. ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false)) } - rev, err := pessimism.DeleteOperations(p.cli, ops...) + rev, err := pessimism.DeleteOperationsPutDDLs(p.cli, lock.ID, ops, lock.DDLs) if err != nil { return err } - p.logger.Info("delete shard DDL lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev)) + p.logger.Info("delete shard DDL lock operations and put latest done ddls", zap.String("lock", lock.ID), zap.Int64("revision", rev), zap.Strings("ddls", lock.DDLs)) return err } diff --git a/pkg/shardddl/pessimism/ddls.go b/pkg/shardddl/pessimism/ddls.go index 3755ebe313..4430a85da0 100644 --- a/pkg/shardddl/pessimism/ddls.go +++ b/pkg/shardddl/pessimism/ddls.go @@ -23,6 +23,18 @@ import ( "github.com/pingcap/dm/pkg/etcdutil" ) +// putLatestDoneDDLsOp returns a PUT etcd operation for latest done ddls. +// This operation should often be sent by DM-master. +func putLatestDoneDDLsOp(lockID string, ddls []string) (clientv3.Op, error) { + data, err := json.Marshal(ddls) + if err != nil { + return clientv3.Op{}, err + } + key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID) + + return clientv3.OpPut(key, string(data)), nil +} + // PutLatestDoneDDLs puts the last done shard DDL ddls into etcd. func PutLatestDoneDDLs(cli *clientv3.Client, lockID string, ddls []string) (int64, error) { data, err := json.Marshal(ddls) diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index 780de97fa7..5250ec9e75 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -60,6 +60,24 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) return rev, err } +// DeleteOperationsPutDDLs deletes the shard DDL operations and add latest done DDLs in etcd. +// This function should often be called by DM-master when calling UnlockDDL. +func DeleteOperationsPutDDLs(cli *clientv3.Client, lockID string, ops []Operation, ddls []string) (int64, error) { + etcdOps := make([]clientv3.Op, 0, len(ops)) + for _, op := range ops { + etcdOps = append(etcdOps, deleteOperationOp(op)) + } + + putOp, err := putLatestDoneDDLsOp(lockID, ddls) + if err != nil { + return 0, err + } + + etcdOps = append(etcdOps, putOp) + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, etcdOps...) + return rev, err +} + // DeleteInfosOperationsDDLsByTask deletes the shard DDL infos and operations of a specified task in etcd. // This function should often be called by DM-master when deleting ddl meta data. func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string, lockIDs []string) (int64, error) { From a770b5d1067488c7fd58dcf12b9fc07e6b3b91f0 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 7 May 2021 17:12:49 +0800 Subject: [PATCH 5/7] address comment --- dm/common/common.go | 6 ++--- dm/master/server.go | 2 +- dm/master/shardddl/pessimist.go | 8 +++--- pkg/shardddl/pessimism/ddls.go | 39 +++++++++++++-------------- pkg/shardddl/pessimism/ddls_test.go | 26 +++++++++--------- pkg/shardddl/pessimism/keeper.go | 42 ++++++++++++++++------------- pkg/shardddl/pessimism/lock.go | 7 ++--- pkg/shardddl/pessimism/operation.go | 2 +- pkg/shardddl/pessimism/ops.go | 9 +++---- pkg/utils/common.go | 12 ++++++++- 10 files changed, 84 insertions(+), 69 deletions(-) diff --git a/dm/common/common.go b/dm/common/common.go index 625c4a4e6e..6da35f7c6a 100644 --- a/dm/common/common.go +++ b/dm/common/common.go @@ -65,7 +65,7 @@ var ( // k/v: Encode(task-name, source-id) -> shard DDL operation. ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/") // ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model. - // k/v: Encode(lockID) -> DDLs. + // k/v: Encode(task-name, downSchema, downTable) -> DDLs. ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/") // ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask. @@ -97,13 +97,13 @@ func keyAdapterKeysLen(s KeyAdapter) int { switch s { case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter, WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter, TaskConfigKeyAdapter, - UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter, ShardDDLPessimismDDLsKeyAdapter: + UpstreamLastBoundWorkerKeyAdapter, UpstreamRelayWorkerKeyAdapter: return 1 case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter, ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter, ShardDDLOptimismSourceTablesKeyAdapter: return 2 - case ShardDDLOptimismInitSchemaKeyAdapter: + case ShardDDLOptimismInitSchemaKeyAdapter, ShardDDLPessimismDDLsKeyAdapter: return 3 case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter: return 4 diff --git a/dm/master/server.go b/dm/master/server.go index 1170b508be..f0d1f6630a 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -711,7 +711,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusListRequest } // adjust unsynced field in sync status by looking at DDL locks. -// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself +// because if a DM-worker doesn't receive any shard DDL, it doesn't even know it's unsynced for itself. func (s *Server) fillUnsyncedStatus(resps []*pb.QueryStatusResponse) { for _, resp := range resps { for _, subtaskStatus := range resp.SubTaskStatus { diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index 82375fe4ac..acdeee8936 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -399,21 +399,19 @@ func (p *Pessimist) RemoveMetaData(task string) error { for _, info := range infos { p.lk.RemoveLockByInfo(info) } - lockIDSet := make(map[string]struct{}, len(ops)) for _, op := range ops { p.lk.RemoveLock(op.ID) - lockIDSet[op.ID] = struct{}{} } - lockIDs := p.lk.RemoveLatestDoneDDLsByTask(task) + p.lk.RemoveLatestDoneDDLsByTask(task) // clear meta data in etcd - _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task, lockIDs) + _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task) return err } // recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, - opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string][]string) error { + opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string]map[string]map[string][]string) error { // add all last done ddls. p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap) diff --git a/pkg/shardddl/pessimism/ddls.go b/pkg/shardddl/pessimism/ddls.go index 4430a85da0..4e6c214cc4 100644 --- a/pkg/shardddl/pessimism/ddls.go +++ b/pkg/shardddl/pessimism/ddls.go @@ -25,39 +25,30 @@ import ( // putLatestDoneDDLsOp returns a PUT etcd operation for latest done ddls. // This operation should often be sent by DM-master. -func putLatestDoneDDLsOp(lockID string, ddls []string) (clientv3.Op, error) { +func putLatestDoneDDLsOp(task, downSchema, downTable string, ddls []string) (clientv3.Op, error) { data, err := json.Marshal(ddls) if err != nil { return clientv3.Op{}, err } - key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID) + key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(task, downSchema, downTable) return clientv3.OpPut(key, string(data)), nil } // PutLatestDoneDDLs puts the last done shard DDL ddls into etcd. -func PutLatestDoneDDLs(cli *clientv3.Client, lockID string, ddls []string) (int64, error) { - data, err := json.Marshal(ddls) - if err != nil { - return 0, err - } - value := string(data) - key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID) - - ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) - defer cancel() - - resp, err := cli.Put(ctx, key, value) +func PutLatestDoneDDLs(cli *clientv3.Client, task, downSchema, downTable string, ddls []string) (int64, error) { + putOp, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls) if err != nil { return 0, err } - return resp.Header.Revision, nil + _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, putOp) + return rev, err } // GetAllLatestDoneDDLs gets all last done shard DDL ddls in etcd currently. -// k/v: lockID -> DDLs +// k/v: task -> downSchema -> downTable -> DDLs // This function should often be called by DM-master. -func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string][]string, int64, error) { +func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string]map[string]map[string][]string, int64, error) { ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) defer cancel() @@ -66,7 +57,7 @@ func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string][]string, int64, err return nil, 0, err } - ddlsMap := make(map[string][]string, len(resp.Kvs)) + ddlsMap := make(map[string]map[string]map[string][]string, len(resp.Kvs)) for _, kv := range resp.Kvs { var ddls []string if err2 := json.Unmarshal(kv.Value, &ddls); err2 != nil { @@ -76,9 +67,17 @@ func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string][]string, int64, err if err2 != nil { return nil, 0, err2 } - lockID := keys[0] + task := keys[0] + downSchema := keys[1] + downTable := keys[2] - ddlsMap[lockID] = ddls + if _, ok := ddlsMap[task]; !ok { + ddlsMap[task] = make(map[string]map[string][]string) + } + if _, ok := ddlsMap[task][downSchema]; !ok { + ddlsMap[task][downSchema] = make(map[string][]string) + } + ddlsMap[task][downSchema][downTable] = ddls } return ddlsMap, resp.Header.Revision, nil diff --git a/pkg/shardddl/pessimism/ddls_test.go b/pkg/shardddl/pessimism/ddls_test.go index 4d1fb13d9d..69dff0cbe0 100644 --- a/pkg/shardddl/pessimism/ddls_test.go +++ b/pkg/shardddl/pessimism/ddls_test.go @@ -15,32 +15,36 @@ package pessimism import ( . "github.com/pingcap/check" + + "github.com/pingcap/dm/pkg/utils" ) func (t *testForEtcd) TestDDLsEtcd(c *C) { defer clearTestInfoOperation(c) var ( - ID1 = "test1-`foo`.`bar`" - ID2 = "test2-`foo`.`bar`" - DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} - DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} + ID1 = "test1-`foo`.`bar`" + ID2 = "test2-`foo`.`bar`" + task1, downSchema1, downTable1 = utils.ExtractAllFromLockID(ID1) + task2, downSchema2, downTable2 = utils.ExtractAllFromLockID(ID2) + DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"} + DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"} ) // put the same keys twice. - rev1, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs1) + rev1, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1) c.Assert(err, IsNil) - rev2, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs1) + rev2, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1) c.Assert(err, IsNil) c.Assert(rev2, Greater, rev1) // put another DDLs - rev3, err := PutLatestDoneDDLs(etcdTestCli, ID1, DDLs2) + rev3, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs2) c.Assert(err, IsNil) c.Assert(rev3, Greater, rev2) // put for another lock - rev4, err := PutLatestDoneDDLs(etcdTestCli, ID2, DDLs1) + rev4, err := PutLatestDoneDDLs(etcdTestCli, task2, downSchema2, downTable2, DDLs1) c.Assert(err, IsNil) c.Assert(rev4, Greater, rev3) @@ -49,8 +53,6 @@ func (t *testForEtcd) TestDDLsEtcd(c *C) { c.Assert(err, IsNil) c.Assert(rev5, Equals, rev4) c.Assert(latestDoneDDLsMap, HasLen, 2) - c.Assert(latestDoneDDLsMap, HasKey, ID1) - c.Assert(latestDoneDDLsMap, HasKey, ID2) - c.Assert(latestDoneDDLsMap[ID1], DeepEquals, DDLs2) - c.Assert(latestDoneDDLsMap[ID2], DeepEquals, DDLs1) + c.Assert(latestDoneDDLsMap[task1][downSchema1][downTable1], DeepEquals, DDLs2) + c.Assert(latestDoneDDLsMap[task2][downSchema2][downTable2], DeepEquals, DDLs1) } diff --git a/pkg/shardddl/pessimism/keeper.go b/pkg/shardddl/pessimism/keeper.go index 84cdccb6e4..b6634e68cc 100644 --- a/pkg/shardddl/pessimism/keeper.go +++ b/pkg/shardddl/pessimism/keeper.go @@ -26,15 +26,15 @@ import ( // The lock information do not need to be persistent, and can be re-constructed from the shard DDL info. type LockKeeper struct { mu sync.RWMutex - locks map[string]*Lock // lockID -> Lock - latestDoneDDLs map[string][]string + locks map[string]*Lock // lockID -> Lock + latestDoneDDLs map[string]map[string]map[string][]string // task -> downSchema -> downTable -> ddls } // NewLockKeeper creates a new LockKeeper instance. func NewLockKeeper() *LockKeeper { return &LockKeeper{ locks: make(map[string]*Lock), - latestDoneDDLs: make(map[string][]string), + latestDoneDDLs: make(map[string]map[string]map[string][]string), } } @@ -54,41 +54,47 @@ func (lk *LockKeeper) TrySync(cli *clientv3.Client, info Info, sources []string) l = lk.locks[lockID] } - synced, remain, err := l.TrySync(cli, info.Source, info.DDLs, sources, lk.GetLatestDoneDDLs(lockID)) + synced, remain, err := l.TrySync(cli, info.Source, info.DDLs, sources, lk.GetLatestDoneDDLs(info.Task, info.Schema, info.Table)) return lockID, synced, remain, err } // AddAllLatestDoneDDLs add all last done ddls. -func (lk *LockKeeper) AddAllLatestDoneDDLs(ddls map[string][]string) { +func (lk *LockKeeper) AddAllLatestDoneDDLs(latestDoneDDLs map[string]map[string]map[string][]string) { lk.mu.Lock() defer lk.mu.Unlock() - lk.latestDoneDDLs = ddls + lk.latestDoneDDLs = latestDoneDDLs } // AddLatestDoneDDLs add last done ddls by lockID. func (lk *LockKeeper) AddLatestDoneDDLs(lockID string, ddls []string) { lk.mu.Lock() defer lk.mu.Unlock() - lk.latestDoneDDLs[lockID] = ddls + task, downSchema, downTable := utils.ExtractAllFromLockID(lockID) + if _, ok := lk.latestDoneDDLs[task]; !ok { + lk.latestDoneDDLs[task] = make(map[string]map[string][]string) + } + if _, ok := lk.latestDoneDDLs[task][downSchema]; !ok { + lk.latestDoneDDLs[task][downSchema] = make(map[string][]string) + } + lk.latestDoneDDLs[task][downSchema][downTable] = ddls } // RemoveLatestDoneDDLsByTask remove last done ddls by task. -func (lk *LockKeeper) RemoveLatestDoneDDLsByTask(task string) []string { +func (lk *LockKeeper) RemoveLatestDoneDDLsByTask(task string) { lk.mu.Lock() defer lk.mu.Unlock() - lockIDs := make([]string, 0, len(lk.latestDoneDDLs)) - for lockID := range lk.latestDoneDDLs { - if t := utils.ExtractTaskFromLockID(lockID); t == task { - lockIDs = append(lockIDs, lockID) - } - delete(lk.latestDoneDDLs, lockID) - } - return lockIDs + delete(lk.latestDoneDDLs, task) } // GetLatestDoneDDLs gets last done ddls by lockID. -func (lk *LockKeeper) GetLatestDoneDDLs(lockID string) []string { - latestDoneDDLs, ok := lk.latestDoneDDLs[lockID] +func (lk *LockKeeper) GetLatestDoneDDLs(task, downSchema, downTable string) []string { + if _, ok := lk.latestDoneDDLs[task]; !ok { + return nil + } + if _, ok := lk.latestDoneDDLs[task][downSchema]; !ok { + return nil + } + latestDoneDDLs, ok := lk.latestDoneDDLs[task][downSchema][downTable] if !ok { return nil } diff --git a/pkg/shardddl/pessimism/lock.go b/pkg/shardddl/pessimism/lock.go index 93539b9833..c3a44b9638 100644 --- a/pkg/shardddl/pessimism/lock.go +++ b/pkg/shardddl/pessimism/lock.go @@ -68,19 +68,20 @@ func NewLock(id, task, owner string, ddls, sources []string) *Lock { // TrySync tries to sync the lock, does decrease on remain, re-entrant. // new upstream sources may join when the DDL lock is in syncing, // so we need to merge these new sources. -func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources []string, latestDoneDDLs []string) (bool, int, error) { +func (l *Lock) TrySync(cli *clientv3.Client, caller string, ddls, sources, latestDoneDDLs []string) (bool, int, error) { l.mu.Lock() defer l.mu.Unlock() // check DDL statement first. if !utils.CompareShardingDDLs(ddls, l.DDLs) { + curIdempotent := utils.CompareShardingDDLs(latestDoneDDLs, ddls) // handle conflict - if !utils.CompareShardingDDLs(latestDoneDDLs, ddls) && !utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs) { + if !curIdempotent && !utils.CompareShardingDDLs(latestDoneDDLs, l.DDLs) { return l.remain <= 0, l.remain, terror.ErrMasterShardingDDLDiff.Generate(l.DDLs, ddls) } // current ddls idempotent, skip it. - if utils.CompareShardingDDLs(latestDoneDDLs, ddls) { + if curIdempotent { log.L().Warn("conflict ddls equals last done ddls, skip it", zap.Strings("ddls", ddls), zap.String("source", caller)) _, _, err := PutOperations(cli, true, NewOperation(l.ID, l.Task, caller, latestDoneDDLs, false, false, true)) return l.remain <= 0, l.remain, err diff --git a/pkg/shardddl/pessimism/operation.go b/pkg/shardddl/pessimism/operation.go index 4bb4c55ab1..7e5e9eee12 100644 --- a/pkg/shardddl/pessimism/operation.go +++ b/pkg/shardddl/pessimism/operation.go @@ -43,7 +43,7 @@ type Operation struct { } // NewOperation creates a new Operation instance. -func NewOperation(id, task, source string, ddls []string, exec, done bool, skip bool) Operation { +func NewOperation(id, task, source string, ddls []string, exec, done, skip bool) Operation { return Operation{ ID: id, Task: task, diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index 5250ec9e75..2a52548dcc 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/pkg/etcdutil" + "github.com/pingcap/dm/pkg/utils" ) // TODO(csuzhangxc): assign terror code before merged into the master branch. @@ -68,7 +69,8 @@ func DeleteOperationsPutDDLs(cli *clientv3.Client, lockID string, ops []Operatio etcdOps = append(etcdOps, deleteOperationOp(op)) } - putOp, err := putLatestDoneDDLsOp(lockID, ddls) + task, downSchema, downTable := utils.ExtractAllFromLockID(lockID) + putOp, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls) if err != nil { return 0, err } @@ -80,14 +82,11 @@ func DeleteOperationsPutDDLs(cli *clientv3.Client, lockID string, ops []Operatio // DeleteInfosOperationsDDLsByTask deletes the shard DDL infos and operations of a specified task in etcd. // This function should often be called by DM-master when deleting ddl meta data. -func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string, lockIDs []string) (int64, error) { +func DeleteInfosOperationsDDLsByTask(cli *clientv3.Client, task string) (int64, error) { opsDel := make([]clientv3.Op, 0, 2) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismInfoKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismOperationKeyAdapter.Encode(task), clientv3.WithPrefix())) opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(task), clientv3.WithPrefix())) - for _, lockID := range lockIDs { - opsDel = append(opsDel, clientv3.OpDelete(common.ShardDDLPessimismDDLsKeyAdapter.Encode(lockID), clientv3.WithPrefix())) - } _, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...) return rev, err } diff --git a/pkg/utils/common.go b/pkg/utils/common.go index ef3cbe4b59..647a227cd1 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -183,7 +183,7 @@ func ExtractTaskFromLockID(lockID string) string { return strs[1] } -// ExtractDBAndTableFromLockID extract schema and table from lockID +// ExtractDBAndTableFromLockID extract schema and table from lockID. func ExtractDBAndTableFromLockID(lockID string) (string, string) { strs := lockIDPattern.FindStringSubmatch(lockID) // strs should be [full-lock-ID, task, db, table] if successful matched @@ -193,6 +193,16 @@ func ExtractDBAndTableFromLockID(lockID string) (string, string) { return strs[2], strs[3] } +// ExtractAllFromLockID extract task, downSchema, downTable from lockID. +func ExtractAllFromLockID(lockID string) (string, string, string) { + strs := lockIDPattern.FindStringSubmatch(lockID) + // strs should be [full-lock-ID, task, db, table] if successful matched + if len(strs) < 4 { + return "", "", "" + } + return strs[1], strs[2], strs[3] +} + // NonRepeatStringsEqual is used to compare two un-ordered, non-repeat-element string slice is equal. func NonRepeatStringsEqual(a, b []string) bool { if len(a) != len(b) { From c4405c34fdc7ab70478920ddbd9d7caa999327a4 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 18 May 2021 11:22:33 +0800 Subject: [PATCH 6/7] fix shfmt --- tests/shardddl2/run.sh | 134 ++++++++++++++++++++--------------------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/tests/shardddl2/run.sh b/tests/shardddl2/run.sh index f6ac1eafe9..92b518059e 100644 --- a/tests/shardddl2/run.sh +++ b/tests/shardddl2/run.sh @@ -463,7 +463,7 @@ function DM_058() { run_case 058 "double-source-optimistic" "init_table 111 211 212" "clean_table" "optimistic" } -function DM_059_CASE { +function DM_059_CASE() { run_sql_source1 "insert into ${shardddl1}.${tb1} (id) values(1);" run_sql_source2 "insert into ${shardddl1}.${tb1} (id) values(2);" run_sql_source2 "insert into ${shardddl1}.${tb2} (id) values(3);" @@ -665,7 +665,7 @@ function DM_067() { run_case 067 "double-source-optimistic" "init_table 111 211 212" "clean_table" "optimistic" } -function DM_068_CASE { +function DM_068_CASE() { run_sql_source1 "alter table ${shardddl1}.${tb1} modify id datetime default now();" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,now());" run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,now());" @@ -694,7 +694,7 @@ function DM_068() { "clean_table" "optimistic" } -function DM_ADD_DROP_COLUMNS_CASE { +function DM_ADD_DROP_COLUMNS_CASE() { # add cols run_sql_source1 "alter table ${shardddl1}.${tb1} add column col1 int, add column col2 int, add column col3 int;" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,now(),1,1,1);" @@ -767,7 +767,7 @@ function DM_ADD_DROP_COLUMNS() { "clean_table" "optimistic" } -function DM_COLUMN_INDEX_CASE { +function DM_COLUMN_INDEX_CASE() { # add col and index run_sql_source1 "alter table ${shardddl1}.${tb1} add column col3 int, add index idx_col1(col1);" run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,1,1,1);" @@ -1031,75 +1031,75 @@ function DM_DROP_COLUMN_ALL_DONE() { } function DM_PESSIMISTIC_LAST_DONE_CASE() { - run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" - run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" - run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');" - run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');" - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - - # get worker of source1 - w="1" - got=`grep "mysql-replica-01" $WORK_DIR/worker1/log/dm-worker.log | wc -l` - if [[ "$got" = "0" ]]; then - w="2" - fi - - restart_worker $w "github.com/pingcap/dm/syncer/ErrorAfterOpDone=return()" - - run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" - # make sure source1 put info - check_log_contain_with_retry "putted shard DDL info" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log - run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;" - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "error after operation done" 1 - - # case1: source2 put info, source1 reput last done ddl. - if [[ "$2" == "1" ]]; then - run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "ALTER TABLE .* ADD COLUMN" 2 - fi - - restart_master - restart_worker $w "" - - # case2: source1 reput last done ddl, source2 put info. - if [[ "$2" == "2" ]]; then - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "ALTER TABLE .* DROP COLUMN" 2 - run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" - fi - - # conflict happen, skip source1's drop column statement - check_log_contain_with_retry "ddls equals last done ddls, skip" $WORK_DIR/master/log/dm-master.log - check_log_contain_with_retry "skip shard DDL operation in pessimistic shard mode" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log - - restart_master - - run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" - - run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');" - run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,'ddd');" - - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ - "\"result\": true" 3 - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(1,'aaa');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(2,'bbb');" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # get worker of source1 + w="1" + got=$(grep "mysql-replica-01" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [[ "$got" = "0" ]]; then + w="2" + fi + + restart_worker $w "github.com/pingcap/dm/syncer/ErrorAfterOpDone=return()" + + run_sql_source1 "alter table ${shardddl1}.${tb1} drop column b;" + # make sure source1 put info + check_log_contain_with_retry "putted shard DDL info" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + run_sql_source2 "alter table ${shardddl1}.${tb1} drop column b;" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "error after operation done" 1 + + # case1: source2 put info, source1 reput last done ddl. + if [[ "$2" == "1" ]]; then + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* ADD COLUMN" 2 + fi + + restart_master + restart_worker $w "" + + # case2: source1 reput last done ddl, source2 put info. + if [[ "$2" == "2" ]]; then + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "ALTER TABLE .* DROP COLUMN" 2 + run_sql_source2 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + fi + + # conflict happen, skip source1's drop column statement + check_log_contain_with_retry "ddls equals last done ddls, skip" $WORK_DIR/master/log/dm-master.log + check_log_contain_with_retry "skip shard DDL operation in pessimistic shard mode" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + restart_master + + run_sql_source1 "alter table ${shardddl1}.${tb1} add column b varchar(10);" + + run_sql_source1 "insert into ${shardddl1}.${tb1} values(3,'ccc');" + run_sql_source2 "insert into ${shardddl1}.${tb1} values(4,'ddd');" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml } function DM_PESSIMISTIC_LAST_DONE() { - run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ - "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ - "clean_table" "pessimistic" 1 + "clean_table" "pessimistic" 1 - run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ - "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ + run_case PESSIMISTIC_LAST_DONE "double-source-pessimistic" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key);\"; \ run_sql_source2 \"create table ${shardddl1}.${tb1} (a int primary key);\"" \ - "clean_table" "pessimistic" 2 + "clean_table" "pessimistic" 2 } function run() { From 672a5b78b5359504deec41bd40a7f1162b279a87 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 18 May 2021 14:11:55 +0800 Subject: [PATCH 7/7] address comment --- dm/master/shardddl/pessimist.go | 14 +++++++------- pkg/shardddl/pessimism/ops.go | 2 +- pkg/utils/common_test.go | 7 +++++++ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/dm/master/shardddl/pessimist.go b/dm/master/shardddl/pessimist.go index acdeee8936..754242c593 100644 --- a/dm/master/shardddl/pessimist.go +++ b/dm/master/shardddl/pessimist.go @@ -403,10 +403,12 @@ func (p *Pessimist) RemoveMetaData(task string) error { p.lk.RemoveLock(op.ID) } - p.lk.RemoveLatestDoneDDLsByTask(task) // clear meta data in etcd - _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task) - return err + if _, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task); err != nil { + return err + } + p.lk.RemoveLatestDoneDDLsByTask(task) + return nil } // recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation. @@ -586,9 +588,7 @@ func (p *Pessimist) handleLock(lockID, source string) error { if lock.IsResolved() { // remove all operations for this shard DDL lock. // this is to handle the case where dm-master exit before deleting operations for them. - if err := p.removeLockPutDDLs(lock); err != nil { - return err - } + return p.removeLockPutDDLs(lock) } // check whether the owner has done. @@ -644,7 +644,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s return nil } -// removeLock removes the lock in memory and its information in etcd. +// removeLockPutDDLs removes the lock in memory and its information and put ddls in etcd. func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error { // remove all operations for this shard DDL lock. if err := p.deleteOpsPutDDLs(lock); err != nil { diff --git a/pkg/shardddl/pessimism/ops.go b/pkg/shardddl/pessimism/ops.go index 2a52548dcc..c4bde54f04 100644 --- a/pkg/shardddl/pessimism/ops.go +++ b/pkg/shardddl/pessimism/ops.go @@ -62,7 +62,7 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation) } // DeleteOperationsPutDDLs deletes the shard DDL operations and add latest done DDLs in etcd. -// This function should often be called by DM-master when calling UnlockDDL. +// This function should often be called by DM-master when calling UnlockDDL and the lock is resolved. func DeleteOperationsPutDDLs(cli *clientv3.Client, lockID string, ops []Operation, ddls []string) (int64, error) { etcdOps := make([]clientv3.Op, 0, len(ops)) for _, op := range ops { diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index 5b2b8de634..b0830ef9be 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -246,6 +246,13 @@ func (s *testCommonSuite) TestDDLLockID(c *C) { c.Assert(ID, Equals, "test-`d``b`.`tb``l`") c.Assert(ExtractTaskFromLockID(ID), Equals, task) + ID = GenDDLLockID("test-task", "d-b", "tb-l") + c.Assert(ID, Equals, "test-task-`d-b`.`tb-l`") + t, ds, dt := ExtractAllFromLockID(ID) + c.Assert(t, Equals, "test-task") + c.Assert(ds, Equals, "d-b") + c.Assert(dt, Equals, "tb-l") + // invalid ID c.Assert(ExtractTaskFromLockID("invalid-lock-id"), Equals, "") }