Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 修复 etcd keepAlive 无法及时关闭 context --story=121171780 #1214

Open
wants to merge 1 commit into
base: v1.7.x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 53 additions & 27 deletions pkg/serviced/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,20 @@ func (s *service) keepAlive(key string, value string) {
// if the current lease is 0, you need to lease the lease and use this put kv (bind lease).
// if the lease is not 0, the put has been completed and the lease needs to be renewed.
if curLeaseID == 0 {
leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL)
if err != nil {
logs.Errorf("grant lease failed, key: %s, err: %v", key, err)
time.Sleep(defaultErrSleepTime)
continue
}

ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()
_, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID))
if err != nil {
logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err)
if err := s.grantLeaseEtcd(lease, key, value); err != nil {
time.Sleep(defaultErrSleepTime)
continue
}

s.updateRegisterFlag(true)
s.updateLeaseID(leaseResp.ID)
} else {
// before keep alive, need to judge if service key exists.
// if not exist, need to re-register.
ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()
resp, err := s.cli.Get(ctx, key)
exist, err := s.KeepAlive(key, curLeaseID, value, lease)
if err != nil {
logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err)
s.keepAliveFailed()
continue
}
if len(resp.Kvs) == 0 {
logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value)
logs.Errorf("keep alive failed, key: %s, lease: %d, err: %v", key, curLeaseID, err)
s.keepAliveFailed()
continue
}

if _, err := lease.KeepAliveOnce(s.ctx, curLeaseID); err != nil {
logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err)
if !exist {
s.keepAliveFailed()
continue
}
Expand All @@ -216,6 +193,55 @@ func (s *service) keepAlive(key string, value string) {
}()
}

// grantLeaseEtcd grant lease and put kv with lease.
func (s *service) grantLeaseEtcd(lease etcd3.Lease, key string, value string) error {
leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL)
if err != nil {
logs.Errorf("grant lease failed, key: %s, err: %v", key, err)
return err
}
Comment on lines +198 to +202
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里没设置超时

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

另外在调用keepalive之前会第一次grant lease 也没有配置超时


ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()

_, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID))
if err != nil {
logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err)
return err
}

s.updateRegisterFlag(true)
s.updateLeaseID(leaseResp.ID)

logs.Infof("put kv with lease success, key: %s, lease: %d", key, leaseResp.ID)
return nil
}

// KeepAlive checks that the service key is still present in etcd and, if it
// is, renews the lease curLeaseID once.
//
// It returns (true, nil) only when the key exists and the renewal succeeded.
// In every other case it returns false with a non-nil error: the Get failed,
// the key is missing (the service has to be registered again), or the renewal
// itself failed. value is only used in the "key missing" log message.
//
// Each etcd call runs under its own defaultEtcdTimeout derived from s.ctx.
func (s *service) KeepAlive(key string, curLeaseID etcd3.LeaseID, value string, lease etcd3.Lease) (bool, error) {
	// Before renewing, make sure the service key still exists; if it does not,
	// the caller has to re-register instead of renewing.
	getCtx, cancelGet := context.WithTimeout(s.ctx, defaultEtcdTimeout)
	defer cancelGet()
	resp, err := s.cli.Get(getCtx, key)
	if err != nil {
		logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err)
		return false, err
	}

	if len(resp.Kvs) == 0 {
		logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value)
		return false, errors.New("service key not exist")
	}

	// KeepAliveOnce used to be called with s.ctx directly, so nothing bounded
	// how long it could wait; bound it with the same timeout as the Get above.
	renewCtx, cancelRenew := context.WithTimeout(s.ctx, defaultEtcdTimeout)
	defer cancelRenew()
	if _, err := lease.KeepAliveOnce(renewCtx, curLeaseID); err != nil {
		logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err)
		return false, err
	}
	return true, nil
}

// keepAliveFailed keep alive lease failed, need to exec action.
func (s *service) keepAliveFailed() {
s.updateRegisterFlag(false)
Expand Down