From 68b0c8eba92c2b78164e3fe8a3dbd1475b503d06 Mon Sep 17 00:00:00 2001 From: zouxingyuks <1308345487@qq.com> Date: Wed, 11 Dec 2024 16:40:43 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20etcd=20keepAlive=20?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E5=8F=8A=E6=97=B6=E5=85=B3=E9=97=AD=20conten?= =?UTF-8?q?t=20--story=3D121171780?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/serviced/service.go | 83 +++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 32 deletions(-) diff --git a/pkg/serviced/service.go b/pkg/serviced/service.go index f2a337bb64..04ff7bdc3a 100644 --- a/pkg/serviced/service.go +++ b/pkg/serviced/service.go @@ -169,43 +169,13 @@ func (s *service) keepAlive(key string, value string) { // if the current lease is 0, you need to lease the lease and use this put kv (bind lease). // if the lease is not 0, the put has been completed and the lease needs to be renewed. if curLeaseID == 0 { - leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL) - if err != nil { - logs.Errorf("grant lease failed, key: %s, err: %v", key, err) + if err := s.grantLeaseEtcd(lease, key, value); err != nil { time.Sleep(defaultErrSleepTime) continue } - ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout) - defer cancel() - _, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID)) - if err != nil { - logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err) - time.Sleep(defaultErrSleepTime) - continue - } - - s.updateRegisterFlag(true) - s.updateLeaseID(leaseResp.ID) } else { - // before keep alive, need to judge if service key exists. - // if not exist, need to re-register. - ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout) - defer cancel() - resp, err := s.cli.Get(ctx, key) - if err != nil { - logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err) - s.keepAliveFailed() - continue - } - if len(resp.Kvs) == 0 { - logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value) - s.keepAliveFailed() - continue - } - - if _, err := lease.KeepAliveOnce(s.ctx, curLeaseID); err != nil { - logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err) + if exist := s.KeepAlive(key, curLeaseID, value, lease); !exist { s.keepAliveFailed() continue } @@ -216,6 +186,55 @@ func (s *service) keepAlive(key string, value string) { }() } +// grantLeaseEtcd grant lease and put kv with lease. +func (s *service) grantLeaseEtcd(lease etcd3.Lease, key string, value string) error { + leaseResp, err := lease.Grant(s.ctx, defaultGrantLeaseTTL) + if err != nil { + logs.Errorf("grant lease failed, key: %s, err: %v", key, err) + return err + } + + ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout) + defer cancel() + + _, err = s.cli.Put(ctx, key, value, etcd3.WithLease(leaseResp.ID)) + if err != nil { + logs.Errorf("put kv failed, key: %s, lease: %d, err: %v", key, leaseResp.ID, err) + return err + } + + s.updateRegisterFlag(true) + s.updateLeaseID(leaseResp.ID) + + logs.Infof("put kv with lease success, key: %s, lease: %d", key, leaseResp.ID) + return nil +} + +// KeepAlive keep alive lease, if the lease is not exist, need to re-register. +func (s *service) KeepAlive(key string, curLeaseID etcd3.LeaseID, value string, lease etcd3.Lease) bool { + // before keep alive, need to judge if service key exists. + // if not exist, need to re-register. + + ctx, cancel := context.WithTimeout(s.ctx, defaultEtcdTimeout) + defer cancel() + resp, err := s.cli.Get(ctx, key) + if err != nil { + logs.Errorf("get key failed, lease: %d, err: %v", curLeaseID, err) + return false + } + + if len(resp.Kvs) == 0 { + logs.Warnf("current service key [%s, %s] is not exist, need to re-register", key, value) + return false + } + + if _, err := lease.KeepAliveOnce(s.ctx, curLeaseID); err != nil { + logs.Errorf("keep alive lease failed, lease: %d, err: %v", curLeaseID, err) + return false + } + return true +} + // keepAliveFailed keep alive lease failed, need to exec action. func (s *service) keepAliveFailed() { s.updateRegisterFlag(false)