Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: register tso allocator #7891

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (*Server) SetExternalTS(uint64) error {

// ResetTS resets the TSO with the specified one.
func (s *Server) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, keyspaceGroupID uint32) error {
log.Info("reset-ts",
log.Info("reset ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck),
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const (
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

tsoAllocatorsPrefix = "tso_allocators"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
)
Expand Down Expand Up @@ -415,3 +417,8 @@ func FullTimestampPath(clusterID uint64, groupID uint32) string {
}
return path.Join(rootPath, tsPath)
}

// GlobalTSOAllocatorsPrefix returns the global TSO allocators prefix.
func GlobalTSOAllocatorsPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), tsoAllocatorsPrefix)
}
16 changes: 13 additions & 3 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ type AllocatorManager struct {

ctx context.Context
cancel context.CancelFunc

etcdClient *clientv3.Client
// kgID is the keyspace group ID
kgID uint32
// member is for election use
Expand All @@ -184,9 +186,11 @@ type AllocatorManager struct {
// leaderLease defines the time within which a TSO primary/leader must update its TTL
// in etcd, otherwise etcd will expire the leader key and other servers can campaign
// the primary/leader again. Etcd only supports seconds TTL, so here is second too.
leaderLease int64
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
leaderLease int64
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
allocatorKeyPrefix string
allocatorKey string
// for gRPC use
localAllocatorConn struct {
syncutil.RWMutex
Expand All @@ -197,17 +201,21 @@ type AllocatorManager struct {
// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
ctx context.Context,
etcdClient *clientv3.Client,
keyspaceGroupID uint32,
member ElectionMember,
rootPath string,
storage endpoint.TSOStorage,
cfg Config,
startGlobalLeaderLoop bool,
allocatorKeyPrefix string,
allocatorKey string,
) *AllocatorManager {
ctx, cancel := context.WithCancel(ctx)
am := &AllocatorManager{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
kgID: keyspaceGroupID,
member: member,
rootPath: rootPath,
Expand All @@ -218,6 +226,8 @@ func NewAllocatorManager(
leaderLease: cfg.GetLeaderLease(),
maxResetTSGap: cfg.GetMaxResetTSGap,
securityConfig: cfg.GetTLSConfig(),
allocatorKeyPrefix: allocatorKeyPrefix,
allocatorKey: allocatorKey,
}
am.mu.allocatorGroups = make(map[string]*allocatorGroup)
am.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
Expand Down
128 changes: 128 additions & 0 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"errors"
"fmt"
"runtime/trace"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -32,13 +33,18 @@
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const ttlSeconds = 3

// Allocator is a Timestamp Oracle allocator.
type Allocator interface {
// Initialize is used to initialize a TSO allocator.
Expand Down Expand Up @@ -131,6 +137,9 @@
// close is used to shutdown the primary election loop.
// tso service call this function to shutdown the loop here, but pd manages its own loop.
func (gta *GlobalTSOAllocator) close() {
if err := gta.deregisterAllocator(); err != nil {
log.Warn("deregister tso allocator failed", zap.String("key", gta.am.allocatorKey), errs.ZapError(err))
}
gta.cancel()
gta.wg.Wait()
}
Expand Down Expand Up @@ -184,6 +193,9 @@

// Initialize will initialize the created global TSO allocator.
func (gta *GlobalTSOAllocator) Initialize(int) error {
if err := gta.registerAllocator(); err != nil {
return err
}
gta.tsoAllocatorRoleGauge.Set(1)
// The suffix of a Global TSO should always be 0.
gta.timestampOracle.suffix = 0
Expand Down Expand Up @@ -658,3 +670,119 @@
func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics {
return gta.timestampOracle.metrics
}

// registerAllocator registers the tso allocator to etcd.
// It is used when switching mode between pd and ms. We need to make sure only one TSO allocator is working at the same time.
func (gta *GlobalTSOAllocator) registerAllocator() error {
log.Info("register tso allocator", zap.String("key", gta.am.allocatorKey))
kresp := gta.tryRegister()
if kresp == nil {
return errors.New("context canceled")
}
go func() {
defer logutil.LogPanic()
for {
select {
case <-gta.ctx.Done():
log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey))
return
case _, ok := <-kresp:
if !ok {
log.Error("keep alive tso allocator failed", zap.String("key", gta.am.allocatorKey))
kresp = gta.renewKeepalive()
}
}
}
}()
return nil
}

func (gta *GlobalTSOAllocator) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(ttlSeconds) * time.Second / 2)
defer t.Stop()
for {
select {
case <-gta.ctx.Done():
log.Info("exit register tso allocator", zap.String("key", gta.am.allocatorKey))
return nil
case <-t.C:
return gta.tryRegister()
}
}
}

func (gta *GlobalTSOAllocator) txnWithTTL(key, value string) (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(gta.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
grantResp, err := gta.am.etcdClient.Grant(ctx, ttlSeconds)
if err != nil {
return 0, err
}
resp, err := kv.NewSlowLogTxn(gta.am.etcdClient).
Then(clientv3.OpPut(key, value, clientv3.WithLease(grantResp.ID))).
Commit()
if err != nil {
return 0, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByArgs()

Check warning on line 725 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L725

Added line #L725 was not covered by tests
}
if !resp.Succeeded {
return 0, errs.ErrEtcdTxnConflict.FastGenByArgs()

Check warning on line 728 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L728

Added line #L728 was not covered by tests
}

return grantResp.ID, nil
}

// deregisterAllocator deregisters the tso allocator from etcd.
func (gta *GlobalTSOAllocator) deregisterAllocator() error {
log.Info("deregister tso allocator", zap.String("key", gta.am.allocatorKey))
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
defer cancel()
_, err := gta.am.etcdClient.Delete(ctx, gta.am.allocatorKey)
return err
}

func (gta *GlobalTSOAllocator) tryRegister() <-chan *clientv3.LeaseKeepAliveResponse {
// register immediately
kresp, needRetry := gta.register()
if !needRetry {
return kresp
}
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-gta.ctx.Done():
return nil
case <-ticker.C:
if kresp, needRetry := gta.register(); !needRetry {
return kresp

Check warning on line 757 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L757

Added line #L757 was not covered by tests
}
}
}
}

func (gta *GlobalTSOAllocator) register() (<-chan *clientv3.LeaseKeepAliveResponse, bool) {
ctx, cancel := context.WithTimeout(gta.ctx, time.Duration(3)*time.Second)
resp, err := gta.am.etcdClient.Get(ctx, gta.am.allocatorKeyPrefix, clientv3.WithPrefix())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocatorKeyPrefix varies in different modes, and it cannot retrieve keys from other allocators as expected.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch!

cancel()
if err != nil {
return nil, true
}
// wait for the previous allocator with different mode to be deregistered
if len(resp.Kvs) > 0 {
for _, kv := range resp.Kvs {
key := string(kv.Key)
if !strings.Contains(key, gta.am.allocatorKeyPrefix) {
return nil, true

Check warning on line 775 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L775

Added line #L775 was not covered by tests
}
}
}
id, err := gta.txnWithTTL(gta.am.allocatorKey, "")
if err != nil {
return nil, true
}
kresp, err := gta.am.etcdClient.KeepAlive(gta.ctx, id)
if err != nil {
return nil, true

Check warning on line 785 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L785

Added line #L785 was not covered by tests
}
return kresp, false
}
7 changes: 6 additions & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"net/http"
"path"
"regexp"
"sort"
"sync"
Expand Down Expand Up @@ -319,6 +320,7 @@ type KeyspaceGroupManager struct {

// tsoServiceID is the service ID of the TSO service, registered in the service discovery
tsoServiceID *discovery.ServiceRegistryEntry
clusterID uint64
etcdClient *clientv3.Client
httpClient *http.Client
// electionNamePrefix is the name prefix to generate the unique name of a participant,
Expand Down Expand Up @@ -414,6 +416,7 @@ func NewKeyspaceGroupManager(
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
clusterID: clusterID,
etcdClient: etcdClient,
httpClient: httpClient,
electionNamePrefix: electionNamePrefix,
Expand Down Expand Up @@ -768,7 +771,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
storage = kgm.tsoSvcStorage
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(kgm.clusterID)
am := NewAllocatorManager(kgm.ctx, kgm.etcdClient, group.ID, participant, tsRootPath, storage, kgm.cfg, true,
allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "tso", fmt.Sprintf("keyspace_group_%d", group.ID)))
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
Expand Down
2 changes: 0 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() {
tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr}
clusterID := rand.Uint64()
clusterIDStr := strconv.FormatUint(clusterID, 10)

legacySvcRootPath := path.Join("/pd", clusterIDStr)
tsoSvcRootPath := path.Join(mcsutils.MicroserviceRootPath, clusterIDStr, "tso")
electionNamePrefix := "tso-server-" + clusterIDStr
Expand Down Expand Up @@ -1046,7 +1045,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() {
defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority
clusterID := rand.Uint64()
clusterIDStr := strconv.FormatUint(clusterID, 10)

rootPath := path.Join("/pd", clusterIDStr)
cfg1 := suite.createConfig()
cfg2 := suite.createConfig()
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) {

// ResetTS resets the ts with specified tso.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool, _ uint32) error {
log.Info("reset-ts",
log.Info("reset ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
Expand Down
5 changes: 4 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"net/http"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
Expand Down Expand Up @@ -467,7 +468,9 @@ func (s *Server) startServer(ctx context.Context) error {
s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.pdProtoFactory = &tsoutil.PDProtoFactory{}
if !s.IsAPIServiceMode() {
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false)
allocatorKeyPrefix := endpoint.GlobalTSOAllocatorsPrefix(s.clusterID.Load())
s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, s.client, mcs.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s, false,
allocatorKeyPrefix, path.Join(allocatorKeyPrefix, "pd"))
// When disabled the Local TSO, we should clean up the Local TSO Allocator's meta info written in etcd if it exists.
if !s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.CleanUpDCLocation(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
tc.WaitLeader()
leaderServer := tc.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
ttc.WaitForDefaultPrimaryServing(re)

urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr)

Expand Down
9 changes: 9 additions & 0 deletions tests/integrations/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/testutil"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/tempurl"
tu "github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -84,6 +86,13 @@ func (suite *tsoServerTestSuite) SetupSuite() {
} else {
suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr())
testutil.Eventually(re, func() bool {
am, err := suite.tsoServer.GetKeyspaceGroupManager().GetAllocatorManager(utils.DefaultKeyspaceGroupID)
if err != nil {
return false
}
return am.GetMember().IsLeaderElected()
})
}
}

Expand Down
Loading