Skip to content

Commit

Permalink
fix: 修复 etcd keepAlive 无法及时关闭 content --story=121171780
Browse files Browse the repository at this point in the history
  • Loading branch information
zouxingyuks committed Dec 11, 2024
1 parent 65df776 commit d650d7e
Showing 1 changed file with 51 additions and 32 deletions.
83 changes: 51 additions & 32 deletions pkg/serviced/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,43 +169,13 @@ func (s *service) keepAlive(key string, value string) {
// if the current lease is 0, you need to lease the lease and use this put kv (bind lease).
// if the lease is not 0, the put has been completed and the lease needs to be renewed.
if curLeaseID == 0 {
leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL)
if err != nil {
logs.Errorf("grant lease failed, key: %s, err: %v", key, err)
if err := s.grantLeaseEtcd(lease, key, value); err != nil {
time.Sleep(defaultErrSleepTime)
continue
}

ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()
_, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID))
if err != nil {
logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err)
time.Sleep(defaultErrSleepTime)
continue
}

s.updateRegisterFlag(true)
s.updateLeaseID(leaseResp.ID)
} else {
// before keep alive, need to judge if service key exists.
// if not exist, need to re-register.
ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()
resp, err := s.cli.Get(ctx, key)
if err != nil {
logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err)
s.keepAliveFailed()
continue
}
if len(resp.Kvs) == 0 {
logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value)
s.keepAliveFailed()
continue
}

if _, err := lease.KeepAliveOnce(s.ctx, curLeaseID); err != nil {
logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err)
if exist, _ := s.KeepAlive(key, curLeaseID, value, lease); !exist {
s.keepAliveFailed()
continue
}
Expand All @@ -216,6 +186,55 @@ func (s *service) keepAlive(key string, value string) {
}()
}

// grantLeaseEtcd grant lease and put kv with lease.
func (s *service) grantLeaseEtcd(lease etcd3.Lease, key string, value string) error {
leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL)
if err != nil {
logs.Errorf("grant lease failed, key: %s, err: %v", key, err)
return err
}

ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()

_, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID))
if err != nil {
logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err)
return err
}

s.updateRegisterFlag(true)
s.updateLeaseID(leaseResp.ID)

logs.Infof("put kv with lease success, key: %s, lease: %d", key, leaseResp.ID)
return nil
}

// KeepAlive keep alive lease, if the lease is not exist, need to re-register.
func (s *service) KeepAlive(key string, curLeaseID etcd3.LeaseID, value string, lease etcd3.Lease) (bool, error) {
// before keep alive, need to judge if service key exists.
// if not exist, need to re-register.

ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout)
defer cancel()
resp, err := s.cli.Get(ctx, key)
if err != nil {
logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err)
return false, err
}

if len(resp.Kvs) == 0 {
logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value)
return false, errors.New("service key not exist")
}

if _, err := lease.KeepAliveOnce(s.ctx, curLeaseID); err != nil {
logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err)
return false, err
}
return true, nil
}

// keepAliveFailed keep alive lease failed, need to exec action.
func (s *service) keepAliveFailed() {
s.updateRegisterFlag(false)
Expand Down

0 comments on commit d650d7e

Please sign in to comment.