Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

pessimistic: record latest done ddl to handle conflict #1626

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// ShardDDLPessimismOperationKeyAdapter is used to store shard DDL operation in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL operation.
ShardDDLPessimismOperationKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/operation/")
// ShardDDLPessimismDDLsKeyAdapter is used to store last done DDLs in pessimistic model.
// k/v: Encode(task-name, downSchema, downTable) -> DDLs.
ShardDDLPessimismDDLsKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/shardddl-pessimism/ddls/")
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

// ShardDDLOptimismSourceTablesKeyAdapter is used to store INITIAL upstream schema & table names when starting the subtask.
// In other words, if any Info for this subtask exists, we should obey source tables in the Info.
Expand Down Expand Up @@ -100,7 +103,7 @@ func keyAdapterKeysLen(s KeyAdapter) int {
ShardDDLPessimismInfoKeyAdapter, ShardDDLPessimismOperationKeyAdapter,
ShardDDLOptimismSourceTablesKeyAdapter:
return 2
case ShardDDLOptimismInitSchemaKeyAdapter:
case ShardDDLOptimismInitSchemaKeyAdapter, ShardDDLPessimismDDLsKeyAdapter:
return 3
case ShardDDLOptimismInfoKeyAdapter, ShardDDLOptimismOperationKeyAdapter:
return 4
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
schema, table = "foo", "bar"
ID = fmt.Sprintf("%s-`%s`.`%s`", taskName, schema, table)
i11 = pessimism.NewInfo(taskName, sources[0], schema, table, DDLs)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false)
op2 = pessimism.NewOperation(ID, taskName, sources[0], DDLs, true, false, false)
)
_, err = pessimism.PutInfo(etcdTestCli, i11)
c.Assert(err, check.IsNil)
Expand Down
49 changes: 29 additions & 20 deletions dm/master/shardddl/pessimist.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,13 @@ func (p *Pessimist) buildLocks(etcdCli *clientv3.Client) (int64, int64, error) {
}
p.logger.Info("get history shard DDL lock operation", zap.Reflect("operation", opm), zap.Int64("revision", rev2))

latestDoneDDLsMap, _, err := pessimism.GetAllLatestDoneDDLs(etcdCli)
if err != nil {
return 0, 0, err
}

