Skip to content

Commit

Permalink
This is an automated cherry-pick of #55692
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
joccau authored and ti-chi-bot committed Oct 31, 2024
1 parent 116ffee commit 1b8f809
Show file tree
Hide file tree
Showing 3 changed files with 579 additions and 10 deletions.
4 changes: 4 additions & 0 deletions owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ go_test(
],
embed = [":owner"],
flaky = True,
<<<<<<< HEAD:owner/BUILD.bazel
=======
shard_count = 7,
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/BUILD.bazel
deps = [
"//ddl",
"//infoschema",
Expand Down
149 changes: 139 additions & 10 deletions owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,18 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

<<<<<<< HEAD:owner/manager.go
ownerKey, err := GetOwnerInfo(campaignContext, logCtx, elec, m.id)
=======
ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
if err != nil {
continue
}

m.toBeOwner(elec)
m.watchOwner(campaignContext, etcdSession, ownerKey)
err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev)
logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err))
m.RetireOwner()

metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
Expand All @@ -274,9 +279,56 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
<<<<<<< HEAD:owner/manager.go
resp, err := m.etcdCli.Get(ctx, m.key, clientv3.WithFirstCreate()...)
=======
_, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

// getOwnerInfo reads the owner key stored under ownerPath from etcd and
// decodes the owner information held in its value.
//
// The Get is attempted up to 3 times. Each attempt runs under its own
// util.KeyOpDefaultTimeout deadline, ctx is checked before every attempt, and
// a failed attempt is followed by a util.KeyOpRetryInterval sleep.
//
// It returns, in order:
//   - the owner key,
//   - the owner ID decoded from the key's value,
//   - the op decoded from the key's value,
//   - the revision reported in the response header,
//   - the ModRevision of the owner key,
//   - an error; concurrency.ErrElectionNoLeader when no key was found.
func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) {
	var op OpType
	var resp *clientv3.GetResponse
	var err error
	for i := 0; i < 3; i++ {
		if err = ctx.Err(); err != nil {
			return "", nil, op, 0, 0, errors.Trace(err)
		}

		// Bound every attempt separately so one slow request cannot use up
		// the whole retry budget.
		childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout)
		resp, err = etcdCli.Get(childCtx, ownerPath, clientv3.WithFirstCreate()...)
		cancel()
		if err == nil {
			break
		}
		logutil.Logger(logCtx).Info("etcd-cli get owner info failed", zap.String("key", ownerPath), zap.Int("retryCnt", i), zap.Error(err))
		time.Sleep(util.KeyOpRetryInterval)
	}
	if err != nil {
		// All attempts failed; report the last error.
		logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err))
		return "", nil, op, 0, 0, errors.Trace(err)
	}
	if len(resp.Kvs) == 0 {
		return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader
	}

	var ownerID []byte
	ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
	logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
		zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
	return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil
}

