Skip to content

Commit

Permalink
ms/tso: move startGlobalAllocatorLoop outside NewAllocatorManager (
Browse files Browse the repository at this point in the history
…#8725)

ref #4399

Signed-off-by: okJiang <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
okJiang and ti-chi-bot[bot] authored Oct 22, 2024
1 parent 003def6 commit 25629f2
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 12 deletions.
18 changes: 14 additions & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func NewAllocatorManager(
rootPath string,
storage endpoint.TSOStorage,
cfg Config,
startGlobalLeaderLoop bool,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -224,7 +223,7 @@ func NewAllocatorManager(
am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)

// Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully.
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop)
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership())
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()

Expand All @@ -234,11 +233,11 @@ func NewAllocatorManager(
// SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into
// an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times
// depending on the election.
func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) {
func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) {
am.mu.Lock()
defer am.mu.Unlock()

allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop)
allocator := NewGlobalTSOAllocator(ctx, am)
// Create a new allocatorGroup
ctx, cancel := context.WithCancel(ctx)
am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{
Expand Down Expand Up @@ -1389,3 +1388,14 @@ func (am *AllocatorManager) GetLeaderAddr() string {
}
return leaderAddrs[0]
}

func (am *AllocatorManager) startGlobalAllocatorLoop() {
globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator)
if !ok {
// it should never happen
log.Error("failed to start global allocator loop, global allocator not found")
return
}
globalTSOAllocator.wg.Add(1)
go globalTSOAllocator.primaryElectionLoop()
}
8 changes: 2 additions & 6 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct {
func NewGlobalTSOAllocator(
ctx context.Context,
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
Expand All @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator(
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation),
}

if startGlobalLeaderLoop {
gta.wg.Add(1)
go gta.primaryElectionLoop()
}

return gta
}

Expand Down Expand Up @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() {
gta.timestampOracle.ResetTimestamp()
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
// running allocator. It is only used in API mode.
func (gta *GlobalTSOAllocator) primaryElectionLoop() {
defer logutil.LogPanic()
defer gta.wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg)
am.startGlobalAllocatorLoop()
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false)
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s)
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down

0 comments on commit 25629f2

Please sign in to comment.