From 058cdb44b13db9470a12c7e7e2b6ad8b51e9dfe5 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 17 Oct 2024 18:03:22 +0800 Subject: [PATCH 1/3] move place Signed-off-by: okJiang <819421878@qq.com> --- pkg/tso/allocator_manager.go | 18 ++++++++++++++---- pkg/tso/global_allocator.go | 8 ++------ pkg/tso/keyspace_group_manager.go | 3 ++- server/server.go | 2 +- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 56ee8313d57..930ba7d4ce1 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -202,7 +202,6 @@ func NewAllocatorManager( rootPath string, storage endpoint.TSOStorage, cfg Config, - startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -224,7 +223,7 @@ func NewAllocatorManager( am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() @@ -234,11 +233,11 @@ func NewAllocatorManager( // SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. -func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) { +func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() - allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop) + allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +func (am *AllocatorManager) startGlobalLeaderLoop() { + globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + if !ok { + // it should never happen + log.Error("failed to start global leader loop, global allocator not found") + return + } + globalTSOAllocator.wg.Add(1) + go globalTSOAllocator.primaryElectionLoop() +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 53a6f65a25d..5c7c905089c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct { func NewGlobalTSOAllocator( ctx context.Context, am *AllocatorManager, - startGlobalLeaderLoop bool, ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator( tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } - if startGlobalLeaderLoop { - gta.wg.Add(1) - go gta.primaryElectionLoop() - } - return gta } @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// primaryElectionLoop is used to maintain the TSO primary election and TSO's +// running allocator. It is only used in API mode. func (gta *GlobalTSOAllocator) primaryElectionLoop() { defer logutil.LogPanic() defer gta.wg.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c1d3426ce9..6561c4880b2 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am.startGlobalLeaderLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/server.go b/server/server.go index c79f51d8153..9691633bae2 100644 --- a/server/server.go +++ b/server/server.go @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { From 4ffa54650ccaeae3273146b5d8644d9cc66f7555 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 17 Oct 2024 18:06:54 +0800 Subject: [PATCH 2/3] rename Signed-off-by: okJiang <819421878@qq.com> --- pkg/tso/allocator_manager.go | 2 +- pkg/tso/keyspace_group_manager.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 930ba7d4ce1..186d31d2a7f 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1389,7 +1389,7 @@ func (am *AllocatorManager) GetLeaderAddr() string { return leaderAddrs[0] } -func (am *AllocatorManager) startGlobalLeaderLoop() { +func (am *AllocatorManager) startGlobalAllocatorLoop() { globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) if !ok { // it should never happen diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 6561c4880b2..c19d790efc5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -791,7 +791,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } // Initialize all kinds of maps. am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) - am.startGlobalLeaderLoop() + am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) From 0b0bd70cce0beb4fce5bb6ebda95f4726f5db9f2 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Thu, 17 Oct 2024 18:12:03 +0800 Subject: [PATCH 3/3] fix typo Signed-off-by: okJiang <819421878@qq.com> --- pkg/tso/allocator_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 186d31d2a7f..a02d4884e17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1393,7 +1393,7 @@ func (am *AllocatorManager) startGlobalAllocatorLoop() { globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) if !ok { // it should never happen - log.Error("failed to start global leader loop, global allocator not found") + log.Error("failed to start global allocator loop, global allocator not found") return } globalTSOAllocator.wg.Add(1)