// recover the shard DDL lock based on history shard DDL info & lock operation.
err = p.recoverLocks(ifm, opm)
err = p.recoverLocks(ifm, opm, latestDoneDDLsMap)
if err != nil {
// only log the error, and don't return it to forbid the startup of the DM-master leader.
// then these unexpected locks can be handled by the user.
Expand Down Expand Up @@ -291,7 +296,7 @@ func (p *Pessimist) UnlockLock(ctx context.Context, id, replaceOwner string, for

// 2. check whether has resolved before (this often should not happen).
if lock.IsResolved() {
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
return err
}
Expand Down Expand Up @@ -398,20 +403,25 @@ func (p *Pessimist) RemoveMetaData(task string) error {
p.lk.RemoveLock(op.ID)
}

p.lk.RemoveLatestDoneDDLsByTask(task)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// clear meta data in etcd
_, err = pessimism.DeleteInfosOperationsByTask(p.cli, task)
_, err = pessimism.DeleteInfosOperationsDDLsByTask(p.cli, task)
return err
}

// recoverLocks recovers shard DDL locks based on shard DDL info and shard DDL lock operation.
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info, opm map[string]map[string]pessimism.Operation) error {
func (p *Pessimist) recoverLocks(ifm map[string]map[string]pessimism.Info,
opm map[string]map[string]pessimism.Operation, latestDoneDDLsMap map[string]map[string]map[string][]string) error {
// add all last done ddls.
p.lk.AddAllLatestDoneDDLs(latestDoneDDLsMap)

// construct locks based on the shard DDL info.
for task, ifs := range ifm {
sources := p.taskSources(task)
// if no operation exists for the lock, we let the smallest (lexicographical order) source as the owner of the lock.
// if any operation exists for the lock, we let the source with `exec=true` as the owner of the lock (the logic is below).
for _, info := range pessimismInfoMapToSlice(ifs) {
_, _, _, err := p.lk.TrySync(info, sources)
_, _, _, err := p.lk.TrySync(p.cli, info, sources)
if err != nil {
return err
}
Expand Down Expand Up @@ -471,7 +481,7 @@ func (p *Pessimist) handleInfoPut(ctx context.Context, infoCh <-chan pessimism.I
p.logger.Info("receive a shard DDL info", zap.Stringer("info", info))

p.infoOpMu.Lock()
lockID, synced, remain, err := p.lk.TrySync(info, p.taskSources(info.Task))
lockID, synced, remain, err := p.lk.TrySync(p.cli, info, p.taskSources(info.Task))
if err != nil {
p.logger.Error("fail to try sync shard DDL lock", zap.Stringer("info", info), log.ShortError(err))
// currently, only DDL mismatch will cause error
Expand Down Expand Up @@ -530,7 +540,7 @@ func (p *Pessimist) handleOperationPut(ctx context.Context, opCh <-chan pessimis
if lock.IsResolved() {
p.logger.Info("the lock for the shard DDL lock operation has been resolved", zap.Stringer("operation", op))
// remove all operations for this shard DDL lock.
err := p.removeLock(lock)
err := p.removeLockPutDDLs(lock)
if err != nil {
p.logger.Error("fail to delete the shard DDL lock operations", zap.String("lock", lock.ID), log.ShortError(err))
metrics.ReportDDLError(op.Task, metrics.OpErrRemoveLock)
Expand Down Expand Up @@ -576,11 +586,9 @@ func (p *Pessimist) handleLock(lockID, source string) error {
if lock.IsResolved() {
// remove all operations for this shard DDL lock.
// this is to handle the case where dm-master exit before deleting operations for them.
err := p.removeLock(lock)
if err != nil {
if err := p.removeLockPutDDLs(lock); err != nil {
return err
}
return nil
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

// check whether the owner has done.
Expand All @@ -601,7 +609,7 @@ func (p *Pessimist) handleLock(lockID, source string) error {

// putOpForOwner PUTs the shard DDL lock operation for the owner into etcd.
func (p *Pessimist) putOpForOwner(lock *pessimism.Lock, owner string, skipDone bool) error {
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false)
op := pessimism.NewOperation(lock.ID, lock.Task, owner, lock.DDLs, true, false, false)
rev, succ, err := pessimism.PutOperations(p.cli, skipDone, op)
if err != nil {
return err
Expand All @@ -625,7 +633,7 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s

ops := make([]pessimism.Operation, 0, len(sources))
for _, source := range sources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}

rev, succ, err := pessimism.PutOperations(p.cli, skipDone, ops...)
Expand All @@ -637,9 +645,9 @@ func (p *Pessimist) putOpsForNonOwner(lock *pessimism.Lock, onlySource string, s
}

// removeLock removes the lock in memory and its information in etcd.
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
func (p *Pessimist) removeLockPutDDLs(lock *pessimism.Lock) error {
// remove all operations for this shard DDL lock.
if err := p.deleteOps(lock); err != nil {
if err := p.deleteOpsPutDDLs(lock); err != nil {
return err
}

Expand Down Expand Up @@ -672,23 +680,24 @@ func (p *Pessimist) removeLock(lock *pessimism.Lock) error {
}
})
p.lk.RemoveLock(lock.ID)
p.lk.AddLatestDoneDDLs(lock.ID, lock.DDLs)
return nil
}

// deleteOps DELETEs shard DDL lock operations relative to the lock.
func (p *Pessimist) deleteOps(lock *pessimism.Lock) error {
func (p *Pessimist) deleteOpsPutDDLs(lock *pessimism.Lock) error {
ready := lock.Ready()
ops := make([]pessimism.Operation, 0, len(ready))
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}
rev, err := pessimism.DeleteOperations(p.cli, ops...)
rev, err := pessimism.DeleteOperationsPutDDLs(p.cli, lock.ID, ops, lock.DDLs)
if err != nil {
return err
}
p.logger.Info("delete shard DDL lock operations", zap.String("lock", lock.ID), zap.Int64("revision", rev))
p.logger.Info("delete shard DDL lock operations and put latest done ddls", zap.String("lock", lock.ID), zap.Int64("revision", rev), zap.Strings("ddls", lock.DDLs))
return err
}

Expand All @@ -704,7 +713,7 @@ func (p *Pessimist) deleteInfosOps(lock *pessimism.Lock) error {
for source := range ready {
// When deleting operations, we do not verify the value of the operation now,
// so simply set `exec=false` and `done=true`.
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, true, false))
}

rev, err := pessimism.DeleteInfosOperations(p.cli, infos, ops)
Expand Down Expand Up @@ -785,7 +794,7 @@ func (p *Pessimist) waitNonOwnerToBeDone(ctx context.Context, lock *pessimism.Lo
// we still put `skip` operations for waitSources one more time with `skipDone=true`.
ops := make([]pessimism.Operation, 0, len(waitSources))
for _, source := range waitSources {
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false))
ops = append(ops, pessimism.NewOperation(lock.ID, lock.Task, source, lock.DDLs, false, false, false))
}
rev, succ, err := pessimism.PutOperations(p.cli, true, ops...)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {
c.Assert(p.Locks()[ID2].IsDone(source2), IsFalse)

// mark exec operation for one non-owner as `done` (and delete the info).
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true)
op22c := pessimism.NewOperation(ID2, task2, source2, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op22c, i22)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand All @@ -325,7 +325,7 @@ func (t *testPessimist) testPessimistProgress(c *C, restart int) {

// mark skip operation for the non-owner as `done` (and delete the info).
// the lock should become resolved and deleted.
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true)
op23c := pessimism.NewOperation(ID2, task2, source3, DDLs, false, true, false)
done, _, err = pessimism.PutOperationDeleteExistInfo(etcdTestCli, op23c, i23)
c.Assert(err, IsNil)
c.Assert(done, IsTrue)
Expand Down Expand Up @@ -844,7 +844,7 @@ func (t *testPessimist) TestMeetEtcdCompactError(c *C) {
ID1 = fmt.Sprintf("%s-`%s`.`%s`", task1, schema, table)
i11 = pessimism.NewInfo(task1, source1, schema, table, DDLs)
i12 = pessimism.NewInfo(task1, source2, schema, table, DDLs)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false)
op = pessimism.NewOperation(ID1, task1, source1, DDLs, true, false, false)
revCompacted int64

infoCh chan pessimism.Info
Expand Down
84 changes: 84 additions & 0 deletions pkg/shardddl/pessimism/ddls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
"context"
"encoding/json"

"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
)

// putLatestDoneDDLsOp returns a PUT etcd operation for latest done ddls.
// This operation should often be sent by DM-master.
func putLatestDoneDDLsOp(task, downSchema, downTable string, ddls []string) (clientv3.Op, error) {
data, err := json.Marshal(ddls)
if err != nil {
return clientv3.Op{}, err
}
key := common.ShardDDLPessimismDDLsKeyAdapter.Encode(task, downSchema, downTable)

return clientv3.OpPut(key, string(data)), nil
}

// PutLatestDoneDDLs puts the last done shard DDL ddls into etcd.
func PutLatestDoneDDLs(cli *clientv3.Client, task, downSchema, downTable string, ddls []string) (int64, error) {
putOp, err := putLatestDoneDDLsOp(task, downSchema, downTable, ddls)
if err != nil {
return 0, err
}
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, putOp)
return rev, err
}

// GetAllLatestDoneDDLs gets all last done shard DDL ddls in etcd currently.
// k/v: task -> downSchema -> downTable -> DDLs
// This function should often be called by DM-master.
func GetAllLatestDoneDDLs(cli *clientv3.Client) (map[string]map[string]map[string][]string, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
defer cancel()

resp, err := cli.Get(ctx, common.ShardDDLPessimismDDLsKeyAdapter.Path(), clientv3.WithPrefix())
if err != nil {
return nil, 0, err
}

ddlsMap := make(map[string]map[string]map[string][]string, len(resp.Kvs))
for _, kv := range resp.Kvs {
var ddls []string
if err2 := json.Unmarshal(kv.Value, &ddls); err2 != nil {
return nil, 0, err2
}
keys, err2 := common.ShardDDLPessimismDDLsKeyAdapter.Decode(string(kv.Key))
if err2 != nil {
return nil, 0, err2
}
task := keys[0]
downSchema := keys[1]
downTable := keys[2]

if _, ok := ddlsMap[task]; !ok {
ddlsMap[task] = make(map[string]map[string][]string)
}
if _, ok := ddlsMap[task][downSchema]; !ok {
ddlsMap[task][downSchema] = make(map[string][]string)
}
ddlsMap[task][downSchema][downTable] = ddls
}

return ddlsMap, resp.Header.Revision, nil
}
58 changes: 58 additions & 0 deletions pkg/shardddl/pessimism/ddls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pessimism

import (
. "github.com/pingcap/check"

"github.com/pingcap/dm/pkg/utils"
)

func (t *testForEtcd) TestDDLsEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
ID1 = "test1-`foo`.`bar`"
ID2 = "test2-`foo`.`bar`"
task1, downSchema1, downTable1 = utils.ExtractAllFromLockID(ID1)
task2, downSchema2, downTable2 = utils.ExtractAllFromLockID(ID2)
DDLs1 = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
DDLs2 = []string{"ALTER TABLE bar ADD COLUMN c2 INT"}
)

// put the same keys twice.
rev1, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1)
c.Assert(err, IsNil)
rev2, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs1)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)

// put another DDLs
rev3, err := PutLatestDoneDDLs(etcdTestCli, task1, downSchema1, downTable1, DDLs2)
c.Assert(err, IsNil)
c.Assert(rev3, Greater, rev2)

// put for another lock
rev4, err := PutLatestDoneDDLs(etcdTestCli, task2, downSchema2, downTable2, DDLs1)
c.Assert(err, IsNil)
c.Assert(rev4, Greater, rev3)

// get all ddls.
latestDoneDDLsMap, rev5, err := GetAllLatestDoneDDLs(etcdTestCli)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, rev4)
c.Assert(latestDoneDDLsMap, HasLen, 2)
c.Assert(latestDoneDDLsMap[task1][downSchema1][downTable1], DeepEquals, DDLs2)
c.Assert(latestDoneDDLsMap[task2][downSchema2][downTable2], DeepEquals, DDLs1)
}
2 changes: 1 addition & 1 deletion pkg/shardddl/pessimism/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (t *testForEtcd) TestPutInfoIfOpNotDone(c *C) {
DDLs = []string{"ALTER TABLE bar ADD COLUMN c1 INT"}
ID = fmt.Sprintf("%s-%s", task, dbutil.TableName(schema, table))
info = NewInfo(task, source, schema, table, DDLs)
op = NewOperation(ID, task, source, DDLs, false, false)
op = NewOperation(ID, task, source, DDLs, false, false, false)
)

// put info success because no operation exist.
Expand Down
Loading