diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 56ee8313d57..a02d4884e17 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -202,7 +202,6 @@ func NewAllocatorManager( rootPath string, storage endpoint.TSOStorage, cfg Config, - startGlobalLeaderLoop bool, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -224,7 +223,7 @@ func NewAllocatorManager( am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn) // Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully. - am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop) + am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership()) am.svcLoopWG.Add(1) go am.tsoAllocatorLoop() @@ -234,11 +233,11 @@ func NewAllocatorManager( // SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into // an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times // depending on the election. -func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) { +func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) { am.mu.Lock() defer am.mu.Unlock() - allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop) + allocator := NewGlobalTSOAllocator(ctx, am) // Create a new allocatorGroup ctx, cancel := context.WithCancel(ctx) am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{ @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +func (am *AllocatorManager) startGlobalAllocatorLoop() { + globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator) + if !ok { + // it should never happen + log.Error("failed to start global allocator loop, global allocator not found") + return + } + globalTSOAllocator.wg.Add(1) + go globalTSOAllocator.primaryElectionLoop() +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 53a6f65a25d..5c7c905089c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct { func NewGlobalTSOAllocator( ctx context.Context, am *AllocatorManager, - startGlobalLeaderLoop bool, ) Allocator { ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator( tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation), } - if startGlobalLeaderLoop { - gta.wg.Add(1) - go gta.primaryElectionLoop() - } - return gta } @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() { gta.timestampOracle.ResetTimestamp() } +// primaryElectionLoop is used to maintain the TSO primary election and TSO's +// running allocator. It is only used in API mode. func (gta *GlobalTSOAllocator) primaryElectionLoop() { defer logutil.LogPanic() defer gta.wg.Done() diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7c1d3426ce9..c19d790efc5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/server.go b/server/server.go index c79f51d8153..9691633bae2 100644 --- a/server/server.go +++ b/server/server.go @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {