diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index c38c7142730..8daf3b22220 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -322,7 +322,7 @@ func (*Server) SetExternalTS(uint64) error { // ResetTS resets the TSO with the specified one. func (s *Server) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32) error { - log.Info("reset-ts", + log.Info("reset ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck), diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 69b8d0f2f8e..0cfcd91c685 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -64,6 +64,8 @@ const ( keyspaceGroupsMembershipKey = "membership" keyspaceGroupsElectionKey = "election" + tsoAllocatorsPrefix = "tso_allocators" + // we use uint64 to represent ID, the max length of uint64 is 20. keyLen = 20 ) @@ -415,3 +417,8 @@ func FullTimestampPath(clusterID uint64, groupID uint32) string { } return path.Join(rootPath, tsPath) } + +// GlobalTSOAllocatorsPrefix returns the global TSO allocators prefix. +func GlobalTSOAllocatorsPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), tsoAllocatorsPrefix) +} diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index f1683de1352..64f91c2f1a5 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -171,6 +171,8 @@ type AllocatorManager struct { ctx context.Context cancel context.CancelFunc + + etcdClient *clientv3.Client // kgID is the keyspace group ID kgID uint32 // member is for election use @@ -184,9 +186,11 @@ type AllocatorManager struct { // leaderLease defines the time within which a TSO primary/leader must update its TTL // in etcd, otherwise etcd will expire the leader key and other servers can campaign // the primary/leader again. Etcd only supports seconds TTL, so here is second too. - leaderLease int64 - maxResetTSGap func() time.Duration - securityConfig *grpcutil.TLSConfig + leaderLease int64 + maxResetTSGap func() time.Duration + securityConfig *grpcutil.TLSConfig + allocatorKeyPrefix string + allocatorKey string // for gRPC use localAllocatorConn struct { syncutil.RWMutex @@ -197,17 +201,21 @@ type AllocatorManager struct { // NewAllocatorManager creates a new TSO Allocator Manager. func NewAllocatorManager( ctx context.Context, + etcdClient *clientv3.Client, keyspaceGroupID uint32, member ElectionMember, rootPath string, storage endpoint.TSOStorage, cfg Config, startGlobalLeaderLoop bool, + allocatorKeyPrefix string, + allocatorKey string, ) *AllocatorManager { ctx, cancel := context.WithCancel(ctx) am := &AllocatorManager{ ctx: ctx, cancel: cancel, + etcdClient: etcdClient, kgID: keyspaceGroupID, member: member, rootPath: rootPath, @@ -218,6 +226,8 @@ func NewAllocatorManager( leaderLease: cfg.GetLeaderLease(), maxResetTSGap: cfg.GetMaxResetTSGap, securityConfig: cfg.GetTLSConfig(), + allocatorKeyPrefix: allocatorKeyPrefix, + allocatorKey: allocatorKey, } am.mu.allocatorGroups = make(map[string]*allocatorGroup) am.mu.clusterDCLocations = make(map[string]*DCLocationInfo) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index f90dc5f26fe..c8df7f0bca9 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "runtime/trace" + "strings" "sync" "sync/atomic" "time" @@ -32,13 +33,18 @@ import ( "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "google.golang.org/grpc" ) +const ttlSeconds = 3 + // Allocator is a Timestamp Oracle allocator. type Allocator interface { // Initialize is used to initialize a TSO allocator. @@ -131,6 +137,9 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { // close is used to shutdown the primary election loop. // tso service call this function to shutdown the loop here, but pd manages its own loop. func (gta *GlobalTSOAllocator) close() { + if err := gta.deregisterAllocator(); err != nil { + log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err)) + } gta.cancel() gta.wg.Wait() } @@ -184,6 +193,9 @@ func (gta *GlobalTSOAllocator) estimateMaxTS(ctx context.Context, count uint32, // Initialize will initialize the created global TSO allocator. func (gta *GlobalTSOAllocator) Initialize(int) error { + if err := gta.registerAllocator(); err != nil { + return err + } gta.tsoAllocatorRoleGauge.Set(1) // The suffix of a Global TSO should always be 0. gta.timestampOracle.suffix = 0 @@ -658,3 +670,119 @@ func (gta *GlobalTSOAllocator) campaignLeader() { func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { return gta.timestampOracle.metrics } + +// registerAllocator registers the tso allocator to etcd. +// It is used when switching mode between pd and ms. We need to make sure only one TSO allocator is working at the same time. +func (gta *GlobalTSOAllocator) registerAllocator() error { + log.Info("register tso allocator", zap.String("key", gta.am.allocatorKey)) + kresp := gta.tryRegister() + if kresp == nil { + return errors.New("context canceled") + } + go func() { + defer logutil.LogPanic() + for { + select { + case <-gta.ctx.Done(): + log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey)) + return + case _, ok := <-kresp: + if !ok { + log.Error("keep alive tso allocator failed", zap.String("key", gta.am.allocatorKey)) + kresp = gta.renewKeepalive() + } + } + } + }() + return nil +} + +func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse { + t := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 2) + defer t.Stop() + for { + select { + case <-gta.ctx.Done(): + log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey)) + return nil + case <-t.C: + return gta.tryRegister() + } + } +} + +func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) { + ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout) + defer cancel() + grantResp, err := gta.am.etcdClient.Grant(ctx, ttlSeconds) + if err != nil { + return 0, err + } + resp, err := kv.NewSlowLogTxn(gta.am.etcdClient). + Then(clientv3.OpPut(key, value, clientv3.WithLease(grantResp.ID))). + Commit() + if err != nil { + return 0, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByArgs() + } + if !resp.Succeeded { + return 0, errs.ErrEtcdTxnConflict.FastGenByArgs() + } + + return grantResp.ID, nil +} + +// deregisterAllocator deregisters the tso allocator from etcd. +func (gta *GlobalTSOAllocator) deregisterAllocator() error { + log.Info("deregister tso allocator", zap.String("key", gta.am.allocatorKey)) + ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) + defer cancel() + _, err := gta.am.etcdClient.Delete(ctx, gta.am.allocatorKey) + return err +} + +func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse { + // register immediately + kresp, needRetry := gta.register() + if !needRetry { + return kresp + } + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-gta.ctx.Done(): + return nil + case <-ticker.C: + if kresp, needRetry := gta.register(); !needRetry { + return kresp + } + } + } +} + +func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveResponse, bool) { + ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second) + resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix()) + cancel() + if err != nil { + return nil, true + } + // wait for the previous allocator with different mode to be deregistered + if len(resp.Kvs) > 0 { + for _, kv := range resp.Kvs { + key := string(kv.Key) + if !strings.Contains(key, gta.am.allocatorKeyPrefix) { + return nil, true + } + } + } + id, err := gta.txnWithTTL(gta.am.allocatorKey, "") + if err != nil { + return nil, true + } + kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id) + if err != nil { + return nil, true + } + return kresp, false +} diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index d1e94d445cc..8d150ceae59 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "net/http" + "path" "regexp" "sort" "sync" @@ -319,6 +320,7 @@ type KeyspaceGroupManager struct { // tsoServiceID is the service ID of the TSO service, registered in the service discovery tsoServiceID *discovery.ServiceRegistryEntry + clusterID uint64 etcdClient *clientv3.Client httpClient *http.Client // electionNamePrefix is the name prefix to generate the unique name of a participant, @@ -414,6 +416,7 @@ func NewKeyspaceGroupManager( ctx: ctx, cancel: cancel, tsoServiceID: tsoServiceID, + clusterID: clusterID, etcdClient: etcdClient, httpClient: httpClient, electionNamePrefix: electionNamePrefix, @@ -768,7 +771,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro storage = kgm.tsoSvcStorage } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID) + am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true, + allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", group.ID))) log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID), zap.String("timestamp-path", am.GetTimestampPath(""))) diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index ad67c49fa5e..22ed792496c 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -154,7 +154,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} clusterID := rand.Uint64() clusterIDStr := strconv.FormatUint(clusterID, 10) - legacySvcRootPath := path.Join("/pd", clusterIDStr) tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, clusterIDStr, "tso") electionNamePrefix := "tso-server-" + clusterIDStr @@ -1046,7 +1045,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority clusterID := rand.Uint64() clusterIDStr := strconv.FormatUint(clusterID, 10) - rootPath := path.Join("/pd", clusterIDStr) cfg1 := suite.createConfig() cfg2 := suite.createConfig() diff --git a/server/handler.go b/server/handler.go index f5d9b9035b2..4aceaaa1f74 100644 --- a/server/handler.go +++ b/server/handler.go @@ -404,7 +404,7 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) { // ResetTS resets the ts with specified tso. func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error { - log.Info("reset-ts", + log.Info("reset ts", zap.Uint64("new-ts", ts), zap.Bool("ignore-smaller", ignoreSmaller), zap.Bool("skip-upper-bound-check", skipUpperBoundCheck)) diff --git a/server/server.go b/server/server.go index 8d7b83cfe4a..82185b20883 100644 --- a/server/server.go +++ b/server/server.go @@ -22,6 +22,7 @@ import ( "math/rand" "net/http" "os" + "path" "path/filepath" "runtime" "strconv" @@ -467,7 +468,9 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} if !s.IsAPIServiceMode() { - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false) + allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load()) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false, + allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd")) // When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists. if !s.cfg.EnableLocalTSO { if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil { diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index dc9bfa1e291..9d81f5639aa 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -214,6 +214,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { tc.WaitLeader() leaderServer := tc.GetLeaderServer() re.NoError(leaderServer.BootstrapCluster()) + ttc.WaitForDefaultPrimaryServing(re) urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index ac3d914aa80..25d115f387b 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -23,7 +23,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/testutil" tso "github.com/tikv/pd/pkg/mcs/tso/server" + "github.com/tikv/pd/pkg/mcs/utils" tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/tempurl" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -84,6 +86,13 @@ func (suite *tsoServerTestSuite) SetupSuite() { } else { suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr()) + testutil.Eventually(re, func() bool { + am, err := suite.tsoServer.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID) + if err != nil { + return false + } + return am.GetMember().IsLeaderElected() + }) } }