Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ms/tso: move startGlobalAllocatorLoop outside NewAllocatorManager #8725

Merged
merged 4 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@
rootPath string,
storage endpoint.TSOStorage,
cfg Config,
startGlobalLeaderLoop bool,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
Expand All @@ -224,7 +223,7 @@
am.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)

// Set up the Global TSO Allocator here, it will be initialized once the member campaigns leader successfully.
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership(), startGlobalLeaderLoop)
am.SetUpGlobalAllocator(am.ctx, am.member.GetLeadership())
am.svcLoopWG.Add(1)
go am.tsoAllocatorLoop()

Expand All @@ -234,11 +233,11 @@
// SetUpGlobalAllocator is used to set up the global allocator, which will initialize the allocator and put it into
// an allocator daemon. An TSO Allocator should only be set once, and may be initialized and reset multiple times
// depending on the election.
func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership, startGlobalLeaderLoop bool) {
func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership *election.Leadership) {
am.mu.Lock()
defer am.mu.Unlock()

allocator := NewGlobalTSOAllocator(ctx, am, startGlobalLeaderLoop)
allocator := NewGlobalTSOAllocator(ctx, am)
// Create a new allocatorGroup
ctx, cancel := context.WithCancel(ctx)
am.mu.allocatorGroups[GlobalDCLocation] = &allocatorGroup{
Expand Down Expand Up @@ -1389,3 +1388,14 @@
}
return leaderAddrs[0]
}

func (am *AllocatorManager) startGlobalAllocatorLoop() {
globalTSOAllocator, ok := am.mu.allocatorGroups[GlobalDCLocation].allocator.(*GlobalTSOAllocator)
if !ok {
// it should never happen
log.Error("failed to start global allocator loop, global allocator not found")
return
}

Check warning on line 1398 in pkg/tso/allocator_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/allocator_manager.go#L1395-L1398

Added lines #L1395 - L1398 were not covered by tests
globalTSOAllocator.wg.Add(1)
go globalTSOAllocator.primaryElectionLoop()
}
8 changes: 2 additions & 6 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ type GlobalTSOAllocator struct {
func NewGlobalTSOAllocator(
ctx context.Context,
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
Expand All @@ -109,11 +108,6 @@ func NewGlobalTSOAllocator(
tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), GlobalDCLocation),
}

if startGlobalLeaderLoop {
gta.wg.Add(1)
go gta.primaryElectionLoop()
}

return gta
}

Expand Down Expand Up @@ -537,6 +531,8 @@ func (gta *GlobalTSOAllocator) Reset() {
gta.timestampOracle.ResetTimestamp()
}

// primaryElectionLoop is used to maintain the TSO primary election and TSO's
// running allocator. It is only used in API mode.
func (gta *GlobalTSOAllocator) primaryElectionLoop() {
defer logutil.LogPanic()
defer gta.wg.Done()
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg)
am.startGlobalAllocatorLoop()
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize)
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false)
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s)
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down