// GetOwnerKeyInfo gets the owner key and current revision.
func GetOwnerKeyInfo(
ctx, logCtx context.Context,
etcdCli *clientv3.Client,
etcdKey, id string,
) (string, int64, error) {
ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
if err != nil {
return "", 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", concurrency.ErrElectionNoLeader
Expand All @@ -296,44 +348,121 @@ func GetOwnerInfo(ctx, logCtx context.Context, elec *concurrency.Election, id st
logutil.Logger(logCtx).Info("get owner", zap.String("ownerID", ownerID))
if ownerID != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
return "", 0, errors.New("ownerInfoNotMatch")
}

<<<<<<< HEAD:owner/manager.go
return string(resp.Kvs[0].Key), nil
=======
return ownerKey, currRevision, nil
}

// splitOwnerValues splits an owner value of the form "<ownerID>_<op>" into
// the owner ID and the op.
//
// The op is taken from the first byte of the second part, and only when the
// value splits into exactly two parts on "_". In every other case (no
// separator, more than one separator, or an empty second part such as
// "id_") the zero OpType is returned together with the first part.
func splitOwnerValues(val []byte) ([]byte, OpType) {
	vals := bytes.Split(val, []byte("_"))
	var op OpType
	// Guard against an empty second part so a value ending in "_" does not
	// cause an index-out-of-range panic.
	if len(vals) == 2 && len(vals[1]) > 0 {
		op = OpType(vals[1][0])
	}
	return vals[0], op
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
// joinOwnerValues concatenates the given parts into one owner value, placing
// "_" between consecutive parts.
func joinOwnerValues(vals ...[]byte) []byte {
	separator := []byte("_")
	joined := bytes.Join(vals, separator)
	return joined
}

// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
//
// It reads the current owner info for m.key and rewrites the owner key's
// value as "<ownerID>_<op byte>" in an etcd transaction that only succeeds if
// the key's ModRevision is unchanged since it was read.
//
// It returns nil without writing when the stored op already equals op (this
// check runs before the owner ID is compared), an "ownerInfoNotMatch" error
// when the stored owner ID is not m.id, and an error when reading the owner
// info fails or the transaction's comparison does not hold.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
// The owner stays the same; only the op part of the stored value is updated.
ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
if err != nil {
return errors.Trace(err)
}
if currOp == op {
logutil.Logger(m.logCtx).Info("set owner op is the same as the original, so do nothing.", zap.Stringer("op", op))
return nil
}
if string(ownerID) != m.id {
return errors.New("ownerInfoNotMatch")
}
newOwnerVal := joinOwnerValues(ownerID, []byte{byte(op)})

// Test hook: when the MockDelOwnerKey failpoint is enabled with a string
// value, mockDelOwnerKey is called and any error it reports is returned.
// NOTE(review): presumably this removes the owner key before the write to
// exercise the failed-compare path; confirm against mockDelOwnerKey.
failpoint.Inject("MockDelOwnerKey", func(v failpoint.Value) {
if valStr, ok := v.(string); ok {
if err := mockDelOwnerKey(valStr, ownerKey, m); err != nil {
failpoint.Return(err)
}
}
})

// The new value is put with the lease ID held in m.sessionLease.
leaseOp := clientv3.WithLease(clientv3.LeaseID(m.sessionLease.Load()))
// Compare-and-put: write only if nobody modified the key since it was read.
resp, err := m.etcdCli.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(ownerKey), "=", modRevision)).
Then(clientv3.OpPut(ownerKey, string(newOwnerVal), leaseOp)).
Commit()
// A committed transaction whose comparison failed is reported as an error.
if err == nil && !resp.Succeeded {
err = errors.New("put owner key failed, cmp is false")
}
logutil.BgLogger().Info("set owner op value", zap.String("owner key", ownerKey), zap.ByteString("ownerID", ownerID),
zap.Stringer("old Op", currOp), zap.Stringer("op", op), zap.Error(err))
// The outcome is counted for both success and failure.
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.PutValue+"_"+metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}

// GetOwnerOpValue gets the owner op value.
func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, logPrefix string) (OpType, error) {
// It's used for testing.
if etcdCli == nil {
return *mockOwnerOpValue.Load(), nil
}

logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
return op, errors.Trace(err)
>>>>>>> 0720ea86949 (ddl: watch the ddl ownerkey with the createRevision (#55692)):pkg/owner/manager.go
}

// WatchOwnerForTest exposes watchOwner() to tests.
// When m is an *ownerManager it watches key starting from createRevison and
// returns watchOwner's result; for any other Manager implementation it does
// nothing and returns nil.
func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevison int64) error {
	mgr, isOwnerManager := m.(*ownerManager)
	if !isOwnerManager {
		return nil
	}
	return mgr.watchOwner(ctx, etcdSession, key, createRevison)
}

// watchOwner blocks while watching the owner key and returns once this
// manager should stop acting as owner.
//
// The watch starts at revision currRev+1, so changes made to key after the
// revision the caller observed are delivered.
//
// It returns an error when the watch channel is closed or the watch is
// canceled. It returns nil when a DELETE event for key is observed, when
// etcdSession is done, or when ctx is done.
func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error {
	logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key)
	logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
	logutil.BgLogger().Debug(logPrefix)
	// we need to watch the ownerKey since currRev + 1.
	watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1))
	for {
		select {
		case resp, ok := <-watchCh:
			if !ok {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
				logutil.Logger(logCtx).Info("watcher is closed, no owner")
				return errors.Errorf("watcher is closed, key: %v", key)
			}
			if resp.Canceled {
				metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
				logutil.Logger(logCtx).Info("watch canceled, no owner")
				return errors.Errorf("watch canceled, key: %v", key)
			}

			// Only a deletion of the owner key ends the watch; other events
			// are ignored.
			for _, ev := range resp.Events {
				if ev.Type == mvccpb.DELETE {
					metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc()
					logutil.Logger(logCtx).Info("watch failed, owner is deleted")
					return nil
				}
			}
		case <-etcdSession.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc()
			return nil
		case <-ctx.Done():
			metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc()
			return nil
		}
	}
}
Expand Down
Loading

0 comments on commit 1b8f809

Please sign in to comment.