From 8fef7afa4b14a24e0cff0d54329abf51475cb528 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Wed, 16 Oct 2024 16:41:39 +0800
Subject: [PATCH] fix

Signed-off-by: Ryan Leung
---
 server/cluster/cluster.go                   | 45 ++++++++++++----------
 tests/integrations/tso/client_test.go       |  4 +-
 tests/integrations/tso/consistency_test.go  |  1 +
 tests/server/tso/allocator_test.go          |  3 ++
 tests/server/tso/global_tso_test.go         |  3 ++
 tests/server/tso/tso_test.go                |  2 +
 6 files changed, 35 insertions(+), 23 deletions(-)

diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index b1e41b637a1..e60f8385521 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -402,29 +402,32 @@ func (c *RaftCluster) checkSchedulingService() {
 
 // checkTSOService checks the TSO service.
 func (c *RaftCluster) checkTSOService() {
-	if !c.isAPIServiceMode {
-		if c.member.IsLeader() {
-			if err := c.startTSOJobs(); err != nil {
-				// If there is an error, need to wait for the next check.
-				return
+	if c.isAPIServiceMode {
+		return
+	}
+	if c.member.IsLeader() {
+		if err := c.startTSOJobs(); err != nil {
+			// If there is an error, need to wait for the next check.
+			log.Error("failed to start TSO jobs", errs.ZapError(err))
+			return
+		}
+	} else {
+		// leader exits, reset the allocator group
+		if err := c.stopTSOJobs(); err != nil {
+			// If there is an error, need to wait for the next check.
+			log.Error("failed to stop TSO jobs", errs.ZapError(err))
+			return
+		}
+
+		failpoint.Inject("updateAfterResetTSO", func() {
+			allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
+			if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
+				log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
 			}
-		} else {
-			// leader exits, reset the allocator group
-			if err := c.stopTSOJobs(); err != nil {
-				// If there is an error, need to wait for the next check.
-				return
+			if allocator.IsInitialize() {
+				log.Panic("the allocator should be uninitialized after reset")
 			}
-
-			failpoint.Inject("updateAfterResetTSO", func() {
-				allocator, _ := c.tsoAllocator.GetAllocator(tso.GlobalDCLocation)
-				if err := allocator.UpdateTSO(); !errorspkg.Is(err, errs.ErrUpdateTimestamp) {
-					log.Panic("the tso update after reset should return ErrUpdateTimestamp as expected", zap.Error(err))
-				}
-				if allocator.IsInitialize() {
-					log.Panic("the allocator should be uninitialized after reset")
-				}
-			})
-		}
+		})
 	}
 }
 
diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go
index a669e093200..d1a649cbfa6 100644
--- a/tests/integrations/tso/client_test.go
+++ b/tests/integrations/tso/client_test.go
@@ -339,12 +339,12 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
 		return err == nil
 	})
 	// Resign leader to trigger the TSO resetting.
-	re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/updateAfterResetTSO", "return(true)"))
 	oldLeaderName := suite.cluster.WaitLeader()
 	re.NotEmpty(oldLeaderName)
 	err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
 	re.NoError(err)
-	re.NoError(failpoint.Disable("github.com/tikv/pd/server/updateAfterResetTSO"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/updateAfterResetTSO"))
 	newLeaderName := suite.cluster.WaitLeader()
 	re.NotEmpty(newLeaderName)
 	re.NotEqual(oldLeaderName, newLeaderName)
diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go
index 9ebe6dec8af..f82f58ee6c8 100644
--- a/tests/integrations/tso/consistency_test.go
+++ b/tests/integrations/tso/consistency_test.go
@@ -81,6 +81,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
 	leaderName := suite.cluster.WaitLeader()
 	re.NotEmpty(leaderName)
 	suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
+	suite.pdLeaderServer.BootstrapCluster()
 	backendEndpoints := suite.pdLeaderServer.GetAddr()
 	if suite.legacy {
 		suite.pdClient = tu.MustNewGrpcClient(re, backendEndpoints)
diff --git a/tests/server/tso/allocator_test.go b/tests/server/tso/allocator_test.go
index 692aec490eb..257cd3b6a34 100644
--- a/tests/server/tso/allocator_test.go
+++ b/tests/server/tso/allocator_test.go
@@ -127,6 +127,9 @@ func TestPriorityAndDifferentLocalTSO(t *testing.T) {
 	re.NoError(cluster.RunInitialServers())
 	cluster.WaitAllLeaders(re, dcLocationConfig)
 
+	leaderServer := cluster.GetLeaderServer()
+	re.NotNil(leaderServer)
+	leaderServer.BootstrapCluster()
 	// Wait for all nodes becoming healthy.
 	time.Sleep(time.Second * 5)
 
diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go
index d8f64afe871..c340c44d3d2 100644
--- a/tests/server/tso/global_tso_test.go
+++ b/tests/server/tso/global_tso_test.go
@@ -99,6 +99,7 @@ func TestDelaySyncTimestamp(t *testing.T) {
 	var leaderServer, nextLeaderServer *tests.TestServer
 	leaderServer = cluster.GetLeaderServer()
 	re.NotNil(leaderServer)
+	leaderServer.BootstrapCluster()
 	for _, s := range cluster.GetServers() {
 		if s.GetConfig().Name != cluster.GetLeader() {
 			nextLeaderServer = s
@@ -146,6 +147,8 @@ func TestLogicalOverflow(t *testing.T) {
 
 	re.NotEmpty(cluster.WaitLeader())
 	leaderServer := cluster.GetLeaderServer()
+	re.NotNil(leaderServer)
+	leaderServer.BootstrapCluster()
 	grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
 	clusterID := leaderServer.GetClusterID()
 
diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go
index 5be37e293cf..26f3878694f 100644
--- a/tests/server/tso/tso_test.go
+++ b/tests/server/tso/tso_test.go
@@ -114,6 +114,8 @@ func TestDisableLocalTSOAfterEnabling(t *testing.T) {
 	re.NoError(cluster.RunInitialServers())
 	cluster.WaitAllLeaders(re, dcLocationConfig)
 
+	leaderServer := cluster.GetLeaderServer()
+	leaderServer.BootstrapCluster()
 	requestLocalTSOs(re, cluster, dcLocationConfig)
 
 	// Reboot the cluster.