From 1c1cd99ba73eaf1fd80a03310221b37ec48cf7d1 Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 13 Aug 2024 12:58:31 +0800 Subject: [PATCH] pdms: support primary/transfer api for scheduling and tso (#8157) ref tikv/pd#5766, close tikv/pd#7995 Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/election/leadership.go | 53 +++- pkg/election/leadership_test.go | 4 +- pkg/election/lease.go | 24 +- pkg/election/lease_test.go | 19 +- pkg/mcs/scheduling/server/apis/v1/api.go | 36 +++ pkg/mcs/scheduling/server/server.go | 33 ++ pkg/mcs/scheduling/server/testutil.go | 2 + pkg/mcs/tso/server/apis/v1/api.go | 49 +++ pkg/mcs/utils/expected_primary.go | 182 +++++++++++ pkg/member/member.go | 4 +- pkg/member/participant.go | 16 + pkg/tso/global_allocator.go | 49 ++- tests/integrations/mcs/members/member_test.go | 283 +++++++++++++++++- 13 files changed, 705 insertions(+), 49 deletions(-) create mode 100644 pkg/mcs/utils/expected_primary.go diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index f252eabe072..1361d685b57 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -71,6 +71,9 @@ type Leadership struct { // campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`. // It is ordered by time to prevent the leader from campaigning too frequently. campaignTimes []time.Time + // primaryWatch is for the primary watch only, + // which is used to reuse `Watch` interface in `Leadership`. + primaryWatch atomic.Bool } // NewLeadership creates a new Leadership. @@ -84,17 +87,18 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh return leadership } -// getLease gets the lease of leadership, only if leadership is valid, +// GetLease gets the lease of leadership, only if leadership is valid, // i.e. the owner is a true leader, the lease is not nil. -func (ls *Leadership) getLease() *lease { +func (ls *Leadership) GetLease() *Lease { l := ls.lease.Load() if l == nil { return nil } - return l.(*lease) + return l.(*Lease) } -func (ls *Leadership) setLease(lease *lease) { +// SetLease sets the lease of leadership. +func (ls *Leadership) SetLease(lease *Lease) { ls.lease.Store(lease) } @@ -114,6 +118,16 @@ func (ls *Leadership) GetLeaderKey() string { return ls.leaderKey } +// SetPrimaryWatch sets the primary watch flag. +func (ls *Leadership) SetPrimaryWatch(val bool) { + ls.primaryWatch.Store(val) +} + +// IsPrimary gets the primary watch flag. +func (ls *Leadership) IsPrimary() bool { + return ls.primaryWatch.Load() +} + // GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`. // Need to make sure `AddCampaignTimes` is called before this function. 
func (ls *Leadership) GetCampaignTimesNum() int { @@ -152,18 +166,19 @@ func (ls *Leadership) AddCampaignTimes() { func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { ls.leaderValue = leaderData // Create a new lease to campaign - newLease := &lease{ - Purpose: ls.purpose, - client: ls.client, - lease: clientv3.NewLease(ls.client), - } - ls.setLease(newLease) + newLease := NewLease(ls.client, ls.purpose) + ls.SetLease(newLease) failpoint.Inject("skipGrantLeader", func(val failpoint.Value) { + name, ok := val.(string) + if len(name) == 0 { + // return directly when not set the name + failpoint.Return(errors.Errorf("failed to grant lease")) + } var member pdpb.Member _ = member.Unmarshal([]byte(leaderData)) - name, ok := val.(string) if ok && member.Name == name { + // only return when the name is set and the name is equal to the leader name failpoint.Return(errors.Errorf("failed to grant lease")) } }) @@ -200,12 +215,12 @@ func (ls *Leadership) Keep(ctx context.Context) { ls.keepAliveCancelFuncLock.Lock() ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx) ls.keepAliveCancelFuncLock.Unlock() - go ls.getLease().KeepAlive(ls.keepAliveCtx) + go ls.GetLease().KeepAlive(ls.keepAliveCtx) } // Check returns whether the leadership is still available. func (ls *Leadership) Check() bool { - return ls != nil && ls.getLease() != nil && !ls.getLease().IsExpired() + return ls != nil && ls.GetLease() != nil && !ls.GetLease().IsExpired() } // LeaderTxn returns txn() with a leader comparison to guarantee that @@ -376,6 +391,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return } + // ONLY `{service}/primary/transfer` API update primary will meet this condition. + if ev.Type == mvccpb.PUT && ls.IsPrimary() { + log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision), + zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value), + zap.String("purpose", ls.purpose)) + return + } } revision = wresp.Header.Revision + 1 } @@ -385,7 +407,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { // Reset does some defer jobs such as closing lease, resetting lease etc. func (ls *Leadership) Reset() { - if ls == nil || ls.getLease() == nil { + if ls == nil || ls.GetLease() == nil { return } ls.keepAliveCancelFuncLock.Lock() @@ -393,5 +415,6 @@ func (ls *Leadership) Reset() { ls.keepAliveCancelFunc() } ls.keepAliveCancelFuncLock.Unlock() - ls.getLease().Close() + ls.GetLease().Close() + ls.SetPrimaryWatch(false) } diff --git a/pkg/election/leadership_test.go b/pkg/election/leadership_test.go index 40f0bcbee23..e16c5842542 100644 --- a/pkg/election/leadership_test.go +++ b/pkg/election/leadership_test.go @@ -100,9 +100,9 @@ func TestLeadership(t *testing.T) { leadership2.Keep(ctx) // Check the lease. - lease1 := leadership1.getLease() + lease1 := leadership1.GetLease() re.NotNil(lease1) - lease2 := leadership2.getLease() + lease2 := leadership2.GetLease() re.NotNil(lease2) re.True(lease1.IsExpired()) diff --git a/pkg/election/lease.go b/pkg/election/lease.go index 45d702def5e..c2e9eb97117 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -34,9 +34,9 @@ const ( slowRequestTime = etcdutil.DefaultSlowRequestTime ) -// lease is used as the low-level mechanism for campaigning and renewing elected leadership. 
+// Lease is used as the low-level mechanism for campaigning and renewing elected leadership. // The way to gain and maintain leadership is to update and keep the lease alive continuously. -type lease struct { +type Lease struct { // purpose is used to show what this election for Purpose string // etcd client and lease @@ -48,8 +48,17 @@ type lease struct { expireTime atomic.Value } +// NewLease creates a new Lease instance. +func NewLease(client *clientv3.Client, purpose string) *Lease { + return &Lease{ + Purpose: purpose, + client: client, + lease: clientv3.NewLease(client), + } +} + // Grant uses `lease.Grant` to initialize the lease and expireTime. -func (l *lease) Grant(leaseTimeout int64) error { +func (l *Lease) Grant(leaseTimeout int64) error { if l == nil { return errs.ErrEtcdGrantLease.GenWithStackByCause("lease is nil") } @@ -71,7 +80,7 @@ func (l *lease) Grant(leaseTimeout int64) error { } // Close releases the lease. -func (l *lease) Close() error { +func (l *Lease) Close() error { if l == nil { return nil } @@ -92,7 +101,7 @@ func (l *lease) Close() error { // IsExpired checks if the lease is expired. If it returns true, // current leader should step down and try to re-elect again. -func (l *lease) IsExpired() bool { +func (l *Lease) IsExpired() bool { if l == nil || l.expireTime.Load() == nil { return true } @@ -100,7 +109,7 @@ func (l *lease) IsExpired() bool { } // KeepAlive auto renews the lease and update expireTime. -func (l *lease) KeepAlive(ctx context.Context) { +func (l *Lease) KeepAlive(ctx context.Context) { defer logutil.LogPanic() if l == nil { @@ -109,6 +118,7 @@ func (l *lease) KeepAlive(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3) + defer log.Info("lease keep alive stopped", zap.String("purpose", l.Purpose)) var maxExpire time.Time timer := time.NewTimer(l.leaseTimeout) @@ -146,7 +156,7 @@ func (l *lease) KeepAlive(ctx context.Context) { } // Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel. -func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time { +func (l *Lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time { ch := make(chan time.Time) go func() { diff --git a/pkg/election/lease_test.go b/pkg/election/lease_test.go index 3d8515eadb2..3a02de97239 100644 --- a/pkg/election/lease_test.go +++ b/pkg/election/lease_test.go @@ -21,7 +21,6 @@ import ( "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "go.etcd.io/etcd/clientv3" ) func TestLease(t *testing.T) { @@ -30,16 +29,8 @@ func TestLease(t *testing.T) { defer clean() // Create the lease. - lease1 := &lease{ - Purpose: "test_lease_1", - client: client, - lease: clientv3.NewLease(client), - } - lease2 := &lease{ - Purpose: "test_lease_2", - client: client, - lease: clientv3.NewLease(client), - } + lease1 := NewLease(client, "test_lease_1") + lease2 := NewLease(client, "test_lease_2") re.True(lease1.IsExpired()) re.True(lease2.IsExpired()) re.NoError(lease1.Close()) @@ -95,11 +86,7 @@ func TestLeaseKeepAlive(t *testing.T) { defer clean() // Create the lease. 
- lease := &lease{ - Purpose: "test_lease", - client: client, - lease: clientv3.NewLease(client), - } + lease := NewLease(client, "test_lease") re.NoError(lease.Grant(defaultLeaseTimeout)) ch := lease.keepAliveWorker(context.Background(), 2*time.Second) diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index ce1522d465f..8b9427a8896 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server" mcsutils "github.com/tikv/pd/pkg/mcs/utils" + "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/response" sche "github.com/tikv/pd/pkg/schedule/core" "github.com/tikv/pd/pkg/schedule/handler" @@ -120,6 +121,7 @@ func NewService(srv *scheserver.Service) *Service { s.RegisterHotspotRouter() s.RegisterRegionsRouter() s.RegisterStoresRouter() + s.RegisterPrimaryRouter() return s } @@ -226,6 +228,12 @@ func (s *Service) RegisterConfigRouter() { regions.GET("/:id/labels", getRegionLabels) } +// RegisterPrimaryRouter registers the router of the primary handler. +func (s *Service) RegisterPrimaryRouter() { + router := s.root.Group("primary") + router.POST("transfer", transferPrimary) +} + // @Tags admin // @Summary Change the log level. // @Produce json @@ -1478,3 +1486,31 @@ func getRegionByID(c *gin.Context) { } c.Data(http.StatusOK, "application/json", b) } + +// TransferPrimary transfers the primary member to `new_primary`. +// @Tags primary +// @Summary Transfer the primary member to `new_primary`. +// @Produce json +// @Param new_primary body string false "new primary name" +// @Success 200 string string +// @Router /primary/transfer [post] +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server) + var input map[string]string + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + newPrimary := "" + if v, ok := input["new_primary"]; ok { + newPrimary = v + } + + if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(), + constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, "success") +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 6ec4e444d6d..e1753cf2972 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -21,6 +21,7 @@ import ( "os" "os/signal" "runtime" + "strings" "sync" "sync/atomic" "syscall" @@ -132,6 +133,11 @@ func (s *Server) GetBackendEndpoints() string { return s.cfg.BackendEndpoints } +// GetParticipant returns the participant. +func (s *Server) GetParticipant() *member.Participant { + return s.participant +} + // SetLogLevel sets log level. func (s *Server) SetLogLevel(level string) error { if !logutil.IsLevelLegal(level) { @@ -242,6 +248,20 @@ func (s *Server) primaryElectionLoop() { log.Info("the scheduling primary has changed, try to re-campaign a primary") } + // To make sure the expected primary(if existed) and new primary are on the same server. + expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath()) + // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. 
+ // expected primary ONLY SET BY `{service}/primary/transfer` API. + if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) { + log.Info("skip campaigning of scheduling primary and check later", + zap.String("server-name", s.Name()), + zap.String("expected-primary-id", expectedPrimary), + zap.Uint64("member-id", s.participant.ID()), + zap.String("cur-member-value", s.participant.MemberValue())) + time.Sleep(200 * time.Millisecond) + continue + } + s.campaignLeader() } } @@ -285,7 +305,17 @@ func (s *Server) campaignLeader() { cb() } }() + // check expected primary and watch the primary. + exitPrimary := make(chan struct{}) + lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary, + s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName) + if err != nil { + log.Error("prepare scheduling primary watch error", errs.ZapError(err)) + return + } + s.participant.SetExpectedPrimaryLease(lease) s.participant.EnableLeader() + member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1) log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name())) @@ -303,6 +333,9 @@ func (s *Server) campaignLeader() { // Server is closed and it should return nil. log.Info("server is closed") return + case <-exitPrimary: + log.Info("no longer be primary because primary have been updated, the scheduling primary will step down") + return } } } diff --git a/pkg/mcs/scheduling/server/testutil.go b/pkg/mcs/scheduling/server/testutil.go index aba88945434..312365c81ab 100644 --- a/pkg/mcs/scheduling/server/testutil.go +++ b/pkg/mcs/scheduling/server/testutil.go @@ -49,12 +49,14 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf // GenerateConfig generates a new config with the given options. func GenerateConfig(c *config.Config) (*config.Config, error) { arguments := []string{ + "--name=" + c.Name, "--listen-addr=" + c.ListenAddr, "--advertise-listen-addr=" + c.AdvertiseListenAddr, "--backend-endpoints=" + c.BackendEndpoints, } flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) + flagSet.StringP("name", "", "", "human-readable name for this scheduling member") flagSet.BoolP("version", "V", false, "print version information and exit") flagSet.StringP("config", "", "", "config file") flagSet.StringP("backend-endpoints", "", "", "url for etcd client") diff --git a/pkg/mcs/tso/server/apis/v1/api.go b/pkg/mcs/tso/server/apis/v1/api.go index d5c4cd4ec48..19b3a1be612 100644 --- a/pkg/mcs/tso/server/apis/v1/api.go +++ b/pkg/mcs/tso/server/apis/v1/api.go @@ -30,6 +30,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" "github.com/tikv/pd/pkg/utils/logutil" @@ -98,6 +99,7 @@ func NewService(srv *tsoserver.Service) *Service { s.RegisterKeyspaceGroupRouter() s.RegisterHealthRouter() s.RegisterConfigRouter() + s.RegisterPrimaryRouter() return s } @@ -126,6 +128,12 @@ func (s *Service) RegisterConfigRouter() { router.GET("", getConfig) } +// RegisterPrimaryRouter registers the router of the primary handler. 
+func (s *Service) RegisterPrimaryRouter() { + router := s.root.Group("primary") + router.POST("transfer", transferPrimary) +} + func changeLogLevel(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) var level string @@ -266,3 +274,44 @@ func getConfig(c *gin.Context) { svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) c.IndentedJSON(http.StatusOK, svr.GetConfig()) } + +// TransferPrimary transfers the primary member to `new_primary`. +// @Tags primary +// @Summary Transfer the primary member to `new_primary`. +// @Produce json +// @Param new_primary body string false "new primary name" +// @Success 200 string string +// @Router /primary/transfer [post] +func transferPrimary(c *gin.Context) { + svr := c.MustGet(multiservicesapi.ServiceContextKey).(*tsoserver.Service) + var input map[string]string + if err := c.BindJSON(&input); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + + // We only support default keyspace group now. + newPrimary, keyspaceGroupID := "", constant.DefaultKeyspaceGroupID + if v, ok := input["new_primary"]; ok { + newPrimary = v + } + + allocator, err := svr.GetTSOAllocatorManager(keyspaceGroupID) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + globalAllocator, err := allocator.GetAllocator(tso.GlobalDCLocation) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + + if err := utils.TransferPrimary(svr.GetClient(), globalAllocator.(*tso.GlobalTSOAllocator).GetExpectedPrimaryLease(), + constant.TSOServiceName, svr.Name(), newPrimary, keyspaceGroupID); err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.IndentedJSON(http.StatusOK, "success") +} diff --git a/pkg/mcs/utils/expected_primary.go b/pkg/mcs/utils/expected_primary.go new file mode 100644 index 00000000000..d44b2eae436 --- /dev/null +++ b/pkg/mcs/utils/expected_primary.go @@ -0,0 +1,182 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/discovery" + "github.com/tikv/pd/pkg/mcs/utils/constant" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/etcdutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// expectedPrimaryFlag is the flag to indicate the expected primary. +// 1. When the primary was campaigned successfully, it will set the `expected_primary` flag. +// 2. Using `{service}/primary/transfer` API will revoke the previous lease and set a new `expected_primary` flag. +// This flag used to help new primary to campaign successfully while other secondaries can skip the campaign. 
+const expectedPrimaryFlag = "expected_primary" + +// expectedPrimaryPath formats the primary path with the expected primary flag. +func expectedPrimaryPath(primaryPath string) string { + return fmt.Sprintf("%s/%s", primaryPath, expectedPrimaryFlag) +} + +// GetExpectedPrimaryFlag gets the expected primary flag. +func GetExpectedPrimaryFlag(client *clientv3.Client, primaryPath string) string { + path := expectedPrimaryPath(primaryPath) + primary, err := etcdutil.GetValue(client, path) + if err != nil { + log.Error("get expected primary flag error", errs.ZapError(err), zap.String("primary-path", path)) + return "" + } + + return string(primary) +} + +// markExpectedPrimaryFlag marks the expected primary flag when the primary is specified. +func markExpectedPrimaryFlag(client *clientv3.Client, primaryPath string, leaderRaw string, leaseID clientv3.LeaseID) (int64, error) { + path := expectedPrimaryPath(primaryPath) + log.Info("set expected primary flag", zap.String("primary-path", path), zap.String("leader-raw", leaderRaw)) + // write a flag to indicate the expected primary. + resp, err := kv.NewSlowLogTxn(client). + Then(clientv3.OpPut(expectedPrimaryPath(primaryPath), leaderRaw, clientv3.WithLease(leaseID))). + Commit() + if err != nil || !resp.Succeeded { + log.Error("mark expected primary error", errs.ZapError(err), zap.String("primary-path", path)) + return 0, err + } + return resp.Header.Revision, nil +} + +// KeepExpectedPrimaryAlive keeps the expected primary alive. +// We use lease to keep `expected primary` healthy. +// ONLY reset by the following conditions: +// - changed by `{service}/primary/transfer` API. +// - leader lease expired. +// ONLY primary called this function. +func KeepExpectedPrimaryAlive(ctx context.Context, cli *clientv3.Client, exitPrimary chan<- struct{}, + leaseTimeout int64, leaderPath, memberValue, service string) (*election.Lease, error) { + log.Info("primary start to watch the expected primary", zap.String("service", service), zap.String("primary-value", memberValue)) + service = fmt.Sprintf("%s expected primary", service) + lease := election.NewLease(cli, service) + if err := lease.Grant(leaseTimeout); err != nil { + return nil, err + } + + revision, err := markExpectedPrimaryFlag(cli, leaderPath, memberValue, lease.ID.Load().(clientv3.LeaseID)) + if err != nil { + log.Error("mark expected primary error", errs.ZapError(err)) + return nil, err + } + // Keep alive the current expected primary leadership to indicate that the server is still alive. + // Watch the expected primary path to check whether the expected primary has changed by `{service}/primary/transfer` API. + expectedPrimary := election.NewLeadership(cli, expectedPrimaryPath(leaderPath), service) + expectedPrimary.SetLease(lease) + expectedPrimary.Keep(ctx) + + go watchExpectedPrimary(ctx, expectedPrimary, revision+1, exitPrimary) + return lease, nil +} + +// watchExpectedPrimary watches `{service}/primary/transfer` API whether changed the expected primary. +func watchExpectedPrimary(ctx context.Context, + expectedPrimary *election.Leadership, revision int64, exitPrimary chan<- struct{}) { + expectedPrimary.SetPrimaryWatch(true) + // ONLY exited watch by the following conditions: + // - changed by `{service}/primary/transfer` API. + // - leader lease expired. 
+ expectedPrimary.Watch(ctx, revision) + expectedPrimary.Reset() + defer log.Info("primary exit the primary watch loop") + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return + } +} + +// TransferPrimary transfers the primary of the specified service. +// keyspaceGroupID is optional, only used for TSO service. +func TransferPrimary(client *clientv3.Client, lease *election.Lease, serviceName, + oldPrimary, newPrimary string, keyspaceGroupID uint32) error { + if lease == nil { + return errors.New("current lease is nil, please check leadership") + } + log.Info("try to transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary)) + entries, err := discovery.GetMSMembers(serviceName, client) + if err != nil { + return err + } + + // Do nothing when I am the only member of cluster. + if len(entries) == 1 { + return errors.Errorf("no valid secondary to transfer primary, the only member is %s", entries[0].Name) + } + + var primaryIDs []string + for _, member := range entries { + if (newPrimary == "" && member.Name != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) { + primaryIDs = append(primaryIDs, member.ServiceAddr) + } + } + if len(primaryIDs) == 0 { + return errors.Errorf("no valid secondary to transfer primary, from %s to %s", oldPrimary, newPrimary) + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + nextPrimaryID := r.Intn(len(primaryIDs)) + + clusterID, err := etcdutil.GetClusterID(client, constant.ClusterIDPath) + if err != nil { + return errors.Errorf("failed to get cluster ID: %v", err) + } + + // update expected primary flag + grantResp, err := client.Grant(client.Ctx(), constant.DefaultLeaderLease) + if err != nil { + return errors.Errorf("failed to grant lease for expected primary, err: %v", err) + } + + // revoke current primary's lease to ensure keepalive goroutine of primary exits. + if err := lease.Close(); err != nil { + return errors.Errorf("failed to revoke current primary's lease: %v", err) + } + + var primaryPath string + switch serviceName { + case constant.SchedulingServiceName: + primaryPath = endpoint.SchedulingPrimaryPath(clusterID) + case constant.TSOServiceName: + tsoRootPath := endpoint.TSOSvcRootPath(clusterID) + primaryPath = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID) + } + _, err = markExpectedPrimaryFlag(client, primaryPath, primaryIDs[nextPrimaryID], grantResp.ID) + if err != nil { + return errors.Errorf("failed to mark expected primary flag for %s, err: %v", serviceName, err) + } + return nil +} diff --git a/pkg/member/member.go b/pkg/member/member.go index 99fc5457b71..9c61a832b5f 100644 --- a/pkg/member/member.go +++ b/pkg/member/member.go @@ -331,8 +331,8 @@ func (m *EmbeddedEtcdMember) GetEtcdLeader() uint64 { } // IsSameLeader checks whether a server is the leader itself. -func (m *EmbeddedEtcdMember) IsSameLeader(leader *pdpb.Member) bool { - return leader.GetMemberId() == m.ID() +func (m *EmbeddedEtcdMember) IsSameLeader(leader any) bool { + return leader.(*pdpb.Member).GetMemberId() == m.ID() } // InitMemberInfo initializes the member info. diff --git a/pkg/member/participant.go b/pkg/member/participant.go index 43a91195bff..5e35d127bf7 100644 --- a/pkg/member/participant.go +++ b/pkg/member/participant.go @@ -67,6 +67,8 @@ type Participant struct { campaignChecker atomic.Value // Store as leadershipCheckFunc // lastLeaderUpdatedTime is the last time when the leader is updated. 
lastLeaderUpdatedTime atomic.Value + // expectedPrimaryLease is the expected lease for the primary. + expectedPrimaryLease atomic.Value // stored as *election.Lease } // NewParticipant create a new Participant. @@ -374,6 +376,20 @@ func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) { m.campaignChecker.Store(checker) } +// SetExpectedPrimaryLease sets the expected lease for the primary. +func (m *Participant) SetExpectedPrimaryLease(lease *election.Lease) { + m.expectedPrimaryLease.Store(lease) +} + +// GetExpectedPrimaryLease gets the expected lease for the primary. +func (m *Participant) GetExpectedPrimaryLease() *election.Lease { + l := m.expectedPrimaryLease.Load() + if l == nil { + return nil + } + return l.(*election.Lease) +} + // NewParticipantByService creates a new participant by service name. func NewParticipantByService(serviceName string) (p participant) { switch serviceName { diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index b745544ad05..38511ee2913 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "runtime/trace" + "strings" "sync" "sync/atomic" "time" @@ -27,7 +28,9 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" @@ -78,8 +81,10 @@ type GlobalTSOAllocator struct { // for global TSO synchronization am *AllocatorManager // for election use - member ElectionMember - timestampOracle *timestampOracle + member ElectionMember + // expectedPrimaryLease is used to store the expected primary lease. + expectedPrimaryLease atomic.Value // store as *election.LeaderLease + timestampOracle *timestampOracle // syncRTT is the RTT duration a SyncMaxTS RPC call will cost, // which is used to estimate the MaxTS in a Global TSO generation // to reduce the gRPC network IO latency. @@ -560,6 +565,20 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) } + // To make sure the expected primary(if existed) and new primary are on the same server. + expectedPrimary := mcsutils.GetExpectedPrimaryFlag(gta.member.Client(), gta.member.GetLeaderPath()) + // skip campaign the primary if the expected primary is not empty and not equal to the current memberValue. + // expected primary ONLY SET BY `{service}/primary/transfer` API. + if len(expectedPrimary) > 0 && !strings.Contains(gta.member.MemberValue(), expectedPrimary) { + log.Info("skip campaigning of tso primary and check later", + zap.String("server-name", gta.member.Name()), + zap.String("expected-primary-id", expectedPrimary), + zap.Uint64("member-id", gta.member.ID()), + zap.String("cur-memberValue", gta.member.MemberValue())) + time.Sleep(200 * time.Millisecond) + continue + } + gta.campaignLeader() } } @@ -596,7 +615,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { gta.member.ResetLeader() }) - // maintain the the leadership, after this, TSO can be service. + // maintain the leadership, after this, TSO can be service. 
gta.member.KeepLeader(ctx) log.Info("campaign tso primary ok", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0), @@ -620,8 +639,18 @@ func (gta *GlobalTSOAllocator) campaignLeader() { gta.am.ResetAllocatorGroup(GlobalDCLocation, false) }() - tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID()) + // check expected primary and watch the primary. + exitPrimary := make(chan struct{}) + lease, err := mcsutils.KeepExpectedPrimaryAlive(ctx, gta.member.Client(), exitPrimary, + gta.am.leaderLease, gta.member.GetLeaderPath(), gta.member.MemberValue(), constant.TSOServiceName) + if err != nil { + log.Error("prepare tso primary watch error", errs.ZapError(err)) + return + } + gta.expectedPrimaryLease.Store(lease) gta.member.EnableLeader() + + tsoLabel := fmt.Sprintf("TSO Service Group %d", gta.getGroupID()) member.ServiceMemberGauge.WithLabelValues(tsoLabel).Set(1) defer resetLeaderOnce.Do(func() { cancel() @@ -651,10 +680,22 @@ func (gta *GlobalTSOAllocator) campaignLeader() { log.Info("exit leader campaign", logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0)) return + case <-exitPrimary: + log.Info("no longer be primary because primary have been updated, the TSO primary will step down") + return } } } +// GetExpectedPrimaryLease returns the expected primary lease. +func (gta *GlobalTSOAllocator) GetExpectedPrimaryLease() *election.Lease { + l := gta.expectedPrimaryLease.Load() + if l == nil { + return nil + } + return l.(*election.Lease) +} + func (gta *GlobalTSOAllocator) getMetrics() *tsoMetrics { return gta.timestampOracle.metrics } diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index 79b269e2e8a..dffa6305d0b 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -15,13 +15,18 @@ package members_test import ( + "bytes" "context" + "encoding/json" + "fmt" + "net/http" "testing" + "time" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/suite" pdClient "github.com/tikv/pd/client/http" bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" @@ -35,6 +40,9 @@ type memberTestSuite struct { server *tests.TestServer backendEndpoints string pdClient pdClient.Client + + tsoNodes map[string]bs.Server + schedulingNodes map[string]bs.Server } func TestMemberTestSuite(t *testing.T) { @@ -57,7 +65,7 @@ func (suite *memberTestSuite) SetupTest() { // TSO nodes := make(map[string]bs.Server) - for i := 0; i < constant.DefaultKeyspaceGroupReplicaCount; i++ { + for i := 0; i < 3; i++ { s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) nodes[s.GetAddr()] = s suite.cleanupFunc = append(suite.cleanupFunc, func() { @@ -65,6 +73,7 @@ func (suite *memberTestSuite) SetupTest() { }) } tests.WaitForPrimaryServing(re, nodes) + suite.tsoNodes = nodes // Scheduling nodes = make(map[string]bs.Server) @@ -76,6 +85,7 @@ func (suite *memberTestSuite) SetupTest() { }) } tests.WaitForPrimaryServing(re, nodes) + suite.schedulingNodes = nodes suite.cleanupFunc = append(suite.cleanupFunc, func() { cancel() @@ -96,7 +106,7 @@ func (suite *memberTestSuite) TestMembers() { re := suite.Require() members, err := suite.pdClient.GetMicroServiceMembers(suite.ctx, "tso") re.NoError(err) - re.Len(members, constant.DefaultKeyspaceGroupReplicaCount) + 
re.Len(members, 3) members, err = suite.pdClient.GetMicroServiceMembers(suite.ctx, "scheduling") re.NoError(err) @@ -113,3 +123,270 @@ func (suite *memberTestSuite) TestPrimary() { re.NoError(err) re.NotEmpty(primary) } + +func (suite *memberTestSuite) TestCampaignPrimaryWhileServerClose() { + re := suite.Require() + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") + re.NoError(err) + re.NotEmpty(primary) + + supportedServices := []string{"tso", "scheduling"} + for _, service := range supportedServices { + var nodes map[string]bs.Server + switch service { + case "tso": + nodes = suite.tsoNodes + case "scheduling": + nodes = suite.schedulingNodes + } + + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + // Close old and new primary to mock campaign primary + for _, member := range nodes { + if member.GetAddr() != primary { + nodes[member.Name()].Close() + break + } + } + nodes[primary].Close() + tests.WaitForPrimaryServing(re, nodes) + + // primary should be different with before + onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + re.NotEqual(primary, onlyPrimary) + } +} + +func (suite *memberTestSuite) TestTransferPrimary() { + re := suite.Require() + supportedServices := []string{"tso", "scheduling"} + for _, service := range supportedServices { + var nodes map[string]bs.Server + switch service { + case "tso": + nodes = suite.tsoNodes + case "scheduling": + nodes = suite.schedulingNodes + } + + // Test resign primary by random + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = "" + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() + + testutil.Eventually(re, func() bool { + for _, member := range nodes { + if member.GetAddr() != primary && member.IsServing() { + return true + } + } + return false + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + primary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + // Test transfer primary to a specific node + var newPrimary string + for _, member := range nodes { + if member.GetAddr() != primary { + newPrimary = member.Name() + break + } + } + newPrimaryData["new_primary"] = newPrimary + data, _ = json.Marshal(newPrimaryData) + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() + + testutil.Eventually(re, func() bool { + return nodes[newPrimary].IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + primary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + re.Equal(primary, newPrimary) + + // Test transfer primary to a non-exist node + newPrimary = "http://" + newPrimaryData["new_primary"] = newPrimary + data, _ = json.Marshal(newPrimaryData) + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusInternalServerError, 
resp.StatusCode) + resp.Body.Close() + } +} + +func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { + re := suite.Require() + supportedServices := []string{"tso", "scheduling"} + for _, service := range supportedServices { + var nodes map[string]bs.Server + switch service { + case "tso": + nodes = suite.tsoNodes + case "scheduling": + nodes = suite.schedulingNodes + } + + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + // Test transfer primary to a specific node + var newPrimary string + for _, member := range nodes { + if member.GetAddr() != primary { + newPrimary = member.Name() + break + } + } + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = newPrimary + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() + + tests.WaitForPrimaryServing(re, nodes) + newPrimary, err = suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + re.NotEqual(primary, newPrimary) + + // Close old and new primary to mock campaign primary + nodes[primary].Close() + nodes[newPrimary].Close() + tests.WaitForPrimaryServing(re, nodes) + // Primary should be different with before + onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + re.NotEqual(primary, onlyPrimary) + re.NotEqual(newPrimary, onlyPrimary) + } +} + +func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() { + re := suite.Require() + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") + re.NoError(err) + re.NotEmpty(primary) + + supportedServices := []string{"tso", "scheduling"} + for _, service := range supportedServices { + var nodes map[string]bs.Server + switch service { + case "tso": + nodes = suite.tsoNodes + case "scheduling": + nodes = suite.schedulingNodes + } + + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + // Test transfer primary to a specific node + var newPrimary string + for _, member := range nodes { + if member.GetAddr() != primary { + newPrimary = member.Name() + break + } + } + // Mock the new primary can not grant leader which means the lease will expire + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`)) + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = newPrimary + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() + + // Wait for the old primary exit and new primary campaign + // cannot check newPrimary isServing when skipGrantLeader is enabled + testutil.Eventually(re, func() bool { + return !nodes[primary].IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + // TODO: Add campaign times check in mcs to avoid frequent campaign + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader")) + // Can still work after lease expired + tests.WaitForPrimaryServing(re, nodes) + } +} + +// TestTransferPrimaryWhileLeaseExpiredAndServerDown tests transfer primary while lease expired and server down +func (suite *memberTestSuite) 
TestTransferPrimaryWhileLeaseExpiredAndServerDown() { + re := suite.Require() + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, "tso") + re.NoError(err) + re.NotEmpty(primary) + + supportedServices := []string{"tso", "scheduling"} + for _, service := range supportedServices { + var nodes map[string]bs.Server + switch service { + case "tso": + nodes = suite.tsoNodes + case "scheduling": + nodes = suite.schedulingNodes + } + + primary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + + // Test transfer primary to a specific node + var newPrimary string + for _, member := range nodes { + if member.GetAddr() != primary { + newPrimary = member.Name() + break + } + } + // Mock the new primary can not grant leader which means the lease will expire + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`)) + newPrimaryData := make(map[string]any) + newPrimaryData["new_primary"] = "" + data, _ := json.Marshal(newPrimaryData) + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + "application/json", bytes.NewBuffer(data)) + re.NoError(err) + re.Equal(http.StatusOK, resp.StatusCode) + resp.Body.Close() + + // Wait for the old primary exit and new primary campaign + // cannot check newPrimary isServing when skipGrantLeader is enabled + testutil.Eventually(re, func() bool { + return !nodes[primary].IsServing() + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + + // TODO: Add campaign times check in mcs to avoid frequent campaign + // for now, close the current primary to mock the server down + nodes[newPrimary].Close() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader")) + + tests.WaitForPrimaryServing(re, nodes) + // Primary should be different with before + onlyPrimary, err := suite.pdClient.GetMicroServicePrimary(suite.ctx, service) + re.NoError(err) + re.NotEqual(newPrimary, onlyPrimary) + } +}
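
For readers who want to exercise the new endpoint outside the test suite, here is a minimal client-side sketch (not part of the patch) that mirrors what the integration tests above do. The address, the member name, and the helper name transferPrimary are placeholders chosen for illustration, not values defined by this PR.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// transferPrimary posts to the new `{service}/primary/transfer` endpoint.
// An empty new_primary asks the service to pick a random secondary; a member
// name targets that specific node. addr, service, and the member names used
// below are placeholders.
func transferPrimary(addr, service, newPrimary string) error {
	body, err := json.Marshal(map[string]string{"new_primary": newPrimary})
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/%s/api/v1/primary/transfer", addr, service)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// The handler returns 200 on success and 500 with an error message when
	// no valid secondary matches the request (see the integration tests).
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("transfer primary failed: %s", resp.Status)
	}
	return nil
}

func main() {
	// Move the scheduling primary to a random secondary, then the TSO
	// primary to a specific member (placeholder address and name).
	_ = transferPrimary("http://127.0.0.1:3379", "scheduling", "")
	_ = transferPrimary("http://127.0.0.1:3379", "tso", "tso-server-2")
}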
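
The election-loop change in both services boils down to one condition. Below is a condensed, illustrative sketch (simplified names and values, not the real member payloads) of when a node is allowed to campaign while a transfer is pending, matching the check added to primaryElectionLoop in the scheduling server and the TSO global allocator.

package main

import (
	"fmt"
	"strings"
)

// shouldCampaign condenses the skip-campaign check: while an expected_primary
// flag exists, only the member it names campaigns; the others sleep briefly
// and re-check. The flag is only written by the `{service}/primary/transfer`
// API, and re-written (lease-bound) by the primary once it is elected.
func shouldCampaign(expectedPrimary, memberValue string) bool {
	if len(expectedPrimary) == 0 {
		// No pending transfer: normal election.
		return true
	}
	// The real code compares the flag against the participant's member value
	// with strings.Contains, as below.
	return strings.Contains(memberValue, expectedPrimary)
}

func main() {
	fmt.Println(shouldCampaign("", "scheduling-1"))             // true: no flag set
	fmt.Println(shouldCampaign("scheduling-2", "scheduling-1")) // false: wait and re-check
	fmt.Println(shouldCampaign("scheduling-2", "scheduling-2")) // true: this node campaigns
}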