From c00c42e77b3143f7293f9513a780b4e73597d603 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 20 Mar 2024 16:17:13 +0800 Subject: [PATCH 01/15] client/tso: fix the bug that collected TSO requests could never be finished (#7951) close tikv/pd#7849 If the `dispatcherCtx` is canceled, the collected `tsoRequest`s could be left unfinished forever, which could cause the upper caller never to get the results. This PR fixed this bug by finishing it on all the possible paths. Signed-off-by: JmPotato --- client/client.go | 43 +++++++++++++++------ client/tso_batch_controller.go | 16 +++++++- client/tso_client.go | 2 +- client/tso_dispatcher.go | 30 ++++++++------- client/tso_stream.go | 8 ++-- tests/integrations/tso/client_test.go | 55 +++++++++++++++++++++++++-- 6 files changed, 119 insertions(+), 35 deletions(-) diff --git a/client/client.go b/client/client.go index 31481b918e6..8838c184d92 100644 --- a/client/client.go +++ b/client/client.go @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { log.Info("[pd] changing service mode", zap.String("old-mode", c.serviceMode.String()), zap.String("new-mode", newMode.String())) + c.resetTSOClientLocked(newMode) + oldMode := c.serviceMode + c.serviceMode = newMode + log.Info("[pd] service mode changed", + zap.String("old-mode", oldMode.String()), + zap.String("new-mode", newMode.String())) +} + +// Reset a new TSO client. +func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { // Re-create a new TSO client. var ( newTSOCli *tsoClient newTSOSvcDiscovery ServiceDiscovery ) - switch newMode { + switch mode { case pdpb.ServiceMode_PD_SVC_MODE: newTSOCli = newTSOClient(c.ctx, c.option, c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. oldTSOSvcDiscovery.Close() } - oldMode := c.serviceMode - c.serviceMode = newMode - log.Info("[pd] service mode changed", - zap.String("old-mode", oldMode.String()), - zap.String("new-mode", newMode.String())) } func (c *client) getTSOClient() *tsoClient { @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient { return c.tsoClient } +// ResetTSOClient resets the TSO client, only for test. +func (c *client) ResetTSOClient() { + c.Lock() + defer c.Unlock() + c.resetTSOClientLocked(c.serviceMode) +} + func (c *client) getServiceMode() pdpb.ServiceMode { c.RLock() defer c.RUnlock() @@ -779,18 +791,25 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur defer span.Finish() } - req := tsoReqPool.Get().(*tsoRequest) - req.requestCtx = ctx - req.clientCtx = c.ctx - req.start = time.Now() - req.dcLocation = dcLocation - + req := c.getTSORequest(ctx, dcLocation) if err := c.dispatchTSORequestWithRetry(req); err != nil { req.done <- err } return req } +func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { + req := tsoReqPool.Get().(*tsoRequest) + // Set needed fields in the request before using it. 
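A minimal, standalone sketch of the get-and-reset pattern the new getTSORequest helper relies on (the names below are illustrative, not the PD client API): objects taken from a sync.Pool keep whatever field values they held when they were last put back, so every reused field has to be reset before the request is handed out again.

package main

import (
	"fmt"
	"sync"
	"time"
)

type tsoRequest struct {
	start             time.Time
	physical, logical int64
	dcLocation        string
}

var reqPool = sync.Pool{New: func() any { return &tsoRequest{} }}

// getRequest mirrors the shape of getTSORequest: take a pooled object and
// reset every reused field before returning it.
func getRequest(dc string) *tsoRequest {
	req := reqPool.Get().(*tsoRequest)
	req.start = time.Now()
	// Clear results left over from a previous round so a stale
	// physical/logical pair cannot leak into the new request.
	req.physical, req.logical = 0, 0
	req.dcLocation = dc
	return req
}

func main() {
	req := getRequest("global")
	req.physical, req.logical = 42, 7 // pretend the request was served
	reqPool.Put(req)
	fresh := getRequest("global")
	fmt.Println(fresh.physical, fresh.logical) // always 0 0, never 42 7
}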
+ req.start = time.Now() + req.clientCtx = c.ctx + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 + req.dcLocation = dcLocation + return req +} + const ( dispatchRetryDelay = 50 * time.Millisecond dispatchRetryCount = 2 diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index 842c772abd9..bd7a440fb08 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -16,7 +16,10 @@ package pd import ( "context" + "runtime/trace" "time" + + "github.com/tikv/pd/client/tsoutil" ) type tsoBatchController struct { @@ -130,7 +133,18 @@ func (tbc *tsoBatchController) adjustBestBatchSize() { } } -func (tbc *tsoBatchController) revokePendingRequest(err error) { +func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) { + for i := 0; i < tbc.collectedRequestCount; i++ { + tsoReq := tbc.collectedRequests[i] + tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) + defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() + tsoReq.done <- err + } + // Prevent the finished requests from being processed again. + tbc.collectedRequestCount = 0 +} + +func (tbc *tsoBatchController) revokePendingRequests(err error) { for i := 0; i < len(tbc.tsoRequestCh); i++ { req := <-tbc.tsoRequestCh req.done <- err diff --git a/client/tso_client.go b/client/tso_client.go index eeeaf202ce6..c563df0efdb 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -141,7 +141,7 @@ func (c *tsoClient) Close() { if dispatcherInterface != nil { dispatcher := dispatcherInterface.(*tsoDispatcher) tsoErr := errors.WithStack(errClosing) - dispatcher.tsoBatchController.revokePendingRequest(tsoErr) + dispatcher.tsoBatchController.revokePendingRequests(tsoErr) dispatcher.dispatcherCancel() } return true diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 3a6f109bfd4..a625f8dbbe1 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -350,7 +350,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { func (c *tsoClient) handleDispatcher( dispatcherCtx context.Context, dc string, - tbc *tsoBatchController) { + tbc *tsoBatchController, +) { var ( err error streamURL string @@ -428,7 +429,11 @@ tsoBatchLoop: } // Start to collect the TSO requests. maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() + // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, + // otherwise the upper caller may get blocked on waiting for the results. if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + // Finish the collected requests if the fetch failed. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { log.Info("[tso] stop fetching the pending tso requests due to context canceled", zap.String("dc-location", dc)) @@ -468,13 +473,16 @@ tsoBatchLoop: timer := time.NewTimer(retryInterval) select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. 
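For reference, a self-contained sketch of how finishCollectedRequests spreads one (physical, firstLogical) answer from the stream over the collected batch. It assumes, as tsoutil.AddLogical does in PD, that the per-request offset is shifted past the reserved suffix bits before being added; the names below are illustrative only.

package main

import "fmt"

type tsoRequest struct {
	physical, logical int64
	done              chan error
}

// addLogical assumes the same semantics as tsoutil.AddLogical: the count is
// shifted left by the suffix bits and then added to the logical part.
func addLogical(logical, count int64, suffixBits uint32) int64 {
	return logical + count<<suffixBits
}

// finishCollected gives every request in the batch its own timestamp derived
// from a single (physical, firstLogical) result, then unblocks its waiter.
func finishCollected(reqs []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) {
	for i, req := range reqs {
		req.physical, req.logical = physical, addLogical(firstLogical, int64(i), suffixBits)
		req.done <- err // the done channel is buffered, so this never blocks
	}
}

func main() {
	batch := make([]*tsoRequest, 3)
	for i := range batch {
		batch[i] = &tsoRequest{done: make(chan error, 1)}
	}
	finishCollected(batch, 100, 1, 2, nil)
	for _, req := range batch {
		<-req.done
		fmt.Println(req.physical, req.logical) // 100 1, 100 5, 100 9
	}
}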
+ tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) timer.Stop() return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() - c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + // Finish the collected requests if the stream is failed to be created. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) timer.Stop() continue tsoBatchLoop case <-timer.C: @@ -504,9 +512,12 @@ tsoBatchLoop: } select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } + // processRequests guarantees that the collected requests could be finished properly. err = c.processRequests(stream, dc, tbc) close(done) // If error happens during tso stream handling, reset stream and run the next trial. @@ -776,13 +787,14 @@ func (c *tsoClient) processRequests( defer span.Finish() } } + count := int64(len(requests)) reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, - dcLocation, requests, tbc.batchStartTime) + dcLocation, count, tbc.batchStartTime) if err != nil { - c.finishRequest(requests, 0, 0, 0, err) + tbc.finishCollectedRequests(0, 0, 0, err) return err } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. @@ -796,7 +808,7 @@ func (c *tsoClient) processRequests( logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), } c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) - c.finishRequest(requests, physical, firstLogical, suffixBits, nil) + tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) return nil } @@ -843,11 +855,3 @@ func (c *tsoClient) compareAndSwapTS( lastTSOInfo.physical = curTSOInfo.physical lastTSOInfo.logical = curTSOInfo.logical } - -func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { - for i := 0; i < len(requests); i++ { - requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) - defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End() - requests[i].done <- err - } -} diff --git a/client/tso_stream.go b/client/tso_stream.go index acefa19d21c..83c0f08d4e0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -106,7 +106,7 @@ type tsoStream interface { // processRequests processes TSO requests in streaming mode to get timestamps processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) } @@ -120,10 +120,9 @@ func (s *pdTSOStream) getServerURL() string { } func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, + clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err 
error) { start := time.Now() - count := int64(len(requests)) req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -175,10 +174,9 @@ func (s *tsoTSOStream) getServerURL() string { func (s *tsoTSOStream) processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() - count := int64(len(requests)) req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 3d7b099f342..b0bd6f1d4e5 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -21,6 +21,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "testing" "time" @@ -66,6 +67,10 @@ type tsoClientTestSuite struct { clients []pd.Client } +func (suite *tsoClientTestSuite) getBackendEndpoints() []string { + return strings.Split(suite.backendEndpoints, ",") +} + func TestLegacyTSOClient(t *testing.T) { suite.Run(t, &tsoClientTestSuite{ legacy: true, @@ -98,7 +103,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = make([]uint32, 0) if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}, pd.WithForwardingOption(true)) + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -173,7 +178,7 @@ func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.A // Create clients and make sure they all have discovered the tso service. suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } @@ -254,7 +259,7 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { ctx, cancel := context.WithCancel(suite.ctx) defer cancel() client := mcs.SetupClientWithKeyspaceID( - ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ",")) + ctx, re, keyspaceID, suite.getBackendEndpoints()) defer client.Close() var lastTS uint64 for j := 0; j < tsoRequestRound; j++ { @@ -420,6 +425,50 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) } +func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() { + re := suite.Require() + var ( + clients []pd.Client + stopSignal atomic.Bool + wg sync.WaitGroup + ) + // Create independent clients to prevent interfering with other tests. 
+ if suite.legacy { + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) + re.NoError(err) + clients = []pd.Client{client} + } else { + clients = mcs.WaitForMultiKeyspacesTSOAvailable(suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) + } + wg.Add(tsoRequestConcurrencyNumber * len(clients)) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + go func(client pd.Client) { + defer wg.Done() + var lastTS uint64 + for !stopSignal.Load() { + physical, logical, err := client.GetTS(suite.ctx) + if err != nil { + re.ErrorContains(err, context.Canceled.Error()) + } else { + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + } + }(client) + } + } + // Reset the TSO clients while requesting TSO concurrently. + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + client.(interface{ ResetTSOClient() }).ResetTSOClient() + } + } + stopSignal.Store(true) + wg.Wait() +} + // When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. func TestMixedTSODeployment(t *testing.T) { re := require.New(t) From bb8ba31c0c0716012ce903893505ed6cead06f33 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 21 Mar 2024 12:57:44 +0800 Subject: [PATCH 02/15] tests/api: reduce TestRegionsWithKillRequest test time (#7953) ref tikv/pd#7930 Signed-off-by: lhy1024 Co-authored-by: disksing --- server/api/region_test.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/server/api/region_test.go b/server/api/region_test.go index 8c0d78abd4a..e10bfbd1af0 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -23,6 +23,7 @@ import ( "net/http" "net/url" "sort" + "sync" "testing" "time" @@ -333,13 +334,28 @@ func TestRegionsWithKillRequest(t *testing.T) { url := fmt.Sprintf("%s%s/api/v1/regions", addr, apiPrefix) mustBootstrapCluster(re, svr) regionCount := 100000 + + // create data + var wg sync.WaitGroup + tasks := make(chan int, regionCount) + for w := 0; w < 16; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range tasks { + r := core.NewTestRegionInfo(uint64(i+2), 1, + []byte(fmt.Sprintf("%09d", i)), + []byte(fmt.Sprintf("%09d", i+1)), + core.SetApproximateKeys(10), core.SetApproximateSize(10)) + mustRegionHeartbeat(re, svr, r) + } + }() + } for i := 0; i < regionCount; i++ { - r := core.NewTestRegionInfo(uint64(i+2), 1, - []byte(fmt.Sprintf("%09d", i)), - []byte(fmt.Sprintf("%09d", i+1)), - core.SetApproximateKeys(10), core.SetApproximateSize(10)) - mustRegionHeartbeat(re, svr, r) + tasks <- i } + close(tasks) + wg.Wait() ctx, cancel := context.WithCancel(context.Background()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) From 89a80875bd6933d52176a7b0f264ab5324ac12bf Mon Sep 17 00:00:00 2001 From: Hu# Date: Thu, 21 Mar 2024 14:15:43 +0800 Subject: [PATCH 03/15] tests/server: move `TestCheckClusterID` out of `leaderServerTestSuite` (#7950) ref tikv/pd#7930 which can use parallel test Signed-off-by: husharp Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- server/server_test.go | 94 +++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/server/server_test.go b/server/server_test.go index 32f5d0646bc..b2b15962fdc 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -88,7 
+88,7 @@ func (suite *leaderServerTestSuite) TearDownSuite() { } } -func (suite *leaderServerTestSuite) newTestServersWithCfgs( +func newTestServersWithCfgs( ctx context.Context, cfgs []*config.Config, re *require.Assertions, @@ -135,52 +135,6 @@ func (suite *leaderServerTestSuite) newTestServersWithCfgs( return svrs, cleanup } -func (suite *leaderServerTestSuite) TestCheckClusterID() { - re := suite.Require() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - cfgs := NewTestMultiConfig(assertutil.CheckerWithNilAssert(re), 2) - for i, cfg := range cfgs { - cfg.DataDir = fmt.Sprintf("/tmp/test_pd_check_clusterID_%d", i) - // Clean up before testing. - testutil.CleanServer(cfg.DataDir) - } - originInitial := cfgs[0].InitialCluster - for _, cfg := range cfgs { - cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, cfg.PeerUrls) - } - - cfgA, cfgB := cfgs[0], cfgs[1] - // Start a standalone cluster. - svrsA, cleanA := suite.newTestServersWithCfgs(ctx, []*config.Config{cfgA}, re) - defer cleanA() - // Close it. - for _, svr := range svrsA { - svr.Close() - } - - // Start another cluster. - _, cleanB := suite.newTestServersWithCfgs(ctx, []*config.Config{cfgB}, re) - defer cleanB() - - // Start previous cluster, expect an error. - cfgA.InitialCluster = originInitial - mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfgA, nil, mockHandler) - re.NoError(err) - - etcd, err := embed.StartEtcd(svr.etcdCfg) - re.NoError(err) - urlsMap, err := types.NewURLsMap(svr.cfg.InitialCluster) - re.NoError(err) - tlsConfig, err := svr.cfg.Security.ToTLSConfig() - re.NoError(err) - err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlsMap, tlsConfig) - re.Error(err) - etcd.Close() - testutil.CleanServer(cfgA.DataDir) -} - func (suite *leaderServerTestSuite) TestRegisterServerHandler() { re := suite.Require() cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) @@ -330,3 +284,49 @@ func TestIsPathInDirectory(t *testing.T) { path = filepath.Join(directory, fileName) re.False(isPathInDirectory(path, directory)) } + +func TestCheckClusterID(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cfgs := NewTestMultiConfig(assertutil.CheckerWithNilAssert(re), 2) + for i, cfg := range cfgs { + cfg.DataDir = fmt.Sprintf("/tmp/test_pd_check_clusterID_%d", i) + // Clean up before testing. + testutil.CleanServer(cfg.DataDir) + } + originInitial := cfgs[0].InitialCluster + for _, cfg := range cfgs { + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, cfg.PeerUrls) + } + + cfgA, cfgB := cfgs[0], cfgs[1] + // Start a standalone cluster. + svrsA, cleanA := newTestServersWithCfgs(ctx, []*config.Config{cfgA}, re) + defer cleanA() + // Close it. + for _, svr := range svrsA { + svr.Close() + } + + // Start another cluster. + _, cleanB := newTestServersWithCfgs(ctx, []*config.Config{cfgB}, re) + defer cleanB() + + // Start previous cluster, expect an error. 
+ cfgA.InitialCluster = originInitial + mockHandler := CreateMockHandler(re, "127.0.0.1") + svr, err := CreateServer(ctx, cfgA, nil, mockHandler) + re.NoError(err) + + etcd, err := embed.StartEtcd(svr.etcdCfg) + re.NoError(err) + urlsMap, err := types.NewURLsMap(svr.cfg.InitialCluster) + re.NoError(err) + tlsConfig, err := svr.cfg.Security.ToTLSConfig() + re.NoError(err) + err = etcdutil.CheckClusterID(etcd.Server.Cluster().ID(), urlsMap, tlsConfig) + re.Error(err) + etcd.Close() + testutil.CleanServer(cfgA.DataDir) +} From 7ec0058913c2c8d4176c7cd92cb34a85f700b422 Mon Sep 17 00:00:00 2001 From: Hu# Date: Thu, 21 Mar 2024 15:02:14 +0800 Subject: [PATCH 04/15] mcs: fix micro-service topo's display (#7956) close tikv/pd#7957 Signed-off-by: husharp --- pkg/mcs/scheduling/server/server.go | 4 +++- pkg/mcs/tso/server/server.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 738140612b8..47a7cf9962b 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "runtime" "strconv" "sync" @@ -413,7 +414,8 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) - deployPath, err := os.Executable() + execPath, err := os.Executable() + deployPath := filepath.Dir(execPath) if err != nil { deployPath = "" } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index bac93ddfb6b..f5f46a29504 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "runtime" "strconv" "sync" @@ -368,7 +369,8 @@ func (s *Server) startServer() (err error) { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) - deployPath, err := os.Executable() + execPath, err := os.Executable() + deployPath := filepath.Dir(execPath) if err != nil { deployPath = "" } From 955d30ab97c71b669badc69e09d21dcec2cdbb0e Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 21 Mar 2024 17:20:13 +0800 Subject: [PATCH 05/15] metrics: fix alloc id, current tso and patrol panel (#7961) close tikv/pd#7959 Signed-off-by: lhy1024 --- metrics/grafana/pd.json | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 15a4b0bfc43..30014a959dc 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1738,7 +1738,7 @@ "tableColumn": "idalloc", "targets": [ { - "expr": "max(pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"})", + "expr": "pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"}!=0", "format": "time_series", "hide": false, "instant": true, @@ -2284,7 +2284,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})", + "expr": "pd_cluster_tso{type=\"tso\", dc=\"global\"}!=0", "format": "time_series", "instant": true, "interval": "", @@ -2588,7 +2588,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})", + "expr": 
"pd_cluster_tso{type=\"tso\", dc=\"global\"}!=0", "format": "time_series", "instant": true, "interval": "", @@ -7895,6 +7895,7 @@ "targets": [ { "expr": "pd_checker_patrol_regions_time{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"} != 0", + "legendFormat": "{{instance}}", "format": "time_series", "intervalFactor": 1, "refId": "A" @@ -8474,14 +8475,14 @@ "refId": "A" }, { - "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"fail\"}[1m]*60)", + "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"fail\"}[1m])*60", "format": "time_series", "intervalFactor": 2, "legendFormat": "fail", "refId": "B" }, { - "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"success\"}[1m]*60)", + "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"success\"}[1m])*60", "format": "time_series", "intervalFactor": 2, "legendFormat": "success", @@ -9296,7 +9297,7 @@ "steppedLine": false, "targets": [ { - "expr": "etcd_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}", + "expr": "etcd_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\".*pd.*\"}", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -9304,7 +9305,7 @@ "refId": "A" }, { - "expr": "etcd_mvcc_db_total_size_in_use_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}", + "expr": "etcd_mvcc_db_total_size_in_use_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\".*pd.*\"}", "format": "time_series", "hide": false, "intervalFactor": 1, From 52e8763377be59a571389018e2186ddbdf64f072 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Fri, 22 Mar 2024 00:23:15 +0800 Subject: [PATCH 06/15] resource_manager: record the max RU per second (#7936) close tikv/pd#7908 resource_manager: record the max RU per second Signed-off-by: nolouch Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/resourcemanager/server/manager.go | 97 ++++++++++++++++++- pkg/mcs/resourcemanager/server/metrics.go | 18 ++++ .../resourcemanager/server/metrics_test.go | 51 ++++++++++ 3 files changed, 164 insertions(+), 2 deletions(-) create mode 100644 pkg/mcs/resourcemanager/server/metrics_test.go diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index ea67bb847ef..ef402b8cbf9 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" @@ -41,7 +42,9 @@ const ( defaultConsumptionChanSize = 1024 metricsCleanupInterval = time.Minute metricsCleanupTimeout = 20 * time.Minute - metricsAvailableRUInterval = 30 * time.Second + metricsAvailableRUInterval = 1 * time.Second + defaultCollectIntervalSec = 20 + tickPerSecond = time.Second reservedDefaultGroupName = "default" middlePriority = 8 @@ -357,6 +360,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { defer cleanUpTicker.Stop() availableRUTicker := 
time.NewTicker(metricsAvailableRUInterval) defer availableRUTicker.Stop() + recordMaxTicker := time.NewTicker(tickPerSecond) + defer recordMaxTicker.Stop() + maxPerSecTrackers := make(map[string]*maxPerSecCostTracker) for { select { case <-ctx.Done(): @@ -386,6 +392,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { readRequestCountMetrics = requestCount.WithLabelValues(name, name, readTypeLabel) writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel) ) + t, ok := maxPerSecTrackers[name] + if !ok { + t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec) + maxPerSecTrackers[name] = t + } + t.CollectConsumption(consumption) + // RU info. if consumption.RRU > 0 { rruMetrics.Add(consumption.RRU) @@ -437,21 +450,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel) availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType) delete(m.consumptionRecord, r) + delete(maxPerSecTrackers, r.name) + readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) + writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) } } case <-availableRUTicker.C: m.RLock() + groups := make([]*ResourceGroup, 0, len(m.groups)) for name, group := range m.groups { if name == reservedDefaultGroupName { continue } + groups = append(groups, group) + } + m.RUnlock() + // prevent many groups and hold the lock long time. + for _, group := range groups { ru := group.getRUToken() if ru < 0 { ru = 0 } - availableRUCounter.WithLabelValues(name, name).Set(ru) + availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru) + } + + case <-recordMaxTicker.C: + // Record the sum of RRU and WRU every second. + m.RLock() + names := make([]string, 0, len(m.groups)) + for name := range m.groups { + names = append(names, name) } m.RUnlock() + for _, name := range names { + if t, ok := maxPerSecTrackers[name]; !ok { + maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec) + } else { + t.FlushMetrics() + } + } } } } + +type maxPerSecCostTracker struct { + name string + maxPerSecRRU float64 + maxPerSecWRU float64 + rruSum float64 + wruSum float64 + lastRRUSum float64 + lastWRUSum float64 + flushPeriod int + cnt int + rruMaxMetrics prometheus.Gauge + wruMaxMetrics prometheus.Gauge +} + +func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker { + return &maxPerSecCostTracker{ + name: name, + flushPeriod: flushPeriod, + rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name), + wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name), + } +} + +// CollectConsumption collects the consumption info. +func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) { + t.rruSum += consume.RRU + t.wruSum += consume.WRU +} + +// FlushMetrics and set the maxPerSecRRU and maxPerSecWRU to the metrics. +func (t *maxPerSecCostTracker) FlushMetrics() { + if t.lastRRUSum == 0 && t.lastWRUSum == 0 { + t.lastRRUSum = t.rruSum + t.lastWRUSum = t.wruSum + return + } + deltaRRU := t.rruSum - t.lastRRUSum + deltaWRU := t.wruSum - t.lastWRUSum + t.lastRRUSum = t.rruSum + t.lastWRUSum = t.wruSum + if deltaRRU > t.maxPerSecRRU { + t.maxPerSecRRU = deltaRRU + } + if deltaWRU > t.maxPerSecWRU { + t.maxPerSecWRU = deltaWRU + } + t.cnt++ + // flush to metrics in every flushPeriod. 
+ if t.cnt%t.flushPeriod == 0 { + t.rruMaxMetrics.Set(t.maxPerSecRRU) + t.wruMaxMetrics.Set(t.maxPerSecWRU) + t.maxPerSecRRU = 0 + t.maxPerSecWRU = 0 + } +} diff --git a/pkg/mcs/resourcemanager/server/metrics.go b/pkg/mcs/resourcemanager/server/metrics.go index 6bb90c45d12..45c94e5c735 100644 --- a/pkg/mcs/resourcemanager/server/metrics.go +++ b/pkg/mcs/resourcemanager/server/metrics.go @@ -48,6 +48,22 @@ var ( Name: "write_request_unit_sum", Help: "Counter of the write request unit cost for all resource groups.", }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel}) + + readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "read_request_unit_max_per_sec", + Help: "Gauge of the max read request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "write_request_unit_max_per_sec", + Help: "Gauge of the max write request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + sqlLayerRequestUnitCost = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -112,4 +128,6 @@ func init() { prometheus.MustRegister(sqlCPUCost) prometheus.MustRegister(requestCount) prometheus.MustRegister(availableRUCounter) + prometheus.MustRegister(readRequestUnitMaxPerSecCost) + prometheus.MustRegister(writeRequestUnitMaxPerSecCost) } diff --git a/pkg/mcs/resourcemanager/server/metrics_test.go b/pkg/mcs/resourcemanager/server/metrics_test.go new file mode 100644 index 00000000000..62d07286eaf --- /dev/null +++ b/pkg/mcs/resourcemanager/server/metrics_test.go @@ -0,0 +1,51 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package server + +import ( + "fmt" + "testing" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/require" +) + +func TestMaxPerSecCostTracker(t *testing.T) { + tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec) + re := require.New(t) + + // Define the expected max values for each flushPeriod + expectedMaxRU := []float64{19, 39, 59} + expectedSum := []float64{190, 780, 1770} + + for i := 0; i < 60; i++ { + // Record data + consumption := &rmpb.Consumption{ + RRU: float64(i), + WRU: float64(i), + } + tracker.CollectConsumption(consumption) + tracker.FlushMetrics() + + // Check the max values at the end of each flushPeriod + if (i+1)%20 == 0 { + period := i / 20 + re.Equal(tracker.maxPerSecRRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1)) + re.Equal(tracker.maxPerSecWRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1)) + re.Equal(tracker.rruSum, expectedSum[period]) + re.Equal(tracker.rruSum, expectedSum[period]) + } + } +} From c2eac4b8cc341d36ee6e673506a6c1cf080f4d89 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 22 Mar 2024 11:44:13 +0800 Subject: [PATCH 07/15] client/http: fix the panic when merging empty RegionsInfo (#7970) ref tikv/pd#7300 Check if the source and target `*RegionsInfo` are nil before merging. Signed-off-by: JmPotato --- client/http/types.go | 15 +++- client/http/types_test.go | 148 +++++++++++++++++++++++++++++++++----- 2 files changed, 142 insertions(+), 21 deletions(-) diff --git a/client/http/types.go b/client/http/types.go index 0cd9abb3843..91a2463ff64 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -133,11 +133,22 @@ type RegionsInfo struct { Regions []RegionInfo `json:"regions"` } +func newRegionsInfo(count int64) *RegionsInfo { + return &RegionsInfo{ + Count: count, + Regions: make([]RegionInfo, 0, count), + } +} + // Merge merges two RegionsInfo together and returns a new one. func (ri *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo { - newRegionsInfo := &RegionsInfo{ - Regions: make([]RegionInfo, 0, ri.Count+other.Count), + if ri == nil { + ri = newRegionsInfo(0) + } + if other == nil { + other = newRegionsInfo(0) } + newRegionsInfo := newRegionsInfo(ri.Count + other.Count) m := make(map[int64]RegionInfo, ri.Count+other.Count) for _, region := range ri.Regions { m[region.ID] = region diff --git a/client/http/types_test.go b/client/http/types_test.go index 39c53ae525d..1b8df4f8ed6 100644 --- a/client/http/types_test.go +++ b/client/http/types_test.go @@ -23,30 +23,140 @@ import ( func TestMergeRegionsInfo(t *testing.T) { re := require.New(t) - regionsInfo1 := &RegionsInfo{ - Count: 1, - Regions: []RegionInfo{ - { - ID: 1, - StartKey: "", - EndKey: "a", + testCases := []struct { + source *RegionsInfo + target *RegionsInfo + }{ + // Different regions. + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, }, }, - } - regionsInfo2 := &RegionsInfo{ - Count: 1, - Regions: []RegionInfo{ - { - ID: 2, - StartKey: "a", - EndKey: "", + // Same region. 
+ { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + }, + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: nil, + }, + { + source: nil, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, }, }, + { + source: nil, + target: nil, + }, + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: newRegionsInfo(0), + }, + { + source: newRegionsInfo(0), + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, + }, + }, + { + source: newRegionsInfo(0), + target: newRegionsInfo(0), + }, + } + for idx, tc := range testCases { + regionsInfo := tc.source.Merge(tc.target) + if tc.source == nil { + tc.source = newRegionsInfo(0) + } + if tc.target == nil { + tc.target = newRegionsInfo(0) + } + m := make(map[int64]RegionInfo, tc.source.Count+tc.target.Count) + for _, region := range tc.source.Regions { + m[region.ID] = region + } + for _, region := range tc.target.Regions { + m[region.ID] = region + } + mergedCount := len(m) + re.Equal(int64(mergedCount), regionsInfo.Count, "case %d", idx) + re.Len(regionsInfo.Regions, mergedCount, "case %d", idx) + // All regions in source and target should be in the merged result. + for _, region := range append(tc.source.Regions, tc.target.Regions...) { + re.Contains(regionsInfo.Regions, region, "case %d", idx) + } } - regionsInfo := regionsInfo1.Merge(regionsInfo2) - re.Equal(int64(2), regionsInfo.Count) - re.Len(regionsInfo.Regions, 2) - re.Subset(regionsInfo.Regions, append(regionsInfo1.Regions, regionsInfo2.Regions...)) } func TestRuleStartEndKey(t *testing.T) { From fb9e2d561b6efa54a210f5d739952bc45beb3e81 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 22 Mar 2024 13:14:14 +0800 Subject: [PATCH 08/15] client/tso: double-check the contexts to prevent waiting for TSO requests in closed chan (#7962) close tikv/pd#7849 This PR ensures that a `tsoRequest` could be done by double-checking the contexts to prevent waiting for TSO requests in the closed channel. 
Signed-off-by: JmPotato --- client/client.go | 2 +- client/tso_batch_controller.go | 16 ++++++++++++++-- client/tso_client.go | 11 ++++++++--- client/tso_dispatcher.go | 17 +++++++++++++++++ tests/integrations/tso/client_test.go | 4 +++- 5 files changed, 43 insertions(+), 7 deletions(-) diff --git a/client/client.go b/client/client.go index 8838c184d92..b9535aa504e 100644 --- a/client/client.go +++ b/client/client.go @@ -793,7 +793,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur req := c.getTSORequest(ctx, dcLocation) if err := c.dispatchTSORequestWithRetry(req); err != nil { - req.done <- err + req.tryDone(err) } return req } diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index bd7a440fb08..5f3b08c2895 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -19,7 +19,10 @@ import ( "runtime/trace" "time" + "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/tikv/pd/client/tsoutil" + "go.uber.org/zap" ) type tsoBatchController struct { @@ -138,7 +141,7 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in tsoReq := tbc.collectedRequests[i] tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() - tsoReq.done <- err + tsoReq.tryDone(err) } // Prevent the finished requests from being processed again. tbc.collectedRequestCount = 0 @@ -147,6 +150,15 @@ func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical in func (tbc *tsoBatchController) revokePendingRequests(err error) { for i := 0; i < len(tbc.tsoRequestCh); i++ { req := <-tbc.tsoRequestCh - req.done <- err + req.tryDone(err) } } + +func (tbc *tsoBatchController) clear() { + log.Info("[pd] clear the tso batch controller", + zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize), + zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh))) + tsoErr := errors.WithStack(errClosing) + tbc.finishCollectedRequests(0, 0, 0, tsoErr) + tbc.revokePendingRequests(tsoErr) +} diff --git a/client/tso_client.go b/client/tso_client.go index c563df0efdb..5f8b12df36f 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "go.uber.org/zap" @@ -64,6 +63,13 @@ var tsoReqPool = sync.Pool{ }, } +func (req *tsoRequest) tryDone(err error) { + select { + case req.done <- err: + default: + } +} + type tsoClient struct { ctx context.Context cancel context.CancelFunc @@ -140,9 +146,8 @@ func (c *tsoClient) Close() { c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { if dispatcherInterface != nil { dispatcher := dispatcherInterface.(*tsoDispatcher) - tsoErr := errors.WithStack(errClosing) - dispatcher.tsoBatchController.revokePendingRequests(tsoErr) dispatcher.dispatcherCancel() + dispatcher.tsoBatchController.clear() } return true }) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index a625f8dbbe1..88f8ffd61b5 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -95,8 +95,23 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { // tsoClient is closed due to the PD service mode switch, which is retryable. 
return true, c.ctx.Err() default: + // This failpoint will increase the possibility that the request is sent to a closed dispatcher. + failpoint.Inject("delayDispatchTSORequest", func() { + time.Sleep(time.Second) + }) dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request } + // Check the contexts again to make sure the request is not been sent to a closed dispatcher. + // Never retry on these conditions to prevent unexpected data race. + select { + case <-request.requestCtx.Done(): + return false, request.requestCtx.Err() + case <-request.clientCtx.Done(): + return false, request.clientCtx.Err() + case <-c.ctx.Done(): + return false, c.ctx.Err() + default: + } return false, nil } @@ -368,6 +383,8 @@ func (c *tsoClient) handleDispatcher( cc.(*tsoConnectionContext).cancel() return true }) + // Clear the tso batch controller. + tbc.clear() c.wg.Done() }() // Call updateTSOConnectionCtxs once to init the connectionCtxs first. diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index b0bd6f1d4e5..d4f484087cf 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -425,8 +425,9 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) } -func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() { +func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)")) var ( clients []pd.Client stopSignal atomic.Bool @@ -467,6 +468,7 @@ func (suite *tsoClientTestSuite) TestGetTSWhileRestingTSOClient() { } stopSignal.Store(true) wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest")) } // When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
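The change above (patch 08) combines two small safeguards: a completion channel that is written with a non-blocking send (`tryDone`), and a re-check of the contexts after a request has been handed to the dispatcher, so a request accepted by a dispatcher that is shutting down still gets an answer. A self-contained sketch of that shape, using illustrative names rather than the PD client API:

package main

import (
	"context"
	"errors"
	"fmt"
)

type request struct {
	done chan error // always created with capacity 1
}

// tryDone never blocks: if a result has already been delivered, the extra
// completion is dropped instead of wedging the sender.
func (r *request) tryDone(err error) {
	select {
	case r.done <- err:
	default:
	}
}

// dispatch enqueues the request, then re-checks the context so a request
// accepted by a dispatcher that is being torn down is reported as failed
// instead of waiting forever.
func dispatch(ctx context.Context, queue chan *request, req *request) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case queue <- req:
	}
	// Double-check after the send; never retry here, just report the error.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	queue := make(chan *request, 1)
	req := &request{done: make(chan error, 1)}
	if err := dispatch(ctx, queue, req); err != nil {
		req.tryDone(err) // the caller finishes the request on dispatch failure
		return
	}
	(<-queue).tryDone(nil)             // the dispatcher answers first
	req.tryDone(errors.New("revoked")) // a late revoke is silently dropped
	fmt.Println("result:", <-req.done) // result: <nil>
}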
From 39108f7be343368ed7487848d81d4528734dc02a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 25 Mar 2024 15:05:16 +0800 Subject: [PATCH 09/15] tests/etcdutil: reduce `TestRandomKillEtcd` test time (#7947) ref tikv/pd#7930 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/utils/etcdutil/etcdutil_test.go | 28 ++++++-------------------- pkg/utils/testutil/testutil.go | 23 +++++++++++++++++++++ server/api/region_test.go | 31 ++++++++--------------------- 3 files changed, 37 insertions(+), 45 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 4fb96895942..e02615b695f 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -239,7 +239,7 @@ func TestRandomKillEtcd(t *testing.T) { // Randomly kill an etcd server and restart it cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()} - for i := 0; i < 10; i++ { + for i := 0; i < len(cfgs)*2; i++ { killIndex := rand.Intn(len(etcds)) etcds[killIndex].Close() checkEtcdEndpointNum(re, client1, 2) @@ -452,9 +452,9 @@ func (suite *loopWatcherTestSuite) TestLoadWithLimitChange() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/meetEtcdError", `return()`)) cache := make(map[string]struct{}) - for i := 0; i < int(maxLoadBatchSize)*2; i++ { + testutil.GenerateTestDataConcurrently(int(maxLoadBatchSize)*2, func(i int) { suite.put(re, fmt.Sprintf("TestLoadWithLimitChange%d", i), "") - } + }) watcher := NewLoopWatcher( suite.ctx, &suite.wg, @@ -583,25 +583,9 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { count := 65536 ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - - // create data - var wg sync.WaitGroup - tasks := make(chan int, count) - for w := 0; w < 16; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range tasks { - suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") - } - }() - } - for i := 0; i < count; i++ { - tasks <- i - } - close(tasks) - wg.Wait() - + testutil.GenerateTestDataConcurrently(count, func(i int) { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + }) cache := make([]string, 0) watcher := NewLoopWatcher( ctx, diff --git a/pkg/utils/testutil/testutil.go b/pkg/utils/testutil/testutil.go index a41fc436ca6..cef952353bc 100644 --- a/pkg/utils/testutil/testutil.go +++ b/pkg/utils/testutil/testutil.go @@ -16,7 +16,9 @@ package testutil import ( "os" + "runtime" "strings" + "sync" "time" "github.com/pingcap/kvproto/pkg/pdpb" @@ -101,3 +103,24 @@ func InitTempFileLogger(level string) (fname string) { log.ReplaceGlobals(lg, p) return fname } + +// GenerateTestDataConcurrently generates test data concurrently. 
+func GenerateTestDataConcurrently(count int, f func(int)) { + var wg sync.WaitGroup + tasks := make(chan int, count) + workers := runtime.NumCPU() + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range tasks { + f(i) + } + }() + } + for i := 0; i < count; i++ { + tasks <- i + } + close(tasks) + wg.Wait() +} diff --git a/server/api/region_test.go b/server/api/region_test.go index e10bfbd1af0..4198cdcb694 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -23,7 +23,6 @@ import ( "net/http" "net/url" "sort" - "sync" "testing" "time" @@ -333,29 +332,15 @@ func TestRegionsWithKillRequest(t *testing.T) { addr := svr.GetAddr() url := fmt.Sprintf("%s%s/api/v1/regions", addr, apiPrefix) mustBootstrapCluster(re, svr) - regionCount := 100000 - // create data - var wg sync.WaitGroup - tasks := make(chan int, regionCount) - for w := 0; w < 16; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range tasks { - r := core.NewTestRegionInfo(uint64(i+2), 1, - []byte(fmt.Sprintf("%09d", i)), - []byte(fmt.Sprintf("%09d", i+1)), - core.SetApproximateKeys(10), core.SetApproximateSize(10)) - mustRegionHeartbeat(re, svr, r) - } - }() - } - for i := 0; i < regionCount; i++ { - tasks <- i - } - close(tasks) - wg.Wait() + regionCount := 100000 + tu.GenerateTestDataConcurrently(regionCount, func(i int) { + r := core.NewTestRegionInfo(uint64(i+2), 1, + []byte(fmt.Sprintf("%09d", i)), + []byte(fmt.Sprintf("%09d", i+1)), + core.SetApproximateKeys(10), core.SetApproximateSize(10)) + mustRegionHeartbeat(re, svr, r) + }) ctx, cancel := context.WithCancel(context.Background()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) From 29a4a2a1fafe6f8185851501d3a9211d572b7f4b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 26 Mar 2024 14:59:17 +0800 Subject: [PATCH 10/15] metrics: fix dr-auto-sync metics (#7975) close tikv/pd#7974 Signed-off-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- metrics/grafana/pd.json | 12 ++++++------ pkg/replication/replication_mode.go | 5 ++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 30014a959dc..89f2828757f 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1738,7 +1738,7 @@ "tableColumn": "idalloc", "targets": [ { - "expr": "pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"}!=0", + "expr": "max(pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"})by(type)", "format": "time_series", "hide": false, "instant": true, @@ -2284,7 +2284,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "pd_cluster_tso{type=\"tso\", dc=\"global\"}!=0", + "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})by(type)", "format": "time_series", "instant": true, "interval": "", @@ -2588,7 +2588,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "pd_cluster_tso{type=\"tso\", dc=\"global\"}!=0", + "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})by(type)", "format": "time_series", "instant": true, "interval": "", @@ -13081,7 +13081,7 @@ "id": 1601, "options": { "colorMode": "value", - "graphMode": "area", + "graphMode": "none", "justifyMode": "auto", "orientation": "auto", "reduceOptions": { @@ -13098,7 +13098,7 @@ "targets": [ { "exemplar": true, - "expr": "pd_replication_dr_state{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "expr": 
"max(pd_replication_dr_state{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"})", "instant": false, "interval": "", "legendFormat": "{{instance}}", @@ -13266,7 +13266,7 @@ "targets": [ { "exemplar": true, - "expr": "rate(pd_replication_dr_tick_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}[5m])", + "expr": "rate(pd_replication_dr_tick_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[5m])", "instant": false, "interval": "", "legendFormat": "{{instance}}", diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 9776a36a8f3..5f6b212529b 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -366,7 +366,10 @@ func (m *ModeManager) Run(ctx context.Context) { }() go func() { - defer wg.Done() + defer func() { + wg.Done() + drStateGauge.Set(0) + }() ticker := time.NewTicker(replicateStateInterval) defer ticker.Stop() for { From 13188c9f72f9ccc4949d163b94d456aada6c3e6a Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 26 Mar 2024 17:29:17 +0800 Subject: [PATCH 11/15] client/test: add some tests where member change (#7972) ref tikv/pd#4399 Signed-off-by: lhy1024 --- client/client_test.go | 6 ++++++ tests/integrations/client/client_test.go | 26 ++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index fda334a9ef2..76cded79053 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -63,6 +63,12 @@ func TestUpdateURLs(t *testing.T) { re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) cli.updateURLs(members) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs()) + cli.updateURLs(members[1:]) + re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) + cli.updateURLs(members[2:]) + re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs()) + cli.updateURLs(members[3:]) + re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs()) } const testClientURL = "tmp://test.url:5255" diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index e91e549d797..da4be99638d 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -162,11 +162,11 @@ func TestClientLeaderChange(t *testing.T) { re.Equal(endpoints, urls) } -func TestLeaderTransfer(t *testing.T) { +func TestLeaderTransferAndMoveCluster(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 3) re.NoError(err) defer cluster.Destroy() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)")) @@ -222,6 +222,28 @@ func TestLeaderTransfer(t *testing.T) { newLeaderName := cluster.WaitLeader() re.NotEqual(oldLeaderName, newLeaderName) } + + // ABC->ABCDEF + oldServers := cluster.GetServers() + oldLeaderName := cluster.WaitLeader() + for i := 0; i < 3; i++ { + newPD, err := cluster.Join(ctx) + re.NoError(err) + re.NoError(newPD.Run()) + oldLeaderName = cluster.WaitLeader() + time.Sleep(5 * time.Second) + } + + // ABCDEF->DEF + oldNames := make([]string, 0) + for _, s := range oldServers { + oldNames = append(oldNames, s.GetServer().GetMemberInfo().GetName()) + s.Stop() + } + newLeaderName := 
cluster.WaitLeader() + re.NotEqual(oldLeaderName, newLeaderName) + re.NotContains(oldNames, newLeaderName) + close(quit) wg.Wait() } From 423e36cf84f601ab4bfbf25a1119e56ba807e159 Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 27 Mar 2024 09:38:17 +0800 Subject: [PATCH 12/15] tests/discovery: reduce TestRegister cost time (#7946) ref tikv/pd#7930 Signed-off-by: husharp --- pkg/mcs/discovery/register_test.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/mcs/discovery/register_test.go b/pkg/mcs/discovery/register_test.go index 032b0558a79..707c251e5fb 100644 --- a/pkg/mcs/discovery/register_test.go +++ b/pkg/mcs/discovery/register_test.go @@ -16,6 +16,8 @@ package discovery import ( "context" + "os" + "regexp" "testing" "time" @@ -59,10 +61,21 @@ func TestRegister(t *testing.T) { sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1) err = sr.Register() re.NoError(err) + fname := testutil.InitTempFileLogger("info") + defer os.Remove(fname) for i := 0; i < 3; i++ { re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key)) - etcd.Server.HardStop() // close the etcd to make the keepalive failed - time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout + etcd.Server.HardStop() // close the etcd to make the keepalive failed + // ensure that the request is timeout + testutil.Eventually(re, func() bool { + content, _ := os.ReadFile(fname) + // check log in function `ServiceRegister.Register` + // ref https://github.com/tikv/pd/blob/6377b26e4e879e7623fbc1d0b7f1be863dea88ad/pkg/mcs/discovery/register.go#L77 + // need to both contain `register.go` and `keep alive failed` + pattern := regexp.MustCompile(`register.go.*keep alive failed`) + matches := pattern.FindAll(content, -1) + return len(matches) >= i+1 + }) etcd.Close() etcd, err = embed.StartEtcd(&cfg) re.NoError(err) From 5c58dddc66feb343e19d7f792852bb5b6184d9dd Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 27 Mar 2024 13:29:17 +0800 Subject: [PATCH 13/15] cmd, server: check the release version before starting the pd-server (#7981) close tikv/pd#7978 Move the release version check before the startup to ensure we can know it as soon as possible. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- cmd/pd-server/main.go | 2 ++ server/grpc_service.go | 2 +- server/server.go | 2 +- server/util.go | 14 +++++++++++--- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index d0f4e458412..0bf28ccfdea 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -221,6 +221,8 @@ func start(cmd *cobra.Command, args []string, services ...string) { exit(0) } + // Check the PD version first before running. 
+ server.CheckAndGetPDVersion() // New zap logger err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err == nil { diff --git a/server/grpc_service.go b/server/grpc_service.go index b6cdce4c8b8..823f09b530b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -833,7 +833,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest } log.Info("put store ok", zap.Stringer("store", store)) - CheckPDVersion(s.persistOptions) + CheckPDVersionWithClusterVersion(s.persistOptions) return &pdpb.PutStoreResponse{ Header: s.header(), diff --git a/server/server.go b/server/server.go index 5e8d3dea03b..410de96f63a 100644 --- a/server/server.go +++ b/server/server.go @@ -1800,7 +1800,7 @@ func (s *Server) campaignLeader() { member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0) }) - CheckPDVersion(s.persistOptions) + CheckPDVersionWithClusterVersion(s.persistOptions) log.Info(fmt.Sprintf("%s leader is ready to serve", s.mode), zap.String("leader-name", s.Name())) leaderTicker := time.NewTicker(mcs.LeaderTickInterval) diff --git a/server/util.go b/server/util.go index f88d0146a7f..83455e2a6fe 100644 --- a/server/util.go +++ b/server/util.go @@ -21,6 +21,7 @@ import ( "path/filepath" "strings" + "github.com/coreos/go-semver/semver" "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" @@ -33,14 +34,21 @@ import ( "go.uber.org/zap" ) -// CheckPDVersion checks if PD needs to be upgraded. -func CheckPDVersion(opt *config.PersistOptions) { +// CheckAndGetPDVersion checks and returns the PD version. +func CheckAndGetPDVersion() *semver.Version { pdVersion := versioninfo.MinSupportedVersion(versioninfo.Base) if versioninfo.PDReleaseVersion != "None" { pdVersion = versioninfo.MustParseVersion(versioninfo.PDReleaseVersion) } + return pdVersion +} + +// CheckPDVersionWithClusterVersion checks if PD needs to be upgraded by comparing the PD version with the cluster version. 
+func CheckPDVersionWithClusterVersion(opt *config.PersistOptions) { + pdVersion := CheckAndGetPDVersion() clusterVersion := *opt.GetClusterVersion() - log.Info("load cluster version", zap.Stringer("cluster-version", clusterVersion)) + log.Info("load pd and cluster version", + zap.Stringer("pd-version", pdVersion), zap.Stringer("cluster-version", clusterVersion)) if pdVersion.LessThan(clusterVersion) { log.Warn( "PD version less than cluster version, please upgrade PD", From 5a24dc0c42db26cbff35ce9ecfbc5375a3078ac7 Mon Sep 17 00:00:00 2001 From: Sparkle <1284531+baurine@users.noreply.github.com> Date: Wed, 27 Mar 2024 14:21:47 +0800 Subject: [PATCH 14/15] chore(dashboard): update TiDB Dashboard to v8.0.0-9768844f [master] (#7985) ref tikv/pd#4257 Signed-off-by: baurine <2008.hbl@gmail.com> Co-authored-by: Hu# Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- scripts/dashboard-version | 2 +- tests/integrations/go.mod | 2 +- tests/integrations/go.sum | 4 ++-- tools/go.mod | 2 +- tools/go.sum | 4 ++-- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 733246a1e99..8b403b92075 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.46.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index 3f618ac0af8..65df1568815 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/scripts/dashboard-version b/scripts/dashboard-version index 2bef159c1ae..9b2a3898256 100644 --- a/scripts/dashboard-version +++ b/scripts/dashboard-version @@ -1,3 +1,3 @@ # This file is updated by running scripts/update-dashboard.sh # Don't edit it manullay -8.0.0-ab48e09f +8.0.0-9768844f diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index d689298c314..01294247963 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -126,7 +126,7 @@ 
require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 852e8fab5fa..b3b535cd2c9 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -406,8 +406,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tools/go.mod b/tools/go.mod index 077f9377728..9d48c3f3152 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -126,7 +126,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect diff --git a/tools/go.sum b/tools/go.sum index 6b46e5bfbfb..70e16a4c679 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 
h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= From bc92c13c26bb7abbe8f332cacdb5817a509391c9 Mon Sep 17 00:00:00 2001 From: Hu# Date: Wed, 27 Mar 2024 15:38:17 +0800 Subject: [PATCH 15/15] ttl: Trying to get float64 after get uint64 fails. (#7979) close tikv/pd#7980 Signed-off-by: husharp --- server/config/persist_options.go | 44 ++++++++++++------- tests/integrations/client/http_client_test.go | 15 +++++++ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index e383f519e63..144fa93e724 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -257,27 +257,27 @@ func IsSupportedTTLConfig(key string) bool { // GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. func (o *PersistOptions) GetMaxSnapshotCount() uint64 { - return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) + return o.getTTLNumberOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) } // GetMaxPendingPeerCount returns the number of the max pending peers. func (o *PersistOptions) GetMaxPendingPeerCount() uint64 { - return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) + return o.getTTLNumberOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) } // GetMaxMergeRegionSize returns the max region size. func (o *PersistOptions) GetMaxMergeRegionSize() uint64 { - return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) + return o.getTTLNumberOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) } // GetMaxMergeRegionKeys returns the max number of keys. // It returns size * 10000 if the key of max-merge-region-Keys doesn't exist. func (o *PersistOptions) GetMaxMergeRegionKeys() uint64 { - keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey) + keys, exist, err := o.getTTLNumber(sc.MaxMergeRegionKeysKey) if exist && err == nil { return keys } - size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey) + size, exist, err := o.getTTLNumber(sc.MaxMergeRegionSizeKey) if exist && err == nil { return size * 10000 } @@ -419,32 +419,32 @@ func (o *PersistOptions) GetMaxStorePreparingTime() time.Duration { // GetLeaderScheduleLimit returns the limit for leader schedule. func (o *PersistOptions) GetLeaderScheduleLimit() uint64 { - return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) + return o.getTTLNumberOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) } // GetRegionScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetRegionScheduleLimit() uint64 { - return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) + return o.getTTLNumberOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) } // GetWitnessScheduleLimit returns the limit for region schedule. 
func (o *PersistOptions) GetWitnessScheduleLimit() uint64 { - return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) + return o.getTTLNumberOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) } // GetReplicaScheduleLimit returns the limit for replica schedule. func (o *PersistOptions) GetReplicaScheduleLimit() uint64 { - return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) + return o.getTTLNumberOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) } // GetMergeScheduleLimit returns the limit for merge schedule. func (o *PersistOptions) GetMergeScheduleLimit() uint64 { - return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) + return o.getTTLNumberOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) } // GetHotRegionScheduleLimit returns the limit for hot region schedule. func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { - return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) + return o.getTTLNumberOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) } // GetStoreLimit returns the limit of a store. @@ -547,7 +547,7 @@ func (o *PersistOptions) GetRegionScoreFormulaVersion() string { // GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 { - return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) + return o.getTTLNumberOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -870,17 +870,29 @@ func (o *PersistOptions) SetTTLData(parCtx context.Context, client *clientv3.Cli return nil } -func (o *PersistOptions) getTTLUint(key string) (uint64, bool, error) { +// getTTLNumber try to parse uint64 from ttl storage first, if failed, try to parse float64 +func (o *PersistOptions) getTTLNumber(key string) (uint64, bool, error) { stringForm, ok := o.GetTTLData(key) if !ok { return 0, false, nil } r, err := strconv.ParseUint(stringForm, 10, 64) - return r, true, err + if err == nil { + return r, true, nil + } + // try to parse float64 + // json unmarshal will convert number(such as `uint64(math.MaxInt32)`) to float64 + f, err := strconv.ParseFloat(stringForm, 64) + if err != nil { + return 0, false, err + } + return uint64(f), true, nil } -func (o *PersistOptions) getTTLUintOr(key string, defaultValue uint64) uint64 { - if v, ok, err := o.getTTLUint(key); ok { +// getTTLNumberOr try to parse uint64 from ttl storage first, if failed, try to parse float64. +// If both failed, return defaultValue. +func (o *PersistOptions) getTTLNumberOr(key string, defaultValue uint64) uint64 { + if v, ok, err := o.getTTLNumber(key); ok { if err == nil { return v } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index f53174d8089..5945b70bb2c 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -523,6 +523,21 @@ func (suite *httpClientTestSuite) checkConfig(mode mode, client pd.Client) { resp, err = env.cluster.GetEtcdClient().Get(env.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") re.NoError(err) re.Empty(resp.Kvs) + + // Test the config with TTL for storing float64 as uint64. 
+ newConfig = map[string]any{ + "schedule.max-pending-peer-count": uint64(math.MaxInt32), + } + err = client.SetConfig(env.ctx, newConfig, 4) + re.NoError(err) + c := env.cluster.GetLeaderServer().GetRaftCluster().GetOpts().GetMaxPendingPeerCount() + re.Equal(uint64(math.MaxInt32), c) + + err = client.SetConfig(env.ctx, newConfig, 0) + re.NoError(err) + resp, err = env.cluster.GetEtcdClient().Get(env.ctx, sc.TTLConfigPrefix+"/schedule.max-pending-peer-count") + re.NoError(err) + re.Empty(resp.Kvs) } func (suite *httpClientTestSuite) TestScheduleConfig() {
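
A note on the getTTLNumber change in PATCH 15/15 above: TTL config values reach PD as strings, and a value originally set as a large integer (for example uint64(math.MaxInt32)) can arrive in a float-style form after a JSON round trip, since JSON unmarshaling represents all numbers as float64. That is why strconv.ParseUint alone was not enough. The snippet below is a minimal, self-contained sketch of that fallback idea only; parseTTLNumber and the sample inputs are illustrative names, not part of the PD codebase.

    package main

    import (
    	"fmt"
    	"strconv"
    )

    // parseTTLNumber tries to parse the string as uint64 first; if that fails,
    // it falls back to float64 and truncates, mirroring the fallback used by
    // getTTLNumber for TTL config values that were serialized as floats.
    func parseTTLNumber(s string) (uint64, error) {
    	if v, err := strconv.ParseUint(s, 10, 64); err == nil {
    		return v, nil
    	}
    	f, err := strconv.ParseFloat(s, 64)
    	if err != nil {
    		return 0, err
    	}
    	return uint64(f), nil
    }

    func main() {
    	// "2.147483647e+09" is how math.MaxInt32 may look after a JSON round trip.
    	for _, s := range []string{"64", "2.147483647e+09", "not-a-number"} {
    		v, err := parseTTLNumber(s)
    		fmt.Println(s, "->", v, err)
    	}
    }

Running this prints 64 and 2147483647 for the first two inputs and an error for the last, which mirrors what the new http_client_test.go case checks for schedule.max-pending-peer-count when it is set to uint64(math.MaxInt32).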