diff --git a/client/client.go b/client/client.go index 31481b918e6..b9535aa504e 100644 --- a/client/client.go +++ b/client/client.go @@ -606,12 +606,22 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { log.Info("[pd] changing service mode", zap.String("old-mode", c.serviceMode.String()), zap.String("new-mode", newMode.String())) + c.resetTSOClientLocked(newMode) + oldMode := c.serviceMode + c.serviceMode = newMode + log.Info("[pd] service mode changed", + zap.String("old-mode", oldMode.String()), + zap.String("new-mode", newMode.String())) +} + +// Reset a new TSO client. +func (c *client) resetTSOClientLocked(mode pdpb.ServiceMode) { // Re-create a new TSO client. var ( newTSOCli *tsoClient newTSOSvcDiscovery ServiceDiscovery ) - switch newMode { + switch mode { case pdpb.ServiceMode_PD_SVC_MODE: newTSOCli = newTSOClient(c.ctx, c.option, c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{}) @@ -649,11 +659,6 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { // We are switching from API service mode to PD service mode, so delete the old tso microservice discovery. oldTSOSvcDiscovery.Close() } - oldMode := c.serviceMode - c.serviceMode = newMode - log.Info("[pd] service mode changed", - zap.String("old-mode", oldMode.String()), - zap.String("new-mode", newMode.String())) } func (c *client) getTSOClient() *tsoClient { @@ -662,6 +667,13 @@ func (c *client) getTSOClient() *tsoClient { return c.tsoClient } +// ResetTSOClient resets the TSO client, only for test. +func (c *client) ResetTSOClient() { + c.Lock() + defer c.Unlock() + c.resetTSOClientLocked(c.serviceMode) +} + func (c *client) getServiceMode() pdpb.ServiceMode { c.RLock() defer c.RUnlock() @@ -779,15 +791,22 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur defer span.Finish() } + req := c.getTSORequest(ctx, dcLocation) + if err := c.dispatchTSORequestWithRetry(req); err != nil { + req.tryDone(err) + } + return req +} + +func (c *client) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest { req := tsoReqPool.Get().(*tsoRequest) - req.requestCtx = ctx - req.clientCtx = c.ctx + // Set needed fields in the request before using it. 
req.start = time.Now() + req.clientCtx = c.ctx + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 req.dcLocation = dcLocation - - if err := c.dispatchTSORequestWithRetry(req); err != nil { - req.done <- err - } return req } diff --git a/client/client_test.go b/client/client_test.go index fda334a9ef2..76cded79053 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -63,6 +63,12 @@ func TestUpdateURLs(t *testing.T) { re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) cli.updateURLs(members) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2], members[0]}), cli.GetServiceURLs()) + cli.updateURLs(members[1:]) + re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) + cli.updateURLs(members[2:]) + re.Equal(getURLs([]*pdpb.Member{members[3], members[2]}), cli.GetServiceURLs()) + cli.updateURLs(members[3:]) + re.Equal(getURLs([]*pdpb.Member{members[3]}), cli.GetServiceURLs()) } const testClientURL = "tmp://test.url:5255" diff --git a/client/http/types.go b/client/http/types.go index 0cd9abb3843..91a2463ff64 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -133,11 +133,22 @@ type RegionsInfo struct { Regions []RegionInfo `json:"regions"` } +func newRegionsInfo(count int64) *RegionsInfo { + return &RegionsInfo{ + Count: count, + Regions: make([]RegionInfo, 0, count), + } +} + // Merge merges two RegionsInfo together and returns a new one. func (ri *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo { - newRegionsInfo := &RegionsInfo{ - Regions: make([]RegionInfo, 0, ri.Count+other.Count), + if ri == nil { + ri = newRegionsInfo(0) + } + if other == nil { + other = newRegionsInfo(0) } + newRegionsInfo := newRegionsInfo(ri.Count + other.Count) m := make(map[int64]RegionInfo, ri.Count+other.Count) for _, region := range ri.Regions { m[region.ID] = region diff --git a/client/http/types_test.go b/client/http/types_test.go index 39c53ae525d..1b8df4f8ed6 100644 --- a/client/http/types_test.go +++ b/client/http/types_test.go @@ -23,30 +23,140 @@ import ( func TestMergeRegionsInfo(t *testing.T) { re := require.New(t) - regionsInfo1 := &RegionsInfo{ - Count: 1, - Regions: []RegionInfo{ - { - ID: 1, - StartKey: "", - EndKey: "a", + testCases := []struct { + source *RegionsInfo + target *RegionsInfo + }{ + // Different regions. + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, }, }, - } - regionsInfo2 := &RegionsInfo{ - Count: 1, - Regions: []RegionInfo{ - { - ID: 2, - StartKey: "a", - EndKey: "", + // Same region. 
+ { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + }, + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: nil, + }, + { + source: nil, + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, }, }, + { + source: nil, + target: nil, + }, + { + source: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 1, + StartKey: "", + EndKey: "a", + }, + }, + }, + target: newRegionsInfo(0), + }, + { + source: newRegionsInfo(0), + target: &RegionsInfo{ + Count: 1, + Regions: []RegionInfo{ + { + ID: 2, + StartKey: "a", + EndKey: "", + }, + }, + }, + }, + { + source: newRegionsInfo(0), + target: newRegionsInfo(0), + }, + } + for idx, tc := range testCases { + regionsInfo := tc.source.Merge(tc.target) + if tc.source == nil { + tc.source = newRegionsInfo(0) + } + if tc.target == nil { + tc.target = newRegionsInfo(0) + } + m := make(map[int64]RegionInfo, tc.source.Count+tc.target.Count) + for _, region := range tc.source.Regions { + m[region.ID] = region + } + for _, region := range tc.target.Regions { + m[region.ID] = region + } + mergedCount := len(m) + re.Equal(int64(mergedCount), regionsInfo.Count, "case %d", idx) + re.Len(regionsInfo.Regions, mergedCount, "case %d", idx) + // All regions in source and target should be in the merged result. + for _, region := range append(tc.source.Regions, tc.target.Regions...) { + re.Contains(regionsInfo.Regions, region, "case %d", idx) + } } - regionsInfo := regionsInfo1.Merge(regionsInfo2) - re.Equal(int64(2), regionsInfo.Count) - re.Len(regionsInfo.Regions, 2) - re.Subset(regionsInfo.Regions, append(regionsInfo1.Regions, regionsInfo2.Regions...)) } func TestRuleStartEndKey(t *testing.T) { diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index 842c772abd9..5f3b08c2895 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -16,7 +16,13 @@ package pd import ( "context" + "runtime/trace" "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/tikv/pd/client/tsoutil" + "go.uber.org/zap" ) type tsoBatchController struct { @@ -130,9 +136,29 @@ func (tbc *tsoBatchController) adjustBestBatchSize() { } } -func (tbc *tsoBatchController) revokePendingRequest(err error) { +func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) { + for i := 0; i < tbc.collectedRequestCount; i++ { + tsoReq := tbc.collectedRequests[i] + tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) + defer trace.StartRegion(tsoReq.requestCtx, "pdclient.tsoReqDequeue").End() + tsoReq.tryDone(err) + } + // Prevent the finished requests from being processed again. 
+ tbc.collectedRequestCount = 0 +} + +func (tbc *tsoBatchController) revokePendingRequests(err error) { for i := 0; i < len(tbc.tsoRequestCh); i++ { req := <-tbc.tsoRequestCh - req.done <- err + req.tryDone(err) } } + +func (tbc *tsoBatchController) clear() { + log.Info("[pd] clear the tso batch controller", + zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize), + zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh))) + tsoErr := errors.WithStack(errClosing) + tbc.finishCollectedRequests(0, 0, 0, tsoErr) + tbc.revokePendingRequests(tsoErr) +} diff --git a/client/tso_client.go b/client/tso_client.go index eeeaf202ce6..5f8b12df36f 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -21,7 +21,6 @@ import ( "sync" "time" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" "go.uber.org/zap" @@ -64,6 +63,13 @@ var tsoReqPool = sync.Pool{ }, } +func (req *tsoRequest) tryDone(err error) { + select { + case req.done <- err: + default: + } +} + type tsoClient struct { ctx context.Context cancel context.CancelFunc @@ -140,9 +146,8 @@ func (c *tsoClient) Close() { c.tsoDispatcher.Range(func(_, dispatcherInterface any) bool { if dispatcherInterface != nil { dispatcher := dispatcherInterface.(*tsoDispatcher) - tsoErr := errors.WithStack(errClosing) - dispatcher.tsoBatchController.revokePendingRequest(tsoErr) dispatcher.dispatcherCancel() + dispatcher.tsoBatchController.clear() } return true }) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 3a6f109bfd4..88f8ffd61b5 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -95,8 +95,23 @@ func (c *tsoClient) dispatchRequest(request *tsoRequest) (bool, error) { // tsoClient is closed due to the PD service mode switch, which is retryable. return true, c.ctx.Err() default: + // This failpoint will increase the possibility that the request is sent to a closed dispatcher. + failpoint.Inject("delayDispatchTSORequest", func() { + time.Sleep(time.Second) + }) dispatcher.(*tsoDispatcher).tsoBatchController.tsoRequestCh <- request } + // Check the contexts again to make sure the request is not been sent to a closed dispatcher. + // Never retry on these conditions to prevent unexpected data race. + select { + case <-request.requestCtx.Done(): + return false, request.requestCtx.Err() + case <-request.clientCtx.Done(): + return false, request.clientCtx.Err() + case <-c.ctx.Done(): + return false, c.ctx.Err() + default: + } return false, nil } @@ -350,7 +365,8 @@ func (c *tsoClient) createTSODispatcher(dcLocation string) { func (c *tsoClient) handleDispatcher( dispatcherCtx context.Context, dc string, - tbc *tsoBatchController) { + tbc *tsoBatchController, +) { var ( err error streamURL string @@ -367,6 +383,8 @@ func (c *tsoClient) handleDispatcher( cc.(*tsoConnectionContext).cancel() return true }) + // Clear the tso batch controller. + tbc.clear() c.wg.Done() }() // Call updateTSOConnectionCtxs once to init the connectionCtxs first. @@ -428,7 +446,11 @@ tsoBatchLoop: } // Start to collect the TSO requests. maxBatchWaitInterval := c.option.getMaxTSOBatchWaitInterval() + // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, + // otherwise the upper caller may get blocked on waiting for the results. 
if err = tbc.fetchPendingRequests(dispatcherCtx, maxBatchWaitInterval); err != nil { + // Finish the collected requests if the fetch failed. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) if err == context.Canceled { log.Info("[tso] stop fetching the pending tso requests due to context canceled", zap.String("dc-location", dc)) @@ -468,13 +490,16 @@ tsoBatchLoop: timer := time.NewTimer(retryInterval) select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) timer.Stop() return case <-streamLoopTimer.C: err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr) log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err)) c.svcDiscovery.ScheduleCheckMemberChanged() - c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err)) + // Finish the collected requests if the stream is failed to be created. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(err)) timer.Stop() continue tsoBatchLoop case <-timer.C: @@ -504,9 +529,12 @@ tsoBatchLoop: } select { case <-dispatcherCtx.Done(): + // Finish the collected requests if the context is canceled. + tbc.finishCollectedRequests(0, 0, 0, errors.WithStack(dispatcherCtx.Err())) return case tsDeadlineCh.(chan *deadline) <- dl: } + // processRequests guarantees that the collected requests could be finished properly. err = c.processRequests(stream, dc, tbc) close(done) // If error happens during tso stream handling, reset stream and run the next trial. @@ -776,13 +804,14 @@ func (c *tsoClient) processRequests( defer span.Finish() } } + count := int64(len(requests)) reqKeyspaceGroupID := c.svcDiscovery.GetKeyspaceGroupID() respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), reqKeyspaceGroupID, - dcLocation, requests, tbc.batchStartTime) + dcLocation, count, tbc.batchStartTime) if err != nil { - c.finishRequest(requests, 0, 0, 0, err) + tbc.finishCollectedRequests(0, 0, 0, err) return err } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. 
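For readers following the batching logic above: once a single TSO response comes back for a whole batch, finishCollectedRequests hands every collected request the same physical part and a logical part derived from firstLogical plus its index in the batch. The standalone sketch below illustrates that distribution only; addLogical is a local stand-in that assumes tsoutil.AddLogical shifts the delta left by suffixBits, and all values are made up for illustration.

package main

import "fmt"

// addLogical mimics the assumed semantics of tsoutil.AddLogical: the delta is
// shifted left by suffixBits so the keyspace-group suffix in the low bits stays intact.
func addLogical(base, delta int64, suffixBits uint32) int64 {
	return base + delta<<suffixBits
}

func main() {
	// One response covers the whole batch: the same physical part for every
	// request, and a unique logical part per collected request.
	physical, firstLogical, suffixBits := int64(1_700_000_000_000), int64(8), uint32(2)
	for i := int64(0); i < 4; i++ {
		fmt.Printf("request %d -> physical=%d logical=%d\n", i, physical, addLogical(firstLogical, i, suffixBits))
	}
}
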
@@ -796,7 +825,7 @@ func (c *tsoClient) processRequests( logical: tsoutil.AddLogical(firstLogical, count-1, suffixBits), } c.compareAndSwapTS(dcLocation, curTSOInfo, physical, firstLogical) - c.finishRequest(requests, physical, firstLogical, suffixBits, nil) + tbc.finishCollectedRequests(physical, firstLogical, suffixBits, nil) return nil } @@ -843,11 +872,3 @@ func (c *tsoClient) compareAndSwapTS( lastTSOInfo.physical = curTSOInfo.physical lastTSOInfo.logical = curTSOInfo.logical } - -func (c *tsoClient) finishRequest(requests []*tsoRequest, physical, firstLogical int64, suffixBits uint32, err error) { - for i := 0; i < len(requests); i++ { - requests[i].physical, requests[i].logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits) - defer trace.StartRegion(requests[i].requestCtx, "pdclient.tsoReqDequeue").End() - requests[i].done <- err - } -} diff --git a/client/tso_stream.go b/client/tso_stream.go index acefa19d21c..83c0f08d4e0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -106,7 +106,7 @@ type tsoStream interface { // processRequests processes TSO requests in streaming mode to get timestamps processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) } @@ -120,10 +120,9 @@ func (s *pdTSOStream) getServerURL() string { } func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, + clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() - count := int64(len(requests)) req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -175,10 +174,9 @@ func (s *tsoTSOStream) getServerURL() string { func (s *tsoTSOStream) processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - requests []*tsoRequest, batchStartTime time.Time, + count int64, batchStartTime time.Time, ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() - count := int64(len(requests)) req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index d0f4e458412..0bf28ccfdea 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -221,6 +221,8 @@ func start(cmd *cobra.Command, args []string, services ...string) { exit(0) } + // Check the PD version first before running. 
+ server.CheckAndGetPDVersion() // New zap logger err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err == nil { diff --git a/go.mod b/go.mod index 733246a1e99..8b403b92075 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/pingcap/kvproto v0.0.0-20240222024302-881fcbf5bc41 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 github.com/prometheus/client_golang v1.18.0 github.com/prometheus/common v0.46.0 github.com/sasha-s/go-deadlock v0.2.0 diff --git a/go.sum b/go.sum index 3f618ac0af8..65df1568815 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 15a4b0bfc43..89f2828757f 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -1738,7 +1738,7 @@ "tableColumn": "idalloc", "targets": [ { - "expr": "max(pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"})", + "expr": "max(pd_cluster_id{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"idalloc\"})by(type)", "format": "time_series", "hide": false, "instant": true, @@ -2284,7 +2284,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})", + "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})by(type)", "format": "time_series", "instant": true, "interval": "", @@ -2588,7 +2588,7 @@ "tableColumn": "tso", "targets": [ { - "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})", + "expr": "max(pd_cluster_tso{type=\"tso\", dc=\"global\"})by(type)", "format": "time_series", "instant": true, "interval": "", @@ -7895,6 +7895,7 @@ "targets": [ { "expr": "pd_checker_patrol_regions_time{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"} != 0", + "legendFormat": "{{instance}}", "format": "time_series", "intervalFactor": 1, "refId": "A" @@ -8474,14 +8475,14 @@ "refId": "A" }, { - "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"fail\"}[1m]*60)", + "expr": 
"rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"fail\"}[1m])*60", "format": "time_series", "intervalFactor": 2, "legendFormat": "fail", "refId": "B" }, { - "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"success\"}[1m]*60)", + "expr": "rate(pd_schedule_scatter_operators_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"success\"}[1m])*60", "format": "time_series", "intervalFactor": 2, "legendFormat": "success", @@ -9296,7 +9297,7 @@ "steppedLine": false, "targets": [ { - "expr": "etcd_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}", + "expr": "etcd_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\".*pd.*\"}", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -9304,7 +9305,7 @@ "refId": "A" }, { - "expr": "etcd_mvcc_db_total_size_in_use_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}", + "expr": "etcd_mvcc_db_total_size_in_use_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\".*pd.*\"}", "format": "time_series", "hide": false, "intervalFactor": 1, @@ -13080,7 +13081,7 @@ "id": 1601, "options": { "colorMode": "value", - "graphMode": "area", + "graphMode": "none", "justifyMode": "auto", "orientation": "auto", "reduceOptions": { @@ -13097,7 +13098,7 @@ "targets": [ { "exemplar": true, - "expr": "pd_replication_dr_state{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}", + "expr": "max(pd_replication_dr_state{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"})", "instant": false, "interval": "", "legendFormat": "{{instance}}", @@ -13265,7 +13266,7 @@ "targets": [ { "exemplar": true, - "expr": "rate(pd_replication_dr_tick_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=\"$instance\"}[5m])", + "expr": "rate(pd_replication_dr_tick_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[5m])", "instant": false, "interval": "", "legendFormat": "{{instance}}", diff --git a/pkg/mcs/discovery/register_test.go b/pkg/mcs/discovery/register_test.go index 0b7cf105f69..707c251e5fb 100644 --- a/pkg/mcs/discovery/register_test.go +++ b/pkg/mcs/discovery/register_test.go @@ -17,7 +17,7 @@ package discovery import ( "context" "os" - "strings" + "regexp" "testing" "time" @@ -62,15 +62,19 @@ func TestRegister(t *testing.T) { err = sr.Register() re.NoError(err) fname := testutil.InitTempFileLogger("info") + defer os.Remove(fname) for i := 0; i < 3; i++ { re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key)) etcd.Server.HardStop() // close the etcd to make the keepalive failed // ensure that the request is timeout testutil.Eventually(re, func() bool { - b, _ := os.ReadFile(fname) - l := string(b) - count := strings.Count(l, "keep alive failed") - return count >= i+1 + content, _ := os.ReadFile(fname) + // check log in function `ServiceRegister.Register` + // ref https://github.com/tikv/pd/blob/6377b26e4e879e7623fbc1d0b7f1be863dea88ad/pkg/mcs/discovery/register.go#L77 + // need to both contain `register.go` and `keep alive failed` + pattern := regexp.MustCompile(`register.go.*keep alive failed`) + matches := pattern.FindAll(content, -1) + return len(matches) >= i+1 }) etcd.Close() etcd, err = embed.StartEtcd(&cfg) diff --git a/pkg/mcs/resourcemanager/server/manager.go 
b/pkg/mcs/resourcemanager/server/manager.go index ea67bb847ef..ef402b8cbf9 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" @@ -41,7 +42,9 @@ const ( defaultConsumptionChanSize = 1024 metricsCleanupInterval = time.Minute metricsCleanupTimeout = 20 * time.Minute - metricsAvailableRUInterval = 30 * time.Second + metricsAvailableRUInterval = 1 * time.Second + defaultCollectIntervalSec = 20 + tickPerSecond = time.Second reservedDefaultGroupName = "default" middlePriority = 8 @@ -357,6 +360,9 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { defer cleanUpTicker.Stop() availableRUTicker := time.NewTicker(metricsAvailableRUInterval) defer availableRUTicker.Stop() + recordMaxTicker := time.NewTicker(tickPerSecond) + defer recordMaxTicker.Stop() + maxPerSecTrackers := make(map[string]*maxPerSecCostTracker) for { select { case <-ctx.Done(): @@ -386,6 +392,13 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { readRequestCountMetrics = requestCount.WithLabelValues(name, name, readTypeLabel) writeRequestCountMetrics = requestCount.WithLabelValues(name, name, writeTypeLabel) ) + t, ok := maxPerSecTrackers[name] + if !ok { + t = newMaxPerSecCostTracker(name, defaultCollectIntervalSec) + maxPerSecTrackers[name] = t + } + t.CollectConsumption(consumption) + // RU info. if consumption.RRU > 0 { rruMetrics.Add(consumption.RRU) @@ -437,21 +450,101 @@ func (m *Manager) backgroundMetricsFlush(ctx context.Context) { requestCount.DeleteLabelValues(r.name, r.name, writeTypeLabel) availableRUCounter.DeleteLabelValues(r.name, r.name, r.ruType) delete(m.consumptionRecord, r) + delete(maxPerSecTrackers, r.name) + readRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) + writeRequestUnitMaxPerSecCost.DeleteLabelValues(r.name) } } case <-availableRUTicker.C: m.RLock() + groups := make([]*ResourceGroup, 0, len(m.groups)) for name, group := range m.groups { if name == reservedDefaultGroupName { continue } + groups = append(groups, group) + } + m.RUnlock() + // prevent many groups and hold the lock long time. + for _, group := range groups { ru := group.getRUToken() if ru < 0 { ru = 0 } - availableRUCounter.WithLabelValues(name, name).Set(ru) + availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru) + } + + case <-recordMaxTicker.C: + // Record the sum of RRU and WRU every second. 
+ m.RLock() + names := make([]string, 0, len(m.groups)) + for name := range m.groups { + names = append(names, name) } m.RUnlock() + for _, name := range names { + if t, ok := maxPerSecTrackers[name]; !ok { + maxPerSecTrackers[name] = newMaxPerSecCostTracker(name, defaultCollectIntervalSec) + } else { + t.FlushMetrics() + } + } } } } + +type maxPerSecCostTracker struct { + name string + maxPerSecRRU float64 + maxPerSecWRU float64 + rruSum float64 + wruSum float64 + lastRRUSum float64 + lastWRUSum float64 + flushPeriod int + cnt int + rruMaxMetrics prometheus.Gauge + wruMaxMetrics prometheus.Gauge +} + +func newMaxPerSecCostTracker(name string, flushPeriod int) *maxPerSecCostTracker { + return &maxPerSecCostTracker{ + name: name, + flushPeriod: flushPeriod, + rruMaxMetrics: readRequestUnitMaxPerSecCost.WithLabelValues(name), + wruMaxMetrics: writeRequestUnitMaxPerSecCost.WithLabelValues(name), + } +} + +// CollectConsumption collects the consumption info. +func (t *maxPerSecCostTracker) CollectConsumption(consume *rmpb.Consumption) { + t.rruSum += consume.RRU + t.wruSum += consume.WRU +} + +// FlushMetrics and set the maxPerSecRRU and maxPerSecWRU to the metrics. +func (t *maxPerSecCostTracker) FlushMetrics() { + if t.lastRRUSum == 0 && t.lastWRUSum == 0 { + t.lastRRUSum = t.rruSum + t.lastWRUSum = t.wruSum + return + } + deltaRRU := t.rruSum - t.lastRRUSum + deltaWRU := t.wruSum - t.lastWRUSum + t.lastRRUSum = t.rruSum + t.lastWRUSum = t.wruSum + if deltaRRU > t.maxPerSecRRU { + t.maxPerSecRRU = deltaRRU + } + if deltaWRU > t.maxPerSecWRU { + t.maxPerSecWRU = deltaWRU + } + t.cnt++ + // flush to metrics in every flushPeriod. + if t.cnt%t.flushPeriod == 0 { + t.rruMaxMetrics.Set(t.maxPerSecRRU) + t.wruMaxMetrics.Set(t.maxPerSecWRU) + t.maxPerSecRRU = 0 + t.maxPerSecWRU = 0 + } +} diff --git a/pkg/mcs/resourcemanager/server/metrics.go b/pkg/mcs/resourcemanager/server/metrics.go index 6bb90c45d12..45c94e5c735 100644 --- a/pkg/mcs/resourcemanager/server/metrics.go +++ b/pkg/mcs/resourcemanager/server/metrics.go @@ -48,6 +48,22 @@ var ( Name: "write_request_unit_sum", Help: "Counter of the write request unit cost for all resource groups.", }, []string{resourceGroupNameLabel, newResourceGroupNameLabel, typeLabel}) + + readRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "read_request_unit_max_per_sec", + Help: "Gauge of the max read request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + writeRequestUnitMaxPerSecCost = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: ruSubsystem, + Name: "write_request_unit_max_per_sec", + Help: "Gauge of the max write request unit per second for all resource groups.", + }, []string{newResourceGroupNameLabel}) + sqlLayerRequestUnitCost = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -112,4 +128,6 @@ func init() { prometheus.MustRegister(sqlCPUCost) prometheus.MustRegister(requestCount) prometheus.MustRegister(availableRUCounter) + prometheus.MustRegister(readRequestUnitMaxPerSecCost) + prometheus.MustRegister(writeRequestUnitMaxPerSecCost) } diff --git a/pkg/mcs/resourcemanager/server/metrics_test.go b/pkg/mcs/resourcemanager/server/metrics_test.go new file mode 100644 index 00000000000..62d07286eaf --- /dev/null +++ b/pkg/mcs/resourcemanager/server/metrics_test.go @@ -0,0 +1,51 @@ +// Copyright 2024 TiKV Project Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "fmt" + "testing" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/require" +) + +func TestMaxPerSecCostTracker(t *testing.T) { + tracker := newMaxPerSecCostTracker("test", defaultCollectIntervalSec) + re := require.New(t) + + // Define the expected max values for each flushPeriod + expectedMaxRU := []float64{19, 39, 59} + expectedSum := []float64{190, 780, 1770} + + for i := 0; i < 60; i++ { + // Record data + consumption := &rmpb.Consumption{ + RRU: float64(i), + WRU: float64(i), + } + tracker.CollectConsumption(consumption) + tracker.FlushMetrics() + + // Check the max values at the end of each flushPeriod + if (i+1)%20 == 0 { + period := i / 20 + re.Equal(tracker.maxPerSecRRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecRRU in period %d is incorrect", period+1)) + re.Equal(tracker.maxPerSecWRU, expectedMaxRU[period], fmt.Sprintf("maxPerSecWRU in period %d is incorrect", period+1)) + re.Equal(tracker.rruSum, expectedSum[period]) + re.Equal(tracker.rruSum, expectedSum[period]) + } + } +} diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 738140612b8..47a7cf9962b 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "runtime" "strconv" "sync" @@ -413,7 +414,8 @@ func (s *Server) startServer() (err error) { // different service modes provided by the same pd-server binary bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) bs.ServerMaxProcsGauge.Set(float64(runtime.GOMAXPROCS(0))) - deployPath, err := os.Executable() + execPath, err := os.Executable() + deployPath := filepath.Dir(execPath) if err != nil { deployPath = "" } diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index bac93ddfb6b..f5f46a29504 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/signal" + "path/filepath" "runtime" "strconv" "sync" @@ -368,7 +369,8 @@ func (s *Server) startServer() (err error) { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID) tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID) - deployPath, err := os.Executable() + execPath, err := os.Executable() + deployPath := filepath.Dir(execPath) if err != nil { deployPath = "" } diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 9776a36a8f3..5f6b212529b 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -366,7 +366,10 @@ func (m *ModeManager) Run(ctx context.Context) { }() go func() { - defer wg.Done() + defer func() { + wg.Done() + drStateGauge.Set(0) + }() ticker := time.NewTicker(replicateStateInterval) defer ticker.Stop() for { diff --git 
a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 4fb96895942..e02615b695f 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -239,7 +239,7 @@ func TestRandomKillEtcd(t *testing.T) { // Randomly kill an etcd server and restart it cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()} - for i := 0; i < 10; i++ { + for i := 0; i < len(cfgs)*2; i++ { killIndex := rand.Intn(len(etcds)) etcds[killIndex].Close() checkEtcdEndpointNum(re, client1, 2) @@ -452,9 +452,9 @@ func (suite *loopWatcherTestSuite) TestLoadWithLimitChange() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/meetEtcdError", `return()`)) cache := make(map[string]struct{}) - for i := 0; i < int(maxLoadBatchSize)*2; i++ { + testutil.GenerateTestDataConcurrently(int(maxLoadBatchSize)*2, func(i int) { suite.put(re, fmt.Sprintf("TestLoadWithLimitChange%d", i), "") - } + }) watcher := NewLoopWatcher( suite.ctx, &suite.wg, @@ -583,25 +583,9 @@ func (suite *loopWatcherTestSuite) TestWatcherLoadLargeKey() { count := 65536 ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - - // create data - var wg sync.WaitGroup - tasks := make(chan int, count) - for w := 0; w < 16; w++ { - wg.Add(1) - go func() { - defer wg.Done() - for i := range tasks { - suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") - } - }() - } - for i := 0; i < count; i++ { - tasks <- i - } - close(tasks) - wg.Wait() - + testutil.GenerateTestDataConcurrently(count, func(i int) { + suite.put(re, fmt.Sprintf("TestWatcherLoadLargeKey/test-%d", i), "") + }) cache := make([]string, 0) watcher := NewLoopWatcher( ctx, diff --git a/pkg/utils/testutil/testutil.go b/pkg/utils/testutil/testutil.go index a41fc436ca6..cef952353bc 100644 --- a/pkg/utils/testutil/testutil.go +++ b/pkg/utils/testutil/testutil.go @@ -16,7 +16,9 @@ package testutil import ( "os" + "runtime" "strings" + "sync" "time" "github.com/pingcap/kvproto/pkg/pdpb" @@ -101,3 +103,24 @@ func InitTempFileLogger(level string) (fname string) { log.ReplaceGlobals(lg, p) return fname } + +// GenerateTestDataConcurrently generates test data concurrently. 
+func GenerateTestDataConcurrently(count int, f func(int)) { + var wg sync.WaitGroup + tasks := make(chan int, count) + workers := runtime.NumCPU() + for w := 0; w < workers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := range tasks { + f(i) + } + }() + } + for i := 0; i < count; i++ { + tasks <- i + } + close(tasks) + wg.Wait() +} diff --git a/scripts/dashboard-version b/scripts/dashboard-version index 2bef159c1ae..9b2a3898256 100644 --- a/scripts/dashboard-version +++ b/scripts/dashboard-version @@ -1,3 +1,3 @@ # This file is updated by running scripts/update-dashboard.sh # Don't edit it manullay -8.0.0-ab48e09f +8.0.0-9768844f diff --git a/server/api/region_test.go b/server/api/region_test.go index 8c0d78abd4a..4198cdcb694 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -332,14 +332,15 @@ func TestRegionsWithKillRequest(t *testing.T) { addr := svr.GetAddr() url := fmt.Sprintf("%s%s/api/v1/regions", addr, apiPrefix) mustBootstrapCluster(re, svr) + regionCount := 100000 - for i := 0; i < regionCount; i++ { + tu.GenerateTestDataConcurrently(regionCount, func(i int) { r := core.NewTestRegionInfo(uint64(i+2), 1, []byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("%09d", i+1)), core.SetApproximateKeys(10), core.SetApproximateSize(10)) mustRegionHeartbeat(re, svr, r) - } + }) ctx, cancel := context.WithCancel(context.Background()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) diff --git a/server/config/persist_options.go b/server/config/persist_options.go index e383f519e63..144fa93e724 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -257,27 +257,27 @@ func IsSupportedTTLConfig(key string) bool { // GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. func (o *PersistOptions) GetMaxSnapshotCount() uint64 { - return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) + return o.getTTLNumberOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) } // GetMaxPendingPeerCount returns the number of the max pending peers. func (o *PersistOptions) GetMaxPendingPeerCount() uint64 { - return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) + return o.getTTLNumberOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) } // GetMaxMergeRegionSize returns the max region size. func (o *PersistOptions) GetMaxMergeRegionSize() uint64 { - return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) + return o.getTTLNumberOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) } // GetMaxMergeRegionKeys returns the max number of keys. // It returns size * 10000 if the key of max-merge-region-Keys doesn't exist. func (o *PersistOptions) GetMaxMergeRegionKeys() uint64 { - keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey) + keys, exist, err := o.getTTLNumber(sc.MaxMergeRegionKeysKey) if exist && err == nil { return keys } - size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey) + size, exist, err := o.getTTLNumber(sc.MaxMergeRegionSizeKey) if exist && err == nil { return size * 10000 } @@ -419,32 +419,32 @@ func (o *PersistOptions) GetMaxStorePreparingTime() time.Duration { // GetLeaderScheduleLimit returns the limit for leader schedule. 
func (o *PersistOptions) GetLeaderScheduleLimit() uint64 { - return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) + return o.getTTLNumberOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) } // GetRegionScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetRegionScheduleLimit() uint64 { - return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) + return o.getTTLNumberOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) } // GetWitnessScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetWitnessScheduleLimit() uint64 { - return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) + return o.getTTLNumberOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) } // GetReplicaScheduleLimit returns the limit for replica schedule. func (o *PersistOptions) GetReplicaScheduleLimit() uint64 { - return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) + return o.getTTLNumberOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) } // GetMergeScheduleLimit returns the limit for merge schedule. func (o *PersistOptions) GetMergeScheduleLimit() uint64 { - return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) + return o.getTTLNumberOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) } // GetHotRegionScheduleLimit returns the limit for hot region schedule. func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { - return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) + return o.getTTLNumberOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) } // GetStoreLimit returns the limit of a store. @@ -547,7 +547,7 @@ func (o *PersistOptions) GetRegionScoreFormulaVersion() string { // GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 { - return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) + return o.getTTLNumberOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -870,17 +870,29 @@ func (o *PersistOptions) SetTTLData(parCtx context.Context, client *clientv3.Cli return nil } -func (o *PersistOptions) getTTLUint(key string) (uint64, bool, error) { +// getTTLNumber try to parse uint64 from ttl storage first, if failed, try to parse float64 +func (o *PersistOptions) getTTLNumber(key string) (uint64, bool, error) { stringForm, ok := o.GetTTLData(key) if !ok { return 0, false, nil } r, err := strconv.ParseUint(stringForm, 10, 64) - return r, true, err + if err == nil { + return r, true, nil + } + // try to parse float64 + // json unmarshal will convert number(such as `uint64(math.MaxInt32)`) to float64 + f, err := strconv.ParseFloat(stringForm, 64) + if err != nil { + return 0, false, err + } + return uint64(f), true, nil } -func (o *PersistOptions) getTTLUintOr(key string, defaultValue uint64) uint64 { - if v, ok, err := o.getTTLUint(key); ok { +// getTTLNumberOr try to parse uint64 from ttl storage first, if failed, try to parse float64. +// If both failed, return defaultValue. 
+func (o *PersistOptions) getTTLNumberOr(key string, defaultValue uint64) uint64 { + if v, ok, err := o.getTTLNumber(key); ok { if err == nil { return v } diff --git a/server/grpc_service.go b/server/grpc_service.go index b6cdce4c8b8..823f09b530b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -833,7 +833,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest } log.Info("put store ok", zap.Stringer("store", store)) - CheckPDVersion(s.persistOptions) + CheckPDVersionWithClusterVersion(s.persistOptions) return &pdpb.PutStoreResponse{ Header: s.header(), diff --git a/server/server.go b/server/server.go index 5e8d3dea03b..410de96f63a 100644 --- a/server/server.go +++ b/server/server.go @@ -1800,7 +1800,7 @@ func (s *Server) campaignLeader() { member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0) }) - CheckPDVersion(s.persistOptions) + CheckPDVersionWithClusterVersion(s.persistOptions) log.Info(fmt.Sprintf("%s leader is ready to serve", s.mode), zap.String("leader-name", s.Name())) leaderTicker := time.NewTicker(mcs.LeaderTickInterval) diff --git a/server/util.go b/server/util.go index f88d0146a7f..83455e2a6fe 100644 --- a/server/util.go +++ b/server/util.go @@ -21,6 +21,7 @@ import ( "path/filepath" "strings" + "github.com/coreos/go-semver/semver" "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" @@ -33,14 +34,21 @@ import ( "go.uber.org/zap" ) -// CheckPDVersion checks if PD needs to be upgraded. -func CheckPDVersion(opt *config.PersistOptions) { +// CheckAndGetPDVersion checks and returns the PD version. +func CheckAndGetPDVersion() *semver.Version { pdVersion := versioninfo.MinSupportedVersion(versioninfo.Base) if versioninfo.PDReleaseVersion != "None" { pdVersion = versioninfo.MustParseVersion(versioninfo.PDReleaseVersion) } + return pdVersion +} + +// CheckPDVersionWithClusterVersion checks if PD needs to be upgraded by comparing the PD version with the cluster version. 
+func CheckPDVersionWithClusterVersion(opt *config.PersistOptions) { + pdVersion := CheckAndGetPDVersion() clusterVersion := *opt.GetClusterVersion() - log.Info("load cluster version", zap.Stringer("cluster-version", clusterVersion)) + log.Info("load pd and cluster version", + zap.Stringer("pd-version", pdVersion), zap.Stringer("cluster-version", clusterVersion)) if pdVersion.LessThan(clusterVersion) { log.Warn( "PD version less than cluster version, please upgrade PD", diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index e91e549d797..da4be99638d 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -162,11 +162,11 @@ func TestClientLeaderChange(t *testing.T) { re.Equal(endpoints, urls) } -func TestLeaderTransfer(t *testing.T) { +func TestLeaderTransferAndMoveCluster(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestCluster(ctx, 2) + cluster, err := tests.NewTestCluster(ctx, 3) re.NoError(err) defer cluster.Destroy() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)")) @@ -222,6 +222,28 @@ func TestLeaderTransfer(t *testing.T) { newLeaderName := cluster.WaitLeader() re.NotEqual(oldLeaderName, newLeaderName) } + + // ABC->ABCDEF + oldServers := cluster.GetServers() + oldLeaderName := cluster.WaitLeader() + for i := 0; i < 3; i++ { + newPD, err := cluster.Join(ctx) + re.NoError(err) + re.NoError(newPD.Run()) + oldLeaderName = cluster.WaitLeader() + time.Sleep(5 * time.Second) + } + + // ABCDEF->DEF + oldNames := make([]string, 0) + for _, s := range oldServers { + oldNames = append(oldNames, s.GetServer().GetMemberInfo().GetName()) + s.Stop() + } + newLeaderName := cluster.WaitLeader() + re.NotEqual(oldLeaderName, newLeaderName) + re.NotContains(oldNames, newLeaderName) + close(quit) wg.Wait() } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index f53174d8089..5945b70bb2c 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -523,6 +523,21 @@ func (suite *httpClientTestSuite) checkConfig(mode mode, client pd.Client) { resp, err = env.cluster.GetEtcdClient().Get(env.ctx, sc.TTLConfigPrefix+"/schedule.leader-schedule-limit") re.NoError(err) re.Empty(resp.Kvs) + + // Test the config with TTL for storing float64 as uint64. 
+ newConfig = map[string]any{ + "schedule.max-pending-peer-count": uint64(math.MaxInt32), + } + err = client.SetConfig(env.ctx, newConfig, 4) + re.NoError(err) + c := env.cluster.GetLeaderServer().GetRaftCluster().GetOpts().GetMaxPendingPeerCount() + re.Equal(uint64(math.MaxInt32), c) + + err = client.SetConfig(env.ctx, newConfig, 0) + re.NoError(err) + resp, err = env.cluster.GetEtcdClient().Get(env.ctx, sc.TTLConfigPrefix+"/schedule.max-pending-peer-count") + re.NoError(err) + re.Empty(resp.Kvs) } func (suite *httpClientTestSuite) TestScheduleConfig() { diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index d689298c314..01294247963 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -126,7 +126,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 852e8fab5fa..b3b535cd2c9 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -406,8 +406,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 3d7b099f342..d4f484087cf 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -21,6 +21,7 @@ import ( "math/rand" "strings" "sync" + "sync/atomic" "testing" "time" @@ -66,6 +67,10 @@ type tsoClientTestSuite struct { clients []pd.Client } +func (suite *tsoClientTestSuite) getBackendEndpoints() []string { + return strings.Split(suite.backendEndpoints, ",") +} + func TestLegacyTSOClient(t *testing.T) { suite.Run(t, &tsoClientTestSuite{ legacy: true, @@ -98,7 +103,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.keyspaceIDs = make([]uint32, 0) if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, strings.Split(suite.backendEndpoints, ","), pd.SecurityOption{}, 
pd.WithForwardingOption(true)) + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -173,7 +178,7 @@ func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.A // Create clients and make sure they all have discovered the tso service. suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } @@ -254,7 +259,7 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { ctx, cancel := context.WithCancel(suite.ctx) defer cancel() client := mcs.SetupClientWithKeyspaceID( - ctx, re, keyspaceID, strings.Split(suite.backendEndpoints, ",")) + ctx, re, keyspaceID, suite.getBackendEndpoints()) defer client.Close() var lastTS uint64 for j := 0; j < tsoRequestRound; j++ { @@ -420,6 +425,52 @@ func (suite *tsoClientTestSuite) TestRandomShutdown() { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastUpdatePhysicalInterval")) } +func (suite *tsoClientTestSuite) TestGetTSWhileResettingTSOClient() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/client/delayDispatchTSORequest", "return(true)")) + var ( + clients []pd.Client + stopSignal atomic.Bool + wg sync.WaitGroup + ) + // Create independent clients to prevent interfering with other tests. + if suite.legacy { + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) + re.NoError(err) + clients = []pd.Client{client} + } else { + clients = mcs.WaitForMultiKeyspacesTSOAvailable(suite.ctx, re, suite.keyspaceIDs, suite.getBackendEndpoints()) + } + wg.Add(tsoRequestConcurrencyNumber * len(clients)) + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + go func(client pd.Client) { + defer wg.Done() + var lastTS uint64 + for !stopSignal.Load() { + physical, logical, err := client.GetTS(suite.ctx) + if err != nil { + re.ErrorContains(err, context.Canceled.Error()) + } else { + ts := tsoutil.ComposeTS(physical, logical) + re.Less(lastTS, ts) + lastTS = ts + } + } + }(client) + } + } + // Reset the TSO clients while requesting TSO concurrently. + for i := 0; i < tsoRequestConcurrencyNumber; i++ { + for _, client := range clients { + client.(interface{ ResetTSOClient() }).ResetTSOClient() + } + } + stopSignal.Store(true) + wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/client/delayDispatchTSORequest")) +} + // When we upgrade the PD cluster, there may be a period of time that the old and new PDs are running at the same time. 
func TestMixedTSODeployment(t *testing.T) { re := require.New(t) diff --git a/tools/go.mod b/tools/go.mod index 5caecbf7bec..6ff30b2baf4 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -127,7 +127,7 @@ require ( github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d // indirect github.com/pingcap/errcode v0.3.0 // indirect github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 // indirect - github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 // indirect + github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 // indirect github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect diff --git a/tools/go.sum b/tools/go.sum index 9432114ee97..39968d9ad2e 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -402,8 +402,8 @@ github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 h1:QV6jqlfOkh8hqvEAgwBZa+4bSgO0EeKC7s5c6Luam2I= github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21/go.mod h1:QYnjfA95ZaMefyl1NO8oPtKeb8pYUdnDVhQgf+qdpjM= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762 h1:mqWKTL6jkeG/MtxNmUbseSD/QvUtO1RAkr5e9Juy0Vk= -github.com/pingcap/tidb-dashboard v0.0.0-20240315083732-ab48e09f7762/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7 h1:eFu98FbfJB7PKWOtkaV6YNXXJWqDhczQX56j/iucgU4= +github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7/go.mod h1:ucZBRz52icb23T/5Z4CsuUHmarYiin7p2MeiVBe+o8c= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e h1:FBaTXU8C3xgt/drM58VHxojHo/QoG1oPsgWTGvaSpO4= github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
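
The tryDone helper introduced in client/tso_client.go is what makes it safe for both the dispatcher and the new clean-up paths (finishCollectedRequests, revokePendingRequests, clear) to complete the same request: a second completion falls through the default branch instead of blocking forever on the done channel. A minimal self-contained sketch of the pattern, using local types rather than the client's tsoRequest and a buffered channel of capacity one as this sketch's own assumption about the request pool:

package main

import (
	"errors"
	"fmt"
)

type request struct {
	done chan error
}

// tryDone delivers the result at most once; later calls are no-ops instead of
// blocking whichever goroutine is tearing the dispatcher down.
func (r *request) tryDone(err error) {
	select {
	case r.done <- err:
	default:
	}
}

func main() {
	req := &request{done: make(chan error, 1)}
	req.tryDone(nil)                   // normal completion by the dispatcher
	req.tryDone(errors.New("closing")) // late clean-up path: dropped, does not block
	fmt.Println("result:", <-req.done) // prints: result: <nil>
}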
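
The getTTLNumber change in server/config/persist_options.go is easiest to see with concrete strings: a limit such as uint64(math.MaxInt32) that travels through JSON is decoded as float64, and its stored text may end up in a float form that strconv.ParseUint rejects while strconv.ParseFloat accepts. The sketch below shows only that fallback; parseTTLNumber and the sample string 2.147483647e+09 are this example's assumptions, not PD's actual storage format.

package main

import (
	"fmt"
	"strconv"
)

// parseTTLNumber mirrors the fallback order: try uint64 first, then float64.
func parseTTLNumber(s string) (uint64, error) {
	if v, err := strconv.ParseUint(s, 10, 64); err == nil {
		return v, nil
	}
	f, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return 0, err
	}
	return uint64(f), nil
}

func main() {
	for _, s := range []string{"64", "2.147483647e+09"} {
		v, err := parseTTLNumber(s)
		// "64" parses directly as uint64; the float form falls back to 2147483647.
		fmt.Println(s, "->", v, err)
	}
}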
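
The nil handling added to RegionsInfo.Merge means HTTP callers no longer need to guard a page that was never fetched: a nil receiver or argument now behaves like an empty result set. A short usage sketch against the client/http types, assuming a module version that already carries this change and using made-up region values:

package main

import (
	"fmt"

	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	// A page that was never fetched stays nil; Merge treats it as empty.
	var fetched *pdhttp.RegionsInfo

	next := &pdhttp.RegionsInfo{
		Count: 1,
		Regions: []pdhttp.RegionInfo{
			{ID: 2, StartKey: "a", EndKey: ""},
		},
	}

	merged := fetched.Merge(next) // safe on a nil receiver after this change
	fmt.Println(merged.Count, len(merged.Regions)) // 1 1
}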