Skip to content

Commit

Permalink
pdms: support primary/transfer api for scheduling and tso (#8157)
Browse files Browse the repository at this point in the history
ref #5766, close #7995

Signed-off-by: husharp <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Aug 13, 2024
1 parent 917f24a commit 1c1cd99
Show file tree
Hide file tree
Showing 13 changed files with 705 additions and 49 deletions.
53 changes: 38 additions & 15 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Leadership struct {
// campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
campaignTimes []time.Time
// primaryWatch is for the primary watch only,
// which is used to reuse `Watch` interface in `Leadership`.
primaryWatch atomic.Bool
}

// NewLeadership creates a new Leadership.
Expand All @@ -84,17 +87,18 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// GetLease gets the lease of leadership, only if leadership is valid,
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
func (ls *Leadership) GetLease() *Lease {
l := ls.lease.Load()
if l == nil {
return nil
}
return l.(*lease)
return l.(*Lease)
}

func (ls *Leadership) setLease(lease *lease) {
// SetLease sets the lease of leadership.
func (ls *Leadership) SetLease(lease *Lease) {
ls.lease.Store(lease)
}

Expand All @@ -114,6 +118,16 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// SetPrimaryWatch sets the primary watch flag.
func (ls *Leadership) SetPrimaryWatch(val bool) {
ls.primaryWatch.Store(val)
}

// IsPrimary gets the primary watch flag.
func (ls *Leadership) IsPrimary() bool {
return ls.primaryWatch.Load()
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
// Need to make sure `AddCampaignTimes` is called before this function.
func (ls *Leadership) GetCampaignTimesNum() int {
Expand Down Expand Up @@ -152,18 +166,19 @@ func (ls *Leadership) AddCampaignTimes() {
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Purpose: ls.purpose,
client: ls.client,
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)
newLease := NewLease(ls.client, ls.purpose)
ls.SetLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
name, ok := val.(string)
if len(name) == 0 {
// return directly when not set the name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
var member pdpb.Member
_ = member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
// only return when the name is set and the name is equal to the leader name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})
Expand Down Expand Up @@ -200,12 +215,12 @@ func (ls *Leadership) Keep(ctx context.Context) {
ls.keepAliveCancelFuncLock.Lock()
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx)
ls.keepAliveCancelFuncLock.Unlock()
go ls.getLease().KeepAlive(ls.keepAliveCtx)
go ls.GetLease().KeepAlive(ls.keepAliveCtx)
}

// Check returns whether the leadership is still available.
func (ls *Leadership) Check() bool {
return ls != nil && ls.getLease() != nil && !ls.getLease().IsExpired()
return ls != nil && ls.GetLease() != nil && !ls.GetLease().IsExpired()
}

// LeaderTxn returns txn() with a leader comparison to guarantee that
Expand Down Expand Up @@ -376,6 +391,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// ONLY `{service}/primary/transfer` API update primary will meet this condition.
if ev.Type == mvccpb.PUT && ls.IsPrimary() {
log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value),
zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}
Expand All @@ -385,13 +407,14 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

// Reset does some defer jobs such as closing lease, resetting lease etc.
func (ls *Leadership) Reset() {
if ls == nil || ls.getLease() == nil {
if ls == nil || ls.GetLease() == nil {
return
}
ls.keepAliveCancelFuncLock.Lock()
if ls.keepAliveCancelFunc != nil {
ls.keepAliveCancelFunc()
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.GetLease().Close()
ls.SetPrimaryWatch(false)
}
4 changes: 2 additions & 2 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestLeadership(t *testing.T) {
leadership2.Keep(ctx)

// Check the lease.
lease1 := leadership1.getLease()
lease1 := leadership1.GetLease()
re.NotNil(lease1)
lease2 := leadership2.getLease()
lease2 := leadership2.GetLease()
re.NotNil(lease2)

re.True(lease1.IsExpired())
Expand Down
24 changes: 17 additions & 7 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
slowRequestTime = etcdutil.DefaultSlowRequestTime
)

// lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// Lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// The way to gain and maintain leadership is to update and keep the lease alive continuously.
type lease struct {
type Lease struct {
// purpose is used to show what this election for
Purpose string
// etcd client and lease
Expand All @@ -48,8 +48,17 @@ type lease struct {
expireTime atomic.Value
}

// NewLease creates a new Lease instance.
func NewLease(client *clientv3.Client, purpose string) *Lease {
return &Lease{
Purpose: purpose,
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *lease) Grant(leaseTimeout int64) error {
func (l *Lease) Grant(leaseTimeout int64) error {
if l == nil {
return errs.ErrEtcdGrantLease.GenWithStackByCause("lease is nil")
}
Expand All @@ -71,7 +80,7 @@ func (l *lease) Grant(leaseTimeout int64) error {
}

// Close releases the lease.
func (l *lease) Close() error {
func (l *Lease) Close() error {
if l == nil {
return nil
}
Expand All @@ -92,15 +101,15 @@ func (l *lease) Close() error {

// IsExpired checks if the lease is expired. If it returns true,
// current leader should step down and try to re-elect again.
func (l *lease) IsExpired() bool {
func (l *Lease) IsExpired() bool {
if l == nil || l.expireTime.Load() == nil {
return true
}
return time.Now().After(l.expireTime.Load().(time.Time))
}

// KeepAlive auto renews the lease and update expireTime.
func (l *lease) KeepAlive(ctx context.Context) {
func (l *Lease) KeepAlive(ctx context.Context) {
defer logutil.LogPanic()

if l == nil {
Expand All @@ -109,6 +118,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
defer log.Info("lease keep alive stopped", zap.String("purpose", l.Purpose))

var maxExpire time.Time
timer := time.NewTimer(l.leaseTimeout)
Expand Down Expand Up @@ -146,7 +156,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
}

// Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel.
func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
func (l *Lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
ch := make(chan time.Time)

go func() {
Expand Down
19 changes: 3 additions & 16 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)

func TestLease(t *testing.T) {
Expand All @@ -30,16 +29,8 @@ func TestLease(t *testing.T) {
defer clean()

// Create the lease.
lease1 := &lease{
Purpose: "test_lease_1",
client: client,
lease: clientv3.NewLease(client),
}
lease2 := &lease{
Purpose: "test_lease_2",
client: client,
lease: clientv3.NewLease(client),
}
lease1 := NewLease(client, "test_lease_1")
lease2 := NewLease(client, "test_lease_2")
re.True(lease1.IsExpired())
re.True(lease2.IsExpired())
re.NoError(lease1.Close())
Expand Down Expand Up @@ -95,11 +86,7 @@ func TestLeaseKeepAlive(t *testing.T) {
defer clean()

// Create the lease.
lease := &lease{
Purpose: "test_lease",
client: client,
lease: clientv3.NewLease(client),
}
lease := NewLease(client, "test_lease")

re.NoError(lease.Grant(defaultLeaseTimeout))
ch := lease.keepAliveWorker(context.Background(), 2*time.Second)
Expand Down
36 changes: 36 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/response"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterHotspotRouter()
s.RegisterRegionsRouter()
s.RegisterStoresRouter()
s.RegisterPrimaryRouter()
return s
}

Expand Down Expand Up @@ -226,6 +228,12 @@ func (s *Service) RegisterConfigRouter() {
regions.GET("/:id/labels", getRegionLabels)
}

// RegisterPrimaryRouter registers the router of the primary handler.
func (s *Service) RegisterPrimaryRouter() {
router := s.root.Group("primary")
router.POST("transfer", transferPrimary)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
Expand Down Expand Up @@ -1478,3 +1486,31 @@ func getRegionByID(c *gin.Context) {
}
c.Data(http.StatusOK, "application/json", b)
}

// TransferPrimary transfers the primary member to `new_primary`.
// @Tags primary
// @Summary Transfer the primary member to `new_primary`.
// @Produce json
// @Param new_primary body string false "new primary name"
// @Success 200 string string
// @Router /primary/transfer [post]
func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var input map[string]string
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

newPrimary := ""
if v, ok := input["new_primary"]; ok {
newPrimary = v
}

if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, "success")
}
33 changes: 33 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

// GetParticipant returns the participant.
func (s *Server) GetParticipant() *member.Participant {
return s.participant
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
Expand Down Expand Up @@ -242,6 +248,20 @@ func (s *Server) primaryElectionLoop() {
log.Info("the scheduling primary has changed, try to re-campaign a primary")
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
zap.String("expected-primary-id", expectedPrimary),
zap.Uint64("member-id", s.participant.ID()),
zap.String("cur-member-value", s.participant.MemberValue()))
time.Sleep(200 * time.Millisecond)
continue
}

s.campaignLeader()
}
}
Expand Down Expand Up @@ -285,7 +305,17 @@ func (s *Server) campaignLeader() {
cb()
}
}()
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary,
s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName)
if err != nil {
log.Error("prepare scheduling primary watch error", errs.ZapError(err))
return
}
s.participant.SetExpectedPrimaryLease(lease)
s.participant.EnableLeader()

member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

Expand All @@ -303,6 +333,9 @@ func (s *Server) campaignLeader() {
// Server is closed and it should return nil.
log.Info("server is closed")
return
case <-exitPrimary:
log.Info("no longer be primary because primary have been updated, the scheduling primary will step down")
return
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf
// GenerateConfig generates a new config with the given options.
func GenerateConfig(c *config.Config) (*config.Config, error) {
arguments := []string{
"--name=" + c.Name,
"--listen-addr=" + c.ListenAddr,
"--advertise-listen-addr=" + c.AdvertiseListenAddr,
"--backend-endpoints=" + c.BackendEndpoints,
}

flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError)
flagSet.StringP("name", "", "", "human-readable name for this scheduling member")
flagSet.BoolP("version", "V", false, "print version information and exit")
flagSet.StringP("config", "", "", "config file")
flagSet.StringP("backend-endpoints", "", "", "url for etcd client")
Expand Down
Loading

0 comments on commit 1c1cd99

Please sign in to comment.