From 94ee7adf13dc36e27b4abee39acb74ae2c523312 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 7 Mar 2024 13:41:58 +0800 Subject: [PATCH 1/6] register tso allocator Signed-off-by: Ryan Leung --- pkg/storage/endpoint/key_path.go | 7 ++ pkg/tso/allocator_manager.go | 15 +++- pkg/tso/global_allocator.go | 116 +++++++++++++++++++++++++ pkg/tso/keyspace_group_manager.go | 6 +- pkg/tso/keyspace_group_manager_test.go | 2 - server/server.go | 4 +- 6 files changed, 143 insertions(+), 7 deletions(-) diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 69b8d0f2f8e..0cfcd91c685 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -64,6 +64,8 @@ const ( keyspaceGroupsMembershipKey = "membership" keyspaceGroupsElectionKey = "election" + tsoAllocatorsPrefix = "tso_allocators" + // we use uint64 to represent ID, the max length of uint64 is 20. keyLen = 20 ) @@ -415,3 +417,8 @@ func FullTimestampPath(clusterID uint64, groupID uint32) string { } return path.Join(rootPath, tsPath) } + +// GlobalTSOAllocatorsPrefix returns the global TSO allocators prefix. +func GlobalTSOAllocatorsPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), tsoAllocatorsPrefix) +} diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index f1683de1352..1a0da1fe5a7 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -171,6 +171,8 @@ type AllocatorManager struct { ctx context.Context cancel context.CancelFunc + + etcdClient *clientv3.Client // kgID is the keyspace group ID kgID uint32 // member is for election use @@ -184,9 +186,11 @@ type AllocatorManager struct { // leaderLease defines the time within which a TSO primary/leader must update its TTL // in etcd, otherwise etcd will expire the leader key and other servers can campaign // the primary/leader again. Etcd only supports seconds TTL, so here is second too. - leaderLease int64 - maxResetTSGap func() time.Duration - securityConfig *grpcutil.TLSConfig + leaderLease int64 + maxResetTSGap func() time.Duration + securityConfig *grpcutil.TLSConfig + allocatorKeyPrefix string + allocatorKey string // for gRPC use localAllocatorConn struct { syncutil.RWMutex @@ -197,17 +201,20 @@ type AllocatorManager struct { // NewAllocatorManager creates a new TSO Allocator Manager. func NewAllocatorManager( ctx context.Context, + etcdClient *clientv3.Client, keyspaceGroupID uint32, member ElectionMember, rootPath string, storage endpoint.TSOStorage, cfg Config, startGlobalLeaderLoop bool, + allocatorKeyPrefix string, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ ctx: ctx, cancel: cancel, + etcdClient: etcdClient, kgID: keyspaceGroupID, member: member, rootPath: rootPath, @@ -218,6 +225,8 @@ func NewAllocatorManager( leaderLease: cfg.GetLeaderLease(), maxResetTSGap: cfg.GetMaxResetTSGap, securityConfig: cfg.GetTLSConfig(), + allocatorKey: path.Join(allocatorKeyPrefix, fmt.Sprintf("keyspace_group_%d", keyspaceGroupID)), + allocatorKeyPrefix: allocatorKeyPrefix, } am.mu.allocatorGroups = make(map[string]*allocatorGroup) am.mu.clusterDCLocations = make(map[string]*DCLocationInfo) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index f90dc5f26fe..727bf29fef9 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "runtime/trace" + "strings" "sync" "sync/atomic" "time" @@ -32,9 +33,12 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -131,6 +135,9 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { // close is used to shutdown the primary election loop. // tso service call this function to shutdown the loop here, but pd manages its own loop. func (gta *GlobalTSOAllocator) close() { + if err := gta.deregisterAllocator(); err != nil { + log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err)) + } gta.cancel() gta.wg.Wait() } @@ -184,6 +191,9 @@ func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { + if err := gta.registerAllocator(); err != nil { + return err + } gta.tsoAllocatorRoleGauge.Set(1) // The suffix of a Global TSO should always be 0. gta.timestampOracle.suffix = 0 @@ -658,3 +668,109 @@ func (gta *GlobalTSOAllocator) campaignLeader() { func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { return gta.timestampOracle.metrics } + +// registerAllocator registers the tso allocator to etcd. +// It is used when switching mode between pd and ms. We need to make sure only one TSO allocator is working at the same time. +func (gta *GlobalTSOAllocator) registerAllocator() error { + log.Info("register tso allocator", zap.String("key", gta.am.allocatorKey)) + kresp := gta.tryRegister() + if kresp == nil { + return errors.New("context canceled") + } + go func() { + defer logutil.LogPanic() + for { + select { + case <-gta.ctx.Done(): + log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey)) + return + case _, ok := <-kresp: + if !ok { + log.Error("keep alive tso allocator failed", zap.String("key", gta.am.allocatorKey)) + kresp = gta.renewKeepalive() + } + } + } + }() + return nil +} + +func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse { + t := time.NewTicker(time.Duration(3) * time.Second / 2) + defer t.Stop() + for { + select { + case <-gta.ctx.Done(): + log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey)) + return nil + case <-t.C: + return gta.tryRegister() + } + } +} + +func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) { + ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout) + defer cancel() + grantResp, err := gta.am.etcdClient.Grant(ctx, 3) + if err != nil { + return 0, err + } + resp, err := kv.NewSlowLogTxn(gta.am.etcdClient). + Then(clientv3.OpPut(key, value, clientv3.WithLease(grantResp.ID))). + Commit() + if err != nil { + return 0, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByArgs() + } + if !resp.Succeeded { + return 0, errs.ErrEtcdTxnConflict.FastGenByArgs() + } + + return grantResp.ID, nil +} + +// deregisterAllocator deregisters the tso allocator from etcd. +func (gta *GlobalTSOAllocator) deregisterAllocator() error { + log.Info("deregister tso allocator", zap.String("key", gta.am.allocatorKey)) + ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) + defer cancel() + _, err := gta.am.etcdClient.Delete(ctx, gta.am.allocatorKey) + return err +} + +func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() +outerLoop: + for { + select { + case <-gta.ctx.Done(): + return nil + case <-ticker.C: + ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) + resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix()) + cancel() + if err != nil { + continue + } + // wait for the previous allocator with different mode to be deregistered + if len(resp.Kvs) > 0 { + for _, kv := range resp.Kvs { + key := string(kv.Key) + if !strings.Contains(key, gta.am.allocatorKeyPrefix) { + continue outerLoop + } + } + } + id, err := gta.txnWithTTL(gta.am.allocatorKey, "") + if err != nil { + continue + } + kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id) + if err != nil { + continue + } + return kresp + } + } +} diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d1e94d445cc..804bd6669df 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "net/http" + "path" "regexp" "sort" "sync" @@ -319,6 +320,7 @@ type KeyspaceGroupManager struct { // tsoServiceID is the service ID of the TSO service, registered in the service discovery tsoServiceID *discovery.ServiceRegistryEntry + clusterID uint64 etcdClient *clientv3.Client httpClient *http.Client // electionNamePrefix is the name prefix to generate the unique name of a participant, @@ -414,6 +416,7 @@ func NewKeyspaceGroupManager( ctx: ctx, cancel: cancel, tsoServiceID: tsoServiceID, + clusterID: clusterID, etcdClient: etcdClient, httpClient: httpClient, electionNamePrefix: electionNamePrefix, @@ -768,7 +771,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true, + path.Join(endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID), "tso")) log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index ad67c49fa5e..22ed792496c 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -154,7 +154,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} clusterID := rand.Uint64() clusterIDStr := strconv.FormatUint(clusterID, 10) - legacySvcRootPath := path.Join("/pd", clusterIDStr) tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, clusterIDStr, "tso") electionNamePrefix := "tso-server-" + clusterIDStr @@ -1046,7 +1045,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority clusterID := rand.Uint64() clusterIDStr := strconv.FormatUint(clusterID, 10) - rootPath := path.Join("/pd", clusterIDStr) cfg1 := suite.createConfig() cfg2 := suite.createConfig() diff --git a/server/server.go b/server/server.go index 8d7b83cfe4a..35fc7daa850 100644 --- a/server/server.go +++ b/server/server.go @@ -22,6 +22,7 @@ import ( "math/rand" "net/http" "os" + "path" "path/filepath" "runtime" "strconv" @@ -467,7 +468,8 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} if !s.IsAPIServiceMode() { - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false, + path.Join(endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()), "pd")) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { From 389f1247ef8927c3fbce46adbf7a0d398ee65348 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 8 Mar 2024 12:09:55 +0800 Subject: [PATCH 2/6] fix the test Signed-off-by: Ryan Leung --- pkg/mcs/tso/server/server.go | 2 +- server/handler.go | 2 +- tests/integrations/tso/server_test.go | 9 +++++++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730..8daf3b22220 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -322,7 +322,7 @@ func (*Server) SetExternalTS(uint64) error { // ResetTS resets the TSO with the specified one. func (s *Server) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32) error { - log.Info("reset-ts", + log.Info("reset ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck), diff --git a/server/handler.go b/server/handler.go index f5d9b9035b2..4aceaaa1f74 100644 --- a/server/handler.go +++ b/server/handler.go @@ -404,7 +404,7 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) { // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error { - log.Info("reset-ts", + log.Info("reset ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index ac3d914aa80..25d115f387b 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -23,7 +23,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/testutil" tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/mcs/utils" tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/tempurl" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -84,6 +86,13 @@ func (suite *tsoServerTestSuite) SetupSuite() { } else { suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) + testutil.Eventually(re, func() bool { + am, err := suite.tsoServer.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID) + if err != nil { + return false + } + return am.GetMember().IsLeaderElected() + }) } } From f3cc91abbccb31ea1446adab1e5e58ef526ce540 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 8 Mar 2024 14:30:08 +0800 Subject: [PATCH 3/6] fix the test Signed-off-by: Ryan Leung --- tests/integrations/mcs/tso/api_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index dc9bfa1e291..9d81f5639aa 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -214,6 +214,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) + ttc.WaitForDefaultPrimaryServing(re) urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) From 03e3d0e949050f194bfce53ee543e1d0cf0dbc0d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 8 Mar 2024 18:01:07 +0800 Subject: [PATCH 4/6] adjust the ticker Signed-off-by: Ryan Leung --- pkg/tso/global_allocator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 727bf29fef9..c65ffbc005a 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -739,7 +739,7 @@ func (gta *GlobalTSOAllocator) deregisterAllocator() error { } func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() outerLoop: for { From 759521365c2c0b711fbb4d1f4e434c5b842f3390 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 11 Mar 2024 12:53:07 +0800 Subject: [PATCH 5/6] register immediately Signed-off-by: Ryan Leung --- pkg/tso/global_allocator.go | 56 ++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index c65ffbc005a..f3ae3d0a311 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -739,38 +739,48 @@ func (gta *GlobalTSOAllocator) deregisterAllocator() error { } func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse { + // register immediately + kresp, needRetry := gta.register() + if !needRetry { + return kresp + } ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() -outerLoop: for { select { case <-gta.ctx.Done(): return nil case <-ticker.C: - ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) - resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix()) - cancel() - if err != nil { - continue - } - // wait for the previous allocator with different mode to be deregistered - if len(resp.Kvs) > 0 { - for _, kv := range resp.Kvs { - key := string(kv.Key) - if !strings.Contains(key, gta.am.allocatorKeyPrefix) { - continue outerLoop - } - } + if kresp, needRetry := gta.register(); !needRetry { + return kresp } - id, err := gta.txnWithTTL(gta.am.allocatorKey, "") - if err != nil { - continue - } - kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id) - if err != nil { - continue + } + } +} + +func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveResponse, bool) { + ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) + resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix()) + cancel() + if err != nil { + return nil, true + } + // wait for the previous allocator with different mode to be deregistered + if len(resp.Kvs) > 0 { + for _, kv := range resp.Kvs { + key := string(kv.Key) + if !strings.Contains(key, gta.am.allocatorKeyPrefix) { + return nil, true } - return kresp } } + id, err := gta.txnWithTTL(gta.am.allocatorKey, "") + if err != nil { + return nil, true + } + kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id) + if err != nil { + return nil, true + } + return kresp, false } From 7f23d25640b5b62b090a3e98da09d69af6c37bb2 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 12 Mar 2024 16:29:25 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: Ryan Leung --- pkg/tso/allocator_manager.go | 3 ++- pkg/tso/global_allocator.go | 6 ++++-- pkg/tso/keyspace_group_manager.go | 3 ++- server/server.go | 3 ++- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 1a0da1fe5a7..64f91c2f1a5 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -209,6 +209,7 @@ func NewAllocatorManager( cfg Config, startGlobalLeaderLoop bool, allocatorKeyPrefix string, + allocatorKey string, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ @@ -225,8 +226,8 @@ func NewAllocatorManager( leaderLease: cfg.GetLeaderLease(), maxResetTSGap: cfg.GetMaxResetTSGap, securityConfig: cfg.GetTLSConfig(), - allocatorKey: path.Join(allocatorKeyPrefix, fmt.Sprintf("keyspace_group_%d", keyspaceGroupID)), allocatorKeyPrefix: allocatorKeyPrefix, + allocatorKey: allocatorKey, } am.mu.allocatorGroups = make(map[string]*allocatorGroup) am.mu.clusterDCLocations = make(map[string]*DCLocationInfo) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index f3ae3d0a311..c8df7f0bca9 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -43,6 +43,8 @@ import ( "google.golang.org/grpc" ) +const ttlSeconds = 3 + // Allocator is a Timestamp Oracle allocator. type Allocator interface { // Initialize is used to initialize a TSO allocator. @@ -696,7 +698,7 @@ func (gta *GlobalTSOAllocator) registerAllocator() error { } func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse { - t := time.NewTicker(time.Duration(3) * time.Second / 2) + t := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 2) defer t.Stop() for { select { @@ -712,7 +714,7 @@ func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveR func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) { ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout) defer cancel() - grantResp, err := gta.am.etcdClient.Grant(ctx, 3) + grantResp, err := gta.am.etcdClient.Grant(ctx, ttlSeconds) if err != nil { return 0, err } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 804bd6669df..8d150ceae59 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -771,8 +771,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. + allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID) am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true, - path.Join(endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID), "tso")) + allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", group.ID))) log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/server/server.go b/server/server.go index 35fc7daa850..82185b20883 100644 --- a/server/server.go +++ b/server/server.go @@ -468,8 +468,9 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} if !s.IsAPIServiceMode() { + allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()) s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false, - path.Join(endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()), "pd")) + allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd")) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {